001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static java.util.logging.Level.FINER;
036import static java.util.logging.Level.SEVERE;
037import static java.util.logging.Level.WARNING;
038
039import com.google.common.annotations.Beta;
040import com.google.common.annotations.VisibleForTesting;
041import com.google.common.base.Function;
042import com.google.common.collect.FluentIterable;
043import com.google.common.collect.ImmutableList;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
046import com.google.common.util.concurrent.Futures.FutureCombiner;
047import com.google.errorprone.annotations.CanIgnoreReturnValue;
048import com.google.errorprone.annotations.DoNotMock;
049import com.google.j2objc.annotations.RetainedWith;
050import java.io.Closeable;
051import java.util.IdentityHashMap;
052import java.util.Map;
053import java.util.concurrent.Callable;
054import java.util.concurrent.CancellationException;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutionException;
057import java.util.concurrent.Executor;
058import java.util.concurrent.Future;
059import java.util.concurrent.RejectedExecutionException;
060import java.util.concurrent.atomic.AtomicReference;
061import java.util.logging.Logger;
062import org.checkerframework.checker.nullness.qual.Nullable;
063
064/**
065 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
066 * complete, some objects captured during the computation are closed.
067 *
068 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
069 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
070 * cancellation of the operation so far. The only way to extract the value or exception from a step
071 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
072 * "value" of a successful step or the "result" (value or exception) of any step.
073 *
074 * <ol>
075 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
076 *       block or a {@link ListenableFuture}.
077 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
078 *       can be captured for later closing.
079 *   <li>There is one last step (the root of the tree), from which you can extract the final result
080 *       of the computation. After that result is available (or the computation fails), all objects
081 *       captured by any of the steps in the pipeline are closed.
082 * </ol>
083 *
084 * <h3>Starting a pipeline</h3>
085 *
086 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
087 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
088 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
089 * #from(ListenableFuture)} instead.
090 *
091 * <h3>Derived steps</h3>
092 *
093 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
094 * ways similar to {@link FluentFuture}s:
095 *
096 * <ul>
097 *   <li>by transforming the value from a successful input step,
098 *   <li>by catching the exception from a failed input step, or
099 *   <li>by combining the results of several input steps.
100 * </ul>
101 *
102 * Each derivation can capture the next value or any intermediate objects for later closing.
103 *
104 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
105 * exception, or combine it with others, you cannot do anything else with it, including declare it
106 * to be the last step of the pipeline.
107 *
108 * <h4>Transforming</h4>
109 *
110 * To derive the next step by asynchronously applying a function to an input step's value, call
111 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
112 * Executor)} on the input step.
113 *
114 * <h4>Catching</h4>
115 *
116 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
117 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
118 *
119 * <h4>Combining</h4>
120 *
 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)}, {@link #whenAllSucceed(Iterable)}, or one of their overloads.
123 *
124 * <h3>Cancelling</h3>
125 *
126 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
127 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
128 * successfully cancelled step will immediately start closing all objects captured for later closing
129 * by it and by its input steps.
130 *
131 * <h3>Ending a pipeline</h3>
132 *
133 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
134 * close the captured objects automatically or manually.
135 *
136 * <h4>Automatically closing</h4>
137 *
138 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
139 * calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured
140 * by all steps in the pipeline will be closed.
141 *
142 * <pre>{@code
143 * FluentFuture<UserName> userName =
144 *     ClosingFuture.submit(
145 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
146 *             executor)
147 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
148 *         .transform((closer, result) -> result.get("userName"), directExecutor())
 *         .catching(DBException.class, (closer, e) -> "no user", directExecutor())
150 *         .finishToFuture();
151 * }</pre>
152 *
153 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
154 * result cursor will both be closed, even if the operation is cancelled or fails.
155 *
156 * <h4>Manually closing</h4>
157 *
158 * If you want to close the captured objects manually, after you've used the final result, call
159 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
160 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
161 *
162 * <pre>{@code
163 *     ClosingFuture.submit(
164 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
165 *             executor)
166 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
167 *     .transform((closer, result) -> result.get("userName"), directExecutor())
 *     .catching(DBException.class, (closer, e) -> "no user", directExecutor())
169 *     .finishToValueAndCloser(
170 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
171 *
172 * // later
173 * try { // get() will throw if the operation failed or was cancelled.
174 *   UserName userName = userNameValueAndCloser.get();
175 *   // do something with userName
176 * } finally {
177 *   userNameValueAndCloser.closeAsync();
178 * }
179 * }</pre>
180 *
181 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
182 * the query result cursor will both be closed, even if the operation is cancelled or fails.
183 *
184 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
185 * automatic-closing approach described above is safer.
186 *
187 * @param <V> the type of the value of this step
188 * @since 30.0
189 */
190// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
191@Beta // @Beta for one release.
192@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
193// TODO(dpb): GWT compatibility.
194public final class ClosingFuture<V> {
195
196  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
197
198  /**
199   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
200   * done.
201   */
202  public static final class DeferredCloser {
203    @RetainedWith private final CloseableList list;
204
205    DeferredCloser(CloseableList list) {
206      this.list = list;
207    }
208
209    /**
210     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
211     *
212     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
213     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
214     * Closeable}. (For more about the flavors, see <a
215     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
216     * build</a>.)
217     *
218     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
219     * building for Android): Ensure that any object you pass implements the interface not just in
220     * your current SDK version but also at the oldest version you support. For example, <a
221     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
222     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
223     * {@code Closeable} with a method reference like {@code cursor::close}.
224     *
225     * <p>Note that this method is still binary-compatible between flavors because the erasure of
226     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
227     *
228     * @param closeable the object to be closed (see notes above)
229     * @param closingExecutor the object will be closed on this executor
230     * @return the first argument
231     */
232    @CanIgnoreReturnValue
233    public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose(
234        C closeable, Executor closingExecutor) {
235      checkNotNull(closingExecutor);
236      if (closeable != null) {
237        list.add(closeable, closingExecutor);
238      }
239      return closeable;
240    }
241  }
242
  /**
   * An operation that computes a result. An instance can be handed to {@link
   * ClosingFuture#submit(ClosingCallable, Executor)} to start a pipeline.
   *
   * @param <V> the type of the result
   */
  @FunctionalInterface
  public interface ClosingCallable<V extends @Nullable Object> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects that should be closed when the pipeline is done
     * @return the computed result
     * @throws Exception if a result cannot be computed
     */
    V call(DeferredCloser closer) throws Exception;
  }
259
  /**
   * An operation that computes a {@link ClosingFuture} of a result. An instance can be handed to
   * {@link ClosingFuture#submitAsync(AsyncClosingCallable, Executor)} to start a pipeline.
   *
   * @param <V> the type of the result
   * @since 30.1
   */
  @FunctionalInterface
  public interface AsyncClosingCallable<V extends @Nullable Object> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects that should be closed when the pipeline is done
     * @return a {@code ClosingFuture} of the result
     * @throws Exception if a result cannot be computed
     */
    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
  }
277
  /**
   * A function from an input to a result. Unlike a plain function, it also receives a {@link
   * DeferredCloser} with which it can capture objects for later closing.
   *
   * @param <T> the type of the input to the function
   * @param <U> the type of the result of the function
   */
  @FunctionalInterface
  public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {

    /**
     * Applies this function to an input, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects that should be closed when the pipeline is done
     * @param input the value to apply this function to
     * @return the result of applying this function to {@code input}
     * @throws Exception if the function cannot be applied
     */
    U apply(DeferredCloser closer, T input) throws Exception;
  }
296
  /**
   * A function from an input to a {@link ClosingFuture} of a result. It also receives a {@link
   * DeferredCloser} with which it can capture objects for later closing.
   *
   * @param <T> the type of the input to the function
   * @param <U> the type of the result of the function
   */
  @FunctionalInterface
  public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
    /**
     * Applies this function to an input, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects that should be closed when the pipeline is done
     * @param input the value to apply this function to
     * @return a {@code ClosingFuture} of the result of applying this function to {@code input}
     * @throws Exception if the function cannot be applied
     */
    ClosingFuture<U> apply(DeferredCloser closer, T input) throws Exception;
  }
