001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static 
com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 036import static java.util.logging.Level.FINER; 037import static java.util.logging.Level.SEVERE; 038import static java.util.logging.Level.WARNING; 039 040import com.google.common.annotations.J2ktIncompatible; 041import com.google.common.annotations.VisibleForTesting; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.util.IdentityHashMap; 052import java.util.Map; 053import java.util.concurrent.Callable; 054import java.util.concurrent.CancellationException; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutionException; 057import java.util.concurrent.Executor; 058import java.util.concurrent.Future; 059import java.util.concurrent.RejectedExecutionException; 060import java.util.concurrent.atomic.AtomicReference; 061import org.jspecify.annotations.Nullable; 062 063/** 064 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 065 * complete, some objects captured during the computation are closed. 066 * 067 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 068 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 069 * cancellation of the operation so far. The only way to extract the value or exception from a step 070 * is by declaring that step to be the last step of the pipeline. 
Nevertheless, we refer to the
 * "value" of a successful step or the "result" (value or exception) of any step.
 *
 * <ol>
 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
 *       block or a {@link ListenableFuture}.
 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
 *       can be captured for later closing.
 *   <li>There is one last step (the root of the tree), from which you can extract the final result
 *       of the computation. After that result is available (or the computation fails), all objects
 *       captured by any of the steps in the pipeline are closed.
 * </ol>
 *
 * <h3>Starting a pipeline</h3>
 *
 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
 * #from(ListenableFuture)} instead.
 *
 * <h3>Derived steps</h3>
 *
 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
 * ways similar to {@link FluentFuture}s:
 *
 * <ul>
 *   <li>by transforming the value from a successful input step,
 *   <li>by catching the exception from a failed input step, or
 *   <li>by combining the results of several input steps.
 * </ul>
 *
 * Each derivation can capture the next value or any intermediate objects for later closing.
 *
 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
 * exception, or combine it with others, you cannot do anything else with it, including declare it
 * to be the last step of the pipeline.
*
 * <h4>Transforming</h4>
 *
 * To derive the next step by asynchronously applying a function to an input step's value, call
 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
 * Executor)} on the input step.
 *
 * <h4>Catching</h4>
 *
 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
 *
 * <h4>Combining</h4>
 *
 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads.
 *
 * <h3>Cancelling</h3>
 *
 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
 * successfully cancelled step will immediately start closing all objects captured for later closing
 * by it and by its input steps.
 *
 * <h3>Ending a pipeline</h3>
 *
 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
 * close the captured objects automatically or manually.
 *
 * <h4>Automatically closing</h4>
 *
 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
 * completes before closing starts, rather than once it has finished.
*
 * <pre>{@code
 * FluentFuture<UserName> userName =
 *     ClosingFuture.submit(
 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
 *             executor)
 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
 *         .transform((closer, result) -> result.get("userName"), directExecutor())
 *         .catching(DBException.class, (closer, e) -> "no user", directExecutor())
 *         .finishToFuture();
 * }</pre>
 *
 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
 * result cursor will both be closed, even if the operation is cancelled or fails.
 *
 * <h4>Manually closing</h4>
 *
 * If you want to close the captured objects manually, after you've used the final result, call
 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
 *
 * <pre>{@code
 * ClosingFuture.submit(
 *         closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
 *         executor)
 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
 *     .transform((closer, result) -> result.get("userName"), directExecutor())
 *     .catching(DBException.class, (closer, e) -> "no user", directExecutor())
 *     .finishToValueAndCloser(
 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
 *
 * // later
 * try { // get() will throw if the operation failed or was cancelled.
174 * UserName userName = userNameValueAndCloser.get(); 175 * // do something with userName 176 * } finally { 177 * userNameValueAndCloser.closeAsync(); 178 * } 179 * }</pre> 180 * 181 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 182 * the query result cursor will both be closed, even if the operation is cancelled or fails. 183 * 184 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 185 * automatic-closing approach described above is safer. 186 * 187 * @param <V> the type of the value of this step 188 * @since 30.0 189 */ 190// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 191@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 192@J2ktIncompatible 193// TODO(dpb): GWT compatibility. 194public final class ClosingFuture<V extends @Nullable Object> { 195 196 private static final LazyLogger logger = new LazyLogger(ClosingFuture.class); 197 198 /** 199 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 200 * done. 201 */ 202 public static final class DeferredCloser { 203 @RetainedWith private final CloseableList list; 204 205 DeferredCloser(CloseableList list) { 206 this.list = list; 207 } 208 209 /** 210 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 211 * 212 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 213 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 214 * Closeable}. (For more about the flavors, see <a 215 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 216 * build</a>.) 
217 * 218 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 219 * building for Android): Ensure that any object you pass implements the interface not just in 220 * your current SDK version but also at the oldest version you support. For example, <a 221 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 222 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 223 * {@code Closeable} with a method reference like {@code cursor::close}. 224 * 225 * <p>Note that this method is still binary-compatible between flavors because the erasure of 226 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 227 * 228 * @param closeable the object to be closed (see notes above) 229 * @param closingExecutor the object will be closed on this executor 230 * @return the first argument 231 */ 232 @CanIgnoreReturnValue 233 @ParametricNullness 234 public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose( 235 @ParametricNullness C closeable, Executor closingExecutor) { 236 checkNotNull(closingExecutor); 237 if (closeable != null) { 238 list.add(closeable, closingExecutor); 239 } 240 return closeable; 241 } 242 } 243 244 /** 245 * An operation that computes a result. 246 * 247 * @param <V> the type of the result 248 */ 249 public interface ClosingCallable<V extends @Nullable Object> { 250 /** 251 * Computes a result, or throws an exception if unable to do so. 252 * 253 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 254 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 255 * not before this method completes), even if this method throws or the pipeline is cancelled. 
256 */ 257 @ParametricNullness 258 V call(DeferredCloser closer) throws Exception; 259 } 260 261 /** 262 * An operation that computes a {@link ClosingFuture} of a result. 263 * 264 * @param <V> the type of the result 265 * @since 30.1 266 */ 267 public interface AsyncClosingCallable<V extends @Nullable Object> { 268 /** 269 * Computes a result, or throws an exception if unable to do so. 270 * 271 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 272 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 273 * not before this method completes), even if this method throws or the pipeline is cancelled. 274 */ 275 ClosingFuture<V> call(DeferredCloser closer) throws Exception; 276 } 277 278 /** 279 * A function from an input to a result. 280 * 281 * @param <T> the type of the input to the function 282 * @param <U> the type of the result of the function 283 */ 284 public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 285 286 /** 287 * Applies this function to an input, or throws an exception if unable to do so. 288 * 289 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 290 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 291 * not before this method completes), even if this method throws or the pipeline is cancelled. 292 */ 293 @ParametricNullness 294 U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 295 } 296 297 /** 298 * A function from an input to a {@link ClosingFuture} of a result. 299 * 300 * @param <T> the type of the input to the function 301 * @param <U> the type of the result of the function 302 */ 303 public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 304 /** 305 * Applies this function to an input, or throws an exception if unable to do so. 
306 * 307 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 308 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 309 * not before this method completes), even if this method throws or the pipeline is cancelled. 310 */ 311 ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception; 312 } 313 314 /** 315 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 316 * allows the user to close all the closeable objects that were captured during it for later 317 * closing. 318 * 319 * <p>The asynchronous operation will have completed before this object is created. 320 * 321 * @param <V> the type of the value of a successful operation 322 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 323 */ 324 public static final class ValueAndCloser<V extends @Nullable Object> { 325 326 private final ClosingFuture<? extends V> closingFuture; 327 328 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 329 this.closingFuture = checkNotNull(closingFuture); 330 } 331 332 /** 333 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 334 * {@link Future#get()} would. 335 * 336 * <p>Because the asynchronous operation has already completed, this method is synchronous and 337 * returns immediately. 338 * 339 * @throws CancellationException if the computation was cancelled 340 * @throws ExecutionException if the computation threw an exception 341 */ 342 @ParametricNullness 343 public V get() throws ExecutionException { 344 return getDone(closingFuture.future); 345 } 346 347 /** 348 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 349 * operation on the {@link Executor}s specified by calls to {@link 350 * DeferredCloser#eventuallyClose(Object, Executor)}. 
351 * 352 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 353 * closed synchronously. 354 * 355 * <p>Idempotent: objects will be closed at most once. 356 */ 357 public void closeAsync() { 358 closingFuture.close(); 359 } 360 } 361 362 /** 363 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 364 * ClosingFuture} pipeline. 365 * 366 * @param <V> the type of the final value of a successful pipeline 367 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 368 */ 369 public interface ValueAndCloserConsumer<V extends @Nullable Object> { 370 371 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 372 void accept(ValueAndCloser<V> valueAndCloser); 373 } 374 375 /** 376 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 377 * 378 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 379 * execution 380 */ 381 public static <V extends @Nullable Object> ClosingFuture<V> submit( 382 ClosingCallable<V> callable, Executor executor) { 383 checkNotNull(callable); 384 CloseableList closeables = new CloseableList(); 385 TrustedListenableFutureTask<V> task = 386 TrustedListenableFutureTask.create( 387 new Callable<V>() { 388 @Override 389 @ParametricNullness 390 public V call() throws Exception { 391 return callable.call(closeables.closer); 392 } 393 394 @Override 395 public String toString() { 396 return callable.toString(); 397 } 398 }); 399 executor.execute(task); 400 return new ClosingFuture<>(task, closeables); 401 } 402 403 /** 404 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 
405 * 406 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 407 * execution 408 * @since 30.1 409 */ 410 public static <V extends @Nullable Object> ClosingFuture<V> submitAsync( 411 AsyncClosingCallable<V> callable, Executor executor) { 412 checkNotNull(callable); 413 CloseableList closeables = new CloseableList(); 414 TrustedListenableFutureTask<V> task = 415 TrustedListenableFutureTask.create( 416 new AsyncCallable<V>() { 417 @Override 418 public ListenableFuture<V> call() throws Exception { 419 CloseableList newCloseables = new CloseableList(); 420 try { 421 ClosingFuture<V> closingFuture = callable.call(newCloseables.closer); 422 closingFuture.becomeSubsumedInto(closeables); 423 return closingFuture.future; 424 } finally { 425 closeables.add(newCloseables, directExecutor()); 426 } 427 } 428 429 @Override 430 public String toString() { 431 return callable.toString(); 432 } 433 }); 434 executor.execute(task); 435 return new ClosingFuture<>(task, closeables); 436 } 437 438 /** 439 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 440 * 441 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 442 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 443 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 444 */ 445 public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) { 446 return new ClosingFuture<>(future); 447 } 448 449 /** 450 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 451 * 452 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when 453 * the pipeline is done, even if the pipeline is canceled or fails. 454 * 455 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 456 * value in order to close it. 
457 * 458 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 459 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object, 460 * Executor)}. 461 * @param closingExecutor the future's result will be closed on this executor 462 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 463 * underlying value may never be closed if the {@link Future} is canceled after its operation 464 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 465 * including those that pass them to this method, with {@link #submit(ClosingCallable, 466 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 467 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 468 * ClosingFuture#from}. 469 */ 470 @Deprecated 471 public static <C extends @Nullable Object & @Nullable AutoCloseable> 472 ClosingFuture<C> eventuallyClosing(ListenableFuture<C> future, Executor closingExecutor) { 473 checkNotNull(closingExecutor); 474 ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 475 Futures.addCallback( 476 future, 477 new FutureCallback<@Nullable AutoCloseable>() { 478 @Override 479 public void onSuccess(@Nullable AutoCloseable result) { 480 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 481 } 482 483 @Override 484 public void onFailure(Throwable t) {} 485 }, 486 directExecutor()); 487 return closingFuture; 488 } 489 490 /** 491 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 492 * 493 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 494 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 495 */ 496 public static Combiner whenAllComplete(Iterable<? 
extends ClosingFuture<?>> futures) { 497 return new Combiner(false, futures); 498 } 499 500 /** 501 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 502 * 503 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 504 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 505 */ 506 public static Combiner whenAllComplete( 507 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 508 return whenAllComplete(asList(future1, moreFutures)); 509 } 510 511 /** 512 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 513 * all succeed. If any fail, the resulting pipeline will fail. 514 * 515 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 516 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 517 */ 518 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 519 return new Combiner(true, futures); 520 } 521 522 /** 523 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 524 * they all succeed. If any fail, the resulting pipeline will fail. 525 * 526 * <p>Calling this method allows you to use lambdas or method references typed with the types of 527 * the input {@link ClosingFuture}s. 528 * 529 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 530 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 531 */ 532 public static <V1 extends @Nullable Object, V2 extends @Nullable Object> 533 Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 534 return new Combiner2<>(future1, future2); 535 } 536 537 /** 538 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 539 * they all succeed. 
If any fail, the resulting pipeline will fail. 540 * 541 * <p>Calling this method allows you to use lambdas or method references typed with the types of 542 * the input {@link ClosingFuture}s. 543 * 544 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 545 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 546 */ 547 public static < 548 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 549 Combiner3<V1, V2, V3> whenAllSucceed( 550 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 551 return new Combiner3<>(future1, future2, future3); 552 } 553 554 /** 555 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 556 * they all succeed. If any fail, the resulting pipeline will fail. 557 * 558 * <p>Calling this method allows you to use lambdas or method references typed with the types of 559 * the input {@link ClosingFuture}s. 560 * 561 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 562 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 563 */ 564 public static < 565 V1 extends @Nullable Object, 566 V2 extends @Nullable Object, 567 V3 extends @Nullable Object, 568 V4 extends @Nullable Object> 569 Combiner4<V1, V2, V3, V4> whenAllSucceed( 570 ClosingFuture<V1> future1, 571 ClosingFuture<V2> future2, 572 ClosingFuture<V3> future3, 573 ClosingFuture<V4> future4) { 574 return new Combiner4<>(future1, future2, future3, future4); 575 } 576 577 /** 578 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 579 * they all succeed. If any fail, the resulting pipeline will fail. 580 * 581 * <p>Calling this method allows you to use lambdas or method references typed with the types of 582 * the input {@link ClosingFuture}s. 
583 * 584 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 585 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 586 */ 587 public static < 588 V1 extends @Nullable Object, 589 V2 extends @Nullable Object, 590 V3 extends @Nullable Object, 591 V4 extends @Nullable Object, 592 V5 extends @Nullable Object> 593 Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 594 ClosingFuture<V1> future1, 595 ClosingFuture<V2> future2, 596 ClosingFuture<V3> future3, 597 ClosingFuture<V4> future4, 598 ClosingFuture<V5> future5) { 599 return new Combiner5<>(future1, future2, future3, future4, future5); 600 } 601 602 /** 603 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 604 * all succeed. If any fail, the resulting pipeline will fail. 605 * 606 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 607 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 608 */ 609 public static Combiner whenAllSucceed( 610 ClosingFuture<?> future1, 611 ClosingFuture<?> future2, 612 ClosingFuture<?> future3, 613 ClosingFuture<?> future4, 614 ClosingFuture<?> future5, 615 ClosingFuture<?> future6, 616 ClosingFuture<?>... moreFutures) { 617 return whenAllSucceed( 618 FluentIterable.of(future1, future2, future3, future4, future5, future6) 619 .append(moreFutures)); 620 } 621 622 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 623 private final CloseableList closeables; 624 private final FluentFuture<V> future; 625 626 private ClosingFuture(ListenableFuture<V> future) { 627 this(future, new CloseableList()); 628 } 629 630 private ClosingFuture(ListenableFuture<V> future, CloseableList closeables) { 631 this.future = FluentFuture.from(future); 632 this.closeables = closeables; 633 } 634 635 /** 636 * Returns a future that finishes when this step does. 
Calling {@code get()} on the returned 637 * future returns {@code null} if the step is successful or throws the same exception that would 638 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 639 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 640 * 641 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 642 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 643 * a derivation method <i>on the same instance</i>. This is important because calling {@code 644 * statusFuture} alone does not provide a way to close the pipeline. 645 */ 646 public ListenableFuture<?> statusFuture() { 647 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 648 } 649 650 /** 651 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 652 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 653 * when the pipeline is done. 654 * 655 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 656 * ClosingFuture} will be equivalent to this one. 657 * 658 * <p>If the function throws an exception, that exception is used as the result of the derived 659 * {@code ClosingFuture}. 660 * 661 * <p>Example usage: 662 * 663 * <pre>{@code 664 * ClosingFuture<List<Row>> rowsFuture = 665 * queryFuture.transform((closer, result) -> result.getRows(), executor); 666 * }</pre> 667 * 668 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 669 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 670 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
671 * 672 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 673 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 674 * the original {@code ClosingFuture} instance. 675 * 676 * @param function transforms the value of this step to the value of the derived step 677 * @param executor executor to run the function in 678 * @return the derived step 679 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 680 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 681 * finished} 682 */ 683 public <U extends @Nullable Object> ClosingFuture<U> transform( 684 ClosingFunction<? super V, U> function, Executor executor) { 685 checkNotNull(function); 686 AsyncFunction<V, U> applyFunction = 687 new AsyncFunction<V, U>() { 688 @Override 689 public ListenableFuture<U> apply(V input) throws Exception { 690 return closeables.applyClosingFunction(function, input); 691 } 692 693 @Override 694 public String toString() { 695 return function.toString(); 696 } 697 }; 698 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 699 return derive(future.transformAsync(applyFunction, executor)); 700 } 701 702 /** 703 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 704 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 705 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 706 * captured by the returned {@link ClosingFuture}). 707 * 708 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 709 * returned by the function. 710 * 711 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 712 * ClosingFuture} will be equivalent to this one. 
713 * 714 * <p>If the function throws an exception, that exception is used as the result of the derived 715 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 716 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 717 * closed. 718 * 719 * <p>Usage guidelines for this method: 720 * 721 * <ul> 722 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 723 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 724 * Executor)} instead, with a function that returns the next value directly. 725 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 726 * for every closeable object this step creates in order to capture it for later closing. 727 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 728 * ClosingFuture} call {@link #from(ListenableFuture)}. 729 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 730 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 731 * {@link #withoutCloser(AsyncFunction)} 732 * </ul> 733 * 734 * <p>Example usage: 735 * 736 * <pre>{@code 737 * // Result.getRowsClosingFuture() returns a ClosingFuture. 738 * ClosingFuture<List<Row>> rowsFuture = 739 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 740 * 741 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 742 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 743 * // Closeable). 
744 * ClosingFuture<Integer> rowsFuture2 = 745 * queryFuture.transformAsync( 746 * (closer, result) -> { 747 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 748 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 749 * }, 750 * executor); 751 * 752 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 753 * ClosingFuture<List<Row>> rowsFuture3 = 754 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 755 * 756 * }</pre> 757 * 758 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 759 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 760 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 761 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 762 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 763 * responsible for completing the returned {@code ClosingFuture}.) 764 * 765 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 766 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 767 * the original {@code ClosingFuture} instance. 768 * 769 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 770 * the derived step 771 * @param executor executor to run the function in 772 * @return the derived step 773 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 774 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 775 * finished} 776 */ 777 public <U extends @Nullable Object> ClosingFuture<U> transformAsync( 778 AsyncClosingFunction<? 
super V, U> function, Executor executor) { 779 checkNotNull(function); 780 AsyncFunction<V, U> applyFunction = 781 new AsyncFunction<V, U>() { 782 @Override 783 public ListenableFuture<U> apply(V input) throws Exception { 784 return closeables.applyAsyncClosingFunction(function, input); 785 } 786 787 @Override 788 public String toString() { 789 return function.toString(); 790 } 791 }; 792 return derive(future.transformAsync(applyFunction, executor)); 793 } 794 795 /** 796 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 797 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 798 * {@link ListenableFuture}. 799 * 800 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 801 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 802 * meets these conditions: 803 * 804 * <ul> 805 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 806 * DeferredCloser#eventuallyClose(Object, Executor)}. 807 * <li>It returns a {@link ListenableFuture}. 808 * </ul> 809 * 810 * <p>Example usage: 811 * 812 * <pre>{@code 813 * // Result.getRowsFuture() returns a ListenableFuture. 
814 * ClosingFuture<List<Row>> rowsFuture = 815 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 816 * }</pre> 817 * 818 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 819 * ListenableFuture} with the value of a derived step 820 */ 821 public static <V extends @Nullable Object, U extends @Nullable Object> 822 AsyncClosingFunction<V, U> withoutCloser(AsyncFunction<V, U> function) { 823 checkNotNull(function); 824 return (closer, input) -> ClosingFuture.from(function.apply(input)); 825 } 826 827 /** 828 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 829 * to its exception if it is an instance of a given exception type. The function can use a {@link 830 * DeferredCloser} to capture objects to be closed when the pipeline is done. 831 * 832 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 833 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 834 * one. 835 * 836 * <p>If the function throws an exception, that exception is used as the result of the derived 837 * {@code ClosingFuture}. 838 * 839 * <p>Example usage: 840 * 841 * <pre>{@code 842 * ClosingFuture<QueryResult> queryFuture = 843 * queryFuture.catching( 844 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 845 * }</pre> 846 * 847 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 848 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 849 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 850 * 851 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 852 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 853 * the original {@code ClosingFuture} instance. 
854 * 855 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 856 * type is matched against this step's exception. "This step's exception" means the cause of 857 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 858 * underlying this step or, if {@code get()} throws a different kind of exception, that 859 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 860 * prefer more specific types, avoiding {@code Throwable.class} in particular. 861 * @param fallback the function to be called if this step fails with the expected exception type. 862 * The function's argument is this step's exception. "This step's exception" means the cause 863 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 864 * underlying this step or, if {@code get()} throws a different kind of exception, that 865 * exception itself. 866 * @param executor the executor that runs {@code fallback} if the input fails 867 */ 868 public <X extends Throwable> ClosingFuture<V> catching( 869 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 870 return catchingMoreGeneric(exceptionType, fallback, executor); 871 } 872 873 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 874 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 875 Class<X> exceptionType, ClosingFunction<? 
super X, W> fallback, Executor executor) { 876 checkNotNull(fallback); 877 AsyncFunction<X, W> applyFallback = 878 new AsyncFunction<X, W>() { 879 @Override 880 public ListenableFuture<W> apply(X exception) throws Exception { 881 return closeables.applyClosingFunction(fallback, exception); 882 } 883 884 @Override 885 public String toString() { 886 return fallback.toString(); 887 } 888 }; 889 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 890 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 891 } 892 893 /** 894 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 895 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 896 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 897 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 898 * 899 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 900 * ClosingFuture} will be equivalent to the one returned by the function. 901 * 902 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 903 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 904 * one. 905 * 906 * <p>If the function throws an exception, that exception is used as the result of the derived 907 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 908 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 909 * closed. 910 * 911 * <p>Usage guidelines for this method: 912 * 913 * <ul> 914 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 915 * {@code ClosingFuture}. 
If possible, prefer calling {@link #catching(Class, 916 * ClosingFunction, Executor)} instead, with a function that returns the next value 917 * directly. 918 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 919 * for every closeable object this step creates in order to capture it for later closing. 920 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 921 * ClosingFuture} call {@link #from(ListenableFuture)}. 922 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 923 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 924 * {@link #withoutCloser(AsyncFunction)} 925 * </ul> 926 * 927 * <p>Example usage: 928 * 929 * <pre>{@code 930 * // Fall back to a secondary input stream in case of IOException. 931 * ClosingFuture<InputStream> inputFuture = 932 * firstInputFuture.catchingAsync( 933 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 934 * } 935 * }</pre> 936 * 937 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 938 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 939 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 940 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 941 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 942 * responsible for completing the returned {@code ClosingFuture}.) 943 * 944 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 945 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 946 * the original {@code ClosingFuture} instance. 947 * 948 * @param exceptionType the exception type that triggers use of {@code fallback}. 
The exception 949 * type is matched against this step's exception. "This step's exception" means the cause of 950 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 951 * underlying this step or, if {@code get()} throws a different kind of exception, that 952 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 953 * prefer more specific types, avoiding {@code Throwable.class} in particular. 954 * @param fallback the function to be called if this step fails with the expected exception type. 955 * The function's argument is this step's exception. "This step's exception" means the cause 956 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 957 * underlying this step or, if {@code get()} throws a different kind of exception, that 958 * exception itself. 959 * @param executor the executor that runs {@code fallback} if the input fails 960 */ 961 // TODO(dpb): Should this do something special if the function throws CancellationException or 962 // ExecutionException? 963 public <X extends Throwable> ClosingFuture<V> catchingAsync( 964 Class<X> exceptionType, 965 AsyncClosingFunction<? super X, ? extends V> fallback, 966 Executor executor) { 967 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 968 } 969 970 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 971 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 972 Class<X> exceptionType, AsyncClosingFunction<? 
super X, W> fallback, Executor executor) { 973 checkNotNull(fallback); 974 AsyncFunction<X, W> asyncFunction = 975 new AsyncFunction<X, W>() { 976 @Override 977 public ListenableFuture<W> apply(X exception) throws Exception { 978 return closeables.applyAsyncClosingFunction(fallback, exception); 979 } 980 981 @Override 982 public String toString() { 983 return fallback.toString(); 984 } 985 }; 986 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 987 } 988 989 /** 990 * Marks this step as the last step in the {@code ClosingFuture} pipeline. 991 * 992 * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when 993 * the pipeline is cancelled. 994 * 995 * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously 996 * <b>after</b> the returned {@code Future} is done: the future completes before closing starts, 997 * rather than once it has finished. 998 * 999 * <p>After calling this method, you may not call {@link 1000 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 1001 * derivation method on the original {@code ClosingFuture} instance. 
1002 * 1003 * @return a {@link Future} that represents the final value or exception of the pipeline 1004 */ 1005 public FluentFuture<V> finishToFuture() { 1006 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 1007 logger.get().log(FINER, "will close {0}", this); 1008 future.addListener( 1009 () -> { 1010 checkAndUpdateState(WILL_CLOSE, CLOSING); 1011 close(); 1012 checkAndUpdateState(CLOSING, CLOSED); 1013 }, 1014 directExecutor()); 1015 } else { 1016 switch (state.get()) { 1017 case SUBSUMED: 1018 throw new IllegalStateException( 1019 "Cannot call finishToFuture() after deriving another step"); 1020 1021 case WILL_CREATE_VALUE_AND_CLOSER: 1022 throw new IllegalStateException( 1023 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 1024 1025 case WILL_CLOSE: 1026 case CLOSING: 1027 case CLOSED: 1028 throw new IllegalStateException("Cannot call finishToFuture() twice"); 1029 1030 case OPEN: 1031 throw new AssertionError(); 1032 } 1033 } 1034 return future; 1035 } 1036 1037 /** 1038 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 1039 * {@code receiver} will be called with an object that contains the result of the operation. The 1040 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 1041 * 1042 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 1043 * any other derivation method on the original {@code ClosingFuture} instance. 1044 * 1045 * @param consumer a callback whose method will be called (using {@code executor}) when this 1046 * operation is done 1047 */ 1048 public void finishToValueAndCloser( 1049 ValueAndCloserConsumer<? 
super V> consumer, Executor executor) { 1050 checkNotNull(consumer); 1051 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 1052 switch (state.get()) { 1053 case SUBSUMED: 1054 throw new IllegalStateException( 1055 "Cannot call finishToValueAndCloser() after deriving another step"); 1056 1057 case WILL_CLOSE: 1058 case CLOSING: 1059 case CLOSED: 1060 throw new IllegalStateException( 1061 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1062 1063 case WILL_CREATE_VALUE_AND_CLOSER: 1064 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1065 1066 case OPEN: 1067 break; 1068 } 1069 throw new AssertionError(state); 1070 } 1071 future.addListener(() -> provideValueAndCloser(consumer, ClosingFuture.this), executor); 1072 } 1073 1074 private static <C extends @Nullable Object, V extends C> void provideValueAndCloser( 1075 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1076 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1077 } 1078 1079 /** 1080 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1081 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1082 * successful, and this step has not started when {@code cancel} is called, this step should never 1083 * run. 1084 * 1085 * <p>If successful, causes the objects captured by this step (if already started) and its input 1086 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1087 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 
1088 * 1089 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1090 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1091 * cancelled regardless 1092 * @return {@code false} if the step could not be cancelled, typically because it has already 1093 * completed normally; {@code true} otherwise 1094 */ 1095 @CanIgnoreReturnValue 1096 @SuppressWarnings("Interruption") // We are propagating an interrupt from a caller. 1097 public boolean cancel(boolean mayInterruptIfRunning) { 1098 logger.get().log(FINER, "cancelling {0}", this); 1099 boolean cancelled = future.cancel(mayInterruptIfRunning); 1100 if (cancelled) { 1101 close(); 1102 } 1103 return cancelled; 1104 } 1105 1106 private void close() { 1107 logger.get().log(FINER, "closing {0}", this); 1108 closeables.close(); 1109 } 1110 1111 private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) { 1112 ClosingFuture<U> derived = new ClosingFuture<>(future); 1113 becomeSubsumedInto(derived.closeables); 1114 return derived; 1115 } 1116 1117 private void becomeSubsumedInto(CloseableList otherCloseables) { 1118 checkAndUpdateState(OPEN, SUBSUMED); 1119 otherCloseables.add(closeables, directExecutor()); 1120 } 1121 1122 /** 1123 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1124 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1125 * 1126 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1127 */ 1128 public static final class Peeker { 1129 private final ImmutableList<ClosingFuture<?>> futures; 1130 private volatile boolean beingCalled; 1131 1132 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1133 this.futures = checkNotNull(futures); 1134 } 1135 1136 /** 1137 * Returns the value of {@code closingFuture}. 
1138 * 1139 * @throws ExecutionException if {@code closingFuture} is a failed step 1140 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1141 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1142 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1143 * @throws IllegalStateException if called outside of a call to {@link 1144 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1145 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1146 */ 1147 @ParametricNullness 1148 public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1149 throws ExecutionException { 1150 checkState(beingCalled); 1151 checkArgument(futures.contains(closingFuture)); 1152 return Futures.getDone(closingFuture.future); 1153 } 1154 1155 @ParametricNullness 1156 private <V extends @Nullable Object> V call( 1157 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1158 beingCalled = true; 1159 CloseableList newCloseables = new CloseableList(); 1160 try { 1161 return combiner.call(newCloseables.closer, this); 1162 } finally { 1163 closeables.add(newCloseables, directExecutor()); 1164 beingCalled = false; 1165 } 1166 } 1167 1168 private <V extends @Nullable Object> FluentFuture<V> callAsync( 1169 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1170 beingCalled = true; 1171 CloseableList newCloseables = new CloseableList(); 1172 try { 1173 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1174 closingFuture.becomeSubsumedInto(closeables); 1175 return closingFuture.future; 1176 } finally { 1177 closeables.add(newCloseables, directExecutor()); 1178 beingCalled = false; 1179 } 1180 } 1181 } 1182 1183 /** 1184 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 
1185 * 1186 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1187 * instantiate this class. 1188 * 1189 * <p>Example: 1190 * 1191 * <pre>{@code 1192 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1193 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1194 * ListenableFuture<Integer> numberOfDifferentLines = 1195 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1196 * .call( 1197 * (closer, peeker) -> { 1198 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1199 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1200 * return countDifferentLines(file1Reader, file2Reader); 1201 * }, 1202 * executor) 1203 * .closing(executor); 1204 * }</pre> 1205 */ 1206 @DoNotMock("Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1207 public static class Combiner { 1208 1209 private final CloseableList closeables = new CloseableList(); 1210 1211 /** 1212 * An operation that returns a result and may throw an exception. 1213 * 1214 * @param <V> the type of the result 1215 */ 1216 public interface CombiningCallable<V extends @Nullable Object> { 1217 /** 1218 * Computes a result, or throws an exception if unable to do so. 1219 * 1220 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1221 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1222 * (but not before this method completes), even if this method throws or the pipeline is 1223 * cancelled. 1224 * 1225 * @param peeker used to get the value of any of the input futures 1226 */ 1227 @ParametricNullness 1228 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1229 } 1230 1231 /** 1232 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 
1233 * 1234 * @param <V> the type of the result 1235 */ 1236 public interface AsyncCombiningCallable<V extends @Nullable Object> { 1237 /** 1238 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1239 * 1240 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1241 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1242 * (but not before this method completes), even if this method throws or the pipeline is 1243 * cancelled. 1244 * 1245 * @param peeker used to get the value of any of the input futures 1246 */ 1247 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1248 } 1249 1250 private final boolean allMustSucceed; 1251 protected final ImmutableList<ClosingFuture<?>> inputs; 1252 1253 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1254 this.allMustSucceed = allMustSucceed; 1255 this.inputs = ImmutableList.copyOf(inputs); 1256 for (ClosingFuture<?> input : inputs) { 1257 input.becomeSubsumedInto(closeables); 1258 } 1259 } 1260 1261 /** 1262 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1263 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1264 * objects to be closed when the pipeline is done. 1265 * 1266 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1267 * fail, so will the returned step. 1268 * 1269 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1270 * cancelled. 1271 * 1272 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1273 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 
1274 */ 1275 public <V extends @Nullable Object> ClosingFuture<V> call( 1276 CombiningCallable<V> combiningCallable, Executor executor) { 1277 Callable<V> callable = 1278 new Callable<V>() { 1279 @Override 1280 @ParametricNullness 1281 public V call() throws Exception { 1282 return new Peeker(inputs).call(combiningCallable, closeables); 1283 } 1284 1285 @Override 1286 public String toString() { 1287 return combiningCallable.toString(); 1288 } 1289 }; 1290 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1291 derived.closeables.add(closeables, directExecutor()); 1292 return derived; 1293 } 1294 1295 /** 1296 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1297 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1298 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1299 * captured by the returned {@link ClosingFuture}). 1300 * 1301 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1302 * fail, so will the returned step. 1303 * 1304 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1305 * cancelled. 1306 * 1307 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1308 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1309 * 1310 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1311 * derived step. 1312 * 1313 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1314 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1315 * 1316 * <p>Usage guidelines for this method: 1317 * 1318 * <ul> 1319 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1320 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1321 * Executor)} instead, with a function that returns the next value directly. 1322 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1323 * for every closeable object this step creates in order to capture it for later closing. 1324 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1325 * ClosingFuture} call {@link #from(ListenableFuture)}. 1326 * </ul> 1327 * 1328 * <p>The same warnings about doing heavyweight operations within {@link 1329 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1330 */ 1331 public <V extends @Nullable Object> ClosingFuture<V> callAsync( 1332 AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1333 AsyncCallable<V> asyncCallable = 1334 new AsyncCallable<V>() { 1335 @Override 1336 public ListenableFuture<V> call() throws Exception { 1337 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1338 } 1339 1340 @Override 1341 public String toString() { 1342 return combiningCallable.toString(); 1343 } 1344 }; 1345 ClosingFuture<V> derived = 1346 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1347 derived.closeables.add(closeables, directExecutor()); 1348 return derived; 1349 } 1350 1351 private FutureCombiner<@Nullable Object> futureCombiner() { 1352 return allMustSucceed 1353 ? Futures.whenAllSucceed(inputFutures()) 1354 : Futures.whenAllComplete(inputFutures()); 1355 } 1356 1357 private ImmutableList<FluentFuture<?>> inputFutures() { 1358 return FluentIterable.from(inputs) 1359 .<FluentFuture<?>>transform(future -> future.future) 1360 .toList(); 1361 } 1362 } 1363 1364 /** 1365 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1366 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1367 * combination. 
1368 * 1369 * @param <V1> the type returned by the first future 1370 * @param <V2> the type returned by the second future 1371 */ 1372 public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1373 extends Combiner { 1374 1375 /** 1376 * A function that returns a value when applied to the values of the two futures passed to 1377 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1378 * 1379 * @param <V1> the type returned by the first future 1380 * @param <V2> the type returned by the second future 1381 * @param <U> the type returned by the function 1382 */ 1383 public interface ClosingFunction2< 1384 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1385 1386 /** 1387 * Applies this function to two inputs, or throws an exception if unable to do so. 1388 * 1389 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1390 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1391 * (but not before this method completes), even if this method throws or the pipeline is 1392 * cancelled. 1393 */ 1394 @ParametricNullness 1395 U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1396 throws Exception; 1397 } 1398 1399 /** 1400 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1401 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1402 * 1403 * @param <V1> the type returned by the first future 1404 * @param <V2> the type returned by the second future 1405 * @param <U> the type returned by the function 1406 */ 1407 public interface AsyncClosingFunction2< 1408 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1409 1410 /** 1411 * Applies this function to two inputs, or throws an exception if unable to do so. 
1412 * 1413 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1414 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1415 * (but not before this method completes), even if this method throws or the pipeline is 1416 * cancelled. 1417 */ 1418 ClosingFuture<U> apply( 1419 DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2) 1420 throws Exception; 1421 } 1422 1423 private final ClosingFuture<V1> future1; 1424 private final ClosingFuture<V2> future2; 1425 1426 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1427 super(true, ImmutableList.of(future1, future2)); 1428 this.future1 = future1; 1429 this.future2 = future2; 1430 } 1431 1432 /** 1433 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1434 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1435 * objects to be closed when the pipeline is done. 1436 * 1437 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1438 * any of the inputs fail, so will the returned step. 1439 * 1440 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1441 * 1442 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1443 * ExecutionException} will be extracted and used as the failure of the derived step. 
1444 */ 1445 public <U extends @Nullable Object> ClosingFuture<U> call( 1446 ClosingFunction2<V1, V2, U> function, Executor executor) { 1447 return call( 1448 new CombiningCallable<U>() { 1449 @Override 1450 @ParametricNullness 1451 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1452 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1453 } 1454 1455 @Override 1456 public String toString() { 1457 return function.toString(); 1458 } 1459 }, 1460 executor); 1461 } 1462 1463 /** 1464 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1465 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1466 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1467 * captured by the returned {@link ClosingFuture}). 1468 * 1469 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1470 * any of the inputs fail, so will the returned step. 1471 * 1472 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1473 * 1474 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1475 * ExecutionException} will be extracted and used as the failure of the derived step. 1476 * 1477 * <p>If the function throws any other exception, it will be used as the failure of the derived 1478 * step. 1479 * 1480 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1481 * the closeable objects in that {@code ClosingFuture} will be closed. 1482 * 1483 * <p>Usage guidelines for this method: 1484 * 1485 * <ul> 1486 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1487 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1488 * Executor)} instead, with a function that returns the next value directly. 
1489 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1490 * for every closeable object this step creates in order to capture it for later closing. 1491 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1492 * ClosingFuture} call {@link #from(ListenableFuture)}. 1493 * </ul> 1494 * 1495 * <p>The same warnings about doing heavyweight operations within {@link 1496 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1497 */ 1498 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1499 AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1500 return callAsync( 1501 new AsyncCombiningCallable<U>() { 1502 @Override 1503 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1504 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1505 } 1506 1507 @Override 1508 public String toString() { 1509 return function.toString(); 1510 } 1511 }, 1512 executor); 1513 } 1514 } 1515 1516 /** 1517 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1518 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1519 * ClosingFuture)} to start this combination. 1520 * 1521 * @param <V1> the type returned by the first future 1522 * @param <V2> the type returned by the second future 1523 * @param <V3> the type returned by the third future 1524 */ 1525 public static final class Combiner3< 1526 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1527 extends Combiner { 1528 /** 1529 * A function that returns a value when applied to the values of the three futures passed to 1530 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 
1531 * 1532 * @param <V1> the type returned by the first future 1533 * @param <V2> the type returned by the second future 1534 * @param <V3> the type returned by the third future 1535 * @param <U> the type returned by the function 1536 */ 1537 public interface ClosingFunction3< 1538 V1 extends @Nullable Object, 1539 V2 extends @Nullable Object, 1540 V3 extends @Nullable Object, 1541 U extends @Nullable Object> { 1542 /** 1543 * Applies this function to three inputs, or throws an exception if unable to do so. 1544 * 1545 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1546 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1547 * (but not before this method completes), even if this method throws or the pipeline is 1548 * cancelled. 1549 */ 1550 @ParametricNullness 1551 U apply( 1552 DeferredCloser closer, 1553 @ParametricNullness V1 value1, 1554 @ParametricNullness V2 value2, 1555 @ParametricNullness V3 value3) 1556 throws Exception; 1557 } 1558 1559 /** 1560 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1561 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1562 * 1563 * @param <V1> the type returned by the first future 1564 * @param <V2> the type returned by the second future 1565 * @param <V3> the type returned by the third future 1566 * @param <U> the type returned by the function 1567 */ 1568 public interface AsyncClosingFunction3< 1569 V1 extends @Nullable Object, 1570 V2 extends @Nullable Object, 1571 V3 extends @Nullable Object, 1572 U extends @Nullable Object> { 1573 /** 1574 * Applies this function to three inputs, or throws an exception if unable to do so. 
1575 * 1576 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1577 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1578 * (but not before this method completes), even if this method throws or the pipeline is 1579 * cancelled. 1580 */ 1581 ClosingFuture<U> apply( 1582 DeferredCloser closer, 1583 @ParametricNullness V1 value1, 1584 @ParametricNullness V2 value2, 1585 @ParametricNullness V3 value3) 1586 throws Exception; 1587 } 1588 1589 private final ClosingFuture<V1> future1; 1590 private final ClosingFuture<V2> future2; 1591 private final ClosingFuture<V3> future3; 1592 1593 private Combiner3( 1594 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1595 super(true, ImmutableList.of(future1, future2, future3)); 1596 this.future1 = future1; 1597 this.future2 = future2; 1598 this.future3 = future3; 1599 } 1600 1601 /** 1602 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1603 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1604 * objects to be closed when the pipeline is done. 1605 * 1606 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1607 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1608 * 1609 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1610 * 1611 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1612 * ExecutionException} will be extracted and used as the failure of the derived step. 
1613 */ 1614 public <U extends @Nullable Object> ClosingFuture<U> call( 1615 ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1616 return call( 1617 new CombiningCallable<U>() { 1618 @Override 1619 @ParametricNullness 1620 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1621 return function.apply( 1622 closer, 1623 peeker.getDone(future1), 1624 peeker.getDone(future2), 1625 peeker.getDone(future3)); 1626 } 1627 1628 @Override 1629 public String toString() { 1630 return function.toString(); 1631 } 1632 }, 1633 executor); 1634 } 1635 1636 /** 1637 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1638 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1639 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1640 * captured by the returned {@link ClosingFuture}). 1641 * 1642 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1643 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1644 * 1645 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1646 * 1647 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1648 * ExecutionException} will be extracted and used as the failure of the derived step. 1649 * 1650 * <p>If the function throws any other exception, it will be used as the failure of the derived 1651 * step. 1652 * 1653 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1654 * the closeable objects in that {@code ClosingFuture} will be closed. 1655 * 1656 * <p>Usage guidelines for this method: 1657 * 1658 * <ul> 1659 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1660 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1661 * Executor)} instead, with a function that returns the next value directly. 1662 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1663 * for every closeable object this step creates in order to capture it for later closing. 1664 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1665 * ClosingFuture} call {@link #from(ListenableFuture)}. 1666 * </ul> 1667 * 1668 * <p>The same warnings about doing heavyweight operations within {@link 1669 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1670 */ 1671 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1672 AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1673 return callAsync( 1674 new AsyncCombiningCallable<U>() { 1675 @Override 1676 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1677 return function.apply( 1678 closer, 1679 peeker.getDone(future1), 1680 peeker.getDone(future2), 1681 peeker.getDone(future3)); 1682 } 1683 1684 @Override 1685 public String toString() { 1686 return function.toString(); 1687 } 1688 }, 1689 executor); 1690 } 1691 } 1692 1693 /** 1694 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1695 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1696 * ClosingFuture)} to start this combination. 
1697 * 1698 * @param <V1> the type returned by the first future 1699 * @param <V2> the type returned by the second future 1700 * @param <V3> the type returned by the third future 1701 * @param <V4> the type returned by the fourth future 1702 */ 1703 public static final class Combiner4< 1704 V1 extends @Nullable Object, 1705 V2 extends @Nullable Object, 1706 V3 extends @Nullable Object, 1707 V4 extends @Nullable Object> 1708 extends Combiner { 1709 /** 1710 * A function that returns a value when applied to the values of the four futures passed to 1711 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1712 * 1713 * @param <V1> the type returned by the first future 1714 * @param <V2> the type returned by the second future 1715 * @param <V3> the type returned by the third future 1716 * @param <V4> the type returned by the fourth future 1717 * @param <U> the type returned by the function 1718 */ 1719 public interface ClosingFunction4< 1720 V1 extends @Nullable Object, 1721 V2 extends @Nullable Object, 1722 V3 extends @Nullable Object, 1723 V4 extends @Nullable Object, 1724 U extends @Nullable Object> { 1725 /** 1726 * Applies this function to four inputs, or throws an exception if unable to do so. 1727 * 1728 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1729 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1730 * (but not before this method completes), even if this method throws or the pipeline is 1731 * cancelled. 
1732 */ 1733 @ParametricNullness 1734 U apply( 1735 DeferredCloser closer, 1736 @ParametricNullness V1 value1, 1737 @ParametricNullness V2 value2, 1738 @ParametricNullness V3 value3, 1739 @ParametricNullness V4 value4) 1740 throws Exception; 1741 } 1742 1743 /** 1744 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1745 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1746 * ClosingFuture)}. 1747 * 1748 * @param <V1> the type returned by the first future 1749 * @param <V2> the type returned by the second future 1750 * @param <V3> the type returned by the third future 1751 * @param <V4> the type returned by the fourth future 1752 * @param <U> the type returned by the function 1753 */ 1754 public interface AsyncClosingFunction4< 1755 V1 extends @Nullable Object, 1756 V2 extends @Nullable Object, 1757 V3 extends @Nullable Object, 1758 V4 extends @Nullable Object, 1759 U extends @Nullable Object> { 1760 /** 1761 * Applies this function to four inputs, or throws an exception if unable to do so. 1762 * 1763 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1764 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1765 * (but not before this method completes), even if this method throws or the pipeline is 1766 * cancelled. 
1767 */ 1768 ClosingFuture<U> apply( 1769 DeferredCloser closer, 1770 @ParametricNullness V1 value1, 1771 @ParametricNullness V2 value2, 1772 @ParametricNullness V3 value3, 1773 @ParametricNullness V4 value4) 1774 throws Exception; 1775 } 1776 1777 private final ClosingFuture<V1> future1; 1778 private final ClosingFuture<V2> future2; 1779 private final ClosingFuture<V3> future3; 1780 private final ClosingFuture<V4> future4; 1781 1782 private Combiner4( 1783 ClosingFuture<V1> future1, 1784 ClosingFuture<V2> future2, 1785 ClosingFuture<V3> future3, 1786 ClosingFuture<V4> future4) { 1787 super(true, ImmutableList.of(future1, future2, future3, future4)); 1788 this.future1 = future1; 1789 this.future2 = future2; 1790 this.future3 = future3; 1791 this.future4 = future4; 1792 } 1793 1794 /** 1795 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1796 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1797 * objects to be closed when the pipeline is done. 1798 * 1799 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1800 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1801 * 1802 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1803 * 1804 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1805 * ExecutionException} will be extracted and used as the failure of the derived step. 
1806 */ 1807 public <U extends @Nullable Object> ClosingFuture<U> call( 1808 ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1809 return call( 1810 new CombiningCallable<U>() { 1811 @Override 1812 @ParametricNullness 1813 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1814 return function.apply( 1815 closer, 1816 peeker.getDone(future1), 1817 peeker.getDone(future2), 1818 peeker.getDone(future3), 1819 peeker.getDone(future4)); 1820 } 1821 1822 @Override 1823 public String toString() { 1824 return function.toString(); 1825 } 1826 }, 1827 executor); 1828 } 1829 1830 /** 1831 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1832 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1833 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1834 * captured by the returned {@link ClosingFuture}). 1835 * 1836 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1837 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1838 * 1839 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1840 * 1841 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1842 * ExecutionException} will be extracted and used as the failure of the derived step. 1843 * 1844 * <p>If the function throws any other exception, it will be used as the failure of the derived 1845 * step. 1846 * 1847 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1848 * the closeable objects in that {@code ClosingFuture} will be closed. 1849 * 1850 * <p>Usage guidelines for this method: 1851 * 1852 * <ul> 1853 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1854 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1855 * Executor)} instead, with a function that returns the next value directly. 1856 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 1857 * for every closeable object this step creates in order to capture it for later closing. 1858 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1859 * ClosingFuture} call {@link #from(ListenableFuture)}. 1860 * </ul> 1861 * 1862 * <p>The same warnings about doing heavyweight operations within {@link 1863 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1864 */ 1865 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1866 AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1867 return callAsync( 1868 new AsyncCombiningCallable<U>() { 1869 @Override 1870 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1871 return function.apply( 1872 closer, 1873 peeker.getDone(future1), 1874 peeker.getDone(future2), 1875 peeker.getDone(future3), 1876 peeker.getDone(future4)); 1877 } 1878 1879 @Override 1880 public String toString() { 1881 return function.toString(); 1882 } 1883 }, 1884 executor); 1885 } 1886 } 1887 1888 /** 1889 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1890 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1891 * ClosingFuture, ClosingFuture)} to start this combination. 
1892 * 1893 * @param <V1> the type returned by the first future 1894 * @param <V2> the type returned by the second future 1895 * @param <V3> the type returned by the third future 1896 * @param <V4> the type returned by the fourth future 1897 * @param <V5> the type returned by the fifth future 1898 */ 1899 public static final class Combiner5< 1900 V1 extends @Nullable Object, 1901 V2 extends @Nullable Object, 1902 V3 extends @Nullable Object, 1903 V4 extends @Nullable Object, 1904 V5 extends @Nullable Object> 1905 extends Combiner { 1906 /** 1907 * A function that returns a value when applied to the values of the five futures passed to 1908 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1909 * ClosingFuture)}. 1910 * 1911 * @param <V1> the type returned by the first future 1912 * @param <V2> the type returned by the second future 1913 * @param <V3> the type returned by the third future 1914 * @param <V4> the type returned by the fourth future 1915 * @param <V5> the type returned by the fifth future 1916 * @param <U> the type returned by the function 1917 */ 1918 public interface ClosingFunction5< 1919 V1 extends @Nullable Object, 1920 V2 extends @Nullable Object, 1921 V3 extends @Nullable Object, 1922 V4 extends @Nullable Object, 1923 V5 extends @Nullable Object, 1924 U extends @Nullable Object> { 1925 /** 1926 * Applies this function to five inputs, or throws an exception if unable to do so. 1927 * 1928 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1929 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1930 * (but not before this method completes), even if this method throws or the pipeline is 1931 * cancelled. 
1932 */ 1933 @ParametricNullness 1934 U apply( 1935 DeferredCloser closer, 1936 @ParametricNullness V1 value1, 1937 @ParametricNullness V2 value2, 1938 @ParametricNullness V3 value3, 1939 @ParametricNullness V4 value4, 1940 @ParametricNullness V5 value5) 1941 throws Exception; 1942 } 1943 1944 /** 1945 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1946 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1947 * ClosingFuture, ClosingFuture)}. 1948 * 1949 * @param <V1> the type returned by the first future 1950 * @param <V2> the type returned by the second future 1951 * @param <V3> the type returned by the third future 1952 * @param <V4> the type returned by the fourth future 1953 * @param <V5> the type returned by the fifth future 1954 * @param <U> the type returned by the function 1955 */ 1956 public interface AsyncClosingFunction5< 1957 V1 extends @Nullable Object, 1958 V2 extends @Nullable Object, 1959 V3 extends @Nullable Object, 1960 V4 extends @Nullable Object, 1961 V5 extends @Nullable Object, 1962 U extends @Nullable Object> { 1963 /** 1964 * Applies this function to five inputs, or throws an exception if unable to do so. 1965 * 1966 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor) 1967 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done 1968 * (but not before this method completes), even if this method throws or the pipeline is 1969 * cancelled. 
1970 */ 1971 ClosingFuture<U> apply( 1972 DeferredCloser closer, 1973 @ParametricNullness V1 value1, 1974 @ParametricNullness V2 value2, 1975 @ParametricNullness V3 value3, 1976 @ParametricNullness V4 value4, 1977 @ParametricNullness V5 value5) 1978 throws Exception; 1979 } 1980 1981 private final ClosingFuture<V1> future1; 1982 private final ClosingFuture<V2> future2; 1983 private final ClosingFuture<V3> future3; 1984 private final ClosingFuture<V4> future4; 1985 private final ClosingFuture<V5> future5; 1986 1987 private Combiner5( 1988 ClosingFuture<V1> future1, 1989 ClosingFuture<V2> future2, 1990 ClosingFuture<V3> future3, 1991 ClosingFuture<V4> future4, 1992 ClosingFuture<V5> future5) { 1993 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 1994 this.future1 = future1; 1995 this.future2 = future2; 1996 this.future3 = future3; 1997 this.future4 = future4; 1998 this.future5 = future5; 1999 } 2000 2001 /** 2002 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2003 * combining function to their values. The function can use a {@link DeferredCloser} to capture 2004 * objects to be closed when the pipeline is done. 2005 * 2006 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2007 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2008 * returned step. 2009 * 2010 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2011 * 2012 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2013 * ExecutionException} will be extracted and used as the failure of the derived step. 
2014 */ 2015 public <U extends @Nullable Object> ClosingFuture<U> call( 2016 ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2017 return call( 2018 new CombiningCallable<U>() { 2019 @Override 2020 @ParametricNullness 2021 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 2022 return function.apply( 2023 closer, 2024 peeker.getDone(future1), 2025 peeker.getDone(future2), 2026 peeker.getDone(future3), 2027 peeker.getDone(future4), 2028 peeker.getDone(future5)); 2029 } 2030 2031 @Override 2032 public String toString() { 2033 return function.toString(); 2034 } 2035 }, 2036 executor); 2037 } 2038 2039 /** 2040 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 2041 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 2042 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 2043 * captured by the returned {@link ClosingFuture}). 2044 * 2045 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 2046 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 2047 * returned step. 2048 * 2049 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 2050 * 2051 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 2052 * ExecutionException} will be extracted and used as the failure of the derived step. 2053 * 2054 * <p>If the function throws any other exception, it will be used as the failure of the derived 2055 * step. 2056 * 2057 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2058 * the closeable objects in that {@code ClosingFuture} will be closed. 
2059 * 2060 * <p>Usage guidelines for this method: 2061 * 2062 * <ul> 2063 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2064 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2065 * Executor)} instead, with a function that returns the next value directly. 2066 * <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()} 2067 * for every closeable object this step creates in order to capture it for later closing. 2068 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2069 * ClosingFuture} call {@link #from(ListenableFuture)}. 2070 * </ul> 2071 * 2072 * <p>The same warnings about doing heavyweight operations within {@link 2073 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2074 */ 2075 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 2076 AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2077 return callAsync( 2078 new AsyncCombiningCallable<U>() { 2079 @Override 2080 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2081 return function.apply( 2082 closer, 2083 peeker.getDone(future1), 2084 peeker.getDone(future2), 2085 peeker.getDone(future3), 2086 peeker.getDone(future4), 2087 peeker.getDone(future5)); 2088 } 2089 2090 @Override 2091 public String toString() { 2092 return function.toString(); 2093 } 2094 }, 2095 executor); 2096 } 2097 } 2098 2099 @Override 2100 public String toString() { 2101 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2102 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2103 } 2104 2105 @SuppressWarnings({"removal", "Finalize"}) // b/260137033 2106 @Override 2107 protected void finalize() { 2108 if (state.get().equals(OPEN)) { 2109 logger.get().log(SEVERE, "Uh oh! 
An open ClosingFuture has leaked and will close: {0}", this); 2110 FluentFuture<V> unused = finishToFuture(); 2111 } 2112 } 2113 2114 private static void closeQuietly(@Nullable AutoCloseable closeable, Executor executor) { 2115 if (closeable == null) { 2116 return; 2117 } 2118 try { 2119 executor.execute( 2120 () -> { 2121 try { 2122 closeable.close(); 2123 } catch (Exception e) { 2124 /* 2125 * In guava-jre, any kind of Exception may be thrown because `closeable` has type 2126 * `AutoCloseable`. 2127 * 2128 * In guava-android, the only kinds of Exception that may be thrown are 2129 * RuntimeException and IOException because `closeable` has type `Closeable`—except 2130 * that we have to account for sneaky checked exception. 2131 */ 2132 restoreInterruptIfIsInterruptedException(e); 2133 logger.get().log(WARNING, "thrown by close()", e); 2134 } 2135 }); 2136 } catch (RejectedExecutionException e) { 2137 if (logger.get().isLoggable(WARNING)) { 2138 logger 2139 .get() 2140 .log( 2141 WARNING, 2142 String.format("while submitting close to %s; will close inline", executor), 2143 e); 2144 } 2145 closeQuietly(closeable, directExecutor()); 2146 } 2147 } 2148 2149 private void checkAndUpdateState(State oldState, State newState) { 2150 checkState( 2151 compareAndUpdateState(oldState, newState), 2152 "Expected state to be %s, but it was %s", 2153 oldState, 2154 newState); 2155 } 2156 2157 private boolean compareAndUpdateState(State oldState, State newState) { 2158 return state.compareAndSet(oldState, newState); 2159 } 2160 2161 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 
2162 private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor> 2163 implements Closeable { 2164 private final DeferredCloser closer = new DeferredCloser(this); 2165 private volatile boolean closed; 2166 private volatile @Nullable CountDownLatch whenClosed; 2167 2168 <V extends @Nullable Object, U extends @Nullable Object> 2169 ListenableFuture<U> applyClosingFunction( 2170 ClosingFunction<? super V, U> transformation, @ParametricNullness V input) 2171 throws Exception { 2172 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2173 CloseableList newCloseables = new CloseableList(); 2174 try { 2175 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2176 } finally { 2177 add(newCloseables, directExecutor()); 2178 } 2179 } 2180 2181 <V extends @Nullable Object, U extends @Nullable Object> 2182 FluentFuture<U> applyAsyncClosingFunction( 2183 AsyncClosingFunction<V, U> transformation, @ParametricNullness V input) 2184 throws Exception { 2185 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 
2186 CloseableList newCloseables = new CloseableList(); 2187 try { 2188 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2189 closingFuture.becomeSubsumedInto(newCloseables); 2190 return closingFuture.future; 2191 } finally { 2192 add(newCloseables, directExecutor()); 2193 } 2194 } 2195 2196 @Override 2197 public void close() { 2198 if (closed) { 2199 return; 2200 } 2201 synchronized (this) { 2202 if (closed) { 2203 return; 2204 } 2205 closed = true; 2206 } 2207 for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) { 2208 closeQuietly(entry.getKey(), entry.getValue()); 2209 } 2210 clear(); 2211 if (whenClosed != null) { 2212 whenClosed.countDown(); 2213 } 2214 } 2215 2216 void add(@Nullable AutoCloseable closeable, Executor executor) { 2217 checkNotNull(executor); 2218 if (closeable == null) { 2219 return; 2220 } 2221 synchronized (this) { 2222 if (!closed) { 2223 put(closeable, executor); 2224 return; 2225 } 2226 } 2227 closeQuietly(closeable, executor); 2228 } 2229 2230 /** 2231 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2232 */ 2233 CountDownLatch whenClosedCountDown() { 2234 if (closed) { 2235 return new CountDownLatch(0); 2236 } 2237 synchronized (this) { 2238 if (closed) { 2239 return new CountDownLatch(0); 2240 } 2241 checkState(whenClosed == null); 2242 return whenClosed = new CountDownLatch(1); 2243 } 2244 } 2245 } 2246 2247 /** 2248 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2249 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2250 */ 2251 @VisibleForTesting 2252 CountDownLatch whenClosedCountDown() { 2253 return closeables.whenClosedCountDown(); 2254 } 2255 2256 /** The state of a {@link CloseableList}. */ 2257 enum State { 2258 /** The {@link CloseableList} has not been subsumed or closed. 
*/ 2259 OPEN, 2260 2261 /** 2262 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2263 * into any other. 2264 */ 2265 SUBSUMED, 2266 2267 /** 2268 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2269 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2270 */ 2271 WILL_CLOSE, 2272 2273 /** 2274 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2275 * {@link CloseableList} may not be subsumed. 2276 */ 2277 CLOSING, 2278 2279 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2280 CLOSED, 2281 2282 /** 2283 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2284 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2285 */ 2286 WILL_CREATE_VALUE_AND_CLOSER, 2287 } 2288}