001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static 
com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 036import static java.util.logging.Level.FINER; 037import static java.util.logging.Level.SEVERE; 038import static java.util.logging.Level.WARNING; 039 040import com.google.common.annotations.J2ktIncompatible; 041import com.google.common.annotations.VisibleForTesting; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.util.IdentityHashMap; 052import java.util.Map; 053import java.util.concurrent.Callable; 054import java.util.concurrent.CancellationException; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutionException; 057import java.util.concurrent.Executor; 058import java.util.concurrent.Future; 059import java.util.concurrent.RejectedExecutionException; 060import java.util.concurrent.atomic.AtomicReference; 061import javax.annotation.CheckForNull; 062import org.checkerframework.checker.nullness.qual.Nullable; 063 064/** 065 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 066 * complete, some objects captured during the computation are closed. 067 * 068 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 069 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 070 * cancellation of the operation so far. 
The only way to extract the value or exception from a step 071 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 072 * "value" of a successful step or the "result" (value or exception) of any step. 073 * 074 * <ol> 075 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 076 * block or a {@link ListenableFuture}. 077 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 078 * can be captured for later closing. 079 * <li>There is one last step (the root of the tree), from which you can extract the final result 080 * of the computation. After that result is available (or the computation fails), all objects 081 * captured by any of the steps in the pipeline are closed. 082 * </ol> 083 * 084 * <h3>Starting a pipeline</h3> 085 * 086 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 087 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 088 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 089 * #from(ListenableFuture)} instead. 090 * 091 * <h3>Derived steps</h3> 092 * 093 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 094 * ways similar to {@link FluentFuture}s: 095 * 096 * <ul> 097 * <li>by transforming the value from a successful input step, 098 * <li>by catching the exception from a failed input step, or 099 * <li>by combining the results of several input steps. 100 * </ul> 101 * 102 * Each derivation can capture the next value or any intermediate objects for later closing. 103 * 104 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 105 * exception, or combine it with others, you cannot do anything else with it, including declare it 106 * to be the last step of the pipeline. 
107 * 108 * <h4>Transforming</h4> 109 * 110 * To derive the next step by asynchronously applying a function to an input step's value, call 111 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 112 * Executor)} on the input step. 113 * 114 * <h4>Catching</h4> 115 * 116 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 117 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 118 * 119 * <h4>Combining</h4> 120 * 121 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 122 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 123 * 124 * <h3>Cancelling</h3> 125 * 126 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 127 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 128 * successfully cancelled step will immediately start closing all objects captured for later closing 129 * by it and by its input steps. 130 * 131 * <h3>Ending a pipeline</h3> 132 * 133 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 134 * close the captured objects automatically or manually. 135 * 136 * <h4>Automatically closing</h4> 137 * 138 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 139 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin 140 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future 141 * completes before closing starts, rather than once it has finished. 
142 * 143 * <pre>{@code 144 * FluentFuture<UserName> userName = 145 * ClosingFuture.submit( 146 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 147 * executor) 148 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 149 * .transform((closer, result) -> result.get("userName"), directExecutor()) 150 * .catching(DBException.class, e -> "no user", directExecutor()) 151 * .finishToFuture(); 152 * }</pre> 153 * 154 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 155 * result cursor will both be closed, even if the operation is cancelled or fails. 156 * 157 * <h4>Manually closing</h4> 158 * 159 * If you want to close the captured objects manually, after you've used the final result, call 160 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 161 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 162 * 163 * <pre>{@code 164 * ClosingFuture.submit( 165 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 166 * executor) 167 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 168 * .transform((closer, result) -> result.get("userName"), directExecutor()) 169 * .catching(DBException.class, e -> "no user", directExecutor()) 170 * .finishToValueAndCloser( 171 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 172 * 173 * // later 174 * try { // get() will throw if the operation failed or was cancelled. 
175 * UserName userName = userNameValueAndCloser.get(); 176 * // do something with userName 177 * } finally { 178 * userNameValueAndCloser.closeAsync(); 179 * } 180 * }</pre> 181 * 182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 183 * the query result cursor will both be closed, even if the operation is cancelled or fails. 184 * 185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 186 * automatic-closing approach described above is safer. 187 * 188 * @param <V> the type of the value of this step 189 * @since 30.0 190 */ 191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 192@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 193@J2ktIncompatible 194@ElementTypesAreNonnullByDefault 195// TODO(dpb): GWT compatibility. 196public final class ClosingFuture<V extends @Nullable Object> { 197 198 private static final LazyLogger logger = new LazyLogger(ClosingFuture.class); 199 200 /** 201 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 202 * done. 203 */ 204 public static final class DeferredCloser { 205 @RetainedWith private final CloseableList list; 206 207 DeferredCloser(CloseableList list) { 208 this.list = list; 209 } 210 211 /** 212 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 213 * 214 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 215 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 216 * Closeable}. (For more about the flavors, see <a 217 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 218 * build</a>.) 
219 * 220 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 221 * building for Android): Ensure that any object you pass implements the interface not just in 222 * your current SDK version but also at the oldest version you support. For example, <a 223 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 224 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 225 * {@code Closeable} with a method reference like {@code cursor::close}. 226 * 227 * <p>Note that this method is still binary-compatible between flavors because the erasure of 228 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 229 * 230 * @param closeable the object to be closed (see notes above) 231 * @param closingExecutor the object will be closed on this executor 232 * @return the first argument 233 */ 234 @CanIgnoreReturnValue 235 @ParametricNullness 236 public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose( 237 @ParametricNullness C closeable, Executor closingExecutor) { 238 checkNotNull(closingExecutor); 239 if (closeable != null) { 240 list.add(closeable, closingExecutor); 241 } 242 return closeable; 243 } 244 } 245 246 /** 247 * An operation that computes a result. 248 * 249 * @param <V> the type of the result 250 */ 251 public interface ClosingCallable<V extends @Nullable Object> { 252 /** 253 * Computes a result, or throws an exception if unable to do so. 254 * 255 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 256 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 257 * not before this method completes), even if this method throws or the pipeline is cancelled. 
258 */ 259 @ParametricNullness 260 V call(DeferredCloser closer) throws Exception; 261 } 262 263 /** 264 * An operation that computes a {@link ClosingFuture} of a result. 265 * 266 * @param <V> the type of the result 267 * @since 30.1 268 */ 269 public interface AsyncClosingCallable<V extends @Nullable Object> { 270 /** 271 * Computes a result, or throws an exception if unable to do so. 272 * 273 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 274 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 275 * not before this method completes), even if this method throws or the pipeline is cancelled. 276 */ 277 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 278 } 279 280 /** 281 * A function from an input to a result. 282 * 283 * @param <T> the type of the input to the function 284 * @param <U> the type of the result of the function 285 */ 286 public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 287 288 /** 289 * Applies this function to an input, or throws an exception if unable to do so. 290 * 291 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 292 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 293 * not before this method completes), even if this method throws or the pipeline is cancelled. 294 */ 295 @ParametricNullness 296 U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 297 } 298 299 /** 300 * A function from an input to a {@link ClosingFuture} of a result. 301 * 302 * @param <T> the type of the input to the function 303 * @param <U> the type of the result of the function 304 */ 305 public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 306 /** 307 * Applies this function to an input, or throws an exception if unable to do so. 
308 * 309 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 310 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 311 * not before this method completes), even if this method throws or the pipeline is cancelled. 312 */ 313 ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 314 } 315 316 /** 317 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 318 * allows the user to close all the closeable objects that were captured during it for later 319 * closing. 320 * 321 * <p>The asynchronous operation will have completed before this object is created. 322 * 323 * @param <V> the type of the value of a successful operation 324 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 325 */ 326 public static final class ValueAndCloser<V extends @Nullable Object> { 327 328 private final ClosingFuture<? extends V> closingFuture; 329 330 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 331 this.closingFuture = checkNotNull(closingFuture); 332 } 333 334 /** 335 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 336 * {@link Future#get()} would. 337 * 338 * <p>Because the asynchronous operation has already completed, this method is synchronous and 339 * returns immediately. 340 * 341 * @throws CancellationException if the computation was cancelled 342 * @throws ExecutionException if the computation threw an exception 343 */ 344 @ParametricNullness 345 public V get() throws ExecutionException { 346 return getDone(closingFuture.future); 347 } 348 349 /** 350 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 351 * operation on the {@link Executor}s specified by calls to {@link 352 * DeferredCloser#eventuallyClose(Object, Executor)}. 
353 * 354 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 355 * closed synchronously. 356 * 357 * <p>Idempotent: objects will be closed at most once. 358 */ 359 public void closeAsync() { 360 closingFuture.close(); 361 } 362 } 363 364 /** 365 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 366 * ClosingFuture} pipeline. 367 * 368 * @param <V> the type of the final value of a successful pipeline 369 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 370 */ 371 public interface ValueAndCloserConsumer<V extends @Nullable Object> { 372 373 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 374 void accept(ValueAndCloser<V> valueAndCloser); 375 } 376 377 /** 378 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 379 * 380 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 381 * execution 382 */ 383 public static <V extends @Nullable Object> ClosingFuture<V> submit( 384 ClosingCallable<V> callable, Executor executor) { 385 return new ClosingFuture<>(callable, executor); 386 } 387 388 /** 389 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 390 * 391 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 392 * execution 393 * @since 30.1 394 */ 395 public static <V extends @Nullable Object> ClosingFuture<V> submitAsync( 396 AsyncClosingCallable<V> callable, Executor executor) { 397 return new ClosingFuture<>(callable, executor); 398 } 399 400 /** 401 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 402 * 403 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 404 * implements {@link Closeable}. 
In order to start a pipeline with a value that will be closed 405 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 406 */ 407 public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) { 408 return new ClosingFuture<>(future); 409 } 410 411 /** 412 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 413 * 414 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when 415 * the pipeline is done, even if the pipeline is canceled or fails. 416 * 417 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 418 * value in order to close it. 419 * 420 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 421 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object, 422 * Executor)}. 423 * @param closingExecutor the future's result will be closed on this executor 424 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 425 * underlying value may never be closed if the {@link Future} is canceled after its operation 426 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 427 * including those that pass them to this method, with {@link #submit(ClosingCallable, 428 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 429 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 430 * ClosingFuture#from}. 
431 */ 432 @Deprecated 433 public static <C extends @Nullable Object & @Nullable AutoCloseable> 434 ClosingFuture<C> eventuallyClosing( 435 ListenableFuture<C> future, final Executor closingExecutor) { 436 checkNotNull(closingExecutor); 437 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 438 Futures.addCallback( 439 future, 440 new FutureCallback<@Nullable AutoCloseable>() { 441 @Override 442 public void onSuccess(@CheckForNull AutoCloseable result) { 443 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 444 } 445 446 @Override 447 public void onFailure(Throwable t) {} 448 }, 449 directExecutor()); 450 return closingFuture; 451 } 452 453 /** 454 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 455 * 456 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 457 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 458 */ 459 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 460 return new Combiner(false, futures); 461 } 462 463 /** 464 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 465 * 466 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 467 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 468 */ 469 public static Combiner whenAllComplete( 470 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 471 return whenAllComplete(asList(future1, moreFutures)); 472 } 473 474 /** 475 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 476 * all succeed. If any fail, the resulting pipeline will fail. 
477 * 478 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 479 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 480 */ 481 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 482 return new Combiner(true, futures); 483 } 484 485 /** 486 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 487 * they all succeed. If any fail, the resulting pipeline will fail. 488 * 489 * <p>Calling this method allows you to use lambdas or method references typed with the types of 490 * the input {@link ClosingFuture}s. 491 * 492 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 493 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 494 */ 495 public static <V1 extends @Nullable Object, V2 extends @Nullable Object> 496 Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 497 return new Combiner2<>(future1, future2); 498 } 499 500 /** 501 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 502 * they all succeed. If any fail, the resulting pipeline will fail. 503 * 504 * <p>Calling this method allows you to use lambdas or method references typed with the types of 505 * the input {@link ClosingFuture}s. 
506 * 507 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 508 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 509 */ 510 public static < 511 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 512 Combiner3<V1, V2, V3> whenAllSucceed( 513 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 514 return new Combiner3<>(future1, future2, future3); 515 } 516 517 /** 518 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 519 * they all succeed. If any fail, the resulting pipeline will fail. 520 * 521 * <p>Calling this method allows you to use lambdas or method references typed with the types of 522 * the input {@link ClosingFuture}s. 523 * 524 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 525 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 526 */ 527 public static < 528 V1 extends @Nullable Object, 529 V2 extends @Nullable Object, 530 V3 extends @Nullable Object, 531 V4 extends @Nullable Object> 532 Combiner4<V1, V2, V3, V4> whenAllSucceed( 533 ClosingFuture<V1> future1, 534 ClosingFuture<V2> future2, 535 ClosingFuture<V3> future3, 536 ClosingFuture<V4> future4) { 537 return new Combiner4<>(future1, future2, future3, future4); 538 } 539 540 /** 541 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 542 * they all succeed. If any fail, the resulting pipeline will fail. 543 * 544 * <p>Calling this method allows you to use lambdas or method references typed with the types of 545 * the input {@link ClosingFuture}s. 
546 * 547 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 548 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 549 */ 550 public static < 551 V1 extends @Nullable Object, 552 V2 extends @Nullable Object, 553 V3 extends @Nullable Object, 554 V4 extends @Nullable Object, 555 V5 extends @Nullable Object> 556 Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 557 ClosingFuture<V1> future1, 558 ClosingFuture<V2> future2, 559 ClosingFuture<V3> future3, 560 ClosingFuture<V4> future4, 561 ClosingFuture<V5> future5) { 562 return new Combiner5<>(future1, future2, future3, future4, future5); 563 } 564 565 /** 566 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 567 * all succeed. If any fail, the resulting pipeline will fail. 568 * 569 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 570 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 571 */ 572 public static Combiner whenAllSucceed( 573 ClosingFuture<?> future1, 574 ClosingFuture<?> future2, 575 ClosingFuture<?> future3, 576 ClosingFuture<?> future4, 577 ClosingFuture<?> future5, 578 ClosingFuture<?> future6, 579 ClosingFuture<?>... 
moreFutures) { 580 return whenAllSucceed( 581 FluentIterable.of(future1, future2, future3, future4, future5, future6) 582 .append(moreFutures)); 583 } 584 585 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 586 private final CloseableList closeables = new CloseableList(); 587 private final FluentFuture<V> future; 588 589 private ClosingFuture(ListenableFuture<V> future) { 590 this.future = FluentFuture.from(future); 591 } 592 593 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 594 checkNotNull(callable); 595 TrustedListenableFutureTask<V> task = 596 TrustedListenableFutureTask.create( 597 new Callable<V>() { 598 @Override 599 @ParametricNullness 600 public V call() throws Exception { 601 return callable.call(closeables.closer); 602 } 603 604 @Override 605 public String toString() { 606 return callable.toString(); 607 } 608 }); 609 executor.execute(task); 610 this.future = task; 611 } 612 613 private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) { 614 checkNotNull(callable); 615 TrustedListenableFutureTask<V> task = 616 TrustedListenableFutureTask.create( 617 new AsyncCallable<V>() { 618 @Override 619 public ListenableFuture<V> call() throws Exception { 620 CloseableList newCloseables = new CloseableList(); 621 try { 622 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 623 closingFuture.becomeSubsumedInto(closeables); 624 return closingFuture.future; 625 } finally { 626 closeables.add(newCloseables, directExecutor()); 627 } 628 } 629 630 @Override 631 public String toString() { 632 return callable.toString(); 633 } 634 }); 635 executor.execute(task); 636 this.future = task; 637 } 638 639 /** 640 * Returns a future that finishes when this step does. 
Calling {@code get()} on the returned 641 * future returns {@code null} if the step is successful or throws the same exception that would 642 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 643 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 644 * 645 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 646 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 647 * a derivation method <i>on the same instance</i>. This is important because calling {@code 648 * statusFuture} alone does not provide a way to close the pipeline. 649 */ 650 public ListenableFuture<?> statusFuture() { 651 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 652 } 653 654 /** 655 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 656 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 657 * when the pipeline is done. 658 * 659 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 660 * ClosingFuture} will be equivalent to this one. 661 * 662 * <p>If the function throws an exception, that exception is used as the result of the derived 663 * {@code ClosingFuture}. 664 * 665 * <p>Example usage: 666 * 667 * <pre>{@code 668 * ClosingFuture<List<Row>> rowsFuture = 669 * queryFuture.transform((closer, result) -> result.getRows(), executor); 670 * }</pre> 671 * 672 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 673 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 674 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
675 * 676 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 677 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 678 * the original {@code ClosingFuture} instance. 679 * 680 * @param function transforms the value of this step to the value of the derived step 681 * @param executor executor to run the function in 682 * @return the derived step 683 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 684 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 685 * finished} 686 */ 687 public <U extends @Nullable Object> ClosingFuture<U> transform( 688 final ClosingFunction<? super V, U> function, Executor executor) { 689 checkNotNull(function); 690 AsyncFunction<V, U> applyFunction = 691 new AsyncFunction<V, U>() { 692 @Override 693 public ListenableFuture<U> apply(V input) throws Exception { 694 return closeables.applyClosingFunction(function, input); 695 } 696 697 @Override 698 public String toString() { 699 return function.toString(); 700 } 701 }; 702 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 703 return derive(future.transformAsync(applyFunction, executor)); 704 } 705 706 /** 707 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 708 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 709 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 710 * captured by the returned {@link ClosingFuture}). 711 * 712 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 713 * returned by the function. 714 * 715 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 716 * ClosingFuture} will be equivalent to this one. 
717 * 718 * <p>If the function throws an exception, that exception is used as the result of the derived 719 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 720 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 721 * closed. 722 * 723 * <p>Usage guidelines for this method: 724 * 725 * <ul> 726 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 727 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 728 * Executor)} instead, with a function that returns the next value directly. 729 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 730 * for every closeable object this step creates in order to capture it for later closing. 731 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 732 * ClosingFuture} call {@link #from(ListenableFuture)}. 733 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 734 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 735 * {@link #withoutCloser(AsyncFunction)} 736 * </ul> 737 * 738 * <p>Example usage: 739 * 740 * <pre>{@code 741 * // Result.getRowsClosingFuture() returns a ClosingFuture. 742 * ClosingFuture<List<Row>> rowsFuture = 743 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 744 * 745 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 746 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 747 * // Closeable). 
748 * ClosingFuture<Integer> rowsFuture2 = 749 * queryFuture.transformAsync( 750 * (closer, result) -> { 751 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 752 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 753 * }, 754 * executor); 755 * 756 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 757 * ClosingFuture<List<Row>> rowsFuture3 = 758 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 759 * 760 * }</pre> 761 * 762 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 763 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 764 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 765 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 766 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 767 * responsible for completing the returned {@code ClosingFuture}.) 768 * 769 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 770 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 771 * the original {@code ClosingFuture} instance. 772 * 773 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 774 * the derived step 775 * @param executor executor to run the function in 776 * @return the derived step 777 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 778 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 779 * finished} 780 */ 781 public <U extends @Nullable Object> ClosingFuture<U> transformAsync( 782 final AsyncClosingFunction<? 
super V, U> function, Executor executor) { 783 checkNotNull(function); 784 AsyncFunction<V, U> applyFunction = 785 new AsyncFunction<V, U>() { 786 @Override 787 public ListenableFuture<U> apply(V input) throws Exception { 788 return closeables.applyAsyncClosingFunction(function, input); 789 } 790 791 @Override 792 public String toString() { 793 return function.toString(); 794 } 795 }; 796 return derive(future.transformAsync(applyFunction, executor)); 797 } 798 799 /** 800 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 801 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 802 * {@link ListenableFuture}. 803 * 804 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 805 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 806 * meets these conditions: 807 * 808 * <ul> 809 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 810 * DeferredCloser#eventuallyClose(Object, Executor)}. 811 * <li>It returns a {@link ListenableFuture}. 812 * </ul> 813 * 814 * <p>Example usage: 815 * 816 * <pre>{@code 817 * // Result.getRowsFuture() returns a ListenableFuture. 
818 * ClosingFuture<List<Row>> rowsFuture = 819 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 820 * }</pre> 821 * 822 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 823 * ListenableFuture} with the value of a derived step 824 */ 825 public static <V extends @Nullable Object, U extends @Nullable Object> 826 AsyncClosingFunction<V, U> withoutCloser(final AsyncFunction<V, U> function) { 827 checkNotNull(function); 828 return new AsyncClosingFunction<V, U>() { 829 @Override 830 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 831 return ClosingFuture.from(function.apply(input)); 832 } 833 }; 834 } 835 836 /** 837 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 838 * to its exception if it is an instance of a given exception type. The function can use a {@link 839 * DeferredCloser} to capture objects to be closed when the pipeline is done. 840 * 841 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 842 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 843 * one. 844 * 845 * <p>If the function throws an exception, that exception is used as the result of the derived 846 * {@code ClosingFuture}. 847 * 848 * <p>Example usage: 849 * 850 * <pre>{@code 851 * ClosingFuture<QueryResult> queryFuture = 852 * queryFuture.catching( 853 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 854 * }</pre> 855 * 856 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 857 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 858 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
859 * 860 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 861 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 862 * the original {@code ClosingFuture} instance. 863 * 864 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 865 * type is matched against this step's exception. "This step's exception" means the cause of 866 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 867 * underlying this step or, if {@code get()} throws a different kind of exception, that 868 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 869 * prefer more specific types, avoiding {@code Throwable.class} in particular. 870 * @param fallback the function to be called if this step fails with the expected exception type. 871 * The function's argument is this step's exception. "This step's exception" means the cause 872 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 873 * underlying this step or, if {@code get()} throws a different kind of exception, that 874 * exception itself. 875 * @param executor the executor that runs {@code fallback} if the input fails 876 */ 877 public <X extends Throwable> ClosingFuture<V> catching( 878 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 879 return catchingMoreGeneric(exceptionType, fallback, executor); 880 } 881 882 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 883 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 884 Class<X> exceptionType, final ClosingFunction<? 
super X, W> fallback, Executor executor) { 885 checkNotNull(fallback); 886 AsyncFunction<X, W> applyFallback = 887 new AsyncFunction<X, W>() { 888 @Override 889 public ListenableFuture<W> apply(X exception) throws Exception { 890 return closeables.applyClosingFunction(fallback, exception); 891 } 892 893 @Override 894 public String toString() { 895 return fallback.toString(); 896 } 897 }; 898 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 899 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 900 } 901 902 /** 903 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 904 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 905 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 906 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 907 * 908 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 909 * ClosingFuture} will be equivalent to the one returned by the function. 910 * 911 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 912 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 913 * one. 914 * 915 * <p>If the function throws an exception, that exception is used as the result of the derived 916 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 917 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 918 * closed. 919 * 920 * <p>Usage guidelines for this method: 921 * 922 * <ul> 923 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 924 * {@code ClosingFuture}. 
If possible, prefer calling {@link #catching(Class, 925 * ClosingFunction, Executor)} instead, with a function that returns the next value 926 * directly. 927 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 928 * for every closeable object this step creates in order to capture it for later closing. 929 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 930 * ClosingFuture} call {@link #from(ListenableFuture)}. 931 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 932 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 933 * {@link #withoutCloser(AsyncFunction)} 934 * </ul> 935 * 936 * <p>Example usage: 937 * 938 * <pre>{@code 939 * // Fall back to a secondary input stream in case of IOException. 940 * ClosingFuture<InputStream> inputFuture = 941 * firstInputFuture.catchingAsync( 942 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 943 * } 944 * }</pre> 945 * 946 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 947 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 948 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 949 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 950 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 951 * responsible for completing the returned {@code ClosingFuture}.) 952 * 953 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 954 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 955 * the original {@code ClosingFuture} instance. 956 * 957 * @param exceptionType the exception type that triggers use of {@code fallback}. 
The exception 958 * type is matched against this step's exception. "This step's exception" means the cause of 959 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 960 * underlying this step or, if {@code get()} throws a different kind of exception, that 961 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 962 * prefer more specific types, avoiding {@code Throwable.class} in particular. 963 * @param fallback the function to be called if this step fails with the expected exception type. 964 * The function's argument is this step's exception. "This step's exception" means the cause 965 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 966 * underlying this step or, if {@code get()} throws a different kind of exception, that 967 * exception itself. 968 * @param executor the executor that runs {@code fallback} if the input fails 969 */ 970 // TODO(dpb): Should this do something special if the function throws CancellationException or 971 // ExecutionException? 972 public <X extends Throwable> ClosingFuture<V> catchingAsync( 973 Class<X> exceptionType, 974 AsyncClosingFunction<? super X, ? extends V> fallback, 975 Executor executor) { 976 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 977 } 978 979 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 980 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 981 Class<X> exceptionType, 982 final AsyncClosingFunction<? 
super X, W> fallback, 983 Executor executor) { 984 checkNotNull(fallback); 985 AsyncFunction<X, W> asyncFunction = 986 new AsyncFunction<X, W>() { 987 @Override 988 public ListenableFuture<W> apply(X exception) throws Exception { 989 return closeables.applyAsyncClosingFunction(fallback, exception); 990 } 991 992 @Override 993 public String toString() { 994 return fallback.toString(); 995 } 996 }; 997 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 998 } 999 1000 /** 1001 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 1002 * 1003 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 1004 * the pipeline is cancelled. 1005 * 1006 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 1007 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 1008 * rather than once it has finished. 1009 * 1010 * <p>After calling this method, you may not call {@link 1011 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 1012 * derivation method on the original {@code ClosingFuture} instance. 
1013 * 1014 * @return a {@link Future} that represents the final value or exception of the pipeline 1015 */ 1016 public FluentFuture<V> finishToFuture() { 1017 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1018 logger.get().log(FINER, "will close {0}", this); 1019 future.addListener( 1020 new Runnable() { 1021 @Override 1022 public void run() { 1023 checkAndUpdateState(WILL_CLOSE, CLOSING); 1024 close(); 1025 checkAndUpdateState(CLOSING, CLOSED); 1026 } 1027 }, 1028 directExecutor()); 1029 } else { 1030 switch (state.get()) { 1031 case SUBSUMED: 1032 throw new IllegalStateException( 1033 "Cannot call finishToFuture() after deriving another step"); 1034 1035 case WILL_CREATE_VALUE_AND_CLOSER: 1036 throw new IllegalStateException( 1037 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1038 1039 case WILL_CLOSE: 1040 case CLOSING: 1041 case CLOSED: 1042 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1043 1044 case OPEN: 1045 throw new AssertionError(); 1046 } 1047 } 1048 return future; 1049 } 1050 1051 /** 1052 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1053 * {@code receiver} will be called with an object that contains the result of the operation. The 1054 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1055 * 1056 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1057 * any other derivation method on the original {@code ClosingFuture} instance. 1058 * 1059 * @param consumer a callback whose method will be called (using {@code executor}) when this 1060 * operation is done 1061 */ 1062 public void finishToValueAndCloser( 1063 final ValueAndCloserConsumer<? 
super V> consumer, Executor executor) { 1064 checkNotNull(consumer); 1065 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1066 switch (state.get()) { 1067 case SUBSUMED: 1068 throw new IllegalStateException( 1069 "Cannot call finishToValueAndCloser() after deriving another step"); 1070 1071 case WILL_CLOSE: 1072 case CLOSING: 1073 case CLOSED: 1074 throw new IllegalStateException( 1075 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1076 1077 case WILL_CREATE_VALUE_AND_CLOSER: 1078 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1079 1080 case OPEN: 1081 break; 1082 } 1083 throw new AssertionError(state); 1084 } 1085 future.addListener( 1086 new Runnable() { 1087 @Override 1088 public void run() { 1089 provideValueAndCloser(consumer, ClosingFuture.this); 1090 } 1091 }, 1092 executor); 1093 } 1094 1095 private static <C extends @Nullable Object, V extends C> void provideValueAndCloser( 1096 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1097 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1098 } 1099 1100 /** 1101 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1102 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1103 * successful, and this step has not started when {@code cancel} is called, this step should never 1104 * run. 1105 * 1106 * <p>If successful, causes the objects captured by this step (if already started) and its input 1107 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1108 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 
1109 * 1110 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1111 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1112 * cancelled regardless 1113 * @return {@code false} if the step could not be cancelled, typically because it has already 1114 * completed normally; {@code true} otherwise 1115 */ 1116 @CanIgnoreReturnValue 1117 public boolean cancel(boolean mayInterruptIfRunning) { 1118 logger.get().log(FINER, "cancelling {0}", this); 1119 boolean cancelled = future.cancel(mayInterruptIfRunning); 1120 if (cancelled) { 1121 close(); 1122 } 1123 return cancelled; 1124 } 1125 1126 private void close() { 1127 logger.get().log(FINER, "closing {0}", this); 1128 closeables.close(); 1129 } 1130 1131 private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) { 1132 ClosingFuture<U> derived = new ClosingFuture<>(future); 1133 becomeSubsumedInto(derived.closeables); 1134 return derived; 1135 } 1136 1137 private void becomeSubsumedInto(CloseableList otherCloseables) { 1138 checkAndUpdateState(OPEN, SUBSUMED); 1139 otherCloseables.add(closeables, directExecutor()); 1140 } 1141 1142 /** 1143 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1144 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1145 * 1146 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1147 */ 1148 public static final class Peeker { 1149 private final ImmutableList<ClosingFuture<?>> futures; 1150 private volatile boolean beingCalled; 1151 1152 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1153 this.futures = checkNotNull(futures); 1154 } 1155 1156 /** 1157 * Returns the value of {@code closingFuture}. 
1158 * 1159 * @throws ExecutionException if {@code closingFuture} is a failed step 1160 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1161 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1162 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1163 * @throws IllegalStateException if called outside of a call to {@link 1164 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1165 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1166 */ 1167 @ParametricNullness 1168 public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1169 throws ExecutionException { 1170 checkState(beingCalled); 1171 checkArgument(futures.contains(closingFuture)); 1172 return Futures.getDone(closingFuture.future); 1173 } 1174 1175 @ParametricNullness 1176 private <V extends @Nullable Object> V call( 1177 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1178 beingCalled = true; 1179 CloseableList newCloseables = new CloseableList(); 1180 try { 1181 return combiner.call(newCloseables.closer, this); 1182 } finally { 1183 closeables.add(newCloseables, directExecutor()); 1184 beingCalled = false; 1185 } 1186 } 1187 1188 private <V extends @Nullable Object> FluentFuture<V> callAsync( 1189 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1190 beingCalled = true; 1191 CloseableList newCloseables = new CloseableList(); 1192 try { 1193 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1194 closingFuture.becomeSubsumedInto(closeables); 1195 return closingFuture.future; 1196 } finally { 1197 closeables.add(newCloseables, directExecutor()); 1198 beingCalled = false; 1199 } 1200 } 1201 } 1202 1203 /** 1204 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 
1205 * 1206 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1207 * instantiate this class. 1208 * 1209 * <p>Example: 1210 * 1211 * <pre>{@code 1212 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1213 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1214 * ListenableFuture<Integer> numberOfDifferentLines = 1215 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1216 * .call( 1217 * (closer, peeker) -> { 1218 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1219 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1220 * return countDifferentLines(file1Reader, file2Reader); 1221 * }, 1222 * executor) 1223 * .closing(executor); 1224 * }</pre> 1225 */ 1226 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1227 @com.google.errorprone.annotations.DoNotMock( 1228 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1229 public static class Combiner { 1230 1231 private final CloseableList closeables = new CloseableList(); 1232 1233 /** 1234 * An operation that returns a result and may throw an exception. 1235 * 1236 * @param <V> the type of the result 1237 */ 1238 public interface CombiningCallable<V extends @Nullable Object> { 1239 /** 1240 * Computes a result, or throws an exception if unable to do so. 1241 * 1242 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1243 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1244 * (but not before this method completes), even if this method throws or the pipeline is 1245 * cancelled. 
1246 * 1247 * @param peeker used to get the value of any of the input futures 1248 */ 1249 @ParametricNullness 1250 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1251 } 1252 1253 /** 1254 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1255 * 1256 * @param <V> the type of the result 1257 */ 1258 public interface AsyncCombiningCallable<V extends @Nullable Object> { 1259 /** 1260 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1261 * 1262 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1263 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1264 * (but not before this method completes), even if this method throws or the pipeline is 1265 * cancelled. 1266 * 1267 * @param peeker used to get the value of any of the input futures 1268 */ 1269 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1270 } 1271 1272 private final boolean allMustSucceed; 1273 protected final ImmutableList<ClosingFuture<?>> inputs; 1274 1275 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1276 this.allMustSucceed = allMustSucceed; 1277 this.inputs = ImmutableList.copyOf(inputs); 1278 for (ClosingFuture<?> input : inputs) { 1279 input.becomeSubsumedInto(closeables); 1280 } 1281 } 1282 1283 /** 1284 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1285 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1286 * objects to be closed when the pipeline is done. 1287 * 1288 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1289 * fail, so will the returned step. 1290 * 1291 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1292 * cancelled. 
1293 * 1294 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1295 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1296 */ 1297 public <V extends @Nullable Object> ClosingFuture<V> call( 1298 final CombiningCallable<V> combiningCallable, Executor executor) { 1299 Callable<V> callable = 1300 new Callable<V>() { 1301 @Override 1302 @ParametricNullness 1303 public V call() throws Exception { 1304 return new Peeker(inputs).call(combiningCallable, closeables); 1305 } 1306 1307 @Override 1308 public String toString() { 1309 return combiningCallable.toString(); 1310 } 1311 }; 1312 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1313 derived.closeables.add(closeables, directExecutor()); 1314 return derived; 1315 } 1316 1317 /** 1318 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1319 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1320 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1321 * captured by the returned {@link ClosingFuture}). 1322 * 1323 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1324 * fail, so will the returned step. 1325 * 1326 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1327 * cancelled. 1328 * 1329 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1330 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1331 * 1332 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1333 * derived step. 1334 * 1335 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1336 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 
1337 * 1338 * <p>Usage guidelines for this method: 1339 * 1340 * <ul> 1341 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1342 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1343 * Executor)} instead, with a function that returns the next value directly. 1344 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1345 * for every closeable object this step creates in order to capture it for later closing. 1346 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1347 * ClosingFuture} call {@link #from(ListenableFuture)}. 1348 * </ul> 1349 * 1350 * <p>The same warnings about doing heavyweight operations within {@link 1351 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1352 */ 1353 public <V extends @Nullable Object> ClosingFuture<V> callAsync( 1354 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1355 AsyncCallable<V> asyncCallable = 1356 new AsyncCallable<V>() { 1357 @Override 1358 public ListenableFuture<V> call() throws Exception { 1359 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1360 } 1361 1362 @Override 1363 public String toString() { 1364 return combiningCallable.toString(); 1365 } 1366 }; 1367 ClosingFuture<V> derived = 1368 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1369 derived.closeables.add(closeables, directExecutor()); 1370 return derived; 1371 } 1372 1373 private FutureCombiner<@Nullable Object> futureCombiner() { 1374 return allMustSucceed 1375 ? 
Futures.whenAllSucceed(inputFutures()) 1376 : Futures.whenAllComplete(inputFutures()); 1377 } 1378 1379 1380 private ImmutableList<FluentFuture<?>> inputFutures() { 1381 return FluentIterable.from(inputs) 1382 .<FluentFuture<?>>transform(future -> future.future) 1383 .toList(); 1384 } 1385 } 1386 1387 /** 1388 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1389 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1390 * combination. 1391 * 1392 * @param <V1> the type returned by the first future 1393 * @param <V2> the type returned by the second future 1394 */ 1395 public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1396 extends Combiner { 1397 1398 /** 1399 * A function that returns a value when applied to the values of the two futures passed to 1400 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1401 * 1402 * @param <V1> the type returned by the first future 1403 * @param <V2> the type returned by the second future 1404 * @param <U> the type returned by the function 1405 */ 1406 public interface ClosingFunction2< 1407 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1408 1409 /** 1410 * Applies this function to two inputs, or throws an exception if unable to do so. 1411 * 1412 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1413 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1414 * (but not before this method completes), even if this method throws or the pipeline is 1415 * cancelled. 
1416 */ 1417 @ParametricNullness 1418 U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1419 throws Exception; 1420 } 1421 1422 /** 1423 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1424 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1425 * 1426 * @param <V1> the type returned by the first future 1427 * @param <V2> the type returned by the second future 1428 * @param <U> the type returned by the function 1429 */ 1430 public interface AsyncClosingFunction2< 1431 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1432 1433 /** 1434 * Applies this function to two inputs, or throws an exception if unable to do so. 1435 * 1436 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1437 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1438 * (but not before this method completes), even if this method throws or the pipeline is 1439 * cancelled. 1440 */ 1441 ClosingFuture<U> apply( 1442 DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1443 throws Exception; 1444 } 1445 1446 private final ClosingFuture<V1> future1; 1447 private final ClosingFuture<V2> future2; 1448 1449 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1450 super(true, ImmutableList.of(future1, future2)); 1451 this.future1 = future1; 1452 this.future2 = future2; 1453 } 1454 1455 /** 1456 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1457 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1458 * objects to be closed when the pipeline is done. 1459 * 1460 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1461 * any of the inputs fail, so will the returned step. 
1462 * 1463 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1464 * 1465 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1466 * ExecutionException} will be extracted and used as the failure of the derived step. 1467 */ 1468 public <U extends @Nullable Object> ClosingFuture<U> call( 1469 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1470 return call( 1471 new CombiningCallable<U>() { 1472 @Override 1473 @ParametricNullness 1474 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1475 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1476 } 1477 1478 @Override 1479 public String toString() { 1480 return function.toString(); 1481 } 1482 }, 1483 executor); 1484 } 1485 1486 /** 1487 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1488 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1489 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1490 * captured by the returned {@link ClosingFuture}). 1491 * 1492 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1493 * any of the inputs fail, so will the returned step. 1494 * 1495 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1496 * 1497 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1498 * ExecutionException} will be extracted and used as the failure of the derived step. 1499 * 1500 * <p>If the function throws any other exception, it will be used as the failure of the derived 1501 * step. 1502 * 1503 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1504 * the closeable objects in that {@code ClosingFuture} will be closed. 
1505 * 1506 * <p>Usage guidelines for this method: 1507 * 1508 * <ul> 1509 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1510 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1511 * Executor)} instead, with a function that returns the next value directly. 1512 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1513 * for every closeable object this step creates in order to capture it for later closing. 1514 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1515 * ClosingFuture} call {@link #from(ListenableFuture)}. 1516 * </ul> 1517 * 1518 * <p>The same warnings about doing heavyweight operations within {@link 1519 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1520 */ 1521 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1522 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1523 return callAsync( 1524 new AsyncCombiningCallable<U>() { 1525 @Override 1526 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1527 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1528 } 1529 1530 @Override 1531 public String toString() { 1532 return function.toString(); 1533 } 1534 }, 1535 executor); 1536 } 1537 } 1538 1539 /** 1540 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1541 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1542 * ClosingFuture)} to start this combination. 
1543 * 1544 * @param <V1> the type returned by the first future 1545 * @param <V2> the type returned by the second future 1546 * @param <V3> the type returned by the third future 1547 */ 1548 public static final class Combiner3< 1549 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1550 extends Combiner { 1551 /** 1552 * A function that returns a value when applied to the values of the three futures passed to 1553 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1554 * 1555 * @param <V1> the type returned by the first future 1556 * @param <V2> the type returned by the second future 1557 * @param <V3> the type returned by the third future 1558 * @param <U> the type returned by the function 1559 */ 1560 public interface ClosingFunction3< 1561 V1 extends @Nullable Object, 1562 V2 extends @Nullable Object, 1563 V3 extends @Nullable Object, 1564 U extends @Nullable Object> { 1565 /** 1566 * Applies this function to three inputs, or throws an exception if unable to do so. 1567 * 1568 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1569 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1570 * (but not before this method completes), even if this method throws or the pipeline is 1571 * cancelled. 1572 */ 1573 @ParametricNullness 1574 U apply( 1575 DeferredCloser closer, 1576 @ParametricNullness V1 value1, 1577 @ParametricNullness V2 value2, 1578 @ParametricNullness V3 value3) 1579 throws Exception; 1580 } 1581 1582 /** 1583 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1584 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 
1585 * 1586 * @param <V1> the type returned by the first future 1587 * @param <V2> the type returned by the second future 1588 * @param <V3> the type returned by the third future 1589 * @param <U> the type returned by the function 1590 */ 1591 public interface AsyncClosingFunction3< 1592 V1 extends @Nullable Object, 1593 V2 extends @Nullable Object, 1594 V3 extends @Nullable Object, 1595 U extends @Nullable Object> { 1596 /** 1597 * Applies this function to three inputs, or throws an exception if unable to do so. 1598 * 1599 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1600 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1601 * (but not before this method completes), even if this method throws or the pipeline is 1602 * cancelled. 1603 */ 1604 ClosingFuture<U> apply( 1605 DeferredCloser closer, 1606 @ParametricNullness V1 value1, 1607 @ParametricNullness V2 value2, 1608 @ParametricNullness V3 value3) 1609 throws Exception; 1610 } 1611 1612 private final ClosingFuture<V1> future1; 1613 private final ClosingFuture<V2> future2; 1614 private final ClosingFuture<V3> future3; 1615 1616 private Combiner3( 1617 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1618 super(true, ImmutableList.of(future1, future2, future3)); 1619 this.future1 = future1; 1620 this.future2 = future2; 1621 this.future3 = future3; 1622 } 1623 1624 /** 1625 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1626 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1627 * objects to be closed when the pipeline is done. 1628 * 1629 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1630 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1631 * 1632 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 
1633 * 1634 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1635 * ExecutionException} will be extracted and used as the failure of the derived step. 1636 */ 1637 public <U extends @Nullable Object> ClosingFuture<U> call( 1638 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1639 return call( 1640 new CombiningCallable<U>() { 1641 @Override 1642 @ParametricNullness 1643 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1644 return function.apply( 1645 closer, 1646 peeker.getDone(future1), 1647 peeker.getDone(future2), 1648 peeker.getDone(future3)); 1649 } 1650 1651 @Override 1652 public String toString() { 1653 return function.toString(); 1654 } 1655 }, 1656 executor); 1657 } 1658 1659 /** 1660 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1661 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1662 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1663 * captured by the returned {@link ClosingFuture}). 1664 * 1665 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1666 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1667 * 1668 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1669 * 1670 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1671 * ExecutionException} will be extracted and used as the failure of the derived step. 1672 * 1673 * <p>If the function throws any other exception, it will be used as the failure of the derived 1674 * step. 1675 * 1676 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1677 * the closeable objects in that {@code ClosingFuture} will be closed. 
1678 * 1679 * <p>Usage guidelines for this method: 1680 * 1681 * <ul> 1682 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1683 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1684 * Executor)} instead, with a function that returns the next value directly. 1685 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1686 * for every closeable object this step creates in order to capture it for later closing. 1687 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1688 * ClosingFuture} call {@link #from(ListenableFuture)}. 1689 * </ul> 1690 * 1691 * <p>The same warnings about doing heavyweight operations within {@link 1692 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1693 */ 1694 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1695 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1696 return callAsync( 1697 new AsyncCombiningCallable<U>() { 1698 @Override 1699 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1700 return function.apply( 1701 closer, 1702 peeker.getDone(future1), 1703 peeker.getDone(future2), 1704 peeker.getDone(future3)); 1705 } 1706 1707 @Override 1708 public String toString() { 1709 return function.toString(); 1710 } 1711 }, 1712 executor); 1713 } 1714 } 1715 1716 /** 1717 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1718 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1719 * ClosingFuture)} to start this combination. 
1720 * 1721 * @param <V1> the type returned by the first future 1722 * @param <V2> the type returned by the second future 1723 * @param <V3> the type returned by the third future 1724 * @param <V4> the type returned by the fourth future 1725 */ 1726 public static final class Combiner4< 1727 V1 extends @Nullable Object, 1728 V2 extends @Nullable Object, 1729 V3 extends @Nullable Object, 1730 V4 extends @Nullable Object> 1731 extends Combiner { 1732 /** 1733 * A function that returns a value when applied to the values of the four futures passed to 1734 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1735 * 1736 * @param <V1> the type returned by the first future 1737 * @param <V2> the type returned by the second future 1738 * @param <V3> the type returned by the third future 1739 * @param <V4> the type returned by the fourth future 1740 * @param <U> the type returned by the function 1741 */ 1742 public interface ClosingFunction4< 1743 V1 extends @Nullable Object, 1744 V2 extends @Nullable Object, 1745 V3 extends @Nullable Object, 1746 V4 extends @Nullable Object, 1747 U extends @Nullable Object> { 1748 /** 1749 * Applies this function to four inputs, or throws an exception if unable to do so. 1750 * 1751 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1752 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1753 * (but not before this method completes), even if this method throws or the pipeline is 1754 * cancelled. 
1755 */ 1756 @ParametricNullness 1757 U apply( 1758 DeferredCloser closer, 1759 @ParametricNullness V1 value1, 1760 @ParametricNullness V2 value2, 1761 @ParametricNullness V3 value3, 1762 @ParametricNullness V4 value4) 1763 throws Exception; 1764 } 1765 1766 /** 1767 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1768 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1769 * ClosingFuture)}. 1770 * 1771 * @param <V1> the type returned by the first future 1772 * @param <V2> the type returned by the second future 1773 * @param <V3> the type returned by the third future 1774 * @param <V4> the type returned by the fourth future 1775 * @param <U> the type returned by the function 1776 */ 1777 public interface AsyncClosingFunction4< 1778 V1 extends @Nullable Object, 1779 V2 extends @Nullable Object, 1780 V3 extends @Nullable Object, 1781 V4 extends @Nullable Object, 1782 U extends @Nullable Object> { 1783 /** 1784 * Applies this function to four inputs, or throws an exception if unable to do so. 1785 * 1786 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1787 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1788 * (but not before this method completes), even if this method throws or the pipeline is 1789 * cancelled. 
1790 */ 1791 ClosingFuture<U> apply( 1792 DeferredCloser closer, 1793 @ParametricNullness V1 value1, 1794 @ParametricNullness V2 value2, 1795 @ParametricNullness V3 value3, 1796 @ParametricNullness V4 value4) 1797 throws Exception; 1798 } 1799 1800 private final ClosingFuture<V1> future1; 1801 private final ClosingFuture<V2> future2; 1802 private final ClosingFuture<V3> future3; 1803 private final ClosingFuture<V4> future4; 1804 1805 private Combiner4( 1806 ClosingFuture<V1> future1, 1807 ClosingFuture<V2> future2, 1808 ClosingFuture<V3> future3, 1809 ClosingFuture<V4> future4) { 1810 super(true, ImmutableList.of(future1, future2, future3, future4)); 1811 this.future1 = future1; 1812 this.future2 = future2; 1813 this.future3 = future3; 1814 this.future4 = future4; 1815 } 1816 1817 /** 1818 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1819 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1820 * objects to be closed when the pipeline is done. 1821 * 1822 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1823 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1824 * 1825 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1826 * 1827 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1828 * ExecutionException} will be extracted and used as the failure of the derived step. 
1829 */ 1830 public <U extends @Nullable Object> ClosingFuture<U> call( 1831 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1832 return call( 1833 new CombiningCallable<U>() { 1834 @Override 1835 @ParametricNullness 1836 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1837 return function.apply( 1838 closer, 1839 peeker.getDone(future1), 1840 peeker.getDone(future2), 1841 peeker.getDone(future3), 1842 peeker.getDone(future4)); 1843 } 1844 1845 @Override 1846 public String toString() { 1847 return function.toString(); 1848 } 1849 }, 1850 executor); 1851 } 1852 1853 /** 1854 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1855 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1856 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1857 * captured by the returned {@link ClosingFuture}). 1858 * 1859 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1860 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1861 * 1862 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1863 * 1864 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1865 * ExecutionException} will be extracted and used as the failure of the derived step. 1866 * 1867 * <p>If the function throws any other exception, it will be used as the failure of the derived 1868 * step. 1869 * 1870 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1871 * the closeable objects in that {@code ClosingFuture} will be closed. 1872 * 1873 * <p>Usage guidelines for this method: 1874 * 1875 * <ul> 1876 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1877 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1878 * Executor)} instead, with a function that returns the next value directly. 1879 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1880 * for every closeable object this step creates in order to capture it for later closing. 1881 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1882 * ClosingFuture} call {@link #from(ListenableFuture)}. 1883 * </ul> 1884 * 1885 * <p>The same warnings about doing heavyweight operations within {@link 1886 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1887 */ 1888 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1889 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1890 return callAsync( 1891 new AsyncCombiningCallable<U>() { 1892 @Override 1893 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1894 return function.apply( 1895 closer, 1896 peeker.getDone(future1), 1897 peeker.getDone(future2), 1898 peeker.getDone(future3), 1899 peeker.getDone(future4)); 1900 } 1901 1902 @Override 1903 public String toString() { 1904 return function.toString(); 1905 } 1906 }, 1907 executor); 1908 } 1909 } 1910 1911 /** 1912 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1913 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1914 * ClosingFuture, ClosingFuture)} to start this combination. 
1915 * 1916 * @param <V1> the type returned by the first future 1917 * @param <V2> the type returned by the second future 1918 * @param <V3> the type returned by the third future 1919 * @param <V4> the type returned by the fourth future 1920 * @param <V5> the type returned by the fifth future 1921 */ 1922 public static final class Combiner5< 1923 V1 extends @Nullable Object, 1924 V2 extends @Nullable Object, 1925 V3 extends @Nullable Object, 1926 V4 extends @Nullable Object, 1927 V5 extends @Nullable Object> 1928 extends Combiner { 1929 /** 1930 * A function that returns a value when applied to the values of the five futures passed to 1931 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1932 * ClosingFuture)}. 1933 * 1934 * @param <V1> the type returned by the first future 1935 * @param <V2> the type returned by the second future 1936 * @param <V3> the type returned by the third future 1937 * @param <V4> the type returned by the fourth future 1938 * @param <V5> the type returned by the fifth future 1939 * @param <U> the type returned by the function 1940 */ 1941 public interface ClosingFunction5< 1942 V1 extends @Nullable Object, 1943 V2 extends @Nullable Object, 1944 V3 extends @Nullable Object, 1945 V4 extends @Nullable Object, 1946 V5 extends @Nullable Object, 1947 U extends @Nullable Object> { 1948 /** 1949 * Applies this function to five inputs, or throws an exception if unable to do so. 1950 * 1951 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1952 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1953 * (but not before this method completes), even if this method throws or the pipeline is 1954 * cancelled. 
1955 */ 1956 @ParametricNullness 1957 U apply( 1958 DeferredCloser closer, 1959 @ParametricNullness V1 value1, 1960 @ParametricNullness V2 value2, 1961 @ParametricNullness V3 value3, 1962 @ParametricNullness V4 value4, 1963 @ParametricNullness V5 value5) 1964 throws Exception; 1965 } 1966 1967 /** 1968 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1969 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1970 * ClosingFuture, ClosingFuture)}. 1971 * 1972 * @param <V1> the type returned by the first future 1973 * @param <V2> the type returned by the second future 1974 * @param <V3> the type returned by the third future 1975 * @param <V4> the type returned by the fourth future 1976 * @param <V5> the type returned by the fifth future 1977 * @param <U> the type returned by the function 1978 */ 1979 public interface AsyncClosingFunction5< 1980 V1 extends @Nullable Object, 1981 V2 extends @Nullable Object, 1982 V3 extends @Nullable Object, 1983 V4 extends @Nullable Object, 1984 V5 extends @Nullable Object, 1985 U extends @Nullable Object> { 1986 /** 1987 * Applies this function to five inputs, or throws an exception if unable to do so. 1988 * 1989 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1990 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1991 * (but not before this method completes), even if this method throws or the pipeline is 1992 * cancelled. 
1993 */ 1994 ClosingFuture<U> apply( 1995 DeferredCloser closer, 1996 @ParametricNullness V1 value1, 1997 @ParametricNullness V2 value2, 1998 @ParametricNullness V3 value3, 1999 @ParametricNullness V4 value4, 2000 @ParametricNullness V5 value5) 2001 throws Exception; 2002 } 2003 2004 private final ClosingFuture<V1> future1; 2005 private final ClosingFuture<V2> future2; 2006 private final ClosingFuture<V3> future3; 2007 private final ClosingFuture<V4> future4; 2008 private final ClosingFuture<V5> future5; 2009 2010 private Combiner5( 2011 ClosingFuture<V1> future1, 2012 ClosingFuture<V2> future2, 2013 ClosingFuture<V3> future3, 2014 ClosingFuture<V4> future4, 2015 ClosingFuture<V5> future5) { 2016 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 2017 this.future1 = future1; 2018 this.future2 = future2; 2019 this.future3 = future3; 2020 this.future4 = future4; 2021 this.future5 = future5; 2022 } 2023 2024 /** 2025 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2026 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2027 * objects to be closed when the pipeline is done. 2028 * 2029 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2030 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2031 * returned step. 2032 * 2033 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2034 * 2035 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2036 * ExecutionException} will be extracted and used as the failure of the derived step. 
2037 */ 2038 public <U extends @Nullable Object> ClosingFuture<U> call( 2039 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2040 return call( 2041 new CombiningCallable<U>() { 2042 @Override 2043 @ParametricNullness 2044 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2045 return function.apply( 2046 closer, 2047 peeker.getDone(future1), 2048 peeker.getDone(future2), 2049 peeker.getDone(future3), 2050 peeker.getDone(future4), 2051 peeker.getDone(future5)); 2052 } 2053 2054 @Override 2055 public String toString() { 2056 return function.toString(); 2057 } 2058 }, 2059 executor); 2060 } 2061 2062 /** 2063 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2064 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2065 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2066 * captured by the returned {@link ClosingFuture}). 2067 * 2068 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2069 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2070 * returned step. 2071 * 2072 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2073 * 2074 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2075 * ExecutionException} will be extracted and used as the failure of the derived step. 2076 * 2077 * <p>If the function throws any other exception, it will be used as the failure of the derived 2078 * step. 2079 * 2080 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2081 * the closeable objects in that {@code ClosingFuture} will be closed. 
2082 * 2083 * <p>Usage guidelines for this method: 2084 * 2085 * <ul> 2086 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2087 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2088 * Executor)} instead, with a function that returns the next value directly. 2089 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 2090 * for every closeable object this step creates in order to capture it for later closing. 2091 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2092 * ClosingFuture} call {@link #from(ListenableFuture)}. 2093 * </ul> 2094 * 2095 * <p>The same warnings about doing heavyweight operations within {@link 2096 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2097 */ 2098 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 2099 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2100 return callAsync( 2101 new AsyncCombiningCallable<U>() { 2102 @Override 2103 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2104 return function.apply( 2105 closer, 2106 peeker.getDone(future1), 2107 peeker.getDone(future2), 2108 peeker.getDone(future3), 2109 peeker.getDone(future4), 2110 peeker.getDone(future5)); 2111 } 2112 2113 @Override 2114 public String toString() { 2115 return function.toString(); 2116 } 2117 }, 2118 executor); 2119 } 2120 } 2121 2122 @Override 2123 public String toString() { 2124 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2125 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2126 } 2127 2128 @SuppressWarnings("removal") // b/260137033 2129 @Override 2130 protected void finalize() { 2131 if (state.get().equals(OPEN)) { 2132 logger.get().log(SEVERE, "Uh oh! 
An open ClosingFuture has leaked and will close: {0}", this); 2133 FluentFuture<V> unused = finishToFuture(); 2134 } 2135 } 2136 2137 private static void closeQuietly(@CheckForNull final AutoCloseable closeable, Executor executor) { 2138 if (closeable == null) { 2139 return; 2140 } 2141 try { 2142 executor.execute( 2143 () -> { 2144 try { 2145 closeable.close(); 2146 } catch (Exception e) { 2147 /* 2148 * In guava-jre, any kind of Exception may be thrown because `closeable` has type 2149 * `AutoCloseable`. 2150 * 2151 * In guava-android, the only kinds of Exception that may be thrown are 2152 * RuntimeException and IOException because `closeable` has type `Closeable`—except 2153 * that we have to account for sneaky checked exception. 2154 */ 2155 restoreInterruptIfIsInterruptedException(e); 2156 logger.get().log(WARNING, "thrown by close()", e); 2157 } 2158 }); 2159 } catch (RejectedExecutionException e) { 2160 if (logger.get().isLoggable(WARNING)) { 2161 logger 2162 .get() 2163 .log( 2164 WARNING, 2165 String.format("while submitting close to %s; will close inline", executor), 2166 e); 2167 } 2168 closeQuietly(closeable, directExecutor()); 2169 } 2170 } 2171 2172 private void checkAndUpdateState(State oldState, State newState) { 2173 checkState( 2174 compareAndUpdateState(oldState, newState), 2175 "Expected state to be %s, but it was %s", 2176 oldState, 2177 newState); 2178 } 2179 2180 private boolean compareAndUpdateState(State oldState, State newState) { 2181 return state.compareAndSet(oldState, newState); 2182 } 2183 2184 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 
2185 private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor> 2186 implements Closeable { 2187 private final DeferredCloser closer = new DeferredCloser(this); 2188 private volatile boolean closed; 2189 @CheckForNull private volatile CountDownLatch whenClosed; 2190 2191 <V extends @Nullable Object, U extends @Nullable Object> 2192 ListenableFuture<U> applyClosingFunction( 2193 ClosingFunction<? super V, U> transformation, @ParametricNullness V input) 2194 throws Exception { 2195 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2196 CloseableList newCloseables = new CloseableList(); 2197 try { 2198 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2199 } finally { 2200 add(newCloseables, directExecutor()); 2201 } 2202 } 2203 2204 <V extends @Nullable Object, U extends @Nullable Object> 2205 FluentFuture<U> applyAsyncClosingFunction( 2206 AsyncClosingFunction<V, U> transformation, @ParametricNullness V input) 2207 throws Exception { 2208 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 
2209 CloseableList newCloseables = new CloseableList(); 2210 try { 2211 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2212 closingFuture.becomeSubsumedInto(newCloseables); 2213 return closingFuture.future; 2214 } finally { 2215 add(newCloseables, directExecutor()); 2216 } 2217 } 2218 2219 @Override 2220 public void close() { 2221 if (closed) { 2222 return; 2223 } 2224 synchronized (this) { 2225 if (closed) { 2226 return; 2227 } 2228 closed = true; 2229 } 2230 for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) { 2231 closeQuietly(entry.getKey(), entry.getValue()); 2232 } 2233 clear(); 2234 if (whenClosed != null) { 2235 whenClosed.countDown(); 2236 } 2237 } 2238 2239 void add(@CheckForNull AutoCloseable closeable, Executor executor) { 2240 checkNotNull(executor); 2241 if (closeable == null) { 2242 return; 2243 } 2244 synchronized (this) { 2245 if (!closed) { 2246 put(closeable, executor); 2247 return; 2248 } 2249 } 2250 closeQuietly(closeable, executor); 2251 } 2252 2253 /** 2254 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2255 */ 2256 CountDownLatch whenClosedCountDown() { 2257 if (closed) { 2258 return new CountDownLatch(0); 2259 } 2260 synchronized (this) { 2261 if (closed) { 2262 return new CountDownLatch(0); 2263 } 2264 checkState(whenClosed == null); 2265 return whenClosed = new CountDownLatch(1); 2266 } 2267 } 2268 } 2269 2270 /** 2271 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2272 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2273 */ 2274 @VisibleForTesting 2275 CountDownLatch whenClosedCountDown() { 2276 return closeables.whenClosedCountDown(); 2277 } 2278 2279 /** The state of a {@link CloseableList}. */ 2280 enum State { 2281 /** The {@link CloseableList} has not been subsumed or closed. 
/** The state of a {@link CloseableList}. */
enum State {
  /** Initial state: the {@link CloseableList} has been neither subsumed nor closed. */
  OPEN,

  /**
   * The {@link CloseableList} has been subsumed into another one. From here it may not be closed,
   * and it may not be subsumed into any other list.
   */
  SUBSUMED,

  /**
   * A callback that will close the {@link CloseableList} is attached to some {@link
   * ListenableFuture} but has not run yet. The {@link CloseableList} may not be subsumed.
   */
  WILL_CLOSE,

  /**
   * The callback that closes the {@link CloseableList} has started running and has not finished.
   * The {@link CloseableList} may not be subsumed.
   */
  CLOSING,

  /** The {@link CloseableList} is closed and may not be subsumed any further. */
  CLOSED,

  /**
   * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
   * called. The step may not be subsumed any further, and {@link #finishToFuture()} may not be
   * called.
   */
  WILL_CREATE_VALUE_AND_CLOSER
}