001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static java.util.logging.Level.FINER;
036import static java.util.logging.Level.SEVERE;
037import static java.util.logging.Level.WARNING;
038
039import com.google.common.annotations.Beta;
040import com.google.common.annotations.VisibleForTesting;
041import com.google.common.base.Function;
042import com.google.common.collect.FluentIterable;
043import com.google.common.collect.ImmutableList;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
046import com.google.common.util.concurrent.Futures.FutureCombiner;
047import com.google.errorprone.annotations.CanIgnoreReturnValue;
048import com.google.errorprone.annotations.DoNotMock;
049import com.google.j2objc.annotations.RetainedWith;
050import java.io.Closeable;
051import java.io.IOException;
052import java.util.IdentityHashMap;
053import java.util.Map;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CancellationException;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.ExecutionException;
058import java.util.concurrent.Executor;
059import java.util.concurrent.Future;
060import java.util.concurrent.RejectedExecutionException;
061import java.util.concurrent.atomic.AtomicReference;
062import java.util.logging.Logger;
063import org.checkerframework.checker.nullness.compatqual.NullableDecl;
064
065/**
066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
067 * complete, some objects captured during the computation are closed.
068 *
069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
071 * cancellation of the operation so far. The only way to extract the value or exception from a step
072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
073 * "value" of a successful step or the "result" (value or exception) of any step.
074 *
075 * <ol>
076 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
077 *       block or a {@link ListenableFuture}.
078 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
079 *       can be captured for later closing.
080 *   <li>There is one last step (the root of the tree), from which you can extract the final result
081 *       of the computation. After that result is available (or the computation fails), all objects
082 *       captured by any of the steps in the pipeline are closed.
083 * </ol>
084 *
085 * <h3>Starting a pipeline</h3>
086 *
087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
090 * #from(ListenableFuture)} instead.
091 *
092 * <h3>Derived steps</h3>
093 *
094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
095 * ways similar to {@link FluentFuture}s:
096 *
097 * <ul>
098 *   <li>by transforming the value from a successful input step,
099 *   <li>by catching the exception from a failed input step, or
100 *   <li>by combining the results of several input steps.
101 * </ul>
102 *
103 * Each derivation can capture the next value or any intermediate objects for later closing.
104 *
105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
106 * exception, or combine it with others, you cannot do anything else with it, including declare it
107 * to be the last step of the pipeline.
108 *
109 * <h4>Transforming</h4>
110 *
111 * To derive the next step by asynchronously applying a function to an input step's value, call
112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
113 * Executor)} on the input step.
114 *
115 * <h4>Catching</h4>
116 *
117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
119 *
120 * <h4>Combining</h4>
121 *
 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)}, {@link #whenAllSucceed(Iterable)}, or one of their overloads.
124 *
125 * <h3>Cancelling</h3>
126 *
127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
129 * successfully cancelled step will immediately start closing all objects captured for later closing
130 * by it and by its input steps.
131 *
132 * <h3>Ending a pipeline</h3>
133 *
134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
135 * close the captured objects automatically or manually.
136 *
137 * <h4>Automatically closing</h4>
138 *
139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
140 * calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured
141 * by all steps in the pipeline will be closed.
142 *
143 * <pre>{@code
144 * FluentFuture<UserName> userName =
145 *     ClosingFuture.submit(
146 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
147 *             executor)
148 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
149 *         .transform((closer, result) -> result.get("userName"), directExecutor())
 *         .catching(DBException.class, (closer, e) -> "no user", directExecutor())
151 *         .finishToFuture();
152 * }</pre>
153 *
154 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
155 * result cursor will both be closed, even if the operation is cancelled or fails.
156 *
157 * <h4>Manually closing</h4>
158 *
159 * If you want to close the captured objects manually, after you've used the final result, call
160 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
161 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
162 *
163 * <pre>{@code
164 *     ClosingFuture.submit(
165 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
166 *             executor)
167 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
168 *     .transform((closer, result) -> result.get("userName"), directExecutor())
 *     .catching(DBException.class, (closer, e) -> "no user", directExecutor())
170 *     .finishToValueAndCloser(
171 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
172 *
173 * // later
174 * try { // get() will throw if the operation failed or was cancelled.
175 *   UserName userName = userNameValueAndCloser.get();
176 *   // do something with userName
177 * } finally {
178 *   userNameValueAndCloser.closeAsync();
179 * }
180 * }</pre>
181 *
182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
183 * the query result cursor will both be closed, even if the operation is cancelled or fails.
184 *
185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
186 * automatic-closing approach described above is safer.
187 *
188 * @param <V> the type of the value of this step
189 * @since 30.0
190 */
191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
192@Beta // @Beta for one release.
193@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
194// TODO(dpb): GWT compatibility.
195public final class ClosingFuture<V> {
196
197  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
198
199  /**
200   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
201   * done.
202   */
203  public static final class DeferredCloser {
204    @RetainedWith private final CloseableList list;
205
206    DeferredCloser(CloseableList list) {
207      this.list = list;
208    }
209
210    /**
211     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
212     *
213     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
214     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
215     * Closeable}. (For more about the flavors, see <a
216     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
217     * build</a>.)
218     *
219     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
220     * building for Android): Ensure that any object you pass implements the interface not just in
221     * your current SDK version but also at the oldest version you support. For example, <a
222     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
223     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
224     * {@code Closeable} with a method reference like {@code cursor::close}.
225     *
226     * <p>Note that this method is still binary-compatible between flavors because the erasure of
227     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
228     *
229     * @param closeable the object to be closed (see notes above)
230     * @param closingExecutor the object will be closed on this executor
231     * @return the first argument
232     */
233    @CanIgnoreReturnValue
234    @NullableDecl
235    // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
236    public <C extends Object & Closeable> C eventuallyClose(
237        @NullableDecl C closeable, Executor closingExecutor) {
238      checkNotNull(closingExecutor);
239      if (closeable != null) {
240        list.add(closeable, closingExecutor);
241      }
242      return closeable;
243    }
244  }
245
246  /**
247   * An operation that computes a result.
248   *
249   * @param <V> the type of the result
250   */
251  public interface ClosingCallable<V extends Object> {
252    /**
253     * Computes a result, or throws an exception if unable to do so.
254     *
255     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
256     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
257     * not before this method completes), even if this method throws or the pipeline is cancelled.
258     */
259    @NullableDecl
260    V call(DeferredCloser closer) throws Exception;
261  }
262
263  /**
264   * A function from an input to a result.
265   *
266   * @param <T> the type of the input to the function
267   * @param <U> the type of the result of the function
268   */
269  public interface ClosingFunction<T extends Object, U extends Object> {
270
271    /**
272     * Applies this function to an input, or throws an exception if unable to do so.
273     *
274     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
275     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
276     * not before this method completes), even if this method throws or the pipeline is cancelled.
277     */
278    @NullableDecl
279    U apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
280  }
281
282  /**
283   * A function from an input to a {@link ClosingFuture} of a result.
284   *
285   * @param <T> the type of the input to the function
286   * @param <U> the type of the result of the function
287   */
288  public interface AsyncClosingFunction<T extends Object, U extends Object> {
289    /**
290     * Applies this function to an input, or throws an exception if unable to do so.
291     *
292     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
293     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
294     * not before this method completes), even if this method throws or the pipeline is cancelled.
295     */
296    ClosingFuture<U> apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
297  }
298
299  /**
300   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
301   * allows the user to close all the closeable objects that were captured during it for later
302   * closing.
303   *
304   * <p>The asynchronous operation will have completed before this object is created.
305   *
306   * @param <V> the type of the value of a successful operation
307   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
308   */
309  public static final class ValueAndCloser<V> {
310
311    private final ClosingFuture<? extends V> closingFuture;
312
313    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
314      this.closingFuture = checkNotNull(closingFuture);
315    }
316
317    /**
318     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
319     * {@link Future#get()} would.
320     *
321     * <p>Because the asynchronous operation has already completed, this method is synchronous and
322     * returns immediately.
323     *
324     * @throws CancellationException if the computation was cancelled
325     * @throws ExecutionException if the computation threw an exception
326     */
327    @NullableDecl
328    public V get() throws ExecutionException {
329      return getDone(closingFuture.future);
330    }
331
332    /**
333     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
334     * operation on the {@link Executor}s specified by calls to {@link
335     * DeferredCloser#eventuallyClose(Closeable, Executor)}.
336     *
337     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
338     * closed synchronously.
339     *
340     * <p>Idempotent: objects will be closed at most once.
341     */
342    public void closeAsync() {
343      closingFuture.close();
344    }
345  }
346
347  /**
348   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
349   * ClosingFuture} pipeline.
350   *
351   * @param <V> the type of the final value of a successful pipeline
352   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
353   */
354  public interface ValueAndCloserConsumer<V> {
355
356    /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */
357    void accept(ValueAndCloser<V> valueAndCloser);
358  }
359
360  /**
361   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
362   *
363   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
364   *     execution
365   */
366  public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) {
367    return new ClosingFuture<>(callable, executor);
368  }
369
370  // TODO(dpb, cpovirk): Do we need submitAsync?
371
372  /**
373   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
374   *
375   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
376   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
377   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
378   */
379  public static <V> ClosingFuture<V> from(ListenableFuture<V> future) {
380    return new ClosingFuture<V>(future);
381  }
382
383  /**
384   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
385   *
386   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when
387   * the pipeline is done, even if the pipeline is canceled or fails.
388   *
389   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
390   * value in order to close it.
391   *
392   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
393   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable,
394   *     Executor)}.
395   * @param closingExecutor the future's result will be closed on this executor
396   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
397   *     underlying value may never be closed if the {@link Future} is canceled after its operation
398   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
399   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
400   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
401   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
402   *     ClosingFuture#from}.
403   */
404  @Deprecated
405  // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
406  public static <C extends Object & Closeable> ClosingFuture<C> eventuallyClosing(
407      ListenableFuture<C> future, final Executor closingExecutor) {
408    checkNotNull(closingExecutor);
409    final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
410    Futures.addCallback(
411        future,
412        new FutureCallback<Closeable>() {
413          @Override
414          public void onSuccess(@NullableDecl Closeable result) {
415            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
416          }
417
418          @Override
419          public void onFailure(Throwable t) {}
420        },
421        directExecutor());
422    return closingFuture;
423  }
424
425  /**
426   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
427   *
428   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
429   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
430   */
431  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
432    return new Combiner(false, futures);
433  }
434
435  /**
436   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
437   *
438   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
439   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
440   */
441  public static Combiner whenAllComplete(
442      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
443    return whenAllComplete(asList(future1, moreFutures));
444  }
445
446  /**
447   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
448   * all succeed. If any fail, the resulting pipeline will fail.
449   *
450   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
451   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
452   */
453  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
454    return new Combiner(true, futures);
455  }
456
457  /**
458   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
459   * they all succeed. If any fail, the resulting pipeline will fail.
460   *
461   * <p>Calling this method allows you to use lambdas or method references typed with the types of
462   * the input {@link ClosingFuture}s.
463   *
464   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
465   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
466   */
467  public static <V1, V2> Combiner2<V1, V2> whenAllSucceed(
468      ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
469    return new Combiner2<>(future1, future2);
470  }
471
472  /**
473   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
474   * they all succeed. If any fail, the resulting pipeline will fail.
475   *
476   * <p>Calling this method allows you to use lambdas or method references typed with the types of
477   * the input {@link ClosingFuture}s.
478   *
479   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
480   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
481   */
482  public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed(
483      ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
484    return new Combiner3<>(future1, future2, future3);
485  }
486
487  /**
488   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
489   * they all succeed. If any fail, the resulting pipeline will fail.
490   *
491   * <p>Calling this method allows you to use lambdas or method references typed with the types of
492   * the input {@link ClosingFuture}s.
493   *
494   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
495   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
496   */
497  public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed(
498      ClosingFuture<V1> future1,
499      ClosingFuture<V2> future2,
500      ClosingFuture<V3> future3,
501      ClosingFuture<V4> future4) {
502    return new Combiner4<>(future1, future2, future3, future4);
503  }
504
505  /**
506   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
507   * they all succeed. If any fail, the resulting pipeline will fail.
508   *
509   * <p>Calling this method allows you to use lambdas or method references typed with the types of
510   * the input {@link ClosingFuture}s.
511   *
512   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
513   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
514   */
515  public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
516      ClosingFuture<V1> future1,
517      ClosingFuture<V2> future2,
518      ClosingFuture<V3> future3,
519      ClosingFuture<V4> future4,
520      ClosingFuture<V5> future5) {
521    return new Combiner5<>(future1, future2, future3, future4, future5);
522  }
523
524  /**
525   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
526   * all succeed. If any fail, the resulting pipeline will fail.
527   *
528   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
529   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
530   */
531  public static Combiner whenAllSucceed(
532      ClosingFuture<?> future1,
533      ClosingFuture<?> future2,
534      ClosingFuture<?> future3,
535      ClosingFuture<?> future4,
536      ClosingFuture<?> future5,
537      ClosingFuture<?> future6,
538      ClosingFuture<?>... moreFutures) {
539    return whenAllSucceed(
540        FluentIterable.of(future1, future2, future3, future4, future5, future6)
541            .append(moreFutures));
542  }
543
544  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
545  private final CloseableList closeables = new CloseableList();
546  private final FluentFuture<V> future;
547
/** Creates a step whose result is supplied by {@code future}. */
private ClosingFuture(ListenableFuture<V> future) {
  FluentFuture<V> fluent = FluentFuture.from(future);
  this.future = fluent;
}
551
/**
 * Creates a step whose result is computed by running {@code callable} on {@code executor}. The
 * callable is handed this step's closer.
 */
