001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static java.util.logging.Level.FINER; 036import static java.util.logging.Level.SEVERE; 
037import static java.util.logging.Level.WARNING; 038 039import com.google.common.annotations.Beta; 040import com.google.common.annotations.VisibleForTesting; 041import com.google.common.base.Function; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.io.IOException; 052import java.util.IdentityHashMap; 053import java.util.Map; 054import java.util.concurrent.Callable; 055import java.util.concurrent.CancellationException; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.ExecutionException; 058import java.util.concurrent.Executor; 059import java.util.concurrent.Future; 060import java.util.concurrent.RejectedExecutionException; 061import java.util.concurrent.atomic.AtomicReference; 062import java.util.logging.Logger; 063import org.checkerframework.checker.nullness.compatqual.NullableDecl; 064 065/** 066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 067 * complete, some objects captured during the computation are closed. 068 * 069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 071 * cancellation of the operation so far. The only way to extract the value or exception from a step 072 * is by declaring that step to be the last step of the pipeline. 
 * Nevertheless, we refer to the
 * "value" of a successful step or the "result" (value or exception) of any step.
 *
 * <ol>
 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
 *       block or a {@link ListenableFuture}.
 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
 *       can be captured for later closing.
 *   <li>There is one last step (the root of the tree), from which you can extract the final result
 *       of the computation. After that result is available (or the computation fails), all objects
 *       captured by any of the steps in the pipeline are closed.
 * </ol>
 *
 * <h3>Starting a pipeline</h3>
 *
 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
 * #from(ListenableFuture)} instead.
 *
 * <h3>Derived steps</h3>
 *
 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
 * ways similar to {@link FluentFuture}s:
 *
 * <ul>
 *   <li>by transforming the value from a successful input step,
 *   <li>by catching the exception from a failed input step, or
 *   <li>by combining the results of several input steps.
 * </ul>
 *
 * Each derivation can capture the next value or any intermediate objects for later closing.
 *
 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
 * exception, or combine it with others, you cannot do anything else with it, including declare it
 * to be the last step of the pipeline.
 *
 * <h4>Transforming</h4>
 *
 * To derive the next step by asynchronously applying a function to an input step's value, call
 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
 * Executor)} on the input step.
 *
 * <h4>Catching</h4>
 *
 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
 *
 * <h4>Combining</h4>
 *
 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or one of their overloads.
 *
 * <h3>Cancelling</h3>
 *
 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
 * successfully cancelled step will immediately start closing all objects captured for later closing
 * by it and by its input steps.
 *
 * <h3>Ending a pipeline</h3>
 *
 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
 * close the captured objects automatically or manually.
 *
 * <h4>Automatically closing</h4>
 *
 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
 * completes before closing starts, rather than once it has finished.
 *
 * <pre>{@code
 * FluentFuture<UserName> userName =
 *     ClosingFuture.submit(
 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
 *             executor)
 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
 *         .transform((closer, result) -> result.get("userName"), directExecutor())
 *         .catching(DBException.class, (closer, e) -> "no user", directExecutor())
 *         .finishToFuture();
 * }</pre>
 *
 * In this example, once the {@code userName} {@link Future} is done, the transaction and the query
 * result cursor will both begin to be closed, even if the operation is cancelled or fails.
 *
 * <h4>Manually closing</h4>
 *
 * If you want to close the captured objects manually, after you've used the final result, call
 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
 *
 * <pre>{@code
 * ClosingFuture.submit(
 *         closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
 *         executor)
 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
 *     .transform((closer, result) -> result.get("userName"), directExecutor())
 *     .catching(DBException.class, (closer, e) -> "no user", directExecutor())
 *     .finishToValueAndCloser(
 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
 *
 * // later
 * try { // get() will throw if the operation failed or was cancelled.
176 * UserName userName = userNameValueAndCloser.get(); 177 * // do something with userName 178 * } finally { 179 * userNameValueAndCloser.closeAsync(); 180 * } 181 * }</pre> 182 * 183 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 184 * the query result cursor will both be closed, even if the operation is cancelled or fails. 185 * 186 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 187 * automatic-closing approach described above is safer. 188 * 189 * @param <V> the type of the value of this step 190 * @since 30.0 191 */ 192// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 193@Beta // @Beta for one release. 194@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 195// TODO(dpb): GWT compatibility. 196public final class ClosingFuture<V> { 197 198 private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName()); 199 200 /** 201 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 202 * done. 203 */ 204 public static final class DeferredCloser { 205 @RetainedWith private final CloseableList list; 206 207 DeferredCloser(CloseableList list) { 208 this.list = list; 209 } 210 211 /** 212 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 213 * 214 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 215 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 216 * Closeable}. (For more about the flavors, see <a 217 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 218 * build</a>.) 
219 * 220 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 221 * building for Android): Ensure that any object you pass implements the interface not just in 222 * your current SDK version but also at the oldest version you support. For example, <a 223 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 224 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 225 * {@code Closeable} with a method reference like {@code cursor::close}. 226 * 227 * <p>Note that this method is still binary-compatible between flavors because the erasure of 228 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 229 * 230 * @param closeable the object to be closed (see notes above) 231 * @param closingExecutor the object will be closed on this executor 232 * @return the first argument 233 */ 234 @CanIgnoreReturnValue 235 @NullableDecl 236 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 237 public <C extends Object & Closeable> C eventuallyClose( 238 @NullableDecl C closeable, Executor closingExecutor) { 239 checkNotNull(closingExecutor); 240 if (closeable != null) { 241 list.add(closeable, closingExecutor); 242 } 243 return closeable; 244 } 245 } 246 247 /** 248 * An operation that computes a result. 249 * 250 * @param <V> the type of the result 251 */ 252 public interface ClosingCallable<V extends Object> { 253 /** 254 * Computes a result, or throws an exception if unable to do so. 255 * 256 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 257 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 258 * not before this method completes), even if this method throws or the pipeline is cancelled. 
259 */ 260 @NullableDecl 261 V call(DeferredCloser closer) throws Exception; 262 } 263 264 /** 265 * An operation that computes a {@link ClosingFuture} of a result. 266 * 267 * @param <V> the type of the result 268 * @since 30.1 269 */ 270 public interface AsyncClosingCallable<V extends Object> { 271 /** 272 * Computes a result, or throws an exception if unable to do so. 273 * 274 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 275 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 276 * not before this method completes), even if this method throws or the pipeline is cancelled. 277 */ 278 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 279 } 280 281 /** 282 * A function from an input to a result. 283 * 284 * @param <T> the type of the input to the function 285 * @param <U> the type of the result of the function 286 */ 287 public interface ClosingFunction<T extends Object, U extends Object> { 288 289 /** 290 * Applies this function to an input, or throws an exception if unable to do so. 291 * 292 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 293 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 294 * not before this method completes), even if this method throws or the pipeline is cancelled. 295 */ 296 @NullableDecl 297 U apply(DeferredCloser closer, @NullableDecl T input) throws Exception; 298 } 299 300 /** 301 * A function from an input to a {@link ClosingFuture} of a result. 302 * 303 * @param <T> the type of the input to the function 304 * @param <U> the type of the result of the function 305 */ 306 public interface AsyncClosingFunction<T extends Object, U extends Object> { 307 /** 308 * Applies this function to an input, or throws an exception if unable to do so. 
309 * 310 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 311 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 312 * not before this method completes), even if this method throws or the pipeline is cancelled. 313 */ 314 ClosingFuture<U> apply(DeferredCloser closer, @NullableDecl T input) throws Exception; 315 } 316 317 /** 318 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 319 * allows the user to close all the closeable objects that were captured during it for later 320 * closing. 321 * 322 * <p>The asynchronous operation will have completed before this object is created. 323 * 324 * @param <V> the type of the value of a successful operation 325 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 326 */ 327 public static final class ValueAndCloser<V> { 328 329 private final ClosingFuture<? extends V> closingFuture; 330 331 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 332 this.closingFuture = checkNotNull(closingFuture); 333 } 334 335 /** 336 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 337 * {@link Future#get()} would. 338 * 339 * <p>Because the asynchronous operation has already completed, this method is synchronous and 340 * returns immediately. 341 * 342 * @throws CancellationException if the computation was cancelled 343 * @throws ExecutionException if the computation threw an exception 344 */ 345 @NullableDecl 346 public V get() throws ExecutionException { 347 return getDone(closingFuture.future); 348 } 349 350 /** 351 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 352 * operation on the {@link Executor}s specified by calls to {@link 353 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 
354 * 355 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 356 * closed synchronously. 357 * 358 * <p>Idempotent: objects will be closed at most once. 359 */ 360 public void closeAsync() { 361 closingFuture.close(); 362 } 363 } 364 365 /** 366 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 367 * ClosingFuture} pipeline. 368 * 369 * @param <V> the type of the final value of a successful pipeline 370 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 371 */ 372 public interface ValueAndCloserConsumer<V> { 373 374 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 375 void accept(ValueAndCloser<V> valueAndCloser); 376 } 377 378 /** 379 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 380 * 381 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 382 * execution 383 */ 384 public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) { 385 return new ClosingFuture<>(callable, executor); 386 } 387 388 /** 389 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 390 * 391 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 392 * execution 393 * @since 30.1 394 */ 395 public static <V> ClosingFuture<V> submitAsync( 396 AsyncClosingCallable<V> callable, Executor executor) { 397 return new ClosingFuture<>(callable, executor); 398 } 399 400 /** 401 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 402 * 403 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 404 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 405 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 
406 */ 407 public static <V> ClosingFuture<V> from(ListenableFuture<V> future) { 408 return new ClosingFuture<V>(future); 409 } 410 411 /** 412 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 413 * 414 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when 415 * the pipeline is done, even if the pipeline is canceled or fails. 416 * 417 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 418 * value in order to close it. 419 * 420 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 421 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable, 422 * Executor)}. 423 * @param closingExecutor the future's result will be closed on this executor 424 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 425 * underlying value may never be closed if the {@link Future} is canceled after its operation 426 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 427 * including those that pass them to this method, with {@link #submit(ClosingCallable, 428 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 429 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 430 * ClosingFuture#from}. 431 */ 432 @Deprecated 433 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 
434 public static <C extends Object & Closeable> ClosingFuture<C> eventuallyClosing( 435 ListenableFuture<C> future, final Executor closingExecutor) { 436 checkNotNull(closingExecutor); 437 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 438 Futures.addCallback( 439 future, 440 new FutureCallback<Closeable>() { 441 @Override 442 public void onSuccess(@NullableDecl Closeable result) { 443 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 444 } 445 446 @Override 447 public void onFailure(Throwable t) {} 448 }, 449 directExecutor()); 450 return closingFuture; 451 } 452 453 /** 454 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 455 * 456 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 457 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 458 */ 459 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 460 return new Combiner(false, futures); 461 } 462 463 /** 464 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 465 * 466 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 467 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 468 */ 469 public static Combiner whenAllComplete( 470 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 471 return whenAllComplete(asList(future1, moreFutures)); 472 } 473 474 /** 475 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 476 * all succeed. If any fail, the resulting pipeline will fail. 477 * 478 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 479 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 480 */ 481 public static Combiner whenAllSucceed(Iterable<? 
extends ClosingFuture<?>> futures) { 482 return new Combiner(true, futures); 483 } 484 485 /** 486 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 487 * they all succeed. If any fail, the resulting pipeline will fail. 488 * 489 * <p>Calling this method allows you to use lambdas or method references typed with the types of 490 * the input {@link ClosingFuture}s. 491 * 492 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 493 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 494 */ 495 public static <V1, V2> Combiner2<V1, V2> whenAllSucceed( 496 ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 497 return new Combiner2<>(future1, future2); 498 } 499 500 /** 501 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 502 * they all succeed. If any fail, the resulting pipeline will fail. 503 * 504 * <p>Calling this method allows you to use lambdas or method references typed with the types of 505 * the input {@link ClosingFuture}s. 506 * 507 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 508 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 509 */ 510 public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed( 511 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 512 return new Combiner3<>(future1, future2, future3); 513 } 514 515 /** 516 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 517 * they all succeed. If any fail, the resulting pipeline will fail. 518 * 519 * <p>Calling this method allows you to use lambdas or method references typed with the types of 520 * the input {@link ClosingFuture}s. 
521 * 522 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 523 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 524 */ 525 public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed( 526 ClosingFuture<V1> future1, 527 ClosingFuture<V2> future2, 528 ClosingFuture<V3> future3, 529 ClosingFuture<V4> future4) { 530 return new Combiner4<>(future1, future2, future3, future4); 531 } 532 533 /** 534 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 535 * they all succeed. If any fail, the resulting pipeline will fail. 536 * 537 * <p>Calling this method allows you to use lambdas or method references typed with the types of 538 * the input {@link ClosingFuture}s. 539 * 540 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 541 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 542 */ 543 public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 544 ClosingFuture<V1> future1, 545 ClosingFuture<V2> future2, 546 ClosingFuture<V3> future3, 547 ClosingFuture<V4> future4, 548 ClosingFuture<V5> future5) { 549 return new Combiner5<>(future1, future2, future3, future4, future5); 550 } 551 552 /** 553 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 554 * all succeed. If any fail, the resulting pipeline will fail. 555 * 556 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 557 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 558 */ 559 public static Combiner whenAllSucceed( 560 ClosingFuture<?> future1, 561 ClosingFuture<?> future2, 562 ClosingFuture<?> future3, 563 ClosingFuture<?> future4, 564 ClosingFuture<?> future5, 565 ClosingFuture<?> future6, 566 ClosingFuture<?>... 
moreFutures) { 567 return whenAllSucceed( 568 FluentIterable.of(future1, future2, future3, future4, future5, future6) 569 .append(moreFutures)); 570 } 571 572 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 573 private final CloseableList closeables = new CloseableList(); 574 private final FluentFuture<V> future; 575 576 private ClosingFuture(ListenableFuture<V> future) { 577 this.future = FluentFuture.from(future); 578 } 579 580 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 581 checkNotNull(callable); 582 TrustedListenableFutureTask<V> task = 583 TrustedListenableFutureTask.create( 584 new Callable<V>() { 585 @Override 586 public V call() throws Exception { 587 return callable.call(closeables.closer); 588 } 589 590 @Override 591 public String toString() { 592 return callable.toString(); 593 } 594 }); 595 executor.execute(task); 596 this.future = task; 597 } 598 599 private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) { 600 checkNotNull(callable); 601 TrustedListenableFutureTask<V> task = 602 TrustedListenableFutureTask.create( 603 new AsyncCallable<V>() { 604 @Override 605 public ListenableFuture<V> call() throws Exception { 606 CloseableList newCloseables = new CloseableList(); 607 try { 608 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 609 closingFuture.becomeSubsumedInto(closeables); 610 return closingFuture.future; 611 } finally { 612 closeables.add(newCloseables, directExecutor()); 613 } 614 } 615 616 @Override 617 public String toString() { 618 return callable.toString(); 619 } 620 }); 621 executor.execute(task); 622 this.future = task; 623 } 624 625 /** 626 * Returns a future that finishes when this step does. Calling {@code get()} on the returned 627 * future returns {@code null} if the step is successful or throws the same exception that would 628 * be thrown by calling {@code finishToFuture().get()} if this were the last step. 
Calling {@code 629 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 630 * 631 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 632 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 633 * a derivation method <i>on the same instance</i>. This is important because calling {@code 634 * statusFuture} alone does not provide a way to close the pipeline. 635 */ 636 public ListenableFuture<?> statusFuture() { 637 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 638 } 639 640 /** 641 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 642 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 643 * when the pipeline is done. 644 * 645 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 646 * ClosingFuture} will be equivalent to this one. 647 * 648 * <p>If the function throws an exception, that exception is used as the result of the derived 649 * {@code ClosingFuture}. 650 * 651 * <p>Example usage: 652 * 653 * <pre>{@code 654 * ClosingFuture<List<Row>> rowsFuture = 655 * queryFuture.transform((closer, result) -> result.getRows(), executor); 656 * }</pre> 657 * 658 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 659 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 660 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 661 * 662 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 663 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 664 * this {@code ClosingFuture}. 
665 * 666 * @param function transforms the value of this step to the value of the derived step 667 * @param executor executor to run the function in 668 * @return the derived step 669 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 670 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 671 * finished} 672 */ 673 public <U> ClosingFuture<U> transform( 674 final ClosingFunction<? super V, U> function, Executor executor) { 675 checkNotNull(function); 676 AsyncFunction<V, U> applyFunction = 677 new AsyncFunction<V, U>() { 678 @Override 679 public ListenableFuture<U> apply(V input) throws Exception { 680 return closeables.applyClosingFunction(function, input); 681 } 682 683 @Override 684 public String toString() { 685 return function.toString(); 686 } 687 }; 688 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 689 return derive(future.transformAsync(applyFunction, executor)); 690 } 691 692 /** 693 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 694 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 695 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 696 * captured by the returned {@link ClosingFuture}). 697 * 698 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 699 * returned by the function. 700 * 701 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 702 * ClosingFuture} will be equivalent to this one. 703 * 704 * <p>If the function throws an exception, that exception is used as the result of the derived 705 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 706 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 707 * closed. 
708 * 709 * <p>Usage guidelines for this method: 710 * 711 * <ul> 712 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 713 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 714 * Executor)} instead, with a function that returns the next value directly. 715 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 716 * for every closeable object this step creates in order to capture it for later closing. 717 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 718 * ClosingFuture} call {@link #from(ListenableFuture)}. 719 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 720 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 721 * {@link #withoutCloser(AsyncFunction)} 722 * </ul> 723 * 724 * <p>Example usage: 725 * 726 * <pre>{@code 727 * // Result.getRowsClosingFuture() returns a ClosingFuture. 728 * ClosingFuture<List<Row>> rowsFuture = 729 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 730 * 731 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 732 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 733 * // Closeable). 734 * ClosingFuture<Integer> rowsFuture2 = 735 * queryFuture.transformAsync( 736 * (closer, result) -> { 737 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 738 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 739 * }, 740 * executor); 741 * 742 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 
743 * ClosingFuture<List<Row>> rowsFuture3 = 744 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 745 * 746 * }</pre> 747 * 748 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 749 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 750 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 751 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 752 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 753 * responsible for completing the returned {@code ClosingFuture}.) 754 * 755 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 756 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 757 * this {@code ClosingFuture}. 758 * 759 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 760 * the derived step 761 * @param executor executor to run the function in 762 * @return the derived step 763 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 764 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 765 * finished} 766 */ 767 public <U> ClosingFuture<U> transformAsync( 768 final AsyncClosingFunction<? 
super V, U> function, Executor executor) { 769 checkNotNull(function); 770 AsyncFunction<V, U> applyFunction = 771 new AsyncFunction<V, U>() { 772 @Override 773 public ListenableFuture<U> apply(V input) throws Exception { 774 return closeables.applyAsyncClosingFunction(function, input); 775 } 776 777 @Override 778 public String toString() { 779 return function.toString(); 780 } 781 }; 782 return derive(future.transformAsync(applyFunction, executor)); 783 } 784 785 /** 786 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 787 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 788 * {@link ListenableFuture}. 789 * 790 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 791 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 792 * meets these conditions: 793 * 794 * <ul> 795 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 796 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 797 * <li>It returns a {@link ListenableFuture}. 798 * </ul> 799 * 800 * <p>Example usage: 801 * 802 * <pre>{@code 803 * // Result.getRowsFuture() returns a ListenableFuture. 
804 * ClosingFuture<List<Row>> rowsFuture = 805 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 806 * }</pre> 807 * 808 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 809 * ListenableFuture} with the value of a derived step 810 */ 811 public static <V, U> AsyncClosingFunction<V, U> withoutCloser( 812 final AsyncFunction<V, U> function) { 813 checkNotNull(function); 814 return new AsyncClosingFunction<V, U>() { 815 @Override 816 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 817 return ClosingFuture.from(function.apply(input)); 818 } 819 }; 820 } 821 822 /** 823 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 824 * to its exception if it is an instance of a given exception type. The function can use a {@link 825 * DeferredCloser} to capture objects to be closed when the pipeline is done. 826 * 827 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 828 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 829 * one. 830 * 831 * <p>If the function throws an exception, that exception is used as the result of the derived 832 * {@code ClosingFuture}. 833 * 834 * <p>Example usage: 835 * 836 * <pre>{@code 837 * ClosingFuture<QueryResult> queryFutureWithFallback = 838 * queryFuture.catching( 839 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 840 * }</pre> 841 * 842 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 843 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 844 * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
845 * 846 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 847 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 848 * this {@code ClosingFuture}. 849 * 850 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 851 * type is matched against this step's exception. "This step's exception" means the cause of 852 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 853 * underlying this step or, if {@code get()} throws a different kind of exception, that 854 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 855 * prefer more specific types, avoiding {@code Throwable.class} in particular. 856 * @param fallback the function to be called if this step fails with the expected exception type. 857 * The function's argument is this step's exception. "This step's exception" means the cause 858 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 859 * underlying this step or, if {@code get()} throws a different kind of exception, that 860 * exception itself. 861 * @param executor the executor that runs {@code fallback} if the input fails 862 */ 863 public <X extends Throwable> ClosingFuture<V> catching( 864 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 865 return catchingMoreGeneric(exceptionType, fallback, executor); 866 } 867 868 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 869 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 870 Class<X> exceptionType, final ClosingFunction<? 
super X, W> fallback, Executor executor) { 871 checkNotNull(fallback); 872 AsyncFunction<X, W> applyFallback = 873 new AsyncFunction<X, W>() { 874 @Override 875 public ListenableFuture<W> apply(X exception) throws Exception { 876 return closeables.applyClosingFunction(fallback, exception); 877 } 878 879 @Override 880 public String toString() { 881 return fallback.toString(); 882 } 883 }; 884 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 885 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 886 } 887 888 /** 889 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 890 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 891 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 892 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 893 * 894 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 895 * ClosingFuture} will be equivalent to the one returned by the function. 896 * 897 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 898 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 899 * one. 900 * 901 * <p>If the function throws an exception, that exception is used as the result of the derived 902 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 903 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 904 * closed. 905 * 906 * <p>Usage guidelines for this method: 907 * 908 * <ul> 909 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 910 * {@code ClosingFuture}. 
If possible, prefer calling {@link #catching(Class, 911 * ClosingFunction, Executor)} instead, with a function that returns the next value 912 * directly. 913 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 914 * for every closeable object this step creates in order to capture it for later closing. 915 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 916 * ClosingFuture} call {@link #from(ListenableFuture)}. 917 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 918 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 919 * {@link #withoutCloser(AsyncFunction)}. 920 * </ul> 921 * 922 * <p>Example usage: 923 * 924 * <pre>{@code 925 * // Fall back to a secondary input stream in case of IOException. 926 * ClosingFuture<InputStream> inputFuture = 927 * firstInputFuture.catchingAsync( 928 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 929 * 930 * }</pre> 931 * 932 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 933 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 934 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 935 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 936 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 937 * responsible for completing the returned {@code ClosingFuture}.) 938 * 939 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 940 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 941 * this {@code ClosingFuture}. 942 * 943 * @param exceptionType the exception type that triggers use of {@code fallback}.
The exception 944 * type is matched against this step's exception. "This step's exception" means the cause of 945 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 946 * underlying this step or, if {@code get()} throws a different kind of exception, that 947 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 948 * prefer more specific types, avoiding {@code Throwable.class} in particular. 949 * @param fallback the function to be called if this step fails with the expected exception type. 950 * The function's argument is this step's exception. "This step's exception" means the cause 951 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 952 * underlying this step or, if {@code get()} throws a different kind of exception, that 953 * exception itself. 954 * @param executor the executor that runs {@code fallback} if the input fails 955 */ 956 // TODO(dpb): Should this do something special if the function throws CancellationException or 957 // ExecutionException? 958 public <X extends Throwable> ClosingFuture<V> catchingAsync( 959 Class<X> exceptionType, 960 AsyncClosingFunction<? super X, ? extends V> fallback, 961 Executor executor) { 962 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 963 } 964 965 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 966 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 967 Class<X> exceptionType, 968 final AsyncClosingFunction<? 
super X, W> fallback, 969 Executor executor) { 970 checkNotNull(fallback); 971 AsyncFunction<X, W> asyncFunction = 972 new AsyncFunction<X, W>() { 973 @Override 974 public ListenableFuture<W> apply(X exception) throws Exception { 975 return closeables.applyAsyncClosingFunction(fallback, exception); 976 } 977 978 @Override 979 public String toString() { 980 return fallback.toString(); 981 } 982 }; 983 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 984 } 985 986 /** 987 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 988 * 989 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 990 * the pipeline is cancelled. 991 * 992 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 993 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 994 * rather than once it has finished. 995 * 996 * <p>After calling this method, you may not call {@link 997 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 998 * derivation method on this {@code ClosingFuture}. 
999 * 1000 * @return a {@link Future} that represents the final value or exception of the pipeline 1001 */ 1002 public FluentFuture<V> finishToFuture() { 1003 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1004 logger.log(FINER, "will close {0}", this); 1005 future.addListener( 1006 new Runnable() { 1007 @Override 1008 public void run() { 1009 checkAndUpdateState(WILL_CLOSE, CLOSING); 1010 close(); 1011 checkAndUpdateState(CLOSING, CLOSED); 1012 } 1013 }, 1014 directExecutor()); 1015 } else { 1016 switch (state.get()) { 1017 case SUBSUMED: 1018 throw new IllegalStateException( 1019 "Cannot call finishToFuture() after deriving another step"); 1020 1021 case WILL_CREATE_VALUE_AND_CLOSER: 1022 throw new IllegalStateException( 1023 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1024 1025 case WILL_CLOSE: 1026 case CLOSING: 1027 case CLOSED: 1028 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1029 1030 case OPEN: 1031 throw new AssertionError(); 1032 } 1033 } 1034 return future; 1035 } 1036 1037 /** 1038 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1039 * {@code consumer} will be called with an object that contains the result of the operation. The 1040 * consumer can store the {@link ValueAndCloser} outside the consumer for later synchronous use. 1041 * 1042 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1043 * any other derivation method on this {@code ClosingFuture}. 1044 * 1045 * @param consumer a callback whose method will be called (using {@code executor}) when this 1046 * operation is done 1047 */ 1048 public void finishToValueAndCloser( 1049 final ValueAndCloserConsumer<?
super V> consumer, Executor executor) { 1050 checkNotNull(consumer); 1051 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1052 switch (state.get()) { 1053 case SUBSUMED: 1054 throw new IllegalStateException( 1055 "Cannot call finishToValueAndCloser() after deriving another step"); 1056 1057 case WILL_CLOSE: 1058 case CLOSING: 1059 case CLOSED: 1060 throw new IllegalStateException( 1061 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1062 1063 case WILL_CREATE_VALUE_AND_CLOSER: 1064 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1065 1066 case OPEN: 1067 break; 1068 } 1069 throw new AssertionError(state); 1070 } 1071 future.addListener( 1072 new Runnable() { 1073 @Override 1074 public void run() { 1075 provideValueAndCloser(consumer, ClosingFuture.this); 1076 } 1077 }, 1078 executor); 1079 } 1080 1081 private static <C, V extends C> void provideValueAndCloser( 1082 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1083 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1084 } 1085 1086 /** 1087 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1088 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1089 * successful, and this step has not started when {@code cancel} is called, this step should never 1090 * run. 1091 * 1092 * <p>If successful, causes the objects captured by this step (if already started) and its input 1093 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1094 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 
1095 * 1096 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1097 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1098 * cancelled regardless 1099 * @return {@code false} if the step could not be cancelled, typically because it has already 1100 * completed normally; {@code true} otherwise 1101 */ 1102 @CanIgnoreReturnValue 1103 public boolean cancel(boolean mayInterruptIfRunning) { 1104 logger.log(FINER, "cancelling {0}", this); 1105 boolean cancelled = future.cancel(mayInterruptIfRunning); 1106 if (cancelled) { 1107 close(); 1108 } 1109 return cancelled; 1110 } 1111 1112 private void close() { 1113 logger.log(FINER, "closing {0}", this); 1114 closeables.close(); 1115 } 1116 1117 private <U> ClosingFuture<U> derive(FluentFuture<U> future) { 1118 ClosingFuture<U> derived = new ClosingFuture<>(future); 1119 becomeSubsumedInto(derived.closeables); 1120 return derived; 1121 } 1122 1123 private void becomeSubsumedInto(CloseableList otherCloseables) { 1124 checkAndUpdateState(OPEN, SUBSUMED); 1125 otherCloseables.add(closeables, directExecutor()); 1126 } 1127 1128 /** 1129 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1130 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1131 * 1132 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1133 */ 1134 public static final class Peeker { 1135 private final ImmutableList<ClosingFuture<?>> futures; 1136 private volatile boolean beingCalled; 1137 1138 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1139 this.futures = checkNotNull(futures); 1140 } 1141 1142 /** 1143 * Returns the value of {@code closingFuture}.
1144 * 1145 * @throws ExecutionException if {@code closingFuture} is a failed step 1146 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1147 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1148 * {@link #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} 1149 * @throws IllegalStateException if called outside of a call to {@link 1150 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1151 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1152 */ 1153 @NullableDecl 1154 public final <D extends Object> D getDone(ClosingFuture<D> closingFuture) 1155 throws ExecutionException { 1156 checkState(beingCalled); 1157 checkArgument(futures.contains(closingFuture)); 1158 return Futures.getDone(closingFuture.future); 1159 } 1160 1161 @NullableDecl 1162 private <V extends Object> V call(CombiningCallable<V> combiner, CloseableList closeables) 1163 throws Exception { 1164 beingCalled = true; 1165 CloseableList newCloseables = new CloseableList(); 1166 try { 1167 return combiner.call(newCloseables.closer, this); 1168 } finally { 1169 closeables.add(newCloseables, directExecutor()); 1170 beingCalled = false; 1171 } 1172 } 1173 1174 private <V extends Object> FluentFuture<V> callAsync( 1175 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1176 beingCalled = true; 1177 CloseableList newCloseables = new CloseableList(); 1178 try { 1179 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1180 closingFuture.becomeSubsumedInto(closeables); 1181 return closingFuture.future; 1182 } finally { 1183 closeables.add(newCloseables, directExecutor()); 1184 beingCalled = false; 1185 } 1186 } 1187 } 1188 1189 /** 1190 * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1191 * 1192 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1193 * instantiate this class. 1194 * 1195 * <p>Example: 1196 * 1197 * <pre>{@code 1198 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1199 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1200 * ListenableFuture<Integer> numberOfDifferentLines = 1201 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1202 * .call( 1203 * (closer, peeker) -> { 1204 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1205 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1206 * return countDifferentLines(file1Reader, file2Reader); 1207 * }, 1208 * executor) 1209 * .closing(executor); 1210 * }</pre> 1211 */ 1212 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1213 @com.google.errorprone.annotations.DoNotMock( 1214 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1215 public static class Combiner { 1216 1217 private final CloseableList closeables = new CloseableList(); 1218 1219 /** 1220 * An operation that returns a result and may throw an exception. 1221 * 1222 * @param <V> the type of the result 1223 */ 1224 public interface CombiningCallable<V extends Object> { 1225 /** 1226 * Computes a result, or throws an exception if unable to do so. 1227 * 1228 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1229 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1230 * is done (but not before this method completes), even if this method throws or the pipeline 1231 * is cancelled. 1232 * 1233 * @param peeker used to get the value of any of the input futures 1234 */ 1235 @NullableDecl 1236 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1237 } 1238 1239 /** 1240 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 
1241 * 1242 * @param <V> the type of the result 1243 */ 1244 public interface AsyncCombiningCallable<V extends Object> { 1245 /** 1246 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1247 * 1248 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1249 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1250 * is done (but not before this method completes), even if this method throws or the pipeline 1251 * is cancelled. 1252 * 1253 * @param peeker used to get the value of any of the input futures 1254 */ 1255 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1256 } 1257 1258 private final boolean allMustSucceed; 1259 protected final ImmutableList<ClosingFuture<?>> inputs; 1260 /** Snapshots {@code inputs} into an immutable list and subsumes every snapshotted step into this combiner's {@code closeables}. */ 1261 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1262 this.allMustSucceed = allMustSucceed; 1263 this.inputs = ImmutableList.copyOf(inputs); /* Walk the snapshot, not the argument: an Iterable is not required to support a second traversal or to return the same elements again. */ 1264 for (ClosingFuture<?> input : this.inputs) { 1265 input.becomeSubsumedInto(closeables); 1266 } 1267 } 1268 1269 /** 1270 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1271 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1272 * objects to be closed when the pipeline is done. 1273 * 1274 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1275 * fail, so will the returned step. 1276 * 1277 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1278 * cancelled. 1279 * 1280 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1281 * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1282 */ 1283 public <V> ClosingFuture<V> call( 1284 final CombiningCallable<V> combiningCallable, Executor executor) { 1285 Callable<V> callable = 1286 new Callable<V>() { 1287 @Override 1288 public V call() throws Exception { 1289 return new Peeker(inputs).call(combiningCallable, closeables); 1290 } 1291 1292 @Override 1293 public String toString() { 1294 return combiningCallable.toString(); 1295 } 1296 }; 1297 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1298 derived.closeables.add(closeables, directExecutor()); 1299 return derived; 1300 } 1301 1302 /** 1303 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1304 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1305 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1306 * captured by the returned {@link ClosingFuture}). 1307 * 1308 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1309 * fail, so will the returned step. 1310 * 1311 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1312 * cancelled. 1313 * 1314 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1315 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1316 * 1317 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1318 * derived step. 1319 * 1320 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1321 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1322 * 1323 * <p>Usage guidelines for this method: 1324 * 1325 * <ul> 1326 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1327 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1328 * Executor)} instead, with a function that returns the next value directly. 1329 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1330 * closer.eventuallyClose()} for every closeable object this step creates in order to 1331 * capture it for later closing. 1332 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1333 * ClosingFuture} call {@link #from(ListenableFuture)}. 1334 * </ul> 1335 * 1336 * <p>The same warnings about doing heavyweight operations within {@link 1337 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1338 */ 1339 public <V> ClosingFuture<V> callAsync( 1340 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1341 AsyncCallable<V> asyncCallable = 1342 new AsyncCallable<V>() { 1343 @Override 1344 public ListenableFuture<V> call() throws Exception { 1345 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1346 } 1347 1348 @Override 1349 public String toString() { 1350 return combiningCallable.toString(); 1351 } 1352 }; 1353 ClosingFuture<V> derived = 1354 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1355 derived.closeables.add(closeables, directExecutor()); 1356 return derived; 1357 } 1358 1359 private FutureCombiner<Object> futureCombiner() { 1360 return allMustSucceed 1361 ? 
Futures.whenAllSucceed(inputFutures()) 1362 : Futures.whenAllComplete(inputFutures()); 1363 } 1364 1365 private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE = 1366 new Function<ClosingFuture<?>, FluentFuture<?>>() { 1367 @Override 1368 public FluentFuture<?> apply(ClosingFuture<?> future) { 1369 return future.future; 1370 } 1371 }; 1372 1373 private ImmutableList<FluentFuture<?>> inputFutures() { 1374 return FluentIterable.from(inputs).transform(INNER_FUTURE).toList(); 1375 } 1376 } 1377 1378 /** 1379 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1380 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1381 * combination. 1382 * 1383 * @param <V1> the type returned by the first future 1384 * @param <V2> the type returned by the second future 1385 */ 1386 public static final class Combiner2<V1 extends Object, V2 extends Object> extends Combiner { 1387 1388 /** 1389 * A function that returns a value when applied to the values of the two futures passed to 1390 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1391 * 1392 * @param <V1> the type returned by the first future 1393 * @param <V2> the type returned by the second future 1394 * @param <U> the type returned by the function 1395 */ 1396 public interface ClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> { 1397 1398 /** 1399 * Applies this function to two inputs, or throws an exception if unable to do so. 1400 * 1401 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1402 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1403 * is done (but not before this method completes), even if this method throws or the pipeline 1404 * is cancelled. 
1405 */ 1406 @NullableDecl 1407 U apply(DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) 1408 throws Exception; 1409 } 1410 1411 /** 1412 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1413 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1414 * 1415 * @param <V1> the type returned by the first future 1416 * @param <V2> the type returned by the second future 1417 * @param <U> the type returned by the function 1418 */ 1419 public interface AsyncClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> { 1420 1421 /** 1422 * Applies this function to two inputs, or throws an exception if unable to do so. 1423 * 1424 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1425 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1426 * is done (but not before this method completes), even if this method throws or the pipeline 1427 * is cancelled. 1428 */ 1429 ClosingFuture<U> apply( 1430 DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) throws Exception; 1431 } 1432 1433 private final ClosingFuture<V1> future1; 1434 private final ClosingFuture<V2> future2; 1435 1436 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1437 super(true, ImmutableList.of(future1, future2)); 1438 this.future1 = future1; 1439 this.future2 = future2; 1440 } 1441 1442 /** 1443 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1444 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1445 * objects to be closed when the pipeline is done. 1446 * 1447 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1448 * any of the inputs fail, so will the returned step. 
1449 * 1450 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1451 * 1452 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1453 * ExecutionException} will be extracted and used as the failure of the derived step. 1454 */ 1455 public <U extends Object> ClosingFuture<U> call( 1456 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1457 return call( 1458 new CombiningCallable<U>() { 1459 @Override 1460 @NullableDecl 1461 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1462 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1463 } 1464 1465 @Override 1466 public String toString() { 1467 return function.toString(); 1468 } 1469 }, 1470 executor); 1471 } 1472 1473 /** 1474 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1475 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1476 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1477 * captured by the returned {@link ClosingFuture}). 1478 * 1479 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1480 * any of the inputs fail, so will the returned step. 1481 * 1482 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1483 * 1484 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1485 * ExecutionException} will be extracted and used as the failure of the derived step. 1486 * 1487 * <p>If the function throws any other exception, it will be used as the failure of the derived 1488 * step. 1489 * 1490 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1491 * the closeable objects in that {@code ClosingFuture} will be closed. 
1492 * 1493 * <p>Usage guidelines for this method: 1494 * 1495 * <ul> 1496 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1497 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1498 * Executor)} instead, with a function that returns the next value directly. 1499 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1500 * closer.eventuallyClose()} for every closeable object this step creates in order to 1501 * capture it for later closing. 1502 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1503 * ClosingFuture} call {@link #from(ListenableFuture)}. 1504 * </ul> 1505 * 1506 * <p>The same warnings about doing heavyweight operations within {@link 1507 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1508 */ 1509 public <U extends Object> ClosingFuture<U> callAsync( 1510 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1511 return callAsync( 1512 new AsyncCombiningCallable<U>() { 1513 @Override 1514 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1515 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1516 } 1517 1518 @Override 1519 public String toString() { 1520 return function.toString(); 1521 } 1522 }, 1523 executor); 1524 } 1525 } 1526 1527 /** 1528 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1529 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1530 * ClosingFuture)} to start this combination. 
1531 * 1532 * @param <V1> the type returned by the first future 1533 * @param <V2> the type returned by the second future 1534 * @param <V3> the type returned by the third future 1535 */ 1536 public static final class Combiner3<V1 extends Object, V2 extends Object, V3 extends Object> 1537 extends Combiner { 1538 /** 1539 * A function that returns a value when applied to the values of the three futures passed to 1540 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1541 * 1542 * @param <V1> the type returned by the first future 1543 * @param <V2> the type returned by the second future 1544 * @param <V3> the type returned by the third future 1545 * @param <U> the type returned by the function 1546 */ 1547 public interface ClosingFunction3< 1548 V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> { 1549 /** 1550 * Applies this function to three inputs, or throws an exception if unable to do so. 1551 * 1552 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1553 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1554 * is done (but not before this method completes), even if this method throws or the pipeline 1555 * is cancelled. 1556 */ 1557 @NullableDecl 1558 U apply( 1559 DeferredCloser closer, 1560 @NullableDecl V1 value1, 1561 @NullableDecl V2 value2, 1562 @NullableDecl V3 v3) 1563 throws Exception; 1564 } 1565 1566 /** 1567 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1568 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 
1569 * 1570 * @param <V1> the type returned by the first future 1571 * @param <V2> the type returned by the second future 1572 * @param <V3> the type returned by the third future 1573 * @param <U> the type returned by the function 1574 */ 1575 public interface AsyncClosingFunction3< 1576 V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> { 1577 /** 1578 * Applies this function to three inputs, or throws an exception if unable to do so. 1579 * 1580 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1581 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1582 * is done (but not before this method completes), even if this method throws or the pipeline 1583 * is cancelled. 1584 */ 1585 ClosingFuture<U> apply( 1586 DeferredCloser closer, 1587 @NullableDecl V1 value1, 1588 @NullableDecl V2 value2, 1589 @NullableDecl V3 value3) 1590 throws Exception; 1591 } 1592 1593 private final ClosingFuture<V1> future1; 1594 private final ClosingFuture<V2> future2; 1595 private final ClosingFuture<V3> future3; 1596 1597 private Combiner3( 1598 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1599 super(true, ImmutableList.of(future1, future2, future3)); 1600 this.future1 = future1; 1601 this.future2 = future2; 1602 this.future3 = future3; 1603 } 1604 1605 /** 1606 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1607 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1608 * objects to be closed when the pipeline is done. 1609 * 1610 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1611 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1612 * 1613 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 
1614 * 1615 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1616 * ExecutionException} will be extracted and used as the failure of the derived step. 1617 */ 1618 public <U extends Object> ClosingFuture<U> call( 1619 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1620 return call( 1621 new CombiningCallable<U>() { 1622 @Override 1623 @NullableDecl 1624 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1625 return function.apply( 1626 closer, 1627 peeker.getDone(future1), 1628 peeker.getDone(future2), 1629 peeker.getDone(future3)); 1630 } 1631 1632 @Override 1633 public String toString() { 1634 return function.toString(); 1635 } 1636 }, 1637 executor); 1638 } 1639 1640 /** 1641 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1642 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1643 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1644 * captured by the returned {@link ClosingFuture}). 1645 * 1646 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1647 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1648 * 1649 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1650 * 1651 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1652 * ExecutionException} will be extracted and used as the failure of the derived step. 1653 * 1654 * <p>If the function throws any other exception, it will be used as the failure of the derived 1655 * step. 1656 * 1657 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1658 * the closeable objects in that {@code ClosingFuture} will be closed. 
1659 * 1660 * <p>Usage guidelines for this method: 1661 * 1662 * <ul> 1663 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1664 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1665 * Executor)} instead, with a function that returns the next value directly. 1666 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1667 * closer.eventuallyClose()} for every closeable object this step creates in order to 1668 * capture it for later closing. 1669 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1670 * ClosingFuture} call {@link #from(ListenableFuture)}. 1671 * </ul> 1672 * 1673 * <p>The same warnings about doing heavyweight operations within {@link 1674 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1675 */ 1676 public <U extends Object> ClosingFuture<U> callAsync( 1677 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1678 return callAsync( 1679 new AsyncCombiningCallable<U>() { 1680 @Override 1681 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1682 return function.apply( 1683 closer, 1684 peeker.getDone(future1), 1685 peeker.getDone(future2), 1686 peeker.getDone(future3)); 1687 } 1688 1689 @Override 1690 public String toString() { 1691 return function.toString(); 1692 } 1693 }, 1694 executor); 1695 } 1696 } 1697 1698 /** 1699 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1700 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1701 * ClosingFuture)} to start this combination. 
1702 * 1703 * @param <V1> the type returned by the first future 1704 * @param <V2> the type returned by the second future 1705 * @param <V3> the type returned by the third future 1706 * @param <V4> the type returned by the fourth future 1707 */ 1708 public static final class Combiner4< 1709 V1 extends Object, V2 extends Object, V3 extends Object, V4 extends Object> 1710 extends Combiner { 1711 /** 1712 * A function that returns a value when applied to the values of the four futures passed to 1713 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1714 * 1715 * @param <V1> the type returned by the first future 1716 * @param <V2> the type returned by the second future 1717 * @param <V3> the type returned by the third future 1718 * @param <V4> the type returned by the fourth future 1719 * @param <U> the type returned by the function 1720 */ 1721 public interface ClosingFunction4< 1722 V1 extends Object, 1723 V2 extends Object, 1724 V3 extends Object, 1725 V4 extends Object, 1726 U extends Object> { 1727 /** 1728 * Applies this function to four inputs, or throws an exception if unable to do so. 1729 * 1730 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1731 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1732 * is done (but not before this method completes), even if this method throws or the pipeline 1733 * is cancelled. 1734 */ 1735 @NullableDecl 1736 U apply( 1737 DeferredCloser closer, 1738 @NullableDecl V1 value1, 1739 @NullableDecl V2 value2, 1740 @NullableDecl V3 value3, 1741 @NullableDecl V4 value4) 1742 throws Exception; 1743 } 1744 1745 /** 1746 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1747 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1748 * ClosingFuture)}. 
1749 * 1750 * @param <V1> the type returned by the first future 1751 * @param <V2> the type returned by the second future 1752 * @param <V3> the type returned by the third future 1753 * @param <V4> the type returned by the fourth future 1754 * @param <U> the type returned by the function 1755 */ 1756 public interface AsyncClosingFunction4< 1757 V1 extends Object, 1758 V2 extends Object, 1759 V3 extends Object, 1760 V4 extends Object, 1761 U extends Object> { 1762 /** 1763 * Applies this function to four inputs, or throws an exception if unable to do so. 1764 * 1765 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1766 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1767 * is done (but not before this method completes), even if this method throws or the pipeline 1768 * is cancelled. 1769 */ 1770 ClosingFuture<U> apply( 1771 DeferredCloser closer, 1772 @NullableDecl V1 value1, 1773 @NullableDecl V2 value2, 1774 @NullableDecl V3 value3, 1775 @NullableDecl V4 value4) 1776 throws Exception; 1777 } 1778 1779 private final ClosingFuture<V1> future1; 1780 private final ClosingFuture<V2> future2; 1781 private final ClosingFuture<V3> future3; 1782 private final ClosingFuture<V4> future4; 1783 1784 private Combiner4( 1785 ClosingFuture<V1> future1, 1786 ClosingFuture<V2> future2, 1787 ClosingFuture<V3> future3, 1788 ClosingFuture<V4> future4) { 1789 super(true, ImmutableList.of(future1, future2, future3, future4)); 1790 this.future1 = future1; 1791 this.future2 = future2; 1792 this.future3 = future3; 1793 this.future4 = future4; 1794 } 1795 1796 /** 1797 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1798 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1799 * objects to be closed when the pipeline is done. 
1800 * 1801 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1802 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1803 * 1804 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1805 * 1806 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1807 * ExecutionException} will be extracted and used as the failure of the derived step. 1808 */ 1809 public <U extends Object> ClosingFuture<U> call( 1810 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1811 return call( 1812 new CombiningCallable<U>() { 1813 @Override 1814 @NullableDecl 1815 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1816 return function.apply( 1817 closer, 1818 peeker.getDone(future1), 1819 peeker.getDone(future2), 1820 peeker.getDone(future3), 1821 peeker.getDone(future4)); 1822 } 1823 1824 @Override 1825 public String toString() { 1826 return function.toString(); 1827 } 1828 }, 1829 executor); 1830 } 1831 1832 /** 1833 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1834 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1835 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1836 * captured by the returned {@link ClosingFuture}). 1837 * 1838 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1839 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1840 * 1841 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1842 * 1843 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1844 * ExecutionException} will be extracted and used as the failure of the derived step. 
1845 * 1846 * <p>If the function throws any other exception, it will be used as the failure of the derived 1847 * step. 1848 * 1849 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1850 * the closeable objects in that {@code ClosingFuture} will be closed. 1851 * 1852 * <p>Usage guidelines for this method: 1853 * 1854 * <ul> 1855 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1856 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1857 * Executor)} instead, with a function that returns the next value directly. 1858 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1859 * closer.eventuallyClose()} for every closeable object this step creates in order to 1860 * capture it for later closing. 1861 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1862 * ClosingFuture} call {@link #from(ListenableFuture)}. 1863 * </ul> 1864 * 1865 * <p>The same warnings about doing heavyweight operations within {@link 1866 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1867 */ 1868 public <U extends Object> ClosingFuture<U> callAsync( 1869 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1870 return callAsync( 1871 new AsyncCombiningCallable<U>() { 1872 @Override 1873 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1874 return function.apply( 1875 closer, 1876 peeker.getDone(future1), 1877 peeker.getDone(future2), 1878 peeker.getDone(future3), 1879 peeker.getDone(future4)); 1880 } 1881 1882 @Override 1883 public String toString() { 1884 return function.toString(); 1885 } 1886 }, 1887 executor); 1888 } 1889 } 1890 1891 /** 1892 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1893 * {@link ClosingFuture}s. 
Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1894 * ClosingFuture, ClosingFuture)} to start this combination. 1895 * 1896 * @param <V1> the type returned by the first future 1897 * @param <V2> the type returned by the second future 1898 * @param <V3> the type returned by the third future 1899 * @param <V4> the type returned by the fourth future 1900 * @param <V5> the type returned by the fifth future 1901 */ 1902 public static final class Combiner5< 1903 V1 extends Object, 1904 V2 extends Object, 1905 V3 extends Object, 1906 V4 extends Object, 1907 V5 extends Object> 1908 extends Combiner { 1909 /** 1910 * A function that returns a value when applied to the values of the five futures passed to 1911 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1912 * ClosingFuture)}. 1913 * 1914 * @param <V1> the type returned by the first future 1915 * @param <V2> the type returned by the second future 1916 * @param <V3> the type returned by the third future 1917 * @param <V4> the type returned by the fourth future 1918 * @param <V5> the type returned by the fifth future 1919 * @param <U> the type returned by the function 1920 */ 1921 public interface ClosingFunction5< 1922 V1 extends Object, 1923 V2 extends Object, 1924 V3 extends Object, 1925 V4 extends Object, 1926 V5 extends Object, 1927 U extends Object> { 1928 /** 1929 * Applies this function to five inputs, or throws an exception if unable to do so. 1930 * 1931 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1932 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1933 * is done (but not before this method completes), even if this method throws or the pipeline 1934 * is cancelled. 
1935 */ 1936 @NullableDecl 1937 U apply( 1938 DeferredCloser closer, 1939 @NullableDecl V1 value1, 1940 @NullableDecl V2 value2, 1941 @NullableDecl V3 value3, 1942 @NullableDecl V4 value4, 1943 @NullableDecl V5 value5) 1944 throws Exception; 1945 } 1946 1947 /** 1948 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1949 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1950 * ClosingFuture, ClosingFuture)}. 1951 * 1952 * @param <V1> the type returned by the first future 1953 * @param <V2> the type returned by the second future 1954 * @param <V3> the type returned by the third future 1955 * @param <V4> the type returned by the fourth future 1956 * @param <V5> the type returned by the fifth future 1957 * @param <U> the type returned by the function 1958 */ 1959 public interface AsyncClosingFunction5< 1960 V1 extends Object, 1961 V2 extends Object, 1962 V3 extends Object, 1963 V4 extends Object, 1964 V5 extends Object, 1965 U extends Object> { 1966 /** 1967 * Applies this function to five inputs, or throws an exception if unable to do so. 1968 * 1969 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1970 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1971 * is done (but not before this method completes), even if this method throws or the pipeline 1972 * is cancelled. 
1973 */ 1974 ClosingFuture<U> apply( 1975 DeferredCloser closer, 1976 @NullableDecl V1 value1, 1977 @NullableDecl V2 value2, 1978 @NullableDecl V3 value3, 1979 @NullableDecl V4 value4, 1980 @NullableDecl V5 value5) 1981 throws Exception; 1982 } 1983 1984 private final ClosingFuture<V1> future1; 1985 private final ClosingFuture<V2> future2; 1986 private final ClosingFuture<V3> future3; 1987 private final ClosingFuture<V4> future4; 1988 private final ClosingFuture<V5> future5; 1989 1990 private Combiner5( 1991 ClosingFuture<V1> future1, 1992 ClosingFuture<V2> future2, 1993 ClosingFuture<V3> future3, 1994 ClosingFuture<V4> future4, 1995 ClosingFuture<V5> future5) { 1996 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 1997 this.future1 = future1; 1998 this.future2 = future2; 1999 this.future3 = future3; 2000 this.future4 = future4; 2001 this.future5 = future5; 2002 } 2003 2004 /** 2005 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2006 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2007 * objects to be closed when the pipeline is done. 2008 * 2009 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2010 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2011 * returned step. 2012 * 2013 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2014 * 2015 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2016 * ExecutionException} will be extracted and used as the failure of the derived step. 
2017 */ 2018 public <U extends Object> ClosingFuture<U> call( 2019 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2020 return call( 2021 new CombiningCallable<U>() { 2022 @Override 2023 @NullableDecl 2024 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2025 return function.apply( 2026 closer, 2027 peeker.getDone(future1), 2028 peeker.getDone(future2), 2029 peeker.getDone(future3), 2030 peeker.getDone(future4), 2031 peeker.getDone(future5)); 2032 } 2033 2034 @Override 2035 public String toString() { 2036 return function.toString(); 2037 } 2038 }, 2039 executor); 2040 } 2041 2042 /** 2043 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2044 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2045 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2046 * captured by the returned {@link ClosingFuture}). 2047 * 2048 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2049 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2050 * returned step. 2051 * 2052 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2053 * 2054 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2055 * ExecutionException} will be extracted and used as the failure of the derived step. 2056 * 2057 * <p>If the function throws any other exception, it will be used as the failure of the derived 2058 * step. 2059 * 2060 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2061 * the closeable objects in that {@code ClosingFuture} will be closed. 
2062 * 2063 * <p>Usage guidelines for this method: 2064 * 2065 * <ul> 2066 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2067 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2068 * Executor)} instead, with a function that returns the next value directly. 2069 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 2070 * closer.eventuallyClose()} for every closeable object this step creates in order to 2071 * capture it for later closing. 2072 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2073 * ClosingFuture} call {@link #from(ListenableFuture)}. 2074 * </ul> 2075 * 2076 * <p>The same warnings about doing heavyweight operations within {@link 2077 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2078 */ 2079 public <U extends Object> ClosingFuture<U> callAsync( 2080 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2081 return callAsync( 2082 new AsyncCombiningCallable<U>() { 2083 @Override 2084 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2085 return function.apply( 2086 closer, 2087 peeker.getDone(future1), 2088 peeker.getDone(future2), 2089 peeker.getDone(future3), 2090 peeker.getDone(future4), 2091 peeker.getDone(future5)); 2092 } 2093 2094 @Override 2095 public String toString() { 2096 return function.toString(); 2097 } 2098 }, 2099 executor); 2100 } 2101 } 2102 2103 @Override 2104 public String toString() { 2105 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2106 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2107 } 2108 2109 @Override 2110 protected void finalize() { 2111 if (state.get().equals(OPEN)) { 2112 logger.log(SEVERE, "Uh oh! 
An open ClosingFuture has leaked and will close: {0}", this); 2113 FluentFuture<V> unused = finishToFuture(); 2114 } 2115 } 2116 2117 private static void closeQuietly(final Closeable closeable, Executor executor) { 2118 if (closeable == null) { 2119 return; 2120 } 2121 try { 2122 executor.execute( 2123 new Runnable() { 2124 @Override 2125 public void run() { 2126 try { 2127 closeable.close(); 2128 } catch (IOException | RuntimeException e) { 2129 logger.log(WARNING, "thrown by close()", e); 2130 } 2131 } 2132 }); 2133 } catch (RejectedExecutionException e) { 2134 if (logger.isLoggable(WARNING)) { 2135 logger.log( 2136 WARNING, String.format("while submitting close to %s; will close inline", executor), e); 2137 } 2138 closeQuietly(closeable, directExecutor()); 2139 } 2140 } 2141 2142 private void checkAndUpdateState(State oldState, State newState) { 2143 checkState( 2144 compareAndUpdateState(oldState, newState), 2145 "Expected state to be %s, but it was %s", 2146 oldState, 2147 newState); 2148 } 2149 2150 private boolean compareAndUpdateState(State oldState, State newState) { 2151 return state.compareAndSet(oldState, newState); 2152 } 2153 2154 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2155 private static final class CloseableList extends IdentityHashMap<Closeable, Executor> 2156 implements Closeable { 2157 private final DeferredCloser closer = new DeferredCloser(this); 2158 private volatile boolean closed; 2159 private volatile CountDownLatch whenClosed; 2160 2161 <V, U> ListenableFuture<U> applyClosingFunction( 2162 ClosingFunction<? super V, U> transformation, V input) throws Exception { 2163 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 
2164 CloseableList newCloseables = new CloseableList(); 2165 try { 2166 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2167 } finally { 2168 add(newCloseables, directExecutor()); 2169 } 2170 } 2171 2172 <V, U> FluentFuture<U> applyAsyncClosingFunction( 2173 AsyncClosingFunction<V, U> transformation, V input) throws Exception { 2174 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2175 CloseableList newCloseables = new CloseableList(); 2176 try { 2177 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2178 closingFuture.becomeSubsumedInto(newCloseables); 2179 return closingFuture.future; 2180 } finally { 2181 add(newCloseables, directExecutor()); 2182 } 2183 } 2184 2185 @Override 2186 public void close() { 2187 if (closed) { 2188 return; 2189 } 2190 synchronized (this) { 2191 if (closed) { 2192 return; 2193 } 2194 closed = true; 2195 } 2196 for (Map.Entry<Closeable, Executor> entry : entrySet()) { 2197 closeQuietly(entry.getKey(), entry.getValue()); 2198 } 2199 clear(); 2200 if (whenClosed != null) { 2201 whenClosed.countDown(); 2202 } 2203 } 2204 2205 void add(@NullableDecl Closeable closeable, Executor executor) { 2206 checkNotNull(executor); 2207 if (closeable == null) { 2208 return; 2209 } 2210 synchronized (this) { 2211 if (!closed) { 2212 put(closeable, executor); 2213 return; 2214 } 2215 } 2216 closeQuietly(closeable, executor); 2217 } 2218 2219 /** 2220 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 
2221 */ 2222 CountDownLatch whenClosedCountDown() { 2223 if (closed) { 2224 return new CountDownLatch(0); 2225 } 2226 synchronized (this) { 2227 if (closed) { 2228 return new CountDownLatch(0); 2229 } 2230 checkState(whenClosed == null); 2231 return whenClosed = new CountDownLatch(1); 2232 } 2233 } 2234 } 2235 2236 /** 2237 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2238 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2239 */ 2240 @VisibleForTesting 2241 CountDownLatch whenClosedCountDown() { 2242 return closeables.whenClosedCountDown(); 2243 } 2244 2245 /** The state of a {@link CloseableList}. */ 2246 enum State { 2247 /** The {@link CloseableList} has not been subsumed or closed. */ 2248 OPEN, 2249 2250 /** 2251 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2252 * into any other. 2253 */ 2254 SUBSUMED, 2255 2256 /** 2257 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2258 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2259 */ 2260 WILL_CLOSE, 2261 2262 /** 2263 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2264 * {@link CloseableList} may not be subsumed. 2265 */ 2266 CLOSING, 2267 2268 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2269 CLOSED, 2270 2271 /** 2272 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2273 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2274 */ 2275 WILL_CREATE_VALUE_AND_CLOSER, 2276 } 2277}