001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static java.util.logging.Level.FINER; 036import static java.util.logging.Level.SEVERE; 037import static java.util.logging.Level.WARNING; 038 039import com.google.common.annotations.J2ktIncompatible; 040import com.google.common.annotations.VisibleForTesting; 041import com.google.common.collect.FluentIterable; 042import com.google.common.collect.ImmutableList; 043import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 045import com.google.common.util.concurrent.Futures.FutureCombiner; 046import com.google.errorprone.annotations.CanIgnoreReturnValue; 047import com.google.errorprone.annotations.DoNotMock; 048import com.google.j2objc.annotations.RetainedWith; 049import java.io.Closeable; 050import java.io.IOException; 051import java.util.IdentityHashMap; 052import java.util.Map; 053import java.util.concurrent.Callable; 054import java.util.concurrent.CancellationException; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutionException; 057import java.util.concurrent.Executor; 058import java.util.concurrent.Future; 059import java.util.concurrent.RejectedExecutionException; 060import java.util.concurrent.atomic.AtomicReference; 061import java.util.logging.Logger; 062import javax.annotation.CheckForNull; 063import org.checkerframework.checker.nullness.qual.Nullable; 064 065/** 066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 067 * complete, some objects captured during the computation are closed. 068 * 069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 071 * cancellation of the operation so far. The only way to extract the value or exception from a step 072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 073 * "value" of a successful step or the "result" (value or exception) of any step. 074 * 075 * <ol> 076 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 077 * block or a {@link ListenableFuture}. 078 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 079 * can be captured for later closing. 080 * <li>There is one last step (the root of the tree), from which you can extract the final result 081 * of the computation. After that result is available (or the computation fails), all objects 082 * captured by any of the steps in the pipeline are closed. 083 * </ol> 084 * 085 * <h3>Starting a pipeline</h3> 086 * 087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 090 * #from(ListenableFuture)} instead. 091 * 092 * <h3>Derived steps</h3> 093 * 094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 095 * ways similar to {@link FluentFuture}s: 096 * 097 * <ul> 098 * <li>by transforming the value from a successful input step, 099 * <li>by catching the exception from a failed input step, or 100 * <li>by combining the results of several input steps. 101 * </ul> 102 * 103 * Each derivation can capture the next value or any intermediate objects for later closing. 104 * 105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 106 * exception, or combine it with others, you cannot do anything else with it, including declare it 107 * to be the last step of the pipeline. 108 * 109 * <h4>Transforming</h4> 110 * 111 * To derive the next step by asynchronously applying a function to an input step's value, call 112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 113 * Executor)} on the input step. 114 * 115 * <h4>Catching</h4> 116 * 117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 119 * 120 * <h4>Combining</h4> 121 * 122 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 123 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 124 * 125 * <h3>Cancelling</h3> 126 * 127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 129 * successfully cancelled step will immediately start closing all objects captured for later closing 130 * by it and by its input steps. 131 * 132 * <h3>Ending a pipeline</h3> 133 * 134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 135 * close the captured objects automatically or manually. 136 * 137 * <h4>Automatically closing</h4> 138 * 139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 140 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin 141 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future 142 * completes before closing starts, rather than once it has finished. 143 * 144 * <pre>{@code 145 * FluentFuture<UserName> userName = 146 * ClosingFuture.submit( 147 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 148 * executor) 149 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 150 * .transform((closer, result) -> result.get("userName"), directExecutor()) 151 * .catching(DBException.class, e -> "no user", directExecutor()) 152 * .finishToFuture(); 153 * }</pre> 154 * 155 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 156 * result cursor will both be closed, even if the operation is cancelled or fails. 157 * 158 * <h4>Manually closing</h4> 159 * 160 * If you want to close the captured objects manually, after you've used the final result, call 161 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 162 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 163 * 164 * <pre>{@code 165 * ClosingFuture.submit( 166 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 167 * executor) 168 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 169 * .transform((closer, result) -> result.get("userName"), directExecutor()) 170 * .catching(DBException.class, e -> "no user", directExecutor()) 171 * .finishToValueAndCloser( 172 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 173 * 174 * // later 175 * try { // get() will throw if the operation failed or was cancelled. 176 * UserName userName = userNameValueAndCloser.get(); 177 * // do something with userName 178 * } finally { 179 * userNameValueAndCloser.closeAsync(); 180 * } 181 * }</pre> 182 * 183 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 184 * the query result cursor will both be closed, even if the operation is cancelled or fails. 185 * 186 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 187 * automatic-closing approach described above is safer. 188 * 189 * @param <V> the type of the value of this step 190 * @since 30.0 191 */ 192// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 193@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 194@J2ktIncompatible 195@ElementTypesAreNonnullByDefault 196// TODO(dpb): GWT compatibility. 197public final class ClosingFuture<V extends @Nullable Object> { 198 199 private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName()); 200 201 /** 202 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 203 * done. 204 */ 205 public static final class DeferredCloser { 206 @RetainedWith private final CloseableList list; 207 208 DeferredCloser(CloseableList list) { 209 this.list = list; 210 } 211 212 /** 213 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 214 * 215 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 216 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 217 * Closeable}. (For more about the flavors, see <a 218 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 219 * build</a>.) 220 * 221 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 222 * building for Android): Ensure that any object you pass implements the interface not just in 223 * your current SDK version but also at the oldest version you support. For example, <a 224 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 225 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 226 * {@code Closeable} with a method reference like {@code cursor::close}. 227 * 228 * <p>Note that this method is still binary-compatible between flavors because the erasure of 229 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 230 * 231 * @param closeable the object to be closed (see notes above) 232 * @param closingExecutor the object will be closed on this executor 233 * @return the first argument 234 */ 235 @CanIgnoreReturnValue 236 @ParametricNullness 237 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 238 public <C extends @Nullable Object & @Nullable Closeable> C eventuallyClose( 239 @ParametricNullness C closeable, Executor closingExecutor) { 240 checkNotNull(closingExecutor); 241 if (closeable != null) { 242 list.add(closeable, closingExecutor); 243 } 244 return closeable; 245 } 246 } 247 248 /** 249 * An operation that computes a result. 250 * 251 * @param <V> the type of the result 252 */ 253 public interface ClosingCallable<V extends @Nullable Object> { 254 /** 255 * Computes a result, or throws an exception if unable to do so. 256 * 257 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 258 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 259 * not before this method completes), even if this method throws or the pipeline is cancelled. 260 */ 261 @ParametricNullness 262 V call(DeferredCloser closer) throws Exception; 263 } 264 265 /** 266 * An operation that computes a {@link ClosingFuture} of a result. 267 * 268 * @param <V> the type of the result 269 * @since 30.1 270 */ 271 public interface AsyncClosingCallable<V extends @Nullable Object> { 272 /** 273 * Computes a result, or throws an exception if unable to do so. 274 * 275 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 276 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 277 * not before this method completes), even if this method throws or the pipeline is cancelled. 278 */ 279 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 280 } 281 282 /** 283 * A function from an input to a result. 284 * 285 * @param <T> the type of the input to the function 286 * @param <U> the type of the result of the function 287 */ 288 public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 289 290 /** 291 * Applies this function to an input, or throws an exception if unable to do so. 292 * 293 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 294 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 295 * not before this method completes), even if this method throws or the pipeline is cancelled. 296 */ 297 @ParametricNullness 298 U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 299 } 300 301 /** 302 * A function from an input to a {@link ClosingFuture} of a result. 303 * 304 * @param <T> the type of the input to the function 305 * @param <U> the type of the result of the function 306 */ 307 public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 308 /** 309 * Applies this function to an input, or throws an exception if unable to do so. 310 * 311 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 312 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 313 * not before this method completes), even if this method throws or the pipeline is cancelled. 314 */ 315 ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 316 } 317 318 /** 319 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 320 * allows the user to close all the closeable objects that were captured during it for later 321 * closing. 322 * 323 * <p>The asynchronous operation will have completed before this object is created. 324 * 325 * @param <V> the type of the value of a successful operation 326 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 327 */ 328 public static final class ValueAndCloser<V extends @Nullable Object> { 329 330 private final ClosingFuture<? extends V> closingFuture; 331 332 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 333 this.closingFuture = checkNotNull(closingFuture); 334 } 335 336 /** 337 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 338 * {@link Future#get()} would. 339 * 340 * <p>Because the asynchronous operation has already completed, this method is synchronous and 341 * returns immediately. 342 * 343 * @throws CancellationException if the computation was cancelled 344 * @throws ExecutionException if the computation threw an exception 345 */ 346 @ParametricNullness 347 public V get() throws ExecutionException { 348 return getDone(closingFuture.future); 349 } 350 351 /** 352 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 353 * operation on the {@link Executor}s specified by calls to {@link 354 * DeferredCloser#eventuallyClose(Object, Executor)}. 355 * 356 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 357 * closed synchronously. 358 * 359 * <p>Idempotent: objects will be closed at most once. 360 */ 361 public void closeAsync() { 362 closingFuture.close(); 363 } 364 } 365 366 /** 367 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 368 * ClosingFuture} pipeline. 369 * 370 * @param <V> the type of the final value of a successful pipeline 371 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 372 */ 373 public interface ValueAndCloserConsumer<V extends @Nullable Object> { 374 375 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 376 void accept(ValueAndCloser<V> valueAndCloser); 377 } 378 379 /** 380 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 381 * 382 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 383 * execution 384 */ 385 public static <V extends @Nullable Object> ClosingFuture<V> submit( 386 ClosingCallable<V> callable, Executor executor) { 387 return new ClosingFuture<>(callable, executor); 388 } 389 390 /** 391 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 392 * 393 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 394 * execution 395 * @since 30.1 396 */ 397 public static <V extends @Nullable Object> ClosingFuture<V> submitAsync( 398 AsyncClosingCallable<V> callable, Executor executor) { 399 return new ClosingFuture<>(callable, executor); 400 } 401 402 /** 403 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 404 * 405 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 406 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 407 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 408 */ 409 public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) { 410 return new ClosingFuture<V>(future); 411 } 412 413 /** 414 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 415 * 416 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when 417 * the pipeline is done, even if the pipeline is canceled or fails. 418 * 419 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 420 * value in order to close it. 421 * 422 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 423 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object, 424 * Executor)}. 425 * @param closingExecutor the future's result will be closed on this executor 426 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 427 * underlying value may never be closed if the {@link Future} is canceled after its operation 428 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 429 * including those that pass them to this method, with {@link #submit(ClosingCallable, 430 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 431 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 432 * ClosingFuture#from}. 433 */ 434 @Deprecated 435 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 436 public static <C extends @Nullable Object & @Nullable Closeable> 437 ClosingFuture<C> eventuallyClosing( 438 ListenableFuture<C> future, final Executor closingExecutor) { 439 checkNotNull(closingExecutor); 440 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 441 Futures.addCallback( 442 future, 443 new FutureCallback<@Nullable Closeable>() { 444 @Override 445 public void onSuccess(@CheckForNull Closeable result) { 446 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 447 } 448 449 @Override 450 public void onFailure(Throwable t) {} 451 }, 452 directExecutor()); 453 return closingFuture; 454 } 455 456 /** 457 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 458 * 459 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 460 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 461 */ 462 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 463 return new Combiner(false, futures); 464 } 465 466 /** 467 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 468 * 469 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 470 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 471 */ 472 public static Combiner whenAllComplete( 473 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 474 return whenAllComplete(asList(future1, moreFutures)); 475 } 476 477 /** 478 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 479 * all succeed. If any fail, the resulting pipeline will fail. 480 * 481 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 482 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 483 */ 484 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 485 return new Combiner(true, futures); 486 } 487 488 /** 489 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 490 * they all succeed. If any fail, the resulting pipeline will fail. 491 * 492 * <p>Calling this method allows you to use lambdas or method references typed with the types of 493 * the input {@link ClosingFuture}s. 494 * 495 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 496 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 497 */ 498 public static <V1 extends @Nullable Object, V2 extends @Nullable Object> 499 Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 500 return new Combiner2<>(future1, future2); 501 } 502 503 /** 504 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 505 * they all succeed. If any fail, the resulting pipeline will fail. 506 * 507 * <p>Calling this method allows you to use lambdas or method references typed with the types of 508 * the input {@link ClosingFuture}s. 509 * 510 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 511 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 512 */ 513 public static < 514 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 515 Combiner3<V1, V2, V3> whenAllSucceed( 516 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 517 return new Combiner3<>(future1, future2, future3); 518 } 519 520 /** 521 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 522 * they all succeed. If any fail, the resulting pipeline will fail. 523 * 524 * <p>Calling this method allows you to use lambdas or method references typed with the types of 525 * the input {@link ClosingFuture}s. 526 * 527 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 528 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 529 */ 530 public static < 531 V1 extends @Nullable Object, 532 V2 extends @Nullable Object, 533 V3 extends @Nullable Object, 534 V4 extends @Nullable Object> 535 Combiner4<V1, V2, V3, V4> whenAllSucceed( 536 ClosingFuture<V1> future1, 537 ClosingFuture<V2> future2, 538 ClosingFuture<V3> future3, 539 ClosingFuture<V4> future4) { 540 return new Combiner4<>(future1, future2, future3, future4); 541 } 542 543 /** 544 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 545 * they all succeed. If any fail, the resulting pipeline will fail. 546 * 547 * <p>Calling this method allows you to use lambdas or method references typed with the types of 548 * the input {@link ClosingFuture}s. 549 * 550 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 551 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 552 */ 553 public static < 554 V1 extends @Nullable Object, 555 V2 extends @Nullable Object, 556 V3 extends @Nullable Object, 557 V4 extends @Nullable Object, 558 V5 extends @Nullable Object> 559 Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 560 ClosingFuture<V1> future1, 561 ClosingFuture<V2> future2, 562 ClosingFuture<V3> future3, 563 ClosingFuture<V4> future4, 564 ClosingFuture<V5> future5) { 565 return new Combiner5<>(future1, future2, future3, future4, future5); 566 } 567 568 /** 569 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 570 * all succeed. If any fail, the resulting pipeline will fail. 571 * 572 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 573 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 574 */ 575 public static Combiner whenAllSucceed( 576 ClosingFuture<?> future1, 577 ClosingFuture<?> future2, 578 ClosingFuture<?> future3, 579 ClosingFuture<?> future4, 580 ClosingFuture<?> future5, 581 ClosingFuture<?> future6, 582 ClosingFuture<?>... moreFutures) { 583 return whenAllSucceed( 584 FluentIterable.of(future1, future2, future3, future4, future5, future6) 585 .append(moreFutures)); 586 } 587 588 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 589 private final CloseableList closeables = new CloseableList(); 590 private final FluentFuture<V> future; 591 592 private ClosingFuture(ListenableFuture<V> future) { 593 this.future = FluentFuture.from(future); 594 } 595 596 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 597 checkNotNull(callable); 598 TrustedListenableFutureTask<V> task = 599 TrustedListenableFutureTask.create( 600 new Callable<V>() { 601 @Override 602 @ParametricNullness 603 public V call() throws Exception { 604 return callable.call(closeables.closer); 605 } 606 607 @Override 608 public String toString() { 609 return callable.toString(); 610 } 611 }); 612 executor.execute(task); 613 this.future = task; 614 } 615 616 private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) { 617 checkNotNull(callable); 618 TrustedListenableFutureTask<V> task = 619 TrustedListenableFutureTask.create( 620 new AsyncCallable<V>() { 621 @Override 622 public ListenableFuture<V> call() throws Exception { 623 CloseableList newCloseables = new CloseableList(); 624 try { 625 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 626 closingFuture.becomeSubsumedInto(closeables); 627 return closingFuture.future; 628 } finally { 629 closeables.add(newCloseables, directExecutor()); 630 } 631 } 632 633 @Override 634 public String toString() { 635 return callable.toString(); 636 } 637 }); 638 executor.execute(task); 639 this.future = task; 640 } 641 642 /** 643 * Returns a future that finishes when this step does. Calling {@code get()} on the returned 644 * future returns {@code null} if the step is successful or throws the same exception that would 645 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 646 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 647 * 648 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 649 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 650 * a derivation method <i>on the same instance</i>. This is important because calling {@code 651 * statusFuture} alone does not provide a way to close the pipeline. 652 */ 653 public ListenableFuture<?> statusFuture() { 654 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 655 } 656 657 /** 658 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 659 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 660 * when the pipeline is done. 661 * 662 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 663 * ClosingFuture} will be equivalent to this one. 664 * 665 * <p>If the function throws an exception, that exception is used as the result of the derived 666 * {@code ClosingFuture}. 667 * 668 * <p>Example usage: 669 * 670 * <pre>{@code 671 * ClosingFuture<List<Row>> rowsFuture = 672 * queryFuture.transform((closer, result) -> result.getRows(), executor); 673 * }</pre> 674 * 675 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 676 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 677 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 678 * 679 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 680 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 681 * this {@code ClosingFuture}. 682 * 683 * @param function transforms the value of this step to the value of the derived step 684 * @param executor executor to run the function in 685 * @return the derived step 686 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 687 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 688 * finished} 689 */ 690 public <U extends @Nullable Object> ClosingFuture<U> transform( 691 final ClosingFunction<? super V, U> function, Executor executor) { 692 checkNotNull(function); 693 AsyncFunction<V, U> applyFunction = 694 new AsyncFunction<V, U>() { 695 @Override 696 public ListenableFuture<U> apply(V input) throws Exception { 697 return closeables.applyClosingFunction(function, input); 698 } 699 700 @Override 701 public String toString() { 702 return function.toString(); 703 } 704 }; 705 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 706 return derive(future.transformAsync(applyFunction, executor)); 707 } 708 709 /** 710 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 711 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 712 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 713 * captured by the returned {@link ClosingFuture}). 714 * 715 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 716 * returned by the function. 717 * 718 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 719 * ClosingFuture} will be equivalent to this one. 720 * 721 * <p>If the function throws an exception, that exception is used as the result of the derived 722 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 723 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 724 * closed. 725 * 726 * <p>Usage guidelines for this method: 727 * 728 * <ul> 729 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 730 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 731 * Executor)} instead, with a function that returns the next value directly. 732 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 733 * for every closeable object this step creates in order to capture it for later closing. 734 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 735 * ClosingFuture} call {@link #from(ListenableFuture)}. 736 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 737 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 738 * {@link #withoutCloser(AsyncFunction)} 739 * </ul> 740 * 741 * <p>Example usage: 742 * 743 * <pre>{@code 744 * // Result.getRowsClosingFuture() returns a ClosingFuture. 745 * ClosingFuture<List<Row>> rowsFuture = 746 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 747 * 748 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 749 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 750 * // Closeable). 751 * ClosingFuture<Integer> rowsFuture2 = 752 * queryFuture.transformAsync( 753 * (closer, result) -> { 754 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 755 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 756 * }, 757 * executor); 758 * 759 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 760 * ClosingFuture<List<Row>> rowsFuture3 = 761 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 762 * 763 * }</pre> 764 * 765 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 766 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 767 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 768 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 769 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 770 * responsible for completing the returned {@code ClosingFuture}.) 771 * 772 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 773 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 774 * this {@code ClosingFuture}. 775 * 776 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 777 * the derived step 778 * @param executor executor to run the function in 779 * @return the derived step 780 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 781 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 782 * finished} 783 */ 784 public <U extends @Nullable Object> ClosingFuture<U> transformAsync( 785 final AsyncClosingFunction<? super V, U> function, Executor executor) { 786 checkNotNull(function); 787 AsyncFunction<V, U> applyFunction = 788 new AsyncFunction<V, U>() { 789 @Override 790 public ListenableFuture<U> apply(V input) throws Exception { 791 return closeables.applyAsyncClosingFunction(function, input); 792 } 793 794 @Override 795 public String toString() { 796 return function.toString(); 797 } 798 }; 799 return derive(future.transformAsync(applyFunction, executor)); 800 } 801 802 /** 803 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 804 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 805 * {@link ListenableFuture}. 806 * 807 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 808 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 809 * meets these conditions: 810 * 811 * <ul> 812 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 813 * DeferredCloser#eventuallyClose(Object, Executor)}. 814 * <li>It returns a {@link ListenableFuture}. 815 * </ul> 816 * 817 * <p>Example usage: 818 * 819 * <pre>{@code 820 * // Result.getRowsFuture() returns a ListenableFuture. 821 * ClosingFuture<List<Row>> rowsFuture = 822 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 823 * }</pre> 824 * 825 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 826 * ListenableFuture} with the value of a derived step 827 */ 828 public static <V extends @Nullable Object, U extends @Nullable Object> 829 AsyncClosingFunction<V, U> withoutCloser(final AsyncFunction<V, U> function) { 830 checkNotNull(function); 831 return new AsyncClosingFunction<V, U>() { 832 @Override 833 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 834 return ClosingFuture.from(function.apply(input)); 835 } 836 }; 837 } 838 839 /** 840 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 841 * to its exception if it is an instance of a given exception type. The function can use a {@link 842 * DeferredCloser} to capture objects to be closed when the pipeline is done. 843 * 844 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 845 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 846 * one. 847 * 848 * <p>If the function throws an exception, that exception is used as the result of the derived 849 * {@code ClosingFuture}. 850 * 851 * <p>Example usage: 852 * 853 * <pre>{@code 854 * ClosingFuture<QueryResult> queryFuture = 855 * queryFuture.catching( 856 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 857 * }</pre> 858 * 859 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 860 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 861 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 862 * 863 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 864 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 865 * this {@code ClosingFuture}. 866 * 867 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 868 * type is matched against this step's exception. "This step's exception" means the cause of 869 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 870 * underlying this step or, if {@code get()} throws a different kind of exception, that 871 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 872 * prefer more specific types, avoiding {@code Throwable.class} in particular. 873 * @param fallback the function to be called if this step fails with the expected exception type. 874 * The function's argument is this step's exception. "This step's exception" means the cause 875 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 876 * underlying this step or, if {@code get()} throws a different kind of exception, that 877 * exception itself. 878 * @param executor the executor that runs {@code fallback} if the input fails 879 */ 880 public <X extends Throwable> ClosingFuture<V> catching( 881 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 882 return catchingMoreGeneric(exceptionType, fallback, executor); 883 } 884 885 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 886 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 887 Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) { 888 checkNotNull(fallback); 889 AsyncFunction<X, W> applyFallback = 890 new AsyncFunction<X, W>() { 891 @Override 892 public ListenableFuture<W> apply(X exception) throws Exception { 893 return closeables.applyClosingFunction(fallback, exception); 894 } 895 896 @Override 897 public String toString() { 898 return fallback.toString(); 899 } 900 }; 901 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 902 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 903 } 904 905 /** 906 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 907 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 908 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 909 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 910 * 911 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 912 * ClosingFuture} will be equivalent to the one returned by the function. 913 * 914 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 915 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 916 * one. 917 * 918 * <p>If the function throws an exception, that exception is used as the result of the derived 919 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 920 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 921 * closed. 922 * 923 * <p>Usage guidelines for this method: 924 * 925 * <ul> 926 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 927 * {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class, 928 * ClosingFunction, Executor)} instead, with a function that returns the next value 929 * directly. 930 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 931 * for every closeable object this step creates in order to capture it for later closing. 932 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 933 * ClosingFuture} call {@link #from(ListenableFuture)}. 934 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 935 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 936 * {@link #withoutCloser(AsyncFunction)} 937 * </ul> 938 * 939 * <p>Example usage: 940 * 941 * <pre>{@code 942 * // Fall back to a secondary input stream in case of IOException. 943 * ClosingFuture<InputStream> inputFuture = 944 * firstInputFuture.catchingAsync( 945 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 946 * } 947 * }</pre> 948 * 949 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 950 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 951 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 952 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 953 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 954 * responsible for completing the returned {@code ClosingFuture}.) 955 * 956 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 957 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 958 * this {@code ClosingFuture}. 959 * 960 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 961 * type is matched against this step's exception. "This step's exception" means the cause of 962 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 963 * underlying this step or, if {@code get()} throws a different kind of exception, that 964 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 965 * prefer more specific types, avoiding {@code Throwable.class} in particular. 966 * @param fallback the function to be called if this step fails with the expected exception type. 967 * The function's argument is this step's exception. "This step's exception" means the cause 968 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 969 * underlying this step or, if {@code get()} throws a different kind of exception, that 970 * exception itself. 971 * @param executor the executor that runs {@code fallback} if the input fails 972 */ 973 // TODO(dpb): Should this do something special if the function throws CancellationException or 974 // ExecutionException? 975 public <X extends Throwable> ClosingFuture<V> catchingAsync( 976 Class<X> exceptionType, 977 AsyncClosingFunction<? super X, ? extends V> fallback, 978 Executor executor) { 979 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 980 } 981 982 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 983 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 984 Class<X> exceptionType, 985 final AsyncClosingFunction<? super X, W> fallback, 986 Executor executor) { 987 checkNotNull(fallback); 988 AsyncFunction<X, W> asyncFunction = 989 new AsyncFunction<X, W>() { 990 @Override 991 public ListenableFuture<W> apply(X exception) throws Exception { 992 return closeables.applyAsyncClosingFunction(fallback, exception); 993 } 994 995 @Override 996 public String toString() { 997 return fallback.toString(); 998 } 999 }; 1000 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 1001 } 1002 1003 /** 1004 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 1005 * 1006 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 1007 * the pipeline is cancelled. 1008 * 1009 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 1010 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 1011 * rather than once it has finished. 1012 * 1013 * <p>After calling this method, you may not call {@link 1014 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 1015 * derivation method on this {@code ClosingFuture}. 1016 * 1017 * @return a {@link Future} that represents the final value or exception of the pipeline 1018 */ 1019 public FluentFuture<V> finishToFuture() { 1020 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1021 logger.log(FINER, "will close {0}", this); 1022 future.addListener( 1023 new Runnable() { 1024 @Override 1025 public void run() { 1026 checkAndUpdateState(WILL_CLOSE, CLOSING); 1027 close(); 1028 checkAndUpdateState(CLOSING, CLOSED); 1029 } 1030 }, 1031 directExecutor()); 1032 } else { 1033 switch (state.get()) { 1034 case SUBSUMED: 1035 throw new IllegalStateException( 1036 "Cannot call finishToFuture() after deriving another step"); 1037 1038 case WILL_CREATE_VALUE_AND_CLOSER: 1039 throw new IllegalStateException( 1040 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1041 1042 case WILL_CLOSE: 1043 case CLOSING: 1044 case CLOSED: 1045 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1046 1047 case OPEN: 1048 throw new AssertionError(); 1049 } 1050 } 1051 return future; 1052 } 1053 1054 /** 1055 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1056 * {@code receiver} will be called with an object that contains the result of the operation. The 1057 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1058 * 1059 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1060 * any other derivation method on this {@code ClosingFuture}. 1061 * 1062 * @param consumer a callback whose method will be called (using {@code executor}) when this 1063 * operation is done 1064 */ 1065 public void finishToValueAndCloser( 1066 final ValueAndCloserConsumer<? super V> consumer, Executor executor) { 1067 checkNotNull(consumer); 1068 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1069 switch (state.get()) { 1070 case SUBSUMED: 1071 throw new IllegalStateException( 1072 "Cannot call finishToValueAndCloser() after deriving another step"); 1073 1074 case WILL_CLOSE: 1075 case CLOSING: 1076 case CLOSED: 1077 throw new IllegalStateException( 1078 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1079 1080 case WILL_CREATE_VALUE_AND_CLOSER: 1081 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1082 1083 case OPEN: 1084 break; 1085 } 1086 throw new AssertionError(state); 1087 } 1088 future.addListener( 1089 new Runnable() { 1090 @Override 1091 public void run() { 1092 provideValueAndCloser(consumer, ClosingFuture.this); 1093 } 1094 }, 1095 executor); 1096 } 1097 1098 private static <C extends @Nullable Object, V extends C> void provideValueAndCloser( 1099 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1100 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1101 } 1102 1103 /** 1104 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1105 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1106 * successful, and this step has not started when {@code cancel} is called, this step should never 1107 * run. 1108 * 1109 * <p>If successful, causes the objects captured by this step (if already started) and its input 1110 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1111 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 1112 * 1113 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1114 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1115 * cancelled regardless 1116 * @return {@code false} if the step could not be cancelled, typically because it has already 1117 * completed normally; {@code true} otherwise 1118 */ 1119 @CanIgnoreReturnValue 1120 public boolean cancel(boolean mayInterruptIfRunning) { 1121 logger.log(FINER, "cancelling {0}", this); 1122 boolean cancelled = future.cancel(mayInterruptIfRunning); 1123 if (cancelled) { 1124 close(); 1125 } 1126 return cancelled; 1127 } 1128 1129 private void close() { 1130 logger.log(FINER, "closing {0}", this); 1131 closeables.close(); 1132 } 1133 1134 private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) { 1135 ClosingFuture<U> derived = new ClosingFuture<>(future); 1136 becomeSubsumedInto(derived.closeables); 1137 return derived; 1138 } 1139 1140 private void becomeSubsumedInto(CloseableList otherCloseables) { 1141 checkAndUpdateState(OPEN, SUBSUMED); 1142 otherCloseables.add(closeables, directExecutor()); 1143 } 1144 1145 /** 1146 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1147 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1148 * 1149 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1150 */ 1151 public static final class Peeker { 1152 private final ImmutableList<ClosingFuture<?>> futures; 1153 private volatile boolean beingCalled; 1154 1155 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1156 this.futures = checkNotNull(futures); 1157 } 1158 1159 /** 1160 * Returns the value of {@code closingFuture}. 1161 * 1162 * @throws ExecutionException if {@code closingFuture} is a failed step 1163 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1164 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1165 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1166 * @throws IllegalStateException if called outside of a call to {@link 1167 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1168 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1169 */ 1170 @ParametricNullness 1171 public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1172 throws ExecutionException { 1173 checkState(beingCalled); 1174 checkArgument(futures.contains(closingFuture)); 1175 return Futures.getDone(closingFuture.future); 1176 } 1177 1178 @ParametricNullness 1179 private <V extends @Nullable Object> V call( 1180 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1181 beingCalled = true; 1182 CloseableList newCloseables = new CloseableList(); 1183 try { 1184 return combiner.call(newCloseables.closer, this); 1185 } finally { 1186 closeables.add(newCloseables, directExecutor()); 1187 beingCalled = false; 1188 } 1189 } 1190 1191 private <V extends @Nullable Object> FluentFuture<V> callAsync( 1192 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1193 beingCalled = true; 1194 CloseableList newCloseables = new CloseableList(); 1195 try { 1196 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1197 closingFuture.becomeSubsumedInto(closeables); 1198 return closingFuture.future; 1199 } finally { 1200 closeables.add(newCloseables, directExecutor()); 1201 beingCalled = false; 1202 } 1203 } 1204 } 1205 1206 /** 1207 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 1208 * 1209 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1210 * instantiate this class. 1211 * 1212 * <p>Example: 1213 * 1214 * <pre>{@code 1215 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1216 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1217 * ListenableFuture<Integer> numberOfDifferentLines = 1218 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1219 * .call( 1220 * (closer, peeker) -> { 1221 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1222 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1223 * return countDifferentLines(file1Reader, file2Reader); 1224 * }, 1225 * executor) 1226 * .closing(executor); 1227 * }</pre> 1228 */ 1229 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1230 @com.google.errorprone.annotations.DoNotMock( 1231 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1232 public static class Combiner { 1233 1234 private final CloseableList closeables = new CloseableList(); 1235 1236 /** 1237 * An operation that returns a result and may throw an exception. 1238 * 1239 * @param <V> the type of the result 1240 */ 1241 public interface CombiningCallable<V extends @Nullable Object> { 1242 /** 1243 * Computes a result, or throws an exception if unable to do so. 1244 * 1245 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1246 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1247 * (but not before this method completes), even if this method throws or the pipeline is 1248 * cancelled. 1249 * 1250 * @param peeker used to get the value of any of the input futures 1251 */ 1252 @ParametricNullness 1253 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1254 } 1255 1256 /** 1257 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1258 * 1259 * @param <V> the type of the result 1260 */ 1261 public interface AsyncCombiningCallable<V extends @Nullable Object> { 1262 /** 1263 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1264 * 1265 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1266 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1267 * (but not before this method completes), even if this method throws or the pipeline is 1268 * cancelled. 1269 * 1270 * @param peeker used to get the value of any of the input futures 1271 */ 1272 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1273 } 1274 1275 private final boolean allMustSucceed; 1276 protected final ImmutableList<ClosingFuture<?>> inputs; 1277 1278 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1279 this.allMustSucceed = allMustSucceed; 1280 this.inputs = ImmutableList.copyOf(inputs); 1281 for (ClosingFuture<?> input : inputs) { 1282 input.becomeSubsumedInto(closeables); 1283 } 1284 } 1285 1286 /** 1287 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1288 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1289 * objects to be closed when the pipeline is done. 1290 * 1291 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1292 * fail, so will the returned step. 1293 * 1294 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1295 * cancelled. 1296 * 1297 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1298 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1299 */ 1300 public <V extends @Nullable Object> ClosingFuture<V> call( 1301 final CombiningCallable<V> combiningCallable, Executor executor) { 1302 Callable<V> callable = 1303 new Callable<V>() { 1304 @Override 1305 @ParametricNullness 1306 public V call() throws Exception { 1307 return new Peeker(inputs).call(combiningCallable, closeables); 1308 } 1309 1310 @Override 1311 public String toString() { 1312 return combiningCallable.toString(); 1313 } 1314 }; 1315 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1316 derived.closeables.add(closeables, directExecutor()); 1317 return derived; 1318 } 1319 1320 /** 1321 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1322 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1323 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1324 * captured by the returned {@link ClosingFuture}). 1325 * 1326 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1327 * fail, so will the returned step. 1328 * 1329 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1330 * cancelled. 1331 * 1332 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1333 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1334 * 1335 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1336 * derived step. 1337 * 1338 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1339 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1340 * 1341 * <p>Usage guidelines for this method: 1342 * 1343 * <ul> 1344 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1345 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1346 * Executor)} instead, with a function that returns the next value directly. 1347 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1348 * for every closeable object this step creates in order to capture it for later closing. 1349 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1350 * ClosingFuture} call {@link #from(ListenableFuture)}. 1351 * </ul> 1352 * 1353 * <p>The same warnings about doing heavyweight operations within {@link 1354 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1355 */ 1356 public <V extends @Nullable Object> ClosingFuture<V> callAsync( 1357 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1358 AsyncCallable<V> asyncCallable = 1359 new AsyncCallable<V>() { 1360 @Override 1361 public ListenableFuture<V> call() throws Exception { 1362 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1363 } 1364 1365 @Override 1366 public String toString() { 1367 return combiningCallable.toString(); 1368 } 1369 }; 1370 ClosingFuture<V> derived = 1371 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1372 derived.closeables.add(closeables, directExecutor()); 1373 return derived; 1374 } 1375 1376 private FutureCombiner<@Nullable Object> futureCombiner() { 1377 return allMustSucceed 1378 ? Futures.whenAllSucceed(inputFutures()) 1379 : Futures.whenAllComplete(inputFutures()); 1380 } 1381 1382 1383 private ImmutableList<FluentFuture<?>> inputFutures() { 1384 return FluentIterable.from(inputs) 1385 .<FluentFuture<?>>transform(future -> future.future) 1386 .toList(); 1387 } 1388 } 1389 1390 /** 1391 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1392 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1393 * combination. 1394 * 1395 * @param <V1> the type returned by the first future 1396 * @param <V2> the type returned by the second future 1397 */ 1398 public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1399 extends Combiner { 1400 1401 /** 1402 * A function that returns a value when applied to the values of the two futures passed to 1403 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1404 * 1405 * @param <V1> the type returned by the first future 1406 * @param <V2> the type returned by the second future 1407 * @param <U> the type returned by the function 1408 */ 1409 public interface ClosingFunction2< 1410 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1411 1412 /** 1413 * Applies this function to two inputs, or throws an exception if unable to do so. 1414 * 1415 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1416 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1417 * (but not before this method completes), even if this method throws or the pipeline is 1418 * cancelled. 1419 */ 1420 @ParametricNullness 1421 U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1422 throws Exception; 1423 } 1424 1425 /** 1426 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1427 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1428 * 1429 * @param <V1> the type returned by the first future 1430 * @param <V2> the type returned by the second future 1431 * @param <U> the type returned by the function 1432 */ 1433 public interface AsyncClosingFunction2< 1434 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1435 1436 /** 1437 * Applies this function to two inputs, or throws an exception if unable to do so. 1438 * 1439 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1440 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1441 * (but not before this method completes), even if this method throws or the pipeline is 1442 * cancelled. 1443 */ 1444 ClosingFuture<U> apply( 1445 DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1446 throws Exception; 1447 } 1448 1449 private final ClosingFuture<V1> future1; 1450 private final ClosingFuture<V2> future2; 1451 1452 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1453 super(true, ImmutableList.of(future1, future2)); 1454 this.future1 = future1; 1455 this.future2 = future2; 1456 } 1457 1458 /** 1459 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1460 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1461 * objects to be closed when the pipeline is done. 1462 * 1463 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1464 * any of the inputs fail, so will the returned step. 1465 * 1466 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1467 * 1468 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1469 * ExecutionException} will be extracted and used as the failure of the derived step. 1470 */ 1471 public <U extends @Nullable Object> ClosingFuture<U> call( 1472 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1473 return call( 1474 new CombiningCallable<U>() { 1475 @Override 1476 @ParametricNullness 1477 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1478 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1479 } 1480 1481 @Override 1482 public String toString() { 1483 return function.toString(); 1484 } 1485 }, 1486 executor); 1487 } 1488 1489 /** 1490 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1491 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1492 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1493 * captured by the returned {@link ClosingFuture}). 1494 * 1495 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1496 * any of the inputs fail, so will the returned step. 1497 * 1498 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1499 * 1500 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1501 * ExecutionException} will be extracted and used as the failure of the derived step. 1502 * 1503 * <p>If the function throws any other exception, it will be used as the failure of the derived 1504 * step. 1505 * 1506 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1507 * the closeable objects in that {@code ClosingFuture} will be closed. 1508 * 1509 * <p>Usage guidelines for this method: 1510 * 1511 * <ul> 1512 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1513 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1514 * Executor)} instead, with a function that returns the next value directly. 1515 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1516 * for every closeable object this step creates in order to capture it for later closing. 1517 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1518 * ClosingFuture} call {@link #from(ListenableFuture)}. 1519 * </ul> 1520 * 1521 * <p>The same warnings about doing heavyweight operations within {@link 1522 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1523 */ 1524 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1525 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1526 return callAsync( 1527 new AsyncCombiningCallable<U>() { 1528 @Override 1529 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1530 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1531 } 1532 1533 @Override 1534 public String toString() { 1535 return function.toString(); 1536 } 1537 }, 1538 executor); 1539 } 1540 } 1541 1542 /** 1543 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1544 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1545 * ClosingFuture)} to start this combination. 1546 * 1547 * @param <V1> the type returned by the first future 1548 * @param <V2> the type returned by the second future 1549 * @param <V3> the type returned by the third future 1550 */ 1551 public static final class Combiner3< 1552 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1553 extends Combiner { 1554 /** 1555 * A function that returns a value when applied to the values of the three futures passed to 1556 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1557 * 1558 * @param <V1> the type returned by the first future 1559 * @param <V2> the type returned by the second future 1560 * @param <V3> the type returned by the third future 1561 * @param <U> the type returned by the function 1562 */ 1563 public interface ClosingFunction3< 1564 V1 extends @Nullable Object, 1565 V2 extends @Nullable Object, 1566 V3 extends @Nullable Object, 1567 U extends @Nullable Object> { 1568 /** 1569 * Applies this function to three inputs, or throws an exception if unable to do so. 1570 * 1571 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1572 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1573 * (but not before this method completes), even if this method throws or the pipeline is 1574 * cancelled. 1575 */ 1576 @ParametricNullness 1577 U apply( 1578 DeferredCloser closer, 1579 @ParametricNullness V1 value1, 1580 @ParametricNullness V2 value2, 1581 @ParametricNullness V3 value3) 1582 throws Exception; 1583 } 1584 1585 /** 1586 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1587 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1588 * 1589 * @param <V1> the type returned by the first future 1590 * @param <V2> the type returned by the second future 1591 * @param <V3> the type returned by the third future 1592 * @param <U> the type returned by the function 1593 */ 1594 public interface AsyncClosingFunction3< 1595 V1 extends @Nullable Object, 1596 V2 extends @Nullable Object, 1597 V3 extends @Nullable Object, 1598 U extends @Nullable Object> { 1599 /** 1600 * Applies this function to three inputs, or throws an exception if unable to do so. 1601 * 1602 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1603 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1604 * (but not before this method completes), even if this method throws or the pipeline is 1605 * cancelled. 1606 */ 1607 ClosingFuture<U> apply( 1608 DeferredCloser closer, 1609 @ParametricNullness V1 value1, 1610 @ParametricNullness V2 value2, 1611 @ParametricNullness V3 value3) 1612 throws Exception; 1613 } 1614 1615 private final ClosingFuture<V1> future1; 1616 private final ClosingFuture<V2> future2; 1617 private final ClosingFuture<V3> future3; 1618 1619 private Combiner3( 1620 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1621 super(true, ImmutableList.of(future1, future2, future3)); 1622 this.future1 = future1; 1623 this.future2 = future2; 1624 this.future3 = future3; 1625 } 1626 1627 /** 1628 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1629 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1630 * objects to be closed when the pipeline is done. 1631 * 1632 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1633 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1634 * 1635 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1636 * 1637 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1638 * ExecutionException} will be extracted and used as the failure of the derived step. 1639 */ 1640 public <U extends @Nullable Object> ClosingFuture<U> call( 1641 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1642 return call( 1643 new CombiningCallable<U>() { 1644 @Override 1645 @ParametricNullness 1646 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1647 return function.apply( 1648 closer, 1649 peeker.getDone(future1), 1650 peeker.getDone(future2), 1651 peeker.getDone(future3)); 1652 } 1653 1654 @Override 1655 public String toString() { 1656 return function.toString(); 1657 } 1658 }, 1659 executor); 1660 } 1661 1662 /** 1663 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1664 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1665 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1666 * captured by the returned {@link ClosingFuture}). 1667 * 1668 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1669 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1670 * 1671 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1672 * 1673 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1674 * ExecutionException} will be extracted and used as the failure of the derived step. 1675 * 1676 * <p>If the function throws any other exception, it will be used as the failure of the derived 1677 * step. 1678 * 1679 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1680 * the closeable objects in that {@code ClosingFuture} will be closed. 1681 * 1682 * <p>Usage guidelines for this method: 1683 * 1684 * <ul> 1685 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1686 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1687 * Executor)} instead, with a function that returns the next value directly. 1688 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1689 * for every closeable object this step creates in order to capture it for later closing. 1690 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1691 * ClosingFuture} call {@link #from(ListenableFuture)}. 1692 * </ul> 1693 * 1694 * <p>The same warnings about doing heavyweight operations within {@link 1695 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1696 */ 1697 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1698 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1699 return callAsync( 1700 new AsyncCombiningCallable<U>() { 1701 @Override 1702 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1703 return function.apply( 1704 closer, 1705 peeker.getDone(future1), 1706 peeker.getDone(future2), 1707 peeker.getDone(future3)); 1708 } 1709 1710 @Override 1711 public String toString() { 1712 return function.toString(); 1713 } 1714 }, 1715 executor); 1716 } 1717 } 1718 1719 /** 1720 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1721 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1722 * ClosingFuture)} to start this combination. 1723 * 1724 * @param <V1> the type returned by the first future 1725 * @param <V2> the type returned by the second future 1726 * @param <V3> the type returned by the third future 1727 * @param <V4> the type returned by the fourth future 1728 */ 1729 public static final class Combiner4< 1730 V1 extends @Nullable Object, 1731 V2 extends @Nullable Object, 1732 V3 extends @Nullable Object, 1733 V4 extends @Nullable Object> 1734 extends Combiner { 1735 /** 1736 * A function that returns a value when applied to the values of the four futures passed to 1737 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1738 * 1739 * @param <V1> the type returned by the first future 1740 * @param <V2> the type returned by the second future 1741 * @param <V3> the type returned by the third future 1742 * @param <V4> the type returned by the fourth future 1743 * @param <U> the type returned by the function 1744 */ 1745 public interface ClosingFunction4< 1746 V1 extends @Nullable Object, 1747 V2 extends @Nullable Object, 1748 V3 extends @Nullable Object, 1749 V4 extends @Nullable Object, 1750 U extends @Nullable Object> { 1751 /** 1752 * Applies this function to four inputs, or throws an exception if unable to do so. 1753 * 1754 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1755 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1756 * (but not before this method completes), even if this method throws or the pipeline is 1757 * cancelled. 1758 */ 1759 @ParametricNullness 1760 U apply( 1761 DeferredCloser closer, 1762 @ParametricNullness V1 value1, 1763 @ParametricNullness V2 value2, 1764 @ParametricNullness V3 value3, 1765 @ParametricNullness V4 value4) 1766 throws Exception; 1767 } 1768 1769 /** 1770 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1771 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1772 * ClosingFuture)}. 1773 * 1774 * @param <V1> the type returned by the first future 1775 * @param <V2> the type returned by the second future 1776 * @param <V3> the type returned by the third future 1777 * @param <V4> the type returned by the fourth future 1778 * @param <U> the type returned by the function 1779 */ 1780 public interface AsyncClosingFunction4< 1781 V1 extends @Nullable Object, 1782 V2 extends @Nullable Object, 1783 V3 extends @Nullable Object, 1784 V4 extends @Nullable Object, 1785 U extends @Nullable Object> { 1786 /** 1787 * Applies this function to four inputs, or throws an exception if unable to do so. 1788 * 1789 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1790 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1791 * (but not before this method completes), even if this method throws or the pipeline is 1792 * cancelled. 1793 */ 1794 ClosingFuture<U> apply( 1795 DeferredCloser closer, 1796 @ParametricNullness V1 value1, 1797 @ParametricNullness V2 value2, 1798 @ParametricNullness V3 value3, 1799 @ParametricNullness V4 value4) 1800 throws Exception; 1801 } 1802 1803 private final ClosingFuture<V1> future1; 1804 private final ClosingFuture<V2> future2; 1805 private final ClosingFuture<V3> future3; 1806 private final ClosingFuture<V4> future4; 1807 1808 private Combiner4( 1809 ClosingFuture<V1> future1, 1810 ClosingFuture<V2> future2, 1811 ClosingFuture<V3> future3, 1812 ClosingFuture<V4> future4) { 1813 super(true, ImmutableList.of(future1, future2, future3, future4)); 1814 this.future1 = future1; 1815 this.future2 = future2; 1816 this.future3 = future3; 1817 this.future4 = future4; 1818 } 1819 1820 /** 1821 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1822 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1823 * objects to be closed when the pipeline is done. 1824 * 1825 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1826 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1827 * 1828 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1829 * 1830 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1831 * ExecutionException} will be extracted and used as the failure of the derived step. 1832 */ 1833 public <U extends @Nullable Object> ClosingFuture<U> call( 1834 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1835 return call( 1836 new CombiningCallable<U>() { 1837 @Override 1838 @ParametricNullness 1839 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1840 return function.apply( 1841 closer, 1842 peeker.getDone(future1), 1843 peeker.getDone(future2), 1844 peeker.getDone(future3), 1845 peeker.getDone(future4)); 1846 } 1847 1848 @Override 1849 public String toString() { 1850 return function.toString(); 1851 } 1852 }, 1853 executor); 1854 } 1855 1856 /** 1857 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1858 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1859 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1860 * captured by the returned {@link ClosingFuture}). 1861 * 1862 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1863 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1864 * 1865 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1866 * 1867 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1868 * ExecutionException} will be extracted and used as the failure of the derived step. 1869 * 1870 * <p>If the function throws any other exception, it will be used as the failure of the derived 1871 * step. 1872 * 1873 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1874 * the closeable objects in that {@code ClosingFuture} will be closed. 1875 * 1876 * <p>Usage guidelines for this method: 1877 * 1878 * <ul> 1879 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1880 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1881 * Executor)} instead, with a function that returns the next value directly. 1882 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1883 * for every closeable object this step creates in order to capture it for later closing. 1884 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1885 * ClosingFuture} call {@link #from(ListenableFuture)}. 1886 * </ul> 1887 * 1888 * <p>The same warnings about doing heavyweight operations within {@link 1889 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1890 */ 1891 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1892 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1893 return callAsync( 1894 new AsyncCombiningCallable<U>() { 1895 @Override 1896 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1897 return function.apply( 1898 closer, 1899 peeker.getDone(future1), 1900 peeker.getDone(future2), 1901 peeker.getDone(future3), 1902 peeker.getDone(future4)); 1903 } 1904 1905 @Override 1906 public String toString() { 1907 return function.toString(); 1908 } 1909 }, 1910 executor); 1911 } 1912 } 1913 1914 /** 1915 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1916 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1917 * ClosingFuture, ClosingFuture)} to start this combination. 1918 * 1919 * @param <V1> the type returned by the first future 1920 * @param <V2> the type returned by the second future 1921 * @param <V3> the type returned by the third future 1922 * @param <V4> the type returned by the fourth future 1923 * @param <V5> the type returned by the fifth future 1924 */ 1925 public static final class Combiner5< 1926 V1 extends @Nullable Object, 1927 V2 extends @Nullable Object, 1928 V3 extends @Nullable Object, 1929 V4 extends @Nullable Object, 1930 V5 extends @Nullable Object> 1931 extends Combiner { 1932 /** 1933 * A function that returns a value when applied to the values of the five futures passed to 1934 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1935 * ClosingFuture)}. 1936 * 1937 * @param <V1> the type returned by the first future 1938 * @param <V2> the type returned by the second future 1939 * @param <V3> the type returned by the third future 1940 * @param <V4> the type returned by the fourth future 1941 * @param <V5> the type returned by the fifth future 1942 * @param <U> the type returned by the function 1943 */ 1944 public interface ClosingFunction5< 1945 V1 extends @Nullable Object, 1946 V2 extends @Nullable Object, 1947 V3 extends @Nullable Object, 1948 V4 extends @Nullable Object, 1949 V5 extends @Nullable Object, 1950 U extends @Nullable Object> { 1951 /** 1952 * Applies this function to five inputs, or throws an exception if unable to do so. 1953 * 1954 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1955 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1956 * (but not before this method completes), even if this method throws or the pipeline is 1957 * cancelled. 1958 */ 1959 @ParametricNullness 1960 U apply( 1961 DeferredCloser closer, 1962 @ParametricNullness V1 value1, 1963 @ParametricNullness V2 value2, 1964 @ParametricNullness V3 value3, 1965 @ParametricNullness V4 value4, 1966 @ParametricNullness V5 value5) 1967 throws Exception; 1968 } 1969 1970 /** 1971 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1972 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1973 * ClosingFuture, ClosingFuture)}. 1974 * 1975 * @param <V1> the type returned by the first future 1976 * @param <V2> the type returned by the second future 1977 * @param <V3> the type returned by the third future 1978 * @param <V4> the type returned by the fourth future 1979 * @param <V5> the type returned by the fifth future 1980 * @param <U> the type returned by the function 1981 */ 1982 public interface AsyncClosingFunction5< 1983 V1 extends @Nullable Object, 1984 V2 extends @Nullable Object, 1985 V3 extends @Nullable Object, 1986 V4 extends @Nullable Object, 1987 V5 extends @Nullable Object, 1988 U extends @Nullable Object> { 1989 /** 1990 * Applies this function to five inputs, or throws an exception if unable to do so. 1991 * 1992 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1993 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1994 * (but not before this method completes), even if this method throws or the pipeline is 1995 * cancelled. 1996 */ 1997 ClosingFuture<U> apply( 1998 DeferredCloser closer, 1999 @ParametricNullness V1 value1, 2000 @ParametricNullness V2 value2, 2001 @ParametricNullness V3 value3, 2002 @ParametricNullness V4 value4, 2003 @ParametricNullness V5 value5) 2004 throws Exception; 2005 } 2006 2007 private final ClosingFuture<V1> future1; 2008 private final ClosingFuture<V2> future2; 2009 private final ClosingFuture<V3> future3; 2010 private final ClosingFuture<V4> future4; 2011 private final ClosingFuture<V5> future5; 2012 2013 private Combiner5( 2014 ClosingFuture<V1> future1, 2015 ClosingFuture<V2> future2, 2016 ClosingFuture<V3> future3, 2017 ClosingFuture<V4> future4, 2018 ClosingFuture<V5> future5) { 2019 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 2020 this.future1 = future1; 2021 this.future2 = future2; 2022 this.future3 = future3; 2023 this.future4 = future4; 2024 this.future5 = future5; 2025 } 2026 2027 /** 2028 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2029 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2030 * objects to be closed when the pipeline is done. 2031 * 2032 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2033 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2034 * returned step. 2035 * 2036 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2037 * 2038 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2039 * ExecutionException} will be extracted and used as the failure of the derived step. 2040 */ 2041 public <U extends @Nullable Object> ClosingFuture<U> call( 2042 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2043 return call( 2044 new CombiningCallable<U>() { 2045 @Override 2046 @ParametricNullness 2047 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2048 return function.apply( 2049 closer, 2050 peeker.getDone(future1), 2051 peeker.getDone(future2), 2052 peeker.getDone(future3), 2053 peeker.getDone(future4), 2054 peeker.getDone(future5)); 2055 } 2056 2057 @Override 2058 public String toString() { 2059 return function.toString(); 2060 } 2061 }, 2062 executor); 2063 } 2064 2065 /** 2066 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2067 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2068 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2069 * captured by the returned {@link ClosingFuture}). 2070 * 2071 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2072 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2073 * returned step. 2074 * 2075 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2076 * 2077 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2078 * ExecutionException} will be extracted and used as the failure of the derived step. 2079 * 2080 * <p>If the function throws any other exception, it will be used as the failure of the derived 2081 * step. 2082 * 2083 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2084 * the closeable objects in that {@code ClosingFuture} will be closed. 2085 * 2086 * <p>Usage guidelines for this method: 2087 * 2088 * <ul> 2089 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2090 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2091 * Executor)} instead, with a function that returns the next value directly. 2092 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 2093 * for every closeable object this step creates in order to capture it for later closing. 2094 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2095 * ClosingFuture} call {@link #from(ListenableFuture)}. 2096 * </ul> 2097 * 2098 * <p>The same warnings about doing heavyweight operations within {@link 2099 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2100 */ 2101 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 2102 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2103 return callAsync( 2104 new AsyncCombiningCallable<U>() { 2105 @Override 2106 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2107 return function.apply( 2108 closer, 2109 peeker.getDone(future1), 2110 peeker.getDone(future2), 2111 peeker.getDone(future3), 2112 peeker.getDone(future4), 2113 peeker.getDone(future5)); 2114 } 2115 2116 @Override 2117 public String toString() { 2118 return function.toString(); 2119 } 2120 }, 2121 executor); 2122 } 2123 } 2124 2125 @Override 2126 public String toString() { 2127 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2128 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2129 } 2130 2131 @Override 2132 protected void finalize() { 2133 if (state.get().equals(OPEN)) { 2134 logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this); 2135 FluentFuture<V> unused = finishToFuture(); 2136 } 2137 } 2138 2139 private static void closeQuietly(@CheckForNull final Closeable closeable, Executor executor) { 2140 if (closeable == null) { 2141 return; 2142 } 2143 try { 2144 executor.execute( 2145 () -> { 2146 try { 2147 closeable.close(); 2148 } catch (IOException | RuntimeException e) { 2149 logger.log(WARNING, "thrown by close()", e); 2150 } 2151 }); 2152 } catch (RejectedExecutionException e) { 2153 if (logger.isLoggable(WARNING)) { 2154 logger.log( 2155 WARNING, String.format("while submitting close to %s; will close inline", executor), e); 2156 } 2157 closeQuietly(closeable, directExecutor()); 2158 } 2159 } 2160 2161 private void checkAndUpdateState(State oldState, State newState) { 2162 checkState( 2163 compareAndUpdateState(oldState, newState), 2164 "Expected state to be %s, but it was %s", 2165 oldState, 2166 newState); 2167 } 2168 2169 private boolean compareAndUpdateState(State oldState, State newState) { 2170 return state.compareAndSet(oldState, newState); 2171 } 2172 2173 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2174 private static final class CloseableList extends IdentityHashMap<Closeable, Executor> 2175 implements Closeable { 2176 private final DeferredCloser closer = new DeferredCloser(this); 2177 private volatile boolean closed; 2178 @CheckForNull private volatile CountDownLatch whenClosed; 2179 2180 <V extends @Nullable Object, U extends @Nullable Object> 2181 ListenableFuture<U> applyClosingFunction( 2182 ClosingFunction<? super V, U> transformation, @ParametricNullness V input) 2183 throws Exception { 2184 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2185 CloseableList newCloseables = new CloseableList(); 2186 try { 2187 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2188 } finally { 2189 add(newCloseables, directExecutor()); 2190 } 2191 } 2192 2193 <V extends @Nullable Object, U extends @Nullable Object> 2194 FluentFuture<U> applyAsyncClosingFunction( 2195 AsyncClosingFunction<V, U> transformation, @ParametricNullness V input) 2196 throws Exception { 2197 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2198 CloseableList newCloseables = new CloseableList(); 2199 try { 2200 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2201 closingFuture.becomeSubsumedInto(newCloseables); 2202 return closingFuture.future; 2203 } finally { 2204 add(newCloseables, directExecutor()); 2205 } 2206 } 2207 2208 @Override 2209 public void close() { 2210 if (closed) { 2211 return; 2212 } 2213 synchronized (this) { 2214 if (closed) { 2215 return; 2216 } 2217 closed = true; 2218 } 2219 for (Map.Entry<Closeable, Executor> entry : entrySet()) { 2220 closeQuietly(entry.getKey(), entry.getValue()); 2221 } 2222 clear(); 2223 if (whenClosed != null) { 2224 whenClosed.countDown(); 2225 } 2226 } 2227 2228 void add(@CheckForNull Closeable closeable, Executor executor) { 2229 checkNotNull(executor); 2230 if (closeable == null) { 2231 return; 2232 } 2233 synchronized (this) { 2234 if (!closed) { 2235 put(closeable, executor); 2236 return; 2237 } 2238 } 2239 closeQuietly(closeable, executor); 2240 } 2241 2242 /** 2243 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2244 */ 2245 CountDownLatch whenClosedCountDown() { 2246 if (closed) { 2247 return new CountDownLatch(0); 2248 } 2249 synchronized (this) { 2250 if (closed) { 2251 return new CountDownLatch(0); 2252 } 2253 checkState(whenClosed == null); 2254 return whenClosed = new CountDownLatch(1); 2255 } 2256 } 2257 } 2258 2259 /** 2260 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2261 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2262 */ 2263 @VisibleForTesting 2264 CountDownLatch whenClosedCountDown() { 2265 return closeables.whenClosedCountDown(); 2266 } 2267 2268 /** The state of a {@link CloseableList}. */ 2269 enum State { 2270 /** The {@link CloseableList} has not been subsumed or closed. */ 2271 OPEN, 2272 2273 /** 2274 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2275 * into any other. 2276 */ 2277 SUBSUMED, 2278 2279 /** 2280 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2281 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2282 */ 2283 WILL_CLOSE, 2284 2285 /** 2286 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2287 * {@link CloseableList} may not be subsumed. 2288 */ 2289 CLOSING, 2290 2291 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2292 CLOSED, 2293 2294 /** 2295 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2296 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2297 */ 2298 WILL_CREATE_VALUE_AND_CLOSER, 2299 } 2300}