001/* 002 * Copyright (C) 2017 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Functions.constant; 020import static com.google.common.base.MoreObjects.toStringHelper; 021import static com.google.common.base.Preconditions.checkArgument; 022import static com.google.common.base.Preconditions.checkNotNull; 023import static com.google.common.base.Preconditions.checkState; 024import static com.google.common.collect.Lists.asList; 025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED; 026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING; 027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN; 028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED; 029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE; 030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER; 031import static com.google.common.util.concurrent.Futures.getDone; 032import static com.google.common.util.concurrent.Futures.immediateFuture; 033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating; 034import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 035import static java.util.logging.Level.FINER; 036import static java.util.logging.Level.SEVERE; 037import static java.util.logging.Level.WARNING; 038 039import com.google.common.annotations.Beta; 040import com.google.common.annotations.VisibleForTesting; 041import com.google.common.base.Function; 042import com.google.common.collect.FluentIterable; 043import com.google.common.collect.ImmutableList; 044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable; 045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable; 046import com.google.common.util.concurrent.Futures.FutureCombiner; 047import com.google.errorprone.annotations.CanIgnoreReturnValue; 048import com.google.errorprone.annotations.DoNotMock; 049import com.google.j2objc.annotations.RetainedWith; 050import java.io.Closeable; 051import java.util.IdentityHashMap; 052import java.util.Map; 053import java.util.concurrent.Callable; 054import java.util.concurrent.CancellationException; 055import java.util.concurrent.CountDownLatch; 056import java.util.concurrent.ExecutionException; 057import java.util.concurrent.Executor; 058import java.util.concurrent.Future; 059import java.util.concurrent.RejectedExecutionException; 060import java.util.concurrent.atomic.AtomicReference; 061import java.util.logging.Logger; 062import org.checkerframework.checker.nullness.qual.Nullable; 063 064/** 065 * A step in a pipeline of an asynchronous computation. When the last step in the computation is 066 * complete, some objects captured during the computation are closed. 067 * 068 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an 069 * asynchronously-computed intermediate value, or else an exception that indicates the failure or 070 * cancellation of the operation so far. The only way to extract the value or exception from a step 071 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the 072 * "value" of a successful step or the "result" (value or exception) of any step. 073 * 074 * <ol> 075 * <li>A pipeline starts at its leaf step (or steps), which is created from either a callable 076 * block or a {@link ListenableFuture}. 077 * <li>Each other step is derived from one or more input steps. At each step, zero or more objects 078 * can be captured for later closing. 079 * <li>There is one last step (the root of the tree), from which you can extract the final result 080 * of the computation. After that result is available (or the computation fails), all objects 081 * captured by any of the steps in the pipeline are closed. 082 * </ol> 083 * 084 * <h3>Starting a pipeline</h3> 085 * 086 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a 087 * callable block} that may capture objects for later closing. To start a pipeline from a {@link 088 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link 089 * #from(ListenableFuture)} instead. 090 * 091 * <h3>Derived steps</h3> 092 * 093 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in 094 * ways similar to {@link FluentFuture}s: 095 * 096 * <ul> 097 * <li>by transforming the value from a successful input step, 098 * <li>by catching the exception from a failed input step, or 099 * <li>by combining the results of several input steps. 100 * </ul> 101 * 102 * Each derivation can capture the next value or any intermediate objects for later closing. 103 * 104 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its 105 * exception, or combine it with others, you cannot do anything else with it, including declare it 106 * to be the last step of the pipeline. 107 * 108 * <h4>Transforming</h4> 109 * 110 * To derive the next step by asynchronously applying a function to an input step's value, call 111 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction, 112 * Executor)} on the input step. 113 * 114 * <h4>Catching</h4> 115 * 116 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction, 117 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step. 118 * 119 * <h4>Combining</h4> 120 * 121 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link 122 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads. 123 * 124 * <h3>Cancelling</h3> 125 * 126 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step 127 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a 128 * successfully cancelled step will immediately start closing all objects captured for later closing 129 * by it and by its input steps. 130 * 131 * <h3>Ending a pipeline</h3> 132 * 133 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to 134 * close the captured objects automatically or manually. 135 * 136 * <h4>Automatically closing</h4> 137 * 138 * You can extract a {@link Future} that represents the result of the last step in the pipeline by 139 * calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured 140 * by all steps in the pipeline will be closed. 141 * 142 * <pre>{@code 143 * FluentFuture<UserName> userName = 144 * ClosingFuture.submit( 145 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 146 * executor) 147 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 148 * .transform((closer, result) -> result.get("userName"), directExecutor()) 149 * .catching(DBException.class, e -> "no user", directExecutor()) 150 * .finishToFuture(); 151 * }</pre> 152 * 153 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query 154 * result cursor will both be closed, even if the operation is cancelled or fails. 155 * 156 * <h4>Manually closing</h4> 157 * 158 * If you want to close the captured objects manually, after you've used the final result, call 159 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the 160 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects. 161 * 162 * <pre>{@code 163 * ClosingFuture.submit( 164 * closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor), 165 * executor) 166 * .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor) 167 * .transform((closer, result) -> result.get("userName"), directExecutor()) 168 * .catching(DBException.class, e -> "no user", directExecutor()) 169 * .finishToValueAndCloser( 170 * valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor); 171 * 172 * // later 173 * try { // get() will throw if the operation failed or was cancelled. 174 * UserName userName = userNameValueAndCloser.get(); 175 * // do something with userName 176 * } finally { 177 * userNameValueAndCloser.closeAsync(); 178 * } 179 * }</pre> 180 * 181 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and 182 * the query result cursor will both be closed, even if the operation is cancelled or fails. 183 * 184 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The 185 * automatic-closing approach described above is safer. 186 * 187 * @param <V> the type of the value of this step 188 * @since 30.0 189 */ 190// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations. 191@Beta // @Beta for one release. 192@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)") 193// TODO(dpb): GWT compatibility. 194public final class ClosingFuture<V> { 195 196 private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName()); 197 198 /** 199 * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is 200 * done. 201 */ 202 public static final class DeferredCloser { 203 @RetainedWith private final CloseableList list; 204 205 DeferredCloser(CloseableList list) { 206 this.list = list; 207 } 208 209 /** 210 * Captures an object to be closed when a {@link ClosingFuture} pipeline is done. 211 * 212 * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code 213 * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code 214 * Closeable}. (For more about the flavors, see <a 215 * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your 216 * build</a>.) 217 * 218 * <p>Be careful when targeting an older SDK than you are building against (most commonly when 219 * building for Android): Ensure that any object you pass implements the interface not just in 220 * your current SDK version but also at the oldest version you support. For example, <a 221 * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version 222 * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper 223 * {@code Closeable} with a method reference like {@code cursor::close}. 224 * 225 * <p>Note that this method is still binary-compatible between flavors because the erasure of 226 * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}. 227 * 228 * @param closeable the object to be closed (see notes above) 229 * @param closingExecutor the object will be closed on this executor 230 * @return the first argument 231 */ 232 @CanIgnoreReturnValue 233 public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose( 234 C closeable, Executor closingExecutor) { 235 checkNotNull(closingExecutor); 236 if (closeable != null) { 237 list.add(closeable, closingExecutor); 238 } 239 return closeable; 240 } 241 } 242 243 /** 244 * An operation that computes a result. 245 * 246 * @param <V> the type of the result 247 */ 248 @FunctionalInterface 249 public interface ClosingCallable<V extends @Nullable Object> { 250 /** 251 * Computes a result, or throws an exception if unable to do so. 252 * 253 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 254 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 255 * not before this method completes), even if this method throws or the pipeline is cancelled. 256 */ 257 V call(DeferredCloser closer) throws Exception; 258 } 259 260 /** 261 * A function from an input to a result. 262 * 263 * @param <T> the type of the input to the function 264 * @param <U> the type of the result of the function 265 */ 266 @FunctionalInterface 267 public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 268 269 /** 270 * Applies this function to an input, or throws an exception if unable to do so. 271 * 272 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 273 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 274 * not before this method completes), even if this method throws or the pipeline is cancelled. 275 */ 276 U apply(DeferredCloser closer, T input) throws Exception; 277 } 278 279 /** 280 * A function from an input to a {@link ClosingFuture} of a result. 281 * 282 * @param <T> the type of the input to the function 283 * @param <U> the type of the result of the function 284 */ 285 @FunctionalInterface 286 public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> { 287 /** 288 * Applies this function to an input, or throws an exception if unable to do so. 289 * 290 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor) 291 * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but 292 * not before this method completes), even if this method throws or the pipeline is cancelled. 293 */ 294 ClosingFuture<U> apply(DeferredCloser closer, T input) throws Exception; 295 } 296 297 /** 298 * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and 299 * allows the user to close all the closeable objects that were captured during it for later 300 * closing. 301 * 302 * <p>The asynchronous operation will have completed before this object is created. 303 * 304 * @param <V> the type of the value of a successful operation 305 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 306 */ 307 public static final class ValueAndCloser<V> { 308 309 private final ClosingFuture<? extends V> closingFuture; 310 311 ValueAndCloser(ClosingFuture<? extends V> closingFuture) { 312 this.closingFuture = checkNotNull(closingFuture); 313 } 314 315 /** 316 * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as 317 * {@link Future#get()} would. 318 * 319 * <p>Because the asynchronous operation has already completed, this method is synchronous and 320 * returns immediately. 321 * 322 * @throws CancellationException if the computation was cancelled 323 * @throws ExecutionException if the computation threw an exception 324 */ 325 @Nullable 326 public V get() throws ExecutionException { 327 return getDone(closingFuture.future); 328 } 329 330 /** 331 * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous 332 * operation on the {@link Executor}s specified by calls to {@link 333 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 334 * 335 * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be 336 * closed synchronously. 337 * 338 * <p>Idempotent: objects will be closed at most once. 339 */ 340 public void closeAsync() { 341 closingFuture.close(); 342 } 343 } 344 345 /** 346 * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link 347 * ClosingFuture} pipeline. 348 * 349 * @param <V> the type of the final value of a successful pipeline 350 * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor) 351 */ 352 @FunctionalInterface 353 public interface ValueAndCloserConsumer<V> { 354 355 /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */ 356 void accept(ValueAndCloser<V> valueAndCloser); 357 } 358 359 /** 360 * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor. 361 * 362 * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for 363 * execution 364 */ 365 public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) { 366 return new ClosingFuture<>(callable, executor); 367 } 368 369 // TODO(dpb, cpovirk): Do we need submitAsync? 370 371 /** 372 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 373 * 374 * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V} 375 * implements {@link Closeable}. In order to start a pipeline with a value that will be closed 376 * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead. 377 */ 378 public static <V> ClosingFuture<V> from(ListenableFuture<V> future) { 379 return new ClosingFuture<V>(future); 380 } 381 382 /** 383 * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}. 384 * 385 * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when 386 * the pipeline is done, even if the pipeline is canceled or fails. 387 * 388 * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its 389 * value in order to close it. 390 * 391 * @param future the future to create the {@code ClosingFuture} from. For discussion of the 392 * future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable, 393 * Executor)}. 394 * @param closingExecutor the future's result will be closed on this executor 395 * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the 396 * underlying value may never be closed if the {@link Future} is canceled after its operation 397 * begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types, 398 * including those that pass them to this method, with {@link #submit(ClosingCallable, 399 * Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a 400 * {@link ListenableFuture} that doesn't create values that should be closed, use {@link 401 * ClosingFuture#from}. 402 */ 403 @Deprecated 404 public static <C extends @Nullable Object & @Nullable AutoCloseable> 405 ClosingFuture<C> eventuallyClosing( 406 ListenableFuture<C> future, final Executor closingExecutor) { 407 checkNotNull(closingExecutor); 408 final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future)); 409 Futures.addCallback( 410 future, 411 new FutureCallback<@Nullable AutoCloseable>() { 412 @Override 413 public void onSuccess(@Nullable AutoCloseable result) { 414 closingFuture.closeables.closer.eventuallyClose(result, closingExecutor); 415 } 416 417 @Override 418 public void onFailure(Throwable t) {} 419 }, 420 directExecutor()); 421 return closingFuture; 422 } 423 424 /** 425 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 426 * 427 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 428 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 429 */ 430 public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) { 431 return new Combiner(false, futures); 432 } 433 434 /** 435 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline. 436 * 437 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 438 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 439 */ 440 public static Combiner whenAllComplete( 441 ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) { 442 return whenAllComplete(asList(future1, moreFutures)); 443 } 444 445 /** 446 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 447 * all succeed. If any fail, the resulting pipeline will fail. 448 * 449 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 450 * the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished} 451 */ 452 public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) { 453 return new Combiner(true, futures); 454 } 455 456 /** 457 * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming 458 * they all succeed. If any fail, the resulting pipeline will fail. 459 * 460 * <p>Calling this method allows you to use lambdas or method references typed with the types of 461 * the input {@link ClosingFuture}s. 462 * 463 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 464 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 465 */ 466 public static <V1, V2> Combiner2<V1, V2> whenAllSucceed( 467 ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 468 return new Combiner2<>(future1, future2); 469 } 470 471 /** 472 * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming 473 * they all succeed. If any fail, the resulting pipeline will fail. 474 * 475 * <p>Calling this method allows you to use lambdas or method references typed with the types of 476 * the input {@link ClosingFuture}s. 477 * 478 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 479 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 480 */ 481 public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed( 482 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 483 return new Combiner3<>(future1, future2, future3); 484 } 485 486 /** 487 * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming 488 * they all succeed. If any fail, the resulting pipeline will fail. 489 * 490 * <p>Calling this method allows you to use lambdas or method references typed with the types of 491 * the input {@link ClosingFuture}s. 492 * 493 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 494 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 495 */ 496 public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed( 497 ClosingFuture<V1> future1, 498 ClosingFuture<V2> future2, 499 ClosingFuture<V3> future3, 500 ClosingFuture<V4> future4) { 501 return new Combiner4<>(future1, future2, future3, future4); 502 } 503 504 /** 505 * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming 506 * they all succeed. If any fail, the resulting pipeline will fail. 507 * 508 * <p>Calling this method allows you to use lambdas or method references typed with the types of 509 * the input {@link ClosingFuture}s. 510 * 511 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 512 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 513 */ 514 public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed( 515 ClosingFuture<V1> future1, 516 ClosingFuture<V2> future2, 517 ClosingFuture<V3> future3, 518 ClosingFuture<V4> future4, 519 ClosingFuture<V5> future5) { 520 return new Combiner5<>(future1, future2, future3, future4, future5); 521 } 522 523 /** 524 * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they 525 * all succeed. If any fail, the resulting pipeline will fail. 526 * 527 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of 528 * the arguments, or if any has already been {@linkplain #finishToFuture() finished} 529 */ 530 public static Combiner whenAllSucceed( 531 ClosingFuture<?> future1, 532 ClosingFuture<?> future2, 533 ClosingFuture<?> future3, 534 ClosingFuture<?> future4, 535 ClosingFuture<?> future5, 536 ClosingFuture<?> future6, 537 ClosingFuture<?>... moreFutures) { 538 return whenAllSucceed( 539 FluentIterable.of(future1, future2, future3, future4, future5, future6) 540 .append(moreFutures)); 541 } 542 543 private final AtomicReference<State> state = new AtomicReference<>(OPEN); 544 private final CloseableList closeables = new CloseableList(); 545 private final FluentFuture<V> future; 546 547 private ClosingFuture(ListenableFuture<V> future) { 548 this.future = FluentFuture.from(future); 549 } 550 551 private ClosingFuture(final ClosingCallable<V> callable, Executor executor) { 552 checkNotNull(callable); 553 TrustedListenableFutureTask<V> task = 554 TrustedListenableFutureTask.create( 555 new Callable<V>() { 556 @Override 557 public V call() throws Exception { 558 return callable.call(closeables.closer); 559 } 560 561 @Override 562 public String toString() { 563 return callable.toString(); 564 } 565 }); 566 executor.execute(task); 567 this.future = task; 568 } 569 570 /** 571 * Returns a future that finishes when this step does. Calling {@code get()} on the returned 572 * future returns {@code null} if the step is successful or throws the same exception that would 573 * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code 574 * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline. 575 * 576 * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls 577 * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or 578 * a derivation method <i>on the same instance</i>. This is important because calling {@code 579 * statusFuture} alone does not provide a way to close the pipeline. 580 */ 581 public ListenableFuture<?> statusFuture() { 582 return nonCancellationPropagating(future.transform(constant(null), directExecutor())); 583 } 584 585 /** 586 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 587 * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed 588 * when the pipeline is done. 589 * 590 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 591 * ClosingFuture} will be equivalent to this one. 592 * 593 * <p>If the function throws an exception, that exception is used as the result of the derived 594 * {@code ClosingFuture}. 595 * 596 * <p>Example usage: 597 * 598 * <pre>{@code 599 * ClosingFuture<List<Row>> rowsFuture = 600 * queryFuture.transform((closer, result) -> result.getRows(), executor); 601 * }</pre> 602 * 603 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 604 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 605 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 606 * 607 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 608 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 609 * this {@code ClosingFuture}. 610 * 611 * @param function transforms the value of this step to the value of the derived step 612 * @param executor executor to run the function in 613 * @return the derived step 614 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 615 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 616 * finished} 617 */ 618 public <U> ClosingFuture<U> transform( 619 final ClosingFunction<? super V, U> function, Executor executor) { 620 checkNotNull(function); 621 AsyncFunction<V, U> applyFunction = 622 new AsyncFunction<V, U>() { 623 @Override 624 public ListenableFuture<U> apply(V input) throws Exception { 625 return closeables.applyClosingFunction(function, input); 626 } 627 628 @Override 629 public String toString() { 630 return function.toString(); 631 } 632 }; 633 // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function). 634 return derive(future.transformAsync(applyFunction, executor)); 635 } 636 637 /** 638 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 639 * that returns a {@code ClosingFuture} to its value. The function can use a {@link 640 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 641 * captured by the returned {@link ClosingFuture}). 642 * 643 * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one 644 * returned by the function. 645 * 646 * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code 647 * ClosingFuture} will be equivalent to this one. 648 * 649 * <p>If the function throws an exception, that exception is used as the result of the derived 650 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 651 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 652 * closed. 653 * 654 * <p>Usage guidelines for this method: 655 * 656 * <ul> 657 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 658 * {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction, 659 * Executor)} instead, with a function that returns the next value directly. 660 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 661 * for every closeable object this step creates in order to capture it for later closing. 662 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 663 * ClosingFuture} call {@link #from(ListenableFuture)}. 664 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 665 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 666 * {@link #withoutCloser(AsyncFunction)} 667 * </ul> 668 * 669 * <p>Example usage: 670 * 671 * <pre>{@code 672 * // Result.getRowsClosingFuture() returns a ClosingFuture. 673 * ClosingFuture<List<Row>> rowsFuture = 674 * queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor); 675 * 676 * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the 677 * // number of written rows. openOutputFile() returns a FileOutputStream (which implements 678 * // Closeable). 679 * ClosingFuture<Integer> rowsFuture2 = 680 * queryFuture.transformAsync( 681 * (closer, result) -> { 682 * FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor); 683 * return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos)); 684 * }, 685 * executor); 686 * 687 * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created). 688 * ClosingFuture<List<Row>> rowsFuture3 = 689 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 690 * 691 * }</pre> 692 * 693 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 694 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 695 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 696 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 697 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 698 * responsible for completing the returned {@code ClosingFuture}.) 699 * 700 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 701 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 702 * this {@code ClosingFuture}. 703 * 704 * @param function transforms the value of this step to a {@code ClosingFuture} with the value of 705 * the derived step 706 * @param executor executor to run the function in 707 * @return the derived step 708 * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this 709 * one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture() 710 * finished} 711 */ 712 public <U> ClosingFuture<U> transformAsync( 713 final AsyncClosingFunction<? super V, U> function, Executor executor) { 714 checkNotNull(function); 715 AsyncFunction<V, U> applyFunction = 716 new AsyncFunction<V, U>() { 717 @Override 718 public ListenableFuture<U> apply(V input) throws Exception { 719 return closeables.applyAsyncClosingFunction(function, input); 720 } 721 722 @Override 723 public String toString() { 724 return function.toString(); 725 } 726 }; 727 return derive(future.transformAsync(applyFunction, executor)); 728 } 729 730 /** 731 * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input, 732 * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned 733 * {@link ListenableFuture}. 734 * 735 * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction, 736 * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it 737 * meets these conditions: 738 * 739 * <ul> 740 * <li>It does not need to capture any {@link Closeable} objects by calling {@link 741 * DeferredCloser#eventuallyClose(Closeable, Executor)}. 742 * <li>It returns a {@link ListenableFuture}. 743 * </ul> 744 * 745 * <p>Example usage: 746 * 747 * <pre>{@code 748 * // Result.getRowsFuture() returns a ListenableFuture. 749 * ClosingFuture<List<Row>> rowsFuture = 750 * queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor); 751 * }</pre> 752 * 753 * @param function transforms the value of a {@code ClosingFuture} step to a {@link 754 * ListenableFuture} with the value of a derived step 755 */ 756 public static <V, U> AsyncClosingFunction<V, U> withoutCloser( 757 final AsyncFunction<V, U> function) { 758 checkNotNull(function); 759 return new AsyncClosingFunction<V, U>() { 760 @Override 761 public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception { 762 return ClosingFuture.from(function.apply(input)); 763 } 764 }; 765 } 766 767 /** 768 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 769 * to its exception if it is an instance of a given exception type. The function can use a {@link 770 * DeferredCloser} to capture objects to be closed when the pipeline is done. 771 * 772 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 773 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 774 * one. 775 * 776 * <p>If the function throws an exception, that exception is used as the result of the derived 777 * {@code ClosingFuture}. 778 * 779 * <p>Example usage: 780 * 781 * <pre>{@code 782 * ClosingFuture<QueryResult> queryFuture = 783 * queryFuture.catching( 784 * QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor); 785 * }</pre> 786 * 787 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 788 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 789 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 790 * 791 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 792 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 793 * this {@code ClosingFuture}. 794 * 795 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 796 * type is matched against this step's exception. "This step's exception" means the cause of 797 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 798 * underlying this step or, if {@code get()} throws a different kind of exception, that 799 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 800 * prefer more specific types, avoiding {@code Throwable.class} in particular. 801 * @param fallback the function to be called if this step fails with the expected exception type. 802 * The function's argument is this step's exception. "This step's exception" means the cause 803 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 804 * underlying this step or, if {@code get()} throws a different kind of exception, that 805 * exception itself. 806 * @param executor the executor that runs {@code fallback} if the input fails 807 */ 808 public <X extends Throwable> ClosingFuture<V> catching( 809 Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) { 810 return catchingMoreGeneric(exceptionType, fallback, executor); 811 } 812 813 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 814 private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric( 815 Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) { 816 checkNotNull(fallback); 817 AsyncFunction<X, W> applyFallback = 818 new AsyncFunction<X, W>() { 819 @Override 820 public ListenableFuture<W> apply(X exception) throws Exception { 821 return closeables.applyClosingFunction(fallback, exception); 822 } 823 824 @Override 825 public String toString() { 826 return fallback.toString(); 827 } 828 }; 829 // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function). 830 return derive(future.catchingAsync(exceptionType, applyFallback, executor)); 831 } 832 833 /** 834 * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function 835 * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception 836 * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the 837 * pipeline is done (other than those captured by the returned {@link ClosingFuture}). 838 * 839 * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code 840 * ClosingFuture} will be equivalent to the one returned by the function. 841 * 842 * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the 843 * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this 844 * one. 845 * 846 * <p>If the function throws an exception, that exception is used as the result of the derived 847 * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code 848 * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be 849 * closed. 850 * 851 * <p>Usage guidelines for this method: 852 * 853 * <ul> 854 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 855 * {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class, 856 * ClosingFunction, Executor)} instead, with a function that returns the next value 857 * directly. 858 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} 859 * for every closeable object this step creates in order to capture it for later closing. 860 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 861 * ClosingFuture} call {@link #from(ListenableFuture)}. 862 * <li>In case this step doesn't create new closeables, you can adapt an API that returns a 863 * {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to 864 * {@link #withoutCloser(AsyncFunction)} 865 * </ul> 866 * 867 * <p>Example usage: 868 * 869 * <pre>{@code 870 * // Fall back to a secondary input stream in case of IOException. 871 * ClosingFuture<InputStream> inputFuture = 872 * firstInputFuture.catchingAsync( 873 * IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor); 874 * } 875 * }</pre> 876 * 877 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 878 * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings 879 * about heavyweight listeners are also applicable to heavyweight functions passed to this method. 880 * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside 881 * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads 882 * responsible for completing the returned {@code ClosingFuture}.) 883 * 884 * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link 885 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on 886 * this {@code ClosingFuture}. 887 * 888 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 889 * type is matched against this step's exception. "This step's exception" means the cause of 890 * the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 891 * underlying this step or, if {@code get()} throws a different kind of exception, that 892 * exception itself. To avoid hiding bugs and other unrecoverable errors, callers should 893 * prefer more specific types, avoiding {@code Throwable.class} in particular. 894 * @param fallback the function to be called if this step fails with the expected exception type. 895 * The function's argument is this step's exception. "This step's exception" means the cause 896 * of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future} 897 * underlying this step or, if {@code get()} throws a different kind of exception, that 898 * exception itself. 899 * @param executor the executor that runs {@code fallback} if the input fails 900 */ 901 // TODO(dpb): Should this do something special if the function throws CancellationException or 902 // ExecutionException? 903 public <X extends Throwable> ClosingFuture<V> catchingAsync( 904 Class<X> exceptionType, 905 AsyncClosingFunction<? super X, ? extends V> fallback, 906 Executor executor) { 907 return catchingAsyncMoreGeneric(exceptionType, fallback, executor); 908 } 909 910 // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V. 911 private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric( 912 Class<X> exceptionType, 913 final AsyncClosingFunction<? super X, W> fallback, 914 Executor executor) { 915 checkNotNull(fallback); 916 AsyncFunction<X, W> asyncFunction = 917 new AsyncFunction<X, W>() { 918 @Override 919 public ListenableFuture<W> apply(X exception) throws Exception { 920 return closeables.applyAsyncClosingFunction(fallback, exception); 921 } 922 923 @Override 924 public String toString() { 925 return fallback.toString(); 926 } 927 }; 928 return derive(future.catchingAsync(exceptionType, asyncFunction, executor)); 929 } 930 931 /** 932 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned 933 * {@link Future} is done, all objects captured for closing during the pipeline's computation will 934 * be closed. 935 * 936 * <p>After calling this method, you may not call {@link 937 * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other 938 * derivation method on this {@code ClosingFuture}. 939 * 940 * @return a {@link Future} that represents the final value or exception of the pipeline 941 */ 942 public FluentFuture<V> finishToFuture() { 943 if (compareAndUpdateState(OPEN, WILL_CLOSE)) { 944 logger.log(FINER, "will close {0}", this); 945 future.addListener( 946 new Runnable() { 947 @Override 948 public void run() { 949 checkAndUpdateState(WILL_CLOSE, CLOSING); 950 close(); 951 checkAndUpdateState(CLOSING, CLOSED); 952 } 953 }, 954 directExecutor()); 955 } else { 956 switch (state.get()) { 957 case SUBSUMED: 958 throw new IllegalStateException( 959 "Cannot call finishToFuture() after deriving another step"); 960 961 case WILL_CREATE_VALUE_AND_CLOSER: 962 throw new IllegalStateException( 963 "Cannot call finishToFuture() after calling finishToValueAndCloser()"); 964 965 case WILL_CLOSE: 966 case CLOSING: 967 case CLOSED: 968 throw new IllegalStateException("Cannot call finishToFuture() twice"); 969 970 case OPEN: 971 throw new AssertionError(); 972 } 973 } 974 return future; 975 } 976 977 /** 978 * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done, 979 * {@code receiver} will be called with an object that contains the result of the operation. The 980 * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use. 981 * 982 * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or 983 * any other derivation method on this {@code ClosingFuture}. 984 * 985 * @param consumer a callback whose method will be called (using {@code executor}) when this 986 * operation is done 987 */ 988 public void finishToValueAndCloser( 989 final ValueAndCloserConsumer<? super V> consumer, Executor executor) { 990 checkNotNull(consumer); 991 if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) { 992 switch (state.get()) { 993 case SUBSUMED: 994 throw new IllegalStateException( 995 "Cannot call finishToValueAndCloser() after deriving another step"); 996 997 case WILL_CLOSE: 998 case CLOSING: 999 case CLOSED: 1000 throw new IllegalStateException( 1001 "Cannot call finishToValueAndCloser() after calling finishToFuture()"); 1002 1003 case WILL_CREATE_VALUE_AND_CLOSER: 1004 throw new IllegalStateException("Cannot call finishToValueAndCloser() twice"); 1005 1006 case OPEN: 1007 break; 1008 } 1009 throw new AssertionError(state); 1010 } 1011 future.addListener( 1012 new Runnable() { 1013 @Override 1014 public void run() { 1015 provideValueAndCloser(consumer, ClosingFuture.this); 1016 } 1017 }, 1018 executor); 1019 } 1020 1021 private static <C, V extends C> void provideValueAndCloser( 1022 ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) { 1023 consumer.accept(new ValueAndCloser<C>(closingFuture)); 1024 } 1025 1026 /** 1027 * Attempts to cancel execution of this step. This attempt will fail if the step has already 1028 * completed, has already been cancelled, or could not be cancelled for some other reason. If 1029 * successful, and this step has not started when {@code cancel} is called, this step should never 1030 * run. 1031 * 1032 * <p>If successful, causes the objects captured by this step (if already started) and its input 1033 * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls 1034 * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously. 1035 * 1036 * @param mayInterruptIfRunning {@code true} if the thread executing this task should be 1037 * interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be 1038 * cancelled regardless 1039 * @return {@code false} if the step could not be cancelled, typically because it has already 1040 * completed normally; {@code true} otherwise 1041 */ 1042 @CanIgnoreReturnValue 1043 public boolean cancel(boolean mayInterruptIfRunning) { 1044 logger.log(FINER, "cancelling {0}", this); 1045 boolean cancelled = future.cancel(mayInterruptIfRunning); 1046 if (cancelled) { 1047 close(); 1048 } 1049 return cancelled; 1050 } 1051 1052 private void close() { 1053 logger.log(FINER, "closing {0}", this); 1054 closeables.close(); 1055 } 1056 1057 private <U> ClosingFuture<U> derive(FluentFuture<U> future) { 1058 ClosingFuture<U> derived = new ClosingFuture<>(future); 1059 becomeSubsumedInto(derived.closeables); 1060 return derived; 1061 } 1062 1063 private void becomeSubsumedInto(CloseableList otherCloseables) { 1064 checkAndUpdateState(OPEN, SUBSUMED); 1065 otherCloseables.add(closeables, directExecutor()); 1066 } 1067 1068 /** 1069 * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link 1070 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}. 1071 * 1072 * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object. 1073 */ 1074 public static final class Peeker { 1075 private final ImmutableList<ClosingFuture<?>> futures; 1076 private volatile boolean beingCalled; 1077 1078 private Peeker(ImmutableList<ClosingFuture<?>> futures) { 1079 this.futures = checkNotNull(futures); 1080 } 1081 1082 /** 1083 * Returns the value of {@code closingFuture}. 1084 * 1085 * @throws ExecutionException if {@code closingFuture} is a failed step 1086 * @throws CancellationException if the {@code closingFuture}'s future was cancelled 1087 * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to 1088 * {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)} 1089 * @throws IllegalStateException if called outside of a call to {@link 1090 * CombiningCallable#call(DeferredCloser, Peeker)} or {@link 1091 * AsyncCombiningCallable#call(DeferredCloser, Peeker)} 1092 */ 1093 public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture) 1094 throws ExecutionException { 1095 checkState(beingCalled); 1096 checkArgument(futures.contains(closingFuture)); 1097 return Futures.getDone(closingFuture.future); 1098 } 1099 1100 private <V extends @Nullable Object> V call( 1101 CombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1102 beingCalled = true; 1103 CloseableList newCloseables = new CloseableList(); 1104 try { 1105 return combiner.call(newCloseables.closer, this); 1106 } finally { 1107 closeables.add(newCloseables, directExecutor()); 1108 beingCalled = false; 1109 } 1110 } 1111 1112 private <V extends @Nullable Object> FluentFuture<V> callAsync( 1113 AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception { 1114 beingCalled = true; 1115 CloseableList newCloseables = new CloseableList(); 1116 try { 1117 ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this); 1118 closingFuture.becomeSubsumedInto(closeables); 1119 return closingFuture.future; 1120 } finally { 1121 closeables.add(newCloseables, directExecutor()); 1122 beingCalled = false; 1123 } 1124 } 1125 } 1126 1127 /** 1128 * A builder of a {@link ClosingFuture} step that is derived from more than one input step. 1129 * 1130 * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to 1131 * instantiate this class. 1132 * 1133 * <p>Example: 1134 * 1135 * <pre>{@code 1136 * final ClosingFuture<BufferedReader> file1ReaderFuture = ...; 1137 * final ClosingFuture<BufferedReader> file2ReaderFuture = ...; 1138 * ListenableFuture<Integer> numberOfDifferentLines = 1139 * ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture) 1140 * .call( 1141 * (closer, peeker) -> { 1142 * BufferedReader file1Reader = peeker.getDone(file1ReaderFuture); 1143 * BufferedReader file2Reader = peeker.getDone(file2ReaderFuture); 1144 * return countDifferentLines(file1Reader, file2Reader); 1145 * }, 1146 * executor) 1147 * .closing(executor); 1148 * }</pre> 1149 */ 1150 // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8. 1151 @com.google.errorprone.annotations.DoNotMock( 1152 "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.") 1153 public static class Combiner { 1154 1155 private final CloseableList closeables = new CloseableList(); 1156 1157 /** 1158 * An operation that returns a result and may throw an exception. 1159 * 1160 * @param <V> the type of the result 1161 */ 1162 @FunctionalInterface 1163 public interface CombiningCallable<V extends @Nullable Object> { 1164 /** 1165 * Computes a result, or throws an exception if unable to do so. 1166 * 1167 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1168 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1169 * is done (but not before this method completes), even if this method throws or the pipeline 1170 * is cancelled. 1171 * 1172 * @param peeker used to get the value of any of the input futures 1173 */ 1174 V call(DeferredCloser closer, Peeker peeker) throws Exception; 1175 } 1176 1177 /** 1178 * An operation that returns a {@link ClosingFuture} result and may throw an exception. 1179 * 1180 * @param <V> the type of the result 1181 */ 1182 @FunctionalInterface 1183 public interface AsyncCombiningCallable<V extends @Nullable Object> { 1184 /** 1185 * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so. 1186 * 1187 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1188 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1189 * is done (but not before this method completes), even if this method throws or the pipeline 1190 * is cancelled. 1191 * 1192 * @param peeker used to get the value of any of the input futures 1193 */ 1194 ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception; 1195 } 1196 1197 private final boolean allMustSucceed; 1198 protected final ImmutableList<ClosingFuture<?>> inputs; 1199 1200 private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) { 1201 this.allMustSucceed = allMustSucceed; 1202 this.inputs = ImmutableList.copyOf(inputs); 1203 for (ClosingFuture<?> input : inputs) { 1204 input.becomeSubsumedInto(closeables); 1205 } 1206 } 1207 1208 /** 1209 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1210 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1211 * objects to be closed when the pipeline is done. 1212 * 1213 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1214 * fail, so will the returned step. 1215 * 1216 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1217 * cancelled. 1218 * 1219 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1220 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1221 */ 1222 public <V> ClosingFuture<V> call( 1223 final CombiningCallable<V> combiningCallable, Executor executor) { 1224 Callable<V> callable = 1225 new Callable<V>() { 1226 @Override 1227 public V call() throws Exception { 1228 return new Peeker(inputs).call(combiningCallable, closeables); 1229 } 1230 1231 @Override 1232 public String toString() { 1233 return combiningCallable.toString(); 1234 } 1235 }; 1236 ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor)); 1237 derived.closeables.add(closeables, directExecutor()); 1238 return derived; 1239 } 1240 1241 /** 1242 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1243 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1244 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1245 * captured by the returned {@link ClosingFuture}). 1246 * 1247 * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs 1248 * fail, so will the returned step. 1249 * 1250 * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be 1251 * cancelled. 1252 * 1253 * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown 1254 * {@code ExecutionException} will be extracted and used as the failure of the derived step. 1255 * 1256 * <p>If the combiningCallable throws any other exception, it will be used as the failure of the 1257 * derived step. 1258 * 1259 * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture}, 1260 * then none of the closeable objects in that {@code ClosingFuture} will be closed. 1261 * 1262 * <p>Usage guidelines for this method: 1263 * 1264 * <ul> 1265 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1266 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1267 * Executor)} instead, with a function that returns the next value directly. 1268 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1269 * closer.eventuallyClose()} for every closeable object this step creates in order to 1270 * capture it for later closing. 1271 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1272 * ClosingFuture} call {@link #from(ListenableFuture)}. 1273 * </ul> 1274 * 1275 * <p>The same warnings about doing heavyweight operations within {@link 1276 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1277 */ 1278 public <V> ClosingFuture<V> callAsync( 1279 final AsyncCombiningCallable<V> combiningCallable, Executor executor) { 1280 AsyncCallable<V> asyncCallable = 1281 new AsyncCallable<V>() { 1282 @Override 1283 public ListenableFuture<V> call() throws Exception { 1284 return new Peeker(inputs).callAsync(combiningCallable, closeables); 1285 } 1286 1287 @Override 1288 public String toString() { 1289 return combiningCallable.toString(); 1290 } 1291 }; 1292 ClosingFuture<V> derived = 1293 new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor)); 1294 derived.closeables.add(closeables, directExecutor()); 1295 return derived; 1296 } 1297 1298 private FutureCombiner<Object> futureCombiner() { 1299 return allMustSucceed 1300 ? Futures.whenAllSucceed(inputFutures()) 1301 : Futures.whenAllComplete(inputFutures()); 1302 } 1303 1304 private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE = 1305 new Function<ClosingFuture<?>, FluentFuture<?>>() { 1306 @Override 1307 public FluentFuture<?> apply(ClosingFuture<?> future) { 1308 return future.future; 1309 } 1310 }; 1311 1312 private ImmutableList<FluentFuture<?>> inputFutures() { 1313 return FluentIterable.from(inputs).transform(INNER_FUTURE).toList(); 1314 } 1315 } 1316 1317 /** 1318 * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link 1319 * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this 1320 * combination. 1321 * 1322 * @param <V1> the type returned by the first future 1323 * @param <V2> the type returned by the second future 1324 */ 1325 public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object> 1326 extends Combiner { 1327 1328 /** 1329 * A function that returns a value when applied to the values of the two futures passed to 1330 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1331 * 1332 * @param <V1> the type returned by the first future 1333 * @param <V2> the type returned by the second future 1334 * @param <U> the type returned by the function 1335 */ 1336 @FunctionalInterface 1337 public interface ClosingFunction2< 1338 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1339 1340 /** 1341 * Applies this function to two inputs, or throws an exception if unable to do so. 1342 * 1343 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1344 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1345 * is done (but not before this method completes), even if this method throws or the pipeline 1346 * is cancelled. 1347 */ 1348 U apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception; 1349 } 1350 1351 /** 1352 * A function that returns a {@link ClosingFuture} when applied to the values of the two futures 1353 * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}. 1354 * 1355 * @param <V1> the type returned by the first future 1356 * @param <V2> the type returned by the second future 1357 * @param <U> the type returned by the function 1358 */ 1359 @FunctionalInterface 1360 public interface AsyncClosingFunction2< 1361 V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> { 1362 1363 /** 1364 * Applies this function to two inputs, or throws an exception if unable to do so. 1365 * 1366 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1367 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1368 * is done (but not before this method completes), even if this method throws or the pipeline 1369 * is cancelled. 1370 */ 1371 ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception; 1372 } 1373 1374 private final ClosingFuture<V1> future1; 1375 private final ClosingFuture<V2> future2; 1376 1377 private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) { 1378 super(true, ImmutableList.of(future1, future2)); 1379 this.future1 = future1; 1380 this.future2 = future2; 1381 } 1382 1383 /** 1384 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1385 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1386 * objects to be closed when the pipeline is done. 1387 * 1388 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1389 * any of the inputs fail, so will the returned step. 1390 * 1391 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1392 * 1393 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1394 * ExecutionException} will be extracted and used as the failure of the derived step. 1395 */ 1396 public <U extends @Nullable Object> ClosingFuture<U> call( 1397 final ClosingFunction2<V1, V2, U> function, Executor executor) { 1398 return call( 1399 new CombiningCallable<U>() { 1400 @Override 1401 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1402 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1403 } 1404 1405 @Override 1406 public String toString() { 1407 return function.toString(); 1408 } 1409 }, 1410 executor); 1411 } 1412 1413 /** 1414 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1415 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1416 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1417 * captured by the returned {@link ClosingFuture}). 1418 * 1419 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and 1420 * any of the inputs fail, so will the returned step. 1421 * 1422 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1423 * 1424 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1425 * ExecutionException} will be extracted and used as the failure of the derived step. 1426 * 1427 * <p>If the function throws any other exception, it will be used as the failure of the derived 1428 * step. 1429 * 1430 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1431 * the closeable objects in that {@code ClosingFuture} will be closed. 1432 * 1433 * <p>Usage guidelines for this method: 1434 * 1435 * <ul> 1436 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1437 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1438 * Executor)} instead, with a function that returns the next value directly. 1439 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1440 * closer.eventuallyClose()} for every closeable object this step creates in order to 1441 * capture it for later closing. 1442 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1443 * ClosingFuture} call {@link #from(ListenableFuture)}. 1444 * </ul> 1445 * 1446 * <p>The same warnings about doing heavyweight operations within {@link 1447 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1448 */ 1449 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1450 final AsyncClosingFunction2<V1, V2, U> function, Executor executor) { 1451 return callAsync( 1452 new AsyncCombiningCallable<U>() { 1453 @Override 1454 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1455 return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2)); 1456 } 1457 1458 @Override 1459 public String toString() { 1460 return function.toString(); 1461 } 1462 }, 1463 executor); 1464 } 1465 } 1466 1467 /** 1468 * A generic {@link Combiner} that lets you use a lambda or method reference to combine three 1469 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1470 * ClosingFuture)} to start this combination. 1471 * 1472 * @param <V1> the type returned by the first future 1473 * @param <V2> the type returned by the second future 1474 * @param <V3> the type returned by the third future 1475 */ 1476 public static final class Combiner3< 1477 V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object> 1478 extends Combiner { 1479 /** 1480 * A function that returns a value when applied to the values of the three futures passed to 1481 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1482 * 1483 * @param <V1> the type returned by the first future 1484 * @param <V2> the type returned by the second future 1485 * @param <V3> the type returned by the third future 1486 * @param <U> the type returned by the function 1487 */ 1488 @FunctionalInterface 1489 public interface ClosingFunction3< 1490 V1 extends @Nullable Object, 1491 V2 extends @Nullable Object, 1492 V3 extends @Nullable Object, 1493 U extends @Nullable Object> { 1494 /** 1495 * Applies this function to three inputs, or throws an exception if unable to do so. 1496 * 1497 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1498 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1499 * is done (but not before this method completes), even if this method throws or the pipeline 1500 * is cancelled. 1501 */ 1502 U apply(DeferredCloser closer, V1 value1, V2 value2, V3 v3) throws Exception; 1503 } 1504 1505 /** 1506 * A function that returns a {@link ClosingFuture} when applied to the values of the three 1507 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}. 1508 * 1509 * @param <V1> the type returned by the first future 1510 * @param <V2> the type returned by the second future 1511 * @param <V3> the type returned by the third future 1512 * @param <U> the type returned by the function 1513 */ 1514 @FunctionalInterface 1515 public interface AsyncClosingFunction3< 1516 V1 extends @Nullable Object, 1517 V2 extends @Nullable Object, 1518 V3 extends @Nullable Object, 1519 U extends @Nullable Object> { 1520 /** 1521 * Applies this function to three inputs, or throws an exception if unable to do so. 1522 * 1523 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1524 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1525 * is done (but not before this method completes), even if this method throws or the pipeline 1526 * is cancelled. 1527 */ 1528 ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3) 1529 throws Exception; 1530 } 1531 1532 private final ClosingFuture<V1> future1; 1533 private final ClosingFuture<V2> future2; 1534 private final ClosingFuture<V3> future3; 1535 1536 private Combiner3( 1537 ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) { 1538 super(true, ImmutableList.of(future1, future2, future3)); 1539 this.future1 = future1; 1540 this.future2 = future2; 1541 this.future3 = future3; 1542 } 1543 1544 /** 1545 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1546 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1547 * objects to be closed when the pipeline is done. 1548 * 1549 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1550 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1551 * 1552 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1553 * 1554 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1555 * ExecutionException} will be extracted and used as the failure of the derived step. 1556 */ 1557 public <U extends @Nullable Object> ClosingFuture<U> call( 1558 final ClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1559 return call( 1560 new CombiningCallable<U>() { 1561 @Override 1562 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1563 return function.apply( 1564 closer, 1565 peeker.getDone(future1), 1566 peeker.getDone(future2), 1567 peeker.getDone(future3)); 1568 } 1569 1570 @Override 1571 public String toString() { 1572 return function.toString(); 1573 } 1574 }, 1575 executor); 1576 } 1577 1578 /** 1579 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1580 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1581 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1582 * captured by the returned {@link ClosingFuture}). 1583 * 1584 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1585 * ClosingFuture)} and any of the inputs fail, so will the returned step. 1586 * 1587 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1588 * 1589 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1590 * ExecutionException} will be extracted and used as the failure of the derived step. 1591 * 1592 * <p>If the function throws any other exception, it will be used as the failure of the derived 1593 * step. 1594 * 1595 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1596 * the closeable objects in that {@code ClosingFuture} will be closed. 1597 * 1598 * <p>Usage guidelines for this method: 1599 * 1600 * <ul> 1601 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1602 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1603 * Executor)} instead, with a function that returns the next value directly. 1604 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1605 * closer.eventuallyClose()} for every closeable object this step creates in order to 1606 * capture it for later closing. 1607 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1608 * ClosingFuture} call {@link #from(ListenableFuture)}. 1609 * </ul> 1610 * 1611 * <p>The same warnings about doing heavyweight operations within {@link 1612 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1613 */ 1614 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1615 final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) { 1616 return callAsync( 1617 new AsyncCombiningCallable<U>() { 1618 @Override 1619 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1620 return function.apply( 1621 closer, 1622 peeker.getDone(future1), 1623 peeker.getDone(future2), 1624 peeker.getDone(future3)); 1625 } 1626 1627 @Override 1628 public String toString() { 1629 return function.toString(); 1630 } 1631 }, 1632 executor); 1633 } 1634 } 1635 1636 /** 1637 * A generic {@link Combiner} that lets you use a lambda or method reference to combine four 1638 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1639 * ClosingFuture)} to start this combination. 1640 * 1641 * @param <V1> the type returned by the first future 1642 * @param <V2> the type returned by the second future 1643 * @param <V3> the type returned by the third future 1644 * @param <V4> the type returned by the fourth future 1645 */ 1646 public static final class Combiner4< 1647 V1 extends @Nullable Object, 1648 V2 extends @Nullable Object, 1649 V3 extends @Nullable Object, 1650 V4 extends @Nullable Object> 1651 extends Combiner { 1652 /** 1653 * A function that returns a value when applied to the values of the four futures passed to 1654 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}. 1655 * 1656 * @param <V1> the type returned by the first future 1657 * @param <V2> the type returned by the second future 1658 * @param <V3> the type returned by the third future 1659 * @param <V4> the type returned by the fourth future 1660 * @param <U> the type returned by the function 1661 */ 1662 @FunctionalInterface 1663 public interface ClosingFunction4< 1664 V1 extends @Nullable Object, 1665 V2 extends @Nullable Object, 1666 V3 extends @Nullable Object, 1667 V4 extends @Nullable Object, 1668 U extends @Nullable Object> { 1669 /** 1670 * Applies this function to four inputs, or throws an exception if unable to do so. 1671 * 1672 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1673 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1674 * is done (but not before this method completes), even if this method throws or the pipeline 1675 * is cancelled. 1676 */ 1677 U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) throws Exception; 1678 } 1679 1680 /** 1681 * A function that returns a {@link ClosingFuture} when applied to the values of the four 1682 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1683 * ClosingFuture)}. 1684 * 1685 * @param <V1> the type returned by the first future 1686 * @param <V2> the type returned by the second future 1687 * @param <V3> the type returned by the third future 1688 * @param <V4> the type returned by the fourth future 1689 * @param <U> the type returned by the function 1690 */ 1691 @FunctionalInterface 1692 public interface AsyncClosingFunction4< 1693 V1 extends @Nullable Object, 1694 V2 extends @Nullable Object, 1695 V3 extends @Nullable Object, 1696 V4 extends @Nullable Object, 1697 U extends @Nullable Object> { 1698 /** 1699 * Applies this function to four inputs, or throws an exception if unable to do so. 1700 * 1701 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1702 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1703 * is done (but not before this method completes), even if this method throws or the pipeline 1704 * is cancelled. 1705 */ 1706 ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) 1707 throws Exception; 1708 } 1709 1710 private final ClosingFuture<V1> future1; 1711 private final ClosingFuture<V2> future2; 1712 private final ClosingFuture<V3> future3; 1713 private final ClosingFuture<V4> future4; 1714 1715 private Combiner4( 1716 ClosingFuture<V1> future1, 1717 ClosingFuture<V2> future2, 1718 ClosingFuture<V3> future3, 1719 ClosingFuture<V4> future4) { 1720 super(true, ImmutableList.of(future1, future2, future3, future4)); 1721 this.future1 = future1; 1722 this.future2 = future2; 1723 this.future3 = future3; 1724 this.future4 = future4; 1725 } 1726 1727 /** 1728 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1729 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1730 * objects to be closed when the pipeline is done. 1731 * 1732 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1733 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1734 * 1735 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1736 * 1737 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1738 * ExecutionException} will be extracted and used as the failure of the derived step. 1739 */ 1740 public <U extends @Nullable Object> ClosingFuture<U> call( 1741 final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1742 return call( 1743 new CombiningCallable<U>() { 1744 @Override 1745 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1746 return function.apply( 1747 closer, 1748 peeker.getDone(future1), 1749 peeker.getDone(future2), 1750 peeker.getDone(future3), 1751 peeker.getDone(future4)); 1752 } 1753 1754 @Override 1755 public String toString() { 1756 return function.toString(); 1757 } 1758 }, 1759 executor); 1760 } 1761 1762 /** 1763 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1764 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1765 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1766 * captured by the returned {@link ClosingFuture}). 1767 * 1768 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1769 * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step. 1770 * 1771 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1772 * 1773 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1774 * ExecutionException} will be extracted and used as the failure of the derived step. 1775 * 1776 * <p>If the function throws any other exception, it will be used as the failure of the derived 1777 * step. 1778 * 1779 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1780 * the closeable objects in that {@code ClosingFuture} will be closed. 1781 * 1782 * <p>Usage guidelines for this method: 1783 * 1784 * <ul> 1785 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1786 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1787 * Executor)} instead, with a function that returns the next value directly. 1788 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1789 * closer.eventuallyClose()} for every closeable object this step creates in order to 1790 * capture it for later closing. 1791 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1792 * ClosingFuture} call {@link #from(ListenableFuture)}. 1793 * </ul> 1794 * 1795 * <p>The same warnings about doing heavyweight operations within {@link 1796 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1797 */ 1798 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1799 final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) { 1800 return callAsync( 1801 new AsyncCombiningCallable<U>() { 1802 @Override 1803 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 1804 return function.apply( 1805 closer, 1806 peeker.getDone(future1), 1807 peeker.getDone(future2), 1808 peeker.getDone(future3), 1809 peeker.getDone(future4)); 1810 } 1811 1812 @Override 1813 public String toString() { 1814 return function.toString(); 1815 } 1816 }, 1817 executor); 1818 } 1819 } 1820 1821 /** 1822 * A generic {@link Combiner} that lets you use a lambda or method reference to combine five 1823 * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1824 * ClosingFuture, ClosingFuture)} to start this combination. 1825 * 1826 * @param <V1> the type returned by the first future 1827 * @param <V2> the type returned by the second future 1828 * @param <V3> the type returned by the third future 1829 * @param <V4> the type returned by the fourth future 1830 * @param <V5> the type returned by the fifth future 1831 */ 1832 public static final class Combiner5< 1833 V1 extends @Nullable Object, 1834 V2 extends @Nullable Object, 1835 V3 extends @Nullable Object, 1836 V4 extends @Nullable Object, 1837 V5 extends @Nullable Object> 1838 extends Combiner { 1839 /** 1840 * A function that returns a value when applied to the values of the five futures passed to 1841 * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture, 1842 * ClosingFuture)}. 1843 * 1844 * @param <V1> the type returned by the first future 1845 * @param <V2> the type returned by the second future 1846 * @param <V3> the type returned by the third future 1847 * @param <V4> the type returned by the fourth future 1848 * @param <V5> the type returned by the fifth future 1849 * @param <U> the type returned by the function 1850 */ 1851 @FunctionalInterface 1852 public interface ClosingFunction5< 1853 V1 extends @Nullable Object, 1854 V2 extends @Nullable Object, 1855 V3 extends @Nullable Object, 1856 V4 extends @Nullable Object, 1857 V5 extends @Nullable Object, 1858 U extends @Nullable Object> { 1859 /** 1860 * Applies this function to five inputs, or throws an exception if unable to do so. 1861 * 1862 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1863 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1864 * is done (but not before this method completes), even if this method throws or the pipeline 1865 * is cancelled. 1866 */ 1867 U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5) 1868 throws Exception; 1869 } 1870 1871 /** 1872 * A function that returns a {@link ClosingFuture} when applied to the values of the five 1873 * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, 1874 * ClosingFuture, ClosingFuture)}. 1875 * 1876 * @param <V1> the type returned by the first future 1877 * @param <V2> the type returned by the second future 1878 * @param <V3> the type returned by the third future 1879 * @param <V4> the type returned by the fourth future 1880 * @param <V5> the type returned by the fifth future 1881 * @param <U> the type returned by the function 1882 */ 1883 @FunctionalInterface 1884 public interface AsyncClosingFunction5< 1885 V1 extends @Nullable Object, 1886 V2 extends @Nullable Object, 1887 V3 extends @Nullable Object, 1888 V4 extends @Nullable Object, 1889 V5 extends @Nullable Object, 1890 U extends @Nullable Object> { 1891 /** 1892 * Applies this function to five inputs, or throws an exception if unable to do so. 1893 * 1894 * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, 1895 * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline 1896 * is done (but not before this method completes), even if this method throws or the pipeline 1897 * is cancelled. 1898 */ 1899 ClosingFuture<U> apply( 1900 DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5) 1901 throws Exception; 1902 } 1903 1904 private final ClosingFuture<V1> future1; 1905 private final ClosingFuture<V2> future2; 1906 private final ClosingFuture<V3> future3; 1907 private final ClosingFuture<V4> future4; 1908 private final ClosingFuture<V5> future5; 1909 1910 private Combiner5( 1911 ClosingFuture<V1> future1, 1912 ClosingFuture<V2> future2, 1913 ClosingFuture<V3> future3, 1914 ClosingFuture<V4> future4, 1915 ClosingFuture<V5> future5) { 1916 super(true, ImmutableList.of(future1, future2, future3, future4, future5)); 1917 this.future1 = future1; 1918 this.future2 = future2; 1919 this.future3 = future3; 1920 this.future4 = future4; 1921 this.future5 = future5; 1922 } 1923 1924 /** 1925 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1926 * combining function to their values. The function can use a {@link DeferredCloser} to capture 1927 * objects to be closed when the pipeline is done. 1928 * 1929 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1930 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 1931 * returned step. 1932 * 1933 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1934 * 1935 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1936 * ExecutionException} will be extracted and used as the failure of the derived step. 1937 */ 1938 public <U extends @Nullable Object> ClosingFuture<U> call( 1939 final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 1940 return call( 1941 new CombiningCallable<U>() { 1942 @Override 1943 public U call(DeferredCloser closer, Peeker peeker) throws Exception { 1944 return function.apply( 1945 closer, 1946 peeker.getDone(future1), 1947 peeker.getDone(future2), 1948 peeker.getDone(future3), 1949 peeker.getDone(future4), 1950 peeker.getDone(future5)); 1951 } 1952 1953 @Override 1954 public String toString() { 1955 return function.toString(); 1956 } 1957 }, 1958 executor); 1959 } 1960 1961 /** 1962 * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a 1963 * {@code ClosingFuture}-returning function to their values. The function can use a {@link 1964 * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those 1965 * captured by the returned {@link ClosingFuture}). 1966 * 1967 * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture, 1968 * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the 1969 * returned step. 1970 * 1971 * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled. 1972 * 1973 * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code 1974 * ExecutionException} will be extracted and used as the failure of the derived step. 1975 * 1976 * <p>If the function throws any other exception, it will be used as the failure of the derived 1977 * step. 1978 * 1979 * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of 1980 * the closeable objects in that {@code ClosingFuture} will be closed. 1981 * 1982 * <p>Usage guidelines for this method: 1983 * 1984 * <ul> 1985 * <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a 1986 * {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable, 1987 * Executor)} instead, with a function that returns the next value directly. 1988 * <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) 1989 * closer.eventuallyClose()} for every closeable object this step creates in order to 1990 * capture it for later closing. 1991 * <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code 1992 * ClosingFuture} call {@link #from(ListenableFuture)}. 1993 * </ul> 1994 * 1995 * <p>The same warnings about doing heavyweight operations within {@link 1996 * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here. 1997 */ 1998 public <U extends @Nullable Object> ClosingFuture<U> callAsync( 1999 final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) { 2000 return callAsync( 2001 new AsyncCombiningCallable<U>() { 2002 @Override 2003 public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception { 2004 return function.apply( 2005 closer, 2006 peeker.getDone(future1), 2007 peeker.getDone(future2), 2008 peeker.getDone(future3), 2009 peeker.getDone(future4), 2010 peeker.getDone(future5)); 2011 } 2012 2013 @Override 2014 public String toString() { 2015 return function.toString(); 2016 } 2017 }, 2018 executor); 2019 } 2020 } 2021 2022 @Override 2023 public String toString() { 2024 // TODO(dpb): Better toString, in the style of Futures.transform etc. 2025 return toStringHelper(this).add("state", state.get()).addValue(future).toString(); 2026 } 2027 2028 @Override 2029 protected void finalize() { 2030 if (state.get().equals(OPEN)) { 2031 logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this); 2032 FluentFuture<V> unused = finishToFuture(); 2033 } 2034 } 2035 2036 private static void closeQuietly(final AutoCloseable closeable, Executor executor) { 2037 if (closeable == null) { 2038 return; 2039 } 2040 try { 2041 executor.execute( 2042 new Runnable() { 2043 @Override 2044 public void run() { 2045 try { 2046 closeable.close(); 2047 } catch (Exception e) { 2048 logger.log(WARNING, "thrown by close()", e); 2049 } 2050 } 2051 }); 2052 } catch (RejectedExecutionException e) { 2053 if (logger.isLoggable(WARNING)) { 2054 logger.log( 2055 WARNING, String.format("while submitting close to %s; will close inline", executor), e); 2056 } 2057 closeQuietly(closeable, directExecutor()); 2058 } 2059 } 2060 2061 private void checkAndUpdateState(State oldState, State newState) { 2062 checkState( 2063 compareAndUpdateState(oldState, newState), 2064 "Expected state to be %s, but it was %s", 2065 oldState, 2066 newState); 2067 } 2068 2069 private boolean compareAndUpdateState(State oldState, State newState) { 2070 return state.compareAndSet(oldState, newState); 2071 } 2072 2073 // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap? 2074 private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor> 2075 implements Closeable { 2076 private final DeferredCloser closer = new DeferredCloser(this); 2077 private volatile boolean closed; 2078 private volatile CountDownLatch whenClosed; 2079 2080 <V, U> ListenableFuture<U> applyClosingFunction( 2081 ClosingFunction<? super V, U> transformation, V input) throws Exception { 2082 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2083 CloseableList newCloseables = new CloseableList(); 2084 try { 2085 return immediateFuture(transformation.apply(newCloseables.closer, input)); 2086 } finally { 2087 add(newCloseables, directExecutor()); 2088 } 2089 } 2090 2091 <V, U> FluentFuture<U> applyAsyncClosingFunction( 2092 AsyncClosingFunction<V, U> transformation, V input) throws Exception { 2093 // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList. 2094 CloseableList newCloseables = new CloseableList(); 2095 try { 2096 ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input); 2097 closingFuture.becomeSubsumedInto(newCloseables); 2098 return closingFuture.future; 2099 } finally { 2100 add(newCloseables, directExecutor()); 2101 } 2102 } 2103 2104 @Override 2105 public void close() { 2106 if (closed) { 2107 return; 2108 } 2109 synchronized (this) { 2110 if (closed) { 2111 return; 2112 } 2113 closed = true; 2114 } 2115 for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) { 2116 closeQuietly(entry.getKey(), entry.getValue()); 2117 } 2118 clear(); 2119 if (whenClosed != null) { 2120 whenClosed.countDown(); 2121 } 2122 } 2123 2124 void add(@Nullable AutoCloseable closeable, Executor executor) { 2125 checkNotNull(executor); 2126 if (closeable == null) { 2127 return; 2128 } 2129 synchronized (this) { 2130 if (!closed) { 2131 put(closeable, executor); 2132 return; 2133 } 2134 } 2135 closeQuietly(closeable, executor); 2136 } 2137 2138 /** 2139 * Returns a latch that reaches zero when this objects' deferred closeables have been closed. 2140 */ 2141 CountDownLatch whenClosedCountDown() { 2142 if (closed) { 2143 return new CountDownLatch(0); 2144 } 2145 synchronized (this) { 2146 if (closed) { 2147 return new CountDownLatch(0); 2148 } 2149 checkState(whenClosed == null); 2150 return whenClosed = new CountDownLatch(1); 2151 } 2152 } 2153 } 2154 2155 /** 2156 * Returns an object that can be used to wait until this objects' deferred closeables have all had 2157 * {@link Runnable}s that close them submitted to each one's closing {@link Executor}. 2158 */ 2159 @VisibleForTesting 2160 CountDownLatch whenClosedCountDown() { 2161 return closeables.whenClosedCountDown(); 2162 } 2163 2164 /** The state of a {@link CloseableList}. */ 2165 enum State { 2166 /** The {@link CloseableList} has not been subsumed or closed. */ 2167 OPEN, 2168 2169 /** 2170 * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed 2171 * into any other. 2172 */ 2173 SUBSUMED, 2174 2175 /** 2176 * Some {@link ListenableFuture} has a callback attached that will close the {@link 2177 * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed. 2178 */ 2179 WILL_CLOSE, 2180 2181 /** 2182 * The callback that closes the {@link CloseableList} is running, but it has not completed. The 2183 * {@link CloseableList} may not be subsumed. 2184 */ 2185 CLOSING, 2186 2187 /** The {@link CloseableList} has been closed. It may not be further subsumed. */ 2188 CLOSED, 2189 2190 /** 2191 * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been 2192 * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called. 2193 */ 2194 WILL_CREATE_VALUE_AND_CLOSER, 2195 } 2196}