001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static java.util.logging.Level.FINER;
036import static java.util.logging.Level.SEVERE;
037import static java.util.logging.Level.WARNING;
038
039import com.google.common.annotations.Beta;
040import com.google.common.annotations.VisibleForTesting;
041import com.google.common.base.Function;
042import com.google.common.collect.FluentIterable;
043import com.google.common.collect.ImmutableList;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
046import com.google.common.util.concurrent.Futures.FutureCombiner;
047import com.google.errorprone.annotations.CanIgnoreReturnValue;
048import com.google.errorprone.annotations.DoNotMock;
049import com.google.j2objc.annotations.RetainedWith;
050import java.io.Closeable;
051import java.io.IOException;
052import java.util.IdentityHashMap;
053import java.util.Map;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CancellationException;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.ExecutionException;
058import java.util.concurrent.Executor;
059import java.util.concurrent.Future;
060import java.util.concurrent.RejectedExecutionException;
061import java.util.concurrent.atomic.AtomicReference;
062import java.util.logging.Logger;
063import org.checkerframework.checker.nullness.compatqual.NullableDecl;
064
065/**
066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
067 * complete, some objects captured during the computation are closed.
068 *
069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
071 * cancellation of the operation so far. The only way to extract the value or exception from a step
072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
073 * "value" of a successful step or the "result" (value or exception) of any step.
074 *
075 * <ol>
076 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
077 *       block or a {@link ListenableFuture}.
078 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
079 *       can be captured for later closing.
080 *   <li>There is one last step (the root of the tree), from which you can extract the final result
081 *       of the computation. After that result is available (or the computation fails), all objects
082 *       captured by any of the steps in the pipeline are closed.
083 * </ol>
084 *
085 * <h3>Starting a pipeline</h3>
086 *
087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
090 * #from(ListenableFuture)} instead.
091 *
092 * <h3>Derived steps</h3>
093 *
094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
095 * ways similar to {@link FluentFuture}s:
096 *
097 * <ul>
098 *   <li>by transforming the value from a successful input step,
099 *   <li>by catching the exception from a failed input step, or
100 *   <li>by combining the results of several input steps.
101 * </ul>
102 *
103 * Each derivation can capture the next value or any intermediate objects for later closing.
104 *
105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
106 * exception, or combine it with others, you cannot do anything else with it, including declare it
107 * to be the last step of the pipeline.
108 *
109 * <h4>Transforming</h4>
110 *
111 * To derive the next step by asynchronously applying a function to an input step's value, call
112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
113 * Executor)} on the input step.
114 *
115 * <h4>Catching</h4>
116 *
117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
119 *
120 * <h4>Combining</h4>
121 *
122 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
123 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads.
124 *
125 * <h3>Cancelling</h3>
126 *
127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
129 * successfully cancelled step will immediately start closing all objects captured for later closing
130 * by it and by its input steps.
131 *
132 * <h3>Ending a pipeline</h3>
133 *
134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
135 * close the captured objects automatically or manually.
136 *
137 * <h4>Automatically closing</h4>
138 *
139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
140 * calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured
141 * by all steps in the pipeline will be closed.
142 *
143 * <pre>{@code
144 * FluentFuture<UserName> userName =
145 *     ClosingFuture.submit(
146 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
147 *             executor)
148 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
149 *         .transform((closer, result) -> result.get("userName"), directExecutor())
150 *         .catching(DBException.class, e -> "no user", directExecutor())
151 *         .finishToFuture();
152 * }</pre>
153 *
154 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
155 * result cursor will both be closed, even if the operation is cancelled or fails.
156 *
157 * <h4>Manually closing</h4>
158 *
159 * If you want to close the captured objects manually, after you've used the final result, call
160 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
161 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
162 *
163 * <pre>{@code
164 *     ClosingFuture.submit(
165 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
166 *             executor)
167 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
168 *     .transform((closer, result) -> result.get("userName"), directExecutor())
169 *     .catching(DBException.class, e -> "no user", directExecutor())
170 *     .finishToValueAndCloser(
171 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
172 *
173 * // later
174 * try { // get() will throw if the operation failed or was cancelled.
175 *   UserName userName = userNameValueAndCloser.get();
176 *   // do something with userName
177 * } finally {
178 *   userNameValueAndCloser.closeAsync();
179 * }
180 * }</pre>
181 *
182 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
183 * the query result cursor will both be closed, even if the operation is cancelled or fails.
184 *
185 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
186 * automatic-closing approach described above is safer.
187 *
188 * @param <V> the type of the value of this step
189 * @since 30.0
190 */
191// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
192@Beta // @Beta for one release.
193@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
194// TODO(dpb): GWT compatibility.
195public final class ClosingFuture<V> {
196
197  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
198
199  /**
200   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
201   * done.
202   */
203  public static final class DeferredCloser {
204    @RetainedWith private final CloseableList list;
205
206    DeferredCloser(CloseableList list) {
207      this.list = list;
208    }
209
210    /**
211     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
212     *
213     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
214     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
215     * Closeable}. (For more about the flavors, see <a
216     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
217     * build</a>.)
218     *
219     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
220     * building for Android): Ensure that any object you pass implements the interface not just in
221     * your current SDK version but also at the oldest version you support. For example, <a
222     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
223     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
224     * {@code Closeable} with a method reference like {@code cursor::close}.
225     *
226     * <p>Note that this method is still binary-compatible between flavors because the erasure of
227     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
228     *
229     * @param closeable the object to be closed (see notes above)
230     * @param closingExecutor the object will be closed on this executor
231     * @return the first argument
232     */
233    @CanIgnoreReturnValue
234    @NullableDecl
235    // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
236    public <C extends Object & Closeable> C eventuallyClose(
237        @NullableDecl C closeable, Executor closingExecutor) {
238      checkNotNull(closingExecutor);
239      if (closeable != null) {
240        list.add(closeable, closingExecutor);
241      }
242      return closeable;
243    }
244  }
245
246  /**
247   * An operation that computes a result.
248   *
249   * @param <V> the type of the result
250   */
251  public interface ClosingCallable<V extends Object> {
252    /**
253     * Computes a result, or throws an exception if unable to do so.
254     *
255     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
256     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
257     * not before this method completes), even if this method throws or the pipeline is cancelled.
258     */
259    @NullableDecl
260    V call(DeferredCloser closer) throws Exception;
261  }
262
263  /**
264   * An operation that computes a {@link ClosingFuture} of a result.
265   *
266   * @param <V> the type of the result
267   * @since 30.1
268   */
269  public interface AsyncClosingCallable<V extends Object> {
270    /**
271     * Computes a result, or throws an exception if unable to do so.
272     *
273     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
274     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
275     * not before this method completes), even if this method throws or the pipeline is cancelled.
276     */
277    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
278  }
279
280  /**
281   * A function from an input to a result.
282   *
283   * @param <T> the type of the input to the function
284   * @param <U> the type of the result of the function
285   */
286  public interface ClosingFunction<T extends Object, U extends Object> {
287
288    /**
289     * Applies this function to an input, or throws an exception if unable to do so.
290     *
291     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
292     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
293     * not before this method completes), even if this method throws or the pipeline is cancelled.
294     */
295    @NullableDecl
296    U apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
297  }
298
299  /**
300   * A function from an input to a {@link ClosingFuture} of a result.
301   *
302   * @param <T> the type of the input to the function
303   * @param <U> the type of the result of the function
304   */
305  public interface AsyncClosingFunction<T extends Object, U extends Object> {
306    /**
307     * Applies this function to an input, or throws an exception if unable to do so.
308     *
309     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
310     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
311     * not before this method completes), even if this method throws or the pipeline is cancelled.
312     */
313    ClosingFuture<U> apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
314  }
315
316  /**
317   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
318   * allows the user to close all the closeable objects that were captured during it for later
319   * closing.
320   *
321   * <p>The asynchronous operation will have completed before this object is created.
322   *
323   * @param <V> the type of the value of a successful operation
324   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
325   */
326  public static final class ValueAndCloser<V> {
327
328    private final ClosingFuture<? extends V> closingFuture;
329
330    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
331      this.closingFuture = checkNotNull(closingFuture);
332    }
333
334    /**
335     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
336     * {@link Future#get()} would.
337     *
338     * <p>Because the asynchronous operation has already completed, this method is synchronous and
339     * returns immediately.
340     *
341     * @throws CancellationException if the computation was cancelled
342     * @throws ExecutionException if the computation threw an exception
343     */
344    @NullableDecl
345    public V get() throws ExecutionException {
346      return getDone(closingFuture.future);
347    }
348
349    /**
350     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
351     * operation on the {@link Executor}s specified by calls to {@link
352     * DeferredCloser#eventuallyClose(Closeable, Executor)}.
353     *
354     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
355     * closed synchronously.
356     *
357     * <p>Idempotent: objects will be closed at most once.
358     */
359    public void closeAsync() {
360      closingFuture.close();
361    }
362  }
363
364  /**
365   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
366   * ClosingFuture} pipeline.
367   *
368   * @param <V> the type of the final value of a successful pipeline
369   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
370   */
371  public interface ValueAndCloserConsumer<V> {
372
373    /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */
374    void accept(ValueAndCloser<V> valueAndCloser);
375  }
376
377  /**
378   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
379   *
380   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
381   *     execution
382   */
383  public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) {
384    return new ClosingFuture<>(callable, executor);
385  }
386
387  /**
388   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
389   *
390   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
391   *     execution
392   * @since 30.1
393   */
394  public static <V> ClosingFuture<V> submitAsync(
395      AsyncClosingCallable<V> callable, Executor executor) {
396    return new ClosingFuture<>(callable, executor);
397  }
398
399  /**
400   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
401   *
402   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
403   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
404   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
405   */
406  public static <V> ClosingFuture<V> from(ListenableFuture<V> future) {
407    return new ClosingFuture<V>(future);
408  }
409
410  /**
411   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
412   *
413   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when
414   * the pipeline is done, even if the pipeline is canceled or fails.
415   *
416   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
417   * value in order to close it.
418   *
419   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
420   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable,
421   *     Executor)}.
422   * @param closingExecutor the future's result will be closed on this executor
423   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
424   *     underlying value may never be closed if the {@link Future} is canceled after its operation
425   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
426   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
427   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
428   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
429   *     ClosingFuture#from}.
430   */
431  @Deprecated
432  // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
433  public static <C extends Object & Closeable> ClosingFuture<C> eventuallyClosing(
434      ListenableFuture<C> future, final Executor closingExecutor) {
435    checkNotNull(closingExecutor);
436    final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
437    Futures.addCallback(
438        future,
439        new FutureCallback<Closeable>() {
440          @Override
441          public void onSuccess(@NullableDecl Closeable result) {
442            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
443          }
444
445          @Override
446          public void onFailure(Throwable t) {}
447        },
448        directExecutor());
449    return closingFuture;
450  }
451
452  /**
453   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
454   *
455   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
456   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
457   */
458  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
459    return new Combiner(false, futures);
460  }
461
462  /**
463   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
464   *
465   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
466   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
467   */
468  public static Combiner whenAllComplete(
469      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
470    return whenAllComplete(asList(future1, moreFutures));
471  }
472
473  /**
474   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
475   * all succeed. If any fail, the resulting pipeline will fail.
476   *
477   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
478   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
479   */
480  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
481    return new Combiner(true, futures);
482  }
483
484  /**
485   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
486   * they all succeed. If any fail, the resulting pipeline will fail.
487   *
488   * <p>Calling this method allows you to use lambdas or method references typed with the types of
489   * the input {@link ClosingFuture}s.
490   *
491   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
492   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
493   */
494  public static <V1, V2> Combiner2<V1, V2> whenAllSucceed(
495      ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
496    return new Combiner2<>(future1, future2);
497  }
498
499  /**
500   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
501   * they all succeed. If any fail, the resulting pipeline will fail.
502   *
503   * <p>Calling this method allows you to use lambdas or method references typed with the types of
504   * the input {@link ClosingFuture}s.
505   *
506   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
507   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
508   */
509  public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed(
510      ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
511    return new Combiner3<>(future1, future2, future3);
512  }
513
514  /**
515   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
516   * they all succeed. If any fail, the resulting pipeline will fail.
517   *
518   * <p>Calling this method allows you to use lambdas or method references typed with the types of
519   * the input {@link ClosingFuture}s.
520   *
521   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
522   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
523   */
524  public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed(
525      ClosingFuture<V1> future1,
526      ClosingFuture<V2> future2,
527      ClosingFuture<V3> future3,
528      ClosingFuture<V4> future4) {
529    return new Combiner4<>(future1, future2, future3, future4);
530  }
531
532  /**
533   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
534   * they all succeed. If any fail, the resulting pipeline will fail.
535   *
536   * <p>Calling this method allows you to use lambdas or method references typed with the types of
537   * the input {@link ClosingFuture}s.
538   *
539   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
540   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
541   */
542  public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
543      ClosingFuture<V1> future1,
544      ClosingFuture<V2> future2,
545      ClosingFuture<V3> future3,
546      ClosingFuture<V4> future4,
547      ClosingFuture<V5> future5) {
548    return new Combiner5<>(future1, future2, future3, future4, future5);
549  }
550
551  /**
552   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
553   * all succeed. If any fail, the resulting pipeline will fail.
554   *
555   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
556   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
557   */
558  public static Combiner whenAllSucceed(
559      ClosingFuture<?> future1,
560      ClosingFuture<?> future2,
561      ClosingFuture<?> future3,
562      ClosingFuture<?> future4,
563      ClosingFuture<?> future5,
564      ClosingFuture<?> future6,
565      ClosingFuture<?>... moreFutures) {
566    return whenAllSucceed(
567        FluentIterable.of(future1, future2, future3, future4, future5, future6)
568            .append(moreFutures));
569  }
570
571  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
572  private final CloseableList closeables = new CloseableList();
573  private final FluentFuture<V> future;
574
575  private ClosingFuture(ListenableFuture<V> future) {
576    this.future = FluentFuture.from(future);
577  }
578
579  private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
580    checkNotNull(callable);
581    TrustedListenableFutureTask<V> task =
582        TrustedListenableFutureTask.create(
583            new Callable<V>() {
584              @Override
585              public V call() throws Exception {
586                return callable.call(closeables.closer);
587              }
588
589              @Override
590              public String toString() {
591                return callable.toString();
592              }
593            });
594    executor.execute(task);
595    this.future = task;
596  }
597
598  private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) {
599    checkNotNull(callable);
600    TrustedListenableFutureTask<V> task =
601        TrustedListenableFutureTask.create(
602            new AsyncCallable<V>() {
603              @Override
604              public ListenableFuture<V> call() throws Exception {
605                CloseableList newCloseables = new CloseableList();
606                try {
607                  ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
608                  closingFuture.becomeSubsumedInto(closeables);
609                  return closingFuture.future;
610                } finally {
611                  closeables.add(newCloseables, directExecutor());
612                }
613              }
614
615              @Override
616              public String toString() {
617                return callable.toString();
618              }
619            });
620    executor.execute(task);
621    this.future = task;
622  }
623
624  /**
625   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
626   * future returns {@code null} if the step is successful or throws the same exception that would
627   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
628   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
629   *
630   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
631   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
632   * a derivation method <i>on the same instance</i>. This is important because calling {@code
633   * statusFuture} alone does not provide a way to close the pipeline.
634   */
635  public ListenableFuture<?> statusFuture() {
636    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
637  }
638
639  /**
640   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
641   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
642   * when the pipeline is done.
643   *
644   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
645   * ClosingFuture} will be equivalent to this one.
646   *
647   * <p>If the function throws an exception, that exception is used as the result of the derived
648   * {@code ClosingFuture}.
649   *
650   * <p>Example usage:
651   *
652   * <pre>{@code
653   * ClosingFuture<List<Row>> rowsFuture =
654   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
655   * }</pre>
656   *
657   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
658   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
659   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
660   *
661   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
662   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
663   * this {@code ClosingFuture}.
664   *
665   * @param function transforms the value of this step to the value of the derived step
666   * @param executor executor to run the function in
667   * @return the derived step
668   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
669   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
670   *     finished}
671   */
672  public <U> ClosingFuture<U> transform(
673      final ClosingFunction<? super V, U> function, Executor executor) {
674    checkNotNull(function);
675    AsyncFunction<V, U> applyFunction =
676        new AsyncFunction<V, U>() {
677          @Override
678          public ListenableFuture<U> apply(V input) throws Exception {
679            return closeables.applyClosingFunction(function, input);
680          }
681
682          @Override
683          public String toString() {
684            return function.toString();
685          }
686        };
687    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
688    return derive(future.transformAsync(applyFunction, executor));
689  }
690
691  /**
692   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
693   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
694   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
695   * captured by the returned {@link ClosingFuture}).
696   *
697   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
698   * returned by the function.
699   *
700   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
701   * ClosingFuture} will be equivalent to this one.
702   *
703   * <p>If the function throws an exception, that exception is used as the result of the derived
704   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
705   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
706   * closed.
707   *
708   * <p>Usage guidelines for this method:
709   *
710   * <ul>
711   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
712   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
713   *       Executor)} instead, with a function that returns the next value directly.
714   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
715   *       for every closeable object this step creates in order to capture it for later closing.
716   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
717   *       ClosingFuture} call {@link #from(ListenableFuture)}.
718   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
719   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
720   *       {@link #withoutCloser(AsyncFunction)}
721   * </ul>
722   *
723   * <p>Example usage:
724   *
725   * <pre>{@code
726   * // Result.getRowsClosingFuture() returns a ClosingFuture.
727   * ClosingFuture<List<Row>> rowsFuture =
728   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
729   *
730   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
731   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
732   * // Closeable).
733   * ClosingFuture<Integer> rowsFuture2 =
734   *     queryFuture.transformAsync(
735   *         (closer, result) -> {
736   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
737   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
738   *      },
739   *      executor);
740   *
741   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
742   * ClosingFuture<List<Row>> rowsFuture3 =
743   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
744   *
745   * }</pre>
746   *
747   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
748   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
749   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
750   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
751   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
752   * responsible for completing the returned {@code ClosingFuture}.)
753   *
754   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
755   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
756   * this {@code ClosingFuture}.
757   *
758   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
759   *     the derived step
760   * @param executor executor to run the function in
761   * @return the derived step
762   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
763   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
764   *     finished}
765   */
766  public <U> ClosingFuture<U> transformAsync(
767      final AsyncClosingFunction<? super V, U> function, Executor executor) {
768    checkNotNull(function);
769    AsyncFunction<V, U> applyFunction =
770        new AsyncFunction<V, U>() {
771          @Override
772          public ListenableFuture<U> apply(V input) throws Exception {
773            return closeables.applyAsyncClosingFunction(function, input);
774          }
775
776          @Override
777          public String toString() {
778            return function.toString();
779          }
780        };
781    return derive(future.transformAsync(applyFunction, executor));
782  }
783
784  /**
785   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
786   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
787   * {@link ListenableFuture}.
788   *
789   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
790   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
791   * meets these conditions:
792   *
793   * <ul>
794   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
795   *       DeferredCloser#eventuallyClose(Closeable, Executor)}.
796   *   <li>It returns a {@link ListenableFuture}.
797   * </ul>
798   *
799   * <p>Example usage:
800   *
801   * <pre>{@code
802   * // Result.getRowsFuture() returns a ListenableFuture.
803   * ClosingFuture<List<Row>> rowsFuture =
804   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
805   * }</pre>
806   *
807   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
808   *     ListenableFuture} with the value of a derived step
809   */
810  public static <V, U> AsyncClosingFunction<V, U> withoutCloser(
811      final AsyncFunction<V, U> function) {
812    checkNotNull(function);
813    return new AsyncClosingFunction<V, U>() {
814      @Override
815      public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
816        return ClosingFuture.from(function.apply(input));
817      }
818    };
819  }
820
821  /**
822   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
823   * to its exception if it is an instance of a given exception type. The function can use a {@link
824   * DeferredCloser} to capture objects to be closed when the pipeline is done.
825   *
826   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
827   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
828   * one.
829   *
830   * <p>If the function throws an exception, that exception is used as the result of the derived
831   * {@code ClosingFuture}.
832   *
833   * <p>Example usage:
834   *
835   * <pre>{@code
836   * ClosingFuture<QueryResult> queryFuture =
837   *     queryFuture.catching(
838   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
839   * }</pre>
840   *
841   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
842   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
843   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
844   *
845   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
846   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
847   * this {@code ClosingFuture}.
848   *
849   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
850   *     type is matched against this step's exception. "This step's exception" means the cause of
851   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
852   *     underlying this step or, if {@code get()} throws a different kind of exception, that
853   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
854   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
855   * @param fallback the function to be called if this step fails with the expected exception type.
856   *     The function's argument is this step's exception. "This step's exception" means the cause
857   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
858   *     underlying this step or, if {@code get()} throws a different kind of exception, that
859   *     exception itself.
860   * @param executor the executor that runs {@code fallback} if the input fails
861   */
862  public <X extends Throwable> ClosingFuture<V> catching(
863      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
864    return catchingMoreGeneric(exceptionType, fallback, executor);
865  }
866
867  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
868  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
869      Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
870    checkNotNull(fallback);
871    AsyncFunction<X, W> applyFallback =
872        new AsyncFunction<X, W>() {
873          @Override
874          public ListenableFuture<W> apply(X exception) throws Exception {
875            return closeables.applyClosingFunction(fallback, exception);
876          }
877
878          @Override
879          public String toString() {
880            return fallback.toString();
881          }
882        };
883    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
884    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
885  }
886
887  /**
888   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
889   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
890   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
891   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
892   *
893   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
894   * ClosingFuture} will be equivalent to the one returned by the function.
895   *
896   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
897   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
898   * one.
899   *
900   * <p>If the function throws an exception, that exception is used as the result of the derived
901   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
902   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
903   * closed.
904   *
905   * <p>Usage guidelines for this method:
906   *
907   * <ul>
908   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
909   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
910   *       ClosingFunction, Executor)} instead, with a function that returns the next value
911   *       directly.
912   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
913   *       for every closeable object this step creates in order to capture it for later closing.
914   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
915   *       ClosingFuture} call {@link #from(ListenableFuture)}.
916   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
917   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
918   *       {@link #withoutCloser(AsyncFunction)}
919   * </ul>
920   *
921   * <p>Example usage:
922   *
923   * <pre>{@code
924   * // Fall back to a secondary input stream in case of IOException.
925   * ClosingFuture<InputStream> inputFuture =
926   *     firstInputFuture.catchingAsync(
927   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
928   * }
929   * }</pre>
930   *
931   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
932   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
933   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
934   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
935   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
936   * responsible for completing the returned {@code ClosingFuture}.)
937   *
938   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
939   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
940   * this {@code ClosingFuture}.
941   *
942   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
943   *     type is matched against this step's exception. "This step's exception" means the cause of
944   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
945   *     underlying this step or, if {@code get()} throws a different kind of exception, that
946   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
947   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
948   * @param fallback the function to be called if this step fails with the expected exception type.
949   *     The function's argument is this step's exception. "This step's exception" means the cause
950   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
951   *     underlying this step or, if {@code get()} throws a different kind of exception, that
952   *     exception itself.
953   * @param executor the executor that runs {@code fallback} if the input fails
954   */
955  // TODO(dpb): Should this do something special if the function throws CancellationException or
956  // ExecutionException?
957  public <X extends Throwable> ClosingFuture<V> catchingAsync(
958      Class<X> exceptionType,
959      AsyncClosingFunction<? super X, ? extends V> fallback,
960      Executor executor) {
961    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
962  }
963
964  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
965  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
966      Class<X> exceptionType,
967      final AsyncClosingFunction<? super X, W> fallback,
968      Executor executor) {
969    checkNotNull(fallback);
970    AsyncFunction<X, W> asyncFunction =
971        new AsyncFunction<X, W>() {
972          @Override
973          public ListenableFuture<W> apply(X exception) throws Exception {
974            return closeables.applyAsyncClosingFunction(fallback, exception);
975          }
976
977          @Override
978          public String toString() {
979            return fallback.toString();
980          }
981        };
982    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
983  }
984
985  /**
986   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned
987   * {@link Future} is done, all objects captured for closing during the pipeline's computation will
988   * be closed.
989   *
990   * <p>After calling this method, you may not call {@link
991   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
992   * derivation method on this {@code ClosingFuture}.
993   *
994   * @return a {@link Future} that represents the final value or exception of the pipeline
995   */
996  public FluentFuture<V> finishToFuture() {
997    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
998      logger.log(FINER, "will close {0}", this);
999      future.addListener(
1000          new Runnable() {
1001            @Override
1002            public void run() {
1003              checkAndUpdateState(WILL_CLOSE, CLOSING);
1004              close();
1005              checkAndUpdateState(CLOSING, CLOSED);
1006            }
1007          },
1008          directExecutor());
1009    } else {
1010      switch (state.get()) {
1011        case SUBSUMED:
1012          throw new IllegalStateException(
1013              "Cannot call finishToFuture() after deriving another step");
1014
1015        case WILL_CREATE_VALUE_AND_CLOSER:
1016          throw new IllegalStateException(
1017              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
1018
1019        case WILL_CLOSE:
1020        case CLOSING:
1021        case CLOSED:
1022          throw new IllegalStateException("Cannot call finishToFuture() twice");
1023
1024        case OPEN:
1025          throw new AssertionError();
1026      }
1027    }
1028    return future;
1029  }
1030
1031  /**
1032   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
1033   * {@code receiver} will be called with an object that contains the result of the operation. The
1034   * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use.
1035   *
1036   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
1037   * any other derivation method on this {@code ClosingFuture}.
1038   *
1039   * @param consumer a callback whose method will be called (using {@code executor}) when this
1040   *     operation is done
1041   */
1042  public void finishToValueAndCloser(
1043      final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
1044    checkNotNull(consumer);
1045    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
1046      switch (state.get()) {
1047        case SUBSUMED:
1048          throw new IllegalStateException(
1049              "Cannot call finishToValueAndCloser() after deriving another step");
1050
1051        case WILL_CLOSE:
1052        case CLOSING:
1053        case CLOSED:
1054          throw new IllegalStateException(
1055              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1056
1057        case WILL_CREATE_VALUE_AND_CLOSER:
1058          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1059
1060        case OPEN:
1061          break;
1062      }
1063      throw new AssertionError(state);
1064    }
1065    future.addListener(
1066        new Runnable() {
1067          @Override
1068          public void run() {
1069            provideValueAndCloser(consumer, ClosingFuture.this);
1070          }
1071        },
1072        executor);
1073  }
1074
1075  private static <C, V extends C> void provideValueAndCloser(
1076      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1077    consumer.accept(new ValueAndCloser<C>(closingFuture));
1078  }
1079
1080  /**
1081   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1082   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1083   * successful, and this step has not started when {@code cancel} is called, this step should never
1084   * run.
1085   *
1086   * <p>If successful, causes the objects captured by this step (if already started) and its input
1087   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1088   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1089   *
1090   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1091   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1092   *     cancelled regardless
1093   * @return {@code false} if the step could not be cancelled, typically because it has already
1094   *     completed normally; {@code true} otherwise
1095   */
1096  @CanIgnoreReturnValue
1097  public boolean cancel(boolean mayInterruptIfRunning) {
1098    logger.log(FINER, "cancelling {0}", this);
1099    boolean cancelled = future.cancel(mayInterruptIfRunning);
1100    if (cancelled) {
1101      close();
1102    }
1103    return cancelled;
1104  }
1105
1106  private void close() {
1107    logger.log(FINER, "closing {0}", this);
1108    closeables.close();
1109  }
1110
1111  private <U> ClosingFuture<U> derive(FluentFuture<U> future) {
1112    ClosingFuture<U> derived = new ClosingFuture<>(future);
1113    becomeSubsumedInto(derived.closeables);
1114    return derived;
1115  }
1116
1117  private void becomeSubsumedInto(CloseableList otherCloseables) {
1118    checkAndUpdateState(OPEN, SUBSUMED);
1119    otherCloseables.add(closeables, directExecutor());
1120  }
1121
1122  /**
1123   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1124   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1125   *
1126   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1127   */
1128  public static final class Peeker {
1129    private final ImmutableList<ClosingFuture<?>> futures;
1130    private volatile boolean beingCalled;
1131
1132    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1133      this.futures = checkNotNull(futures);
1134    }
1135
1136    /**
1137     * Returns the value of {@code closingFuture}.
1138     *
1139     * @throws ExecutionException if {@code closingFuture} is a failed step
1140     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1141     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1142     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1143     * @throws IllegalStateException if called outside of a call to {@link
1144     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1145     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1146     */
1147    @NullableDecl
1148    public final <D extends Object> D getDone(ClosingFuture<D> closingFuture)
1149        throws ExecutionException {
1150      checkState(beingCalled);
1151      checkArgument(futures.contains(closingFuture));
1152      return Futures.getDone(closingFuture.future);
1153    }
1154
1155    @NullableDecl
1156    private <V extends Object> V call(CombiningCallable<V> combiner, CloseableList closeables)
1157        throws Exception {
1158      beingCalled = true;
1159      CloseableList newCloseables = new CloseableList();
1160      try {
1161        return combiner.call(newCloseables.closer, this);
1162      } finally {
1163        closeables.add(newCloseables, directExecutor());
1164        beingCalled = false;
1165      }
1166    }
1167
1168    private <V extends Object> FluentFuture<V> callAsync(
1169        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1170      beingCalled = true;
1171      CloseableList newCloseables = new CloseableList();
1172      try {
1173        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1174        closingFuture.becomeSubsumedInto(closeables);
1175        return closingFuture.future;
1176      } finally {
1177        closeables.add(newCloseables, directExecutor());
1178        beingCalled = false;
1179      }
1180    }
1181  }
1182
1183  /**
1184   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1185   *
1186   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1187   * instantiate this class.
1188   *
1189   * <p>Example:
1190   *
1191   * <pre>{@code
1192   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1193   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1194   * ListenableFuture<Integer> numberOfDifferentLines =
1195   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1196   *           .call(
1197   *               (closer, peeker) -> {
1198   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1199   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1200   *                 return countDifferentLines(file1Reader, file2Reader);
1201   *               },
1202   *               executor)
1203   *           .closing(executor);
1204   * }</pre>
1205   */
1206  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
1207  @com.google.errorprone.annotations.DoNotMock(
1208      "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1209  public static class Combiner {
1210
1211    private final CloseableList closeables = new CloseableList();
1212
1213    /**
1214     * An operation that returns a result and may throw an exception.
1215     *
1216     * @param <V> the type of the result
1217     */
1218    public interface CombiningCallable<V extends Object> {
1219      /**
1220       * Computes a result, or throws an exception if unable to do so.
1221       *
1222       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1223       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1224       * is done (but not before this method completes), even if this method throws or the pipeline
1225       * is cancelled.
1226       *
1227       * @param peeker used to get the value of any of the input futures
1228       */
1229      @NullableDecl
1230      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1231    }
1232
1233    /**
1234     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1235     *
1236     * @param <V> the type of the result
1237     */
1238    public interface AsyncCombiningCallable<V extends Object> {
1239      /**
1240       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1241       *
1242       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1243       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1244       * is done (but not before this method completes), even if this method throws or the pipeline
1245       * is cancelled.
1246       *
1247       * @param peeker used to get the value of any of the input futures
1248       */
1249      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1250    }
1251
1252    private final boolean allMustSucceed;
1253    protected final ImmutableList<ClosingFuture<?>> inputs;
1254
1255    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1256      this.allMustSucceed = allMustSucceed;
1257      this.inputs = ImmutableList.copyOf(inputs);
1258      for (ClosingFuture<?> input : inputs) {
1259        input.becomeSubsumedInto(closeables);
1260      }
1261    }
1262
1263    /**
1264     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1265     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1266     * objects to be closed when the pipeline is done.
1267     *
1268     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1269     * fail, so will the returned step.
1270     *
1271     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1272     * cancelled.
1273     *
1274     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1275     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1276     */
1277    public <V> ClosingFuture<V> call(
1278        final CombiningCallable<V> combiningCallable, Executor executor) {
1279      Callable<V> callable =
1280          new Callable<V>() {
1281            @Override
1282            public V call() throws Exception {
1283              return new Peeker(inputs).call(combiningCallable, closeables);
1284            }
1285
1286            @Override
1287            public String toString() {
1288              return combiningCallable.toString();
1289            }
1290          };
1291      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1292      derived.closeables.add(closeables, directExecutor());
1293      return derived;
1294    }
1295
1296    /**
1297     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1298     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1299     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1300     * captured by the returned {@link ClosingFuture}).
1301     *
1302     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1303     * fail, so will the returned step.
1304     *
1305     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1306     * cancelled.
1307     *
1308     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1309     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1310     *
1311     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1312     * derived step.
1313     *
1314     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1315     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1316     *
1317     * <p>Usage guidelines for this method:
1318     *
1319     * <ul>
1320     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1321     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1322     *       Executor)} instead, with a function that returns the next value directly.
1323     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1324     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1325     *       capture it for later closing.
1326     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1327     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1328     * </ul>
1329     *
1330     * <p>The same warnings about doing heavyweight operations within {@link
1331     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1332     */
1333    public <V> ClosingFuture<V> callAsync(
1334        final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1335      AsyncCallable<V> asyncCallable =
1336          new AsyncCallable<V>() {
1337            @Override
1338            public ListenableFuture<V> call() throws Exception {
1339              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1340            }
1341
1342            @Override
1343            public String toString() {
1344              return combiningCallable.toString();
1345            }
1346          };
1347      ClosingFuture<V> derived =
1348          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1349      derived.closeables.add(closeables, directExecutor());
1350      return derived;
1351    }
1352
1353    private FutureCombiner<Object> futureCombiner() {
1354      return allMustSucceed
1355          ? Futures.whenAllSucceed(inputFutures())
1356          : Futures.whenAllComplete(inputFutures());
1357    }
1358
1359    private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE =
1360        new Function<ClosingFuture<?>, FluentFuture<?>>() {
1361          @Override
1362          public FluentFuture<?> apply(ClosingFuture<?> future) {
1363            return future.future;
1364          }
1365        };
1366
1367    private ImmutableList<FluentFuture<?>> inputFutures() {
1368      return FluentIterable.from(inputs).transform(INNER_FUTURE).toList();
1369    }
1370  }
1371
1372  /**
1373   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1374   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1375   * combination.
1376   *
1377   * @param <V1> the type returned by the first future
1378   * @param <V2> the type returned by the second future
1379   */
1380  public static final class Combiner2<V1 extends Object, V2 extends Object> extends Combiner {
1381
1382    /**
1383     * A function that returns a value when applied to the values of the two futures passed to
1384     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1385     *
1386     * @param <V1> the type returned by the first future
1387     * @param <V2> the type returned by the second future
1388     * @param <U> the type returned by the function
1389     */
1390    public interface ClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
1391
1392      /**
1393       * Applies this function to two inputs, or throws an exception if unable to do so.
1394       *
1395       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1396       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1397       * is done (but not before this method completes), even if this method throws or the pipeline
1398       * is cancelled.
1399       */
1400      @NullableDecl
1401      U apply(DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2)
1402          throws Exception;
1403    }
1404
1405    /**
1406     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1407     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1408     *
1409     * @param <V1> the type returned by the first future
1410     * @param <V2> the type returned by the second future
1411     * @param <U> the type returned by the function
1412     */
1413    public interface AsyncClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
1414
1415      /**
1416       * Applies this function to two inputs, or throws an exception if unable to do so.
1417       *
1418       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1419       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1420       * is done (but not before this method completes), even if this method throws or the pipeline
1421       * is cancelled.
1422       */
1423      ClosingFuture<U> apply(
1424          DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) throws Exception;
1425    }
1426
1427    private final ClosingFuture<V1> future1;
1428    private final ClosingFuture<V2> future2;
1429
1430    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1431      super(true, ImmutableList.of(future1, future2));
1432      this.future1 = future1;
1433      this.future2 = future2;
1434    }
1435
1436    /**
1437     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1438     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1439     * objects to be closed when the pipeline is done.
1440     *
1441     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1442     * any of the inputs fail, so will the returned step.
1443     *
1444     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1445     *
1446     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1447     * ExecutionException} will be extracted and used as the failure of the derived step.
1448     */
1449    public <U extends Object> ClosingFuture<U> call(
1450        final ClosingFunction2<V1, V2, U> function, Executor executor) {
1451      return call(
1452          new CombiningCallable<U>() {
1453            @Override
1454            @NullableDecl
1455            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1456              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1457            }
1458
1459            @Override
1460            public String toString() {
1461              return function.toString();
1462            }
1463          },
1464          executor);
1465    }
1466
1467    /**
1468     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1469     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1470     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1471     * captured by the returned {@link ClosingFuture}).
1472     *
1473     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1474     * any of the inputs fail, so will the returned step.
1475     *
1476     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1477     *
1478     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1479     * ExecutionException} will be extracted and used as the failure of the derived step.
1480     *
1481     * <p>If the function throws any other exception, it will be used as the failure of the derived
1482     * step.
1483     *
1484     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1485     * the closeable objects in that {@code ClosingFuture} will be closed.
1486     *
1487     * <p>Usage guidelines for this method:
1488     *
1489     * <ul>
1490     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1491     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1492     *       Executor)} instead, with a function that returns the next value directly.
1493     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1494     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1495     *       capture it for later closing.
1496     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1497     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1498     * </ul>
1499     *
1500     * <p>The same warnings about doing heavyweight operations within {@link
1501     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1502     */
1503    public <U extends Object> ClosingFuture<U> callAsync(
1504        final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1505      return callAsync(
1506          new AsyncCombiningCallable<U>() {
1507            @Override
1508            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1509              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1510            }
1511
1512            @Override
1513            public String toString() {
1514              return function.toString();
1515            }
1516          },
1517          executor);
1518    }
1519  }
1520
1521  /**
1522   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1523   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1524   * ClosingFuture)} to start this combination.
1525   *
1526   * @param <V1> the type returned by the first future
1527   * @param <V2> the type returned by the second future
1528   * @param <V3> the type returned by the third future
1529   */
1530  public static final class Combiner3<V1 extends Object, V2 extends Object, V3 extends Object>
1531      extends Combiner {
1532    /**
1533     * A function that returns a value when applied to the values of the three futures passed to
1534     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1535     *
1536     * @param <V1> the type returned by the first future
1537     * @param <V2> the type returned by the second future
1538     * @param <V3> the type returned by the third future
1539     * @param <U> the type returned by the function
1540     */
1541    public interface ClosingFunction3<
1542        V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
1543      /**
1544       * Applies this function to three inputs, or throws an exception if unable to do so.
1545       *
1546       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1547       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1548       * is done (but not before this method completes), even if this method throws or the pipeline
1549       * is cancelled.
1550       */
1551      @NullableDecl
1552      U apply(
1553          DeferredCloser closer,
1554          @NullableDecl V1 value1,
1555          @NullableDecl V2 value2,
1556          @NullableDecl V3 v3)
1557          throws Exception;
1558    }
1559
1560    /**
1561     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1562     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1563     *
1564     * @param <V1> the type returned by the first future
1565     * @param <V2> the type returned by the second future
1566     * @param <V3> the type returned by the third future
1567     * @param <U> the type returned by the function
1568     */
1569    public interface AsyncClosingFunction3<
1570        V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
1571      /**
1572       * Applies this function to three inputs, or throws an exception if unable to do so.
1573       *
1574       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1575       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1576       * is done (but not before this method completes), even if this method throws or the pipeline
1577       * is cancelled.
1578       */
1579      ClosingFuture<U> apply(
1580          DeferredCloser closer,
1581          @NullableDecl V1 value1,
1582          @NullableDecl V2 value2,
1583          @NullableDecl V3 value3)
1584          throws Exception;
1585    }
1586
1587    private final ClosingFuture<V1> future1;
1588    private final ClosingFuture<V2> future2;
1589    private final ClosingFuture<V3> future3;
1590
1591    private Combiner3(
1592        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1593      super(true, ImmutableList.of(future1, future2, future3));
1594      this.future1 = future1;
1595      this.future2 = future2;
1596      this.future3 = future3;
1597    }
1598
1599    /**
1600     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1601     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1602     * objects to be closed when the pipeline is done.
1603     *
1604     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1605     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1606     *
1607     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1608     *
1609     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1610     * ExecutionException} will be extracted and used as the failure of the derived step.
1611     */
1612    public <U extends Object> ClosingFuture<U> call(
1613        final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1614      return call(
1615          new CombiningCallable<U>() {
1616            @Override
1617            @NullableDecl
1618            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1619              return function.apply(
1620                  closer,
1621                  peeker.getDone(future1),
1622                  peeker.getDone(future2),
1623                  peeker.getDone(future3));
1624            }
1625
1626            @Override
1627            public String toString() {
1628              return function.toString();
1629            }
1630          },
1631          executor);
1632    }
1633
1634    /**
1635     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1636     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1637     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1638     * captured by the returned {@link ClosingFuture}).
1639     *
1640     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1641     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1642     *
1643     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1644     *
1645     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1646     * ExecutionException} will be extracted and used as the failure of the derived step.
1647     *
1648     * <p>If the function throws any other exception, it will be used as the failure of the derived
1649     * step.
1650     *
1651     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1652     * the closeable objects in that {@code ClosingFuture} will be closed.
1653     *
1654     * <p>Usage guidelines for this method:
1655     *
1656     * <ul>
1657     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1658     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1659     *       Executor)} instead, with a function that returns the next value directly.
1660     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1661     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1662     *       capture it for later closing.
1663     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1664     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1665     * </ul>
1666     *
1667     * <p>The same warnings about doing heavyweight operations within {@link
1668     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1669     */
1670    public <U extends Object> ClosingFuture<U> callAsync(
1671        final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1672      return callAsync(
1673          new AsyncCombiningCallable<U>() {
1674            @Override
1675            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1676              return function.apply(
1677                  closer,
1678                  peeker.getDone(future1),
1679                  peeker.getDone(future2),
1680                  peeker.getDone(future3));
1681            }
1682
1683            @Override
1684            public String toString() {
1685              return function.toString();
1686            }
1687          },
1688          executor);
1689    }
1690  }
1691
1692  /**
1693   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1694   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1695   * ClosingFuture)} to start this combination.
1696   *
1697   * @param <V1> the type returned by the first future
1698   * @param <V2> the type returned by the second future
1699   * @param <V3> the type returned by the third future
1700   * @param <V4> the type returned by the fourth future
1701   */
1702  public static final class Combiner4<
1703          V1 extends Object, V2 extends Object, V3 extends Object, V4 extends Object>
1704      extends Combiner {
1705    /**
1706     * A function that returns a value when applied to the values of the four futures passed to
1707     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1708     *
1709     * @param <V1> the type returned by the first future
1710     * @param <V2> the type returned by the second future
1711     * @param <V3> the type returned by the third future
1712     * @param <V4> the type returned by the fourth future
1713     * @param <U> the type returned by the function
1714     */
1715    public interface ClosingFunction4<
1716        V1 extends Object,
1717        V2 extends Object,
1718        V3 extends Object,
1719        V4 extends Object,
1720        U extends Object> {
1721      /**
1722       * Applies this function to four inputs, or throws an exception if unable to do so.
1723       *
1724       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1725       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1726       * is done (but not before this method completes), even if this method throws or the pipeline
1727       * is cancelled.
1728       */
1729      @NullableDecl
1730      U apply(
1731          DeferredCloser closer,
1732          @NullableDecl V1 value1,
1733          @NullableDecl V2 value2,
1734          @NullableDecl V3 value3,
1735          @NullableDecl V4 value4)
1736          throws Exception;
1737    }
1738
1739    /**
1740     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1741     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1742     * ClosingFuture)}.
1743     *
1744     * @param <V1> the type returned by the first future
1745     * @param <V2> the type returned by the second future
1746     * @param <V3> the type returned by the third future
1747     * @param <V4> the type returned by the fourth future
1748     * @param <U> the type returned by the function
1749     */
1750    public interface AsyncClosingFunction4<
1751        V1 extends Object,
1752        V2 extends Object,
1753        V3 extends Object,
1754        V4 extends Object,
1755        U extends Object> {
1756      /**
1757       * Applies this function to four inputs, or throws an exception if unable to do so.
1758       *
1759       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1760       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1761       * is done (but not before this method completes), even if this method throws or the pipeline
1762       * is cancelled.
1763       */
1764      ClosingFuture<U> apply(
1765          DeferredCloser closer,
1766          @NullableDecl V1 value1,
1767          @NullableDecl V2 value2,
1768          @NullableDecl V3 value3,
1769          @NullableDecl V4 value4)
1770          throws Exception;
1771    }
1772
1773    private final ClosingFuture<V1> future1;
1774    private final ClosingFuture<V2> future2;
1775    private final ClosingFuture<V3> future3;
1776    private final ClosingFuture<V4> future4;
1777
1778    private Combiner4(
1779        ClosingFuture<V1> future1,
1780        ClosingFuture<V2> future2,
1781        ClosingFuture<V3> future3,
1782        ClosingFuture<V4> future4) {
1783      super(true, ImmutableList.of(future1, future2, future3, future4));
1784      this.future1 = future1;
1785      this.future2 = future2;
1786      this.future3 = future3;
1787      this.future4 = future4;
1788    }
1789
1790    /**
1791     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1792     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1793     * objects to be closed when the pipeline is done.
1794     *
1795     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1796     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1797     *
1798     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1799     *
1800     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1801     * ExecutionException} will be extracted and used as the failure of the derived step.
1802     */
1803    public <U extends Object> ClosingFuture<U> call(
1804        final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1805      return call(
1806          new CombiningCallable<U>() {
1807            @Override
1808            @NullableDecl
1809            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1810              return function.apply(
1811                  closer,
1812                  peeker.getDone(future1),
1813                  peeker.getDone(future2),
1814                  peeker.getDone(future3),
1815                  peeker.getDone(future4));
1816            }
1817
1818            @Override
1819            public String toString() {
1820              return function.toString();
1821            }
1822          },
1823          executor);
1824    }
1825
1826    /**
1827     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1828     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1829     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1830     * captured by the returned {@link ClosingFuture}).
1831     *
1832     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1833     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1834     *
1835     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1836     *
1837     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1838     * ExecutionException} will be extracted and used as the failure of the derived step.
1839     *
1840     * <p>If the function throws any other exception, it will be used as the failure of the derived
1841     * step.
1842     *
1843     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1844     * the closeable objects in that {@code ClosingFuture} will be closed.
1845     *
1846     * <p>Usage guidelines for this method:
1847     *
1848     * <ul>
1849     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1850     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1851     *       Executor)} instead, with a function that returns the next value directly.
1852     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1853     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1854     *       capture it for later closing.
1855     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1856     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1857     * </ul>
1858     *
1859     * <p>The same warnings about doing heavyweight operations within {@link
1860     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1861     */
1862    public <U extends Object> ClosingFuture<U> callAsync(
1863        final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1864      return callAsync(
1865          new AsyncCombiningCallable<U>() {
1866            @Override
1867            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1868              return function.apply(
1869                  closer,
1870                  peeker.getDone(future1),
1871                  peeker.getDone(future2),
1872                  peeker.getDone(future3),
1873                  peeker.getDone(future4));
1874            }
1875
1876            @Override
1877            public String toString() {
1878              return function.toString();
1879            }
1880          },
1881          executor);
1882    }
1883  }
1884
1885  /**
1886   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1887   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1888   * ClosingFuture, ClosingFuture)} to start this combination.
1889   *
1890   * @param <V1> the type returned by the first future
1891   * @param <V2> the type returned by the second future
1892   * @param <V3> the type returned by the third future
1893   * @param <V4> the type returned by the fourth future
1894   * @param <V5> the type returned by the fifth future
1895   */
1896  public static final class Combiner5<
1897          V1 extends Object,
1898          V2 extends Object,
1899          V3 extends Object,
1900          V4 extends Object,
1901          V5 extends Object>
1902      extends Combiner {
1903    /**
1904     * A function that returns a value when applied to the values of the five futures passed to
1905     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1906     * ClosingFuture)}.
1907     *
1908     * @param <V1> the type returned by the first future
1909     * @param <V2> the type returned by the second future
1910     * @param <V3> the type returned by the third future
1911     * @param <V4> the type returned by the fourth future
1912     * @param <V5> the type returned by the fifth future
1913     * @param <U> the type returned by the function
1914     */
1915    public interface ClosingFunction5<
1916        V1 extends Object,
1917        V2 extends Object,
1918        V3 extends Object,
1919        V4 extends Object,
1920        V5 extends Object,
1921        U extends Object> {
1922      /**
1923       * Applies this function to five inputs, or throws an exception if unable to do so.
1924       *
1925       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1926       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1927       * is done (but not before this method completes), even if this method throws or the pipeline
1928       * is cancelled.
1929       */
1930      @NullableDecl
1931      U apply(
1932          DeferredCloser closer,
1933          @NullableDecl V1 value1,
1934          @NullableDecl V2 value2,
1935          @NullableDecl V3 value3,
1936          @NullableDecl V4 value4,
1937          @NullableDecl V5 value5)
1938          throws Exception;
1939    }
1940
1941    /**
1942     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1943     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1944     * ClosingFuture, ClosingFuture)}.
1945     *
1946     * @param <V1> the type returned by the first future
1947     * @param <V2> the type returned by the second future
1948     * @param <V3> the type returned by the third future
1949     * @param <V4> the type returned by the fourth future
1950     * @param <V5> the type returned by the fifth future
1951     * @param <U> the type returned by the function
1952     */
1953    public interface AsyncClosingFunction5<
1954        V1 extends Object,
1955        V2 extends Object,
1956        V3 extends Object,
1957        V4 extends Object,
1958        V5 extends Object,
1959        U extends Object> {
1960      /**
1961       * Applies this function to five inputs, or throws an exception if unable to do so.
1962       *
1963       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1964       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1965       * is done (but not before this method completes), even if this method throws or the pipeline
1966       * is cancelled.
1967       */
1968      ClosingFuture<U> apply(
1969          DeferredCloser closer,
1970          @NullableDecl V1 value1,
1971          @NullableDecl V2 value2,
1972          @NullableDecl V3 value3,
1973          @NullableDecl V4 value4,
1974          @NullableDecl V5 value5)
1975          throws Exception;
1976    }
1977
1978    private final ClosingFuture<V1> future1;
1979    private final ClosingFuture<V2> future2;
1980    private final ClosingFuture<V3> future3;
1981    private final ClosingFuture<V4> future4;
1982    private final ClosingFuture<V5> future5;
1983
1984    private Combiner5(
1985        ClosingFuture<V1> future1,
1986        ClosingFuture<V2> future2,
1987        ClosingFuture<V3> future3,
1988        ClosingFuture<V4> future4,
1989        ClosingFuture<V5> future5) {
1990      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1991      this.future1 = future1;
1992      this.future2 = future2;
1993      this.future3 = future3;
1994      this.future4 = future4;
1995      this.future5 = future5;
1996    }
1997
1998    /**
1999     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2000     * combining function to their values. The function can use a {@link DeferredCloser} to capture
2001     * objects to be closed when the pipeline is done.
2002     *
2003     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2004     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2005     * returned step.
2006     *
2007     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2008     *
2009     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2010     * ExecutionException} will be extracted and used as the failure of the derived step.
2011     */
2012    public <U extends Object> ClosingFuture<U> call(
2013        final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2014      return call(
2015          new CombiningCallable<U>() {
2016            @Override
2017            @NullableDecl
2018            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
2019              return function.apply(
2020                  closer,
2021                  peeker.getDone(future1),
2022                  peeker.getDone(future2),
2023                  peeker.getDone(future3),
2024                  peeker.getDone(future4),
2025                  peeker.getDone(future5));
2026            }
2027
2028            @Override
2029            public String toString() {
2030              return function.toString();
2031            }
2032          },
2033          executor);
2034    }
2035
2036    /**
2037     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2038     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
2039     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
2040     * captured by the returned {@link ClosingFuture}).
2041     *
2042     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2043     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2044     * returned step.
2045     *
2046     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2047     *
2048     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2049     * ExecutionException} will be extracted and used as the failure of the derived step.
2050     *
2051     * <p>If the function throws any other exception, it will be used as the failure of the derived
2052     * step.
2053     *
2054     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2055     * the closeable objects in that {@code ClosingFuture} will be closed.
2056     *
2057     * <p>Usage guidelines for this method:
2058     *
2059     * <ul>
2060     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2061     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2062     *       Executor)} instead, with a function that returns the next value directly.
2063     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
2064     *       closer.eventuallyClose()} for every closeable object this step creates in order to
2065     *       capture it for later closing.
2066     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2067     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2068     * </ul>
2069     *
2070     * <p>The same warnings about doing heavyweight operations within {@link
2071     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2072     */
2073    public <U extends Object> ClosingFuture<U> callAsync(
2074        final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2075      return callAsync(
2076          new AsyncCombiningCallable<U>() {
2077            @Override
2078            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2079              return function.apply(
2080                  closer,
2081                  peeker.getDone(future1),
2082                  peeker.getDone(future2),
2083                  peeker.getDone(future3),
2084                  peeker.getDone(future4),
2085                  peeker.getDone(future5));
2086            }
2087
2088            @Override
2089            public String toString() {
2090              return function.toString();
2091            }
2092          },
2093          executor);
2094    }
2095  }
2096
2097  @Override
2098  public String toString() {
2099    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2100    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2101  }
2102
2103  @Override
2104  protected void finalize() {
2105    if (state.get().equals(OPEN)) {
2106      logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2107      FluentFuture<V> unused = finishToFuture();
2108    }
2109  }
2110
2111  private static void closeQuietly(final Closeable closeable, Executor executor) {
2112    if (closeable == null) {
2113      return;
2114    }
2115    try {
2116      executor.execute(
2117          new Runnable() {
2118            @Override
2119            public void run() {
2120              try {
2121                closeable.close();
2122              } catch (IOException | RuntimeException e) {
2123                logger.log(WARNING, "thrown by close()", e);
2124              }
2125            }
2126          });
2127    } catch (RejectedExecutionException e) {
2128      if (logger.isLoggable(WARNING)) {
2129        logger.log(
2130            WARNING, String.format("while submitting close to %s; will close inline", executor), e);
2131      }
2132      closeQuietly(closeable, directExecutor());
2133    }
2134  }
2135
2136  private void checkAndUpdateState(State oldState, State newState) {
2137    checkState(
2138        compareAndUpdateState(oldState, newState),
2139        "Expected state to be %s, but it was %s",
2140        oldState,
2141        newState);
2142  }
2143
2144  private boolean compareAndUpdateState(State oldState, State newState) {
2145    return state.compareAndSet(oldState, newState);
2146  }
2147
2148  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2149  private static final class CloseableList extends IdentityHashMap<Closeable, Executor>
2150      implements Closeable {
2151    private final DeferredCloser closer = new DeferredCloser(this);
2152    private volatile boolean closed;
2153    private volatile CountDownLatch whenClosed;
2154
2155    <V, U> ListenableFuture<U> applyClosingFunction(
2156        ClosingFunction<? super V, U> transformation, V input) throws Exception {
2157      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2158      CloseableList newCloseables = new CloseableList();
2159      try {
2160        return immediateFuture(transformation.apply(newCloseables.closer, input));
2161      } finally {
2162        add(newCloseables, directExecutor());
2163      }
2164    }
2165
2166    <V, U> FluentFuture<U> applyAsyncClosingFunction(
2167        AsyncClosingFunction<V, U> transformation, V input) throws Exception {
2168      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2169      CloseableList newCloseables = new CloseableList();
2170      try {
2171        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2172        closingFuture.becomeSubsumedInto(newCloseables);
2173        return closingFuture.future;
2174      } finally {
2175        add(newCloseables, directExecutor());
2176      }
2177    }
2178
2179    @Override
2180    public void close() {
2181      if (closed) {
2182        return;
2183      }
2184      synchronized (this) {
2185        if (closed) {
2186          return;
2187        }
2188        closed = true;
2189      }
2190      for (Map.Entry<Closeable, Executor> entry : entrySet()) {
2191        closeQuietly(entry.getKey(), entry.getValue());
2192      }
2193      clear();
2194      if (whenClosed != null) {
2195        whenClosed.countDown();
2196      }
2197    }
2198
2199    void add(@NullableDecl Closeable closeable, Executor executor) {
2200      checkNotNull(executor);
2201      if (closeable == null) {
2202        return;
2203      }
2204      synchronized (this) {
2205        if (!closed) {
2206          put(closeable, executor);
2207          return;
2208        }
2209      }
2210      closeQuietly(closeable, executor);
2211    }
2212
2213    /**
2214     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2215     */
2216    CountDownLatch whenClosedCountDown() {
2217      if (closed) {
2218        return new CountDownLatch(0);
2219      }
2220      synchronized (this) {
2221        if (closed) {
2222          return new CountDownLatch(0);
2223        }
2224        checkState(whenClosed == null);
2225        return whenClosed = new CountDownLatch(1);
2226      }
2227    }
2228  }
2229
2230  /**
2231   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2232   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2233   */
2234  @VisibleForTesting
2235  CountDownLatch whenClosedCountDown() {
2236    return closeables.whenClosedCountDown();
2237  }
2238
2239  /** The state of a {@link CloseableList}. */
2240  enum State {
2241    /** The {@link CloseableList} has not been subsumed or closed. */
2242    OPEN,
2243
2244    /**
2245     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
2246     * into any other.
2247     */
2248    SUBSUMED,
2249
2250    /**
2251     * Some {@link ListenableFuture} has a callback attached that will close the {@link
2252     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
2253     */
2254    WILL_CLOSE,
2255
2256    /**
2257     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
2258     * {@link CloseableList} may not be subsumed.
2259     */
2260    CLOSING,
2261
2262    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
2263    CLOSED,
2264
2265    /**
2266     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
2267     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
2268     */
2269    WILL_CREATE_VALUE_AND_CLOSER,
2270  }
2271}