private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
  checkNotNull(callable);
  Callable<V> callWithCloser =
      new Callable<V>() {
        @Override
        public V call() throws Exception {
          return callable.call(closeables.closer);
        }

        @Override
        public String toString() {
          // Describe the wrapper in terms of the callable it runs.
          return callable.toString();
        }
      };
  TrustedListenableFutureTask<V> task = TrustedListenableFutureTask.create(callWithCloser);
  // The task is handed to the executor before it is stored as this step's future.
  executor.execute(task);
  this.future = task;
}
570
571  /**
572   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
573   * future returns {@code null} if the step is successful or throws the same exception that would
574   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
575   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
576   *
577   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
578   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
579   * a derivation method <i>on the same instance</i>. This is important because calling {@code
580   * statusFuture} alone does not provide a way to close the pipeline.
581   */
582  public ListenableFuture<?> statusFuture() {
583    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
584  }
585
586  /**
587   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
588   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
589   * when the pipeline is done.
590   *
591   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
592   * ClosingFuture} will be equivalent to this one.
593   *
594   * <p>If the function throws an exception, that exception is used as the result of the derived
595   * {@code ClosingFuture}.
596   *
597   * <p>Example usage:
598   *
599   * <pre>{@code
600   * ClosingFuture<List<Row>> rowsFuture =
601   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
602   * }</pre>
603   *
604   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
605   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
606   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
607   *
608   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
609   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
610   * this {@code ClosingFuture}.
611   *
612   * @param function transforms the value of this step to the value of the derived step
613   * @param executor executor to run the function in
614   * @return the derived step
615   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
616   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
617   *     finished}
618   */
619  public <U> ClosingFuture<U> transform(
620      final ClosingFunction<? super V, U> function, Executor executor) {
621    checkNotNull(function);
622    AsyncFunction<V, U> applyFunction =
623        new AsyncFunction<V, U>() {
624          @Override
625          public ListenableFuture<U> apply(V input) throws Exception {
626            return closeables.applyClosingFunction(function, input);
627          }
628
629          @Override
630          public String toString() {
631            return function.toString();
632          }
633        };
634    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
635    return derive(future.transformAsync(applyFunction, executor));
636  }
637
638  /**
639   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
640   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
641   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
642   * captured by the returned {@link ClosingFuture}).
643   *
644   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
645   * returned by the function.
646   *
647   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
648   * ClosingFuture} will be equivalent to this one.
649   *
650   * <p>If the function throws an exception, that exception is used as the result of the derived
651   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
652   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
653   * closed.
654   *
655   * <p>Usage guidelines for this method:
656   *
657   * <ul>
658   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
659   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
660   *       Executor)} instead, with a function that returns the next value directly.
661   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
662   *       for every closeable object this step creates in order to capture it for later closing.
663   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
664   *       ClosingFuture} call {@link #from(ListenableFuture)}.
665   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
666   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
667   *       {@link #withoutCloser(AsyncFunction)}
668   * </ul>
669   *
670   * <p>Example usage:
671   *
672   * <pre>{@code
673   * // Result.getRowsClosingFuture() returns a ClosingFuture.
674   * ClosingFuture<List<Row>> rowsFuture =
675   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
676   *
677   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
678   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
679   * // Closeable).
680   * ClosingFuture<Integer> rowsFuture2 =
681   *     queryFuture.transformAsync(
682   *         (closer, result) -> {
683   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
684   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
685   *      },
686   *      executor);
687   *
688   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
689   * ClosingFuture<List<Row>> rowsFuture3 =
690   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
691   *
692   * }</pre>
693   *
694   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
695   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
696   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
697   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
698   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
699   * responsible for completing the returned {@code ClosingFuture}.)
700   *
701   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
702   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
703   * this {@code ClosingFuture}.
704   *
705   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
706   *     the derived step
707   * @param executor executor to run the function in
708   * @return the derived step
709   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
710   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
711   *     finished}
712   */
713  public <U> ClosingFuture<U> transformAsync(
714      final AsyncClosingFunction<? super V, U> function, Executor executor) {
715    checkNotNull(function);
716    AsyncFunction<V, U> applyFunction =
717        new AsyncFunction<V, U>() {
718          @Override
719          public ListenableFuture<U> apply(V input) throws Exception {
720            return closeables.applyAsyncClosingFunction(function, input);
721          }
722
723          @Override
724          public String toString() {
725            return function.toString();
726          }
727        };
728    return derive(future.transformAsync(applyFunction, executor));
729  }
730
731  /**
732   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
733   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
734   * {@link ListenableFuture}.
735   *
736   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
737   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
738   * meets these conditions:
739   *
740   * <ul>
741   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
742   *       DeferredCloser#eventuallyClose(Closeable, Executor)}.
743   *   <li>It returns a {@link ListenableFuture}.
744   * </ul>
745   *
746   * <p>Example usage:
747   *
748   * <pre>{@code
749   * // Result.getRowsFuture() returns a ListenableFuture.
750   * ClosingFuture<List<Row>> rowsFuture =
751   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
752   * }</pre>
753   *
754   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
755   *     ListenableFuture} with the value of a derived step
756   */
757  public static <V, U> AsyncClosingFunction<V, U> withoutCloser(
758      final AsyncFunction<V, U> function) {
759    checkNotNull(function);
760    return new AsyncClosingFunction<V, U>() {
761      @Override
762      public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
763        return ClosingFuture.from(function.apply(input));
764      }
765    };
766  }
767
768  /**
769   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
770   * to its exception if it is an instance of a given exception type. The function can use a {@link
771   * DeferredCloser} to capture objects to be closed when the pipeline is done.
772   *
773   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
774   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
775   * one.
776   *
777   * <p>If the function throws an exception, that exception is used as the result of the derived
778   * {@code ClosingFuture}.
779   *
780   * <p>Example usage:
781   *
782   * <pre>{@code
783   * ClosingFuture<QueryResult> queryFuture =
784   *     queryFuture.catching(
785   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
786   * }</pre>
787   *
788   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
789   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
790   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
791   *
792   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
793   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
794   * this {@code ClosingFuture}.
795   *
796   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
797   *     type is matched against this step's exception. "This step's exception" means the cause of
798   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
799   *     underlying this step or, if {@code get()} throws a different kind of exception, that
800   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
801   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
802   * @param fallback the function to be called if this step fails with the expected exception type.
803   *     The function's argument is this step's exception. "This step's exception" means the cause
804   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
805   *     underlying this step or, if {@code get()} throws a different kind of exception, that
806   *     exception itself.
807   * @param executor the executor that runs {@code fallback} if the input fails
808   */
809  public <X extends Throwable> ClosingFuture<V> catching(
810      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
811    return catchingMoreGeneric(exceptionType, fallback, executor);
812  }
813
814  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
815  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
816      Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
817    checkNotNull(fallback);
818    AsyncFunction<X, W> applyFallback =
819        new AsyncFunction<X, W>() {
820          @Override
821          public ListenableFuture<W> apply(X exception) throws Exception {
822            return closeables.applyClosingFunction(fallback, exception);
823          }
824
825          @Override
826          public String toString() {
827            return fallback.toString();
828          }
829        };
830    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
831    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
832  }
833
834  /**
835   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
836   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
837   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
838   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
839   *
840   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
841   * ClosingFuture} will be equivalent to the one returned by the function.
842   *
843   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
844   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
845   * one.
846   *
847   * <p>If the function throws an exception, that exception is used as the result of the derived
848   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
849   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
850   * closed.
851   *
852   * <p>Usage guidelines for this method:
853   *
854   * <ul>
855   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
856   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
857   *       ClosingFunction, Executor)} instead, with a function that returns the next value
858   *       directly.
859   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
860   *       for every closeable object this step creates in order to capture it for later closing.
861   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
862   *       ClosingFuture} call {@link #from(ListenableFuture)}.
863   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
864   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
865   *       {@link #withoutCloser(AsyncFunction)}
866   * </ul>
867   *
868   * <p>Example usage:
869   *
870   * <pre>{@code
871   * // Fall back to a secondary input stream in case of IOException.
872   * ClosingFuture<InputStream> inputFuture =
873   *     firstInputFuture.catchingAsync(
874   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
875   * }
876   * }</pre>
877   *
878   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
879   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
880   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
881   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
882   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
883   * responsible for completing the returned {@code ClosingFuture}.)
884   *
885   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
886   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
887   * this {@code ClosingFuture}.
888   *
889   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
890   *     type is matched against this step's exception. "This step's exception" means the cause of
891   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
892   *     underlying this step or, if {@code get()} throws a different kind of exception, that
893   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
894   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
895   * @param fallback the function to be called if this step fails with the expected exception type.
896   *     The function's argument is this step's exception. "This step's exception" means the cause
897   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
898   *     underlying this step or, if {@code get()} throws a different kind of exception, that
899   *     exception itself.
900   * @param executor the executor that runs {@code fallback} if the input fails
901   */
902  // TODO(dpb): Should this do something special if the function throws CancellationException or
903  // ExecutionException?
904  public <X extends Throwable> ClosingFuture<V> catchingAsync(
905      Class<X> exceptionType,
906      AsyncClosingFunction<? super X, ? extends V> fallback,
907      Executor executor) {
908    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
909  }
910
911  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
912  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
913      Class<X> exceptionType,
914      final AsyncClosingFunction<? super X, W> fallback,
915      Executor executor) {
916    checkNotNull(fallback);
917    AsyncFunction<X, W> asyncFunction =
918        new AsyncFunction<X, W>() {
919          @Override
920          public ListenableFuture<W> apply(X exception) throws Exception {
921            return closeables.applyAsyncClosingFunction(fallback, exception);
922          }
923
924          @Override
925          public String toString() {
926            return fallback.toString();
927          }
928        };
929    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
930  }
931
932  /**
933   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned
934   * {@link Future} is done, all objects captured for closing during the pipeline's computation will
935   * be closed.
936   *
937   * <p>After calling this method, you may not call {@link
938   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
939   * derivation method on this {@code ClosingFuture}.
940   *
941   * @return a {@link Future} that represents the final value or exception of the pipeline
942   */
943  public FluentFuture<V> finishToFuture() {
944    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
945      logger.log(FINER, "will close {0}", this);
946      future.addListener(
947          new Runnable() {
948            @Override
949            public void run() {
950              checkAndUpdateState(WILL_CLOSE, CLOSING);
951              close();
952              checkAndUpdateState(CLOSING, CLOSED);
953            }
954          },
955          directExecutor());
956    } else {
957      switch (state.get()) {
958        case SUBSUMED:
959          throw new IllegalStateException(
960              "Cannot call finishToFuture() after deriving another step");
961
962        case WILL_CREATE_VALUE_AND_CLOSER:
963          throw new IllegalStateException(
964              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
965
966        case WILL_CLOSE:
967        case CLOSING:
968        case CLOSED:
969          throw new IllegalStateException("Cannot call finishToFuture() twice");
970
971        case OPEN:
972          throw new AssertionError();
973      }
974    }
975    return future;
976  }
977
978  /**
   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
   * {@code consumer} will be called with an object that contains the result of the operation. The
   * consumer can store the {@link ValueAndCloser} outside the callback for later synchronous use.
982   *
983   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
984   * any other derivation method on this {@code ClosingFuture}.
985   *
986   * @param consumer a callback whose method will be called (using {@code executor}) when this
987   *     operation is done
988   */
989  public void finishToValueAndCloser(
990      final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
991    checkNotNull(consumer);
992    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
993      switch (state.get()) {
994        case SUBSUMED:
995          throw new IllegalStateException(
996              "Cannot call finishToValueAndCloser() after deriving another step");
997
998        case WILL_CLOSE:
999        case CLOSING:
1000        case CLOSED:
1001          throw new IllegalStateException(
1002              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1003
1004        case WILL_CREATE_VALUE_AND_CLOSER:
1005          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1006
1007        case OPEN:
1008          break;
1009      }
1010      throw new AssertionError(state);
1011    }
1012    future.addListener(
1013        new Runnable() {
1014          @Override
1015          public void run() {
1016            provideValueAndCloser(consumer, ClosingFuture.this);
1017          }
1018        },
1019        executor);
1020  }
1021
1022  private static <C, V extends C> void provideValueAndCloser(
1023      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1024    consumer.accept(new ValueAndCloser<C>(closingFuture));
1025  }
1026
1027  /**
1028   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1029   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1030   * successful, and this step has not started when {@code cancel} is called, this step should never
1031   * run.
1032   *
1033   * <p>If successful, causes the objects captured by this step (if already started) and its input
1034   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1035   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1036   *
1037   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1038   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1039   *     cancelled regardless
1040   * @return {@code false} if the step could not be cancelled, typically because it has already
1041   *     completed normally; {@code true} otherwise
1042   */
1043  @CanIgnoreReturnValue
1044  public boolean cancel(boolean mayInterruptIfRunning) {
1045    logger.log(FINER, "cancelling {0}", this);
1046    boolean cancelled = future.cancel(mayInterruptIfRunning);
1047    if (cancelled) {
1048      close();
1049    }
1050    return cancelled;
1051  }
1052
1053  private void close() {
1054    logger.log(FINER, "closing {0}", this);
1055    closeables.close();
1056  }
1057
1058  private <U> ClosingFuture<U> derive(FluentFuture<U> future) {
1059    ClosingFuture<U> derived = new ClosingFuture<>(future);
1060    becomeSubsumedInto(derived.closeables);
1061    return derived;
1062  }
1063
1064  private void becomeSubsumedInto(CloseableList otherCloseables) {
1065    checkAndUpdateState(OPEN, SUBSUMED);
1066    otherCloseables.add(closeables, directExecutor());
1067  }
1068
1069  /**
1070   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1071   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1072   *
1073   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1074   */
  public static final class Peeker {
    // The input steps whose values may be read through getDone().
    private final ImmutableList<ClosingFuture<?>> futures;
    // True only while call()/callAsync() is running the combining function; getDone() checks it.
    private volatile boolean beingCalled;

    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
      this.futures = checkNotNull(futures);
    }

    /**
     * Returns the value of {@code closingFuture}.
     *
     * @throws ExecutionException if {@code closingFuture} is a failed step
     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}
     * @throws IllegalStateException if called outside of a call to {@link
     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
     */
    @NullableDecl
    public final <D extends Object> D getDone(ClosingFuture<D> closingFuture)
        throws ExecutionException {
      checkState(beingCalled);
      checkArgument(futures.contains(closingFuture));
      return Futures.getDone(closingFuture.future);
    }

    /**
     * Runs {@code combiner} with this peeker and a fresh {@link CloseableList}'s closer. Whether
     * the combiner returns or throws, the fresh list is then added to {@code closeables} and
     * {@link #getDone} is disabled again.
     */
    @NullableDecl
    private <V extends Object> V call(CombiningCallable<V> combiner, CloseableList closeables)
        throws Exception {
      beingCalled = true;
      CloseableList newCloseables = new CloseableList();
      try {
        return combiner.call(newCloseables.closer, this);
      } finally {
        // Runs on both normal and exceptional exit, so anything captured is always handed over.
        closeables.add(newCloseables, directExecutor());
        beingCalled = false;
      }
    }

    /**
     * Like {@link #call}, but for a combiner that returns a {@link ClosingFuture}: the returned
     * step is additionally subsumed into {@code closeables}, and its underlying future is
     * returned.
     */
    private <V extends Object> FluentFuture<V> callAsync(
        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
      beingCalled = true;
      CloseableList newCloseables = new CloseableList();
      try {
        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
        closingFuture.becomeSubsumedInto(closeables);
        return closingFuture.future;
      } finally {
        // Runs on both normal and exceptional exit, so anything captured is always handed over.
        closeables.add(newCloseables, directExecutor());
        beingCalled = false;
      }
    }
  }