314
315  /**
316   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
317   * allows the user to close all the closeable objects that were captured during it for later
318   * closing.
319   *
320   * <p>The asynchronous operation will have completed before this object is created.
321   *
322   * @param <V> the type of the value of a successful operation
323   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
324   */
325  public static final class ValueAndCloser<V> {
326
327    private final ClosingFuture<? extends V> closingFuture;
328
329    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
330      this.closingFuture = checkNotNull(closingFuture);
331    }
332
333    /**
334     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
335     * {@link Future#get()} would.
336     *
337     * <p>Because the asynchronous operation has already completed, this method is synchronous and
338     * returns immediately.
339     *
340     * @throws CancellationException if the computation was cancelled
341     * @throws ExecutionException if the computation threw an exception
342     */
343    @Nullable
344    public V get() throws ExecutionException {
345      return getDone(closingFuture.future);
346    }
347
348    /**
349     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
350     * operation on the {@link Executor}s specified by calls to {@link
351     * DeferredCloser#eventuallyClose(Closeable, Executor)}.
352     *
353     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
354     * closed synchronously.
355     *
356     * <p>Idempotent: objects will be closed at most once.
357     */
358    public void closeAsync() {
359      closingFuture.close();
360    }
361  }
362
  /**
   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
   * ClosingFuture} pipeline.
   *
   * @param <V> the type of the final value of a successful pipeline
   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
   */
  @FunctionalInterface
  public interface ValueAndCloserConsumer<V> {

    /**
     * Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline.
     *
     * @param valueAndCloser holds the pipeline's final result and can close its captured objects
     */
    void accept(ValueAndCloser<V> valueAndCloser);
  }
376
377  /**
378   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
379   *
380   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
381   *     execution
382   */
383  public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) {
384    return new ClosingFuture<>(callable, executor);
385  }
386
387  /**
388   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
389   *
390   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
391   *     execution
392   * @since 30.1
393   */
394  public static <V> ClosingFuture<V> submitAsync(
395      AsyncClosingCallable<V> callable, Executor executor) {
396    return new ClosingFuture<>(callable, executor);
397  }
398
399  /**
400   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
401   *
402   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
403   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
404   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
405   */
406  public static <V> ClosingFuture<V> from(ListenableFuture<V> future) {
407    return new ClosingFuture<V>(future);
408  }
409
  /**
   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
   *
   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor}) when
   * the pipeline is done, even if the pipeline is cancelled or fails.
   *
   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
   * value in order to close it.
   *
   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable,
   *     Executor)}.
   * @param closingExecutor the future's result will be closed on this executor
   * @return the first step of the new pipeline
   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
   *     underlying value may never be closed if the {@link Future} is cancelled after its
   *     operation begins. Consider replacing code that creates {@link ListenableFuture}s of
   *     closeable types, including those that pass them to this method, with {@link
   *     #submit(ClosingCallable, Executor)} in order to ensure that resources do not leak. Or, to
   *     start a pipeline with a {@link ListenableFuture} that doesn't create values that should be
   *     closed, use {@link ClosingFuture#from}.
   */
  @Deprecated
  public static <C extends @Nullable Object & @Nullable AutoCloseable>
      ClosingFuture<C> eventuallyClosing(
          ListenableFuture<C> future, final Executor closingExecutor) {
    checkNotNull(closingExecutor);
    // The pipeline wraps a non-cancellation-propagating view, so cancelling the pipeline leaves
    // the caller's future running and its value still reachable for closing.
    final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
    // The callback is attached to the original future rather than to the wrapper.
    Futures.addCallback(
        future,
        new FutureCallback<@Nullable AutoCloseable>() {
          @Override
          public void onSuccess(@Nullable AutoCloseable result) {
            // Capture the produced value in the pipeline's own closer for later closing.
            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
          }

          // A failed input produced no value, so there is nothing to capture.
          @Override
          public void onFailure(Throwable t) {}
        },
        directExecutor());
    return closingFuture;
  }
451
452  /**
453   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
454   *
455   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
456   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
457   */
458  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
459    return new Combiner(false, futures);
460  }
461
462  /**
463   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
464   *
465   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
466   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
467   */
468  public static Combiner whenAllComplete(
469      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
470    return whenAllComplete(asList(future1, moreFutures));
471  }
472
473  /**
474   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
475   * all succeed. If any fail, the resulting pipeline will fail.
476   *
477   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
478   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
479   */
480  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
481    return new Combiner(true, futures);
482  }
483
484  /**
485   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
486   * they all succeed. If any fail, the resulting pipeline will fail.
487   *
488   * <p>Calling this method allows you to use lambdas or method references typed with the types of
489   * the input {@link ClosingFuture}s.
490   *
491   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
492   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
493   */
494  public static <V1, V2> Combiner2<V1, V2> whenAllSucceed(
495      ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
496    return new Combiner2<>(future1, future2);
497  }
498
499  /**
500   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
501   * they all succeed. If any fail, the resulting pipeline will fail.
502   *
503   * <p>Calling this method allows you to use lambdas or method references typed with the types of
504   * the input {@link ClosingFuture}s.
505   *
506   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
507   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
508   */
509  public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed(
510      ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
511    return new Combiner3<>(future1, future2, future3);
512  }
513
514  /**
515   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
516   * they all succeed. If any fail, the resulting pipeline will fail.
517   *
518   * <p>Calling this method allows you to use lambdas or method references typed with the types of
519   * the input {@link ClosingFuture}s.
520   *
521   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
522   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
523   */
524  public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed(
525      ClosingFuture<V1> future1,
526      ClosingFuture<V2> future2,
527      ClosingFuture<V3> future3,
528      ClosingFuture<V4> future4) {
529    return new Combiner4<>(future1, future2, future3, future4);
530  }
531
532  /**
533   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
534   * they all succeed. If any fail, the resulting pipeline will fail.
535   *
536   * <p>Calling this method allows you to use lambdas or method references typed with the types of
537   * the input {@link ClosingFuture}s.
538   *
539   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
540   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
541   */
542  public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
543      ClosingFuture<V1> future1,
544      ClosingFuture<V2> future2,
545      ClosingFuture<V3> future3,
546      ClosingFuture<V4> future4,
547      ClosingFuture<V5> future5) {
548    return new Combiner5<>(future1, future2, future3, future4, future5);
549  }
550
551  /**
552   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
553   * all succeed. If any fail, the resulting pipeline will fail.
554   *
555   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
556   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
557   */
558  public static Combiner whenAllSucceed(
559      ClosingFuture<?> future1,
560      ClosingFuture<?> future2,
561      ClosingFuture<?> future3,
562      ClosingFuture<?> future4,
563      ClosingFuture<?> future5,
564      ClosingFuture<?> future6,
565      ClosingFuture<?>... moreFutures) {
566    return whenAllSucceed(
567        FluentIterable.of(future1, future2, future3, future4, future5, future6)
568            .append(moreFutures));
569  }
570
  // State of this step; every step starts out OPEN.
  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
  // Objects captured by this step. Its closer is what the callables given to the constructors
  // receive as their DeferredCloser.
  private final CloseableList closeables = new CloseableList();
  // The future that carries this step's result; assigned exactly once by each constructor.
  private final FluentFuture<V> future;
