001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static 
com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 036import static java.util.logging.Level.FINER; 037import static java.util.logging.Level.SEVERE; 038import static java.util.logging.Level.WARNING; 039 040import com.google.common.annotations.J2ktIncompatible; 041import com.google.common.annotations.VisibleForTesting; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.util.IdentityHashMap; 052import java.util.Map; 053import java.util.concurrent.Callable; 054import java.util.concurrent.CancellationException; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutionException; 057import java.util.concurrent.Executor; 058import java.util.concurrent.Future; 059import java.util.concurrent.RejectedExecutionException; 060import java.util.concurrent.atomic.AtomicReference; 061import javax.annotation.CheckForNull; 062import org.checkerframework.checker.nullness.qual.Nullable; 063 064/** 065 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 066 * complete, some objects captured during the computation are closed. 067 * 068 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 069 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 070 * cancellation of the operation so far. 
* The only way to extract the value or exception from a step
 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
 * "value" of a successful step or the "result" (value or exception) of any step.
 *
 * <ol>
 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
 *       block or a {@link ListenableFuture}.
 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
 *       can be captured for later closing.
 *   <li>There is one last step (the root of the tree), from which you can extract the final result
 *       of the computation. After that result is available (or the computation fails), all objects
 *       captured by any of the steps in the pipeline are closed.
 * </ol>
 *
 * <h3>Starting a pipeline</h3>
 *
 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
 * #from(ListenableFuture)} instead.
 *
 * <h3>Derived steps</h3>
 *
 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
 * ways similar to {@link FluentFuture}s:
 *
 * <ul>
 *   <li>by transforming the value from a successful input step,
 *   <li>by catching the exception from a failed input step, or
 *   <li>by combining the results of several input steps.
 * </ul>
 *
 * Each derivation can capture the next value or any intermediate objects for later closing.
 *
 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
 * exception, or combine it with others, you cannot do anything else with it, including declare it
 * to be the last step of the pipeline.
*
 * <h4>Transforming</h4>
 *
 * To derive the next step by asynchronously applying a function to an input step's value, call
 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
 * Executor)} on the input step.
 *
 * <h4>Catching</h4>
 *
 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
 *
 * <h4>Combining</h4>
 *
 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)}, {@link #whenAllSucceed(Iterable)}, or one of their overloads.
 *
 * <h3>Cancelling</h3>
 *
 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
 * successfully cancelled step will immediately start closing all objects captured for later closing
 * by it and by its input steps.
 *
 * <h3>Ending a pipeline</h3>
 *
 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
 * close the captured objects automatically or manually.
 *
 * <h4>Automatically closing</h4>
 *
 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
 * completes before closing starts, rather than once it has finished.
*
 * <pre>{@code
 * FluentFuture<UserName> userName =
 *     ClosingFuture.submit(
 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
 *             executor)
 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
 *         .transform((closer, result) -> result.get("userName"), directExecutor())
 *         .catching(DBException.class, (closer, e) -> "no user", directExecutor())
 *         .finishToFuture();
 * }</pre>
 *
 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
 * result cursor will both be closed, even if the operation is cancelled or fails.
 *
 * <h4>Manually closing</h4>
 *
 * If you want to close the captured objects manually, after you've used the final result, call
 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
 *
 * <pre>{@code
 * ClosingFuture.submit(
 *         closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
 *         executor)
 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
 *     .transform((closer, result) -> result.get("userName"), directExecutor())
 *     .catching(DBException.class, (closer, e) -> "no user", directExecutor())
 *     .finishToValueAndCloser(
 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
 *
 * // later
 * try { // get() will throw if the operation failed or was cancelled.
175 * UserName userName = userNameValueAndCloser.get(); 176 * // do something with userName 177 * } finally { 178 * userNameValueAndCloser.closeAsync(); 179 * } 180 * }</pre> 181 * 182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 183 * the query result cursor will both be closed, even if the operation is cancelled or fails. 184 * 185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 186 * automatic-closing approach described above is safer. 187 * 188 * @param <V> the type of the value of this step 189 * @since 30.0 190 */ 191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 192@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 193@J2ktIncompatible 194@ElementTypesAreNonnullByDefault 195// TODO(dpb): GWT compatibility. 196public final class ClosingFuture<V extends @Nullable Object> { 197 198 private static final LazyLogger logger = new LazyLogger(ClosingFuture.class); 199 200 /** 201 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 202 * done. 203 */ 204 public static final class DeferredCloser { 205 @RetainedWith private final CloseableList list; 206 207 DeferredCloser(CloseableList list) { 208 this.list = list; 209 } 210 211 /** 212 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 213 * 214 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 215 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 216 * Closeable}. (For more about the flavors, see <a 217 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 218 * build</a>.) 
219 * 220 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 221 * building for Android): Ensure that any object you pass implements the interface not just in 222 * your current SDK version but also at the oldest version you support. For example, <a 223 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 224 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 225 * {@code Closeable} with a method reference like {@code cursor::close}. 226 * 227 * <p>Note that this method is still binary-compatible between flavors because the erasure of 228 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 229 * 230 * @param closeable the object to be closed (see notes above) 231 * @param closingExecutor the object will be closed on this executor 232 * @return the first argument 233 */ 234 @CanIgnoreReturnValue 235 @ParametricNullness 236 public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose( 237 @ParametricNullness C closeable, Executor closingExecutor) { 238 checkNotNull(closingExecutor); 239 if (closeable != null) { 240 list.add(closeable, closingExecutor); 241 } 242 return closeable; 243 } 244 } 245 246 /** 247 * An operation that computes a result. 248 * 249 * @param <V> the type of the result 250 */ 251 public interface ClosingCallable<V extends @Nullable Object> { 252 /** 253 * Computes a result, or throws an exception if unable to do so. 254 * 255 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 256 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 257 * not before this method completes), even if this method throws or the pipeline is cancelled. 
258 */ 259 @ParametricNullness 260 V call(DeferredCloser closer) throws Exception; 261 } 262 263 /** 264 * An operation that computes a {@link ClosingFuture} of a result. 265 * 266 * @param <V> the type of the result 267 * @since 30.1 268 */ 269 public interface AsyncClosingCallable<V extends @Nullable Object> { 270 /** 271 * Computes a result, or throws an exception if unable to do so. 272 * 273 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 274 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 275 * not before this method completes), even if this method throws or the pipeline is cancelled. 276 */ 277 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 278 } 279 280 /** 281 * A function from an input to a result. 282 * 283 * @param <T> the type of the input to the function 284 * @param <U> the type of the result of the function 285 */ 286 public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 287 288 /** 289 * Applies this function to an input, or throws an exception if unable to do so. 290 * 291 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 292 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 293 * not before this method completes), even if this method throws or the pipeline is cancelled. 294 */ 295 @ParametricNullness 296 U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 297 } 298 299 /** 300 * A function from an input to a {@link ClosingFuture} of a result. 301 * 302 * @param <T> the type of the input to the function 303 * @param <U> the type of the result of the function 304 */ 305 public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 306 /** 307 * Applies this function to an input, or throws an exception if unable to do so. 
308 * 309 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 310 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 311 * not before this method completes), even if this method throws or the pipeline is cancelled. 312 */ 313 ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 314 } 315 316 /** 317 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 318 * allows the user to close all the closeable objects that were captured during it for later 319 * closing. 320 * 321 * <p>The asynchronous operation will have completed before this object is created. 322 * 323 * @param <V> the type of the value of a successful operation 324 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 325 */ 326 public static final class ValueAndCloser<V extends @Nullable Object> { 327 328 private final ClosingFuture<? extends V> closingFuture; 329 330 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 331 this.closingFuture = checkNotNull(closingFuture); 332 } 333 334 /** 335 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 336 * {@link Future#get()} would. 337 * 338 * <p>Because the asynchronous operation has already completed, this method is synchronous and 339 * returns immediately. 340 * 341 * @throws CancellationException if the computation was cancelled 342 * @throws ExecutionException if the computation threw an exception 343 */ 344 @ParametricNullness 345 public V get() throws ExecutionException { 346 return getDone(closingFuture.future); 347 } 348 349 /** 350 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 351 * operation on the {@link Executor}s specified by calls to {@link 352 * DeferredCloser#eventuallyClose(Object, Executor)}. 
353 * 354 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 355 * closed synchronously. 356 * 357 * <p>Idempotent: objects will be closed at most once. 358 */ 359 public void closeAsync() { 360 closingFuture.close(); 361 } 362 } 363 364 /** 365 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 366 * ClosingFuture} pipeline. 367 * 368 * @param <V> the type of the final value of a successful pipeline 369 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 370 */ 371 public interface ValueAndCloserConsumer<V extends @Nullable Object> { 372 373 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 374 void accept(ValueAndCloser<V> valueAndCloser); 375 } 376 377 /** 378 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 379 * 380 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 381 * execution 382 */ 383 public static <V extends @Nullable Object> ClosingFuture<V> submit( 384 ClosingCallable<V> callable, Executor executor) { 385 checkNotNull(callable); 386 CloseableList closeables = new CloseableList(); 387 TrustedListenableFutureTask<V> task = 388 TrustedListenableFutureTask.create( 389 new Callable<V>() { 390 @Override 391 @ParametricNullness 392 public V call() throws Exception { 393 return callable.call(closeables.closer); 394 } 395 396 @Override 397 public String toString() { 398 return callable.toString(); 399 } 400 }); 401 executor.execute(task); 402 return new ClosingFuture<>(task, closeables); 403 } 404 405 /** 406 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 
407 * 408 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 409 * execution 410 * @since 30.1 411 */ 412 public static <V extends @Nullable Object> ClosingFuture<V> submitAsync( 413 AsyncClosingCallable<V> callable, Executor executor) { 414 checkNotNull(callable); 415 CloseableList closeables = new CloseableList(); 416 TrustedListenableFutureTask<V> task = 417 TrustedListenableFutureTask.create( 418 new AsyncCallable<V>() { 419 @Override 420 public ListenableFuture<V> call() throws Exception { 421 CloseableList newCloseables = new CloseableList(); 422 try { 423 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 424 closingFuture.becomeSubsumedInto(closeables); 425 return closingFuture.future; 426 } finally { 427 closeables.add(newCloseables, directExecutor()); 428 } 429 } 430 431 @Override 432 public String toString() { 433 return callable.toString(); 434 } 435 }); 436 executor.execute(task); 437 return new ClosingFuture<>(task, closeables); 438 } 439 440 /** 441 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 442 * 443 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 444 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 445 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 446 */ 447 public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) { 448 return new ClosingFuture<>(future); 449 } 450 451 /** 452 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 453 * 454 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when 455 * the pipeline is done, even if the pipeline is canceled or fails. 456 * 457 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 458 * value in order to close it. 
459 * 460 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 461 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object, 462 * Executor)}. 463 * @param closingExecutor the future's result will be closed on this executor 464 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 465 * underlying value may never be closed if the {@link Future} is canceled after its operation 466 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 467 * including those that pass them to this method, with {@link #submit(ClosingCallable, 468 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 469 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 470 * ClosingFuture#from}. 471 */ 472 @Deprecated 473 public static <C extends @Nullable Object & @Nullable AutoCloseable> 474 ClosingFuture<C> eventuallyClosing( 475 ListenableFuture<C> future, final Executor closingExecutor) { 476 checkNotNull(closingExecutor); 477 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 478 Futures.addCallback( 479 future, 480 new FutureCallback<@Nullable AutoCloseable>() { 481 @Override 482 public void onSuccess(@CheckForNull AutoCloseable result) { 483 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 484 } 485 486 @Override 487 public void onFailure(Throwable t) {} 488 }, 489 directExecutor()); 490 return closingFuture; 491 } 492 493 /** 494 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 495 * 496 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 497 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 498 */ 499 public static Combiner whenAllComplete(Iterable<? 
extends ClosingFuture<?>> futures) { 500 return new Combiner(false, futures); 501 } 502 503 /** 504 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 505 * 506 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 507 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 508 */ 509 public static Combiner whenAllComplete( 510 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 511 return whenAllComplete(asList(future1, moreFutures)); 512 } 513 514 /** 515 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 516 * all succeed. If any fail, the resulting pipeline will fail. 517 * 518 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 519 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 520 */ 521 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 522 return new Combiner(true, futures); 523 } 524 525 /** 526 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 527 * they all succeed. If any fail, the resulting pipeline will fail. 528 * 529 * <p>Calling this method allows you to use lambdas or method references typed with the types of 530 * the input {@link ClosingFuture}s. 531 * 532 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 533 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 534 */ 535 public static <V1 extends @Nullable Object, V2 extends @Nullable Object> 536 Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 537 return new Combiner2<>(future1, future2); 538 } 539 540 /** 541 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 542 * they all succeed. 
If any fail, the resulting pipeline will fail. 543 * 544 * <p>Calling this method allows you to use lambdas or method references typed with the types of 545 * the input {@link ClosingFuture}s. 546 * 547 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 548 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 549 */ 550 public static < 551 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 552 Combiner3<V1, V2, V3> whenAllSucceed( 553 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 554 return new Combiner3<>(future1, future2, future3); 555 } 556 557 /** 558 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 559 * they all succeed. If any fail, the resulting pipeline will fail. 560 * 561 * <p>Calling this method allows you to use lambdas or method references typed with the types of 562 * the input {@link ClosingFuture}s. 563 * 564 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 565 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 566 */ 567 public static < 568 V1 extends @Nullable Object, 569 V2 extends @Nullable Object, 570 V3 extends @Nullable Object, 571 V4 extends @Nullable Object> 572 Combiner4<V1, V2, V3, V4> whenAllSucceed( 573 ClosingFuture<V1> future1, 574 ClosingFuture<V2> future2, 575 ClosingFuture<V3> future3, 576 ClosingFuture<V4> future4) { 577 return new Combiner4<>(future1, future2, future3, future4); 578 } 579 580 /** 581 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 582 * they all succeed. If any fail, the resulting pipeline will fail. 583 * 584 * <p>Calling this method allows you to use lambdas or method references typed with the types of 585 * the input {@link ClosingFuture}s. 
586 * 587 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 588 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 589 */ 590 public static < 591 V1 extends @Nullable Object, 592 V2 extends @Nullable Object, 593 V3 extends @Nullable Object, 594 V4 extends @Nullable Object, 595 V5 extends @Nullable Object> 596 Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 597 ClosingFuture<V1> future1, 598 ClosingFuture<V2> future2, 599 ClosingFuture<V3> future3, 600 ClosingFuture<V4> future4, 601 ClosingFuture<V5> future5) { 602 return new Combiner5<>(future1, future2, future3, future4, future5); 603 } 604 605 /** 606 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 607 * all succeed. If any fail, the resulting pipeline will fail. 608 * 609 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 610 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 611 */ 612 public static Combiner whenAllSucceed( 613 ClosingFuture<?> future1, 614 ClosingFuture<?> future2, 615 ClosingFuture<?> future3, 616 ClosingFuture<?> future4, 617 ClosingFuture<?> future5, 618 ClosingFuture<?> future6, 619 ClosingFuture<?>... moreFutures) { 620 return whenAllSucceed( 621 FluentIterable.of(future1, future2, future3, future4, future5, future6) 622 .append(moreFutures)); 623 } 624 625 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 626 private final CloseableList closeables; 627 private final FluentFuture<V> future; 628 629 private ClosingFuture(ListenableFuture<V> future) { 630 this(future, new CloseableList()); 631 } 632 633 private ClosingFuture(ListenableFuture<V> future, CloseableList closeables) { 634 this.future = FluentFuture.from(future); 635 this.closeables = closeables; 636 } 637 638 /** 639 * Returns a future that finishes when this step does. 
Calling {@code get()} on the returned 640 * future returns {@code null} if the step is successful or throws the same exception that would 641 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 642 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 643 * 644 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 645 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 646 * a derivation method <i>on the same instance</i>. This is important because calling {@code 647 * statusFuture} alone does not provide a way to close the pipeline. 648 */ 649 public ListenableFuture<?> statusFuture() { 650 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 651 } 652 653 /** 654 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 655 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 656 * when the pipeline is done. 657 * 658 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 659 * ClosingFuture} will be equivalent to this one. 660 * 661 * <p>If the function throws an exception, that exception is used as the result of the derived 662 * {@code ClosingFuture}. 663 * 664 * <p>Example usage: 665 * 666 * <pre>{@code 667 * ClosingFuture<List<Row>> rowsFuture = 668 * queryFuture.transform((closer, result) -> result.getRows(), executor); 669 * }</pre> 670 * 671 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 672 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 673 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
674 * 675 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 676 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 677 * the original {@code ClosingFuture} instance. 678 * 679 * @param function transforms the value of this step to the value of the derived step 680 * @param executor executor to run the function in 681 * @return the derived step 682 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 683 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 684 * finished} 685 */ 686 public <U extends @Nullable Object> ClosingFuture<U> transform( 687 final ClosingFunction<? super V, U> function, Executor executor) { 688 checkNotNull(function); 689 AsyncFunction<V, U> applyFunction = 690 new AsyncFunction<V, U>() { 691 @Override 692 public ListenableFuture<U> apply(V input) throws Exception { 693 return closeables.applyClosingFunction(function, input); 694 } 695 696 @Override 697 public String toString() { 698 return function.toString(); 699 } 700 }; 701 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 702 return derive(future.transformAsync(applyFunction, executor)); 703 } 704 705 /** 706 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 707 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 708 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 709 * captured by the returned {@link ClosingFuture}). 710 * 711 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 712 * returned by the function. 713 * 714 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 715 * ClosingFuture} will be equivalent to this one. 
716 * 717 * <p>If the function throws an exception, that exception is used as the result of the derived 718 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 719 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 720 * closed. 721 * 722 * <p>Usage guidelines for this method: 723 * 724 * <ul> 725 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 726 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 727 * Executor)} instead, with a function that returns the next value directly. 728 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 729 * for every closeable object this step creates in order to capture it for later closing. 730 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 731 * ClosingFuture} call {@link #from(ListenableFuture)}. 732 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 733 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 734 * {@link #withoutCloser(AsyncFunction)} 735 * </ul> 736 * 737 * <p>Example usage: 738 * 739 * <pre>{@code 740 * // Result.getRowsClosingFuture() returns a ClosingFuture. 741 * ClosingFuture<List<Row>> rowsFuture = 742 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 743 * 744 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 745 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 746 * // Closeable). 
747 * ClosingFuture<Integer> rowsFuture2 = 748 * queryFuture.transformAsync( 749 * (closer, result) -> { 750 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 751 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 752 * }, 753 * executor); 754 * 755 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 756 * ClosingFuture<List<Row>> rowsFuture3 = 757 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 758 * 759 * }</pre> 760 * 761 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 762 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 763 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 764 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 765 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 766 * responsible for completing the returned {@code ClosingFuture}.) 767 * 768 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 769 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 770 * the original {@code ClosingFuture} instance. 771 * 772 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 773 * the derived step 774 * @param executor executor to run the function in 775 * @return the derived step 776 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 777 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 778 * finished} 779 */ 780 public <U extends @Nullable Object> ClosingFuture<U> transformAsync( 781 final AsyncClosingFunction<? 
super V, U> function, Executor executor) { 782 checkNotNull(function); 783 AsyncFunction<V, U> applyFunction = 784 new AsyncFunction<V, U>() { 785 @Override 786 public ListenableFuture<U> apply(V input) throws Exception { 787 return closeables.applyAsyncClosingFunction(function, input); 788 } 789 790 @Override 791 public String toString() { 792 return function.toString(); 793 } 794 }; 795 return derive(future.transformAsync(applyFunction, executor)); 796 } 797 798 /** 799 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 800 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 801 * {@link ListenableFuture}. 802 * 803 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 804 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 805 * meets these conditions: 806 * 807 * <ul> 808 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 809 * DeferredCloser#eventuallyClose(Object, Executor)}. 810 * <li>It returns a {@link ListenableFuture}. 811 * </ul> 812 * 813 * <p>Example usage: 814 * 815 * <pre>{@code 816 * // Result.getRowsFuture() returns a ListenableFuture. 
817 * ClosingFuture<List<Row>> rowsFuture = 818 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 819 * }</pre> 820 * 821 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 822 * ListenableFuture} with the value of a derived step 823 */ 824 public static <V extends @Nullable Object, U extends @Nullable Object> 825 AsyncClosingFunction<V, U> withoutCloser(final AsyncFunction<V, U> function) { 826 checkNotNull(function); 827 return new AsyncClosingFunction<V, U>() { 828 @Override 829 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 830 return ClosingFuture.from(function.apply(input)); 831 } 832 }; 833 } 834 835 /** 836 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 837 * to its exception if it is an instance of a given exception type. The function can use a {@link 838 * DeferredCloser} to capture objects to be closed when the pipeline is done. 839 * 840 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 841 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 842 * one. 843 * 844 * <p>If the function throws an exception, that exception is used as the result of the derived 845 * {@code ClosingFuture}. 846 * 847 * <p>Example usage: 848 * 849 * <pre>{@code 850 * ClosingFuture<QueryResult> queryFuture = 851 * queryFuture.catching( 852 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 853 * }</pre> 854 * 855 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 856 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 857 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
858 * 859 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 860 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 861 * the original {@code ClosingFuture} instance. 862 * 863 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 864 * type is matched against this step's exception. "This step's exception" means the cause of 865 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 866 * underlying this step or, if {@code get()} throws a different kind of exception, that 867 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 868 * prefer more specific types, avoiding {@code Throwable.class} in particular. 869 * @param fallback the function to be called if this step fails with the expected exception type. 870 * The function's argument is this step's exception. "This step's exception" means the cause 871 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 872 * underlying this step or, if {@code get()} throws a different kind of exception, that 873 * exception itself. 874 * @param executor the executor that runs {@code fallback} if the input fails 875 */ 876 public <X extends Throwable> ClosingFuture<V> catching( 877 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 878 return catchingMoreGeneric(exceptionType, fallback, executor); 879 } 880 881 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 882 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 883 Class<X> exceptionType, final ClosingFunction<? 
super X, W> fallback, Executor executor) { 884 checkNotNull(fallback); 885 AsyncFunction<X, W> applyFallback = 886 new AsyncFunction<X, W>() { 887 @Override 888 public ListenableFuture<W> apply(X exception) throws Exception { 889 return closeables.applyClosingFunction(fallback, exception); 890 } 891 892 @Override 893 public String toString() { 894 return fallback.toString(); 895 } 896 }; 897 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 898 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 899 } 900 901 /** 902 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 903 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 904 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 905 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 906 * 907 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 908 * ClosingFuture} will be equivalent to the one returned by the function. 909 * 910 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 911 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 912 * one. 913 * 914 * <p>If the function throws an exception, that exception is used as the result of the derived 915 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 916 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 917 * closed. 918 * 919 * <p>Usage guidelines for this method: 920 * 921 * <ul> 922 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 923 * {@code ClosingFuture}. 
If possible, prefer calling {@link #catching(Class, 924 * ClosingFunction, Executor)} instead, with a function that returns the next value 925 * directly. 926 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 927 * for every closeable object this step creates in order to capture it for later closing. 928 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 929 * ClosingFuture} call {@link #from(ListenableFuture)}. 930 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 931 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 932 * {@link #withoutCloser(AsyncFunction)} 933 * </ul> 934 * 935 * <p>Example usage: 936 * 937 * <pre>{@code 938 * // Fall back to a secondary input stream in case of IOException. 939 * ClosingFuture<InputStream> inputFuture = 940 * firstInputFuture.catchingAsync( 941 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 942 * } 943 * }</pre> 944 * 945 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 946 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 947 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 948 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 949 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 950 * responsible for completing the returned {@code ClosingFuture}.) 951 * 952 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 953 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 954 * the original {@code ClosingFuture} instance. 955 * 956 * @param exceptionType the exception type that triggers use of {@code fallback}. 
The exception 957 * type is matched against this step's exception. "This step's exception" means the cause of 958 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 959 * underlying this step or, if {@code get()} throws a different kind of exception, that 960 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 961 * prefer more specific types, avoiding {@code Throwable.class} in particular. 962 * @param fallback the function to be called if this step fails with the expected exception type. 963 * The function's argument is this step's exception. "This step's exception" means the cause 964 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 965 * underlying this step or, if {@code get()} throws a different kind of exception, that 966 * exception itself. 967 * @param executor the executor that runs {@code fallback} if the input fails 968 */ 969 // TODO(dpb): Should this do something special if the function throws CancellationException or 970 // ExecutionException? 971 public <X extends Throwable> ClosingFuture<V> catchingAsync( 972 Class<X> exceptionType, 973 AsyncClosingFunction<? super X, ? extends V> fallback, 974 Executor executor) { 975 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 976 } 977 978 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 979 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 980 Class<X> exceptionType, 981 final AsyncClosingFunction<? 
super X, W> fallback, 982 Executor executor) { 983 checkNotNull(fallback); 984 AsyncFunction<X, W> asyncFunction = 985 new AsyncFunction<X, W>() { 986 @Override 987 public ListenableFuture<W> apply(X exception) throws Exception { 988 return closeables.applyAsyncClosingFunction(fallback, exception); 989 } 990 991 @Override 992 public String toString() { 993 return fallback.toString(); 994 } 995 }; 996 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 997 } 998 999 /** 1000 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 1001 * 1002 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 1003 * the pipeline is cancelled. 1004 * 1005 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 1006 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 1007 * rather than once it has finished. 1008 * 1009 * <p>After calling this method, you may not call {@link 1010 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 1011 * derivation method on the original {@code ClosingFuture} instance. 
1012 * 1013 * @return a {@link Future} that represents the final value or exception of the pipeline 1014 */ 1015 public FluentFuture<V> finishToFuture() { 1016 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1017 logger.get().log(FINER, "will close {0}", this); 1018 future.addListener( 1019 new Runnable() { 1020 @Override 1021 public void run() { 1022 checkAndUpdateState(WILL_CLOSE, CLOSING); 1023 close(); 1024 checkAndUpdateState(CLOSING, CLOSED); 1025 } 1026 }, 1027 directExecutor()); 1028 } else { 1029 switch (state.get()) { 1030 case SUBSUMED: 1031 throw new IllegalStateException( 1032 "Cannot call finishToFuture() after deriving another step"); 1033 1034 case WILL_CREATE_VALUE_AND_CLOSER: 1035 throw new IllegalStateException( 1036 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1037 1038 case WILL_CLOSE: 1039 case CLOSING: 1040 case CLOSED: 1041 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1042 1043 case OPEN: 1044 throw new AssertionError(); 1045 } 1046 } 1047 return future; 1048 } 1049 1050 /** 1051 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1052 * {@code receiver} will be called with an object that contains the result of the operation. The 1053 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1054 * 1055 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1056 * any other derivation method on the original {@code ClosingFuture} instance. 1057 * 1058 * @param consumer a callback whose method will be called (using {@code executor}) when this 1059 * operation is done 1060 */ 1061 public void finishToValueAndCloser( 1062 final ValueAndCloserConsumer<? 
super V> consumer, Executor executor) { 1063 checkNotNull(consumer); 1064 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1065 switch (state.get()) { 1066 case SUBSUMED: 1067 throw new IllegalStateException( 1068 "Cannot call finishToValueAndCloser() after deriving another step"); 1069 1070 case WILL_CLOSE: 1071 case CLOSING: 1072 case CLOSED: 1073 throw new IllegalStateException( 1074 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1075 1076 case WILL_CREATE_VALUE_AND_CLOSER: 1077 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1078 1079 case OPEN: 1080 break; 1081 } 1082 throw new AssertionError(state); 1083 } 1084 future.addListener( 1085 new Runnable() { 1086 @Override 1087 public void run() { 1088 provideValueAndCloser(consumer, ClosingFuture.this); 1089 } 1090 }, 1091 executor); 1092 } 1093 1094 private static <C extends @Nullable Object, V extends C> void provideValueAndCloser( 1095 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1096 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1097 } 1098 1099 /** 1100 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1101 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1102 * successful, and this step has not started when {@code cancel} is called, this step should never 1103 * run. 1104 * 1105 * <p>If successful, causes the objects captured by this step (if already started) and its input 1106 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1107 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 
1108 * 1109 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1110 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1111 * cancelled regardless 1112 * @return {@code false} if the step could not be cancelled, typically because it has already 1113 * completed normally; {@code true} otherwise 1114 */ 1115 @CanIgnoreReturnValue 1116 public boolean cancel(boolean mayInterruptIfRunning) { 1117 logger.get().log(FINER, "cancelling {0}", this); 1118 boolean cancelled = future.cancel(mayInterruptIfRunning); 1119 if (cancelled) { 1120 close(); 1121 } 1122 return cancelled; 1123 } 1124 1125 private void close() { 1126 logger.get().log(FINER, "closing {0}", this); 1127 closeables.close(); 1128 } 1129 1130 private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) { 1131 ClosingFuture<U> derived = new ClosingFuture<>(future); 1132 becomeSubsumedInto(derived.closeables); 1133 return derived; 1134 } 1135 1136 private void becomeSubsumedInto(CloseableList otherCloseables) { 1137 checkAndUpdateState(OPEN, SUBSUMED); 1138 otherCloseables.add(closeables, directExecutor()); 1139 } 1140 1141 /** 1142 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1143 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1144 * 1145 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1146 */ 1147 public static final class Peeker { 1148 private final ImmutableList<ClosingFuture<?>> futures; 1149 private volatile boolean beingCalled; 1150 1151 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1152 this.futures = checkNotNull(futures); 1153 } 1154 1155 /** 1156 * Returns the value of {@code closingFuture}. 
1157 * 1158 * @throws ExecutionException if {@code closingFuture} is a failed step 1159 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1160 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1161 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1162 * @throws IllegalStateException if called outside of a call to {@link 1163 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1164 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1165 */ 1166 @ParametricNullness 1167 public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1168 throws ExecutionException { 1169 checkState(beingCalled); 1170 checkArgument(futures.contains(closingFuture)); 1171 return Futures.getDone(closingFuture.future); 1172 } 1173 1174 @ParametricNullness 1175 private <V extends @Nullable Object> V call( 1176 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1177 beingCalled = true; 1178 CloseableList newCloseables = new CloseableList(); 1179 try { 1180 return combiner.call(newCloseables.closer, this); 1181 } finally { 1182 closeables.add(newCloseables, directExecutor()); 1183 beingCalled = false; 1184 } 1185 } 1186 1187 private <V extends @Nullable Object> FluentFuture<V> callAsync( 1188 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1189 beingCalled = true; 1190 CloseableList newCloseables = new CloseableList(); 1191 try { 1192 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1193 closingFuture.becomeSubsumedInto(closeables); 1194 return closingFuture.future; 1195 } finally { 1196 closeables.add(newCloseables, directExecutor()); 1197 beingCalled = false; 1198 } 1199 } 1200 } 1201 1202 /** 1203 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 
1204 * 1205 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1206 * instantiate this class. 1207 * 1208 * <p>Example: 1209 * 1210 * <pre>{@code 1211 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1212 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1213 * ListenableFuture<Integer> numberOfDifferentLines = 1214 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1215 * .call( 1216 * (closer, peeker) -> { 1217 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1218 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1219 * return countDifferentLines(file1Reader, file2Reader); 1220 * }, 1221 * executor) 1222 * .closing(executor); 1223 * }</pre> 1224 */ 1225 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1226 @com.google.errorprone.annotations.DoNotMock( 1227 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1228 public static class Combiner { 1229 1230 private final CloseableList closeables = new CloseableList(); 1231 1232 /** 1233 * An operation that returns a result and may throw an exception. 1234 * 1235 * @param <V> the type of the result 1236 */ 1237 public interface CombiningCallable<V extends @Nullable Object> { 1238 /** 1239 * Computes a result, or throws an exception if unable to do so. 1240 * 1241 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1242 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1243 * (but not before this method completes), even if this method throws or the pipeline is 1244 * cancelled. 
1245 * 1246 * @param peeker used to get the value of any of the input futures 1247 */ 1248 @ParametricNullness 1249 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1250 } 1251 1252 /** 1253 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1254 * 1255 * @param <V> the type of the result 1256 */ 1257 public interface AsyncCombiningCallable<V extends @Nullable Object> { 1258 /** 1259 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1260 * 1261 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1262 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1263 * (but not before this method completes), even if this method throws or the pipeline is 1264 * cancelled. 1265 * 1266 * @param peeker used to get the value of any of the input futures 1267 */ 1268 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1269 } 1270 1271 private final boolean allMustSucceed; 1272 protected final ImmutableList<ClosingFuture<?>> inputs; 1273 1274 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1275 this.allMustSucceed = allMustSucceed; 1276 this.inputs = ImmutableList.copyOf(inputs); 1277 for (ClosingFuture<?> input : inputs) { 1278 input.becomeSubsumedInto(closeables); 1279 } 1280 } 1281 1282 /** 1283 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1284 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1285 * objects to be closed when the pipeline is done. 1286 * 1287 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1288 * fail, so will the returned step. 1289 * 1290 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1291 * cancelled. 
1292 * 1293 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1294 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1295 */ 1296 public <V extends @Nullable Object> ClosingFuture<V> call( 1297 final CombiningCallable<V> combiningCallable, Executor executor) { 1298 Callable<V> callable = 1299 new Callable<V>() { 1300 @Override 1301 @ParametricNullness 1302 public V call() throws Exception { 1303 return new Peeker(inputs).call(combiningCallable, closeables); 1304 } 1305 1306 @Override 1307 public String toString() { 1308 return combiningCallable.toString(); 1309 } 1310 }; 1311 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1312 derived.closeables.add(closeables, directExecutor()); 1313 return derived; 1314 } 1315 1316 /** 1317 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1318 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1319 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1320 * captured by the returned {@link ClosingFuture}). 1321 * 1322 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1323 * fail, so will the returned step. 1324 * 1325 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1326 * cancelled. 1327 * 1328 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1329 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1330 * 1331 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1332 * derived step. 1333 * 1334 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1335 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 
1336 * 1337 * <p>Usage guidelines for this method: 1338 * 1339 * <ul> 1340 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1341 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1342 * Executor)} instead, with a function that returns the next value directly. 1343 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1344 * for every closeable object this step creates in order to capture it for later closing. 1345 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1346 * ClosingFuture} call {@link #from(ListenableFuture)}. 1347 * </ul> 1348 * 1349 * <p>The same warnings about doing heavyweight operations within {@link 1350 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1351 */ 1352 public <V extends @Nullable Object> ClosingFuture<V> callAsync( 1353 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1354 AsyncCallable<V> asyncCallable = 1355 new AsyncCallable<V>() { 1356 @Override 1357 public ListenableFuture<V> call() throws Exception { 1358 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1359 } 1360 1361 @Override 1362 public String toString() { 1363 return combiningCallable.toString(); 1364 } 1365 }; 1366 ClosingFuture<V> derived = 1367 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1368 derived.closeables.add(closeables, directExecutor()); 1369 return derived; 1370 } 1371 1372 private FutureCombiner<@Nullable Object> futureCombiner() { 1373 return allMustSucceed 1374 ? 
Futures.whenAllSucceed(inputFutures()) 1375 : Futures.whenAllComplete(inputFutures()); 1376 } 1377 1378 1379 private ImmutableList<FluentFuture<?>> inputFutures() { 1380 return FluentIterable.from(inputs) 1381 .<FluentFuture<?>>transform(future -> future.future) 1382 .toList(); 1383 } 1384 } 1385 1386 /** 1387 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1388 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1389 * combination. 1390 * 1391 * @param <V1> the type returned by the first future 1392 * @param <V2> the type returned by the second future 1393 */ 1394 public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1395 extends Combiner { 1396 1397 /** 1398 * A function that returns a value when applied to the values of the two futures passed to 1399 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1400 * 1401 * @param <V1> the type returned by the first future 1402 * @param <V2> the type returned by the second future 1403 * @param <U> the type returned by the function 1404 */ 1405 public interface ClosingFunction2< 1406 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1407 1408 /** 1409 * Applies this function to two inputs, or throws an exception if unable to do so. 1410 * 1411 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1412 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1413 * (but not before this method completes), even if this method throws or the pipeline is 1414 * cancelled. 
1415 */ 1416 @ParametricNullness 1417 U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1418 throws Exception; 1419 } 1420 1421 /** 1422 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1423 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1424 * 1425 * @param <V1> the type returned by the first future 1426 * @param <V2> the type returned by the second future 1427 * @param <U> the type returned by the function 1428 */ 1429 public interface AsyncClosingFunction2< 1430 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1431 1432 /** 1433 * Applies this function to two inputs, or throws an exception if unable to do so. 1434 * 1435 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1436 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1437 * (but not before this method completes), even if this method throws or the pipeline is 1438 * cancelled. 1439 */ 1440 ClosingFuture<U> apply( 1441 DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1442 throws Exception; 1443 } 1444 1445 private final ClosingFuture<V1> future1; 1446 private final ClosingFuture<V2> future2; 1447 1448 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1449 super(true, ImmutableList.of(future1, future2)); 1450 this.future1 = future1; 1451 this.future2 = future2; 1452 } 1453 1454 /** 1455 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1456 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1457 * objects to be closed when the pipeline is done. 1458 * 1459 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1460 * any of the inputs fail, so will the returned step. 
1461 * 1462 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1463 * 1464 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1465 * ExecutionException} will be extracted and used as the failure of the derived step. 1466 */ 1467 public <U extends @Nullable Object> ClosingFuture<U> call( 1468 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1469 return call( 1470 new CombiningCallable<U>() { 1471 @Override 1472 @ParametricNullness 1473 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1474 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1475 } 1476 1477 @Override 1478 public String toString() { 1479 return function.toString(); 1480 } 1481 }, 1482 executor); 1483 } 1484 1485 /** 1486 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1487 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1488 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1489 * captured by the returned {@link ClosingFuture}). 1490 * 1491 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1492 * any of the inputs fail, so will the returned step. 1493 * 1494 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1495 * 1496 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1497 * ExecutionException} will be extracted and used as the failure of the derived step. 1498 * 1499 * <p>If the function throws any other exception, it will be used as the failure of the derived 1500 * step. 1501 * 1502 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1503 * the closeable objects in that {@code ClosingFuture} will be closed. 
1504 * 1505 * <p>Usage guidelines for this method: 1506 * 1507 * <ul> 1508 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1509 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1510 * Executor)} instead, with a function that returns the next value directly. 1511 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1512 * for every closeable object this step creates in order to capture it for later closing. 1513 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1514 * ClosingFuture} call {@link #from(ListenableFuture)}. 1515 * </ul> 1516 * 1517 * <p>The same warnings about doing heavyweight operations within {@link 1518 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1519 */ 1520 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1521 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1522 return callAsync( 1523 new AsyncCombiningCallable<U>() { 1524 @Override 1525 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1526 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1527 } 1528 1529 @Override 1530 public String toString() { 1531 return function.toString(); 1532 } 1533 }, 1534 executor); 1535 } 1536 } 1537 1538 /** 1539 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1540 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1541 * ClosingFuture)} to start this combination. 
1542 * 1543 * @param <V1> the type returned by the first future 1544 * @param <V2> the type returned by the second future 1545 * @param <V3> the type returned by the third future 1546 */ 1547 public static final class Combiner3< 1548 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1549 extends Combiner { 1550 /** 1551 * A function that returns a value when applied to the values of the three futures passed to 1552 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1553 * 1554 * @param <V1> the type returned by the first future 1555 * @param <V2> the type returned by the second future 1556 * @param <V3> the type returned by the third future 1557 * @param <U> the type returned by the function 1558 */ 1559 public interface ClosingFunction3< 1560 V1 extends @Nullable Object, 1561 V2 extends @Nullable Object, 1562 V3 extends @Nullable Object, 1563 U extends @Nullable Object> { 1564 /** 1565 * Applies this function to three inputs, or throws an exception if unable to do so. 1566 * 1567 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1568 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1569 * (but not before this method completes), even if this method throws or the pipeline is 1570 * cancelled. 1571 */ 1572 @ParametricNullness 1573 U apply( 1574 DeferredCloser closer, 1575 @ParametricNullness V1 value1, 1576 @ParametricNullness V2 value2, 1577 @ParametricNullness V3 value3) 1578 throws Exception; 1579 } 1580 1581 /** 1582 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1583 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 
1584 * 1585 * @param <V1> the type returned by the first future 1586 * @param <V2> the type returned by the second future 1587 * @param <V3> the type returned by the third future 1588 * @param <U> the type returned by the function 1589 */ 1590 public interface AsyncClosingFunction3< 1591 V1 extends @Nullable Object, 1592 V2 extends @Nullable Object, 1593 V3 extends @Nullable Object, 1594 U extends @Nullable Object> { 1595 /** 1596 * Applies this function to three inputs, or throws an exception if unable to do so. 1597 * 1598 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1599 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1600 * (but not before this method completes), even if this method throws or the pipeline is 1601 * cancelled. 1602 */ 1603 ClosingFuture<U> apply( 1604 DeferredCloser closer, 1605 @ParametricNullness V1 value1, 1606 @ParametricNullness V2 value2, 1607 @ParametricNullness V3 value3) 1608 throws Exception; 1609 } 1610 1611 private final ClosingFuture<V1> future1; 1612 private final ClosingFuture<V2> future2; 1613 private final ClosingFuture<V3> future3; 1614 1615 private Combiner3( 1616 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1617 super(true, ImmutableList.of(future1, future2, future3)); 1618 this.future1 = future1; 1619 this.future2 = future2; 1620 this.future3 = future3; 1621 } 1622 1623 /** 1624 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1625 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1626 * objects to be closed when the pipeline is done. 1627 * 1628 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1629 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1630 * 1631 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 
1632 * 1633 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1634 * ExecutionException} will be extracted and used as the failure of the derived step. 1635 */ 1636 public <U extends @Nullable Object> ClosingFuture<U> call( 1637 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1638 return call( 1639 new CombiningCallable<U>() { 1640 @Override 1641 @ParametricNullness 1642 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1643 return function.apply( 1644 closer, 1645 peeker.getDone(future1), 1646 peeker.getDone(future2), 1647 peeker.getDone(future3)); 1648 } 1649 1650 @Override 1651 public String toString() { 1652 return function.toString(); 1653 } 1654 }, 1655 executor); 1656 } 1657 1658 /** 1659 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1660 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1661 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1662 * captured by the returned {@link ClosingFuture}). 1663 * 1664 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1665 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1666 * 1667 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1668 * 1669 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1670 * ExecutionException} will be extracted and used as the failure of the derived step. 1671 * 1672 * <p>If the function throws any other exception, it will be used as the failure of the derived 1673 * step. 1674 * 1675 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1676 * the closeable objects in that {@code ClosingFuture} will be closed. 
1677 * 1678 * <p>Usage guidelines for this method: 1679 * 1680 * <ul> 1681 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1682 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1683 * Executor)} instead, with a function that returns the next value directly. 1684 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1685 * for every closeable object this step creates in order to capture it for later closing. 1686 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1687 * ClosingFuture} call {@link #from(ListenableFuture)}. 1688 * </ul> 1689 * 1690 * <p>The same warnings about doing heavyweight operations within {@link 1691 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1692 */ 1693 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1694 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1695 return callAsync( 1696 new AsyncCombiningCallable<U>() { 1697 @Override 1698 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1699 return function.apply( 1700 closer, 1701 peeker.getDone(future1), 1702 peeker.getDone(future2), 1703 peeker.getDone(future3)); 1704 } 1705 1706 @Override 1707 public String toString() { 1708 return function.toString(); 1709 } 1710 }, 1711 executor); 1712 } 1713 } 1714 1715 /** 1716 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1717 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1718 * ClosingFuture)} to start this combination. 
1719 * 1720 * @param <V1> the type returned by the first future 1721 * @param <V2> the type returned by the second future 1722 * @param <V3> the type returned by the third future 1723 * @param <V4> the type returned by the fourth future 1724 */ 1725 public static final class Combiner4< 1726 V1 extends @Nullable Object, 1727 V2 extends @Nullable Object, 1728 V3 extends @Nullable Object, 1729 V4 extends @Nullable Object> 1730 extends Combiner { 1731 /** 1732 * A function that returns a value when applied to the values of the four futures passed to 1733 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1734 * 1735 * @param <V1> the type returned by the first future 1736 * @param <V2> the type returned by the second future 1737 * @param <V3> the type returned by the third future 1738 * @param <V4> the type returned by the fourth future 1739 * @param <U> the type returned by the function 1740 */ 1741 public interface ClosingFunction4< 1742 V1 extends @Nullable Object, 1743 V2 extends @Nullable Object, 1744 V3 extends @Nullable Object, 1745 V4 extends @Nullable Object, 1746 U extends @Nullable Object> { 1747 /** 1748 * Applies this function to four inputs, or throws an exception if unable to do so. 1749 * 1750 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1751 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1752 * (but not before this method completes), even if this method throws or the pipeline is 1753 * cancelled. 
1754 */ 1755 @ParametricNullness 1756 U apply( 1757 DeferredCloser closer, 1758 @ParametricNullness V1 value1, 1759 @ParametricNullness V2 value2, 1760 @ParametricNullness V3 value3, 1761 @ParametricNullness V4 value4) 1762 throws Exception; 1763 } 1764 1765 /** 1766 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1767 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1768 * ClosingFuture)}. 1769 * 1770 * @param <V1> the type returned by the first future 1771 * @param <V2> the type returned by the second future 1772 * @param <V3> the type returned by the third future 1773 * @param <V4> the type returned by the fourth future 1774 * @param <U> the type returned by the function 1775 */ 1776 public interface AsyncClosingFunction4< 1777 V1 extends @Nullable Object, 1778 V2 extends @Nullable Object, 1779 V3 extends @Nullable Object, 1780 V4 extends @Nullable Object, 1781 U extends @Nullable Object> { 1782 /** 1783 * Applies this function to four inputs, or throws an exception if unable to do so. 1784 * 1785 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1786 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1787 * (but not before this method completes), even if this method throws or the pipeline is 1788 * cancelled. 
1789 */ 1790 ClosingFuture<U> apply( 1791 DeferredCloser closer, 1792 @ParametricNullness V1 value1, 1793 @ParametricNullness V2 value2, 1794 @ParametricNullness V3 value3, 1795 @ParametricNullness V4 value4) 1796 throws Exception; 1797 } 1798 1799 private final ClosingFuture<V1> future1; 1800 private final ClosingFuture<V2> future2; 1801 private final ClosingFuture<V3> future3; 1802 private final ClosingFuture<V4> future4; 1803 1804 private Combiner4( 1805 ClosingFuture<V1> future1, 1806 ClosingFuture<V2> future2, 1807 ClosingFuture<V3> future3, 1808 ClosingFuture<V4> future4) { 1809 super(true, ImmutableList.of(future1, future2, future3, future4)); 1810 this.future1 = future1; 1811 this.future2 = future2; 1812 this.future3 = future3; 1813 this.future4 = future4; 1814 } 1815 1816 /** 1817 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1818 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1819 * objects to be closed when the pipeline is done. 1820 * 1821 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1822 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1823 * 1824 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1825 * 1826 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1827 * ExecutionException} will be extracted and used as the failure of the derived step. 
1828 */ 1829 public <U extends @Nullable Object> ClosingFuture<U> call( 1830 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1831 return call( 1832 new CombiningCallable<U>() { 1833 @Override 1834 @ParametricNullness 1835 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1836 return function.apply( 1837 closer, 1838 peeker.getDone(future1), 1839 peeker.getDone(future2), 1840 peeker.getDone(future3), 1841 peeker.getDone(future4)); 1842 } 1843 1844 @Override 1845 public String toString() { 1846 return function.toString(); 1847 } 1848 }, 1849 executor); 1850 } 1851 1852 /** 1853 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1854 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1855 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1856 * captured by the returned {@link ClosingFuture}). 1857 * 1858 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1859 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1860 * 1861 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1862 * 1863 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1864 * ExecutionException} will be extracted and used as the failure of the derived step. 1865 * 1866 * <p>If the function throws any other exception, it will be used as the failure of the derived 1867 * step. 1868 * 1869 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1870 * the closeable objects in that {@code ClosingFuture} will be closed. 1871 * 1872 * <p>Usage guidelines for this method: 1873 * 1874 * <ul> 1875 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1876 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1877 * Executor)} instead, with a function that returns the next value directly. 1878 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1879 * for every closeable object this step creates in order to capture it for later closing. 1880 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1881 * ClosingFuture} call {@link #from(ListenableFuture)}. 1882 * </ul> 1883 * 1884 * <p>The same warnings about doing heavyweight operations within {@link 1885 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1886 */ 1887 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1888 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1889 return callAsync( 1890 new AsyncCombiningCallable<U>() { 1891 @Override 1892 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1893 return function.apply( 1894 closer, 1895 peeker.getDone(future1), 1896 peeker.getDone(future2), 1897 peeker.getDone(future3), 1898 peeker.getDone(future4)); 1899 } 1900 1901 @Override 1902 public String toString() { 1903 return function.toString(); 1904 } 1905 }, 1906 executor); 1907 } 1908 } 1909 1910 /** 1911 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1912 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1913 * ClosingFuture, ClosingFuture)} to start this combination. 
1914 * 1915 * @param <V1> the type returned by the first future 1916 * @param <V2> the type returned by the second future 1917 * @param <V3> the type returned by the third future 1918 * @param <V4> the type returned by the fourth future 1919 * @param <V5> the type returned by the fifth future 1920 */ 1921 public static final class Combiner5< 1922 V1 extends @Nullable Object, 1923 V2 extends @Nullable Object, 1924 V3 extends @Nullable Object, 1925 V4 extends @Nullable Object, 1926 V5 extends @Nullable Object> 1927 extends Combiner { 1928 /** 1929 * A function that returns a value when applied to the values of the five futures passed to 1930 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1931 * ClosingFuture)}. 1932 * 1933 * @param <V1> the type returned by the first future 1934 * @param <V2> the type returned by the second future 1935 * @param <V3> the type returned by the third future 1936 * @param <V4> the type returned by the fourth future 1937 * @param <V5> the type returned by the fifth future 1938 * @param <U> the type returned by the function 1939 */ 1940 public interface ClosingFunction5< 1941 V1 extends @Nullable Object, 1942 V2 extends @Nullable Object, 1943 V3 extends @Nullable Object, 1944 V4 extends @Nullable Object, 1945 V5 extends @Nullable Object, 1946 U extends @Nullable Object> { 1947 /** 1948 * Applies this function to five inputs, or throws an exception if unable to do so. 1949 * 1950 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1951 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1952 * (but not before this method completes), even if this method throws or the pipeline is 1953 * cancelled. 
1954 */ 1955 @ParametricNullness 1956 U apply( 1957 DeferredCloser closer, 1958 @ParametricNullness V1 value1, 1959 @ParametricNullness V2 value2, 1960 @ParametricNullness V3 value3, 1961 @ParametricNullness V4 value4, 1962 @ParametricNullness V5 value5) 1963 throws Exception; 1964 } 1965 1966 /** 1967 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1968 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1969 * ClosingFuture, ClosingFuture)}. 1970 * 1971 * @param <V1> the type returned by the first future 1972 * @param <V2> the type returned by the second future 1973 * @param <V3> the type returned by the third future 1974 * @param <V4> the type returned by the fourth future 1975 * @param <V5> the type returned by the fifth future 1976 * @param <U> the type returned by the function 1977 */ 1978 public interface AsyncClosingFunction5< 1979 V1 extends @Nullable Object, 1980 V2 extends @Nullable Object, 1981 V3 extends @Nullable Object, 1982 V4 extends @Nullable Object, 1983 V5 extends @Nullable Object, 1984 U extends @Nullable Object> { 1985 /** 1986 * Applies this function to five inputs, or throws an exception if unable to do so. 1987 * 1988 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1989 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1990 * (but not before this method completes), even if this method throws or the pipeline is 1991 * cancelled. 
1992 */ 1993 ClosingFuture<U> apply( 1994 DeferredCloser closer, 1995 @ParametricNullness V1 value1, 1996 @ParametricNullness V2 value2, 1997 @ParametricNullness V3 value3, 1998 @ParametricNullness V4 value4, 1999 @ParametricNullness V5 value5) 2000 throws Exception; 2001 } 2002 2003 private final ClosingFuture<V1> future1; 2004 private final ClosingFuture<V2> future2; 2005 private final ClosingFuture<V3> future3; 2006 private final ClosingFuture<V4> future4; 2007 private final ClosingFuture<V5> future5; 2008 2009 private Combiner5( 2010 ClosingFuture<V1> future1, 2011 ClosingFuture<V2> future2, 2012 ClosingFuture<V3> future3, 2013 ClosingFuture<V4> future4, 2014 ClosingFuture<V5> future5) { 2015 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 2016 this.future1 = future1; 2017 this.future2 = future2; 2018 this.future3 = future3; 2019 this.future4 = future4; 2020 this.future5 = future5; 2021 } 2022 2023 /** 2024 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2025 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2026 * objects to be closed when the pipeline is done. 2027 * 2028 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2029 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2030 * returned step. 2031 * 2032 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2033 * 2034 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2035 * ExecutionException} will be extracted and used as the failure of the derived step. 
2036 */ 2037 public <U extends @Nullable Object> ClosingFuture<U> call( 2038 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2039 return call( 2040 new CombiningCallable<U>() { 2041 @Override 2042 @ParametricNullness 2043 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2044 return function.apply( 2045 closer, 2046 peeker.getDone(future1), 2047 peeker.getDone(future2), 2048 peeker.getDone(future3), 2049 peeker.getDone(future4), 2050 peeker.getDone(future5)); 2051 } 2052 2053 @Override 2054 public String toString() { 2055 return function.toString(); 2056 } 2057 }, 2058 executor); 2059 } 2060 2061 /** 2062 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2063 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2064 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2065 * captured by the returned {@link ClosingFuture}). 2066 * 2067 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2068 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2069 * returned step. 2070 * 2071 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2072 * 2073 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2074 * ExecutionException} will be extracted and used as the failure of the derived step. 2075 * 2076 * <p>If the function throws any other exception, it will be used as the failure of the derived 2077 * step. 2078 * 2079 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2080 * the closeable objects in that {@code ClosingFuture} will be closed. 
2081 * 2082 * <p>Usage guidelines for this method: 2083 * 2084 * <ul> 2085 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2086 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2087 * Executor)} instead, with a function that returns the next value directly. 2088 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 2089 * for every closeable object this step creates in order to capture it for later closing. 2090 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2091 * ClosingFuture} call {@link #from(ListenableFuture)}. 2092 * </ul> 2093 * 2094 * <p>The same warnings about doing heavyweight operations within {@link 2095 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2096 */ 2097 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 2098 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2099 return callAsync( 2100 new AsyncCombiningCallable<U>() { 2101 @Override 2102 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2103 return function.apply( 2104 closer, 2105 peeker.getDone(future1), 2106 peeker.getDone(future2), 2107 peeker.getDone(future3), 2108 peeker.getDone(future4), 2109 peeker.getDone(future5)); 2110 } 2111 2112 @Override 2113 public String toString() { 2114 return function.toString(); 2115 } 2116 }, 2117 executor); 2118 } 2119 } 2120 2121 @Override 2122 public String toString() { 2123 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2124 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2125 } 2126 2127 @SuppressWarnings({"removal", "Finalize"}) // b/260137033 2128 @Override 2129 protected void finalize() { 2130 if (state.get().equals(OPEN)) { 2131 logger.get().log(SEVERE, "Uh oh! 
An open ClosingFuture has leaked and will close: {0}", this); 2132 FluentFuture<V> unused = finishToFuture(); 2133 } 2134 } 2135 2136 private static void closeQuietly(@CheckForNull final AutoCloseable closeable, Executor executor) { 2137 if (closeable == null) { 2138 return; 2139 } 2140 try { 2141 executor.execute( 2142 () -> { 2143 try { 2144 closeable.close(); 2145 } catch (Exception e) { 2146 /* 2147 * In guava-jre, any kind of Exception may be thrown because `closeable` has type 2148 * `AutoCloseable`. 2149 * 2150 * In guava-android, the only kinds of Exception that may be thrown are 2151 * RuntimeException and IOException because `closeable` has type `Closeable`—except 2152 * that we have to account for sneaky checked exception. 2153 */ 2154 restoreInterruptIfIsInterruptedException(e); 2155 logger.get().log(WARNING, "thrown by close()", e); 2156 } 2157 }); 2158 } catch (RejectedExecutionException e) { 2159 if (logger.get().isLoggable(WARNING)) { 2160 logger 2161 .get() 2162 .log( 2163 WARNING, 2164 String.format("while submitting close to %s; will close inline", executor), 2165 e); 2166 } 2167 closeQuietly(closeable, directExecutor()); 2168 } 2169 } 2170 2171 private void checkAndUpdateState(State oldState, State newState) { 2172 checkState( 2173 compareAndUpdateState(oldState, newState), 2174 "Expected state to be %s, but it was %s", 2175 oldState, 2176 newState); 2177 } 2178 2179 private boolean compareAndUpdateState(State oldState, State newState) { 2180 return state.compareAndSet(oldState, newState); 2181 } 2182 2183 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 
2184 private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor> 2185 implements Closeable { 2186 private final DeferredCloser closer = new DeferredCloser(this); 2187 private volatile boolean closed; 2188 @CheckForNull private volatile CountDownLatch whenClosed; 2189 2190 <V extends @Nullable Object, U extends @Nullable Object> 2191 ListenableFuture<U> applyClosingFunction( 2192 ClosingFunction<? super V, U> transformation, @ParametricNullness V input) 2193 throws Exception { 2194 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2195 CloseableList newCloseables = new CloseableList(); 2196 try { 2197 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2198 } finally { 2199 add(newCloseables, directExecutor()); 2200 } 2201 } 2202 2203 <V extends @Nullable Object, U extends @Nullable Object> 2204 FluentFuture<U> applyAsyncClosingFunction( 2205 AsyncClosingFunction<V, U> transformation, @ParametricNullness V input) 2206 throws Exception { 2207 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 
2208 CloseableList newCloseables = new CloseableList(); 2209 try { 2210 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2211 closingFuture.becomeSubsumedInto(newCloseables); 2212 return closingFuture.future; 2213 } finally { 2214 add(newCloseables, directExecutor()); 2215 } 2216 } 2217 2218 @Override 2219 public void close() { 2220 if (closed) { 2221 return; 2222 } 2223 synchronized (this) { 2224 if (closed) { 2225 return; 2226 } 2227 closed = true; 2228 } 2229 for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) { 2230 closeQuietly(entry.getKey(), entry.getValue()); 2231 } 2232 clear(); 2233 if (whenClosed != null) { 2234 whenClosed.countDown(); 2235 } 2236 } 2237 2238 void add(@CheckForNull AutoCloseable closeable, Executor executor) { 2239 checkNotNull(executor); 2240 if (closeable == null) { 2241 return; 2242 } 2243 synchronized (this) { 2244 if (!closed) { 2245 put(closeable, executor); 2246 return; 2247 } 2248 } 2249 closeQuietly(closeable, executor); 2250 } 2251 2252 /** 2253 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2254 */ 2255 CountDownLatch whenClosedCountDown() { 2256 if (closed) { 2257 return new CountDownLatch(0); 2258 } 2259 synchronized (this) { 2260 if (closed) { 2261 return new CountDownLatch(0); 2262 } 2263 checkState(whenClosed == null); 2264 return whenClosed = new CountDownLatch(1); 2265 } 2266 } 2267 } 2268 2269 /** 2270 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2271 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2272 */ 2273 @VisibleForTesting 2274 CountDownLatch whenClosedCountDown() { 2275 return closeables.whenClosedCountDown(); 2276 } 2277 2278 /** The state of a {@link CloseableList}. */ 2279 enum State { 2280 /** The {@link CloseableList} has not been subsumed or closed. 
*/ 2281 OPEN, 2282 2283 /** 2284 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2285 * into any other. 2286 */ 2287 SUBSUMED, 2288 2289 /** 2290 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2291 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2292 */ 2293 WILL_CLOSE, 2294 2295 /** 2296 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2297 * {@link CloseableList} may not be subsumed. 2298 */ 2299 CLOSING, 2300 2301 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2302 CLOSED, 2303 2304 /** 2305 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2306 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2307 */ 2308 WILL_CREATE_VALUE_AND_CLOSER, 2309 } 2310}