001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static 
com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 036import static java.util.logging.Level.FINER; 037import static java.util.logging.Level.SEVERE; 038import static java.util.logging.Level.WARNING; 039 040import com.google.common.annotations.J2ktIncompatible; 041import com.google.common.annotations.VisibleForTesting; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.util.IdentityHashMap; 052import java.util.Map; 053import java.util.concurrent.Callable; 054import java.util.concurrent.CancellationException; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutionException; 057import java.util.concurrent.Executor; 058import java.util.concurrent.Future; 059import java.util.concurrent.RejectedExecutionException; 060import java.util.concurrent.atomic.AtomicReference; 061import javax.annotation.CheckForNull; 062import org.checkerframework.checker.nullness.qual.Nullable; 063 064/** 065 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 066 * complete, some objects captured during the computation are closed. 067 * 068 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 069 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 070 * cancellation of the operation so far. 
The only way to extract the value or exception from a step 071 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 072 * "value" of a successful step or the "result" (value or exception) of any step. 073 * 074 * <ol> 075 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 076 * block or a {@link ListenableFuture}. 077 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 078 * can be captured for later closing. 079 * <li>There is one last step (the root of the tree), from which you can extract the final result 080 * of the computation. After that result is available (or the computation fails), all objects 081 * captured by any of the steps in the pipeline are closed. 082 * </ol> 083 * 084 * <h3>Starting a pipeline</h3> 085 * 086 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 087 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 088 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 089 * #from(ListenableFuture)} instead. 090 * 091 * <h3>Derived steps</h3> 092 * 093 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 094 * ways similar to {@link FluentFuture}s: 095 * 096 * <ul> 097 * <li>by transforming the value from a successful input step, 098 * <li>by catching the exception from a failed input step, or 099 * <li>by combining the results of several input steps. 100 * </ul> 101 * 102 * Each derivation can capture the next value or any intermediate objects for later closing. 103 * 104 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 105 * exception, or combine it with others, you cannot do anything else with it, including declare it 106 * to be the last step of the pipeline. 
107 * 108 * <h4>Transforming</h4> 109 * 110 * To derive the next step by asynchronously applying a function to an input step's value, call 111 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 112 * Executor)} on the input step. 113 * 114 * <h4>Catching</h4> 115 * 116 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 117 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 118 * 119 * <h4>Combining</h4> 120 * 121 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 122 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 123 * 124 * <h3>Cancelling</h3> 125 * 126 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 127 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 128 * successfully cancelled step will immediately start closing all objects captured for later closing 129 * by it and by its input steps. 130 * 131 * <h3>Ending a pipeline</h3> 132 * 133 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 134 * close the captured objects automatically or manually. 135 * 136 * <h4>Automatically closing</h4> 137 * 138 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 139 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin 140 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future 141 * completes before closing starts, rather than once it has finished. 
142 * 143 * <pre>{@code 144 * FluentFuture<UserName> userName = 145 * ClosingFuture.submit( 146 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 147 * executor) 148 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 149 * .transform((closer, result) -> result.get("userName"), directExecutor()) 150 * .catching(DBException.class, e -> "no user", directExecutor()) 151 * .finishToFuture(); 152 * }</pre> 153 * 154 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 155 * result cursor will both be closed, even if the operation is cancelled or fails. 156 * 157 * <h4>Manually closing</h4> 158 * 159 * If you want to close the captured objects manually, after you've used the final result, call 160 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 161 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 162 * 163 * <pre>{@code 164 * ClosingFuture.submit( 165 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 166 * executor) 167 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 168 * .transform((closer, result) -> result.get("userName"), directExecutor()) 169 * .catching(DBException.class, e -> "no user", directExecutor()) 170 * .finishToValueAndCloser( 171 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 172 * 173 * // later 174 * try { // get() will throw if the operation failed or was cancelled. 
175 * UserName userName = userNameValueAndCloser.get(); 176 * // do something with userName 177 * } finally { 178 * userNameValueAndCloser.closeAsync(); 179 * } 180 * }</pre> 181 * 182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 183 * the query result cursor will both be closed, even if the operation is cancelled or fails. 184 * 185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 186 * automatic-closing approach described above is safer. 187 * 188 * @param <V> the type of the value of this step 189 * @since 30.0 190 */ 191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 192@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 193@J2ktIncompatible 194// TODO(dpb): GWT compatibility. 195public final class ClosingFuture<V extends @Nullable Object> { 196 197 private static final LazyLogger logger = new LazyLogger(ClosingFuture.class); 198 199 /** 200 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 201 * done. 202 */ 203 public static final class DeferredCloser { 204 @RetainedWith private final CloseableList list; 205 206 DeferredCloser(CloseableList list) { 207 this.list = list; 208 } 209 210 /** 211 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 212 * 213 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 214 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 215 * Closeable}. (For more about the flavors, see <a 216 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 217 * build</a>.) 
218 * 219 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 220 * building for Android): Ensure that any object you pass implements the interface not just in 221 * your current SDK version but also at the oldest version you support. For example, <a 222 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 223 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 224 * {@code Closeable} with a method reference like {@code cursor::close}. 225 * 226 * <p>Note that this method is still binary-compatible between flavors because the erasure of 227 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 228 * 229 * @param closeable the object to be closed (see notes above) 230 * @param closingExecutor the object will be closed on this executor 231 * @return the first argument 232 */ 233 @CanIgnoreReturnValue 234 @ParametricNullness 235 public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose( 236 @ParametricNullness C closeable, Executor closingExecutor) { 237 checkNotNull(closingExecutor); 238 if (closeable != null) { 239 list.add(closeable, closingExecutor); 240 } 241 return closeable; 242 } 243 } 244 245 /** 246 * An operation that computes a result. 247 * 248 * @param <V> the type of the result 249 */ 250 public interface ClosingCallable<V extends @Nullable Object> { 251 /** 252 * Computes a result, or throws an exception if unable to do so. 253 * 254 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 255 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 256 * not before this method completes), even if this method throws or the pipeline is cancelled. 
*/
    @ParametricNullness
    V call(DeferredCloser closer) throws Exception;
  }

  /**
   * An operation that computes a {@link ClosingFuture} of a result.
   *
   * @param <V> the type of the result
   * @since 30.1
   */
  public interface AsyncClosingCallable<V extends @Nullable Object> {
    /**
     * Computes a {@link ClosingFuture} of a result, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects to be closed when the pipeline is done
     * @return a {@code ClosingFuture} of the result
     * @throws Exception if the result cannot be computed
     */
    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
  }

  /**
   * A function from an input to a result.
   *
   * @param <T> the type of the input to the function
   * @param <U> the type of the result of the function
   */
  public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {

    /**
     * Applies this function to an input, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects to be closed when the pipeline is done
     * @param input the value to apply the function to
     * @return the result of the function
     * @throws Exception if the function cannot be applied
     */
    @ParametricNullness
    U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
  }

  /**
   * A function from an input to a {@link ClosingFuture} of a result.
   *
   * @param <T> the type of the input to the function
   * @param <U> the type of the result of the function
   */
  public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
    /**
     * Applies this function to an input, or throws an exception if unable to do so.
*
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     */
    ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
  }

  /**
   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
   * allows the user to close all the closeable objects that were captured during it for later
   * closing.
   *
   * <p>The asynchronous operation will have completed before this object is created.
   *
   * @param <V> the type of the value of a successful operation
   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
   */
  public static final class ValueAndCloser<V extends @Nullable Object> {

    /** The pipeline step whose result is read by {@link #get()}. */
    private final ClosingFuture<? extends V> closingFuture;

    /**
     * Wraps the given step.
     *
     * @throws NullPointerException if {@code closingFuture} is null
     */
    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
      this.closingFuture = checkNotNull(closingFuture);
    }

    /**
     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
     * {@link Future#get()} would.
     *
     * <p>Because the asynchronous operation has already completed, this method is synchronous and
     * returns immediately.
     *
     * @return the value of the associated {@code ClosingFuture}
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an exception
     */
    @ParametricNullness
    public V get() throws ExecutionException {
      // getDone reads the result of a future that is expected to be complete already.
      return getDone(closingFuture.future);
    }

    /**
     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
     * operation on the {@link Executor}s specified by calls to {@link
     * DeferredCloser#eventuallyClose(Object, Executor)}.
352 * 353 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 354 * closed synchronously. 355 * 356 * <p>Idempotent: objects will be closed at most once. 357 */ 358 public void closeAsync() { 359 closingFuture.close(); 360 } 361 } 362 363 /** 364 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 365 * ClosingFuture} pipeline. 366 * 367 * @param <V> the type of the final value of a successful pipeline 368 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 369 */ 370 public interface ValueAndCloserConsumer<V extends @Nullable Object> { 371 372 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 373 void accept(ValueAndCloser<V> valueAndCloser); 374 } 375 376 /** 377 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 378 * 379 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 380 * execution 381 */ 382 public static <V extends @Nullable Object> ClosingFuture<V> submit( 383 ClosingCallable<V> callable, Executor executor) { 384 checkNotNull(callable); 385 CloseableList closeables = new CloseableList(); 386 TrustedListenableFutureTask<V> task = 387 TrustedListenableFutureTask.create( 388 new Callable<V>() { 389 @Override 390 @ParametricNullness 391 public V call() throws Exception { 392 return callable.call(closeables.closer); 393 } 394 395 @Override 396 public String toString() { 397 return callable.toString(); 398 } 399 }); 400 executor.execute(task); 401 return new ClosingFuture<>(task, closeables); 402 } 403 404 /** 405 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 
406 * 407 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 408 * execution 409 * @since 30.1 410 */ 411 public static <V extends @Nullable Object> ClosingFuture<V> submitAsync( 412 AsyncClosingCallable<V> callable, Executor executor) { 413 checkNotNull(callable); 414 CloseableList closeables = new CloseableList(); 415 TrustedListenableFutureTask<V> task = 416 TrustedListenableFutureTask.create( 417 new AsyncCallable<V>() { 418 @Override 419 public ListenableFuture<V> call() throws Exception { 420 CloseableList newCloseables = new CloseableList(); 421 try { 422 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 423 closingFuture.becomeSubsumedInto(closeables); 424 return closingFuture.future; 425 } finally { 426 closeables.add(newCloseables, directExecutor()); 427 } 428 } 429 430 @Override 431 public String toString() { 432 return callable.toString(); 433 } 434 }); 435 executor.execute(task); 436 return new ClosingFuture<>(task, closeables); 437 } 438 439 /** 440 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 441 * 442 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 443 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 444 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 445 */ 446 public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) { 447 return new ClosingFuture<>(future); 448 } 449 450 /** 451 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 452 * 453 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when 454 * the pipeline is done, even if the pipeline is canceled or fails. 455 * 456 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 457 * value in order to close it. 
458 * 459 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 460 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object, 461 * Executor)}. 462 * @param closingExecutor the future's result will be closed on this executor 463 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 464 * underlying value may never be closed if the {@link Future} is canceled after its operation 465 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 466 * including those that pass them to this method, with {@link #submit(ClosingCallable, 467 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 468 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 469 * ClosingFuture#from}. 470 */ 471 @Deprecated 472 public static <C extends @Nullable Object & @Nullable AutoCloseable> 473 ClosingFuture<C> eventuallyClosing( 474 ListenableFuture<C> future, final Executor closingExecutor) { 475 checkNotNull(closingExecutor); 476 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 477 Futures.addCallback( 478 future, 479 new FutureCallback<@Nullable AutoCloseable>() { 480 @Override 481 public void onSuccess(@CheckForNull AutoCloseable result) { 482 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 483 } 484 485 @Override 486 public void onFailure(Throwable t) {} 487 }, 488 directExecutor()); 489 return closingFuture; 490 } 491 492 /** 493 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 494 * 495 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 496 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 497 */ 498 public static Combiner whenAllComplete(Iterable<? 
extends ClosingFuture<?>> futures) { 499 return new Combiner(false, futures); 500 } 501 502 /** 503 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 504 * 505 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 506 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 507 */ 508 public static Combiner whenAllComplete( 509 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 510 return whenAllComplete(asList(future1, moreFutures)); 511 } 512 513 /** 514 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 515 * all succeed. If any fail, the resulting pipeline will fail. 516 * 517 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 518 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 519 */ 520 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 521 return new Combiner(true, futures); 522 } 523 524 /** 525 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 526 * they all succeed. If any fail, the resulting pipeline will fail. 527 * 528 * <p>Calling this method allows you to use lambdas or method references typed with the types of 529 * the input {@link ClosingFuture}s. 530 * 531 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 532 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 533 */ 534 public static <V1 extends @Nullable Object, V2 extends @Nullable Object> 535 Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 536 return new Combiner2<>(future1, future2); 537 } 538 539 /** 540 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 541 * they all succeed. 
* If any fail, the resulting pipeline will fail.
   *
   * <p>Calling this method allows you to use lambdas or method references typed with the types of
   * the input {@link ClosingFuture}s.
   *
   * @return a {@link Combiner3} over the three given steps
   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
   */
  public static <
          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
      Combiner3<V1, V2, V3> whenAllSucceed(
          ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
    return new Combiner3<>(future1, future2, future3);
  }

  /**
   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
   * they all succeed. If any fail, the resulting pipeline will fail.
   *
   * <p>Calling this method allows you to use lambdas or method references typed with the types of
   * the input {@link ClosingFuture}s.
   *
   * @return a {@link Combiner4} over the four given steps
   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
   */
  public static <
          V1 extends @Nullable Object,
          V2 extends @Nullable Object,
          V3 extends @Nullable Object,
          V4 extends @Nullable Object>
      Combiner4<V1, V2, V3, V4> whenAllSucceed(
          ClosingFuture<V1> future1,
          ClosingFuture<V2> future2,
          ClosingFuture<V3> future3,
          ClosingFuture<V4> future4) {
    return new Combiner4<>(future1, future2, future3, future4);
  }

  /**
   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
   * they all succeed. If any fail, the resulting pipeline will fail.
   *
   * <p>Calling this method allows you to use lambdas or method references typed with the types of
   * the input {@link ClosingFuture}s.
585 * 586 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 587 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 588 */ 589 public static < 590 V1 extends @Nullable Object, 591 V2 extends @Nullable Object, 592 V3 extends @Nullable Object, 593 V4 extends @Nullable Object, 594 V5 extends @Nullable Object> 595 Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 596 ClosingFuture<V1> future1, 597 ClosingFuture<V2> future2, 598 ClosingFuture<V3> future3, 599 ClosingFuture<V4> future4, 600 ClosingFuture<V5> future5) { 601 return new Combiner5<>(future1, future2, future3, future4, future5); 602 } 603 604 /** 605 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 606 * all succeed. If any fail, the resulting pipeline will fail. 607 * 608 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 609 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 610 */ 611 public static Combiner whenAllSucceed( 612 ClosingFuture<?> future1, 613 ClosingFuture<?> future2, 614 ClosingFuture<?> future3, 615 ClosingFuture<?> future4, 616 ClosingFuture<?> future5, 617 ClosingFuture<?> future6, 618 ClosingFuture<?>... moreFutures) { 619 return whenAllSucceed( 620 FluentIterable.of(future1, future2, future3, future4, future5, future6) 621 .append(moreFutures)); 622 } 623 624 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 625 private final CloseableList closeables; 626 private final FluentFuture<V> future; 627 628 private ClosingFuture(ListenableFuture<V> future) { 629 this(future, new CloseableList()); 630 } 631 632 private ClosingFuture(ListenableFuture<V> future, CloseableList closeables) { 633 this.future = FluentFuture.from(future); 634 this.closeables = closeables; 635 } 636 637 /** 638 * Returns a future that finishes when this step does. 
Calling {@code get()} on the returned 639 * future returns {@code null} if the step is successful or throws the same exception that would 640 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 641 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 642 * 643 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 644 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 645 * a derivation method <i>on the same instance</i>. This is important because calling {@code 646 * statusFuture} alone does not provide a way to close the pipeline. 647 */ 648 public ListenableFuture<?> statusFuture() { 649 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 650 } 651 652 /** 653 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 654 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 655 * when the pipeline is done. 656 * 657 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 658 * ClosingFuture} will be equivalent to this one. 659 * 660 * <p>If the function throws an exception, that exception is used as the result of the derived 661 * {@code ClosingFuture}. 662 * 663 * <p>Example usage: 664 * 665 * <pre>{@code 666 * ClosingFuture<List<Row>> rowsFuture = 667 * queryFuture.transform((closer, result) -> result.getRows(), executor); 668 * }</pre> 669 * 670 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 671 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 672 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
673 * 674 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 675 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 676 * the original {@code ClosingFuture} instance. 677 * 678 * @param function transforms the value of this step to the value of the derived step 679 * @param executor executor to run the function in 680 * @return the derived step 681 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 682 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 683 * finished} 684 */ 685 public <U extends @Nullable Object> ClosingFuture<U> transform( 686 final ClosingFunction<? super V, U> function, Executor executor) { 687 checkNotNull(function); 688 AsyncFunction<V, U> applyFunction = 689 new AsyncFunction<V, U>() { 690 @Override 691 public ListenableFuture<U> apply(V input) throws Exception { 692 return closeables.applyClosingFunction(function, input); 693 } 694 695 @Override 696 public String toString() { 697 return function.toString(); 698 } 699 }; 700 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 701 return derive(future.transformAsync(applyFunction, executor)); 702 } 703 704 /** 705 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 706 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 707 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 708 * captured by the returned {@link ClosingFuture}). 709 * 710 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 711 * returned by the function. 712 * 713 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 714 * ClosingFuture} will be equivalent to this one. 
715 * 716 * <p>If the function throws an exception, that exception is used as the result of the derived 717 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 718 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 719 * closed. 720 * 721 * <p>Usage guidelines for this method: 722 * 723 * <ul> 724 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 725 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 726 * Executor)} instead, with a function that returns the next value directly. 727 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 728 * for every closeable object this step creates in order to capture it for later closing. 729 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 730 * ClosingFuture} call {@link #from(ListenableFuture)}. 731 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 732 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 733 * {@link #withoutCloser(AsyncFunction)} 734 * </ul> 735 * 736 * <p>Example usage: 737 * 738 * <pre>{@code 739 * // Result.getRowsClosingFuture() returns a ClosingFuture. 740 * ClosingFuture<List<Row>> rowsFuture = 741 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 742 * 743 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 744 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 745 * // Closeable). 
746 * ClosingFuture<Integer> rowsFuture2 = 747 * queryFuture.transformAsync( 748 * (closer, result) -> { 749 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 750 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 751 * }, 752 * executor); 753 * 754 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 755 * ClosingFuture<List<Row>> rowsFuture3 = 756 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 757 * 758 * }</pre> 759 * 760 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 761 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 762 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 763 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 764 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 765 * responsible for completing the returned {@code ClosingFuture}.) 766 * 767 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 768 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 769 * the original {@code ClosingFuture} instance. 770 * 771 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 772 * the derived step 773 * @param executor executor to run the function in 774 * @return the derived step 775 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 776 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 777 * finished} 778 */ 779 public <U extends @Nullable Object> ClosingFuture<U> transformAsync( 780 final AsyncClosingFunction<? 
super V, U> function, Executor executor) { 781 checkNotNull(function); 782 AsyncFunction<V, U> applyFunction = 783 new AsyncFunction<V, U>() { 784 @Override 785 public ListenableFuture<U> apply(V input) throws Exception { 786 return closeables.applyAsyncClosingFunction(function, input); 787 } 788 789 @Override 790 public String toString() { 791 return function.toString(); 792 } 793 }; 794 return derive(future.transformAsync(applyFunction, executor)); 795 } 796 797 /** 798 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 799 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 800 * {@link ListenableFuture}. 801 * 802 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 803 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 804 * meets these conditions: 805 * 806 * <ul> 807 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 808 * DeferredCloser#eventuallyClose(Object, Executor)}. 809 * <li>It returns a {@link ListenableFuture}. 810 * </ul> 811 * 812 * <p>Example usage: 813 * 814 * <pre>{@code 815 * // Result.getRowsFuture() returns a ListenableFuture. 
816 * ClosingFuture<List<Row>> rowsFuture = 817 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 818 * }</pre> 819 * 820 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 821 * ListenableFuture} with the value of a derived step 822 */ 823 public static <V extends @Nullable Object, U extends @Nullable Object> 824 AsyncClosingFunction<V, U> withoutCloser(final AsyncFunction<V, U> function) { 825 checkNotNull(function); 826 return new AsyncClosingFunction<V, U>() { 827 @Override 828 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 829 return ClosingFuture.from(function.apply(input)); 830 } 831 }; 832 } 833 834 /** 835 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 836 * to its exception if it is an instance of a given exception type. The function can use a {@link 837 * DeferredCloser} to capture objects to be closed when the pipeline is done. 838 * 839 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 840 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 841 * one. 842 * 843 * <p>If the function throws an exception, that exception is used as the result of the derived 844 * {@code ClosingFuture}. 845 * 846 * <p>Example usage: 847 * 848 * <pre>{@code 849 * ClosingFuture<QueryResult> queryFuture = 850 * queryFuture.catching( 851 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 852 * }</pre> 853 * 854 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 855 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 856 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
857 * 858 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 859 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 860 * the original {@code ClosingFuture} instance. 861 * 862 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 863 * type is matched against this step's exception. "This step's exception" means the cause of 864 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 865 * underlying this step or, if {@code get()} throws a different kind of exception, that 866 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 867 * prefer more specific types, avoiding {@code Throwable.class} in particular. 868 * @param fallback the function to be called if this step fails with the expected exception type. 869 * The function's argument is this step's exception. "This step's exception" means the cause 870 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 871 * underlying this step or, if {@code get()} throws a different kind of exception, that 872 * exception itself. 873 * @param executor the executor that runs {@code fallback} if the input fails 874 */ 875 public <X extends Throwable> ClosingFuture<V> catching( 876 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 877 return catchingMoreGeneric(exceptionType, fallback, executor); 878 } 879 880 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 881 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 882 Class<X> exceptionType, final ClosingFunction<? 
super X, W> fallback, Executor executor) { 883 checkNotNull(fallback); 884 AsyncFunction<X, W> applyFallback = 885 new AsyncFunction<X, W>() { 886 @Override 887 public ListenableFuture<W> apply(X exception) throws Exception { 888 return closeables.applyClosingFunction(fallback, exception); 889 } 890 891 @Override 892 public String toString() { 893 return fallback.toString(); 894 } 895 }; 896 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 897 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 898 } 899 900 /** 901 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 902 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 903 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 904 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 905 * 906 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 907 * ClosingFuture} will be equivalent to the one returned by the function. 908 * 909 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 910 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 911 * one. 912 * 913 * <p>If the function throws an exception, that exception is used as the result of the derived 914 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 915 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 916 * closed. 917 * 918 * <p>Usage guidelines for this method: 919 * 920 * <ul> 921 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 922 * {@code ClosingFuture}. 
If possible, prefer calling {@link #catching(Class, 923 * ClosingFunction, Executor)} instead, with a function that returns the next value 924 * directly. 925 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 926 * for every closeable object this step creates in order to capture it for later closing. 927 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 928 * ClosingFuture} call {@link #from(ListenableFuture)}. 929 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 930 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 931 * {@link #withoutCloser(AsyncFunction)} 932 * </ul> 933 * 934 * <p>Example usage: 935 * 936 * <pre>{@code 937 * // Fall back to a secondary input stream in case of IOException. 938 * ClosingFuture<InputStream> inputFuture = 939 * firstInputFuture.catchingAsync( 940 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 941 * } 942 * }</pre> 943 * 944 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 945 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 946 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 947 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 948 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 949 * responsible for completing the returned {@code ClosingFuture}.) 950 * 951 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 952 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 953 * the original {@code ClosingFuture} instance. 954 * 955 * @param exceptionType the exception type that triggers use of {@code fallback}. 
The exception 956 * type is matched against this step's exception. "This step's exception" means the cause of 957 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 958 * underlying this step or, if {@code get()} throws a different kind of exception, that 959 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 960 * prefer more specific types, avoiding {@code Throwable.class} in particular. 961 * @param fallback the function to be called if this step fails with the expected exception type. 962 * The function's argument is this step's exception. "This step's exception" means the cause 963 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 964 * underlying this step or, if {@code get()} throws a different kind of exception, that 965 * exception itself. 966 * @param executor the executor that runs {@code fallback} if the input fails 967 */ 968 // TODO(dpb): Should this do something special if the function throws CancellationException or 969 // ExecutionException? 970 public <X extends Throwable> ClosingFuture<V> catchingAsync( 971 Class<X> exceptionType, 972 AsyncClosingFunction<? super X, ? extends V> fallback, 973 Executor executor) { 974 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 975 } 976 977 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 978 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 979 Class<X> exceptionType, 980 final AsyncClosingFunction<? 
super X, W> fallback, 981 Executor executor) { 982 checkNotNull(fallback); 983 AsyncFunction<X, W> asyncFunction = 984 new AsyncFunction<X, W>() { 985 @Override 986 public ListenableFuture<W> apply(X exception) throws Exception { 987 return closeables.applyAsyncClosingFunction(fallback, exception); 988 } 989 990 @Override 991 public String toString() { 992 return fallback.toString(); 993 } 994 }; 995 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 996 } 997 998 /** 999 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 1000 * 1001 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 1002 * the pipeline is cancelled. 1003 * 1004 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 1005 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 1006 * rather than once it has finished. 1007 * 1008 * <p>After calling this method, you may not call {@link 1009 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 1010 * derivation method on the original {@code ClosingFuture} instance. 
1011 * 1012 * @return a {@link Future} that represents the final value or exception of the pipeline 1013 */ 1014 public FluentFuture<V> finishToFuture() { 1015 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1016 logger.get().log(FINER, "will close {0}", this); 1017 future.addListener( 1018 new Runnable() { 1019 @Override 1020 public void run() { 1021 checkAndUpdateState(WILL_CLOSE, CLOSING); 1022 close(); 1023 checkAndUpdateState(CLOSING, CLOSED); 1024 } 1025 }, 1026 directExecutor()); 1027 } else { 1028 switch (state.get()) { 1029 case SUBSUMED: 1030 throw new IllegalStateException( 1031 "Cannot call finishToFuture() after deriving another step"); 1032 1033 case WILL_CREATE_VALUE_AND_CLOSER: 1034 throw new IllegalStateException( 1035 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1036 1037 case WILL_CLOSE: 1038 case CLOSING: 1039 case CLOSED: 1040 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1041 1042 case OPEN: 1043 throw new AssertionError(); 1044 } 1045 } 1046 return future; 1047 } 1048 1049 /** 1050 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1051 * {@code receiver} will be called with an object that contains the result of the operation. The 1052 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1053 * 1054 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1055 * any other derivation method on the original {@code ClosingFuture} instance. 1056 * 1057 * @param consumer a callback whose method will be called (using {@code executor}) when this 1058 * operation is done 1059 */ 1060 public void finishToValueAndCloser( 1061 final ValueAndCloserConsumer<? 
super V> consumer, Executor executor) { 1062 checkNotNull(consumer); 1063 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1064 switch (state.get()) { 1065 case SUBSUMED: 1066 throw new IllegalStateException( 1067 "Cannot call finishToValueAndCloser() after deriving another step"); 1068 1069 case WILL_CLOSE: 1070 case CLOSING: 1071 case CLOSED: 1072 throw new IllegalStateException( 1073 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1074 1075 case WILL_CREATE_VALUE_AND_CLOSER: 1076 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1077 1078 case OPEN: 1079 break; 1080 } 1081 throw new AssertionError(state); 1082 } 1083 future.addListener( 1084 new Runnable() { 1085 @Override 1086 public void run() { 1087 provideValueAndCloser(consumer, ClosingFuture.this); 1088 } 1089 }, 1090 executor); 1091 } 1092 1093 private static <C extends @Nullable Object, V extends C> void provideValueAndCloser( 1094 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1095 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1096 } 1097 1098 /** 1099 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1100 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1101 * successful, and this step has not started when {@code cancel} is called, this step should never 1102 * run. 1103 * 1104 * <p>If successful, causes the objects captured by this step (if already started) and its input 1105 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1106 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 
1107 * 1108 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1109 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1110 * cancelled regardless 1111 * @return {@code false} if the step could not be cancelled, typically because it has already 1112 * completed normally; {@code true} otherwise 1113 */ 1114 @CanIgnoreReturnValue 1115 @SuppressWarnings("Interruption") // We are propagating an interrupt from a caller. 1116 public boolean cancel(boolean mayInterruptIfRunning) { 1117 logger.get().log(FINER, "cancelling {0}", this); 1118 boolean cancelled = future.cancel(mayInterruptIfRunning); 1119 if (cancelled) { 1120 close(); 1121 } 1122 return cancelled; 1123 } 1124 1125 private void close() { 1126 logger.get().log(FINER, "closing {0}", this); 1127 closeables.close(); 1128 } 1129 1130 private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) { 1131 ClosingFuture<U> derived = new ClosingFuture<>(future); 1132 becomeSubsumedInto(derived.closeables); 1133 return derived; 1134 } 1135 1136 private void becomeSubsumedInto(CloseableList otherCloseables) { 1137 checkAndUpdateState(OPEN, SUBSUMED); 1138 otherCloseables.add(closeables, directExecutor()); 1139 } 1140 1141 /** 1142 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1143 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1144 * 1145 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1146 */ 1147 public static final class Peeker { 1148 private final ImmutableList<ClosingFuture<?>> futures; 1149 private volatile boolean beingCalled; 1150 1151 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1152 this.futures = checkNotNull(futures); 1153 } 1154 1155 /** 1156 * Returns the value of {@code closingFuture}. 
1157 * 1158 * @throws ExecutionException if {@code closingFuture} is a failed step 1159 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1160 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1161 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1162 * @throws IllegalStateException if called outside of a call to {@link 1163 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1164 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1165 */ 1166 @ParametricNullness 1167 public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1168 throws ExecutionException { 1169 checkState(beingCalled); 1170 checkArgument(futures.contains(closingFuture)); 1171 return Futures.getDone(closingFuture.future); 1172 } 1173 1174 @ParametricNullness 1175 private <V extends @Nullable Object> V call( 1176 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1177 beingCalled = true; 1178 CloseableList newCloseables = new CloseableList(); 1179 try { 1180 return combiner.call(newCloseables.closer, this); 1181 } finally { 1182 closeables.add(newCloseables, directExecutor()); 1183 beingCalled = false; 1184 } 1185 } 1186 1187 private <V extends @Nullable Object> FluentFuture<V> callAsync( 1188 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1189 beingCalled = true; 1190 CloseableList newCloseables = new CloseableList(); 1191 try { 1192 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1193 closingFuture.becomeSubsumedInto(closeables); 1194 return closingFuture.future; 1195 } finally { 1196 closeables.add(newCloseables, directExecutor()); 1197 beingCalled = false; 1198 } 1199 } 1200 } 1201 1202 /** 1203 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 
1204 * 1205 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1206 * instantiate this class. 1207 * 1208 * <p>Example: 1209 * 1210 * <pre>{@code 1211 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1212 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1213 * ListenableFuture<Integer> numberOfDifferentLines = 1214 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1215 * .call( 1216 * (closer, peeker) -> { 1217 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1218 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1219 * return countDifferentLines(file1Reader, file2Reader); 1220 * }, 1221 * executor) 1222 * .closing(executor); 1223 * }</pre> 1224 */ 1225 @DoNotMock("Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1226 public static class Combiner { 1227 1228 private final CloseableList closeables = new CloseableList(); 1229 1230 /** 1231 * An operation that returns a result and may throw an exception. 1232 * 1233 * @param <V> the type of the result 1234 */ 1235 public interface CombiningCallable<V extends @Nullable Object> { 1236 /** 1237 * Computes a result, or throws an exception if unable to do so. 1238 * 1239 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1240 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1241 * (but not before this method completes), even if this method throws or the pipeline is 1242 * cancelled. 1243 * 1244 * @param peeker used to get the value of any of the input futures 1245 */ 1246 @ParametricNullness 1247 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1248 } 1249 1250 /** 1251 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 
1252 * 1253 * @param <V> the type of the result 1254 */ 1255 public interface AsyncCombiningCallable<V extends @Nullable Object> { 1256 /** 1257 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1258 * 1259 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1260 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1261 * (but not before this method completes), even if this method throws or the pipeline is 1262 * cancelled. 1263 * 1264 * @param peeker used to get the value of any of the input futures 1265 */ 1266 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1267 } 1268 1269 private final boolean allMustSucceed; 1270 protected final ImmutableList<ClosingFuture<?>> inputs; 1271 1272 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1273 this.allMustSucceed = allMustSucceed; 1274 this.inputs = ImmutableList.copyOf(inputs); 1275 for (ClosingFuture<?> input : inputs) { 1276 input.becomeSubsumedInto(closeables); 1277 } 1278 } 1279 1280 /** 1281 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1282 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1283 * objects to be closed when the pipeline is done. 1284 * 1285 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1286 * fail, so will the returned step. 1287 * 1288 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1289 * cancelled. 1290 * 1291 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1292 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 
1293 */ 1294 public <V extends @Nullable Object> ClosingFuture<V> call( 1295 final CombiningCallable<V> combiningCallable, Executor executor) { 1296 Callable<V> callable = 1297 new Callable<V>() { 1298 @Override 1299 @ParametricNullness 1300 public V call() throws Exception { 1301 return new Peeker(inputs).call(combiningCallable, closeables); 1302 } 1303 1304 @Override 1305 public String toString() { 1306 return combiningCallable.toString(); 1307 } 1308 }; 1309 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1310 derived.closeables.add(closeables, directExecutor()); 1311 return derived; 1312 } 1313 1314 /** 1315 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1316 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1317 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1318 * captured by the returned {@link ClosingFuture}). 1319 * 1320 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1321 * fail, so will the returned step. 1322 * 1323 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1324 * cancelled. 1325 * 1326 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1327 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1328 * 1329 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1330 * derived step. 1331 * 1332 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1333 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1334 * 1335 * <p>Usage guidelines for this method: 1336 * 1337 * <ul> 1338 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1339 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1340 * Executor)} instead, with a function that returns the next value directly. 1341 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1342 * for every closeable object this step creates in order to capture it for later closing. 1343 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1344 * ClosingFuture} call {@link #from(ListenableFuture)}. 1345 * </ul> 1346 * 1347 * <p>The same warnings about doing heavyweight operations within {@link 1348 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1349 */ 1350 public <V extends @Nullable Object> ClosingFuture<V> callAsync( 1351 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1352 AsyncCallable<V> asyncCallable = 1353 new AsyncCallable<V>() { 1354 @Override 1355 public ListenableFuture<V> call() throws Exception { 1356 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1357 } 1358 1359 @Override 1360 public String toString() { 1361 return combiningCallable.toString(); 1362 } 1363 }; 1364 ClosingFuture<V> derived = 1365 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1366 derived.closeables.add(closeables, directExecutor()); 1367 return derived; 1368 } 1369 1370 private FutureCombiner<@Nullable Object> futureCombiner() { 1371 return allMustSucceed 1372 ? Futures.whenAllSucceed(inputFutures()) 1373 : Futures.whenAllComplete(inputFutures()); 1374 } 1375 1376 1377 private ImmutableList<FluentFuture<?>> inputFutures() { 1378 return FluentIterable.from(inputs) 1379 .<FluentFuture<?>>transform(future -> future.future) 1380 .toList(); 1381 } 1382 } 1383 1384 /** 1385 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1386 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1387 * combination. 
1388 * 1389 * @param <V1> the type returned by the first future 1390 * @param <V2> the type returned by the second future 1391 */ 1392 public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1393 extends Combiner { 1394 1395 /** 1396 * A function that returns a value when applied to the values of the two futures passed to 1397 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1398 * 1399 * @param <V1> the type returned by the first future 1400 * @param <V2> the type returned by the second future 1401 * @param <U> the type returned by the function 1402 */ 1403 public interface ClosingFunction2< 1404 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1405 1406 /** 1407 * Applies this function to two inputs, or throws an exception if unable to do so. 1408 * 1409 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1410 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1411 * (but not before this method completes), even if this method throws or the pipeline is 1412 * cancelled. 1413 */ 1414 @ParametricNullness 1415 U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1416 throws Exception; 1417 } 1418 1419 /** 1420 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1421 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1422 * 1423 * @param <V1> the type returned by the first future 1424 * @param <V2> the type returned by the second future 1425 * @param <U> the type returned by the function 1426 */ 1427 public interface AsyncClosingFunction2< 1428 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1429 1430 /** 1431 * Applies this function to two inputs, or throws an exception if unable to do so. 
1432 * 1433 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1434 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1435 * (but not before this method completes), even if this method throws or the pipeline is 1436 * cancelled. 1437 */ 1438 ClosingFuture<U> apply( 1439 DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1440 throws Exception; 1441 } 1442 1443 private final ClosingFuture<V1> future1; 1444 private final ClosingFuture<V2> future2; 1445 1446 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1447 super(true, ImmutableList.of(future1, future2)); 1448 this.future1 = future1; 1449 this.future2 = future2; 1450 } 1451 1452 /** 1453 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1454 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1455 * objects to be closed when the pipeline is done. 1456 * 1457 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1458 * any of the inputs fail, so will the returned step. 1459 * 1460 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1461 * 1462 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1463 * ExecutionException} will be extracted and used as the failure of the derived step. 
1464 */ 1465 public <U extends @Nullable Object> ClosingFuture<U> call( 1466 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1467 return call( 1468 new CombiningCallable<U>() { 1469 @Override 1470 @ParametricNullness 1471 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1472 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1473 } 1474 1475 @Override 1476 public String toString() { 1477 return function.toString(); 1478 } 1479 }, 1480 executor); 1481 } 1482 1483 /** 1484 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1485 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1486 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1487 * captured by the returned {@link ClosingFuture}). 1488 * 1489 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1490 * any of the inputs fail, so will the returned step. 1491 * 1492 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1493 * 1494 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1495 * ExecutionException} will be extracted and used as the failure of the derived step. 1496 * 1497 * <p>If the function throws any other exception, it will be used as the failure of the derived 1498 * step. 1499 * 1500 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1501 * the closeable objects in that {@code ClosingFuture} will be closed. 1502 * 1503 * <p>Usage guidelines for this method: 1504 * 1505 * <ul> 1506 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1507 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1508 * Executor)} instead, with a function that returns the next value directly. 
1509 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1510 * for every closeable object this step creates in order to capture it for later closing. 1511 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1512 * ClosingFuture} call {@link #from(ListenableFuture)}. 1513 * </ul> 1514 * 1515 * <p>The same warnings about doing heavyweight operations within {@link 1516 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1517 */ 1518 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1519 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1520 return callAsync( 1521 new AsyncCombiningCallable<U>() { 1522 @Override 1523 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1524 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1525 } 1526 1527 @Override 1528 public String toString() { 1529 return function.toString(); 1530 } 1531 }, 1532 executor); 1533 } 1534 } 1535 1536 /** 1537 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1538 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1539 * ClosingFuture)} to start this combination. 1540 * 1541 * @param <V1> the type returned by the first future 1542 * @param <V2> the type returned by the second future 1543 * @param <V3> the type returned by the third future 1544 */ 1545 public static final class Combiner3< 1546 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1547 extends Combiner { 1548 /** 1549 * A function that returns a value when applied to the values of the three futures passed to 1550 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 
1551 * 1552 * @param <V1> the type returned by the first future 1553 * @param <V2> the type returned by the second future 1554 * @param <V3> the type returned by the third future 1555 * @param <U> the type returned by the function 1556 */ 1557 public interface ClosingFunction3< 1558 V1 extends @Nullable Object, 1559 V2 extends @Nullable Object, 1560 V3 extends @Nullable Object, 1561 U extends @Nullable Object> { 1562 /** 1563 * Applies this function to three inputs, or throws an exception if unable to do so. 1564 * 1565 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1566 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1567 * (but not before this method completes), even if this method throws or the pipeline is 1568 * cancelled. 1569 */ 1570 @ParametricNullness 1571 U apply( 1572 DeferredCloser closer, 1573 @ParametricNullness V1 value1, 1574 @ParametricNullness V2 value2, 1575 @ParametricNullness V3 value3) 1576 throws Exception; 1577 } 1578 1579 /** 1580 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1581 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1582 * 1583 * @param <V1> the type returned by the first future 1584 * @param <V2> the type returned by the second future 1585 * @param <V3> the type returned by the third future 1586 * @param <U> the type returned by the function 1587 */ 1588 public interface AsyncClosingFunction3< 1589 V1 extends @Nullable Object, 1590 V2 extends @Nullable Object, 1591 V3 extends @Nullable Object, 1592 U extends @Nullable Object> { 1593 /** 1594 * Applies this function to three inputs, or throws an exception if unable to do so. 
1595 * 1596 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1597 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1598 * (but not before this method completes), even if this method throws or the pipeline is 1599 * cancelled. 1600 */ 1601 ClosingFuture<U> apply( 1602 DeferredCloser closer, 1603 @ParametricNullness V1 value1, 1604 @ParametricNullness V2 value2, 1605 @ParametricNullness V3 value3) 1606 throws Exception; 1607 } 1608 1609 private final ClosingFuture<V1> future1; 1610 private final ClosingFuture<V2> future2; 1611 private final ClosingFuture<V3> future3; 1612 1613 private Combiner3( 1614 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1615 super(true, ImmutableList.of(future1, future2, future3)); 1616 this.future1 = future1; 1617 this.future2 = future2; 1618 this.future3 = future3; 1619 } 1620 1621 /** 1622 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1623 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1624 * objects to be closed when the pipeline is done. 1625 * 1626 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1627 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1628 * 1629 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1630 * 1631 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1632 * ExecutionException} will be extracted and used as the failure of the derived step. 
1633 */ 1634 public <U extends @Nullable Object> ClosingFuture<U> call( 1635 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1636 return call( 1637 new CombiningCallable<U>() { 1638 @Override 1639 @ParametricNullness 1640 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1641 return function.apply( 1642 closer, 1643 peeker.getDone(future1), 1644 peeker.getDone(future2), 1645 peeker.getDone(future3)); 1646 } 1647 1648 @Override 1649 public String toString() { 1650 return function.toString(); 1651 } 1652 }, 1653 executor); 1654 } 1655 1656 /** 1657 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1658 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1659 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1660 * captured by the returned {@link ClosingFuture}). 1661 * 1662 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1663 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1664 * 1665 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1666 * 1667 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1668 * ExecutionException} will be extracted and used as the failure of the derived step. 1669 * 1670 * <p>If the function throws any other exception, it will be used as the failure of the derived 1671 * step. 1672 * 1673 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1674 * the closeable objects in that {@code ClosingFuture} will be closed. 1675 * 1676 * <p>Usage guidelines for this method: 1677 * 1678 * <ul> 1679 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1680 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1681 * Executor)} instead, with a function that returns the next value directly. 1682 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1683 * for every closeable object this step creates in order to capture it for later closing. 1684 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1685 * ClosingFuture} call {@link #from(ListenableFuture)}. 1686 * </ul> 1687 * 1688 * <p>The same warnings about doing heavyweight operations within {@link 1689 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1690 */ 1691 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1692 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1693 return callAsync( 1694 new AsyncCombiningCallable<U>() { 1695 @Override 1696 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1697 return function.apply( 1698 closer, 1699 peeker.getDone(future1), 1700 peeker.getDone(future2), 1701 peeker.getDone(future3)); 1702 } 1703 1704 @Override 1705 public String toString() { 1706 return function.toString(); 1707 } 1708 }, 1709 executor); 1710 } 1711 } 1712 1713 /** 1714 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1715 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1716 * ClosingFuture)} to start this combination. 
1717 * 1718 * @param <V1> the type returned by the first future 1719 * @param <V2> the type returned by the second future 1720 * @param <V3> the type returned by the third future 1721 * @param <V4> the type returned by the fourth future 1722 */ 1723 public static final class Combiner4< 1724 V1 extends @Nullable Object, 1725 V2 extends @Nullable Object, 1726 V3 extends @Nullable Object, 1727 V4 extends @Nullable Object> 1728 extends Combiner { 1729 /** 1730 * A function that returns a value when applied to the values of the four futures passed to 1731 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1732 * 1733 * @param <V1> the type returned by the first future 1734 * @param <V2> the type returned by the second future 1735 * @param <V3> the type returned by the third future 1736 * @param <V4> the type returned by the fourth future 1737 * @param <U> the type returned by the function 1738 */ 1739 public interface ClosingFunction4< 1740 V1 extends @Nullable Object, 1741 V2 extends @Nullable Object, 1742 V3 extends @Nullable Object, 1743 V4 extends @Nullable Object, 1744 U extends @Nullable Object> { 1745 /** 1746 * Applies this function to four inputs, or throws an exception if unable to do so. 1747 * 1748 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1749 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1750 * (but not before this method completes), even if this method throws or the pipeline is 1751 * cancelled. 
1752 */ 1753 @ParametricNullness 1754 U apply( 1755 DeferredCloser closer, 1756 @ParametricNullness V1 value1, 1757 @ParametricNullness V2 value2, 1758 @ParametricNullness V3 value3, 1759 @ParametricNullness V4 value4) 1760 throws Exception; 1761 } 1762 1763 /** 1764 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1765 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1766 * ClosingFuture)}. 1767 * 1768 * @param <V1> the type returned by the first future 1769 * @param <V2> the type returned by the second future 1770 * @param <V3> the type returned by the third future 1771 * @param <V4> the type returned by the fourth future 1772 * @param <U> the type returned by the function 1773 */ 1774 public interface AsyncClosingFunction4< 1775 V1 extends @Nullable Object, 1776 V2 extends @Nullable Object, 1777 V3 extends @Nullable Object, 1778 V4 extends @Nullable Object, 1779 U extends @Nullable Object> { 1780 /** 1781 * Applies this function to four inputs, or throws an exception if unable to do so. 1782 * 1783 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1784 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1785 * (but not before this method completes), even if this method throws or the pipeline is 1786 * cancelled. 
1787 */ 1788 ClosingFuture<U> apply( 1789 DeferredCloser closer, 1790 @ParametricNullness V1 value1, 1791 @ParametricNullness V2 value2, 1792 @ParametricNullness V3 value3, 1793 @ParametricNullness V4 value4) 1794 throws Exception; 1795 } 1796 1797 private final ClosingFuture<V1> future1; 1798 private final ClosingFuture<V2> future2; 1799 private final ClosingFuture<V3> future3; 1800 private final ClosingFuture<V4> future4; 1801 1802 private Combiner4( 1803 ClosingFuture<V1> future1, 1804 ClosingFuture<V2> future2, 1805 ClosingFuture<V3> future3, 1806 ClosingFuture<V4> future4) { 1807 super(true, ImmutableList.of(future1, future2, future3, future4)); 1808 this.future1 = future1; 1809 this.future2 = future2; 1810 this.future3 = future3; 1811 this.future4 = future4; 1812 } 1813 1814 /** 1815 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1816 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1817 * objects to be closed when the pipeline is done. 1818 * 1819 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1820 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1821 * 1822 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1823 * 1824 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1825 * ExecutionException} will be extracted and used as the failure of the derived step. 
1826 */ 1827 public <U extends @Nullable Object> ClosingFuture<U> call( 1828 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1829 return call( 1830 new CombiningCallable<U>() { 1831 @Override 1832 @ParametricNullness 1833 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1834 return function.apply( 1835 closer, 1836 peeker.getDone(future1), 1837 peeker.getDone(future2), 1838 peeker.getDone(future3), 1839 peeker.getDone(future4)); 1840 } 1841 1842 @Override 1843 public String toString() { 1844 return function.toString(); 1845 } 1846 }, 1847 executor); 1848 } 1849 1850 /** 1851 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1852 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1853 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1854 * captured by the returned {@link ClosingFuture}). 1855 * 1856 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1857 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1858 * 1859 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1860 * 1861 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1862 * ExecutionException} will be extracted and used as the failure of the derived step. 1863 * 1864 * <p>If the function throws any other exception, it will be used as the failure of the derived 1865 * step. 1866 * 1867 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1868 * the closeable objects in that {@code ClosingFuture} will be closed. 1869 * 1870 * <p>Usage guidelines for this method: 1871 * 1872 * <ul> 1873 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1874 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1875 * Executor)} instead, with a function that returns the next value directly. 1876 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1877 * for every closeable object this step creates in order to capture it for later closing. 1878 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1879 * ClosingFuture} call {@link #from(ListenableFuture)}. 1880 * </ul> 1881 * 1882 * <p>The same warnings about doing heavyweight operations within {@link 1883 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1884 */ 1885 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1886 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1887 return callAsync( 1888 new AsyncCombiningCallable<U>() { 1889 @Override 1890 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1891 return function.apply( 1892 closer, 1893 peeker.getDone(future1), 1894 peeker.getDone(future2), 1895 peeker.getDone(future3), 1896 peeker.getDone(future4)); 1897 } 1898 1899 @Override 1900 public String toString() { 1901 return function.toString(); 1902 } 1903 }, 1904 executor); 1905 } 1906 } 1907 1908 /** 1909 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1910 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1911 * ClosingFuture, ClosingFuture)} to start this combination. 
1912 * 1913 * @param <V1> the type returned by the first future 1914 * @param <V2> the type returned by the second future 1915 * @param <V3> the type returned by the third future 1916 * @param <V4> the type returned by the fourth future 1917 * @param <V5> the type returned by the fifth future 1918 */ 1919 public static final class Combiner5< 1920 V1 extends @Nullable Object, 1921 V2 extends @Nullable Object, 1922 V3 extends @Nullable Object, 1923 V4 extends @Nullable Object, 1924 V5 extends @Nullable Object> 1925 extends Combiner { 1926 /** 1927 * A function that returns a value when applied to the values of the five futures passed to 1928 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1929 * ClosingFuture)}. 1930 * 1931 * @param <V1> the type returned by the first future 1932 * @param <V2> the type returned by the second future 1933 * @param <V3> the type returned by the third future 1934 * @param <V4> the type returned by the fourth future 1935 * @param <V5> the type returned by the fifth future 1936 * @param <U> the type returned by the function 1937 */ 1938 public interface ClosingFunction5< 1939 V1 extends @Nullable Object, 1940 V2 extends @Nullable Object, 1941 V3 extends @Nullable Object, 1942 V4 extends @Nullable Object, 1943 V5 extends @Nullable Object, 1944 U extends @Nullable Object> { 1945 /** 1946 * Applies this function to five inputs, or throws an exception if unable to do so. 1947 * 1948 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1949 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1950 * (but not before this method completes), even if this method throws or the pipeline is 1951 * cancelled. 
1952 */ 1953 @ParametricNullness 1954 U apply( 1955 DeferredCloser closer, 1956 @ParametricNullness V1 value1, 1957 @ParametricNullness V2 value2, 1958 @ParametricNullness V3 value3, 1959 @ParametricNullness V4 value4, 1960 @ParametricNullness V5 value5) 1961 throws Exception; 1962 } 1963 1964 /** 1965 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1966 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1967 * ClosingFuture, ClosingFuture)}. 1968 * 1969 * @param <V1> the type returned by the first future 1970 * @param <V2> the type returned by the second future 1971 * @param <V3> the type returned by the third future 1972 * @param <V4> the type returned by the fourth future 1973 * @param <V5> the type returned by the fifth future 1974 * @param <U> the type returned by the function 1975 */ 1976 public interface AsyncClosingFunction5< 1977 V1 extends @Nullable Object, 1978 V2 extends @Nullable Object, 1979 V3 extends @Nullable Object, 1980 V4 extends @Nullable Object, 1981 V5 extends @Nullable Object, 1982 U extends @Nullable Object> { 1983 /** 1984 * Applies this function to five inputs, or throws an exception if unable to do so. 1985 * 1986 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1987 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1988 * (but not before this method completes), even if this method throws or the pipeline is 1989 * cancelled. 
1990 */ 1991 ClosingFuture<U> apply( 1992 DeferredCloser closer, 1993 @ParametricNullness V1 value1, 1994 @ParametricNullness V2 value2, 1995 @ParametricNullness V3 value3, 1996 @ParametricNullness V4 value4, 1997 @ParametricNullness V5 value5) 1998 throws Exception; 1999 } 2000 2001 private final ClosingFuture<V1> future1; 2002 private final ClosingFuture<V2> future2; 2003 private final ClosingFuture<V3> future3; 2004 private final ClosingFuture<V4> future4; 2005 private final ClosingFuture<V5> future5; 2006 2007 private Combiner5( 2008 ClosingFuture<V1> future1, 2009 ClosingFuture<V2> future2, 2010 ClosingFuture<V3> future3, 2011 ClosingFuture<V4> future4, 2012 ClosingFuture<V5> future5) { 2013 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 2014 this.future1 = future1; 2015 this.future2 = future2; 2016 this.future3 = future3; 2017 this.future4 = future4; 2018 this.future5 = future5; 2019 } 2020 2021 /** 2022 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2023 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2024 * objects to be closed when the pipeline is done. 2025 * 2026 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2027 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2028 * returned step. 2029 * 2030 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2031 * 2032 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2033 * ExecutionException} will be extracted and used as the failure of the derived step. 
2034 */ 2035 public <U extends @Nullable Object> ClosingFuture<U> call( 2036 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2037 return call( 2038 new CombiningCallable<U>() { 2039 @Override 2040 @ParametricNullness 2041 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2042 return function.apply( 2043 closer, 2044 peeker.getDone(future1), 2045 peeker.getDone(future2), 2046 peeker.getDone(future3), 2047 peeker.getDone(future4), 2048 peeker.getDone(future5)); 2049 } 2050 2051 @Override 2052 public String toString() { 2053 return function.toString(); 2054 } 2055 }, 2056 executor); 2057 } 2058 2059 /** 2060 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2061 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2062 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2063 * captured by the returned {@link ClosingFuture}). 2064 * 2065 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2066 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2067 * returned step. 2068 * 2069 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2070 * 2071 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2072 * ExecutionException} will be extracted and used as the failure of the derived step. 2073 * 2074 * <p>If the function throws any other exception, it will be used as the failure of the derived 2075 * step. 2076 * 2077 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2078 * the closeable objects in that {@code ClosingFuture} will be closed. 
2079 * 2080 * <p>Usage guidelines for this method: 2081 * 2082 * <ul> 2083 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2084 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2085 * Executor)} instead, with a function that returns the next value directly. 2086 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 2087 * for every closeable object this step creates in order to capture it for later closing. 2088 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2089 * ClosingFuture} call {@link #from(ListenableFuture)}. 2090 * </ul> 2091 * 2092 * <p>The same warnings about doing heavyweight operations within {@link 2093 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2094 */ 2095 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 2096 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2097 return callAsync( 2098 new AsyncCombiningCallable<U>() { 2099 @Override 2100 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2101 return function.apply( 2102 closer, 2103 peeker.getDone(future1), 2104 peeker.getDone(future2), 2105 peeker.getDone(future3), 2106 peeker.getDone(future4), 2107 peeker.getDone(future5)); 2108 } 2109 2110 @Override 2111 public String toString() { 2112 return function.toString(); 2113 } 2114 }, 2115 executor); 2116 } 2117 } 2118 2119 @Override 2120 public String toString() { 2121 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2122 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2123 } 2124 2125 @SuppressWarnings({"removal", "Finalize"}) // b/260137033 2126 @Override 2127 protected void finalize() { 2128 if (state.get().equals(OPEN)) { 2129 logger.get().log(SEVERE, "Uh oh! 
An open ClosingFuture has leaked and will close: {0}", this); 2130 FluentFuture<V> unused = finishToFuture(); 2131 } 2132 } 2133 2134 private static void closeQuietly(@CheckForNull final AutoCloseable closeable, Executor executor) { 2135 if (closeable == null) { 2136 return; 2137 } 2138 try { 2139 executor.execute( 2140 () -> { 2141 try { 2142 closeable.close(); 2143 } catch (Exception e) { 2144 /* 2145 * In guava-jre, any kind of Exception may be thrown because `closeable` has type 2146 * `AutoCloseable`. 2147 * 2148 * In guava-android, the only kinds of Exception that may be thrown are 2149 * RuntimeException and IOException because `closeable` has type `Closeable`—except 2150 * that we have to account for sneaky checked exception. 2151 */ 2152 restoreInterruptIfIsInterruptedException(e); 2153 logger.get().log(WARNING, "thrown by close()", e); 2154 } 2155 }); 2156 } catch (RejectedExecutionException e) { 2157 if (logger.get().isLoggable(WARNING)) { 2158 logger 2159 .get() 2160 .log( 2161 WARNING, 2162 String.format("while submitting close to %s; will close inline", executor), 2163 e); 2164 } 2165 closeQuietly(closeable, directExecutor()); 2166 } 2167 } 2168 2169 private void checkAndUpdateState(State oldState, State newState) { 2170 checkState( 2171 compareAndUpdateState(oldState, newState), 2172 "Expected state to be %s, but it was %s", 2173 oldState, 2174 newState); 2175 } 2176 2177 private boolean compareAndUpdateState(State oldState, State newState) { 2178 return state.compareAndSet(oldState, newState); 2179 } 2180 2181 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 
2182 private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor> 2183 implements Closeable { 2184 private final DeferredCloser closer = new DeferredCloser(this); 2185 private volatile boolean closed; 2186 @CheckForNull private volatile CountDownLatch whenClosed; 2187 2188 <V extends @Nullable Object, U extends @Nullable Object> 2189 ListenableFuture<U> applyClosingFunction( 2190 ClosingFunction<? super V, U> transformation, @ParametricNullness V input) 2191 throws Exception { 2192 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2193 CloseableList newCloseables = new CloseableList(); 2194 try { 2195 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2196 } finally { 2197 add(newCloseables, directExecutor()); 2198 } 2199 } 2200 2201 <V extends @Nullable Object, U extends @Nullable Object> 2202 FluentFuture<U> applyAsyncClosingFunction( 2203 AsyncClosingFunction<V, U> transformation, @ParametricNullness V input) 2204 throws Exception { 2205 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 
2206 CloseableList newCloseables = new CloseableList(); 2207 try { 2208 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2209 closingFuture.becomeSubsumedInto(newCloseables); 2210 return closingFuture.future; 2211 } finally { 2212 add(newCloseables, directExecutor()); 2213 } 2214 } 2215 2216 @Override 2217 public void close() { 2218 if (closed) { 2219 return; 2220 } 2221 synchronized (this) { 2222 if (closed) { 2223 return; 2224 } 2225 closed = true; 2226 } 2227 for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) { 2228 closeQuietly(entry.getKey(), entry.getValue()); 2229 } 2230 clear(); 2231 if (whenClosed != null) { 2232 whenClosed.countDown(); 2233 } 2234 } 2235 2236 void add(@CheckForNull AutoCloseable closeable, Executor executor) { 2237 checkNotNull(executor); 2238 if (closeable == null) { 2239 return; 2240 } 2241 synchronized (this) { 2242 if (!closed) { 2243 put(closeable, executor); 2244 return; 2245 } 2246 } 2247 closeQuietly(closeable, executor); 2248 } 2249 2250 /** 2251 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2252 */ 2253 CountDownLatch whenClosedCountDown() { 2254 if (closed) { 2255 return new CountDownLatch(0); 2256 } 2257 synchronized (this) { 2258 if (closed) { 2259 return new CountDownLatch(0); 2260 } 2261 checkState(whenClosed == null); 2262 return whenClosed = new CountDownLatch(1); 2263 } 2264 } 2265 } 2266 2267 /** 2268 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2269 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2270 */ 2271 @VisibleForTesting 2272 CountDownLatch whenClosedCountDown() { 2273 return closeables.whenClosedCountDown(); 2274 } 2275 2276 /** The state of a {@link CloseableList}. */ 2277 enum State { 2278 /** The {@link CloseableList} has not been subsumed or closed. 
*/ 2279 OPEN, 2280 2281 /** 2282 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2283 * into any other. 2284 */ 2285 SUBSUMED, 2286 2287 /** 2288 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2289 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2290 */ 2291 WILL_CLOSE, 2292 2293 /** 2294 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2295 * {@link CloseableList} may not be subsumed. 2296 */ 2297 CLOSING, 2298 2299 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2300 CLOSED, 2301 2302 /** 2303 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2304 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2305 */ 2306 WILL_CREATE_VALUE_AND_CLOSER, 2307 } 2308}