001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 036import static java.util.logging.Level.FINER; 037import static java.util.logging.Level.SEVERE; 038import static java.util.logging.Level.WARNING; 039 040import com.google.common.annotations.J2ktIncompatible; 041import com.google.common.annotations.VisibleForTesting; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.util.IdentityHashMap; 052import java.util.Map; 053import java.util.concurrent.Callable; 054import java.util.concurrent.CancellationException; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutionException; 057import java.util.concurrent.Executor; 058import java.util.concurrent.Future; 059import java.util.concurrent.RejectedExecutionException; 060import java.util.concurrent.atomic.AtomicReference; 061import javax.annotation.CheckForNull; 062import org.checkerframework.checker.nullness.qual.Nullable; 063 064/** 065 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 066 * complete, some objects captured during the computation are closed. 067 * 068 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 069 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 070 * cancellation of the operation so far. The only way to extract the value or exception from a step 071 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 072 * "value" of a successful step or the "result" (value or exception) of any step. 073 * 074 * <ol> 075 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 076 * block or a {@link ListenableFuture}. 077 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 078 * can be captured for later closing. 079 * <li>There is one last step (the root of the tree), from which you can extract the final result 080 * of the computation. After that result is available (or the computation fails), all objects 081 * captured by any of the steps in the pipeline are closed. 082 * </ol> 083 * 084 * <h3>Starting a pipeline</h3> 085 * 086 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 087 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 088 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 089 * #from(ListenableFuture)} instead. 090 * 091 * <h3>Derived steps</h3> 092 * 093 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 094 * ways similar to {@link FluentFuture}s: 095 * 096 * <ul> 097 * <li>by transforming the value from a successful input step, 098 * <li>by catching the exception from a failed input step, or 099 * <li>by combining the results of several input steps. 100 * </ul> 101 * 102 * Each derivation can capture the next value or any intermediate objects for later closing. 103 * 104 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 105 * exception, or combine it with others, you cannot do anything else with it, including declare it 106 * to be the last step of the pipeline. 107 * 108 * <h4>Transforming</h4> 109 * 110 * To derive the next step by asynchronously applying a function to an input step's value, call 111 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 112 * Executor)} on the input step. 113 * 114 * <h4>Catching</h4> 115 * 116 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 117 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 118 * 119 * <h4>Combining</h4> 120 * 121 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 122 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 123 * 124 * <h3>Cancelling</h3> 125 * 126 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 127 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 128 * successfully cancelled step will immediately start closing all objects captured for later closing 129 * by it and by its input steps. 130 * 131 * <h3>Ending a pipeline</h3> 132 * 133 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 134 * close the captured objects automatically or manually. 135 * 136 * <h4>Automatically closing</h4> 137 * 138 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 139 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin 140 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future 141 * completes before closing starts, rather than once it has finished. 142 * 143 * <pre>{@code 144 * FluentFuture<UserName> userName = 145 * ClosingFuture.submit( 146 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 147 * executor) 148 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 149 * .transform((closer, result) -> result.get("userName"), directExecutor()) 150 * .catching(DBException.class, e -> "no user", directExecutor()) 151 * .finishToFuture(); 152 * }</pre> 153 * 154 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 155 * result cursor will both be closed, even if the operation is cancelled or fails. 156 * 157 * <h4>Manually closing</h4> 158 * 159 * If you want to close the captured objects manually, after you've used the final result, call 160 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 161 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 162 * 163 * <pre>{@code 164 * ClosingFuture.submit( 165 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 166 * executor) 167 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 168 * .transform((closer, result) -> result.get("userName"), directExecutor()) 169 * .catching(DBException.class, e -> "no user", directExecutor()) 170 * .finishToValueAndCloser( 171 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 172 * 173 * // later 174 * try { // get() will throw if the operation failed or was cancelled. 175 * UserName userName = userNameValueAndCloser.get(); 176 * // do something with userName 177 * } finally { 178 * userNameValueAndCloser.closeAsync(); 179 * } 180 * }</pre> 181 * 182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 183 * the query result cursor will both be closed, even if the operation is cancelled or fails. 184 * 185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 186 * automatic-closing approach described above is safer. 187 * 188 * @param <V> the type of the value of this step 189 * @since 30.0 190 */ 191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 192@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 193@J2ktIncompatible 194@ElementTypesAreNonnullByDefault 195// TODO(dpb): GWT compatibility. 196public final class ClosingFuture<V extends @Nullable Object> { 197 198 private static final LazyLogger logger = new LazyLogger(ClosingFuture.class); 199 200 /** 201 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 202 * done. 203 */ 204 public static final class DeferredCloser { 205 @RetainedWith private final CloseableList list; 206 207 DeferredCloser(CloseableList list) { 208 this.list = list; 209 } 210 211 /** 212 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 213 * 214 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 215 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 216 * Closeable}. (For more about the flavors, see <a 217 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 218 * build</a>.) 219 * 220 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 221 * building for Android): Ensure that any object you pass implements the interface not just in 222 * your current SDK version but also at the oldest version you support. For example, <a 223 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 224 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 225 * {@code Closeable} with a method reference like {@code cursor::close}. 226 * 227 * <p>Note that this method is still binary-compatible between flavors because the erasure of 228 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 229 * 230 * @param closeable the object to be closed (see notes above) 231 * @param closingExecutor the object will be closed on this executor 232 * @return the first argument 233 */ 234 @CanIgnoreReturnValue 235 @ParametricNullness 236 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 237 public <C extends @Nullable Object & @Nullable Closeable> C eventuallyClose( 238 @ParametricNullness C closeable, Executor closingExecutor) { 239 checkNotNull(closingExecutor); 240 if (closeable != null) { 241 list.add(closeable, closingExecutor); 242 } 243 return closeable; 244 } 245 } 246 247 /** 248 * An operation that computes a result. 249 * 250 * @param <V> the type of the result 251 */ 252 public interface ClosingCallable<V extends @Nullable Object> { 253 /** 254 * Computes a result, or throws an exception if unable to do so. 255 * 256 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 257 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 258 * not before this method completes), even if this method throws or the pipeline is cancelled. 259 */ 260 @ParametricNullness 261 V call(DeferredCloser closer) throws Exception; 262 } 263 264 /** 265 * An operation that computes a {@link ClosingFuture} of a result. 266 * 267 * @param <V> the type of the result 268 * @since 30.1 269 */ 270 public interface AsyncClosingCallable<V extends @Nullable Object> { 271 /** 272 * Computes a result, or throws an exception if unable to do so. 273 * 274 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 275 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 276 * not before this method completes), even if this method throws or the pipeline is cancelled. 277 */ 278 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 279 } 280 281 /** 282 * A function from an input to a result. 283 * 284 * @param <T> the type of the input to the function 285 * @param <U> the type of the result of the function 286 */ 287 public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 288 289 /** 290 * Applies this function to an input, or throws an exception if unable to do so. 291 * 292 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 293 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 294 * not before this method completes), even if this method throws or the pipeline is cancelled. 295 */ 296 @ParametricNullness 297 U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 298 } 299 300 /** 301 * A function from an input to a {@link ClosingFuture} of a result. 302 * 303 * @param <T> the type of the input to the function 304 * @param <U> the type of the result of the function 305 */ 306 public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 307 /** 308 * Applies this function to an input, or throws an exception if unable to do so. 309 * 310 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 311 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 312 * not before this method completes), even if this method throws or the pipeline is cancelled. 313 */ 314 ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 315 } 316 317 /** 318 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 319 * allows the user to close all the closeable objects that were captured during it for later 320 * closing. 321 * 322 * <p>The asynchronous operation will have completed before this object is created. 323 * 324 * @param <V> the type of the value of a successful operation 325 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 326 */ 327 public static final class ValueAndCloser<V extends @Nullable Object> { 328 329 private final ClosingFuture<? extends V> closingFuture; 330 331 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 332 this.closingFuture = checkNotNull(closingFuture); 333 } 334 335 /** 336 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 337 * {@link Future#get()} would. 338 * 339 * <p>Because the asynchronous operation has already completed, this method is synchronous and 340 * returns immediately. 341 * 342 * @throws CancellationException if the computation was cancelled 343 * @throws ExecutionException if the computation threw an exception 344 */ 345 @ParametricNullness 346 public V get() throws ExecutionException { 347 return getDone(closingFuture.future); 348 } 349 350 /** 351 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 352 * operation on the {@link Executor}s specified by calls to {@link 353 * DeferredCloser#eventuallyClose(Object, Executor)}. 354 * 355 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 356 * closed synchronously. 357 * 358 * <p>Idempotent: objects will be closed at most once. 359 */ 360 public void closeAsync() { 361 closingFuture.close(); 362 } 363 } 364 365 /** 366 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 367 * ClosingFuture} pipeline. 368 * 369 * @param <V> the type of the final value of a successful pipeline 370 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 371 */ 372 public interface ValueAndCloserConsumer<V extends @Nullable Object> { 373 374 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 375 void accept(ValueAndCloser<V> valueAndCloser); 376 } 377 378 /** 379 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 380 * 381 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 382 * execution 383 */ 384 public static <V extends @Nullable Object> ClosingFuture<V> submit( 385 ClosingCallable<V> callable, Executor executor) { 386 return new ClosingFuture<>(callable, executor); 387 } 388 389 /** 390 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 391 * 392 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 393 * execution 394 * @since 30.1 395 */ 396 public static <V extends @Nullable Object> ClosingFuture<V> submitAsync( 397 AsyncClosingCallable<V> callable, Executor executor) { 398 return new ClosingFuture<>(callable, executor); 399 } 400 401 /** 402 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 403 * 404 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 405 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 406 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 407 */ 408 public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) { 409 return new ClosingFuture<V>(future); 410 } 411 412 /** 413 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 414 * 415 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when 416 * the pipeline is done, even if the pipeline is canceled or fails. 417 * 418 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 419 * value in order to close it. 420 * 421 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 422 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object, 423 * Executor)}. 424 * @param closingExecutor the future's result will be closed on this executor 425 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 426 * underlying value may never be closed if the {@link Future} is canceled after its operation 427 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 428 * including those that pass them to this method, with {@link #submit(ClosingCallable, 429 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 430 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 431 * ClosingFuture#from}. 432 */ 433 @Deprecated 434 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 435 public static <C extends @Nullable Object & @Nullable Closeable> 436 ClosingFuture<C> eventuallyClosing( 437 ListenableFuture<C> future, final Executor closingExecutor) { 438 checkNotNull(closingExecutor); 439 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 440 Futures.addCallback( 441 future, 442 new FutureCallback<@Nullable Closeable>() { 443 @Override 444 public void onSuccess(@CheckForNull Closeable result) { 445 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 446 } 447 448 @Override 449 public void onFailure(Throwable t) {} 450 }, 451 directExecutor()); 452 return closingFuture; 453 } 454 455 /** 456 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 457 * 458 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 459 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 460 */ 461 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 462 return new Combiner(false, futures); 463 } 464 465 /** 466 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 467 * 468 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 469 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 470 */ 471 public static Combiner whenAllComplete( 472 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 473 return whenAllComplete(asList(future1, moreFutures)); 474 } 475 476 /** 477 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 478 * all succeed. If any fail, the resulting pipeline will fail. 479 * 480 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 481 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 482 */ 483 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 484 return new Combiner(true, futures); 485 } 486 487 /** 488 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 489 * they all succeed. If any fail, the resulting pipeline will fail. 490 * 491 * <p>Calling this method allows you to use lambdas or method references typed with the types of 492 * the input {@link ClosingFuture}s. 493 * 494 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 495 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 496 */ 497 public static <V1 extends @Nullable Object, V2 extends @Nullable Object> 498 Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 499 return new Combiner2<>(future1, future2); 500 } 501 502 /** 503 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 504 * they all succeed. If any fail, the resulting pipeline will fail. 505 * 506 * <p>Calling this method allows you to use lambdas or method references typed with the types of 507 * the input {@link ClosingFuture}s. 508 * 509 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 510 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 511 */ 512 public static < 513 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 514 Combiner3<V1, V2, V3> whenAllSucceed( 515 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 516 return new Combiner3<>(future1, future2, future3); 517 } 518 519 /** 520 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 521 * they all succeed. If any fail, the resulting pipeline will fail. 522 * 523 * <p>Calling this method allows you to use lambdas or method references typed with the types of 524 * the input {@link ClosingFuture}s. 525 * 526 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 527 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 528 */ 529 public static < 530 V1 extends @Nullable Object, 531 V2 extends @Nullable Object, 532 V3 extends @Nullable Object, 533 V4 extends @Nullable Object> 534 Combiner4<V1, V2, V3, V4> whenAllSucceed( 535 ClosingFuture<V1> future1, 536 ClosingFuture<V2> future2, 537 ClosingFuture<V3> future3, 538 ClosingFuture<V4> future4) { 539 return new Combiner4<>(future1, future2, future3, future4); 540 } 541 542 /** 543 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 544 * they all succeed. If any fail, the resulting pipeline will fail. 545 * 546 * <p>Calling this method allows you to use lambdas or method references typed with the types of 547 * the input {@link ClosingFuture}s. 548 * 549 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 550 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 551 */ 552 public static < 553 V1 extends @Nullable Object, 554 V2 extends @Nullable Object, 555 V3 extends @Nullable Object, 556 V4 extends @Nullable Object, 557 V5 extends @Nullable Object> 558 Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 559 ClosingFuture<V1> future1, 560 ClosingFuture<V2> future2, 561 ClosingFuture<V3> future3, 562 ClosingFuture<V4> future4, 563 ClosingFuture<V5> future5) { 564 return new Combiner5<>(future1, future2, future3, future4, future5); 565 } 566 567 /** 568 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 569 * all succeed. If any fail, the resulting pipeline will fail. 570 * 571 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 572 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 573 */ 574 public static Combiner whenAllSucceed( 575 ClosingFuture<?> future1, 576 ClosingFuture<?> future2, 577 ClosingFuture<?> future3, 578 ClosingFuture<?> future4, 579 ClosingFuture<?> future5, 580 ClosingFuture<?> future6, 581 ClosingFuture<?>... moreFutures) { 582 return whenAllSucceed( 583 FluentIterable.of(future1, future2, future3, future4, future5, future6) 584 .append(moreFutures)); 585 } 586 587 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 588 private final CloseableList closeables = new CloseableList(); 589 private final FluentFuture<V> future; 590 591 private ClosingFuture(ListenableFuture<V> future) { 592 this.future = FluentFuture.from(future); 593 } 594 595 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 596 checkNotNull(callable); 597 TrustedListenableFutureTask<V> task = 598 TrustedListenableFutureTask.create( 599 new Callable<V>() { 600 @Override 601 @ParametricNullness 602 public V call() throws Exception { 603 return callable.call(closeables.closer); 604 } 605 606 @Override 607 public String toString() { 608 return callable.toString(); 609 } 610 }); 611 executor.execute(task); 612 this.future = task; 613 } 614 615 private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) { 616 checkNotNull(callable); 617 TrustedListenableFutureTask<V> task = 618 TrustedListenableFutureTask.create( 619 new AsyncCallable<V>() { 620 @Override 621 public ListenableFuture<V> call() throws Exception { 622 CloseableList newCloseables = new CloseableList(); 623 try { 624 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 625 closingFuture.becomeSubsumedInto(closeables); 626 return closingFuture.future; 627 } finally { 628 closeables.add(newCloseables, directExecutor()); 629 } 630 } 631 632 @Override 633 public String toString() { 634 return callable.toString(); 635 } 636 }); 637 executor.execute(task); 638 this.future = task; 639 } 640 641 /** 642 * Returns a future that finishes when this step does. Calling {@code get()} on the returned 643 * future returns {@code null} if the step is successful or throws the same exception that would 644 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 645 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 646 * 647 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 648 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 649 * a derivation method <i>on the same instance</i>. This is important because calling {@code 650 * statusFuture} alone does not provide a way to close the pipeline. 651 */ 652 public ListenableFuture<?> statusFuture() { 653 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 654 } 655 656 /** 657 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 658 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 659 * when the pipeline is done. 660 * 661 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 662 * ClosingFuture} will be equivalent to this one. 663 * 664 * <p>If the function throws an exception, that exception is used as the result of the derived 665 * {@code ClosingFuture}. 666 * 667 * <p>Example usage: 668 * 669 * <pre>{@code 670 * ClosingFuture<List<Row>> rowsFuture = 671 * queryFuture.transform((closer, result) -> result.getRows(), executor); 672 * }</pre> 673 * 674 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 675 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 676 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 677 * 678 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 679 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 680 * the original {@code ClosingFuture} instance. 681 * 682 * @param function transforms the value of this step to the value of the derived step 683 * @param executor executor to run the function in 684 * @return the derived step 685 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 686 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 687 * finished} 688 */ 689 public <U extends @Nullable Object> ClosingFuture<U> transform( 690 final ClosingFunction<? super V, U> function, Executor executor) { 691 checkNotNull(function); 692 AsyncFunction<V, U> applyFunction = 693 new AsyncFunction<V, U>() { 694 @Override 695 public ListenableFuture<U> apply(V input) throws Exception { 696 return closeables.applyClosingFunction(function, input); 697 } 698 699 @Override 700 public String toString() { 701 return function.toString(); 702 } 703 }; 704 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 705 return derive(future.transformAsync(applyFunction, executor)); 706 } 707 708 /** 709 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 710 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 711 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 712 * captured by the returned {@link ClosingFuture}). 713 * 714 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 715 * returned by the function. 716 * 717 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 718 * ClosingFuture} will be equivalent to this one. 719 * 720 * <p>If the function throws an exception, that exception is used as the result of the derived 721 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 722 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 723 * closed. 724 * 725 * <p>Usage guidelines for this method: 726 * 727 * <ul> 728 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 729 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 730 * Executor)} instead, with a function that returns the next value directly. 731 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 732 * for every closeable object this step creates in order to capture it for later closing. 733 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 734 * ClosingFuture} call {@link #from(ListenableFuture)}. 735 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 736 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 737 * {@link #withoutCloser(AsyncFunction)} 738 * </ul> 739 * 740 * <p>Example usage: 741 * 742 * <pre>{@code 743 * // Result.getRowsClosingFuture() returns a ClosingFuture. 744 * ClosingFuture<List<Row>> rowsFuture = 745 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 746 * 747 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 748 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 749 * // Closeable). 750 * ClosingFuture<Integer> rowsFuture2 = 751 * queryFuture.transformAsync( 752 * (closer, result) -> { 753 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 754 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 755 * }, 756 * executor); 757 * 758 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 759 * ClosingFuture<List<Row>> rowsFuture3 = 760 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 761 * 762 * }</pre> 763 * 764 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 765 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 766 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 767 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 768 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 769 * responsible for completing the returned {@code ClosingFuture}.) 770 * 771 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 772 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 773 * the original {@code ClosingFuture} instance. 774 * 775 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 776 * the derived step 777 * @param executor executor to run the function in 778 * @return the derived step 779 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 780 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 781 * finished} 782 */ 783 public <U extends @Nullable Object> ClosingFuture<U> transformAsync( 784 final AsyncClosingFunction<? super V, U> function, Executor executor) { 785 checkNotNull(function); 786 AsyncFunction<V, U> applyFunction = 787 new AsyncFunction<V, U>() { 788 @Override 789 public ListenableFuture<U> apply(V input) throws Exception { 790 return closeables.applyAsyncClosingFunction(function, input); 791 } 792 793 @Override 794 public String toString() { 795 return function.toString(); 796 } 797 }; 798 return derive(future.transformAsync(applyFunction, executor)); 799 } 800 801 /** 802 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 803 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 804 * {@link ListenableFuture}. 805 * 806 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 807 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 808 * meets these conditions: 809 * 810 * <ul> 811 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 812 * DeferredCloser#eventuallyClose(Object, Executor)}. 813 * <li>It returns a {@link ListenableFuture}. 814 * </ul> 815 * 816 * <p>Example usage: 817 * 818 * <pre>{@code 819 * // Result.getRowsFuture() returns a ListenableFuture. 820 * ClosingFuture<List<Row>> rowsFuture = 821 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 822 * }</pre> 823 * 824 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 825 * ListenableFuture} with the value of a derived step 826 */ 827 public static <V extends @Nullable Object, U extends @Nullable Object> 828 AsyncClosingFunction<V, U> withoutCloser(final AsyncFunction<V, U> function) { 829 checkNotNull(function); 830 return new AsyncClosingFunction<V, U>() { 831 @Override 832 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 833 return ClosingFuture.from(function.apply(input)); 834 } 835 }; 836 } 837 838 /** 839 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 840 * to its exception if it is an instance of a given exception type. The function can use a {@link 841 * DeferredCloser} to capture objects to be closed when the pipeline is done. 842 * 843 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 844 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 845 * one. 846 * 847 * <p>If the function throws an exception, that exception is used as the result of the derived 848 * {@code ClosingFuture}. 849 * 850 * <p>Example usage: 851 * 852 * <pre>{@code 853 * ClosingFuture<QueryResult> queryFuture = 854 * queryFuture.catching( 855 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 856 * }</pre> 857 * 858 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 859 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 860 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 861 * 862 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 863 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 864 * the original {@code ClosingFuture} instance. 865 * 866 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 867 * type is matched against this step's exception. "This step's exception" means the cause of 868 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 869 * underlying this step or, if {@code get()} throws a different kind of exception, that 870 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 871 * prefer more specific types, avoiding {@code Throwable.class} in particular. 872 * @param fallback the function to be called if this step fails with the expected exception type. 873 * The function's argument is this step's exception. "This step's exception" means the cause 874 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 875 * underlying this step or, if {@code get()} throws a different kind of exception, that 876 * exception itself. 877 * @param executor the executor that runs {@code fallback} if the input fails 878 */ 879 public <X extends Throwable> ClosingFuture<V> catching( 880 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 881 return catchingMoreGeneric(exceptionType, fallback, executor); 882 } 883 884 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 885 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 886 Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) { 887 checkNotNull(fallback); 888 AsyncFunction<X, W> applyFallback = 889 new AsyncFunction<X, W>() { 890 @Override 891 public ListenableFuture<W> apply(X exception) throws Exception { 892 return closeables.applyClosingFunction(fallback, exception); 893 } 894 895 @Override 896 public String toString() { 897 return fallback.toString(); 898 } 899 }; 900 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 901 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 902 } 903 904 /** 905 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 906 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 907 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 908 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 909 * 910 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 911 * ClosingFuture} will be equivalent to the one returned by the function. 912 * 913 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 914 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 915 * one. 916 * 917 * <p>If the function throws an exception, that exception is used as the result of the derived 918 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 919 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 920 * closed. 921 * 922 * <p>Usage guidelines for this method: 923 * 924 * <ul> 925 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 926 * {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class, 927 * ClosingFunction, Executor)} instead, with a function that returns the next value 928 * directly. 929 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 930 * for every closeable object this step creates in order to capture it for later closing. 931 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 932 * ClosingFuture} call {@link #from(ListenableFuture)}. 933 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 934 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 935 * {@link #withoutCloser(AsyncFunction)} 936 * </ul> 937 * 938 * <p>Example usage: 939 * 940 * <pre>{@code 941 * // Fall back to a secondary input stream in case of IOException. 942 * ClosingFuture<InputStream> inputFuture = 943 * firstInputFuture.catchingAsync( 944 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 945 * } 946 * }</pre> 947 * 948 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 949 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 950 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 951 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 952 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 953 * responsible for completing the returned {@code ClosingFuture}.) 954 * 955 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 956 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 957 * the original {@code ClosingFuture} instance. 958 * 959 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 960 * type is matched against this step's exception. "This step's exception" means the cause of 961 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 962 * underlying this step or, if {@code get()} throws a different kind of exception, that 963 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 964 * prefer more specific types, avoiding {@code Throwable.class} in particular. 965 * @param fallback the function to be called if this step fails with the expected exception type. 966 * The function's argument is this step's exception. "This step's exception" means the cause 967 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 968 * underlying this step or, if {@code get()} throws a different kind of exception, that 969 * exception itself. 970 * @param executor the executor that runs {@code fallback} if the input fails 971 */ 972 // TODO(dpb): Should this do something special if the function throws CancellationException or 973 // ExecutionException? 974 public <X extends Throwable> ClosingFuture<V> catchingAsync( 975 Class<X> exceptionType, 976 AsyncClosingFunction<? super X, ? extends V> fallback, 977 Executor executor) { 978 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 979 } 980 981 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 982 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 983 Class<X> exceptionType, 984 final AsyncClosingFunction<? super X, W> fallback, 985 Executor executor) { 986 checkNotNull(fallback); 987 AsyncFunction<X, W> asyncFunction = 988 new AsyncFunction<X, W>() { 989 @Override 990 public ListenableFuture<W> apply(X exception) throws Exception { 991 return closeables.applyAsyncClosingFunction(fallback, exception); 992 } 993 994 @Override 995 public String toString() { 996 return fallback.toString(); 997 } 998 }; 999 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 1000 } 1001 1002 /** 1003 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 1004 * 1005 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 1006 * the pipeline is cancelled. 1007 * 1008 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 1009 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 1010 * rather than once it has finished. 1011 * 1012 * <p>After calling this method, you may not call {@link 1013 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 1014 * derivation method on the original {@code ClosingFuture} instance. 1015 * 1016 * @return a {@link Future} that represents the final value or exception of the pipeline 1017 */ 1018 public FluentFuture<V> finishToFuture() { 1019 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1020 logger.get().log(FINER, "will close {0}", this); 1021 future.addListener( 1022 new Runnable() { 1023 @Override 1024 public void run() { 1025 checkAndUpdateState(WILL_CLOSE, CLOSING); 1026 close(); 1027 checkAndUpdateState(CLOSING, CLOSED); 1028 } 1029 }, 1030 directExecutor()); 1031 } else { 1032 switch (state.get()) { 1033 case SUBSUMED: 1034 throw new IllegalStateException( 1035 "Cannot call finishToFuture() after deriving another step"); 1036 1037 case WILL_CREATE_VALUE_AND_CLOSER: 1038 throw new IllegalStateException( 1039 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1040 1041 case WILL_CLOSE: 1042 case CLOSING: 1043 case CLOSED: 1044 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1045 1046 case OPEN: 1047 throw new AssertionError(); 1048 } 1049 } 1050 return future; 1051 } 1052 1053 /** 1054 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1055 * {@code receiver} will be called with an object that contains the result of the operation. The 1056 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1057 * 1058 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1059 * any other derivation method on the original {@code ClosingFuture} instance. 1060 * 1061 * @param consumer a callback whose method will be called (using {@code executor}) when this 1062 * operation is done 1063 */ 1064 public void finishToValueAndCloser( 1065 final ValueAndCloserConsumer<? super V> consumer, Executor executor) { 1066 checkNotNull(consumer); 1067 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1068 switch (state.get()) { 1069 case SUBSUMED: 1070 throw new IllegalStateException( 1071 "Cannot call finishToValueAndCloser() after deriving another step"); 1072 1073 case WILL_CLOSE: 1074 case CLOSING: 1075 case CLOSED: 1076 throw new IllegalStateException( 1077 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1078 1079 case WILL_CREATE_VALUE_AND_CLOSER: 1080 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1081 1082 case OPEN: 1083 break; 1084 } 1085 throw new AssertionError(state); 1086 } 1087 future.addListener( 1088 new Runnable() { 1089 @Override 1090 public void run() { 1091 provideValueAndCloser(consumer, ClosingFuture.this); 1092 } 1093 }, 1094 executor); 1095 } 1096 1097 private static <C extends @Nullable Object, V extends C> void provideValueAndCloser( 1098 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1099 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1100 } 1101 1102 /** 1103 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1104 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1105 * successful, and this step has not started when {@code cancel} is called, this step should never 1106 * run. 1107 * 1108 * <p>If successful, causes the objects captured by this step (if already started) and its input 1109 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1110 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 1111 * 1112 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1113 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1114 * cancelled regardless 1115 * @return {@code false} if the step could not be cancelled, typically because it has already 1116 * completed normally; {@code true} otherwise 1117 */ 1118 @CanIgnoreReturnValue 1119 public boolean cancel(boolean mayInterruptIfRunning) { 1120 logger.get().log(FINER, "cancelling {0}", this); 1121 boolean cancelled = future.cancel(mayInterruptIfRunning); 1122 if (cancelled) { 1123 close(); 1124 } 1125 return cancelled; 1126 } 1127 1128 private void close() { 1129 logger.get().log(FINER, "closing {0}", this); 1130 closeables.close(); 1131 } 1132 1133 private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) { 1134 ClosingFuture<U> derived = new ClosingFuture<>(future); 1135 becomeSubsumedInto(derived.closeables); 1136 return derived; 1137 } 1138 1139 private void becomeSubsumedInto(CloseableList otherCloseables) { 1140 checkAndUpdateState(OPEN, SUBSUMED); 1141 otherCloseables.add(closeables, directExecutor()); 1142 } 1143 1144 /** 1145 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1146 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1147 * 1148 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1149 */ 1150 public static final class Peeker { 1151 private final ImmutableList<ClosingFuture<?>> futures; 1152 private volatile boolean beingCalled; 1153 1154 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1155 this.futures = checkNotNull(futures); 1156 } 1157 1158 /** 1159 * Returns the value of {@code closingFuture}. 1160 * 1161 * @throws ExecutionException if {@code closingFuture} is a failed step 1162 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1163 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1164 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1165 * @throws IllegalStateException if called outside of a call to {@link 1166 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1167 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1168 */ 1169 @ParametricNullness 1170 public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1171 throws ExecutionException { 1172 checkState(beingCalled); 1173 checkArgument(futures.contains(closingFuture)); 1174 return Futures.getDone(closingFuture.future); 1175 } 1176 1177 @ParametricNullness 1178 private <V extends @Nullable Object> V call( 1179 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1180 beingCalled = true; 1181 CloseableList newCloseables = new CloseableList(); 1182 try { 1183 return combiner.call(newCloseables.closer, this); 1184 } finally { 1185 closeables.add(newCloseables, directExecutor()); 1186 beingCalled = false; 1187 } 1188 } 1189 1190 private <V extends @Nullable Object> FluentFuture<V> callAsync( 1191 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1192 beingCalled = true; 1193 CloseableList newCloseables = new CloseableList(); 1194 try { 1195 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1196 closingFuture.becomeSubsumedInto(closeables); 1197 return closingFuture.future; 1198 } finally { 1199 closeables.add(newCloseables, directExecutor()); 1200 beingCalled = false; 1201 } 1202 } 1203 } 1204 1205 /** 1206 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 1207 * 1208 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1209 * instantiate this class. 1210 * 1211 * <p>Example: 1212 * 1213 * <pre>{@code 1214 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1215 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1216 * ListenableFuture<Integer> numberOfDifferentLines = 1217 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1218 * .call( 1219 * (closer, peeker) -> { 1220 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1221 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1222 * return countDifferentLines(file1Reader, file2Reader); 1223 * }, 1224 * executor) 1225 * .closing(executor); 1226 * }</pre> 1227 */ 1228 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1229 @com.google.errorprone.annotations.DoNotMock( 1230 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1231 public static class Combiner { 1232 1233 private final CloseableList closeables = new CloseableList(); 1234 1235 /** 1236 * An operation that returns a result and may throw an exception. 1237 * 1238 * @param <V> the type of the result 1239 */ 1240 public interface CombiningCallable<V extends @Nullable Object> { 1241 /** 1242 * Computes a result, or throws an exception if unable to do so. 1243 * 1244 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1245 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1246 * (but not before this method completes), even if this method throws or the pipeline is 1247 * cancelled. 1248 * 1249 * @param peeker used to get the value of any of the input futures 1250 */ 1251 @ParametricNullness 1252 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1253 } 1254 1255 /** 1256 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1257 * 1258 * @param <V> the type of the result 1259 */ 1260 public interface AsyncCombiningCallable<V extends @Nullable Object> { 1261 /** 1262 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1263 * 1264 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1265 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1266 * (but not before this method completes), even if this method throws or the pipeline is 1267 * cancelled. 1268 * 1269 * @param peeker used to get the value of any of the input futures 1270 */ 1271 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1272 } 1273 1274 private final boolean allMustSucceed; 1275 protected final ImmutableList<ClosingFuture<?>> inputs; 1276 1277 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1278 this.allMustSucceed = allMustSucceed; 1279 this.inputs = ImmutableList.copyOf(inputs); 1280 for (ClosingFuture<?> input : inputs) { 1281 input.becomeSubsumedInto(closeables); 1282 } 1283 } 1284 1285 /** 1286 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1287 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1288 * objects to be closed when the pipeline is done. 1289 * 1290 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1291 * fail, so will the returned step. 1292 * 1293 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1294 * cancelled. 1295 * 1296 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1297 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1298 */ 1299 public <V extends @Nullable Object> ClosingFuture<V> call( 1300 final CombiningCallable<V> combiningCallable, Executor executor) { 1301 Callable<V> callable = 1302 new Callable<V>() { 1303 @Override 1304 @ParametricNullness 1305 public V call() throws Exception { 1306 return new Peeker(inputs).call(combiningCallable, closeables); 1307 } 1308 1309 @Override 1310 public String toString() { 1311 return combiningCallable.toString(); 1312 } 1313 }; 1314 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1315 derived.closeables.add(closeables, directExecutor()); 1316 return derived; 1317 } 1318 1319 /** 1320 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1321 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1322 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1323 * captured by the returned {@link ClosingFuture}). 1324 * 1325 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1326 * fail, so will the returned step. 1327 * 1328 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1329 * cancelled. 1330 * 1331 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1332 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1333 * 1334 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1335 * derived step. 1336 * 1337 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1338 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1339 * 1340 * <p>Usage guidelines for this method: 1341 * 1342 * <ul> 1343 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1344 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1345 * Executor)} instead, with a function that returns the next value directly. 1346 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1347 * for every closeable object this step creates in order to capture it for later closing. 1348 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1349 * ClosingFuture} call {@link #from(ListenableFuture)}. 1350 * </ul> 1351 * 1352 * <p>The same warnings about doing heavyweight operations within {@link 1353 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1354 */ 1355 public <V extends @Nullable Object> ClosingFuture<V> callAsync( 1356 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1357 AsyncCallable<V> asyncCallable = 1358 new AsyncCallable<V>() { 1359 @Override 1360 public ListenableFuture<V> call() throws Exception { 1361 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1362 } 1363 1364 @Override 1365 public String toString() { 1366 return combiningCallable.toString(); 1367 } 1368 }; 1369 ClosingFuture<V> derived = 1370 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1371 derived.closeables.add(closeables, directExecutor()); 1372 return derived; 1373 } 1374 1375 private FutureCombiner<@Nullable Object> futureCombiner() { 1376 return allMustSucceed 1377 ? Futures.whenAllSucceed(inputFutures()) 1378 : Futures.whenAllComplete(inputFutures()); 1379 } 1380 1381 1382 private ImmutableList<FluentFuture<?>> inputFutures() { 1383 return FluentIterable.from(inputs) 1384 .<FluentFuture<?>>transform(future -> future.future) 1385 .toList(); 1386 } 1387 } 1388 1389 /** 1390 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1391 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1392 * combination. 1393 * 1394 * @param <V1> the type returned by the first future 1395 * @param <V2> the type returned by the second future 1396 */ 1397 public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1398 extends Combiner { 1399 1400 /** 1401 * A function that returns a value when applied to the values of the two futures passed to 1402 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1403 * 1404 * @param <V1> the type returned by the first future 1405 * @param <V2> the type returned by the second future 1406 * @param <U> the type returned by the function 1407 */ 1408 public interface ClosingFunction2< 1409 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1410 1411 /** 1412 * Applies this function to two inputs, or throws an exception if unable to do so. 1413 * 1414 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1415 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1416 * (but not before this method completes), even if this method throws or the pipeline is 1417 * cancelled. 1418 */ 1419 @ParametricNullness 1420 U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1421 throws Exception; 1422 } 1423 1424 /** 1425 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1426 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1427 * 1428 * @param <V1> the type returned by the first future 1429 * @param <V2> the type returned by the second future 1430 * @param <U> the type returned by the function 1431 */ 1432 public interface AsyncClosingFunction2< 1433 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1434 1435 /** 1436 * Applies this function to two inputs, or throws an exception if unable to do so. 1437 * 1438 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1439 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1440 * (but not before this method completes), even if this method throws or the pipeline is 1441 * cancelled. 1442 */ 1443 ClosingFuture<U> apply( 1444 DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1445 throws Exception; 1446 } 1447 1448 private final ClosingFuture<V1> future1; 1449 private final ClosingFuture<V2> future2; 1450 1451 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1452 super(true, ImmutableList.of(future1, future2)); 1453 this.future1 = future1; 1454 this.future2 = future2; 1455 } 1456 1457 /** 1458 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1459 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1460 * objects to be closed when the pipeline is done. 1461 * 1462 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1463 * any of the inputs fail, so will the returned step. 1464 * 1465 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1466 * 1467 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1468 * ExecutionException} will be extracted and used as the failure of the derived step. 1469 */ 1470 public <U extends @Nullable Object> ClosingFuture<U> call( 1471 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1472 return call( 1473 new CombiningCallable<U>() { 1474 @Override 1475 @ParametricNullness 1476 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1477 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1478 } 1479 1480 @Override 1481 public String toString() { 1482 return function.toString(); 1483 } 1484 }, 1485 executor); 1486 } 1487 1488 /** 1489 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1490 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1491 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1492 * captured by the returned {@link ClosingFuture}). 1493 * 1494 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1495 * any of the inputs fail, so will the returned step. 1496 * 1497 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1498 * 1499 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1500 * ExecutionException} will be extracted and used as the failure of the derived step. 1501 * 1502 * <p>If the function throws any other exception, it will be used as the failure of the derived 1503 * step. 1504 * 1505 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1506 * the closeable objects in that {@code ClosingFuture} will be closed. 1507 * 1508 * <p>Usage guidelines for this method: 1509 * 1510 * <ul> 1511 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1512 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1513 * Executor)} instead, with a function that returns the next value directly. 1514 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1515 * for every closeable object this step creates in order to capture it for later closing. 1516 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1517 * ClosingFuture} call {@link #from(ListenableFuture)}. 1518 * </ul> 1519 * 1520 * <p>The same warnings about doing heavyweight operations within {@link 1521 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1522 */ 1523 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1524 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1525 return callAsync( 1526 new AsyncCombiningCallable<U>() { 1527 @Override 1528 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1529 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1530 } 1531 1532 @Override 1533 public String toString() { 1534 return function.toString(); 1535 } 1536 }, 1537 executor); 1538 } 1539 } 1540 1541 /** 1542 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1543 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1544 * ClosingFuture)} to start this combination. 1545 * 1546 * @param <V1> the type returned by the first future 1547 * @param <V2> the type returned by the second future 1548 * @param <V3> the type returned by the third future 1549 */ 1550 public static final class Combiner3< 1551 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1552 extends Combiner { 1553 /** 1554 * A function that returns a value when applied to the values of the three futures passed to 1555 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1556 * 1557 * @param <V1> the type returned by the first future 1558 * @param <V2> the type returned by the second future 1559 * @param <V3> the type returned by the third future 1560 * @param <U> the type returned by the function 1561 */ 1562 public interface ClosingFunction3< 1563 V1 extends @Nullable Object, 1564 V2 extends @Nullable Object, 1565 V3 extends @Nullable Object, 1566 U extends @Nullable Object> { 1567 /** 1568 * Applies this function to three inputs, or throws an exception if unable to do so. 1569 * 1570 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1571 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1572 * (but not before this method completes), even if this method throws or the pipeline is 1573 * cancelled. 1574 */ 1575 @ParametricNullness 1576 U apply( 1577 DeferredCloser closer, 1578 @ParametricNullness V1 value1, 1579 @ParametricNullness V2 value2, 1580 @ParametricNullness V3 value3) 1581 throws Exception; 1582 } 1583 1584 /** 1585 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1586 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1587 * 1588 * @param <V1> the type returned by the first future 1589 * @param <V2> the type returned by the second future 1590 * @param <V3> the type returned by the third future 1591 * @param <U> the type returned by the function 1592 */ 1593 public interface AsyncClosingFunction3< 1594 V1 extends @Nullable Object, 1595 V2 extends @Nullable Object, 1596 V3 extends @Nullable Object, 1597 U extends @Nullable Object> { 1598 /** 1599 * Applies this function to three inputs, or throws an exception if unable to do so. 1600 * 1601 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1602 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1603 * (but not before this method completes), even if this method throws or the pipeline is 1604 * cancelled. 1605 */ 1606 ClosingFuture<U> apply( 1607 DeferredCloser closer, 1608 @ParametricNullness V1 value1, 1609 @ParametricNullness V2 value2, 1610 @ParametricNullness V3 value3) 1611 throws Exception; 1612 } 1613 1614 private final ClosingFuture<V1> future1; 1615 private final ClosingFuture<V2> future2; 1616 private final ClosingFuture<V3> future3; 1617 1618 private Combiner3( 1619 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1620 super(true, ImmutableList.of(future1, future2, future3)); 1621 this.future1 = future1; 1622 this.future2 = future2; 1623 this.future3 = future3; 1624 } 1625 1626 /** 1627 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1628 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1629 * objects to be closed when the pipeline is done. 1630 * 1631 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1632 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1633 * 1634 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1635 * 1636 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1637 * ExecutionException} will be extracted and used as the failure of the derived step. 1638 */ 1639 public <U extends @Nullable Object> ClosingFuture<U> call( 1640 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1641 return call( 1642 new CombiningCallable<U>() { 1643 @Override 1644 @ParametricNullness 1645 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1646 return function.apply( 1647 closer, 1648 peeker.getDone(future1), 1649 peeker.getDone(future2), 1650 peeker.getDone(future3)); 1651 } 1652 1653 @Override 1654 public String toString() { 1655 return function.toString(); 1656 } 1657 }, 1658 executor); 1659 } 1660 1661 /** 1662 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1663 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1664 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1665 * captured by the returned {@link ClosingFuture}). 1666 * 1667 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1668 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1669 * 1670 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1671 * 1672 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1673 * ExecutionException} will be extracted and used as the failure of the derived step. 1674 * 1675 * <p>If the function throws any other exception, it will be used as the failure of the derived 1676 * step. 1677 * 1678 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1679 * the closeable objects in that {@code ClosingFuture} will be closed. 1680 * 1681 * <p>Usage guidelines for this method: 1682 * 1683 * <ul> 1684 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1685 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1686 * Executor)} instead, with a function that returns the next value directly. 1687 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1688 * for every closeable object this step creates in order to capture it for later closing. 1689 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1690 * ClosingFuture} call {@link #from(ListenableFuture)}. 1691 * </ul> 1692 * 1693 * <p>The same warnings about doing heavyweight operations within {@link 1694 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1695 */ 1696 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1697 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1698 return callAsync( 1699 new AsyncCombiningCallable<U>() { 1700 @Override 1701 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1702 return function.apply( 1703 closer, 1704 peeker.getDone(future1), 1705 peeker.getDone(future2), 1706 peeker.getDone(future3)); 1707 } 1708 1709 @Override 1710 public String toString() { 1711 return function.toString(); 1712 } 1713 }, 1714 executor); 1715 } 1716 } 1717 1718 /** 1719 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1720 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1721 * ClosingFuture)} to start this combination. 1722 * 1723 * @param <V1> the type returned by the first future 1724 * @param <V2> the type returned by the second future 1725 * @param <V3> the type returned by the third future 1726 * @param <V4> the type returned by the fourth future 1727 */ 1728 public static final class Combiner4< 1729 V1 extends @Nullable Object, 1730 V2 extends @Nullable Object, 1731 V3 extends @Nullable Object, 1732 V4 extends @Nullable Object> 1733 extends Combiner { 1734 /** 1735 * A function that returns a value when applied to the values of the four futures passed to 1736 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1737 * 1738 * @param <V1> the type returned by the first future 1739 * @param <V2> the type returned by the second future 1740 * @param <V3> the type returned by the third future 1741 * @param <V4> the type returned by the fourth future 1742 * @param <U> the type returned by the function 1743 */ 1744 public interface ClosingFunction4< 1745 V1 extends @Nullable Object, 1746 V2 extends @Nullable Object, 1747 V3 extends @Nullable Object, 1748 V4 extends @Nullable Object, 1749 U extends @Nullable Object> { 1750 /** 1751 * Applies this function to four inputs, or throws an exception if unable to do so. 1752 * 1753 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1754 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1755 * (but not before this method completes), even if this method throws or the pipeline is 1756 * cancelled. 1757 */ 1758 @ParametricNullness 1759 U apply( 1760 DeferredCloser closer, 1761 @ParametricNullness V1 value1, 1762 @ParametricNullness V2 value2, 1763 @ParametricNullness V3 value3, 1764 @ParametricNullness V4 value4) 1765 throws Exception; 1766 } 1767 1768 /** 1769 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1770 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1771 * ClosingFuture)}. 1772 * 1773 * @param <V1> the type returned by the first future 1774 * @param <V2> the type returned by the second future 1775 * @param <V3> the type returned by the third future 1776 * @param <V4> the type returned by the fourth future 1777 * @param <U> the type returned by the function 1778 */ 1779 public interface AsyncClosingFunction4< 1780 V1 extends @Nullable Object, 1781 V2 extends @Nullable Object, 1782 V3 extends @Nullable Object, 1783 V4 extends @Nullable Object, 1784 U extends @Nullable Object> { 1785 /** 1786 * Applies this function to four inputs, or throws an exception if unable to do so. 1787 * 1788 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1789 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1790 * (but not before this method completes), even if this method throws or the pipeline is 1791 * cancelled. 1792 */ 1793 ClosingFuture<U> apply( 1794 DeferredCloser closer, 1795 @ParametricNullness V1 value1, 1796 @ParametricNullness V2 value2, 1797 @ParametricNullness V3 value3, 1798 @ParametricNullness V4 value4) 1799 throws Exception; 1800 } 1801 1802 private final ClosingFuture<V1> future1; 1803 private final ClosingFuture<V2> future2; 1804 private final ClosingFuture<V3> future3; 1805 private final ClosingFuture<V4> future4; 1806 1807 private Combiner4( 1808 ClosingFuture<V1> future1, 1809 ClosingFuture<V2> future2, 1810 ClosingFuture<V3> future3, 1811 ClosingFuture<V4> future4) { 1812 super(true, ImmutableList.of(future1, future2, future3, future4)); 1813 this.future1 = future1; 1814 this.future2 = future2; 1815 this.future3 = future3; 1816 this.future4 = future4; 1817 } 1818 1819 /** 1820 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1821 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1822 * objects to be closed when the pipeline is done. 1823 * 1824 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1825 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1826 * 1827 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1828 * 1829 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1830 * ExecutionException} will be extracted and used as the failure of the derived step. 1831 */ 1832 public <U extends @Nullable Object> ClosingFuture<U> call( 1833 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1834 return call( 1835 new CombiningCallable<U>() { 1836 @Override 1837 @ParametricNullness 1838 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1839 return function.apply( 1840 closer, 1841 peeker.getDone(future1), 1842 peeker.getDone(future2), 1843 peeker.getDone(future3), 1844 peeker.getDone(future4)); 1845 } 1846 1847 @Override 1848 public String toString() { 1849 return function.toString(); 1850 } 1851 }, 1852 executor); 1853 } 1854 1855 /** 1856 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1857 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1858 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1859 * captured by the returned {@link ClosingFuture}). 1860 * 1861 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1862 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1863 * 1864 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1865 * 1866 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1867 * ExecutionException} will be extracted and used as the failure of the derived step. 1868 * 1869 * <p>If the function throws any other exception, it will be used as the failure of the derived 1870 * step. 1871 * 1872 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1873 * the closeable objects in that {@code ClosingFuture} will be closed. 1874 * 1875 * <p>Usage guidelines for this method: 1876 * 1877 * <ul> 1878 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1879 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1880 * Executor)} instead, with a function that returns the next value directly. 1881 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1882 * for every closeable object this step creates in order to capture it for later closing. 1883 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1884 * ClosingFuture} call {@link #from(ListenableFuture)}. 1885 * </ul> 1886 * 1887 * <p>The same warnings about doing heavyweight operations within {@link 1888 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1889 */ 1890 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1891 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1892 return callAsync( 1893 new AsyncCombiningCallable<U>() { 1894 @Override 1895 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1896 return function.apply( 1897 closer, 1898 peeker.getDone(future1), 1899 peeker.getDone(future2), 1900 peeker.getDone(future3), 1901 peeker.getDone(future4)); 1902 } 1903 1904 @Override 1905 public String toString() { 1906 return function.toString(); 1907 } 1908 }, 1909 executor); 1910 } 1911 } 1912 1913 /** 1914 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1915 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1916 * ClosingFuture, ClosingFuture)} to start this combination. 1917 * 1918 * @param <V1> the type returned by the first future 1919 * @param <V2> the type returned by the second future 1920 * @param <V3> the type returned by the third future 1921 * @param <V4> the type returned by the fourth future 1922 * @param <V5> the type returned by the fifth future 1923 */ 1924 public static final class Combiner5< 1925 V1 extends @Nullable Object, 1926 V2 extends @Nullable Object, 1927 V3 extends @Nullable Object, 1928 V4 extends @Nullable Object, 1929 V5 extends @Nullable Object> 1930 extends Combiner { 1931 /** 1932 * A function that returns a value when applied to the values of the five futures passed to 1933 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1934 * ClosingFuture)}. 1935 * 1936 * @param <V1> the type returned by the first future 1937 * @param <V2> the type returned by the second future 1938 * @param <V3> the type returned by the third future 1939 * @param <V4> the type returned by the fourth future 1940 * @param <V5> the type returned by the fifth future 1941 * @param <U> the type returned by the function 1942 */ 1943 public interface ClosingFunction5< 1944 V1 extends @Nullable Object, 1945 V2 extends @Nullable Object, 1946 V3 extends @Nullable Object, 1947 V4 extends @Nullable Object, 1948 V5 extends @Nullable Object, 1949 U extends @Nullable Object> { 1950 /** 1951 * Applies this function to five inputs, or throws an exception if unable to do so. 1952 * 1953 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1954 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1955 * (but not before this method completes), even if this method throws or the pipeline is 1956 * cancelled. 1957 */ 1958 @ParametricNullness 1959 U apply( 1960 DeferredCloser closer, 1961 @ParametricNullness V1 value1, 1962 @ParametricNullness V2 value2, 1963 @ParametricNullness V3 value3, 1964 @ParametricNullness V4 value4, 1965 @ParametricNullness V5 value5) 1966 throws Exception; 1967 } 1968 1969 /** 1970 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1971 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1972 * ClosingFuture, ClosingFuture)}. 1973 * 1974 * @param <V1> the type returned by the first future 1975 * @param <V2> the type returned by the second future 1976 * @param <V3> the type returned by the third future 1977 * @param <V4> the type returned by the fourth future 1978 * @param <V5> the type returned by the fifth future 1979 * @param <U> the type returned by the function 1980 */ 1981 public interface AsyncClosingFunction5< 1982 V1 extends @Nullable Object, 1983 V2 extends @Nullable Object, 1984 V3 extends @Nullable Object, 1985 V4 extends @Nullable Object, 1986 V5 extends @Nullable Object, 1987 U extends @Nullable Object> { 1988 /** 1989 * Applies this function to five inputs, or throws an exception if unable to do so. 1990 * 1991 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1992 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1993 * (but not before this method completes), even if this method throws or the pipeline is 1994 * cancelled. 1995 */ 1996 ClosingFuture<U> apply( 1997 DeferredCloser closer, 1998 @ParametricNullness V1 value1, 1999 @ParametricNullness V2 value2, 2000 @ParametricNullness V3 value3, 2001 @ParametricNullness V4 value4, 2002 @ParametricNullness V5 value5) 2003 throws Exception; 2004 } 2005 2006 private final ClosingFuture<V1> future1; 2007 private final ClosingFuture<V2> future2; 2008 private final ClosingFuture<V3> future3; 2009 private final ClosingFuture<V4> future4; 2010 private final ClosingFuture<V5> future5; 2011 2012 private Combiner5( 2013 ClosingFuture<V1> future1, 2014 ClosingFuture<V2> future2, 2015 ClosingFuture<V3> future3, 2016 ClosingFuture<V4> future4, 2017 ClosingFuture<V5> future5) { 2018 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 2019 this.future1 = future1; 2020 this.future2 = future2; 2021 this.future3 = future3; 2022 this.future4 = future4; 2023 this.future5 = future5; 2024 } 2025 2026 /** 2027 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2028 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2029 * objects to be closed when the pipeline is done. 2030 * 2031 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2032 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2033 * returned step. 2034 * 2035 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2036 * 2037 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2038 * ExecutionException} will be extracted and used as the failure of the derived step. 2039 */ 2040 public <U extends @Nullable Object> ClosingFuture<U> call( 2041 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2042 return call( 2043 new CombiningCallable<U>() { 2044 @Override 2045 @ParametricNullness 2046 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2047 return function.apply( 2048 closer, 2049 peeker.getDone(future1), 2050 peeker.getDone(future2), 2051 peeker.getDone(future3), 2052 peeker.getDone(future4), 2053 peeker.getDone(future5)); 2054 } 2055 2056 @Override 2057 public String toString() { 2058 return function.toString(); 2059 } 2060 }, 2061 executor); 2062 } 2063 2064 /** 2065 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2066 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2067 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2068 * captured by the returned {@link ClosingFuture}). 2069 * 2070 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2071 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2072 * returned step. 2073 * 2074 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2075 * 2076 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2077 * ExecutionException} will be extracted and used as the failure of the derived step. 2078 * 2079 * <p>If the function throws any other exception, it will be used as the failure of the derived 2080 * step. 2081 * 2082 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2083 * the closeable objects in that {@code ClosingFuture} will be closed. 2084 * 2085 * <p>Usage guidelines for this method: 2086 * 2087 * <ul> 2088 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2089 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2090 * Executor)} instead, with a function that returns the next value directly. 2091 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 2092 * for every closeable object this step creates in order to capture it for later closing. 2093 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2094 * ClosingFuture} call {@link #from(ListenableFuture)}. 2095 * </ul> 2096 * 2097 * <p>The same warnings about doing heavyweight operations within {@link 2098 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2099 */ 2100 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 2101 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2102 return callAsync( 2103 new AsyncCombiningCallable<U>() { 2104 @Override 2105 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2106 return function.apply( 2107 closer, 2108 peeker.getDone(future1), 2109 peeker.getDone(future2), 2110 peeker.getDone(future3), 2111 peeker.getDone(future4), 2112 peeker.getDone(future5)); 2113 } 2114 2115 @Override 2116 public String toString() { 2117 return function.toString(); 2118 } 2119 }, 2120 executor); 2121 } 2122 } 2123 2124 @Override 2125 public String toString() { 2126 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2127 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2128 } 2129 2130 @Override 2131 protected void finalize() { 2132 if (state.get().equals(OPEN)) { 2133 logger.get().log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this); 2134 FluentFuture<V> unused = finishToFuture(); 2135 } 2136 } 2137 2138 private static void closeQuietly(@CheckForNull final Closeable closeable, Executor executor) { 2139 if (closeable == null) { 2140 return; 2141 } 2142 try { 2143 executor.execute( 2144 () -> { 2145 try { 2146 closeable.close(); 2147 } catch (Exception e) { 2148 /* 2149 * In guava-jre, any kind of Exception may be thrown because `closeable` has type 2150 * `AutoCloseable`. 2151 * 2152 * In guava-android, the only kinds of Exception that may be thrown are 2153 * RuntimeException and IOException because `closeable` has type `Closeable`—except 2154 * that we have to account for sneaky checked exception. 2155 */ 2156 restoreInterruptIfIsInterruptedException(e); 2157 logger.get().log(WARNING, "thrown by close()", e); 2158 } 2159 }); 2160 } catch (RejectedExecutionException e) { 2161 if (logger.get().isLoggable(WARNING)) { 2162 logger 2163 .get() 2164 .log( 2165 WARNING, 2166 String.format("while submitting close to %s; will close inline", executor), 2167 e); 2168 } 2169 closeQuietly(closeable, directExecutor()); 2170 } 2171 } 2172 2173 private void checkAndUpdateState(State oldState, State newState) { 2174 checkState( 2175 compareAndUpdateState(oldState, newState), 2176 "Expected state to be %s, but it was %s", 2177 oldState, 2178 newState); 2179 } 2180 2181 private boolean compareAndUpdateState(State oldState, State newState) { 2182 return state.compareAndSet(oldState, newState); 2183 } 2184 2185 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2186 private static final class CloseableList extends IdentityHashMap<Closeable, Executor> 2187 implements Closeable { 2188 private final DeferredCloser closer = new DeferredCloser(this); 2189 private volatile boolean closed; 2190 @CheckForNull private volatile CountDownLatch whenClosed; 2191 2192 <V extends @Nullable Object, U extends @Nullable Object> 2193 ListenableFuture<U> applyClosingFunction( 2194 ClosingFunction<? super V, U> transformation, @ParametricNullness V input) 2195 throws Exception { 2196 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2197 CloseableList newCloseables = new CloseableList(); 2198 try { 2199 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2200 } finally { 2201 add(newCloseables, directExecutor()); 2202 } 2203 } 2204 2205 <V extends @Nullable Object, U extends @Nullable Object> 2206 FluentFuture<U> applyAsyncClosingFunction( 2207 AsyncClosingFunction<V, U> transformation, @ParametricNullness V input) 2208 throws Exception { 2209 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2210 CloseableList newCloseables = new CloseableList(); 2211 try { 2212 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2213 closingFuture.becomeSubsumedInto(newCloseables); 2214 return closingFuture.future; 2215 } finally { 2216 add(newCloseables, directExecutor()); 2217 } 2218 } 2219 2220 @Override 2221 public void close() { 2222 if (closed) { 2223 return; 2224 } 2225 synchronized (this) { 2226 if (closed) { 2227 return; 2228 } 2229 closed = true; 2230 } 2231 for (Map.Entry<Closeable, Executor> entry : entrySet()) { 2232 closeQuietly(entry.getKey(), entry.getValue()); 2233 } 2234 clear(); 2235 if (whenClosed != null) { 2236 whenClosed.countDown(); 2237 } 2238 } 2239 2240 void add(@CheckForNull Closeable closeable, Executor executor) { 2241 checkNotNull(executor); 2242 if (closeable == null) { 2243 return; 2244 } 2245 synchronized (this) { 2246 if (!closed) { 2247 put(closeable, executor); 2248 return; 2249 } 2250 } 2251 closeQuietly(closeable, executor); 2252 } 2253 2254 /** 2255 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2256 */ 2257 CountDownLatch whenClosedCountDown() { 2258 if (closed) { 2259 return new CountDownLatch(0); 2260 } 2261 synchronized (this) { 2262 if (closed) { 2263 return new CountDownLatch(0); 2264 } 2265 checkState(whenClosed == null); 2266 return whenClosed = new CountDownLatch(1); 2267 } 2268 } 2269 } 2270 2271 /** 2272 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2273 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2274 */ 2275 @VisibleForTesting 2276 CountDownLatch whenClosedCountDown() { 2277 return closeables.whenClosedCountDown(); 2278 } 2279 2280 /** The state of a {@link CloseableList}. */ 2281 enum State { 2282 /** The {@link CloseableList} has not been subsumed or closed. */ 2283 OPEN, 2284 2285 /** 2286 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2287 * into any other. 2288 */ 2289 SUBSUMED, 2290 2291 /** 2292 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2293 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2294 */ 2295 WILL_CLOSE, 2296 2297 /** 2298 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2299 * {@link CloseableList} may not be subsumed. 2300 */ 2301 CLOSING, 2302 2303 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2304 CLOSED, 2305 2306 /** 2307 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2308 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2309 */ 2310 WILL_CREATE_VALUE_AND_CLOSER, 2311 } 2312}