574
575  private ClosingFuture(ListenableFuture<V> future) {
576    this.future = FluentFuture.from(future);
577  }
578
579  private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
580    checkNotNull(callable);
581    TrustedListenableFutureTask<V> task =
582        TrustedListenableFutureTask.create(
583            new Callable<V>() {
584              @Override
585              public V call() throws Exception {
586                return callable.call(closeables.closer);
587              }
588
589              @Override
590              public String toString() {
591                return callable.toString();
592              }
593            });
594    executor.execute(task);
595    this.future = task;
596  }
597
598  private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) {
599    checkNotNull(callable);
600    TrustedListenableFutureTask<V> task =
601        TrustedListenableFutureTask.create(
602            new AsyncCallable<V>() {
603              @Override
604              public ListenableFuture<V> call() throws Exception {
605                CloseableList newCloseables = new CloseableList();
606                try {
607                  ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
608                  closingFuture.becomeSubsumedInto(closeables);
609                  return closingFuture.future;
610                } finally {
611                  closeables.add(newCloseables, directExecutor());
612                }
613              }
614
615              @Override
616              public String toString() {
617                return callable.toString();
618              }
619            });
620    executor.execute(task);
621    this.future = task;
622  }
623
624  /**
625   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
626   * future returns {@code null} if the step is successful or throws the same exception that would
627   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
628   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
629   *
630   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
631   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
632   * a derivation method <i>on the same instance</i>. This is important because calling {@code
633   * statusFuture} alone does not provide a way to close the pipeline.
634   */
635  public ListenableFuture<?> statusFuture() {
636    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
637  }
638
639  /**
640   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
641   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
642   * when the pipeline is done.
643   *
644   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
645   * ClosingFuture} will be equivalent to this one.
646   *
647   * <p>If the function throws an exception, that exception is used as the result of the derived
648   * {@code ClosingFuture}.
649   *
650   * <p>Example usage:
651   *
652   * <pre>{@code
653   * ClosingFuture<List<Row>> rowsFuture =
654   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
655   * }</pre>
656   *
657   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
658   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
659   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
660   *
661   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
662   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
663   * this {@code ClosingFuture}.
664   *
665   * @param function transforms the value of this step to the value of the derived step
666   * @param executor executor to run the function in
667   * @return the derived step
668   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
669   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
670   *     finished}
671   */
672  public <U> ClosingFuture<U> transform(
673      final ClosingFunction<? super V, U> function, Executor executor) {
674    checkNotNull(function);
675    AsyncFunction<V, U> applyFunction =
676        new AsyncFunction<V, U>() {
677          @Override
678          public ListenableFuture<U> apply(V input) throws Exception {
679            return closeables.applyClosingFunction(function, input);
680          }
681
682          @Override
683          public String toString() {
684            return function.toString();
685          }
686        };
687    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
688    return derive(future.transformAsync(applyFunction, executor));
689  }
690
691  /**
692   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
693   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
694   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
695   * captured by the returned {@link ClosingFuture}).
696   *
697   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
698   * returned by the function.
699   *
700   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
701   * ClosingFuture} will be equivalent to this one.
702   *
703   * <p>If the function throws an exception, that exception is used as the result of the derived
704   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
705   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
706   * closed.
707   *
708   * <p>Usage guidelines for this method:
709   *
710   * <ul>
711   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
712   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
713   *       Executor)} instead, with a function that returns the next value directly.
714   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
715   *       for every closeable object this step creates in order to capture it for later closing.
716   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
717   *       ClosingFuture} call {@link #from(ListenableFuture)}.
718   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
719   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
720   *       {@link #withoutCloser(AsyncFunction)}
721   * </ul>
722   *
723   * <p>Example usage:
724   *
725   * <pre>{@code
726   * // Result.getRowsClosingFuture() returns a ClosingFuture.
727   * ClosingFuture<List<Row>> rowsFuture =
728   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
729   *
730   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
731   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
732   * // Closeable).
733   * ClosingFuture<Integer> rowsFuture2 =
734   *     queryFuture.transformAsync(
735   *         (closer, result) -> {
736   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
737   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
   *         },
   *         executor);
740   *
741   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
742   * ClosingFuture<List<Row>> rowsFuture3 =
743   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
744   *
745   * }</pre>
746   *
747   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
748   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
749   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
750   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
751   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
752   * responsible for completing the returned {@code ClosingFuture}.)
753   *
754   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
755   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
756   * this {@code ClosingFuture}.
757   *
758   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
759   *     the derived step
760   * @param executor executor to run the function in
761   * @return the derived step
762   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
763   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
764   *     finished}
765   */
766  public <U> ClosingFuture<U> transformAsync(
767      final AsyncClosingFunction<? super V, U> function, Executor executor) {
768    checkNotNull(function);
769    AsyncFunction<V, U> applyFunction =
770        new AsyncFunction<V, U>() {
771          @Override
772          public ListenableFuture<U> apply(V input) throws Exception {
773            return closeables.applyAsyncClosingFunction(function, input);
774          }
775
776          @Override
777          public String toString() {
778            return function.toString();
779          }
780        };
781    return derive(future.transformAsync(applyFunction, executor));
782  }
783
784  /**
785   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
786   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
787   * {@link ListenableFuture}.
788   *
789   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
790   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
791   * meets these conditions:
792   *
793   * <ul>
794   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
795   *       DeferredCloser#eventuallyClose(Closeable, Executor)}.
796   *   <li>It returns a {@link ListenableFuture}.
797   * </ul>
798   *
799   * <p>Example usage:
800   *
801   * <pre>{@code
802   * // Result.getRowsFuture() returns a ListenableFuture.
803   * ClosingFuture<List<Row>> rowsFuture =
804   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
805   * }</pre>
806   *
807   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
808   *     ListenableFuture} with the value of a derived step
809   */
810  public static <V, U> AsyncClosingFunction<V, U> withoutCloser(
811      final AsyncFunction<V, U> function) {
812    checkNotNull(function);
813    return new AsyncClosingFunction<V, U>() {
814      @Override
815      public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
816        return ClosingFuture.from(function.apply(input));
817      }
818    };
819  }
820
821  /**
822   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
823   * to its exception if it is an instance of a given exception type. The function can use a {@link
824   * DeferredCloser} to capture objects to be closed when the pipeline is done.
825   *
826   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
827   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
828   * one.
829   *
830   * <p>If the function throws an exception, that exception is used as the result of the derived
831   * {@code ClosingFuture}.
832   *
833   * <p>Example usage:
834   *
835   * <pre>{@code
   * ClosingFuture<QueryResult> recoveredQueryFuture =
837   *     queryFuture.catching(
838   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
839   * }</pre>
840   *
841   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
842   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
843   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
844   *
845   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
846   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
847   * this {@code ClosingFuture}.
848   *
849   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
850   *     type is matched against this step's exception. "This step's exception" means the cause of
851   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
852   *     underlying this step or, if {@code get()} throws a different kind of exception, that
853   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
854   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
855   * @param fallback the function to be called if this step fails with the expected exception type.
856   *     The function's argument is this step's exception. "This step's exception" means the cause
857   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
858   *     underlying this step or, if {@code get()} throws a different kind of exception, that
859   *     exception itself.
860   * @param executor the executor that runs {@code fallback} if the input fails
861   */
862  public <X extends Throwable> ClosingFuture<V> catching(
863      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
864    return catchingMoreGeneric(exceptionType, fallback, executor);
865  }
866
867  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
868  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
869      Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
870    checkNotNull(fallback);
871    AsyncFunction<X, W> applyFallback =
872        new AsyncFunction<X, W>() {
873          @Override
874          public ListenableFuture<W> apply(X exception) throws Exception {
875            return closeables.applyClosingFunction(fallback, exception);
876          }
877
878          @Override
879          public String toString() {
880            return fallback.toString();
881          }
882        };
883    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
884    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
885  }
886
887  /**
888   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
889   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
890   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
891   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
892   *
893   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
894   * ClosingFuture} will be equivalent to the one returned by the function.
895   *
896   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
897   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
898   * one.
899   *
900   * <p>If the function throws an exception, that exception is used as the result of the derived
901   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
902   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
903   * closed.
904   *
905   * <p>Usage guidelines for this method:
906   *
907   * <ul>
908   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
909   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
910   *       ClosingFunction, Executor)} instead, with a function that returns the next value
911   *       directly.
912   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
913   *       for every closeable object this step creates in order to capture it for later closing.
914   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
915   *       ClosingFuture} call {@link #from(ListenableFuture)}.
916   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
917   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
918   *       {@link #withoutCloser(AsyncFunction)}
919   * </ul>
920   *
921   * <p>Example usage:
922   *
923   * <pre>{@code
924   * // Fall back to a secondary input stream in case of IOException.
925   * ClosingFuture<InputStream> inputFuture =
926   *     firstInputFuture.catchingAsync(
927   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
   * }</pre>
930   *
931   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
932   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
933   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
934   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
935   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
936   * responsible for completing the returned {@code ClosingFuture}.)
937   *
938   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
939   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
940   * this {@code ClosingFuture}.
941   *
942   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
943   *     type is matched against this step's exception. "This step's exception" means the cause of
944   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
945   *     underlying this step or, if {@code get()} throws a different kind of exception, that
946   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
947   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
948   * @param fallback the function to be called if this step fails with the expected exception type.
949   *     The function's argument is this step's exception. "This step's exception" means the cause
950   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
951   *     underlying this step or, if {@code get()} throws a different kind of exception, that
952   *     exception itself.
953   * @param executor the executor that runs {@code fallback} if the input fails
954   */
955  // TODO(dpb): Should this do something special if the function throws CancellationException or
956  // ExecutionException?
957  public <X extends Throwable> ClosingFuture<V> catchingAsync(
958      Class<X> exceptionType,
959      AsyncClosingFunction<? super X, ? extends V> fallback,
960      Executor executor) {
961    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
962  }
963
964  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
965  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
966      Class<X> exceptionType,
967      final AsyncClosingFunction<? super X, W> fallback,
968      Executor executor) {
969    checkNotNull(fallback);
970    AsyncFunction<X, W> asyncFunction =
971        new AsyncFunction<X, W>() {
972          @Override
973          public ListenableFuture<W> apply(X exception) throws Exception {
974            return closeables.applyAsyncClosingFunction(fallback, exception);
975          }
976
977          @Override
978          public String toString() {
979            return fallback.toString();
980          }
981        };
982    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
983  }
984
985  /**
986   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned
987   * {@link Future} is done, all objects captured for closing during the pipeline's computation will
988   * be closed.
989   *
990   * <p>After calling this method, you may not call {@link
991   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
992   * derivation method on this {@code ClosingFuture}.
993   *
994   * @return a {@link Future} that represents the final value or exception of the pipeline
995   */
996  public FluentFuture<V> finishToFuture() {
997    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
998      logger.log(FINER, "will close {0}", this);
999      future.addListener(
1000          new Runnable() {
1001            @Override
1002            public void run() {
1003              checkAndUpdateState(WILL_CLOSE, CLOSING);
1004              close();
1005              checkAndUpdateState(CLOSING, CLOSED);
1006            }
1007          },
1008          directExecutor());
1009    } else {
1010      switch (state.get()) {
1011        case SUBSUMED:
1012          throw new IllegalStateException(
1013              "Cannot call finishToFuture() after deriving another step");
1014
1015        case WILL_CREATE_VALUE_AND_CLOSER:
1016          throw new IllegalStateException(
1017              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
1018
1019        case WILL_CLOSE:
1020        case CLOSING:
1021        case CLOSED:
1022          throw new IllegalStateException("Cannot call finishToFuture() twice");
1023
1024        case OPEN:
1025          throw new AssertionError();
1026      }
1027    }
1028    return future;
1029  }
1030
1031  /**
   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
   * {@code consumer} will be called with an object that contains the result of the operation. The
   * consumer can store the {@link ValueAndCloser} outside the consumer for later synchronous use.
1035   *
1036   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
1037   * any other derivation method on this {@code ClosingFuture}.
1038   *
1039   * @param consumer a callback whose method will be called (using {@code executor}) when this
1040   *     operation is done
1041   */
1042  public void finishToValueAndCloser(
1043      final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
1044    checkNotNull(consumer);
1045    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
1046      switch (state.get()) {
1047        case SUBSUMED:
1048          throw new IllegalStateException(
1049              "Cannot call finishToValueAndCloser() after deriving another step");
1050
1051        case WILL_CLOSE:
1052        case CLOSING:
1053        case CLOSED:
1054          throw new IllegalStateException(
1055              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1056
1057        case WILL_CREATE_VALUE_AND_CLOSER:
1058          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1059
1060        case OPEN:
1061          break;
1062      }
1063      throw new AssertionError(state);
1064    }
1065    future.addListener(
1066        new Runnable() {
1067          @Override
1068          public void run() {
1069            provideValueAndCloser(consumer, ClosingFuture.this);
1070          }
1071        },
1072        executor);
1073  }
1074
1075  private static <C, V extends C> void provideValueAndCloser(
1076      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1077    consumer.accept(new ValueAndCloser<C>(closingFuture));
1078  }
1079
1080  /**
1081   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1082   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1083   * successful, and this step has not started when {@code cancel} is called, this step should never
1084   * run.
1085   *
1086   * <p>If successful, causes the objects captured by this step (if already started) and its input
1087   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1088   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1089   *
1090   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1091   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1092   *     cancelled regardless
1093   * @return {@code false} if the step could not be cancelled, typically because it has already
1094   *     completed normally; {@code true} otherwise
1095   */
1096  @CanIgnoreReturnValue
1097  public boolean cancel(boolean mayInterruptIfRunning) {
1098    logger.log(FINER, "cancelling {0}", this);
1099    boolean cancelled = future.cancel(mayInterruptIfRunning);
1100    if (cancelled) {
1101      close();
1102    }
1103    return cancelled;
1104  }
1105
1106  private void close() {
1107    logger.log(FINER, "closing {0}", this);
1108    closeables.close();
1109  }
1110
1111  private <U> ClosingFuture<U> derive(FluentFuture<U> future) {
1112    ClosingFuture<U> derived = new ClosingFuture<>(future);
1113    becomeSubsumedInto(derived.closeables);
1114    return derived;
1115  }
1116
1117  private void becomeSubsumedInto(CloseableList otherCloseables) {
1118    checkAndUpdateState(OPEN, SUBSUMED);
1119    otherCloseables.add(closeables, directExecutor());
1120  }
1121
1122  /**
1123   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1124   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1125   *
1126   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1127   */
1128  public static final class Peeker {
1129    private final ImmutableList<ClosingFuture<?>> futures;
1130    private volatile boolean beingCalled;
1131
1132    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1133      this.futures = checkNotNull(futures);
1134    }
1135
1136    /**
1137     * Returns the value of {@code closingFuture}.
1138     *
1139     * @throws ExecutionException if {@code closingFuture} is a failed step
1140     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1141     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1142     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1143     * @throws IllegalStateException if called outside of a call to {@link
1144     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1145     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1146     */
1147    public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture)
1148        throws ExecutionException {
1149      checkState(beingCalled);
1150      checkArgument(futures.contains(closingFuture));
1151      return Futures.getDone(closingFuture.future);
1152    }
1153
1154    private <V extends @Nullable Object> V call(
1155        CombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1156      beingCalled = true;
1157      CloseableList newCloseables = new CloseableList();
1158      try {
1159        return combiner.call(newCloseables.closer, this);
1160      } finally {
1161        closeables.add(newCloseables, directExecutor());
1162        beingCalled = false;
1163      }
1164    }
1165
1166    private <V extends @Nullable Object> FluentFuture<V> callAsync(
1167        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1168      beingCalled = true;
1169      CloseableList newCloseables = new CloseableList();
1170      try {
1171        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1172        closingFuture.becomeSubsumedInto(closeables);
1173        return closingFuture.future;
1174      } finally {
1175        closeables.add(newCloseables, directExecutor());
1176        beingCalled = false;
1177      }
1178    }
1179  }
1180
1181  /**
1182   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1183   *
1184   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1185   * instantiate this class.
1186   *
1187   * <p>Example:
1188   *
1189   * <pre>{@code
1190   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1191   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1192   * ListenableFuture<Integer> numberOfDifferentLines =
1193   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1194   *           .call(
1195   *               (closer, peeker) -> {
1196   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1197   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1198   *                 return countDifferentLines(file1Reader, file2Reader);
1199   *               },
1200   *               executor)
   *           .finishToFuture();
1202   * }</pre>
1203   */
1204  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
1205  @com.google.errorprone.annotations.DoNotMock(
1206      "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1207  public static class Combiner {
1208
1209    private final CloseableList closeables = new CloseableList();
1210
1211    /**
1212     * An operation that returns a result and may throw an exception.
1213     *
1214     * @param <V> the type of the result
1215     */
1216    @FunctionalInterface
1217    public interface CombiningCallable<V extends @Nullable Object> {
1218      /**
1219       * Computes a result, or throws an exception if unable to do so.
1220       *
1221       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1222       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1223       * is done (but not before this method completes), even if this method throws or the pipeline
1224       * is cancelled.
1225       *
1226       * @param peeker used to get the value of any of the input futures
1227       */
1228      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1229    }
1230
1231    /**
1232     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1233     *
1234     * @param <V> the type of the result
1235     */
1236    @FunctionalInterface
1237    public interface AsyncCombiningCallable<V extends @Nullable Object> {
1238      /**
1239       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1240       *
1241       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1242       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1243       * is done (but not before this method completes), even if this method throws or the pipeline
1244       * is cancelled.
1245       *
1246       * @param peeker used to get the value of any of the input futures
1247       */
1248      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1249    }
1250
1251    private final boolean allMustSucceed;
1252    protected final ImmutableList<ClosingFuture<?>> inputs;
1253
1254    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1255      this.allMustSucceed = allMustSucceed;
1256      this.inputs = ImmutableList.copyOf(inputs);
1257      for (ClosingFuture<?> input : inputs) {
1258        input.becomeSubsumedInto(closeables);
1259      }
1260    }
1261
1262    /**
1263     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1264     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1265     * objects to be closed when the pipeline is done.
1266     *
1267     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1268     * fail, so will the returned step.
1269     *
1270     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1271     * cancelled.
1272     *
1273     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1274     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1275     */
1276    public <V> ClosingFuture<V> call(
1277        final CombiningCallable<V> combiningCallable, Executor executor) {
1278      Callable<V> callable =
1279          new Callable<V>() {
1280            @Override
1281            public V call() throws Exception {
1282              return new Peeker(inputs).call(combiningCallable, closeables);
1283            }
1284
1285            @Override
1286            public String toString() {
1287              return combiningCallable.toString();
1288            }
1289          };
1290      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1291      derived.closeables.add(closeables, directExecutor());
1292      return derived;
1293    }
1294
1295    /**
1296     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1297     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1298     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1299     * captured by the returned {@link ClosingFuture}).
1300     *
1301     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1302     * fail, so will the returned step.
1303     *
1304     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1305     * cancelled.
1306     *
1307     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1308     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1309     *
1310     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1311     * derived step.
1312     *
1313     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1314     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1315     *
1316     * <p>Usage guidelines for this method:
1317     *
1318     * <ul>
1319     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1320     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1321     *       Executor)} instead, with a function that returns the next value directly.
1322     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1323     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1324     *       capture it for later closing.
1325     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1326     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1327     * </ul>
1328     *
1329     * <p>The same warnings about doing heavyweight operations within {@link
1330     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1331     */
1332    public <V> ClosingFuture<V> callAsync(
1333        final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1334      // Adapter: hands the callable and this combiner's closeables to a Peeker over the inputs.
1335      AsyncCallable<V> adapter =
1336          new AsyncCallable<V>() {
1337            @Override
1338            public ListenableFuture<V> call() throws Exception {
1339              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1340            }
1341
1342            @Override
1343            public String toString() {
1344              return combiningCallable.toString();
1345            }
1346          };
1347      ClosingFuture<V> step = new ClosingFuture<>(futureCombiner().callAsync(adapter, executor));
1348      step.closeables.add(closeables, directExecutor());
1349      return step;
1350    }
1351
1352    private FutureCombiner<Object> futureCombiner() {
1353      // allMustSucceed selects whenAllSucceed; otherwise whenAllComplete is used.
1354      ImmutableList<FluentFuture<?>> futures = inputFutures();
1355      return allMustSucceed ? Futures.whenAllSucceed(futures) : Futures.whenAllComplete(futures);
1356    }
1357
1358    private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE =
1359        new Function<ClosingFuture<?>, FluentFuture<?>>() {
1360          @Override
1361          public FluentFuture<?> apply(ClosingFuture<?> step) {
1362            return step.future; // the step's own future field
1363          }
1364        };
1365
1366    private ImmutableList<FluentFuture<?>> inputFutures() {
1367      return ImmutableList.copyOf(FluentIterable.from(inputs).transform(INNER_FUTURE));
1368    }
1369  }
1370
1371  /**
1372   * A {@link Combiner} specialized for exactly two {@link ClosingFuture}s, so that the combining
1373   * step can be written as a lambda or method reference. Obtain one from {@link
1374   * #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1375   *
1376   * @param <V1> the value type of the first input future
1377   * @param <V2> the value type of the second input future
1378   */
1379  public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object>
1380      extends Combiner {
1381
1382    /**
1383     * A function that computes a value from the results of the two futures that were passed to
1384     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1385     *
1386     * @param <V1> the value type of the first input future
1387     * @param <V2> the value type of the second input future
1388     * @param <U> the result type of the function
1389     */
1390    @FunctionalInterface
1391    public interface ClosingFunction2<
1392        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1393
1394      /**
1395       * Computes a result from the two input values, or throws an exception if it cannot.
1396       *
1397       * <p>Objects registered through {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1398       * closer.eventuallyClose()} are closed once the {@link ClosingFuture} pipeline is done, and
1399       * never before this method completes. That holds even if this method throws or the
1400       * pipeline is cancelled.
1401       */
1402      U apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception;
1403    }
1404
1405    /**
1406     * A function that produces a {@link ClosingFuture} from the results of the two futures that
1407     * were passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1408     *
1409     * @param <V1> the value type of the first input future
1410     * @param <V2> the value type of the second input future
1411     * @param <U> the value type of the {@code ClosingFuture} returned by the function
1412     */
1413    @FunctionalInterface
1414    public interface AsyncClosingFunction2<
1415        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1416
1417      /**
1418       * Produces a {@link ClosingFuture} from the two input values, or throws if it cannot.
1419       *
1420       * <p>Objects registered through {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1421       * closer.eventuallyClose()} are closed once the {@link ClosingFuture} pipeline is done, and
1422       * never before this method completes. That holds even if this method throws or the
1423       * pipeline is cancelled.
1424       */
1425      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception;
1426    }
1427
1428    private final ClosingFuture<V1> future1;
1429    private final ClosingFuture<V2> future2;
1430
1431    private Combiner2(ClosingFuture<V1> first, ClosingFuture<V2> second) {
1432      super(true, ImmutableList.of(first, second));
1433      future1 = first;
1434      future2 = second;
1435    }
1436
1437    /**
1438     * Derives a new {@code ClosingFuture} pipeline step by applying {@code function} to the values
1439     * of both inputs. The function may register objects with its {@link DeferredCloser} so that
1440     * they are closed when the pipeline is done.
1441     *
1442     * <p>If this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and any
1443     * input fails, the returned step fails as well.
1444     *
1445     * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
1446     *
1447     * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause becomes the
1448     * failure of the derived step.
1449     */
1450    public <U extends @Nullable Object> ClosingFuture<U> call(
1451        final ClosingFunction2<V1, V2, U> function, Executor executor) {
1452      CombiningCallable<U> adapter =
1453          new CombiningCallable<U>() {
1454            @Override
1455            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1456              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1457            }
1458
1459            @Override
1460            public String toString() {
1461              return function.toString();
1462            }
1463          };
1464      return call(adapter, executor);
1465    }
1466
1467    /**
1468     * Derives a new {@code ClosingFuture} pipeline step by applying a {@code
1469     * ClosingFuture}-returning function to the values of both inputs. The function may register
1470     * objects with its {@link DeferredCloser} so that they are closed when the pipeline is done
1471     * (apart from those captured by the {@link ClosingFuture} it returns).
1472     *
1473     * <p>If this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and any
1474     * input fails, the returned step fails as well.
1475     *
1476     * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
1477     *
1478     * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause becomes the
1479     * failure of the derived step.
1480     *
1481     * <p>Any other exception thrown by the function is used directly as the failure of the
1482     * derived step.
1483     *
1484     * <p>If an exception is thrown after the function has created a {@code ClosingFuture}, the
1485     * closeable objects in that {@code ClosingFuture} will not be closed.
1486     *
1487     * <p>Usage guidelines for this method:
1488     *
1489     * <ul>
1490     *   <li>Reserve this method for calls to an API that returns a {@link ListenableFuture} or a
1491     *       {@code ClosingFuture}. Where possible, prefer {@link #call(ClosingFunction2,
1492     *       Executor)} with a function that returns the next value directly.
1493     *   <li>Pass every closeable object this step creates to {@link
1494     *       DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} so that
1495     *       it is captured for later closing.
1496     *   <li>Return a {@code ClosingFuture}. A {@link ListenableFuture} can be converted to one
1497     *       with {@link #from(ListenableFuture)}.
1498     * </ul>
1499     *
1500     * <p>The warnings about heavyweight operations given for {@link
1501     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here as well.
1502     */
1503    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1504        final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1505      AsyncCombiningCallable<U> adapter =
1506          new AsyncCombiningCallable<U>() {
1507            @Override
1508            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1509              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1510            }
1511
1512            @Override
1513            public String toString() {
1514              return function.toString();
1515            }
1516          };
1517      return callAsync(adapter, executor);
1518    }
1519  }
1520
1521  /**
1522   * A {@link Combiner} specialized for exactly three {@link ClosingFuture}s, so that the combining
1523   * step can be written as a lambda or method reference. Obtain one from {@link
1524   * #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1525   *
1526   * @param <V1> the value type of the first input future
1527   * @param <V2> the value type of the second input future
1528   * @param <V3> the value type of the third input future
1529   */
1530  public static final class Combiner3<
1531          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
1532      extends Combiner {
1533    /**
1534     * A function that computes a value from the results of the three futures that were passed to
1535     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1536     *
1537     * @param <V1> the value type of the first input future
1538     * @param <V2> the value type of the second input future
1539     * @param <V3> the value type of the third input future
1540     * @param <U> the result type of the function
1541     */
1542    @FunctionalInterface
1543    public interface ClosingFunction3<
1544        V1 extends @Nullable Object,
1545        V2 extends @Nullable Object,
1546        V3 extends @Nullable Object,
1547        U extends @Nullable Object> {
1548      /**
1549       * Computes a result from the three input values, or throws an exception if it cannot.
1550       *
1551       * <p>Objects registered through {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1552       * closer.eventuallyClose()} are closed once the {@link ClosingFuture} pipeline is done, and
1553       * never before this method completes. That holds even if this method throws or the
1554       * pipeline is cancelled.
1555       */
1556      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3) throws Exception;
1557    }
1558
1559    /**
1560     * A function that produces a {@link ClosingFuture} from the results of the three futures that
1561     * were passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1562     *
1563     * @param <V1> the value type of the first input future
1564     * @param <V2> the value type of the second input future
1565     * @param <V3> the value type of the third input future
1566     * @param <U> the value type of the {@code ClosingFuture} returned by the function
1567     */
1568    @FunctionalInterface
1569    public interface AsyncClosingFunction3<
1570        V1 extends @Nullable Object,
1571        V2 extends @Nullable Object,
1572        V3 extends @Nullable Object,
1573        U extends @Nullable Object> {
1574      /**
1575       * Produces a {@link ClosingFuture} from the three input values, or throws if it cannot.
1576       *
1577       * <p>Objects registered through {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1578       * closer.eventuallyClose()} are closed once the {@link ClosingFuture} pipeline is done, and
1579       * never before this method completes. That holds even if this method throws or the
1580       * pipeline is cancelled.
1581       */
1582      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3)
1583          throws Exception;
1584    }
1585
1586    private final ClosingFuture<V1> future1;
1587    private final ClosingFuture<V2> future2;
1588    private final ClosingFuture<V3> future3;
1589
1590    private Combiner3(
1591        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1592      super(true, ImmutableList.of(future1, future2, future3));
1593      this.future1 = future1;
1594      this.future2 = future2;
1595      this.future3 = future3;
1596    }
1597
1598    /**
1599     * Derives a new {@code ClosingFuture} pipeline step by applying {@code function} to the values
1600     * of all three inputs. The function may register objects with its {@link DeferredCloser} so
1601     * that they are closed when the pipeline is done.
1602     *
1603     * <p>If this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1604     * ClosingFuture)} and any input fails, the returned step fails as well.
1605     *
1606     * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
1607     *
1608     * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause becomes the
1609     * failure of the derived step.
1610     */
1611    public <U extends @Nullable Object> ClosingFuture<U> call(
1612        final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1613      return call(
1614          new CombiningCallable<U>() {
1615            @Override
1616            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1617              return function.apply(
1618                  closer,
1619                  peeker.getDone(future1),
1620                  peeker.getDone(future2),
1621                  peeker.getDone(future3));
1622            }
1623
1624            @Override
1625            public String toString() {
1626              return function.toString();
1627            }
1628          },
1629          executor);
1630    }
1631
1632    /**
1633     * Derives a new {@code ClosingFuture} pipeline step by applying a {@code
1634     * ClosingFuture}-returning function to the values of all three inputs. The function may
1635     * register objects with its {@link DeferredCloser} so that they are closed when the pipeline
1636     * is done (apart from those captured by the {@link ClosingFuture} it returns).
1637     *
1638     * <p>If this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1639     * ClosingFuture)} and any input fails, the returned step fails as well.
1640     *
1641     * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
1642     *
1643     * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause becomes the
1644     * failure of the derived step.
1645     *
1646     * <p>Any other exception thrown by the function is used directly as the failure of the
1647     * derived step.
1648     *
1649     * <p>If an exception is thrown after the function has created a {@code ClosingFuture}, the
1650     * closeable objects in that {@code ClosingFuture} will not be closed.
1651     *
1652     * <p>Usage guidelines for this method:
1653     *
1654     * <ul>
1655     *   <li>Reserve this method for calls to an API that returns a {@link ListenableFuture} or a
1656     *       {@code ClosingFuture}. Where possible, prefer {@link #call(ClosingFunction3,
1657     *       Executor)} with a function that returns the next value directly.
1658     *   <li>Pass every closeable object this step creates to {@link
1659     *       DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} so that
1660     *       it is captured for later closing.
1661     *   <li>Return a {@code ClosingFuture}. A {@link ListenableFuture} can be converted to one
1662     *       with {@link #from(ListenableFuture)}.
1663     * </ul>
1664     *
1665     * <p>The warnings about heavyweight operations given for {@link
1666     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here as well.
1667     */
1668    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1669        final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1670      return callAsync(
1671          new AsyncCombiningCallable<U>() {
1672            @Override
1673            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1674              return function.apply(
1675                  closer,
1676                  peeker.getDone(future1),
1677                  peeker.getDone(future2),
1678                  peeker.getDone(future3));
1679            }
1680
1681            @Override
1682            public String toString() {
1683              return function.toString();
1684            }
1685          },
1686          executor);
1687    }
1688  }
1689
1690  /**
1691   * A {@link Combiner} specialized for exactly four {@link ClosingFuture}s, so that the combining
1692   * step can be written as a lambda or method reference. Obtain one from {@link
1693   * #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1694   *
1695   * @param <V1> the value type of the first input future
1696   * @param <V2> the value type of the second input future
1697   * @param <V3> the value type of the third input future
1698   * @param <V4> the value type of the fourth input future
1699   */
1700  public static final class Combiner4<
1701          V1 extends @Nullable Object,
1702          V2 extends @Nullable Object,
1703          V3 extends @Nullable Object,
1704          V4 extends @Nullable Object>
1705      extends Combiner {
1706    /**
1707     * A function that computes a value from the results of the four futures that were passed to
1708     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1709     *
1710     * @param <V1> the value type of the first input future
1711     * @param <V2> the value type of the second input future
1712     * @param <V3> the value type of the third input future
1713     * @param <V4> the value type of the fourth input future
1714     * @param <U> the result type of the function
1715     */
1716    @FunctionalInterface
1717    public interface ClosingFunction4<
1718        V1 extends @Nullable Object,
1719        V2 extends @Nullable Object,
1720        V3 extends @Nullable Object,
1721        V4 extends @Nullable Object,
1722        U extends @Nullable Object> {
1723      /**
1724       * Computes a result from the four input values, or throws an exception if it cannot.
1725       *
1726       * <p>Objects registered through {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1727       * closer.eventuallyClose()} are closed once the {@link ClosingFuture} pipeline is done, and
1728       * never before this method completes. That holds even if this method throws or the
1729       * pipeline is cancelled.
1730       */
1731      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) throws Exception;
1732    }
1733
1734    /**
1735     * A function that produces a {@link ClosingFuture} from the results of the four futures that
1736     * were passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1737     * ClosingFuture)}.
1738     *
1739     * @param <V1> the value type of the first input future
1740     * @param <V2> the value type of the second input future
1741     * @param <V3> the value type of the third input future
1742     * @param <V4> the value type of the fourth input future
1743     * @param <U> the value type of the {@code ClosingFuture} returned by the function
1744     */
1745    @FunctionalInterface
1746    public interface AsyncClosingFunction4<
1747        V1 extends @Nullable Object,
1748        V2 extends @Nullable Object,
1749        V3 extends @Nullable Object,
1750        V4 extends @Nullable Object,
1751        U extends @Nullable Object> {
1752      /**
1753       * Produces a {@link ClosingFuture} from the four input values, or throws if it cannot.
1754       *
1755       * <p>Objects registered through {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1756       * closer.eventuallyClose()} are closed once the {@link ClosingFuture} pipeline is done, and
1757       * never before this method completes. That holds even if this method throws or the
1758       * pipeline is cancelled.
1759       */
1760      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4)
1761          throws Exception;
1762    }
1763
1764    private final ClosingFuture<V1> future1;
1765    private final ClosingFuture<V2> future2;
1766    private final ClosingFuture<V3> future3;
1767    private final ClosingFuture<V4> future4;
1768
1769    private Combiner4(
1770        ClosingFuture<V1> first,
1771        ClosingFuture<V2> second,
1772        ClosingFuture<V3> third,
1773        ClosingFuture<V4> fourth) {
1774      super(true, ImmutableList.of(first, second, third, fourth));
1775      future1 = first;
1776      future2 = second;
1777      future3 = third;
1778      future4 = fourth;
1779    }
1780
1781    /**
1782     * Derives a new {@code ClosingFuture} pipeline step by applying {@code function} to the values
1783     * of all four inputs. The function may register objects with its {@link DeferredCloser} so
1784     * that they are closed when the pipeline is done.
1785     *
1786     * <p>If this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1787     * ClosingFuture, ClosingFuture)} and any input fails, the returned step fails as well.
1788     *
1789     * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
1790     *
1791     * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause becomes the
1792     * failure of the derived step.
1793     */
1794    public <U extends @Nullable Object> ClosingFuture<U> call(
1795        final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1796      CombiningCallable<U> adapter =
1797          new CombiningCallable<U>() {
1798            @Override
1799            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1800              return function.apply(
1801                  closer,
1802                  peeker.getDone(future1),
1803                  peeker.getDone(future2),
1804                  peeker.getDone(future3),
1805                  peeker.getDone(future4));
1806            }
1807
1808            @Override
1809            public String toString() {
1810              return function.toString();
1811            }
1812          };
1813      return call(adapter, executor);
1814    }
1815
1816    /**
1817     * Derives a new {@code ClosingFuture} pipeline step by applying a {@code
1818     * ClosingFuture}-returning function to the values of all four inputs. The function may
1819     * register objects with its {@link DeferredCloser} so that they are closed when the pipeline
1820     * is done (apart from those captured by the {@link ClosingFuture} it returns).
1821     *
1822     * <p>If this combiner came from {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1823     * ClosingFuture, ClosingFuture)} and any input fails, the returned step fails as well.
1824     *
1825     * <p>A {@code CancellationException} thrown by the function cancels the pipeline.
1826     *
1827     * <p>An {@code ExecutionException} thrown by the function is unwrapped: its cause becomes the
1828     * failure of the derived step.
1829     *
1830     * <p>Any other exception thrown by the function is used directly as the failure of the
1831     * derived step.
1832     *
1833     * <p>If an exception is thrown after the function has created a {@code ClosingFuture}, the
1834     * closeable objects in that {@code ClosingFuture} will not be closed.
1835     *
1836     * <p>Usage guidelines for this method:
1837     *
1838     * <ul>
1839     *   <li>Reserve this method for calls to an API that returns a {@link ListenableFuture} or a
1840     *       {@code ClosingFuture}. Where possible, prefer {@link #call(ClosingFunction4,
1841     *       Executor)} with a function that returns the next value directly.
1842     *   <li>Pass every closeable object this step creates to {@link
1843     *       DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()} so that
1844     *       it is captured for later closing.
1845     *   <li>Return a {@code ClosingFuture}. A {@link ListenableFuture} can be converted to one
1846     *       with {@link #from(ListenableFuture)}.
1847     * </ul>
1848     *
1849     * <p>The warnings about heavyweight operations given for {@link
1850     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here as well.
1851     */
1852    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1853        final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1854      AsyncCombiningCallable<U> adapter =
1855          new AsyncCombiningCallable<U>() {
1856            @Override
1857            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1858              return function.apply(
1859                  closer,
1860                  peeker.getDone(future1),
1861                  peeker.getDone(future2),
1862                  peeker.getDone(future3),
1863                  peeker.getDone(future4));
1864            }
1865
1866            @Override
1867            public String toString() {
1868              return function.toString();
1869            }
1870          };
1871      return callAsync(adapter, executor);
1872    }
1873  }
1874
1875  /**
1876   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1877   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1878   * ClosingFuture, ClosingFuture)} to start this combination.
1879   *
1880   * @param <V1> the type returned by the first future
1881   * @param <V2> the type returned by the second future
1882   * @param <V3> the type returned by the third future
1883   * @param <V4> the type returned by the fourth future
1884   * @param <V5> the type returned by the fifth future
1885   */
1886  public static final class Combiner5<
1887          V1 extends @Nullable Object,
1888          V2 extends @Nullable Object,
1889          V3 extends @Nullable Object,
1890          V4 extends @Nullable Object,
1891          V5 extends @Nullable Object>
1892      extends Combiner {
1893    /**
1894     * A function that returns a value when applied to the values of the five futures passed to
1895     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1896     * ClosingFuture)}.
1897     *
1898     * @param <V1> the type returned by the first future
1899     * @param <V2> the type returned by the second future
1900     * @param <V3> the type returned by the third future
1901     * @param <V4> the type returned by the fourth future
1902     * @param <V5> the type returned by the fifth future
1903     * @param <U> the type returned by the function
1904     */
1905    @FunctionalInterface
1906    public interface ClosingFunction5<
1907        V1 extends @Nullable Object,
1908        V2 extends @Nullable Object,
1909        V3 extends @Nullable Object,
1910        V4 extends @Nullable Object,
1911        V5 extends @Nullable Object,
1912        U extends @Nullable Object> {
1913      /**
1914       * Applies this function to five inputs, or throws an exception if unable to do so.
1915       *
1916       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1917       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1918       * is done (but not before this method completes), even if this method throws or the pipeline
1919       * is cancelled.
1920       */
1921      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5)
1922          throws Exception;
1923    }
1924
1925    /**
1926     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1927     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1928     * ClosingFuture, ClosingFuture)}.
1929     *
1930     * @param <V1> the type returned by the first future
1931     * @param <V2> the type returned by the second future
1932     * @param <V3> the type returned by the third future
1933     * @param <V4> the type returned by the fourth future
1934     * @param <V5> the type returned by the fifth future
1935     * @param <U> the type returned by the function
1936     */
1937    @FunctionalInterface
1938    public interface AsyncClosingFunction5<
1939        V1 extends @Nullable Object,
1940        V2 extends @Nullable Object,
1941        V3 extends @Nullable Object,
1942        V4 extends @Nullable Object,
1943        V5 extends @Nullable Object,
1944        U extends @Nullable Object> {
1945      /**
1946       * Applies this function to five inputs, or throws an exception if unable to do so.
1947       *
1948       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1949       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1950       * is done (but not before this method completes), even if this method throws or the pipeline
1951       * is cancelled.
1952       */
1953      ClosingFuture<U> apply(
1954          DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5)
1955          throws Exception;
1956    }
1957
1958    private final ClosingFuture<V1> future1;
1959    private final ClosingFuture<V2> future2;
1960    private final ClosingFuture<V3> future3;
1961    private final ClosingFuture<V4> future4;
1962    private final ClosingFuture<V5> future5;
1963
1964    private Combiner5(
1965        ClosingFuture<V1> future1,
1966        ClosingFuture<V2> future2,
1967        ClosingFuture<V3> future3,
1968        ClosingFuture<V4> future4,
1969        ClosingFuture<V5> future5) {
1970      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1971      this.future1 = future1;
1972      this.future2 = future2;
1973      this.future3 = future3;
1974      this.future4 = future4;
1975      this.future5 = future5;
1976    }
1977
1978    /**
1979     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1980     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1981     * objects to be closed when the pipeline is done.
1982     *
1983     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1984     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
1985     * returned step.
1986     *
1987     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1988     *
1989     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1990     * ExecutionException} will be extracted and used as the failure of the derived step.
1991     */
1992    public <U extends @Nullable Object> ClosingFuture<U> call(
1993        final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
1994      return call(
1995          new CombiningCallable<U>() {
1996            @Override
1997            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1998              return function.apply(
1999                  closer,
2000                  peeker.getDone(future1),
2001                  peeker.getDone(future2),
2002                  peeker.getDone(future3),
2003                  peeker.getDone(future4),
2004                  peeker.getDone(future5));
2005            }
2006
2007            @Override
2008            public String toString() {
2009              return function.toString();
2010            }
2011          },
2012          executor);
2013    }
2014
2015    /**
2016     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2017     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
2018     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
2019     * captured by the returned {@link ClosingFuture}).
2020     *
2021     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2022     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2023     * returned step.
2024     *
2025     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2026     *
2027     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2028     * ExecutionException} will be extracted and used as the failure of the derived step.
2029     *
2030     * <p>If the function throws any other exception, it will be used as the failure of the derived
2031     * step.
2032     *
2033     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2034     * the closeable objects in that {@code ClosingFuture} will be closed.
2035     *
2036     * <p>Usage guidelines for this method:
2037     *
2038     * <ul>
2039     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2040     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2041     *       Executor)} instead, with a function that returns the next value directly.
2042     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
2043     *       closer.eventuallyClose()} for every closeable object this step creates in order to
2044     *       capture it for later closing.
2045     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2046     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2047     * </ul>
2048     *
2049     * <p>The same warnings about doing heavyweight operations within {@link
2050     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2051     */
2052    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
2053        final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2054      return callAsync(
2055          new AsyncCombiningCallable<U>() {
2056            @Override
2057            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2058              return function.apply(
2059                  closer,
2060                  peeker.getDone(future1),
2061                  peeker.getDone(future2),
2062                  peeker.getDone(future3),
2063                  peeker.getDone(future4),
2064                  peeker.getDone(future5));
2065            }
2066
2067            @Override
2068            public String toString() {
2069              return function.toString();
2070            }
2071          },
2072          executor);
2073    }
2074  }
2075
2076  @Override
2077  public String toString() {
2078    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2079    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2080  }
2081
2082  @Override
2083  protected void finalize() {
2084    if (state.get().equals(OPEN)) {
2085      logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2086      FluentFuture<V> unused = finishToFuture();
2087    }
2088  }
2089
2090  private static void closeQuietly(final AutoCloseable closeable, Executor executor) {
2091    if (closeable == null) {
2092      return;
2093    }
2094    try {
2095      executor.execute(
2096          new Runnable() {
2097            @Override
2098            public void run() {
2099              try {
2100                closeable.close();
2101              } catch (Exception e) {
2102                logger.log(WARNING, "thrown by close()", e);
2103              }
2104            }
2105          });
2106    } catch (RejectedExecutionException e) {
2107      if (logger.isLoggable(WARNING)) {
2108        logger.log(
2109            WARNING, String.format("while submitting close to %s; will close inline", executor), e);
2110      }
2111      closeQuietly(closeable, directExecutor());
2112    }
2113  }
2114
2115  private void checkAndUpdateState(State oldState, State newState) {
2116    checkState(
2117        compareAndUpdateState(oldState, newState),
2118        "Expected state to be %s, but it was %s",
2119        oldState,
2120        newState);
2121  }
2122
2123  private boolean compareAndUpdateState(State oldState, State newState) {
2124    return state.compareAndSet(oldState, newState);
2125  }
2126
2127  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2128  private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor>
2129      implements Closeable {
2130    private final DeferredCloser closer = new DeferredCloser(this);
2131    private volatile boolean closed;
2132    private volatile CountDownLatch whenClosed;
2133
2134    <V, U> ListenableFuture<U> applyClosingFunction(
2135        ClosingFunction<? super V, U> transformation, V input) throws Exception {
2136      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2137      CloseableList newCloseables = new CloseableList();
2138      try {
2139        return immediateFuture(transformation.apply(newCloseables.closer, input));
2140      } finally {
2141        add(newCloseables, directExecutor());
2142      }
2143    }
2144
2145    <V, U> FluentFuture<U> applyAsyncClosingFunction(
2146        AsyncClosingFunction<V, U> transformation, V input) throws Exception {
2147      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2148      CloseableList newCloseables = new CloseableList();
2149      try {
2150        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2151        closingFuture.becomeSubsumedInto(newCloseables);
2152        return closingFuture.future;
2153      } finally {
2154        add(newCloseables, directExecutor());
2155      }
2156    }
2157
2158    @Override
2159    public void close() {
2160      if (closed) {
2161        return;
2162      }
2163      synchronized (this) {
2164        if (closed) {
2165          return;
2166        }
2167        closed = true;
2168      }
2169      for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) {
2170        closeQuietly(entry.getKey(), entry.getValue());
2171      }
2172      clear();
2173      if (whenClosed != null) {
2174        whenClosed.countDown();
2175      }
2176    }
2177
2178    void add(@Nullable AutoCloseable closeable, Executor executor) {
2179      checkNotNull(executor);
2180      if (closeable == null) {
2181        return;
2182      }
2183      synchronized (this) {
2184        if (!closed) {
2185          put(closeable, executor);
2186          return;
2187        }
2188      }
2189      closeQuietly(closeable, executor);
2190    }
2191
2192    /**
2193     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2194     */
2195    CountDownLatch whenClosedCountDown() {
2196      if (closed) {
2197        return new CountDownLatch(0);
2198      }
2199      synchronized (this) {
2200        if (closed) {
2201          return new CountDownLatch(0);
2202        }
2203        checkState(whenClosed == null);
2204        return whenClosed = new CountDownLatch(1);
2205      }
2206    }
2207  }
2208
2209  /**
2210   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2211   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2212   */
2213  @VisibleForTesting
2214  CountDownLatch whenClosedCountDown() {
2215    return closeables.whenClosedCountDown();
2216  }
2217
2218  /** The state of a {@link CloseableList}. */
2219  enum State {
2220    /** The {@link CloseableList} has not been subsumed or closed. */
2221    OPEN,
2222
2223    /**
2224     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
2225     * into any other.
2226     */
2227    SUBSUMED,
2228
2229    /**
2230     * Some {@link ListenableFuture} has a callback attached that will close the {@link
2231     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
2232     */
2233    WILL_CLOSE,
2234
2235    /**
2236     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
2237     * {@link CloseableList} may not be subsumed.
2238     */
2239    CLOSING,
2240
2241    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
2242    CLOSED,
2243
2244    /**
2245     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
2246     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
2247     */
2248    WILL_CREATE_VALUE_AND_CLOSER,
2249  }
2250}