001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static java.util.logging.Level.FINER; 036import static java.util.logging.Level.SEVERE; 037import static java.util.logging.Level.WARNING; 038 039import com.google.common.annotations.Beta; 040import com.google.common.annotations.VisibleForTesting; 041import com.google.common.base.Function; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.io.IOException; 052import java.util.IdentityHashMap; 053import java.util.Map; 054import java.util.concurrent.Callable; 055import java.util.concurrent.CancellationException; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.ExecutionException; 058import java.util.concurrent.Executor; 059import java.util.concurrent.Future; 060import java.util.concurrent.RejectedExecutionException; 061import java.util.concurrent.atomic.AtomicReference; 062import java.util.logging.Logger; 063import org.checkerframework.checker.nullness.compatqual.NullableDecl; 064 065/** 066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 067 * complete, some objects captured during the computation are closed. 068 * 069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 071 * cancellation of the operation so far. The only way to extract the value or exception from a step 072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 073 * "value" of a successful step or the "result" (value or exception) of any step. 074 * 075 * <ol> 076 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 077 * block or a {@link ListenableFuture}. 078 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 079 * can be captured for later closing. 080 * <li>There is one last step (the root of the tree), from which you can extract the final result 081 * of the computation. After that result is available (or the computation fails), all objects 082 * captured by any of the steps in the pipeline are closed. 083 * </ol> 084 * 085 * <h3>Starting a pipeline</h3> 086 * 087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 090 * #from(ListenableFuture)} instead. 091 * 092 * <h3>Derived steps</h3> 093 * 094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 095 * ways similar to {@link FluentFuture}s: 096 * 097 * <ul> 098 * <li>by transforming the value from a successful input step, 099 * <li>by catching the exception from a failed input step, or 100 * <li>by combining the results of several input steps. 101 * </ul> 102 * 103 * Each derivation can capture the next value or any intermediate objects for later closing. 104 * 105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 106 * exception, or combine it with others, you cannot do anything else with it, including declare it 107 * to be the last step of the pipeline. 108 * 109 * <h4>Transforming</h4> 110 * 111 * To derive the next step by asynchronously applying a function to an input step's value, call 112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 113 * Executor)} on the input step. 114 * 115 * <h4>Catching</h4> 116 * 117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 119 * 120 * <h4>Combining</h4> 121 * 122 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 123 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 124 * 125 * <h3>Cancelling</h3> 126 * 127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 129 * successfully cancelled step will immediately start closing all objects captured for later closing 130 * by it and by its input steps. 131 * 132 * <h3>Ending a pipeline</h3> 133 * 134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 135 * close the captured objects automatically or manually. 136 * 137 * <h4>Automatically closing</h4> 138 * 139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 140 * calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured 141 * by all steps in the pipeline will be closed. 142 * 143 * <pre>{@code 144 * FluentFuture<UserName> userName = 145 * ClosingFuture.submit( 146 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 147 * executor) 148 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 149 * .transform((closer, result) -> result.get("userName"), directExecutor()) 150 * .catching(DBException.class, e -> "no user", directExecutor()) 151 * .finishToFuture(); 152 * }</pre> 153 * 154 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 155 * result cursor will both be closed, even if the operation is cancelled or fails. 156 * 157 * <h4>Manually closing</h4> 158 * 159 * If you want to close the captured objects manually, after you've used the final result, call 160 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 161 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 162 * 163 * <pre>{@code 164 * ClosingFuture.submit( 165 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 166 * executor) 167 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 168 * .transform((closer, result) -> result.get("userName"), directExecutor()) 169 * .catching(DBException.class, e -> "no user", directExecutor()) 170 * .finishToValueAndCloser( 171 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 172 * 173 * // later 174 * try { // get() will throw if the operation failed or was cancelled. 175 * UserName userName = userNameValueAndCloser.get(); 176 * // do something with userName 177 * } finally { 178 * userNameValueAndCloser.closeAsync(); 179 * } 180 * }</pre> 181 * 182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 183 * the query result cursor will both be closed, even if the operation is cancelled or fails. 184 * 185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 186 * automatic-closing approach described above is safer. 187 * 188 * @param <V> the type of the value of this step 189 * @since 30.0 190 */ 191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 192@Beta // @Beta for one release. 193@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 194// TODO(dpb): GWT compatibility. 195public final class ClosingFuture<V> { 196 197 private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName()); 198 199 /** 200 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 201 * done. 202 */ 203 public static final class DeferredCloser { 204 @RetainedWith private final CloseableList list; 205 206 DeferredCloser(CloseableList list) { 207 this.list = list; 208 } 209 210 /** 211 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 212 * 213 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 214 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 215 * Closeable}. (For more about the flavors, see <a 216 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 217 * build</a>.) 218 * 219 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 220 * building for Android): Ensure that any object you pass implements the interface not just in 221 * your current SDK version but also at the oldest version you support. For example, <a 222 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 223 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 224 * {@code Closeable} with a method reference like {@code cursor::close}. 225 * 226 * <p>Note that this method is still binary-compatible between flavors because the erasure of 227 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 228 * 229 * @param closeable the object to be closed (see notes above) 230 * @param closingExecutor the object will be closed on this executor 231 * @return the first argument 232 */ 233 @CanIgnoreReturnValue 234 @NullableDecl 235 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 236 public <C extends Object & Closeable> C eventuallyClose( 237 @NullableDecl C closeable, Executor closingExecutor) { 238 checkNotNull(closingExecutor); 239 if (closeable != null) { 240 list.add(closeable, closingExecutor); 241 } 242 return closeable; 243 } 244 } 245 246 /** 247 * An operation that computes a result. 248 * 249 * @param <V> the type of the result 250 */ 251 public interface ClosingCallable<V extends Object> { 252 /** 253 * Computes a result, or throws an exception if unable to do so. 254 * 255 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 256 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 257 * not before this method completes), even if this method throws or the pipeline is cancelled. 258 */ 259 @NullableDecl 260 V call(DeferredCloser closer) throws Exception; 261 } 262 263 /** 264 * An operation that computes a {@link ClosingFuture} of a result. 265 * 266 * @param <V> the type of the result 267 * @since 30.1 268 */ 269 public interface AsyncClosingCallable<V extends Object> { 270 /** 271 * Computes a result, or throws an exception if unable to do so. 272 * 273 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 274 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 275 * not before this method completes), even if this method throws or the pipeline is cancelled. 276 */ 277 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 278 } 279 280 /** 281 * A function from an input to a result. 282 * 283 * @param <T> the type of the input to the function 284 * @param <U> the type of the result of the function 285 */ 286 public interface ClosingFunction<T extends Object, U extends Object> { 287 288 /** 289 * Applies this function to an input, or throws an exception if unable to do so. 290 * 291 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 292 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 293 * not before this method completes), even if this method throws or the pipeline is cancelled. 294 */ 295 @NullableDecl 296 U apply(DeferredCloser closer, @NullableDecl T input) throws Exception; 297 } 298 299 /** 300 * A function from an input to a {@link ClosingFuture} of a result. 301 * 302 * @param <T> the type of the input to the function 303 * @param <U> the type of the result of the function 304 */ 305 public interface AsyncClosingFunction<T extends Object, U extends Object> { 306 /** 307 * Applies this function to an input, or throws an exception if unable to do so. 308 * 309 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 310 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 311 * not before this method completes), even if this method throws or the pipeline is cancelled. 312 */ 313 ClosingFuture<U> apply(DeferredCloser closer, @NullableDecl T input) throws Exception; 314 } 315 316 /** 317 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 318 * allows the user to close all the closeable objects that were captured during it for later 319 * closing. 320 * 321 * <p>The asynchronous operation will have completed before this object is created. 322 * 323 * @param <V> the type of the value of a successful operation 324 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 325 */ 326 public static final class ValueAndCloser<V> { 327 328 private final ClosingFuture<? extends V> closingFuture; 329 330 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 331 this.closingFuture = checkNotNull(closingFuture); 332 } 333 334 /** 335 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 336 * {@link Future#get()} would. 337 * 338 * <p>Because the asynchronous operation has already completed, this method is synchronous and 339 * returns immediately. 340 * 341 * @throws CancellationException if the computation was cancelled 342 * @throws ExecutionException if the computation threw an exception 343 */ 344 @NullableDecl 345 public V get() throws ExecutionException { 346 return getDone(closingFuture.future); 347 } 348 349 /** 350 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 351 * operation on the {@link Executor}s specified by calls to {@link 352 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 353 * 354 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 355 * closed synchronously. 356 * 357 * <p>Idempotent: objects will be closed at most once. 358 */ 359 public void closeAsync() { 360 closingFuture.close(); 361 } 362 } 363 364 /** 365 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 366 * ClosingFuture} pipeline. 367 * 368 * @param <V> the type of the final value of a successful pipeline 369 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 370 */ 371 public interface ValueAndCloserConsumer<V> { 372 373 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 374 void accept(ValueAndCloser<V> valueAndCloser); 375 } 376 377 /** 378 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 379 * 380 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 381 * execution 382 */ 383 public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) { 384 return new ClosingFuture<>(callable, executor); 385 } 386 387 /** 388 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 389 * 390 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 391 * execution 392 * @since 30.1 393 */ 394 public static <V> ClosingFuture<V> submitAsync( 395 AsyncClosingCallable<V> callable, Executor executor) { 396 return new ClosingFuture<>(callable, executor); 397 } 398 399 /** 400 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 401 * 402 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 403 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 404 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 405 */ 406 public static <V> ClosingFuture<V> from(ListenableFuture<V> future) { 407 return new ClosingFuture<V>(future); 408 } 409 410 /** 411 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 412 * 413 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when 414 * the pipeline is done, even if the pipeline is canceled or fails. 415 * 416 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 417 * value in order to close it. 418 * 419 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 420 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable, 421 * Executor)}. 422 * @param closingExecutor the future's result will be closed on this executor 423 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 424 * underlying value may never be closed if the {@link Future} is canceled after its operation 425 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 426 * including those that pass them to this method, with {@link #submit(ClosingCallable, 427 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 428 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 429 * ClosingFuture#from}. 430 */ 431 @Deprecated 432 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 433 public static <C extends Object & Closeable> ClosingFuture<C> eventuallyClosing( 434 ListenableFuture<C> future, final Executor closingExecutor) { 435 checkNotNull(closingExecutor); 436 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 437 Futures.addCallback( 438 future, 439 new FutureCallback<Closeable>() { 440 @Override 441 public void onSuccess(@NullableDecl Closeable result) { 442 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 443 } 444 445 @Override 446 public void onFailure(Throwable t) {} 447 }, 448 directExecutor()); 449 return closingFuture; 450 } 451 452 /** 453 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 454 * 455 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 456 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 457 */ 458 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 459 return new Combiner(false, futures); 460 } 461 462 /** 463 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 464 * 465 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 466 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 467 */ 468 public static Combiner whenAllComplete( 469 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 470 return whenAllComplete(asList(future1, moreFutures)); 471 } 472 473 /** 474 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 475 * all succeed. If any fail, the resulting pipeline will fail. 476 * 477 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 478 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 479 */ 480 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 481 return new Combiner(true, futures); 482 } 483 484 /** 485 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 486 * they all succeed. If any fail, the resulting pipeline will fail. 487 * 488 * <p>Calling this method allows you to use lambdas or method references typed with the types of 489 * the input {@link ClosingFuture}s. 490 * 491 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 492 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 493 */ 494 public static <V1, V2> Combiner2<V1, V2> whenAllSucceed( 495 ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 496 return new Combiner2<>(future1, future2); 497 } 498 499 /** 500 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 501 * they all succeed. If any fail, the resulting pipeline will fail. 502 * 503 * <p>Calling this method allows you to use lambdas or method references typed with the types of 504 * the input {@link ClosingFuture}s. 505 * 506 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 507 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 508 */ 509 public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed( 510 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 511 return new Combiner3<>(future1, future2, future3); 512 } 513 514 /** 515 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 516 * they all succeed. If any fail, the resulting pipeline will fail. 517 * 518 * <p>Calling this method allows you to use lambdas or method references typed with the types of 519 * the input {@link ClosingFuture}s. 520 * 521 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 522 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 523 */ 524 public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed( 525 ClosingFuture<V1> future1, 526 ClosingFuture<V2> future2, 527 ClosingFuture<V3> future3, 528 ClosingFuture<V4> future4) { 529 return new Combiner4<>(future1, future2, future3, future4); 530 } 531 532 /** 533 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 534 * they all succeed. If any fail, the resulting pipeline will fail. 535 * 536 * <p>Calling this method allows you to use lambdas or method references typed with the types of 537 * the input {@link ClosingFuture}s. 538 * 539 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 540 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 541 */ 542 public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 543 ClosingFuture<V1> future1, 544 ClosingFuture<V2> future2, 545 ClosingFuture<V3> future3, 546 ClosingFuture<V4> future4, 547 ClosingFuture<V5> future5) { 548 return new Combiner5<>(future1, future2, future3, future4, future5); 549 } 550 551 /** 552 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 553 * all succeed. If any fail, the resulting pipeline will fail. 554 * 555 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 556 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 557 */ 558 public static Combiner whenAllSucceed( 559 ClosingFuture<?> future1, 560 ClosingFuture<?> future2, 561 ClosingFuture<?> future3, 562 ClosingFuture<?> future4, 563 ClosingFuture<?> future5, 564 ClosingFuture<?> future6, 565 ClosingFuture<?>... moreFutures) { 566 return whenAllSucceed( 567 FluentIterable.of(future1, future2, future3, future4, future5, future6) 568 .append(moreFutures)); 569 } 570 571 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 572 private final CloseableList closeables = new CloseableList(); 573 private final FluentFuture<V> future; 574 575 private ClosingFuture(ListenableFuture<V> future) { 576 this.future = FluentFuture.from(future); 577 } 578 579 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 580 checkNotNull(callable); 581 TrustedListenableFutureTask<V> task = 582 TrustedListenableFutureTask.create( 583 new Callable<V>() { 584 @Override 585 public V call() throws Exception { 586 return callable.call(closeables.closer); 587 } 588 589 @Override 590 public String toString() { 591 return callable.toString(); 592 } 593 }); 594 executor.execute(task); 595 this.future = task; 596 } 597 598 private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) { 599 checkNotNull(callable); 600 TrustedListenableFutureTask<V> task = 601 TrustedListenableFutureTask.create( 602 new AsyncCallable<V>() { 603 @Override 604 public ListenableFuture<V> call() throws Exception { 605 CloseableList newCloseables = new CloseableList(); 606 try { 607 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 608 closingFuture.becomeSubsumedInto(closeables); 609 return closingFuture.future; 610 } finally { 611 closeables.add(newCloseables, directExecutor()); 612 } 613 } 614 615 @Override 616 public String toString() { 617 return callable.toString(); 618 } 619 }); 620 executor.execute(task); 621 this.future = task; 622 } 623 624 /** 625 * Returns a future that finishes when this step does. Calling {@code get()} on the returned 626 * future returns {@code null} if the step is successful or throws the same exception that would 627 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 628 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 629 * 630 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 631 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 632 * a derivation method <i>on the same instance</i>. This is important because calling {@code 633 * statusFuture} alone does not provide a way to close the pipeline. 634 */ 635 public ListenableFuture<?> statusFuture() { 636 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 637 } 638 639 /** 640 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 641 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 642 * when the pipeline is done. 643 * 644 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 645 * ClosingFuture} will be equivalent to this one. 646 * 647 * <p>If the function throws an exception, that exception is used as the result of the derived 648 * {@code ClosingFuture}. 649 * 650 * <p>Example usage: 651 * 652 * <pre>{@code 653 * ClosingFuture<List<Row>> rowsFuture = 654 * queryFuture.transform((closer, result) -> result.getRows(), executor); 655 * }</pre> 656 * 657 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 658 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 659 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 660 * 661 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 662 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 663 * this {@code ClosingFuture}. 664 * 665 * @param function transforms the value of this step to the value of the derived step 666 * @param executor executor to run the function in 667 * @return the derived step 668 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 669 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 670 * finished} 671 */ 672 public <U> ClosingFuture<U> transform( 673 final ClosingFunction<? super V, U> function, Executor executor) { 674 checkNotNull(function); 675 AsyncFunction<V, U> applyFunction = 676 new AsyncFunction<V, U>() { 677 @Override 678 public ListenableFuture<U> apply(V input) throws Exception { 679 return closeables.applyClosingFunction(function, input); 680 } 681 682 @Override 683 public String toString() { 684 return function.toString(); 685 } 686 }; 687 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 688 return derive(future.transformAsync(applyFunction, executor)); 689 } 690 691 /** 692 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 693 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 694 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 695 * captured by the returned {@link ClosingFuture}). 696 * 697 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 698 * returned by the function. 699 * 700 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 701 * ClosingFuture} will be equivalent to this one. 702 * 703 * <p>If the function throws an exception, that exception is used as the result of the derived 704 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 705 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 706 * closed. 707 * 708 * <p>Usage guidelines for this method: 709 * 710 * <ul> 711 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 712 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 713 * Executor)} instead, with a function that returns the next value directly. 714 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 715 * for every closeable object this step creates in order to capture it for later closing. 716 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 717 * ClosingFuture} call {@link #from(ListenableFuture)}. 718 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 719 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 720 * {@link #withoutCloser(AsyncFunction)} 721 * </ul> 722 * 723 * <p>Example usage: 724 * 725 * <pre>{@code 726 * // Result.getRowsClosingFuture() returns a ClosingFuture. 727 * ClosingFuture<List<Row>> rowsFuture = 728 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 729 * 730 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 731 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 732 * // Closeable). 733 * ClosingFuture<Integer> rowsFuture2 = 734 * queryFuture.transformAsync( 735 * (closer, result) -> { 736 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 737 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 738 * }, 739 * executor); 740 * 741 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 742 * ClosingFuture<List<Row>> rowsFuture3 = 743 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 744 * 745 * }</pre> 746 * 747 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 748 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 749 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 750 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 751 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 752 * responsible for completing the returned {@code ClosingFuture}.) 753 * 754 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 755 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 756 * this {@code ClosingFuture}. 757 * 758 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 759 * the derived step 760 * @param executor executor to run the function in 761 * @return the derived step 762 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 763 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 764 * finished} 765 */ 766 public <U> ClosingFuture<U> transformAsync( 767 final AsyncClosingFunction<? super V, U> function, Executor executor) { 768 checkNotNull(function); 769 AsyncFunction<V, U> applyFunction = 770 new AsyncFunction<V, U>() { 771 @Override 772 public ListenableFuture<U> apply(V input) throws Exception { 773 return closeables.applyAsyncClosingFunction(function, input); 774 } 775 776 @Override 777 public String toString() { 778 return function.toString(); 779 } 780 }; 781 return derive(future.transformAsync(applyFunction, executor)); 782 } 783 784 /** 785 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 786 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 787 * {@link ListenableFuture}. 788 * 789 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 790 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 791 * meets these conditions: 792 * 793 * <ul> 794 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 795 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 796 * <li>It returns a {@link ListenableFuture}. 797 * </ul> 798 * 799 * <p>Example usage: 800 * 801 * <pre>{@code 802 * // Result.getRowsFuture() returns a ListenableFuture. 803 * ClosingFuture<List<Row>> rowsFuture = 804 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 805 * }</pre> 806 * 807 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 808 * ListenableFuture} with the value of a derived step 809 */ 810 public static <V, U> AsyncClosingFunction<V, U> withoutCloser( 811 final AsyncFunction<V, U> function) { 812 checkNotNull(function); 813 return new AsyncClosingFunction<V, U>() { 814 @Override 815 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 816 return ClosingFuture.from(function.apply(input)); 817 } 818 }; 819 } 820 821 /** 822 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 823 * to its exception if it is an instance of a given exception type. The function can use a {@link 824 * DeferredCloser} to capture objects to be closed when the pipeline is done. 825 * 826 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 827 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 828 * one. 829 * 830 * <p>If the function throws an exception, that exception is used as the result of the derived 831 * {@code ClosingFuture}. 832 * 833 * <p>Example usage: 834 * 835 * <pre>{@code 836 * ClosingFuture<QueryResult> queryFuture = 837 * queryFuture.catching( 838 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 839 * }</pre> 840 * 841 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 842 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 843 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 844 * 845 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 846 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 847 * this {@code ClosingFuture}. 848 * 849 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 850 * type is matched against this step's exception. "This step's exception" means the cause of 851 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 852 * underlying this step or, if {@code get()} throws a different kind of exception, that 853 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 854 * prefer more specific types, avoiding {@code Throwable.class} in particular. 855 * @param fallback the function to be called if this step fails with the expected exception type. 856 * The function's argument is this step's exception. "This step's exception" means the cause 857 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 858 * underlying this step or, if {@code get()} throws a different kind of exception, that 859 * exception itself. 860 * @param executor the executor that runs {@code fallback} if the input fails 861 */ 862 public <X extends Throwable> ClosingFuture<V> catching( 863 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 864 return catchingMoreGeneric(exceptionType, fallback, executor); 865 } 866 867 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 868 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 869 Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) { 870 checkNotNull(fallback); 871 AsyncFunction<X, W> applyFallback = 872 new AsyncFunction<X, W>() { 873 @Override 874 public ListenableFuture<W> apply(X exception) throws Exception { 875 return closeables.applyClosingFunction(fallback, exception); 876 } 877 878 @Override 879 public String toString() { 880 return fallback.toString(); 881 } 882 }; 883 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 884 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 885 } 886 887 /** 888 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 889 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 890 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 891 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 892 * 893 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 894 * ClosingFuture} will be equivalent to the one returned by the function. 895 * 896 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 897 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 898 * one. 899 * 900 * <p>If the function throws an exception, that exception is used as the result of the derived 901 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 902 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 903 * closed. 904 * 905 * <p>Usage guidelines for this method: 906 * 907 * <ul> 908 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 909 * {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class, 910 * ClosingFunction, Executor)} instead, with a function that returns the next value 911 * directly. 912 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 913 * for every closeable object this step creates in order to capture it for later closing. 914 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 915 * ClosingFuture} call {@link #from(ListenableFuture)}. 916 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 917 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 918 * {@link #withoutCloser(AsyncFunction)} 919 * </ul> 920 * 921 * <p>Example usage: 922 * 923 * <pre>{@code 924 * // Fall back to a secondary input stream in case of IOException. 925 * ClosingFuture<InputStream> inputFuture = 926 * firstInputFuture.catchingAsync( 927 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 928 * } 929 * }</pre> 930 * 931 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 932 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 933 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 934 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 935 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 936 * responsible for completing the returned {@code ClosingFuture}.) 937 * 938 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 939 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 940 * this {@code ClosingFuture}. 941 * 942 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 943 * type is matched against this step's exception. "This step's exception" means the cause of 944 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 945 * underlying this step or, if {@code get()} throws a different kind of exception, that 946 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 947 * prefer more specific types, avoiding {@code Throwable.class} in particular. 948 * @param fallback the function to be called if this step fails with the expected exception type. 949 * The function's argument is this step's exception. "This step's exception" means the cause 950 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 951 * underlying this step or, if {@code get()} throws a different kind of exception, that 952 * exception itself. 953 * @param executor the executor that runs {@code fallback} if the input fails 954 */ 955 // TODO(dpb): Should this do something special if the function throws CancellationException or 956 // ExecutionException? 957 public <X extends Throwable> ClosingFuture<V> catchingAsync( 958 Class<X> exceptionType, 959 AsyncClosingFunction<? super X, ? extends V> fallback, 960 Executor executor) { 961 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 962 } 963 964 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 965 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 966 Class<X> exceptionType, 967 final AsyncClosingFunction<? super X, W> fallback, 968 Executor executor) { 969 checkNotNull(fallback); 970 AsyncFunction<X, W> asyncFunction = 971 new AsyncFunction<X, W>() { 972 @Override 973 public ListenableFuture<W> apply(X exception) throws Exception { 974 return closeables.applyAsyncClosingFunction(fallback, exception); 975 } 976 977 @Override 978 public String toString() { 979 return fallback.toString(); 980 } 981 }; 982 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 983 } 984 985 /** 986 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned 987 * {@link Future} is done, all objects captured for closing during the pipeline's computation will 988 * be closed. 989 * 990 * <p>After calling this method, you may not call {@link 991 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 992 * derivation method on this {@code ClosingFuture}. 993 * 994 * @return a {@link Future} that represents the final value or exception of the pipeline 995 */ 996 public FluentFuture<V> finishToFuture() { 997 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 998 logger.log(FINER, "will close {0}", this); 999 future.addListener( 1000 new Runnable() { 1001 @Override 1002 public void run() { 1003 checkAndUpdateState(WILL_CLOSE, CLOSING); 1004 close(); 1005 checkAndUpdateState(CLOSING, CLOSED); 1006 } 1007 }, 1008 directExecutor()); 1009 } else { 1010 switch (state.get()) { 1011 case SUBSUMED: 1012 throw new IllegalStateException( 1013 "Cannot call finishToFuture() after deriving another step"); 1014 1015 case WILL_CREATE_VALUE_AND_CLOSER: 1016 throw new IllegalStateException( 1017 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1018 1019 case WILL_CLOSE: 1020 case CLOSING: 1021 case CLOSED: 1022 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1023 1024 case OPEN: 1025 throw new AssertionError(); 1026 } 1027 } 1028 return future; 1029 } 1030 1031 /** 1032 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1033 * {@code receiver} will be called with an object that contains the result of the operation. The 1034 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1035 * 1036 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1037 * any other derivation method on this {@code ClosingFuture}. 1038 * 1039 * @param consumer a callback whose method will be called (using {@code executor}) when this 1040 * operation is done 1041 */ 1042 public void finishToValueAndCloser( 1043 final ValueAndCloserConsumer<? super V> consumer, Executor executor) { 1044 checkNotNull(consumer); 1045 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1046 switch (state.get()) { 1047 case SUBSUMED: 1048 throw new IllegalStateException( 1049 "Cannot call finishToValueAndCloser() after deriving another step"); 1050 1051 case WILL_CLOSE: 1052 case CLOSING: 1053 case CLOSED: 1054 throw new IllegalStateException( 1055 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1056 1057 case WILL_CREATE_VALUE_AND_CLOSER: 1058 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1059 1060 case OPEN: 1061 break; 1062 } 1063 throw new AssertionError(state); 1064 } 1065 future.addListener( 1066 new Runnable() { 1067 @Override 1068 public void run() { 1069 provideValueAndCloser(consumer, ClosingFuture.this); 1070 } 1071 }, 1072 executor); 1073 } 1074 1075 private static <C, V extends C> void provideValueAndCloser( 1076 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1077 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1078 } 1079 1080 /** 1081 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1082 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1083 * successful, and this step has not started when {@code cancel} is called, this step should never 1084 * run. 1085 * 1086 * <p>If successful, causes the objects captured by this step (if already started) and its input 1087 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1088 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 1089 * 1090 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1091 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1092 * cancelled regardless 1093 * @return {@code false} if the step could not be cancelled, typically because it has already 1094 * completed normally; {@code true} otherwise 1095 */ 1096 @CanIgnoreReturnValue 1097 public boolean cancel(boolean mayInterruptIfRunning) { 1098 logger.log(FINER, "cancelling {0}", this); 1099 boolean cancelled = future.cancel(mayInterruptIfRunning); 1100 if (cancelled) { 1101 close(); 1102 } 1103 return cancelled; 1104 } 1105 1106 private void close() { 1107 logger.log(FINER, "closing {0}", this); 1108 closeables.close(); 1109 } 1110 1111 private <U> ClosingFuture<U> derive(FluentFuture<U> future) { 1112 ClosingFuture<U> derived = new ClosingFuture<>(future); 1113 becomeSubsumedInto(derived.closeables); 1114 return derived; 1115 } 1116 1117 private void becomeSubsumedInto(CloseableList otherCloseables) { 1118 checkAndUpdateState(OPEN, SUBSUMED); 1119 otherCloseables.add(closeables, directExecutor()); 1120 } 1121 1122 /** 1123 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1124 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1125 * 1126 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1127 */ 1128 public static final class Peeker { 1129 private final ImmutableList<ClosingFuture<?>> futures; 1130 private volatile boolean beingCalled; 1131 1132 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1133 this.futures = checkNotNull(futures); 1134 } 1135 1136 /** 1137 * Returns the value of {@code closingFuture}. 1138 * 1139 * @throws ExecutionException if {@code closingFuture} is a failed step 1140 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1141 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1142 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1143 * @throws IllegalStateException if called outside of a call to {@link 1144 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1145 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1146 */ 1147 @NullableDecl 1148 public final <D extends Object> D getDone(ClosingFuture<D> closingFuture) 1149 throws ExecutionException { 1150 checkState(beingCalled); 1151 checkArgument(futures.contains(closingFuture)); 1152 return Futures.getDone(closingFuture.future); 1153 } 1154 1155 @NullableDecl 1156 private <V extends Object> V call(CombiningCallable<V> combiner, CloseableList closeables) 1157 throws Exception { 1158 beingCalled = true; 1159 CloseableList newCloseables = new CloseableList(); 1160 try { 1161 return combiner.call(newCloseables.closer, this); 1162 } finally { 1163 closeables.add(newCloseables, directExecutor()); 1164 beingCalled = false; 1165 } 1166 } 1167 1168 private <V extends Object> FluentFuture<V> callAsync( 1169 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1170 beingCalled = true; 1171 CloseableList newCloseables = new CloseableList(); 1172 try { 1173 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1174 closingFuture.becomeSubsumedInto(closeables); 1175 return closingFuture.future; 1176 } finally { 1177 closeables.add(newCloseables, directExecutor()); 1178 beingCalled = false; 1179 } 1180 } 1181 } 1182 1183 /** 1184 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 1185 * 1186 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1187 * instantiate this class. 1188 * 1189 * <p>Example: 1190 * 1191 * <pre>{@code 1192 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1193 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1194 * ListenableFuture<Integer> numberOfDifferentLines = 1195 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1196 * .call( 1197 * (closer, peeker) -> { 1198 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1199 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1200 * return countDifferentLines(file1Reader, file2Reader); 1201 * }, 1202 * executor) 1203 * .closing(executor); 1204 * }</pre> 1205 */ 1206 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1207 @com.google.errorprone.annotations.DoNotMock( 1208 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1209 public static class Combiner { 1210 1211 private final CloseableList closeables = new CloseableList(); 1212 1213 /** 1214 * An operation that returns a result and may throw an exception. 1215 * 1216 * @param <V> the type of the result 1217 */ 1218 public interface CombiningCallable<V extends Object> { 1219 /** 1220 * Computes a result, or throws an exception if unable to do so. 1221 * 1222 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1223 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1224 * is done (but not before this method completes), even if this method throws or the pipeline 1225 * is cancelled. 1226 * 1227 * @param peeker used to get the value of any of the input futures 1228 */ 1229 @NullableDecl 1230 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1231 } 1232 1233 /** 1234 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1235 * 1236 * @param <V> the type of the result 1237 */ 1238 public interface AsyncCombiningCallable<V extends Object> { 1239 /** 1240 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1241 * 1242 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1243 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1244 * is done (but not before this method completes), even if this method throws or the pipeline 1245 * is cancelled. 1246 * 1247 * @param peeker used to get the value of any of the input futures 1248 */ 1249 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1250 } 1251 1252 private final boolean allMustSucceed; 1253 protected final ImmutableList<ClosingFuture<?>> inputs; 1254 1255 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1256 this.allMustSucceed = allMustSucceed; 1257 this.inputs = ImmutableList.copyOf(inputs); 1258 for (ClosingFuture<?> input : inputs) { 1259 input.becomeSubsumedInto(closeables); 1260 } 1261 } 1262 1263 /** 1264 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1265 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1266 * objects to be closed when the pipeline is done. 1267 * 1268 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1269 * fail, so will the returned step. 1270 * 1271 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1272 * cancelled. 1273 * 1274 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1275 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1276 */ 1277 public <V> ClosingFuture<V> call( 1278 final CombiningCallable<V> combiningCallable, Executor executor) { 1279 Callable<V> callable = 1280 new Callable<V>() { 1281 @Override 1282 public V call() throws Exception { 1283 return new Peeker(inputs).call(combiningCallable, closeables); 1284 } 1285 1286 @Override 1287 public String toString() { 1288 return combiningCallable.toString(); 1289 } 1290 }; 1291 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1292 derived.closeables.add(closeables, directExecutor()); 1293 return derived; 1294 } 1295 1296 /** 1297 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1298 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1299 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1300 * captured by the returned {@link ClosingFuture}). 1301 * 1302 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1303 * fail, so will the returned step. 1304 * 1305 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1306 * cancelled. 1307 * 1308 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1309 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1310 * 1311 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1312 * derived step. 1313 * 1314 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1315 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1316 * 1317 * <p>Usage guidelines for this method: 1318 * 1319 * <ul> 1320 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1321 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1322 * Executor)} instead, with a function that returns the next value directly. 1323 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1324 * closer.eventuallyClose()} for every closeable object this step creates in order to 1325 * capture it for later closing. 1326 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1327 * ClosingFuture} call {@link #from(ListenableFuture)}. 1328 * </ul> 1329 * 1330 * <p>The same warnings about doing heavyweight operations within {@link 1331 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1332 */ 1333 public <V> ClosingFuture<V> callAsync( 1334 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1335 AsyncCallable<V> asyncCallable = 1336 new AsyncCallable<V>() { 1337 @Override 1338 public ListenableFuture<V> call() throws Exception { 1339 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1340 } 1341 1342 @Override 1343 public String toString() { 1344 return combiningCallable.toString(); 1345 } 1346 }; 1347 ClosingFuture<V> derived = 1348 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1349 derived.closeables.add(closeables, directExecutor()); 1350 return derived; 1351 } 1352 1353 private FutureCombiner<Object> futureCombiner() { 1354 return allMustSucceed 1355 ? Futures.whenAllSucceed(inputFutures()) 1356 : Futures.whenAllComplete(inputFutures()); 1357 } 1358 1359 private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE = 1360 new Function<ClosingFuture<?>, FluentFuture<?>>() { 1361 @Override 1362 public FluentFuture<?> apply(ClosingFuture<?> future) { 1363 return future.future; 1364 } 1365 }; 1366 1367 private ImmutableList<FluentFuture<?>> inputFutures() { 1368 return FluentIterable.from(inputs).transform(INNER_FUTURE).toList(); 1369 } 1370 } 1371 1372 /** 1373 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1374 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1375 * combination. 1376 * 1377 * @param <V1> the type returned by the first future 1378 * @param <V2> the type returned by the second future 1379 */ 1380 public static final class Combiner2<V1 extends Object, V2 extends Object> extends Combiner { 1381 1382 /** 1383 * A function that returns a value when applied to the values of the two futures passed to 1384 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1385 * 1386 * @param <V1> the type returned by the first future 1387 * @param <V2> the type returned by the second future 1388 * @param <U> the type returned by the function 1389 */ 1390 public interface ClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> { 1391 1392 /** 1393 * Applies this function to two inputs, or throws an exception if unable to do so. 1394 * 1395 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1396 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1397 * is done (but not before this method completes), even if this method throws or the pipeline 1398 * is cancelled. 1399 */ 1400 @NullableDecl 1401 U apply(DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) 1402 throws Exception; 1403 } 1404 1405 /** 1406 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1407 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1408 * 1409 * @param <V1> the type returned by the first future 1410 * @param <V2> the type returned by the second future 1411 * @param <U> the type returned by the function 1412 */ 1413 public interface AsyncClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> { 1414 1415 /** 1416 * Applies this function to two inputs, or throws an exception if unable to do so. 1417 * 1418 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1419 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1420 * is done (but not before this method completes), even if this method throws or the pipeline 1421 * is cancelled. 1422 */ 1423 ClosingFuture<U> apply( 1424 DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) throws Exception; 1425 } 1426 1427 private final ClosingFuture<V1> future1; 1428 private final ClosingFuture<V2> future2; 1429 1430 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1431 super(true, ImmutableList.of(future1, future2)); 1432 this.future1 = future1; 1433 this.future2 = future2; 1434 } 1435 1436 /** 1437 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1438 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1439 * objects to be closed when the pipeline is done. 1440 * 1441 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1442 * any of the inputs fail, so will the returned step. 1443 * 1444 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1445 * 1446 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1447 * ExecutionException} will be extracted and used as the failure of the derived step. 1448 */ 1449 public <U extends Object> ClosingFuture<U> call( 1450 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1451 return call( 1452 new CombiningCallable<U>() { 1453 @Override 1454 @NullableDecl 1455 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1456 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1457 } 1458 1459 @Override 1460 public String toString() { 1461 return function.toString(); 1462 } 1463 }, 1464 executor); 1465 } 1466 1467 /** 1468 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1469 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1470 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1471 * captured by the returned {@link ClosingFuture}). 1472 * 1473 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1474 * any of the inputs fail, so will the returned step. 1475 * 1476 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1477 * 1478 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1479 * ExecutionException} will be extracted and used as the failure of the derived step. 1480 * 1481 * <p>If the function throws any other exception, it will be used as the failure of the derived 1482 * step. 1483 * 1484 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1485 * the closeable objects in that {@code ClosingFuture} will be closed. 1486 * 1487 * <p>Usage guidelines for this method: 1488 * 1489 * <ul> 1490 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1491 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1492 * Executor)} instead, with a function that returns the next value directly. 1493 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1494 * closer.eventuallyClose()} for every closeable object this step creates in order to 1495 * capture it for later closing. 1496 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1497 * ClosingFuture} call {@link #from(ListenableFuture)}. 1498 * </ul> 1499 * 1500 * <p>The same warnings about doing heavyweight operations within {@link 1501 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1502 */ 1503 public <U extends Object> ClosingFuture<U> callAsync( 1504 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1505 return callAsync( 1506 new AsyncCombiningCallable<U>() { 1507 @Override 1508 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1509 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1510 } 1511 1512 @Override 1513 public String toString() { 1514 return function.toString(); 1515 } 1516 }, 1517 executor); 1518 } 1519 } 1520 1521 /** 1522 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1523 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1524 * ClosingFuture)} to start this combination. 1525 * 1526 * @param <V1> the type returned by the first future 1527 * @param <V2> the type returned by the second future 1528 * @param <V3> the type returned by the third future 1529 */ 1530 public static final class Combiner3<V1 extends Object, V2 extends Object, V3 extends Object> 1531 extends Combiner { 1532 /** 1533 * A function that returns a value when applied to the values of the three futures passed to 1534 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1535 * 1536 * @param <V1> the type returned by the first future 1537 * @param <V2> the type returned by the second future 1538 * @param <V3> the type returned by the third future 1539 * @param <U> the type returned by the function 1540 */ 1541 public interface ClosingFunction3< 1542 V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> { 1543 /** 1544 * Applies this function to three inputs, or throws an exception if unable to do so. 1545 * 1546 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1547 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1548 * is done (but not before this method completes), even if this method throws or the pipeline 1549 * is cancelled. 1550 */ 1551 @NullableDecl 1552 U apply( 1553 DeferredCloser closer, 1554 @NullableDecl V1 value1, 1555 @NullableDecl V2 value2, 1556 @NullableDecl V3 v3) 1557 throws Exception; 1558 } 1559 1560 /** 1561 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1562 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1563 * 1564 * @param <V1> the type returned by the first future 1565 * @param <V2> the type returned by the second future 1566 * @param <V3> the type returned by the third future 1567 * @param <U> the type returned by the function 1568 */ 1569 public interface AsyncClosingFunction3< 1570 V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> { 1571 /** 1572 * Applies this function to three inputs, or throws an exception if unable to do so. 1573 * 1574 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1575 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1576 * is done (but not before this method completes), even if this method throws or the pipeline 1577 * is cancelled. 1578 */ 1579 ClosingFuture<U> apply( 1580 DeferredCloser closer, 1581 @NullableDecl V1 value1, 1582 @NullableDecl V2 value2, 1583 @NullableDecl V3 value3) 1584 throws Exception; 1585 } 1586 1587 private final ClosingFuture<V1> future1; 1588 private final ClosingFuture<V2> future2; 1589 private final ClosingFuture<V3> future3; 1590 1591 private Combiner3( 1592 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1593 super(true, ImmutableList.of(future1, future2, future3)); 1594 this.future1 = future1; 1595 this.future2 = future2; 1596 this.future3 = future3; 1597 } 1598 1599 /** 1600 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1601 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1602 * objects to be closed when the pipeline is done. 1603 * 1604 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1605 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1606 * 1607 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1608 * 1609 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1610 * ExecutionException} will be extracted and used as the failure of the derived step. 1611 */ 1612 public <U extends Object> ClosingFuture<U> call( 1613 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1614 return call( 1615 new CombiningCallable<U>() { 1616 @Override 1617 @NullableDecl 1618 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1619 return function.apply( 1620 closer, 1621 peeker.getDone(future1), 1622 peeker.getDone(future2), 1623 peeker.getDone(future3)); 1624 } 1625 1626 @Override 1627 public String toString() { 1628 return function.toString(); 1629 } 1630 }, 1631 executor); 1632 } 1633 1634 /** 1635 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1636 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1637 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1638 * captured by the returned {@link ClosingFuture}). 1639 * 1640 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1641 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1642 * 1643 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1644 * 1645 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1646 * ExecutionException} will be extracted and used as the failure of the derived step. 1647 * 1648 * <p>If the function throws any other exception, it will be used as the failure of the derived 1649 * step. 1650 * 1651 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1652 * the closeable objects in that {@code ClosingFuture} will be closed. 1653 * 1654 * <p>Usage guidelines for this method: 1655 * 1656 * <ul> 1657 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1658 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1659 * Executor)} instead, with a function that returns the next value directly. 1660 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1661 * closer.eventuallyClose()} for every closeable object this step creates in order to 1662 * capture it for later closing. 1663 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1664 * ClosingFuture} call {@link #from(ListenableFuture)}. 1665 * </ul> 1666 * 1667 * <p>The same warnings about doing heavyweight operations within {@link 1668 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1669 */ 1670 public <U extends Object> ClosingFuture<U> callAsync( 1671 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1672 return callAsync( 1673 new AsyncCombiningCallable<U>() { 1674 @Override 1675 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1676 return function.apply( 1677 closer, 1678 peeker.getDone(future1), 1679 peeker.getDone(future2), 1680 peeker.getDone(future3)); 1681 } 1682 1683 @Override 1684 public String toString() { 1685 return function.toString(); 1686 } 1687 }, 1688 executor); 1689 } 1690 } 1691 1692 /** 1693 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1694 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1695 * ClosingFuture)} to start this combination. 1696 * 1697 * @param <V1> the type returned by the first future 1698 * @param <V2> the type returned by the second future 1699 * @param <V3> the type returned by the third future 1700 * @param <V4> the type returned by the fourth future 1701 */ 1702 public static final class Combiner4< 1703 V1 extends Object, V2 extends Object, V3 extends Object, V4 extends Object> 1704 extends Combiner { 1705 /** 1706 * A function that returns a value when applied to the values of the four futures passed to 1707 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1708 * 1709 * @param <V1> the type returned by the first future 1710 * @param <V2> the type returned by the second future 1711 * @param <V3> the type returned by the third future 1712 * @param <V4> the type returned by the fourth future 1713 * @param <U> the type returned by the function 1714 */ 1715 public interface ClosingFunction4< 1716 V1 extends Object, 1717 V2 extends Object, 1718 V3 extends Object, 1719 V4 extends Object, 1720 U extends Object> { 1721 /** 1722 * Applies this function to four inputs, or throws an exception if unable to do so. 1723 * 1724 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1725 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1726 * is done (but not before this method completes), even if this method throws or the pipeline 1727 * is cancelled. 1728 */ 1729 @NullableDecl 1730 U apply( 1731 DeferredCloser closer, 1732 @NullableDecl V1 value1, 1733 @NullableDecl V2 value2, 1734 @NullableDecl V3 value3, 1735 @NullableDecl V4 value4) 1736 throws Exception; 1737 } 1738 1739 /** 1740 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1741 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1742 * ClosingFuture)}. 1743 * 1744 * @param <V1> the type returned by the first future 1745 * @param <V2> the type returned by the second future 1746 * @param <V3> the type returned by the third future 1747 * @param <V4> the type returned by the fourth future 1748 * @param <U> the type returned by the function 1749 */ 1750 public interface AsyncClosingFunction4< 1751 V1 extends Object, 1752 V2 extends Object, 1753 V3 extends Object, 1754 V4 extends Object, 1755 U extends Object> { 1756 /** 1757 * Applies this function to four inputs, or throws an exception if unable to do so. 1758 * 1759 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1760 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1761 * is done (but not before this method completes), even if this method throws or the pipeline 1762 * is cancelled. 1763 */ 1764 ClosingFuture<U> apply( 1765 DeferredCloser closer, 1766 @NullableDecl V1 value1, 1767 @NullableDecl V2 value2, 1768 @NullableDecl V3 value3, 1769 @NullableDecl V4 value4) 1770 throws Exception; 1771 } 1772 1773 private final ClosingFuture<V1> future1; 1774 private final ClosingFuture<V2> future2; 1775 private final ClosingFuture<V3> future3; 1776 private final ClosingFuture<V4> future4; 1777 1778 private Combiner4( 1779 ClosingFuture<V1> future1, 1780 ClosingFuture<V2> future2, 1781 ClosingFuture<V3> future3, 1782 ClosingFuture<V4> future4) { 1783 super(true, ImmutableList.of(future1, future2, future3, future4)); 1784 this.future1 = future1; 1785 this.future2 = future2; 1786 this.future3 = future3; 1787 this.future4 = future4; 1788 } 1789 1790 /** 1791 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1792 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1793 * objects to be closed when the pipeline is done. 1794 * 1795 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1796 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1797 * 1798 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1799 * 1800 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1801 * ExecutionException} will be extracted and used as the failure of the derived step. 1802 */ 1803 public <U extends Object> ClosingFuture<U> call( 1804 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1805 return call( 1806 new CombiningCallable<U>() { 1807 @Override 1808 @NullableDecl 1809 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1810 return function.apply( 1811 closer, 1812 peeker.getDone(future1), 1813 peeker.getDone(future2), 1814 peeker.getDone(future3), 1815 peeker.getDone(future4)); 1816 } 1817 1818 @Override 1819 public String toString() { 1820 return function.toString(); 1821 } 1822 }, 1823 executor); 1824 } 1825 1826 /** 1827 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1828 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1829 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1830 * captured by the returned {@link ClosingFuture}). 1831 * 1832 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1833 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1834 * 1835 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1836 * 1837 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1838 * ExecutionException} will be extracted and used as the failure of the derived step. 1839 * 1840 * <p>If the function throws any other exception, it will be used as the failure of the derived 1841 * step. 1842 * 1843 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1844 * the closeable objects in that {@code ClosingFuture} will be closed. 1845 * 1846 * <p>Usage guidelines for this method: 1847 * 1848 * <ul> 1849 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1850 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1851 * Executor)} instead, with a function that returns the next value directly. 1852 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1853 * closer.eventuallyClose()} for every closeable object this step creates in order to 1854 * capture it for later closing. 1855 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1856 * ClosingFuture} call {@link #from(ListenableFuture)}. 1857 * </ul> 1858 * 1859 * <p>The same warnings about doing heavyweight operations within {@link 1860 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1861 */ 1862 public <U extends Object> ClosingFuture<U> callAsync( 1863 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1864 return callAsync( 1865 new AsyncCombiningCallable<U>() { 1866 @Override 1867 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1868 return function.apply( 1869 closer, 1870 peeker.getDone(future1), 1871 peeker.getDone(future2), 1872 peeker.getDone(future3), 1873 peeker.getDone(future4)); 1874 } 1875 1876 @Override 1877 public String toString() { 1878 return function.toString(); 1879 } 1880 }, 1881 executor); 1882 } 1883 } 1884 1885 /** 1886 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1887 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1888 * ClosingFuture, ClosingFuture)} to start this combination. 1889 * 1890 * @param <V1> the type returned by the first future 1891 * @param <V2> the type returned by the second future 1892 * @param <V3> the type returned by the third future 1893 * @param <V4> the type returned by the fourth future 1894 * @param <V5> the type returned by the fifth future 1895 */ 1896 public static final class Combiner5< 1897 V1 extends Object, 1898 V2 extends Object, 1899 V3 extends Object, 1900 V4 extends Object, 1901 V5 extends Object> 1902 extends Combiner { 1903 /** 1904 * A function that returns a value when applied to the values of the five futures passed to 1905 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1906 * ClosingFuture)}. 1907 * 1908 * @param <V1> the type returned by the first future 1909 * @param <V2> the type returned by the second future 1910 * @param <V3> the type returned by the third future 1911 * @param <V4> the type returned by the fourth future 1912 * @param <V5> the type returned by the fifth future 1913 * @param <U> the type returned by the function 1914 */ 1915 public interface ClosingFunction5< 1916 V1 extends Object, 1917 V2 extends Object, 1918 V3 extends Object, 1919 V4 extends Object, 1920 V5 extends Object, 1921 U extends Object> { 1922 /** 1923 * Applies this function to five inputs, or throws an exception if unable to do so. 1924 * 1925 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1926 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1927 * is done (but not before this method completes), even if this method throws or the pipeline 1928 * is cancelled. 1929 */ 1930 @NullableDecl 1931 U apply( 1932 DeferredCloser closer, 1933 @NullableDecl V1 value1, 1934 @NullableDecl V2 value2, 1935 @NullableDecl V3 value3, 1936 @NullableDecl V4 value4, 1937 @NullableDecl V5 value5) 1938 throws Exception; 1939 } 1940 1941 /** 1942 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1943 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1944 * ClosingFuture, ClosingFuture)}. 1945 * 1946 * @param <V1> the type returned by the first future 1947 * @param <V2> the type returned by the second future 1948 * @param <V3> the type returned by the third future 1949 * @param <V4> the type returned by the fourth future 1950 * @param <V5> the type returned by the fifth future 1951 * @param <U> the type returned by the function 1952 */ 1953 public interface AsyncClosingFunction5< 1954 V1 extends Object, 1955 V2 extends Object, 1956 V3 extends Object, 1957 V4 extends Object, 1958 V5 extends Object, 1959 U extends Object> { 1960 /** 1961 * Applies this function to five inputs, or throws an exception if unable to do so. 1962 * 1963 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1964 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1965 * is done (but not before this method completes), even if this method throws or the pipeline 1966 * is cancelled. 1967 */ 1968 ClosingFuture<U> apply( 1969 DeferredCloser closer, 1970 @NullableDecl V1 value1, 1971 @NullableDecl V2 value2, 1972 @NullableDecl V3 value3, 1973 @NullableDecl V4 value4, 1974 @NullableDecl V5 value5) 1975 throws Exception; 1976 } 1977 1978 private final ClosingFuture<V1> future1; 1979 private final ClosingFuture<V2> future2; 1980 private final ClosingFuture<V3> future3; 1981 private final ClosingFuture<V4> future4; 1982 private final ClosingFuture<V5> future5; 1983 1984 private Combiner5( 1985 ClosingFuture<V1> future1, 1986 ClosingFuture<V2> future2, 1987 ClosingFuture<V3> future3, 1988 ClosingFuture<V4> future4, 1989 ClosingFuture<V5> future5) { 1990 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 1991 this.future1 = future1; 1992 this.future2 = future2; 1993 this.future3 = future3; 1994 this.future4 = future4; 1995 this.future5 = future5; 1996 } 1997 1998 /** 1999 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2000 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2001 * objects to be closed when the pipeline is done. 2002 * 2003 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2004 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2005 * returned step. 2006 * 2007 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2008 * 2009 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2010 * ExecutionException} will be extracted and used as the failure of the derived step. 2011 */ 2012 public <U extends Object> ClosingFuture<U> call( 2013 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2014 return call( 2015 new CombiningCallable<U>() { 2016 @Override 2017 @NullableDecl 2018 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2019 return function.apply( 2020 closer, 2021 peeker.getDone(future1), 2022 peeker.getDone(future2), 2023 peeker.getDone(future3), 2024 peeker.getDone(future4), 2025 peeker.getDone(future5)); 2026 } 2027 2028 @Override 2029 public String toString() { 2030 return function.toString(); 2031 } 2032 }, 2033 executor); 2034 } 2035 2036 /** 2037 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2038 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2039 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2040 * captured by the returned {@link ClosingFuture}). 2041 * 2042 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2043 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2044 * returned step. 2045 * 2046 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2047 * 2048 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2049 * ExecutionException} will be extracted and used as the failure of the derived step. 2050 * 2051 * <p>If the function throws any other exception, it will be used as the failure of the derived 2052 * step. 2053 * 2054 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2055 * the closeable objects in that {@code ClosingFuture} will be closed. 2056 * 2057 * <p>Usage guidelines for this method: 2058 * 2059 * <ul> 2060 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2061 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2062 * Executor)} instead, with a function that returns the next value directly. 2063 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 2064 * closer.eventuallyClose()} for every closeable object this step creates in order to 2065 * capture it for later closing. 2066 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2067 * ClosingFuture} call {@link #from(ListenableFuture)}. 2068 * </ul> 2069 * 2070 * <p>The same warnings about doing heavyweight operations within {@link 2071 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2072 */ 2073 public <U extends Object> ClosingFuture<U> callAsync( 2074 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2075 return callAsync( 2076 new AsyncCombiningCallable<U>() { 2077 @Override 2078 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2079 return function.apply( 2080 closer, 2081 peeker.getDone(future1), 2082 peeker.getDone(future2), 2083 peeker.getDone(future3), 2084 peeker.getDone(future4), 2085 peeker.getDone(future5)); 2086 } 2087 2088 @Override 2089 public String toString() { 2090 return function.toString(); 2091 } 2092 }, 2093 executor); 2094 } 2095 } 2096 2097 @Override 2098 public String toString() { 2099 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2100 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2101 } 2102 2103 @Override 2104 protected void finalize() { 2105 if (state.get().equals(OPEN)) { 2106 logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this); 2107 FluentFuture<V> unused = finishToFuture(); 2108 } 2109 } 2110 2111 private static void closeQuietly(final Closeable closeable, Executor executor) { 2112 if (closeable == null) { 2113 return; 2114 } 2115 try { 2116 executor.execute( 2117 new Runnable() { 2118 @Override 2119 public void run() { 2120 try { 2121 closeable.close(); 2122 } catch (IOException | RuntimeException e) { 2123 logger.log(WARNING, "thrown by close()", e); 2124 } 2125 } 2126 }); 2127 } catch (RejectedExecutionException e) { 2128 if (logger.isLoggable(WARNING)) { 2129 logger.log( 2130 WARNING, String.format("while submitting close to %s; will close inline", executor), e); 2131 } 2132 closeQuietly(closeable, directExecutor()); 2133 } 2134 } 2135 2136 private void checkAndUpdateState(State oldState, State newState) { 2137 checkState( 2138 compareAndUpdateState(oldState, newState), 2139 "Expected state to be %s, but it was %s", 2140 oldState, 2141 newState); 2142 } 2143 2144 private boolean compareAndUpdateState(State oldState, State newState) { 2145 return state.compareAndSet(oldState, newState); 2146 } 2147 2148 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2149 private static final class CloseableList extends IdentityHashMap<Closeable, Executor> 2150 implements Closeable { 2151 private final DeferredCloser closer = new DeferredCloser(this); 2152 private volatile boolean closed; 2153 private volatile CountDownLatch whenClosed; 2154 2155 <V, U> ListenableFuture<U> applyClosingFunction( 2156 ClosingFunction<? super V, U> transformation, V input) throws Exception { 2157 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2158 CloseableList newCloseables = new CloseableList(); 2159 try { 2160 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2161 } finally { 2162 add(newCloseables, directExecutor()); 2163 } 2164 } 2165 2166 <V, U> FluentFuture<U> applyAsyncClosingFunction( 2167 AsyncClosingFunction<V, U> transformation, V input) throws Exception { 2168 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2169 CloseableList newCloseables = new CloseableList(); 2170 try { 2171 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2172 closingFuture.becomeSubsumedInto(newCloseables); 2173 return closingFuture.future; 2174 } finally { 2175 add(newCloseables, directExecutor()); 2176 } 2177 } 2178 2179 @Override 2180 public void close() { 2181 if (closed) { 2182 return; 2183 } 2184 synchronized (this) { 2185 if (closed) { 2186 return; 2187 } 2188 closed = true; 2189 } 2190 for (Map.Entry<Closeable, Executor> entry : entrySet()) { 2191 closeQuietly(entry.getKey(), entry.getValue()); 2192 } 2193 clear(); 2194 if (whenClosed != null) { 2195 whenClosed.countDown(); 2196 } 2197 } 2198 2199 void add(@NullableDecl Closeable closeable, Executor executor) { 2200 checkNotNull(executor); 2201 if (closeable == null) { 2202 return; 2203 } 2204 synchronized (this) { 2205 if (!closed) { 2206 put(closeable, executor); 2207 return; 2208 } 2209 } 2210 closeQuietly(closeable, executor); 2211 } 2212 2213 /** 2214 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2215 */ 2216 CountDownLatch whenClosedCountDown() { 2217 if (closed) { 2218 return new CountDownLatch(0); 2219 } 2220 synchronized (this) { 2221 if (closed) { 2222 return new CountDownLatch(0); 2223 } 2224 checkState(whenClosed == null); 2225 return whenClosed = new CountDownLatch(1); 2226 } 2227 } 2228 } 2229 2230 /** 2231 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2232 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2233 */ 2234 @VisibleForTesting 2235 CountDownLatch whenClosedCountDown() { 2236 return closeables.whenClosedCountDown(); 2237 } 2238 2239 /** The state of a {@link CloseableList}. */ 2240 enum State { 2241 /** The {@link CloseableList} has not been subsumed or closed. */ 2242 OPEN, 2243 2244 /** 2245 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2246 * into any other. 2247 */ 2248 SUBSUMED, 2249 2250 /** 2251 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2252 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2253 */ 2254 WILL_CLOSE, 2255 2256 /** 2257 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2258 * {@link CloseableList} may not be subsumed. 2259 */ 2260 CLOSING, 2261 2262 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2263 CLOSED, 2264 2265 /** 2266 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2267 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2268 */ 2269 WILL_CREATE_VALUE_AND_CLOSER, 2270 } 2271}