001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static java.util.logging.Level.FINER; 036import static java.util.logging.Level.SEVERE; 
037import static java.util.logging.Level.WARNING; 038 039import com.google.common.annotations.Beta; 040import com.google.common.annotations.VisibleForTesting; 041import com.google.common.base.Function; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.io.IOException; 052import java.util.IdentityHashMap; 053import java.util.Map; 054import java.util.concurrent.Callable; 055import java.util.concurrent.CancellationException; 056import java.util.concurrent.CountDownLatch; 057import java.util.concurrent.ExecutionException; 058import java.util.concurrent.Executor; 059import java.util.concurrent.Future; 060import java.util.concurrent.RejectedExecutionException; 061import java.util.concurrent.atomic.AtomicReference; 062import java.util.logging.Logger; 063import org.checkerframework.checker.nullness.compatqual.NullableDecl; 064 065/** 066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 067 * complete, some objects captured during the computation are closed. 068 * 069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 071 * cancellation of the operation so far. The only way to extract the value or exception from a step 072 * is by declaring that step to be the last step of the pipeline. 
Nevertheless, we refer to the 073 * "value" of a successful step or the "result" (value or exception) of any step. 074 * 075 * <ol> 076 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 077 * block or a {@link ListenableFuture}. 078 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 079 * can be captured for later closing. 080 * <li>There is one last step (the root of the tree), from which you can extract the final result 081 * of the computation. After that result is available (or the computation fails), all objects 082 * captured by any of the steps in the pipeline are closed. 083 * </ol> 084 * 085 * <h3>Starting a pipeline</h3> 086 * 087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 090 * #from(ListenableFuture)} instead. 091 * 092 * <h3>Derived steps</h3> 093 * 094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 095 * ways similar to {@link FluentFuture}s: 096 * 097 * <ul> 098 * <li>by transforming the value from a successful input step, 099 * <li>by catching the exception from a failed input step, or 100 * <li>by combining the results of several input steps. 101 * </ul> 102 * 103 * Each derivation can capture the next value or any intermediate objects for later closing. 104 * 105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 106 * exception, or combine it with others, you cannot do anything else with it, including declare it 107 * to be the last step of the pipeline. 
108 * 109 * <h4>Transforming</h4> 110 * 111 * To derive the next step by asynchronously applying a function to an input step's value, call 112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 113 * Executor)} on the input step. 114 * 115 * <h4>Catching</h4> 116 * 117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 119 * 120 * <h4>Combining</h4> 121 * 122 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 123 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 124 * 125 * <h3>Cancelling</h3> 126 * 127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 129 * successfully cancelled step will immediately start closing all objects captured for later closing 130 * by it and by its input steps. 131 * 132 * <h3>Ending a pipeline</h3> 133 * 134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 135 * close the captured objects automatically or manually. 136 * 137 * <h4>Automatically closing</h4> 138 * 139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 140 * calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured 141 * by all steps in the pipeline will be closed. 
142 * 143 * <pre>{@code 144 * FluentFuture<UserName> userName = 145 * ClosingFuture.submit( 146 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 147 * executor) 148 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 149 * .transform((closer, result) -> result.get("userName"), directExecutor()) 150 * .catching(DBException.class, e -> "no user", directExecutor()) 151 * .finishToFuture(); 152 * }</pre> 153 * 154 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 155 * result cursor will both be closed, even if the operation is cancelled or fails. 156 * 157 * <h4>Manually closing</h4> 158 * 159 * If you want to close the captured objects manually, after you've used the final result, call 160 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 161 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 162 * 163 * <pre>{@code 164 * ClosingFuture.submit( 165 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 166 * executor) 167 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 168 * .transform((closer, result) -> result.get("userName"), directExecutor()) 169 * .catching(DBException.class, e -> "no user", directExecutor()) 170 * .finishToValueAndCloser( 171 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 172 * 173 * // later 174 * try { // get() will throw if the operation failed or was cancelled. 
175 * UserName userName = userNameValueAndCloser.get(); 176 * // do something with userName 177 * } finally { 178 * userNameValueAndCloser.closeAsync(); 179 * } 180 * }</pre> 181 * 182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 183 * the query result cursor will both be closed, even if the operation is cancelled or fails. 184 * 185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 186 * automatic-closing approach described above is safer. 187 * 188 * @param <V> the type of the value of this step 189 * @since 30.0 190 */ 191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 192@Beta // @Beta for one release. 193@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 194// TODO(dpb): GWT compatibility. 195public final class ClosingFuture<V> { 196 197 private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName()); 198 199 /** 200 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 201 * done. 202 */ 203 public static final class DeferredCloser { 204 @RetainedWith private final CloseableList list; 205 206 DeferredCloser(CloseableList list) { 207 this.list = list; 208 } 209 210 /** 211 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 212 * 213 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 214 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 215 * Closeable}. (For more about the flavors, see <a 216 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 217 * build</a>.) 
218 * 219 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 220 * building for Android): Ensure that any object you pass implements the interface not just in 221 * your current SDK version but also at the oldest version you support. For example, <a 222 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 223 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 224 * {@code Closeable} with a method reference like {@code cursor::close}. 225 * 226 * <p>Note that this method is still binary-compatible between flavors because the erasure of 227 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 228 * 229 * @param closeable the object to be closed (see notes above) 230 * @param closingExecutor the object will be closed on this executor 231 * @return the first argument 232 */ 233 @CanIgnoreReturnValue 234 @NullableDecl 235 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 236 public <C extends Object & Closeable> C eventuallyClose( 237 @NullableDecl C closeable, Executor closingExecutor) { 238 checkNotNull(closingExecutor); 239 if (closeable != null) { 240 list.add(closeable, closingExecutor); 241 } 242 return closeable; 243 } 244 } 245 246 /** 247 * An operation that computes a result. 248 * 249 * @param <V> the type of the result 250 */ 251 public interface ClosingCallable<V extends Object> { 252 /** 253 * Computes a result, or throws an exception if unable to do so. 254 * 255 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 256 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 257 * not before this method completes), even if this method throws or the pipeline is cancelled. 
258 */ 259 @NullableDecl 260 V call(DeferredCloser closer) throws Exception; 261 } 262 263 /** 264 * A function from an input to a result. 265 * 266 * @param <T> the type of the input to the function 267 * @param <U> the type of the result of the function 268 */ 269 public interface ClosingFunction<T extends Object, U extends Object> { 270 271 /** 272 * Applies this function to an input, or throws an exception if unable to do so. 273 * 274 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 275 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 276 * not before this method completes), even if this method throws or the pipeline is cancelled. 277 */ 278 @NullableDecl 279 U apply(DeferredCloser closer, @NullableDecl T input) throws Exception; 280 } 281 282 /** 283 * A function from an input to a {@link ClosingFuture} of a result. 284 * 285 * @param <T> the type of the input to the function 286 * @param <U> the type of the result of the function 287 */ 288 public interface AsyncClosingFunction<T extends Object, U extends Object> { 289 /** 290 * Applies this function to an input, or throws an exception if unable to do so. 291 * 292 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 293 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 294 * not before this method completes), even if this method throws or the pipeline is cancelled. 295 */ 296 ClosingFuture<U> apply(DeferredCloser closer, @NullableDecl T input) throws Exception; 297 } 298 299 /** 300 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 301 * allows the user to close all the closeable objects that were captured during it for later 302 * closing. 303 * 304 * <p>The asynchronous operation will have completed before this object is created. 
305 * 306 * @param <V> the type of the value of a successful operation 307 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 308 */ 309 public static final class ValueAndCloser<V> { 310 311 private final ClosingFuture<? extends V> closingFuture; 312 313 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 314 this.closingFuture = checkNotNull(closingFuture); 315 } 316 317 /** 318 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 319 * {@link Future#get()} would. 320 * 321 * <p>Because the asynchronous operation has already completed, this method is synchronous and 322 * returns immediately. 323 * 324 * @throws CancellationException if the computation was cancelled 325 * @throws ExecutionException if the computation threw an exception 326 */ 327 @NullableDecl 328 public V get() throws ExecutionException { 329 return getDone(closingFuture.future); 330 } 331 332 /** 333 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 334 * operation on the {@link Executor}s specified by calls to {@link 335 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 336 * 337 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 338 * closed synchronously. 339 * 340 * <p>Idempotent: objects will be closed at most once. 341 */ 342 public void closeAsync() { 343 closingFuture.close(); 344 } 345 } 346 347 /** 348 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 349 * ClosingFuture} pipeline. 350 * 351 * @param <V> the type of the final value of a successful pipeline 352 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 353 */ 354 public interface ValueAndCloserConsumer<V> { 355 356 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. 
*/ 357 void accept(ValueAndCloser<V> valueAndCloser); 358 } 359 360 /** 361 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 362 * 363 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 364 * execution 365 */ 366 public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) { 367 return new ClosingFuture<>(callable, executor); 368 } 369 370 // TODO(dpb, cpovirk): Do we need submitAsync? 371 372 /** 373 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 374 * 375 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 376 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 377 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 378 */ 379 public static <V> ClosingFuture<V> from(ListenableFuture<V> future) { 380 return new ClosingFuture<V>(future); 381 } 382 383 /** 384 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 385 * 386 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when 387 * the pipeline is done, even if the pipeline is canceled or fails. 388 * 389 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 390 * value in order to close it. 391 * 392 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 393 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable, 394 * Executor)}. 395 * @param closingExecutor the future's result will be closed on this executor 396 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 397 * underlying value may never be closed if the {@link Future} is canceled after its operation 398 * begins. 
Consider replacing code that creates {@link ListenableFuture}s of closeable types, 399 * including those that pass them to this method, with {@link #submit(ClosingCallable, 400 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 401 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 402 * ClosingFuture#from}. 403 */ 404 @Deprecated 405 // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19. 406 public static <C extends Object & Closeable> ClosingFuture<C> eventuallyClosing( 407 ListenableFuture<C> future, final Executor closingExecutor) { 408 checkNotNull(closingExecutor); 409 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 410 Futures.addCallback( 411 future, 412 new FutureCallback<Closeable>() { 413 @Override 414 public void onSuccess(@NullableDecl Closeable result) { 415 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 416 } 417 418 @Override 419 public void onFailure(Throwable t) {} 420 }, 421 directExecutor()); 422 return closingFuture; 423 } 424 425 /** 426 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 427 * 428 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 429 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 430 */ 431 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 432 return new Combiner(false, futures); 433 } 434 435 /** 436 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 437 * 438 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 439 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 440 */ 441 public static Combiner whenAllComplete( 442 ClosingFuture<?> future1, ClosingFuture<?>... 
moreFutures) { 443 return whenAllComplete(asList(future1, moreFutures)); 444 } 445 446 /** 447 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 448 * all succeed. If any fail, the resulting pipeline will fail. 449 * 450 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 451 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 452 */ 453 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 454 return new Combiner(true, futures); 455 } 456 457 /** 458 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 459 * they all succeed. If any fail, the resulting pipeline will fail. 460 * 461 * <p>Calling this method allows you to use lambdas or method references typed with the types of 462 * the input {@link ClosingFuture}s. 463 * 464 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 465 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 466 */ 467 public static <V1, V2> Combiner2<V1, V2> whenAllSucceed( 468 ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 469 return new Combiner2<>(future1, future2); 470 } 471 472 /** 473 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 474 * they all succeed. If any fail, the resulting pipeline will fail. 475 * 476 * <p>Calling this method allows you to use lambdas or method references typed with the types of 477 * the input {@link ClosingFuture}s. 
478 * 479 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 480 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 481 */ 482 public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed( 483 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 484 return new Combiner3<>(future1, future2, future3); 485 } 486 487 /** 488 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 489 * they all succeed. If any fail, the resulting pipeline will fail. 490 * 491 * <p>Calling this method allows you to use lambdas or method references typed with the types of 492 * the input {@link ClosingFuture}s. 493 * 494 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 495 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 496 */ 497 public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed( 498 ClosingFuture<V1> future1, 499 ClosingFuture<V2> future2, 500 ClosingFuture<V3> future3, 501 ClosingFuture<V4> future4) { 502 return new Combiner4<>(future1, future2, future3, future4); 503 } 504 505 /** 506 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 507 * they all succeed. If any fail, the resulting pipeline will fail. 508 * 509 * <p>Calling this method allows you to use lambdas or method references typed with the types of 510 * the input {@link ClosingFuture}s. 
511 * 512 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 513 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 514 */ 515 public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 516 ClosingFuture<V1> future1, 517 ClosingFuture<V2> future2, 518 ClosingFuture<V3> future3, 519 ClosingFuture<V4> future4, 520 ClosingFuture<V5> future5) { 521 return new Combiner5<>(future1, future2, future3, future4, future5); 522 } 523 524 /** 525 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 526 * all succeed. If any fail, the resulting pipeline will fail. 527 * 528 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 529 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 530 */ 531 public static Combiner whenAllSucceed( 532 ClosingFuture<?> future1, 533 ClosingFuture<?> future2, 534 ClosingFuture<?> future3, 535 ClosingFuture<?> future4, 536 ClosingFuture<?> future5, 537 ClosingFuture<?> future6, 538 ClosingFuture<?>... 
moreFutures) { 539 return whenAllSucceed( 540 FluentIterable.of(future1, future2, future3, future4, future5, future6) 541 .append(moreFutures)); 542 } 543 544 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 545 private final CloseableList closeables = new CloseableList(); 546 private final FluentFuture<V> future; 547 548 private ClosingFuture(ListenableFuture<V> future) { 549 this.future = FluentFuture.from(future); 550 } 551 552 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 553 checkNotNull(callable); 554 TrustedListenableFutureTask<V> task = 555 TrustedListenableFutureTask.create( 556 new Callable<V>() { 557 @Override 558 public V call() throws Exception { 559 return callable.call(closeables.closer); 560 } 561 562 @Override 563 public String toString() { 564 return callable.toString(); 565 } 566 }); 567 executor.execute(task); 568 this.future = task; 569 } 570 571 /** 572 * Returns a future that finishes when this step does. Calling {@code get()} on the returned 573 * future returns {@code null} if the step is successful or throws the same exception that would 574 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 575 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 576 * 577 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 578 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 579 * a derivation method <i>on the same instance</i>. This is important because calling {@code 580 * statusFuture} alone does not provide a way to close the pipeline. 581 */ 582 public ListenableFuture<?> statusFuture() { 583 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 584 } 585 586 /** 587 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 588 * to its value. 
The function can use a {@link DeferredCloser} to capture objects to be closed 589 * when the pipeline is done. 590 * 591 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 592 * ClosingFuture} will be equivalent to this one. 593 * 594 * <p>If the function throws an exception, that exception is used as the result of the derived 595 * {@code ClosingFuture}. 596 * 597 * <p>Example usage: 598 * 599 * <pre>{@code 600 * ClosingFuture<List<Row>> rowsFuture = 601 * queryFuture.transform((closer, result) -> result.getRows(), executor); 602 * }</pre> 603 * 604 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 605 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 606 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 607 * 608 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 609 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 610 * this {@code ClosingFuture}. 611 * 612 * @param function transforms the value of this step to the value of the derived step 613 * @param executor executor to run the function in 614 * @return the derived step 615 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 616 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 617 * finished} 618 */ 619 public <U> ClosingFuture<U> transform( 620 final ClosingFunction<? 
super V, U> function, Executor executor) { 621 checkNotNull(function); 622 AsyncFunction<V, U> applyFunction = 623 new AsyncFunction<V, U>() { 624 @Override 625 public ListenableFuture<U> apply(V input) throws Exception { 626 return closeables.applyClosingFunction(function, input); 627 } 628 629 @Override 630 public String toString() { 631 return function.toString(); 632 } 633 }; 634 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 635 return derive(future.transformAsync(applyFunction, executor)); 636 } 637 638 /** 639 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 640 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 641 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 642 * captured by the returned {@link ClosingFuture}). 643 * 644 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 645 * returned by the function. 646 * 647 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 648 * ClosingFuture} will be equivalent to this one. 649 * 650 * <p>If the function throws an exception, that exception is used as the result of the derived 651 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 652 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 653 * closed. 654 * 655 * <p>Usage guidelines for this method: 656 * 657 * <ul> 658 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 659 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 660 * Executor)} instead, with a function that returns the next value directly. 
661 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 662 * for every closeable object this step creates in order to capture it for later closing. 663 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 664 * ClosingFuture} call {@link #from(ListenableFuture)}. 665 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 666 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 667 * {@link #withoutCloser(AsyncFunction)} 668 * </ul> 669 * 670 * <p>Example usage: 671 * 672 * <pre>{@code 673 * // Result.getRowsClosingFuture() returns a ClosingFuture. 674 * ClosingFuture<List<Row>> rowsFuture = 675 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 676 * 677 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 678 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 679 * // Closeable). 680 * ClosingFuture<Integer> rowsFuture2 = 681 * queryFuture.transformAsync( 682 * (closer, result) -> { 683 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 684 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 685 * }, 686 * executor); 687 * 688 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 689 * ClosingFuture<List<Row>> rowsFuture3 = 690 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 691 * 692 * }</pre> 693 * 694 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 695 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 696 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 
697 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 698 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 699 * responsible for completing the returned {@code ClosingFuture}.) 700 * 701 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 702 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 703 * this {@code ClosingFuture}. 704 * 705 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 706 * the derived step 707 * @param executor executor to run the function in 708 * @return the derived step 709 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 710 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 711 * finished} 712 */ 713 public <U> ClosingFuture<U> transformAsync( 714 final AsyncClosingFunction<? super V, U> function, Executor executor) { 715 checkNotNull(function); 716 AsyncFunction<V, U> applyFunction = 717 new AsyncFunction<V, U>() { 718 @Override 719 public ListenableFuture<U> apply(V input) throws Exception { 720 return closeables.applyAsyncClosingFunction(function, input); 721 } 722 723 @Override 724 public String toString() { 725 return function.toString(); 726 } 727 }; 728 return derive(future.transformAsync(applyFunction, executor)); 729 } 730 731 /** 732 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 733 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 734 * {@link ListenableFuture}. 
735 * 736 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 737 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 738 * meets these conditions: 739 * 740 * <ul> 741 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 742 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 743 * <li>It returns a {@link ListenableFuture}. 744 * </ul> 745 * 746 * <p>Example usage: 747 * 748 * <pre>{@code 749 * // Result.getRowsFuture() returns a ListenableFuture. 750 * ClosingFuture<List<Row>> rowsFuture = 751 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 752 * }</pre> 753 * 754 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 755 * ListenableFuture} with the value of a derived step 756 */ 757 public static <V, U> AsyncClosingFunction<V, U> withoutCloser( 758 final AsyncFunction<V, U> function) { 759 checkNotNull(function); 760 return new AsyncClosingFunction<V, U>() { 761 @Override 762 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 763 return ClosingFuture.from(function.apply(input)); 764 } 765 }; 766 } 767 768 /** 769 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 770 * to its exception if it is an instance of a given exception type. The function can use a {@link 771 * DeferredCloser} to capture objects to be closed when the pipeline is done. 772 * 773 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 774 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 775 * one. 776 * 777 * <p>If the function throws an exception, that exception is used as the result of the derived 778 * {@code ClosingFuture}. 
779 * 780 * <p>Example usage: 781 * 782 * <pre>{@code 783 * ClosingFuture<QueryResult> queryFuture = 784 * queryFuture.catching( 785 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 786 * }</pre> 787 * 788 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 789 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 790 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 791 * 792 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 793 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 794 * this {@code ClosingFuture}. 795 * 796 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 797 * type is matched against this step's exception. "This step's exception" means the cause of 798 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 799 * underlying this step or, if {@code get()} throws a different kind of exception, that 800 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 801 * prefer more specific types, avoiding {@code Throwable.class} in particular. 802 * @param fallback the function to be called if this step fails with the expected exception type. 803 * The function's argument is this step's exception. "This step's exception" means the cause 804 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 805 * underlying this step or, if {@code get()} throws a different kind of exception, that 806 * exception itself. 807 * @param executor the executor that runs {@code fallback} if the input fails 808 */ 809 public <X extends Throwable> ClosingFuture<V> catching( 810 Class<X> exceptionType, ClosingFunction<? super X, ? 
extends V> fallback, Executor executor) { 811 return catchingMoreGeneric(exceptionType, fallback, executor); 812 } 813 814 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 815 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 816 Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) { 817 checkNotNull(fallback); 818 AsyncFunction<X, W> applyFallback = 819 new AsyncFunction<X, W>() { 820 @Override 821 public ListenableFuture<W> apply(X exception) throws Exception { 822 return closeables.applyClosingFunction(fallback, exception); 823 } 824 825 @Override 826 public String toString() { 827 return fallback.toString(); 828 } 829 }; 830 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 831 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 832 } 833 834 /** 835 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 836 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 837 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 838 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 839 * 840 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 841 * ClosingFuture} will be equivalent to the one returned by the function. 842 * 843 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 844 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 845 * one. 846 * 847 * <p>If the function throws an exception, that exception is used as the result of the derived 848 * {@code ClosingFuture}. 
   * But if the exception is thrown after the function creates a {@code
   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
   * closed.
   *
   * <p>Usage guidelines for this method:
   *
   * <ul>
   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
   *       ClosingFunction, Executor)} instead, with a function that returns the next value
   *       directly.
   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
   *       for every closeable object this step creates in order to capture it for later closing.
   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
   *       ClosingFuture} call {@link #from(ListenableFuture)}.
   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
   *       {@link #withoutCloser(AsyncFunction)}.
   * </ul>
   *
   * <p>Example usage:
   *
   * <pre>{@code
   * // Fall back to a secondary input stream in case of IOException.
   * ClosingFuture<InputStream> inputFuture =
   *     firstInputFuture.catchingAsync(
   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
   * {@code AsyncClosingFunction.apply}.
Any heavyweight operations should occur in other threads 883 * responsible for completing the returned {@code ClosingFuture}.) 884 * 885 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 886 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 887 * this {@code ClosingFuture}. 888 * 889 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 890 * type is matched against this step's exception. "This step's exception" means the cause of 891 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 892 * underlying this step or, if {@code get()} throws a different kind of exception, that 893 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 894 * prefer more specific types, avoiding {@code Throwable.class} in particular. 895 * @param fallback the function to be called if this step fails with the expected exception type. 896 * The function's argument is this step's exception. "This step's exception" means the cause 897 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 898 * underlying this step or, if {@code get()} throws a different kind of exception, that 899 * exception itself. 900 * @param executor the executor that runs {@code fallback} if the input fails 901 */ 902 // TODO(dpb): Should this do something special if the function throws CancellationException or 903 // ExecutionException? 904 public <X extends Throwable> ClosingFuture<V> catchingAsync( 905 Class<X> exceptionType, 906 AsyncClosingFunction<? super X, ? extends V> fallback, 907 Executor executor) { 908 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 909 } 910 911 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 
912 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 913 Class<X> exceptionType, 914 final AsyncClosingFunction<? super X, W> fallback, 915 Executor executor) { 916 checkNotNull(fallback); 917 AsyncFunction<X, W> asyncFunction = 918 new AsyncFunction<X, W>() { 919 @Override 920 public ListenableFuture<W> apply(X exception) throws Exception { 921 return closeables.applyAsyncClosingFunction(fallback, exception); 922 } 923 924 @Override 925 public String toString() { 926 return fallback.toString(); 927 } 928 }; 929 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 930 } 931 932 /** 933 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned 934 * {@link Future} is done, all objects captured for closing during the pipeline's computation will 935 * be closed. 936 * 937 * <p>After calling this method, you may not call {@link 938 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 939 * derivation method on this {@code ClosingFuture}. 
940 * 941 * @return a {@link Future} that represents the final value or exception of the pipeline 942 */ 943 public FluentFuture<V> finishToFuture() { 944 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 945 logger.log(FINER, "will close {0}", this); 946 future.addListener( 947 new Runnable() { 948 @Override 949 public void run() { 950 checkAndUpdateState(WILL_CLOSE, CLOSING); 951 close(); 952 checkAndUpdateState(CLOSING, CLOSED); 953 } 954 }, 955 directExecutor()); 956 } else { 957 switch (state.get()) { 958 case SUBSUMED: 959 throw new IllegalStateException( 960 "Cannot call finishToFuture() after deriving another step"); 961 962 case WILL_CREATE_VALUE_AND_CLOSER: 963 throw new IllegalStateException( 964 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 965 966 case WILL_CLOSE: 967 case CLOSING: 968 case CLOSED: 969 throw new IllegalStateException("Cannot call finishToFuture() twice"); 970 971 case OPEN: 972 throw new AssertionError(); 973 } 974 } 975 return future; 976 } 977 978 /** 979 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 980 * {@code receiver} will be called with an object that contains the result of the operation. The 981 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 982 * 983 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 984 * any other derivation method on this {@code ClosingFuture}. 985 * 986 * @param consumer a callback whose method will be called (using {@code executor}) when this 987 * operation is done 988 */ 989 public void finishToValueAndCloser( 990 final ValueAndCloserConsumer<? 
super V> consumer, Executor executor) { 991 checkNotNull(consumer); 992 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 993 switch (state.get()) { 994 case SUBSUMED: 995 throw new IllegalStateException( 996 "Cannot call finishToValueAndCloser() after deriving another step"); 997 998 case WILL_CLOSE: 999 case CLOSING: 1000 case CLOSED: 1001 throw new IllegalStateException( 1002 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1003 1004 case WILL_CREATE_VALUE_AND_CLOSER: 1005 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1006 1007 case OPEN: 1008 break; 1009 } 1010 throw new AssertionError(state); 1011 } 1012 future.addListener( 1013 new Runnable() { 1014 @Override 1015 public void run() { 1016 provideValueAndCloser(consumer, ClosingFuture.this); 1017 } 1018 }, 1019 executor); 1020 } 1021 1022 private static <C, V extends C> void provideValueAndCloser( 1023 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1024 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1025 } 1026 1027 /** 1028 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1029 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1030 * successful, and this step has not started when {@code cancel} is called, this step should never 1031 * run. 1032 * 1033 * <p>If successful, causes the objects captured by this step (if already started) and its input 1034 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1035 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 
1036 * 1037 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1038 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1039 * cancelled regardless 1040 * @return {@code false} if the step could not be cancelled, typically because it has already 1041 * completed normally; {@code true} otherwise 1042 */ 1043 @CanIgnoreReturnValue 1044 public boolean cancel(boolean mayInterruptIfRunning) { 1045 logger.log(FINER, "cancelling {0}", this); 1046 boolean cancelled = future.cancel(mayInterruptIfRunning); 1047 if (cancelled) { 1048 close(); 1049 } 1050 return cancelled; 1051 } 1052 1053 private void close() { 1054 logger.log(FINER, "closing {0}", this); 1055 closeables.close(); 1056 } 1057 1058 private <U> ClosingFuture<U> derive(FluentFuture<U> future) { 1059 ClosingFuture<U> derived = new ClosingFuture<>(future); 1060 becomeSubsumedInto(derived.closeables); 1061 return derived; 1062 } 1063 1064 private void becomeSubsumedInto(CloseableList otherCloseables) { 1065 checkAndUpdateState(OPEN, SUBSUMED); 1066 otherCloseables.add(closeables, directExecutor()); 1067 } 1068 1069 /** 1070 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1071 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1072 * 1073 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1074 */ 1075 public static final class Peeker { 1076 private final ImmutableList<ClosingFuture<?>> futures; 1077 private volatile boolean beingCalled; 1078 1079 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1080 this.futures = checkNotNull(futures); 1081 } 1082 1083 /** 1084 * Returns the value of {@code closingFuture}. 
1085 * 1086 * @throws ExecutionException if {@code closingFuture} is a failed step 1087 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1088 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1089 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1090 * @throws IllegalStateException if called outside of a call to {@link 1091 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1092 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1093 */ 1094 @NullableDecl 1095 public final <D extends Object> D getDone(ClosingFuture<D> closingFuture) 1096 throws ExecutionException { 1097 checkState(beingCalled); 1098 checkArgument(futures.contains(closingFuture)); 1099 return Futures.getDone(closingFuture.future); 1100 } 1101 1102 @NullableDecl 1103 private <V extends Object> V call(CombiningCallable<V> combiner, CloseableList closeables) 1104 throws Exception { 1105 beingCalled = true; 1106 CloseableList newCloseables = new CloseableList(); 1107 try { 1108 return combiner.call(newCloseables.closer, this); 1109 } finally { 1110 closeables.add(newCloseables, directExecutor()); 1111 beingCalled = false; 1112 } 1113 } 1114 1115 private <V extends Object> FluentFuture<V> callAsync( 1116 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1117 beingCalled = true; 1118 CloseableList newCloseables = new CloseableList(); 1119 try { 1120 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1121 closingFuture.becomeSubsumedInto(closeables); 1122 return closingFuture.future; 1123 } finally { 1124 closeables.add(newCloseables, directExecutor()); 1125 beingCalled = false; 1126 } 1127 } 1128 } 1129 1130 /** 1131 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 
1132 * 1133 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1134 * instantiate this class. 1135 * 1136 * <p>Example: 1137 * 1138 * <pre>{@code 1139 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1140 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1141 * ListenableFuture<Integer> numberOfDifferentLines = 1142 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1143 * .call( 1144 * (closer, peeker) -> { 1145 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1146 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1147 * return countDifferentLines(file1Reader, file2Reader); 1148 * }, 1149 * executor) 1150 * .closing(executor); 1151 * }</pre> 1152 */ 1153 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1154 @com.google.errorprone.annotations.DoNotMock( 1155 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1156 public static class Combiner { 1157 1158 private final CloseableList closeables = new CloseableList(); 1159 1160 /** 1161 * An operation that returns a result and may throw an exception. 1162 * 1163 * @param <V> the type of the result 1164 */ 1165 public interface CombiningCallable<V extends Object> { 1166 /** 1167 * Computes a result, or throws an exception if unable to do so. 1168 * 1169 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1170 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1171 * is done (but not before this method completes), even if this method throws or the pipeline 1172 * is cancelled. 1173 * 1174 * @param peeker used to get the value of any of the input futures 1175 */ 1176 @NullableDecl 1177 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1178 } 1179 1180 /** 1181 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 
1182 * 1183 * @param <V> the type of the result 1184 */ 1185 public interface AsyncCombiningCallable<V extends Object> { 1186 /** 1187 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1188 * 1189 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1190 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1191 * is done (but not before this method completes), even if this method throws or the pipeline 1192 * is cancelled. 1193 * 1194 * @param peeker used to get the value of any of the input futures 1195 */ 1196 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1197 } 1198 1199 private final boolean allMustSucceed; 1200 protected final ImmutableList<ClosingFuture<?>> inputs; 1201 1202 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1203 this.allMustSucceed = allMustSucceed; 1204 this.inputs = ImmutableList.copyOf(inputs); 1205 for (ClosingFuture<?> input : inputs) { 1206 input.becomeSubsumedInto(closeables); 1207 } 1208 } 1209 1210 /** 1211 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1212 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1213 * objects to be closed when the pipeline is done. 1214 * 1215 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1216 * fail, so will the returned step. 1217 * 1218 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1219 * cancelled. 1220 * 1221 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1222 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 
1223 */ 1224 public <V> ClosingFuture<V> call( 1225 final CombiningCallable<V> combiningCallable, Executor executor) { 1226 Callable<V> callable = 1227 new Callable<V>() { 1228 @Override 1229 public V call() throws Exception { 1230 return new Peeker(inputs).call(combiningCallable, closeables); 1231 } 1232 1233 @Override 1234 public String toString() { 1235 return combiningCallable.toString(); 1236 } 1237 }; 1238 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1239 derived.closeables.add(closeables, directExecutor()); 1240 return derived; 1241 } 1242 1243 /** 1244 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1245 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1246 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1247 * captured by the returned {@link ClosingFuture}). 1248 * 1249 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1250 * fail, so will the returned step. 1251 * 1252 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1253 * cancelled. 1254 * 1255 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1256 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1257 * 1258 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1259 * derived step. 1260 * 1261 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1262 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1263 * 1264 * <p>Usage guidelines for this method: 1265 * 1266 * <ul> 1267 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1268 * {@code ClosingFuture}. 
If possible, prefer calling {@link #call(CombiningCallable, 1269 * Executor)} instead, with a function that returns the next value directly. 1270 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1271 * closer.eventuallyClose()} for every closeable object this step creates in order to 1272 * capture it for later closing. 1273 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1274 * ClosingFuture} call {@link #from(ListenableFuture)}. 1275 * </ul> 1276 * 1277 * <p>The same warnings about doing heavyweight operations within {@link 1278 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1279 */ 1280 public <V> ClosingFuture<V> callAsync( 1281 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1282 AsyncCallable<V> asyncCallable = 1283 new AsyncCallable<V>() { 1284 @Override 1285 public ListenableFuture<V> call() throws Exception { 1286 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1287 } 1288 1289 @Override 1290 public String toString() { 1291 return combiningCallable.toString(); 1292 } 1293 }; 1294 ClosingFuture<V> derived = 1295 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1296 derived.closeables.add(closeables, directExecutor()); 1297 return derived; 1298 } 1299 1300 private FutureCombiner<Object> futureCombiner() { 1301 return allMustSucceed 1302 ? 
Futures.whenAllSucceed(inputFutures()) 1303 : Futures.whenAllComplete(inputFutures()); 1304 } 1305 1306 private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE = 1307 new Function<ClosingFuture<?>, FluentFuture<?>>() { 1308 @Override 1309 public FluentFuture<?> apply(ClosingFuture<?> future) { 1310 return future.future; 1311 } 1312 }; 1313 1314 private ImmutableList<FluentFuture<?>> inputFutures() { 1315 return FluentIterable.from(inputs).transform(INNER_FUTURE).toList(); 1316 } 1317 } 1318 1319 /** 1320 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1321 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1322 * combination. 1323 * 1324 * @param <V1> the type returned by the first future 1325 * @param <V2> the type returned by the second future 1326 */ 1327 public static final class Combiner2<V1 extends Object, V2 extends Object> extends Combiner { 1328 1329 /** 1330 * A function that returns a value when applied to the values of the two futures passed to 1331 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1332 * 1333 * @param <V1> the type returned by the first future 1334 * @param <V2> the type returned by the second future 1335 * @param <U> the type returned by the function 1336 */ 1337 public interface ClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> { 1338 1339 /** 1340 * Applies this function to two inputs, or throws an exception if unable to do so. 1341 * 1342 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1343 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1344 * is done (but not before this method completes), even if this method throws or the pipeline 1345 * is cancelled. 
1346 */ 1347 @NullableDecl 1348 U apply(DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) 1349 throws Exception; 1350 } 1351 1352 /** 1353 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1354 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1355 * 1356 * @param <V1> the type returned by the first future 1357 * @param <V2> the type returned by the second future 1358 * @param <U> the type returned by the function 1359 */ 1360 public interface AsyncClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> { 1361 1362 /** 1363 * Applies this function to two inputs, or throws an exception if unable to do so. 1364 * 1365 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1366 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1367 * is done (but not before this method completes), even if this method throws or the pipeline 1368 * is cancelled. 1369 */ 1370 ClosingFuture<U> apply( 1371 DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) throws Exception; 1372 } 1373 1374 private final ClosingFuture<V1> future1; 1375 private final ClosingFuture<V2> future2; 1376 1377 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1378 super(true, ImmutableList.of(future1, future2)); 1379 this.future1 = future1; 1380 this.future2 = future2; 1381 } 1382 1383 /** 1384 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1385 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1386 * objects to be closed when the pipeline is done. 1387 * 1388 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1389 * any of the inputs fail, so will the returned step. 
1390 * 1391 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1392 * 1393 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1394 * ExecutionException} will be extracted and used as the failure of the derived step. 1395 */ 1396 public <U extends Object> ClosingFuture<U> call( 1397 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1398 return call( 1399 new CombiningCallable<U>() { 1400 @Override 1401 @NullableDecl 1402 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1403 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1404 } 1405 1406 @Override 1407 public String toString() { 1408 return function.toString(); 1409 } 1410 }, 1411 executor); 1412 } 1413 1414 /** 1415 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1416 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1417 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1418 * captured by the returned {@link ClosingFuture}). 1419 * 1420 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1421 * any of the inputs fail, so will the returned step. 1422 * 1423 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1424 * 1425 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1426 * ExecutionException} will be extracted and used as the failure of the derived step. 1427 * 1428 * <p>If the function throws any other exception, it will be used as the failure of the derived 1429 * step. 1430 * 1431 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1432 * the closeable objects in that {@code ClosingFuture} will be closed. 
1433 * 1434 * <p>Usage guidelines for this method: 1435 * 1436 * <ul> 1437 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1438 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1439 * Executor)} instead, with a function that returns the next value directly. 1440 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1441 * closer.eventuallyClose()} for every closeable object this step creates in order to 1442 * capture it for later closing. 1443 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1444 * ClosingFuture} call {@link #from(ListenableFuture)}. 1445 * </ul> 1446 * 1447 * <p>The same warnings about doing heavyweight operations within {@link 1448 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1449 */ 1450 public <U extends Object> ClosingFuture<U> callAsync( 1451 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1452 return callAsync( 1453 new AsyncCombiningCallable<U>() { 1454 @Override 1455 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1456 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1457 } 1458 1459 @Override 1460 public String toString() { 1461 return function.toString(); 1462 } 1463 }, 1464 executor); 1465 } 1466 } 1467 1468 /** 1469 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1470 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1471 * ClosingFuture)} to start this combination. 
1472 * 1473 * @param <V1> the type returned by the first future 1474 * @param <V2> the type returned by the second future 1475 * @param <V3> the type returned by the third future 1476 */ 1477 public static final class Combiner3<V1 extends Object, V2 extends Object, V3 extends Object> 1478 extends Combiner { 1479 /** 1480 * A function that returns a value when applied to the values of the three futures passed to 1481 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1482 * 1483 * @param <V1> the type returned by the first future 1484 * @param <V2> the type returned by the second future 1485 * @param <V3> the type returned by the third future 1486 * @param <U> the type returned by the function 1487 */ 1488 public interface ClosingFunction3< 1489 V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> { 1490 /** 1491 * Applies this function to three inputs, or throws an exception if unable to do so. 1492 * 1493 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1494 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1495 * is done (but not before this method completes), even if this method throws or the pipeline 1496 * is cancelled. 1497 */ 1498 @NullableDecl 1499 U apply( 1500 DeferredCloser closer, 1501 @NullableDecl V1 value1, 1502 @NullableDecl V2 value2, 1503 @NullableDecl V3 v3) 1504 throws Exception; 1505 } 1506 1507 /** 1508 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1509 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 
1510 * 1511 * @param <V1> the type returned by the first future 1512 * @param <V2> the type returned by the second future 1513 * @param <V3> the type returned by the third future 1514 * @param <U> the type returned by the function 1515 */ 1516 public interface AsyncClosingFunction3< 1517 V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> { 1518 /** 1519 * Applies this function to three inputs, or throws an exception if unable to do so. 1520 * 1521 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1522 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1523 * is done (but not before this method completes), even if this method throws or the pipeline 1524 * is cancelled. 1525 */ 1526 ClosingFuture<U> apply( 1527 DeferredCloser closer, 1528 @NullableDecl V1 value1, 1529 @NullableDecl V2 value2, 1530 @NullableDecl V3 value3) 1531 throws Exception; 1532 } 1533 1534 private final ClosingFuture<V1> future1; 1535 private final ClosingFuture<V2> future2; 1536 private final ClosingFuture<V3> future3; 1537 1538 private Combiner3( 1539 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1540 super(true, ImmutableList.of(future1, future2, future3)); 1541 this.future1 = future1; 1542 this.future2 = future2; 1543 this.future3 = future3; 1544 } 1545 1546 /** 1547 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1548 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1549 * objects to be closed when the pipeline is done. 1550 * 1551 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1552 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1553 * 1554 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 
1555 * 1556 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1557 * ExecutionException} will be extracted and used as the failure of the derived step. 1558 */ 1559 public <U extends Object> ClosingFuture<U> call( 1560 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1561 return call( 1562 new CombiningCallable<U>() { 1563 @Override 1564 @NullableDecl 1565 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1566 return function.apply( 1567 closer, 1568 peeker.getDone(future1), 1569 peeker.getDone(future2), 1570 peeker.getDone(future3)); 1571 } 1572 1573 @Override 1574 public String toString() { 1575 return function.toString(); 1576 } 1577 }, 1578 executor); 1579 } 1580 1581 /** 1582 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1583 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1584 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1585 * captured by the returned {@link ClosingFuture}). 1586 * 1587 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1588 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1589 * 1590 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1591 * 1592 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1593 * ExecutionException} will be extracted and used as the failure of the derived step. 1594 * 1595 * <p>If the function throws any other exception, it will be used as the failure of the derived 1596 * step. 1597 * 1598 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1599 * the closeable objects in that {@code ClosingFuture} will be closed. 
1600 * 1601 * <p>Usage guidelines for this method: 1602 * 1603 * <ul> 1604 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1605 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1606 * Executor)} instead, with a function that returns the next value directly. 1607 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1608 * closer.eventuallyClose()} for every closeable object this step creates in order to 1609 * capture it for later closing. 1610 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1611 * ClosingFuture} call {@link #from(ListenableFuture)}. 1612 * </ul> 1613 * 1614 * <p>The same warnings about doing heavyweight operations within {@link 1615 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1616 */ 1617 public <U extends Object> ClosingFuture<U> callAsync( 1618 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1619 return callAsync( 1620 new AsyncCombiningCallable<U>() { 1621 @Override 1622 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1623 return function.apply( 1624 closer, 1625 peeker.getDone(future1), 1626 peeker.getDone(future2), 1627 peeker.getDone(future3)); 1628 } 1629 1630 @Override 1631 public String toString() { 1632 return function.toString(); 1633 } 1634 }, 1635 executor); 1636 } 1637 } 1638 1639 /** 1640 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1641 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1642 * ClosingFuture)} to start this combination. 
1643 * 1644 * @param <V1> the type returned by the first future 1645 * @param <V2> the type returned by the second future 1646 * @param <V3> the type returned by the third future 1647 * @param <V4> the type returned by the fourth future 1648 */ 1649 public static final class Combiner4< 1650 V1 extends Object, V2 extends Object, V3 extends Object, V4 extends Object> 1651 extends Combiner { 1652 /** 1653 * A function that returns a value when applied to the values of the four futures passed to 1654 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1655 * 1656 * @param <V1> the type returned by the first future 1657 * @param <V2> the type returned by the second future 1658 * @param <V3> the type returned by the third future 1659 * @param <V4> the type returned by the fourth future 1660 * @param <U> the type returned by the function 1661 */ 1662 public interface ClosingFunction4< 1663 V1 extends Object, 1664 V2 extends Object, 1665 V3 extends Object, 1666 V4 extends Object, 1667 U extends Object> { 1668 /** 1669 * Applies this function to four inputs, or throws an exception if unable to do so. 1670 * 1671 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1672 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1673 * is done (but not before this method completes), even if this method throws or the pipeline 1674 * is cancelled. 1675 */ 1676 @NullableDecl 1677 U apply( 1678 DeferredCloser closer, 1679 @NullableDecl V1 value1, 1680 @NullableDecl V2 value2, 1681 @NullableDecl V3 value3, 1682 @NullableDecl V4 value4) 1683 throws Exception; 1684 } 1685 1686 /** 1687 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1688 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1689 * ClosingFuture)}. 
1690 * 1691 * @param <V1> the type returned by the first future 1692 * @param <V2> the type returned by the second future 1693 * @param <V3> the type returned by the third future 1694 * @param <V4> the type returned by the fourth future 1695 * @param <U> the type returned by the function 1696 */ 1697 public interface AsyncClosingFunction4< 1698 V1 extends Object, 1699 V2 extends Object, 1700 V3 extends Object, 1701 V4 extends Object, 1702 U extends Object> { 1703 /** 1704 * Applies this function to four inputs, or throws an exception if unable to do so. 1705 * 1706 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1707 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1708 * is done (but not before this method completes), even if this method throws or the pipeline 1709 * is cancelled. 1710 */ 1711 ClosingFuture<U> apply( 1712 DeferredCloser closer, 1713 @NullableDecl V1 value1, 1714 @NullableDecl V2 value2, 1715 @NullableDecl V3 value3, 1716 @NullableDecl V4 value4) 1717 throws Exception; 1718 } 1719 1720 private final ClosingFuture<V1> future1; 1721 private final ClosingFuture<V2> future2; 1722 private final ClosingFuture<V3> future3; 1723 private final ClosingFuture<V4> future4; 1724 1725 private Combiner4( 1726 ClosingFuture<V1> future1, 1727 ClosingFuture<V2> future2, 1728 ClosingFuture<V3> future3, 1729 ClosingFuture<V4> future4) { 1730 super(true, ImmutableList.of(future1, future2, future3, future4)); 1731 this.future1 = future1; 1732 this.future2 = future2; 1733 this.future3 = future3; 1734 this.future4 = future4; 1735 } 1736 1737 /** 1738 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1739 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1740 * objects to be closed when the pipeline is done. 
1741 * 1742 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1743 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1744 * 1745 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1746 * 1747 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1748 * ExecutionException} will be extracted and used as the failure of the derived step. 1749 */ 1750 public <U extends Object> ClosingFuture<U> call( 1751 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1752 return call( 1753 new CombiningCallable<U>() { 1754 @Override 1755 @NullableDecl 1756 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1757 return function.apply( 1758 closer, 1759 peeker.getDone(future1), 1760 peeker.getDone(future2), 1761 peeker.getDone(future3), 1762 peeker.getDone(future4)); 1763 } 1764 1765 @Override 1766 public String toString() { 1767 return function.toString(); 1768 } 1769 }, 1770 executor); 1771 } 1772 1773 /** 1774 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1775 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1776 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1777 * captured by the returned {@link ClosingFuture}). 1778 * 1779 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1780 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1781 * 1782 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1783 * 1784 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1785 * ExecutionException} will be extracted and used as the failure of the derived step. 
1786 * 1787 * <p>If the function throws any other exception, it will be used as the failure of the derived 1788 * step. 1789 * 1790 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1791 * the closeable objects in that {@code ClosingFuture} will be closed. 1792 * 1793 * <p>Usage guidelines for this method: 1794 * 1795 * <ul> 1796 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1797 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1798 * Executor)} instead, with a function that returns the next value directly. 1799 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1800 * closer.eventuallyClose()} for every closeable object this step creates in order to 1801 * capture it for later closing. 1802 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1803 * ClosingFuture} call {@link #from(ListenableFuture)}. 1804 * </ul> 1805 * 1806 * <p>The same warnings about doing heavyweight operations within {@link 1807 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1808 */ 1809 public <U extends Object> ClosingFuture<U> callAsync( 1810 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1811 return callAsync( 1812 new AsyncCombiningCallable<U>() { 1813 @Override 1814 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1815 return function.apply( 1816 closer, 1817 peeker.getDone(future1), 1818 peeker.getDone(future2), 1819 peeker.getDone(future3), 1820 peeker.getDone(future4)); 1821 } 1822 1823 @Override 1824 public String toString() { 1825 return function.toString(); 1826 } 1827 }, 1828 executor); 1829 } 1830 } 1831 1832 /** 1833 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1834 * {@link ClosingFuture}s. 
Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1835 * ClosingFuture, ClosingFuture)} to start this combination. 1836 * 1837 * @param <V1> the type returned by the first future 1838 * @param <V2> the type returned by the second future 1839 * @param <V3> the type returned by the third future 1840 * @param <V4> the type returned by the fourth future 1841 * @param <V5> the type returned by the fifth future 1842 */ 1843 public static final class Combiner5< 1844 V1 extends Object, 1845 V2 extends Object, 1846 V3 extends Object, 1847 V4 extends Object, 1848 V5 extends Object> 1849 extends Combiner { 1850 /** 1851 * A function that returns a value when applied to the values of the five futures passed to 1852 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1853 * ClosingFuture)}. 1854 * 1855 * @param <V1> the type returned by the first future 1856 * @param <V2> the type returned by the second future 1857 * @param <V3> the type returned by the third future 1858 * @param <V4> the type returned by the fourth future 1859 * @param <V5> the type returned by the fifth future 1860 * @param <U> the type returned by the function 1861 */ 1862 public interface ClosingFunction5< 1863 V1 extends Object, 1864 V2 extends Object, 1865 V3 extends Object, 1866 V4 extends Object, 1867 V5 extends Object, 1868 U extends Object> { 1869 /** 1870 * Applies this function to five inputs, or throws an exception if unable to do so. 1871 * 1872 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1873 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1874 * is done (but not before this method completes), even if this method throws or the pipeline 1875 * is cancelled. 
1876 */ 1877 @NullableDecl 1878 U apply( 1879 DeferredCloser closer, 1880 @NullableDecl V1 value1, 1881 @NullableDecl V2 value2, 1882 @NullableDecl V3 value3, 1883 @NullableDecl V4 value4, 1884 @NullableDecl V5 value5) 1885 throws Exception; 1886 } 1887 1888 /** 1889 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1890 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1891 * ClosingFuture, ClosingFuture)}. 1892 * 1893 * @param <V1> the type returned by the first future 1894 * @param <V2> the type returned by the second future 1895 * @param <V3> the type returned by the third future 1896 * @param <V4> the type returned by the fourth future 1897 * @param <V5> the type returned by the fifth future 1898 * @param <U> the type returned by the function 1899 */ 1900 public interface AsyncClosingFunction5< 1901 V1 extends Object, 1902 V2 extends Object, 1903 V3 extends Object, 1904 V4 extends Object, 1905 V5 extends Object, 1906 U extends Object> { 1907 /** 1908 * Applies this function to five inputs, or throws an exception if unable to do so. 1909 * 1910 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1911 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1912 * is done (but not before this method completes), even if this method throws or the pipeline 1913 * is cancelled. 
1914 */ 1915 ClosingFuture<U> apply( 1916 DeferredCloser closer, 1917 @NullableDecl V1 value1, 1918 @NullableDecl V2 value2, 1919 @NullableDecl V3 value3, 1920 @NullableDecl V4 value4, 1921 @NullableDecl V5 value5) 1922 throws Exception; 1923 } 1924 1925 private final ClosingFuture<V1> future1; 1926 private final ClosingFuture<V2> future2; 1927 private final ClosingFuture<V3> future3; 1928 private final ClosingFuture<V4> future4; 1929 private final ClosingFuture<V5> future5; 1930 1931 private Combiner5( 1932 ClosingFuture<V1> future1, 1933 ClosingFuture<V2> future2, 1934 ClosingFuture<V3> future3, 1935 ClosingFuture<V4> future4, 1936 ClosingFuture<V5> future5) { 1937 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 1938 this.future1 = future1; 1939 this.future2 = future2; 1940 this.future3 = future3; 1941 this.future4 = future4; 1942 this.future5 = future5; 1943 } 1944 1945 /** 1946 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1947 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1948 * objects to be closed when the pipeline is done. 1949 * 1950 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1951 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 1952 * returned step. 1953 * 1954 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1955 * 1956 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1957 * ExecutionException} will be extracted and used as the failure of the derived step. 
1958 */ 1959 public <U extends Object> ClosingFuture<U> call( 1960 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 1961 return call( 1962 new CombiningCallable<U>() { 1963 @Override 1964 @NullableDecl 1965 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1966 return function.apply( 1967 closer, 1968 peeker.getDone(future1), 1969 peeker.getDone(future2), 1970 peeker.getDone(future3), 1971 peeker.getDone(future4), 1972 peeker.getDone(future5)); 1973 } 1974 1975 @Override 1976 public String toString() { 1977 return function.toString(); 1978 } 1979 }, 1980 executor); 1981 } 1982 1983 /** 1984 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1985 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1986 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1987 * captured by the returned {@link ClosingFuture}). 1988 * 1989 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1990 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 1991 * returned step. 1992 * 1993 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1994 * 1995 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1996 * ExecutionException} will be extracted and used as the failure of the derived step. 1997 * 1998 * <p>If the function throws any other exception, it will be used as the failure of the derived 1999 * step. 2000 * 2001 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 2002 * the closeable objects in that {@code ClosingFuture} will be closed. 
2003 * 2004 * <p>Usage guidelines for this method: 2005 * 2006 * <ul> 2007 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 2008 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 2009 * Executor)} instead, with a function that returns the next value directly. 2010 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 2011 * closer.eventuallyClose()} for every closeable object this step creates in order to 2012 * capture it for later closing. 2013 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 2014 * ClosingFuture} call {@link #from(ListenableFuture)}. 2015 * </ul> 2016 * 2017 * <p>The same warnings about doing heavyweight operations within {@link 2018 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 2019 */ 2020 public <U extends Object> ClosingFuture<U> callAsync( 2021 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2022 return callAsync( 2023 new AsyncCombiningCallable<U>() { 2024 @Override 2025 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2026 return function.apply( 2027 closer, 2028 peeker.getDone(future1), 2029 peeker.getDone(future2), 2030 peeker.getDone(future3), 2031 peeker.getDone(future4), 2032 peeker.getDone(future5)); 2033 } 2034 2035 @Override 2036 public String toString() { 2037 return function.toString(); 2038 } 2039 }, 2040 executor); 2041 } 2042 } 2043 2044 @Override 2045 public String toString() { 2046 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2047 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2048 } 2049 2050 @Override 2051 protected void finalize() { 2052 if (state.get().equals(OPEN)) { 2053 logger.log(SEVERE, "Uh oh! 
An open ClosingFuture has leaked and will close: {0}", this); 2054 FluentFuture<V> unused = finishToFuture(); 2055 } 2056 } 2057 2058 private static void closeQuietly(final Closeable closeable, Executor executor) { 2059 if (closeable == null) { 2060 return; 2061 } 2062 try { 2063 executor.execute( 2064 new Runnable() { 2065 @Override 2066 public void run() { 2067 try { 2068 closeable.close(); 2069 } catch (IOException | RuntimeException e) { 2070 logger.log(WARNING, "thrown by close()", e); 2071 } 2072 } 2073 }); 2074 } catch (RejectedExecutionException e) { 2075 if (logger.isLoggable(WARNING)) { 2076 logger.log( 2077 WARNING, String.format("while submitting close to %s; will close inline", executor), e); 2078 } 2079 closeQuietly(closeable, directExecutor()); 2080 } 2081 } 2082 2083 private void checkAndUpdateState(State oldState, State newState) { 2084 checkState( 2085 compareAndUpdateState(oldState, newState), 2086 "Expected state to be %s, but it was %s", 2087 oldState, 2088 newState); 2089 } 2090 2091 private boolean compareAndUpdateState(State oldState, State newState) { 2092 return state.compareAndSet(oldState, newState); 2093 } 2094 2095 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2096 private static final class CloseableList extends IdentityHashMap<Closeable, Executor> 2097 implements Closeable { 2098 private final DeferredCloser closer = new DeferredCloser(this); 2099 private volatile boolean closed; 2100 private volatile CountDownLatch whenClosed; 2101 2102 <V, U> ListenableFuture<U> applyClosingFunction( 2103 ClosingFunction<? super V, U> transformation, V input) throws Exception { 2104 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 
/**
 * The set of objects to close for one part of a pipeline, stored as an identity-keyed map from
 * each {@link Closeable} to the {@link Executor} on which it is to be closed. The list is itself
 * {@link Closeable}, which lets one list be registered inside another (see {@link
 * #applyClosingFunction} and {@link #applyAsyncClosingFunction}).
 *
 * <p>Mutation of the map in {@link #add} and the transition of {@code closed} in {@link #close}
 * both happen while holding this object's monitor.
 */
// TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
private static final class CloseableList extends IdentityHashMap<Closeable, Executor>
    implements Closeable {
  // The closer handed to user functions; it is constructed around this list.
  private final DeferredCloser closer = new DeferredCloser(this);
  // Set to true (under the lock) by close(); never reset.
  private volatile boolean closed;
  // Created on demand by whenClosedCountDown(); counted down at the end of close().
  private volatile CountDownLatch whenClosed;

  /**
   * Applies {@code transformation} to {@code input}, giving it the closer of a fresh {@code
   * CloseableList}, and wraps the result in an immediate future.
   *
   * <p>Whether the function returns or throws, the fresh list is then registered with this list
   * (with a direct executor) via {@link #add}.
   */
  <V, U> ListenableFuture<U> applyClosingFunction(
      ClosingFunction<? super V, U> transformation, V input) throws Exception {
    // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
    CloseableList newCloseables = new CloseableList();
    try {
      return immediateFuture(transformation.apply(newCloseables.closer, input));
    } finally {
      // Runs on both the normal and the exceptional path.
      add(newCloseables, directExecutor());
    }
  }

  /**
   * Applies {@code transformation} to {@code input}, giving it the closer of a fresh {@code
   * CloseableList}, subsumes the returned {@link ClosingFuture} into that fresh list, and returns
   * the returned step's underlying future.
   *
   * <p>Whether the function returns or throws, the fresh list is then registered with this list
   * (with a direct executor) via {@link #add}.
   */
  <V, U> FluentFuture<U> applyAsyncClosingFunction(
      AsyncClosingFunction<V, U> transformation, V input) throws Exception {
    // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
    CloseableList newCloseables = new CloseableList();
    try {
      ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
      closingFuture.becomeSubsumedInto(newCloseables);
      return closingFuture.future;
    } finally {
      // Runs on both the normal and the exceptional path.
      add(newCloseables, directExecutor());
    }
  }

  /**
   * Closes every registered object on its associated executor, empties the map, and counts down
   * the latch created by {@link #whenClosedCountDown()}, if there is one. Calls after the first
   * return immediately.
   */
  @Override
  public void close() {
    // Unlocked check first; the flag is re-checked under the lock before it is flipped.
    if (closed) {
      return;
    }
    synchronized (this) {
      if (closed) {
        return;
      }
      closed = true;
    }
    // From here on add() no longer inserts into the map (it sees closed == true under the lock),
    // so the entries are iterated without holding the lock.
    for (Map.Entry<Closeable, Executor> entry : entrySet()) {
      closeQuietly(entry.getKey(), entry.getValue());
    }
    clear();
    if (whenClosed != null) {
      whenClosed.countDown();
    }
  }

  /**
   * Registers {@code closeable} to be closed on {@code executor} when this list is closed. If
   * this list has already been closed, {@code closeable} is closed right away instead. A null
   * {@code closeable} is ignored.
   *
   * @throws NullPointerException if {@code executor} is null
   */
  void add(@NullableDecl Closeable closeable, Executor executor) {
    checkNotNull(executor);
    if (closeable == null) {
      return;
    }
    synchronized (this) {
      if (!closed) {
        put(closeable, executor);
        return;
      }
    }
    // Already closed: close the late arrival now, outside the lock.
    closeQuietly(closeable, executor);
  }

  /**
   * Returns a latch that reaches zero when {@link #close()} has handed all of this object's
   * deferred closeables to {@code closeQuietly}. If this list is already closed, the returned
   * latch is already at zero.
   *
   * @throws IllegalStateException if a latch was already created by an earlier call made before
   *     this list was closed
   */
  CountDownLatch whenClosedCountDown() {
    if (closed) {
      return new CountDownLatch(0);
    }
    synchronized (this) {
      if (closed) {
        return new CountDownLatch(0);
      }
      checkState(whenClosed == null);
      return whenClosed = new CountDownLatch(1);
    }
  }
}

/**
 * Returns an object that can be used to wait until this object's deferred closeables have all had
 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
 */
@VisibleForTesting
CountDownLatch whenClosedCountDown() {
  return closeables.whenClosedCountDown();
}

/**
 * The state of a {@link CloseableList}.
 *
 * <p>NOTE(review): within this file the value is held in the enclosing step's {@code state}
 * reference rather than in {@code CloseableList} itself, so this appears to describe the state
 * of the step that owns the list; confirm.
 */
enum State {
  /** The {@link CloseableList} has not been subsumed or closed. */
  OPEN,

  /**
   * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
   * into any other.
   */
  SUBSUMED,

  /**
   * Some {@link ListenableFuture} has a callback attached that will close the {@link
   * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
   */
  WILL_CLOSE,

  /**
   * The callback that closes the {@link CloseableList} is running, but it has not completed. The
   * {@link CloseableList} may not be subsumed.
   */
  CLOSING,

  /** The {@link CloseableList} has been closed. It may not be further subsumed. */
  CLOSED,

  /**
   * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
   * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
   */
  WILL_CREATE_VALUE_AND_CLOSER,
}