1129
1130  /**
1131   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1132   *
1133   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1134   * instantiate this class.
1135   *
1136   * <p>Example:
1137   *
1138   * <pre>{@code
1139   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1140   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1141   * ListenableFuture<Integer> numberOfDifferentLines =
1142   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1143   *           .call(
1144   *               (closer, peeker) -> {
1145   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1146   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1147   *                 return countDifferentLines(file1Reader, file2Reader);
1148   *               },
1149   *               executor)
   *           .finishToFuture();
1151   * }</pre>
1152   */
1153  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
1154  @com.google.errorprone.annotations.DoNotMock(
1155      "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1156  public static class Combiner {
1157
1158    private final CloseableList closeables = new CloseableList();
1159
1160    /**
1161     * An operation that returns a result and may throw an exception.
1162     *
1163     * @param <V> the type of the result
1164     */
1165    public interface CombiningCallable<V extends Object> {
1166      /**
1167       * Computes a result, or throws an exception if unable to do so.
1168       *
1169       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1170       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1171       * is done (but not before this method completes), even if this method throws or the pipeline
1172       * is cancelled.
1173       *
1174       * @param peeker used to get the value of any of the input futures
1175       */
1176      @NullableDecl
1177      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1178    }
1179
1180    /**
1181     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1182     *
1183     * @param <V> the type of the result
1184     */
1185    public interface AsyncCombiningCallable<V extends Object> {
1186      /**
1187       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1188       *
1189       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1190       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1191       * is done (but not before this method completes), even if this method throws or the pipeline
1192       * is cancelled.
1193       *
1194       * @param peeker used to get the value of any of the input futures
1195       */
1196      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1197    }
1198
1199    private final boolean allMustSucceed;
1200    protected final ImmutableList<ClosingFuture<?>> inputs;
1201
1202    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1203      this.allMustSucceed = allMustSucceed;
1204      this.inputs = ImmutableList.copyOf(inputs);
1205      for (ClosingFuture<?> input : inputs) {
1206        input.becomeSubsumedInto(closeables);
1207      }
1208    }
1209
1210    /**
1211     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1212     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1213     * objects to be closed when the pipeline is done.
1214     *
1215     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1216     * fail, so will the returned step.
1217     *
1218     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1219     * cancelled.
1220     *
1221     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1222     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1223     */
1224    public <V> ClosingFuture<V> call(
1225        final CombiningCallable<V> combiningCallable, Executor executor) {
1226      Callable<V> callable =
1227          new Callable<V>() {
1228            @Override
1229            public V call() throws Exception {
1230              return new Peeker(inputs).call(combiningCallable, closeables);
1231            }
1232
1233            @Override
1234            public String toString() {
1235              return combiningCallable.toString();
1236            }
1237          };
1238      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1239      derived.closeables.add(closeables, directExecutor());
1240      return derived;
1241    }
1242
1243    /**
1244     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1245     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1246     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1247     * captured by the returned {@link ClosingFuture}).
1248     *
1249     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1250     * fail, so will the returned step.
1251     *
1252     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1253     * cancelled.
1254     *
1255     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1256     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1257     *
1258     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1259     * derived step.
1260     *
1261     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1262     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1263     *
1264     * <p>Usage guidelines for this method:
1265     *
1266     * <ul>
1267     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1268     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1269     *       Executor)} instead, with a function that returns the next value directly.
1270     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1271     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1272     *       capture it for later closing.
1273     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1274     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1275     * </ul>
1276     *
1277     * <p>The same warnings about doing heavyweight operations within {@link
1278     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1279     */
1280    public <V> ClosingFuture<V> callAsync(
1281        final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1282      AsyncCallable<V> asyncCallable =
1283          new AsyncCallable<V>() {
1284            @Override
1285            public ListenableFuture<V> call() throws Exception {
1286              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1287            }
1288
1289            @Override
1290            public String toString() {
1291              return combiningCallable.toString();
1292            }
1293          };
1294      ClosingFuture<V> derived =
1295          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1296      derived.closeables.add(closeables, directExecutor());
1297      return derived;
1298    }
1299
1300    private FutureCombiner<Object> futureCombiner() {
1301      return allMustSucceed
1302          ? Futures.whenAllSucceed(inputFutures())
1303          : Futures.whenAllComplete(inputFutures());
1304    }
1305
1306    private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE =
1307        new Function<ClosingFuture<?>, FluentFuture<?>>() {
1308          @Override
1309          public FluentFuture<?> apply(ClosingFuture<?> future) {
1310            return future.future;
1311          }
1312        };
1313
1314    private ImmutableList<FluentFuture<?>> inputFutures() {
1315      return FluentIterable.from(inputs).transform(INNER_FUTURE).toList();
1316    }
1317  }
1318
1319  /**
1320   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1321   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1322   * combination.
1323   *
1324   * @param <V1> the type returned by the first future
1325   * @param <V2> the type returned by the second future
1326   */
1327  public static final class Combiner2<V1 extends Object, V2 extends Object> extends Combiner {
1328
1329    /**
1330     * A function that returns a value when applied to the values of the two futures passed to
1331     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1332     *
1333     * @param <V1> the type returned by the first future
1334     * @param <V2> the type returned by the second future
1335     * @param <U> the type returned by the function
1336     */
1337    public interface ClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
1338
1339      /**
1340       * Applies this function to two inputs, or throws an exception if unable to do so.
1341       *
1342       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1343       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1344       * is done (but not before this method completes), even if this method throws or the pipeline
1345       * is cancelled.
1346       */
1347      @NullableDecl
1348      U apply(DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2)
1349          throws Exception;
1350    }
1351
1352    /**
1353     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1354     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1355     *
1356     * @param <V1> the type returned by the first future
1357     * @param <V2> the type returned by the second future
1358     * @param <U> the type returned by the function
1359     */
1360    public interface AsyncClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
1361
1362      /**
1363       * Applies this function to two inputs, or throws an exception if unable to do so.
1364       *
1365       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1366       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1367       * is done (but not before this method completes), even if this method throws or the pipeline
1368       * is cancelled.
1369       */
1370      ClosingFuture<U> apply(
1371          DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) throws Exception;
1372    }
1373
1374    private final ClosingFuture<V1> future1;
1375    private final ClosingFuture<V2> future2;
1376
1377    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1378      super(true, ImmutableList.of(future1, future2));
1379      this.future1 = future1;
1380      this.future2 = future2;
1381    }
1382
1383    /**
1384     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1385     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1386     * objects to be closed when the pipeline is done.
1387     *
1388     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1389     * any of the inputs fail, so will the returned step.
1390     *
1391     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1392     *
1393     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1394     * ExecutionException} will be extracted and used as the failure of the derived step.
1395     */
1396    public <U extends Object> ClosingFuture<U> call(
1397        final ClosingFunction2<V1, V2, U> function, Executor executor) {
1398      return call(
1399          new CombiningCallable<U>() {
1400            @Override
1401            @NullableDecl
1402            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1403              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1404            }
1405
1406            @Override
1407            public String toString() {
1408              return function.toString();
1409            }
1410          },
1411          executor);
1412    }
1413
1414    /**
1415     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1416     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1417     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1418     * captured by the returned {@link ClosingFuture}).
1419     *
1420     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1421     * any of the inputs fail, so will the returned step.
1422     *
1423     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1424     *
1425     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1426     * ExecutionException} will be extracted and used as the failure of the derived step.
1427     *
1428     * <p>If the function throws any other exception, it will be used as the failure of the derived
1429     * step.
1430     *
1431     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1432     * the closeable objects in that {@code ClosingFuture} will be closed.
1433     *
1434     * <p>Usage guidelines for this method:
1435     *
1436     * <ul>
1437     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1438     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1439     *       Executor)} instead, with a function that returns the next value directly.
1440     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1441     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1442     *       capture it for later closing.
1443     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1444     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1445     * </ul>
1446     *
1447     * <p>The same warnings about doing heavyweight operations within {@link
1448     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1449     */
1450    public <U extends Object> ClosingFuture<U> callAsync(
1451        final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1452      return callAsync(
1453          new AsyncCombiningCallable<U>() {
1454            @Override
1455            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1456              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1457            }
1458
1459            @Override
1460            public String toString() {
1461              return function.toString();
1462            }
1463          },
1464          executor);
1465    }
1466  }
1467
1468  /**
1469   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1470   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1471   * ClosingFuture)} to start this combination.
1472   *
1473   * @param <V1> the type returned by the first future
1474   * @param <V2> the type returned by the second future
1475   * @param <V3> the type returned by the third future
1476   */
1477  public static final class Combiner3<V1 extends Object, V2 extends Object, V3 extends Object>
1478      extends Combiner {
1479    /**
1480     * A function that returns a value when applied to the values of the three futures passed to
1481     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1482     *
1483     * @param <V1> the type returned by the first future
1484     * @param <V2> the type returned by the second future
1485     * @param <V3> the type returned by the third future
1486     * @param <U> the type returned by the function
1487     */
1488    public interface ClosingFunction3<
1489        V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
1490      /**
1491       * Applies this function to three inputs, or throws an exception if unable to do so.
1492       *
1493       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1494       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1495       * is done (but not before this method completes), even if this method throws or the pipeline
1496       * is cancelled.
1497       */
1498      @NullableDecl
1499      U apply(
1500          DeferredCloser closer,
1501          @NullableDecl V1 value1,
1502          @NullableDecl V2 value2,
1503          @NullableDecl V3 v3)
1504          throws Exception;
1505    }
1506
1507    /**
1508     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1509     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1510     *
1511     * @param <V1> the type returned by the first future
1512     * @param <V2> the type returned by the second future
1513     * @param <V3> the type returned by the third future
1514     * @param <U> the type returned by the function
1515     */
1516    public interface AsyncClosingFunction3<
1517        V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
1518      /**
1519       * Applies this function to three inputs, or throws an exception if unable to do so.
1520       *
1521       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1522       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1523       * is done (but not before this method completes), even if this method throws or the pipeline
1524       * is cancelled.
1525       */
1526      ClosingFuture<U> apply(
1527          DeferredCloser closer,
1528          @NullableDecl V1 value1,
1529          @NullableDecl V2 value2,
1530          @NullableDecl V3 value3)
1531          throws Exception;
1532    }
1533
1534    private final ClosingFuture<V1> future1;
1535    private final ClosingFuture<V2> future2;
1536    private final ClosingFuture<V3> future3;
1537
1538    private Combiner3(
1539        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1540      super(true, ImmutableList.of(future1, future2, future3));
1541      this.future1 = future1;
1542      this.future2 = future2;
1543      this.future3 = future3;
1544    }
1545
1546    /**
1547     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1548     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1549     * objects to be closed when the pipeline is done.
1550     *
1551     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1552     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1553     *
1554     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1555     *
1556     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1557     * ExecutionException} will be extracted and used as the failure of the derived step.
1558     */
1559    public <U extends Object> ClosingFuture<U> call(
1560        final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1561      return call(
1562          new CombiningCallable<U>() {
1563            @Override
1564            @NullableDecl
1565            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1566              return function.apply(
1567                  closer,
1568                  peeker.getDone(future1),
1569                  peeker.getDone(future2),
1570                  peeker.getDone(future3));
1571            }
1572
1573            @Override
1574            public String toString() {
1575              return function.toString();
1576            }
1577          },
1578          executor);
1579    }
1580
1581    /**
1582     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1583     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1584     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1585     * captured by the returned {@link ClosingFuture}).
1586     *
1587     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1588     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1589     *
1590     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1591     *
1592     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1593     * ExecutionException} will be extracted and used as the failure of the derived step.
1594     *
1595     * <p>If the function throws any other exception, it will be used as the failure of the derived
1596     * step.
1597     *
1598     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1599     * the closeable objects in that {@code ClosingFuture} will be closed.
1600     *
1601     * <p>Usage guidelines for this method:
1602     *
1603     * <ul>
1604     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1605     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1606     *       Executor)} instead, with a function that returns the next value directly.
1607     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1608     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1609     *       capture it for later closing.
1610     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1611     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1612     * </ul>
1613     *
1614     * <p>The same warnings about doing heavyweight operations within {@link
1615     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1616     */
1617    public <U extends Object> ClosingFuture<U> callAsync(
1618        final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1619      return callAsync(
1620          new AsyncCombiningCallable<U>() {
1621            @Override
1622            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1623              return function.apply(
1624                  closer,
1625                  peeker.getDone(future1),
1626                  peeker.getDone(future2),
1627                  peeker.getDone(future3));
1628            }
1629
1630            @Override
1631            public String toString() {
1632              return function.toString();
1633            }
1634          },
1635          executor);
1636    }
1637  }
1638
1639  /**
1640   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1641   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1642   * ClosingFuture)} to start this combination.
1643   *
1644   * @param <V1> the type returned by the first future
1645   * @param <V2> the type returned by the second future
1646   * @param <V3> the type returned by the third future
1647   * @param <V4> the type returned by the fourth future
1648   */
1649  public static final class Combiner4<
1650          V1 extends Object, V2 extends Object, V3 extends Object, V4 extends Object>
1651      extends Combiner {
1652    /**
1653     * A function that returns a value when applied to the values of the four futures passed to
1654     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1655     *
1656     * @param <V1> the type returned by the first future
1657     * @param <V2> the type returned by the second future
1658     * @param <V3> the type returned by the third future
1659     * @param <V4> the type returned by the fourth future
1660     * @param <U> the type returned by the function
1661     */
1662    public interface ClosingFunction4<
1663        V1 extends Object,
1664        V2 extends Object,
1665        V3 extends Object,
1666        V4 extends Object,
1667        U extends Object> {
1668      /**
1669       * Applies this function to four inputs, or throws an exception if unable to do so.
1670       *
1671       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1672       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1673       * is done (but not before this method completes), even if this method throws or the pipeline
1674       * is cancelled.
1675       */
1676      @NullableDecl
1677      U apply(
1678          DeferredCloser closer,
1679          @NullableDecl V1 value1,
1680          @NullableDecl V2 value2,
1681          @NullableDecl V3 value3,
1682          @NullableDecl V4 value4)
1683          throws Exception;
1684    }
1685
1686    /**
1687     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1688     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1689     * ClosingFuture)}.
1690     *
1691     * @param <V1> the type returned by the first future
1692     * @param <V2> the type returned by the second future
1693     * @param <V3> the type returned by the third future
1694     * @param <V4> the type returned by the fourth future
1695     * @param <U> the type returned by the function
1696     */
1697    public interface AsyncClosingFunction4<
1698        V1 extends Object,
1699        V2 extends Object,
1700        V3 extends Object,
1701        V4 extends Object,
1702        U extends Object> {
1703      /**
1704       * Applies this function to four inputs, or throws an exception if unable to do so.
1705       *
1706       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1707       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1708       * is done (but not before this method completes), even if this method throws or the pipeline
1709       * is cancelled.
1710       */
1711      ClosingFuture<U> apply(
1712          DeferredCloser closer,
1713          @NullableDecl V1 value1,
1714          @NullableDecl V2 value2,
1715          @NullableDecl V3 value3,
1716          @NullableDecl V4 value4)
1717          throws Exception;
1718    }
1719
1720    private final ClosingFuture<V1> future1;
1721    private final ClosingFuture<V2> future2;
1722    private final ClosingFuture<V3> future3;
1723    private final ClosingFuture<V4> future4;
1724
1725    private Combiner4(
1726        ClosingFuture<V1> future1,
1727        ClosingFuture<V2> future2,
1728        ClosingFuture<V3> future3,
1729        ClosingFuture<V4> future4) {
1730      super(true, ImmutableList.of(future1, future2, future3, future4));
1731      this.future1 = future1;
1732      this.future2 = future2;
1733      this.future3 = future3;
1734      this.future4 = future4;
1735    }
1736
1737    /**
1738     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1739     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1740     * objects to be closed when the pipeline is done.
1741     *
1742     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1743     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1744     *
1745     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1746     *
1747     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1748     * ExecutionException} will be extracted and used as the failure of the derived step.
1749     */
1750    public <U extends Object> ClosingFuture<U> call(
1751        final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1752      return call(
1753          new CombiningCallable<U>() {
1754            @Override
1755            @NullableDecl
1756            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1757              return function.apply(
1758                  closer,
1759                  peeker.getDone(future1),
1760                  peeker.getDone(future2),
1761                  peeker.getDone(future3),
1762                  peeker.getDone(future4));
1763            }
1764
1765            @Override
1766            public String toString() {
1767              return function.toString();
1768            }
1769          },
1770          executor);
1771    }
1772
1773    /**
1774     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1775     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1776     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1777     * captured by the returned {@link ClosingFuture}).
1778     *
1779     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1780     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1781     *
1782     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1783     *
1784     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1785     * ExecutionException} will be extracted and used as the failure of the derived step.
1786     *
1787     * <p>If the function throws any other exception, it will be used as the failure of the derived
1788     * step.
1789     *
1790     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1791     * the closeable objects in that {@code ClosingFuture} will be closed.
1792     *
1793     * <p>Usage guidelines for this method:
1794     *
1795     * <ul>
1796     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1797     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1798     *       Executor)} instead, with a function that returns the next value directly.
1799     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1800     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1801     *       capture it for later closing.
1802     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1803     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1804     * </ul>
1805     *
1806     * <p>The same warnings about doing heavyweight operations within {@link
1807     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1808     */
1809    public <U extends Object> ClosingFuture<U> callAsync(
1810        final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1811      return callAsync(
1812          new AsyncCombiningCallable<U>() {
1813            @Override
1814            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1815              return function.apply(
1816                  closer,
1817                  peeker.getDone(future1),
1818                  peeker.getDone(future2),
1819                  peeker.getDone(future3),
1820                  peeker.getDone(future4));
1821            }
1822
1823            @Override
1824            public String toString() {
1825              return function.toString();
1826            }
1827          },
1828          executor);
1829    }
1830  }
1831
1832  /**
1833   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1834   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1835   * ClosingFuture, ClosingFuture)} to start this combination.
1836   *
1837   * @param <V1> the type returned by the first future
1838   * @param <V2> the type returned by the second future
1839   * @param <V3> the type returned by the third future
1840   * @param <V4> the type returned by the fourth future
1841   * @param <V5> the type returned by the fifth future
1842   */
1843  public static final class Combiner5<
1844          V1 extends Object,
1845          V2 extends Object,
1846          V3 extends Object,
1847          V4 extends Object,
1848          V5 extends Object>
1849      extends Combiner {
1850    /**
1851     * A function that returns a value when applied to the values of the five futures passed to
1852     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1853     * ClosingFuture)}.
1854     *
1855     * @param <V1> the type returned by the first future
1856     * @param <V2> the type returned by the second future
1857     * @param <V3> the type returned by the third future
1858     * @param <V4> the type returned by the fourth future
1859     * @param <V5> the type returned by the fifth future
1860     * @param <U> the type returned by the function
1861     */
1862    public interface ClosingFunction5<
1863        V1 extends Object,
1864        V2 extends Object,
1865        V3 extends Object,
1866        V4 extends Object,
1867        V5 extends Object,
1868        U extends Object> {
1869      /**
1870       * Applies this function to five inputs, or throws an exception if unable to do so.
1871       *
1872       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1873       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1874       * is done (but not before this method completes), even if this method throws or the pipeline
1875       * is cancelled.
1876       */
1877      @NullableDecl
1878      U apply(
1879          DeferredCloser closer,
1880          @NullableDecl V1 value1,
1881          @NullableDecl V2 value2,
1882          @NullableDecl V3 value3,
1883          @NullableDecl V4 value4,
1884          @NullableDecl V5 value5)
1885          throws Exception;
1886    }
1887
1888    /**
1889     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1890     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1891     * ClosingFuture, ClosingFuture)}.
1892     *
1893     * @param <V1> the type returned by the first future
1894     * @param <V2> the type returned by the second future
1895     * @param <V3> the type returned by the third future
1896     * @param <V4> the type returned by the fourth future
1897     * @param <V5> the type returned by the fifth future
1898     * @param <U> the type returned by the function
1899     */
1900    public interface AsyncClosingFunction5<
1901        V1 extends Object,
1902        V2 extends Object,
1903        V3 extends Object,
1904        V4 extends Object,
1905        V5 extends Object,
1906        U extends Object> {
1907      /**
1908       * Applies this function to five inputs, or throws an exception if unable to do so.
1909       *
1910       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1911       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1912       * is done (but not before this method completes), even if this method throws or the pipeline
1913       * is cancelled.
1914       */
1915      ClosingFuture<U> apply(
1916          DeferredCloser closer,
1917          @NullableDecl V1 value1,
1918          @NullableDecl V2 value2,
1919          @NullableDecl V3 value3,
1920          @NullableDecl V4 value4,
1921          @NullableDecl V5 value5)
1922          throws Exception;
1923    }
1924
1925    private final ClosingFuture<V1> future1;
1926    private final ClosingFuture<V2> future2;
1927    private final ClosingFuture<V3> future3;
1928    private final ClosingFuture<V4> future4;
1929    private final ClosingFuture<V5> future5;
1930
1931    private Combiner5(
1932        ClosingFuture<V1> future1,
1933        ClosingFuture<V2> future2,
1934        ClosingFuture<V3> future3,
1935        ClosingFuture<V4> future4,
1936        ClosingFuture<V5> future5) {
1937      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1938      this.future1 = future1;
1939      this.future2 = future2;
1940      this.future3 = future3;
1941      this.future4 = future4;
1942      this.future5 = future5;
1943    }
1944
1945    /**
1946     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1947     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1948     * objects to be closed when the pipeline is done.
1949     *
1950     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1951     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
1952     * returned step.
1953     *
1954     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1955     *
1956     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1957     * ExecutionException} will be extracted and used as the failure of the derived step.
1958     */
1959    public <U extends Object> ClosingFuture<U> call(
1960        final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
1961      return call(
1962          new CombiningCallable<U>() {
1963            @Override
1964            @NullableDecl
1965            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1966              return function.apply(
1967                  closer,
1968                  peeker.getDone(future1),
1969                  peeker.getDone(future2),
1970                  peeker.getDone(future3),
1971                  peeker.getDone(future4),
1972                  peeker.getDone(future5));
1973            }
1974
1975            @Override
1976            public String toString() {
1977              return function.toString();
1978            }
1979          },
1980          executor);
1981    }
1982
1983    /**
1984     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1985     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1986     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1987     * captured by the returned {@link ClosingFuture}).
1988     *
1989     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1990     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
1991     * returned step.
1992     *
1993     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1994     *
1995     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1996     * ExecutionException} will be extracted and used as the failure of the derived step.
1997     *
1998     * <p>If the function throws any other exception, it will be used as the failure of the derived
1999     * step.
2000     *
2001     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2002     * the closeable objects in that {@code ClosingFuture} will be closed.
2003     *
2004     * <p>Usage guidelines for this method:
2005     *
2006     * <ul>
2007     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2008     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2009     *       Executor)} instead, with a function that returns the next value directly.
2010     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
2011     *       closer.eventuallyClose()} for every closeable object this step creates in order to
2012     *       capture it for later closing.
2013     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2014     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2015     * </ul>
2016     *
2017     * <p>The same warnings about doing heavyweight operations within {@link
2018     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2019     */
2020    public <U extends Object> ClosingFuture<U> callAsync(
2021        final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2022      return callAsync(
2023          new AsyncCombiningCallable<U>() {
2024            @Override
2025            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2026              return function.apply(
2027                  closer,
2028                  peeker.getDone(future1),
2029                  peeker.getDone(future2),
2030                  peeker.getDone(future3),
2031                  peeker.getDone(future4),
2032                  peeker.getDone(future5));
2033            }
2034
2035            @Override
2036            public String toString() {
2037              return function.toString();
2038            }
2039          },
2040          executor);
2041    }
2042  }
2043
2044  @Override
2045  public String toString() {
2046    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2047    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2048  }
2049
2050  @Override
2051  protected void finalize() {
2052    if (state.get().equals(OPEN)) {
2053      logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2054      FluentFuture<V> unused = finishToFuture();
2055    }
2056  }
2057
2058  private static void closeQuietly(final Closeable closeable, Executor executor) {
2059    if (closeable == null) {
2060      return;
2061    }
2062    try {
2063      executor.execute(
2064          new Runnable() {
2065            @Override
2066            public void run() {
2067              try {
2068                closeable.close();
2069              } catch (IOException | RuntimeException e) {
2070                logger.log(WARNING, "thrown by close()", e);
2071              }
2072            }
2073          });
2074    } catch (RejectedExecutionException e) {
2075      if (logger.isLoggable(WARNING)) {
2076        logger.log(
2077            WARNING, String.format("while submitting close to %s; will close inline", executor), e);
2078      }
2079      closeQuietly(closeable, directExecutor());
2080    }
2081  }
2082
2083  private void checkAndUpdateState(State oldState, State newState) {
2084    checkState(
2085        compareAndUpdateState(oldState, newState),
2086        "Expected state to be %s, but it was %s",
2087        oldState,
2088        newState);
2089  }
2090
2091  private boolean compareAndUpdateState(State oldState, State newState) {
2092    return state.compareAndSet(oldState, newState);
2093  }
2094
2095  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2096  private static final class CloseableList extends IdentityHashMap<Closeable, Executor>
2097      implements Closeable {
2098    private final DeferredCloser closer = new DeferredCloser(this);
2099    private volatile boolean closed;
2100    private volatile CountDownLatch whenClosed;
2101
2102    <V, U> ListenableFuture<U> applyClosingFunction(
2103        ClosingFunction<? super V, U> transformation, V input) throws Exception {
2104      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2105      CloseableList newCloseables = new CloseableList();
2106      try {
2107        return immediateFuture(transformation.apply(newCloseables.closer, input));
2108      } finally {
2109        add(newCloseables, directExecutor());
2110      }
2111    }
2112
2113    <V, U> FluentFuture<U> applyAsyncClosingFunction(
2114        AsyncClosingFunction<V, U> transformation, V input) throws Exception {
2115      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2116      CloseableList newCloseables = new CloseableList();
2117      try {
2118        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2119        closingFuture.becomeSubsumedInto(newCloseables);
2120        return closingFuture.future;
2121      } finally {
2122        add(newCloseables, directExecutor());
2123      }
2124    }
2125
2126    @Override
2127    public void close() {
2128      if (closed) {
2129        return;
2130      }
2131      synchronized (this) {
2132        if (closed) {
2133          return;
2134        }
2135        closed = true;
2136      }
2137      for (Map.Entry<Closeable, Executor> entry : entrySet()) {
2138        closeQuietly(entry.getKey(), entry.getValue());
2139      }
2140      clear();
2141      if (whenClosed != null) {
2142        whenClosed.countDown();
2143      }
2144    }
2145
2146    void add(@NullableDecl Closeable closeable, Executor executor) {
2147      checkNotNull(executor);
2148      if (closeable == null) {
2149        return;
2150      }
2151      synchronized (this) {
2152        if (!closed) {
2153          put(closeable, executor);
2154          return;
2155        }
2156      }
2157      closeQuietly(closeable, executor);
2158    }
2159
2160    /**
2161     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2162     */
2163    CountDownLatch whenClosedCountDown() {
2164      if (closed) {
2165        return new CountDownLatch(0);
2166      }
2167      synchronized (this) {
2168        if (closed) {
2169          return new CountDownLatch(0);
2170        }
2171        checkState(whenClosed == null);
2172        return whenClosed = new CountDownLatch(1);
2173      }
2174    }
2175  }
2176
2177  /**
2178   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2179   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2180   */
2181  @VisibleForTesting
2182  CountDownLatch whenClosedCountDown() {
2183    return closeables.whenClosedCountDown();
2184  }
2185
  /**
   * The state of a {@link CloseableList}.
   *
   * <p>The enclosing class reads the current value with {@code state.get()} and changes it with
   * {@code state.compareAndSet(...)}.
   */
  enum State {
    /**
     * The {@link CloseableList} has not been subsumed or closed. A step still in this state when
     * it is finalized is logged as leaked and finished.
     */
    OPEN,

    /**
     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
     * into any other.
     */
    SUBSUMED,

    /**
     * Some {@link ListenableFuture} has a callback attached that will close the {@link
     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
     */
    WILL_CLOSE,

    /**
     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
     * {@link CloseableList} may not be subsumed.
     */
    CLOSING,

    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
    CLOSED,

    /**
     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
     */
    WILL_CREATE_VALUE_AND_CLOSER,
  }
2218}