001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static java.util.logging.Level.FINER;
036import static java.util.logging.Level.SEVERE;
037import static java.util.logging.Level.WARNING;
038
039import com.google.common.annotations.Beta;
040import com.google.common.annotations.VisibleForTesting;
041import com.google.common.base.Function;
042import com.google.common.collect.FluentIterable;
043import com.google.common.collect.ImmutableList;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
046import com.google.common.util.concurrent.Futures.FutureCombiner;
047import com.google.errorprone.annotations.CanIgnoreReturnValue;
048import com.google.errorprone.annotations.DoNotMock;
049import com.google.j2objc.annotations.RetainedWith;
050import java.io.Closeable;
051import java.io.IOException;
052import java.util.IdentityHashMap;
053import java.util.Map;
054import java.util.concurrent.Callable;
055import java.util.concurrent.CancellationException;
056import java.util.concurrent.CountDownLatch;
057import java.util.concurrent.ExecutionException;
058import java.util.concurrent.Executor;
059import java.util.concurrent.Future;
060import java.util.concurrent.RejectedExecutionException;
061import java.util.concurrent.atomic.AtomicReference;
062import java.util.logging.Logger;
063import org.checkerframework.checker.nullness.compatqual.NullableDecl;
064
065/**
066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
067 * complete, some objects captured during the computation are closed.
068 *
069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
071 * cancellation of the operation so far. The only way to extract the value or exception from a step
072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
073 * "value" of a successful step or the "result" (value or exception) of any step.
074 *
075 * <ol>
076 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
077 *       block or a {@link ListenableFuture}.
078 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
079 *       can be captured for later closing.
080 *   <li>There is one last step (the root of the tree), from which you can extract the final result
081 *       of the computation. After that result is available (or the computation fails), all objects
082 *       captured by any of the steps in the pipeline are closed.
083 * </ol>
084 *
085 * <h3>Starting a pipeline</h3>
086 *
087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
090 * #from(ListenableFuture)} instead.
091 *
092 * <h3>Derived steps</h3>
093 *
094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
095 * ways similar to {@link FluentFuture}s:
096 *
097 * <ul>
098 *   <li>by transforming the value from a successful input step,
099 *   <li>by catching the exception from a failed input step, or
100 *   <li>by combining the results of several input steps.
101 * </ul>
102 *
103 * Each derivation can capture the next value or any intermediate objects for later closing.
104 *
105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
106 * exception, or combine it with others, you cannot do anything else with it, including declare it
107 * to be the last step of the pipeline.
108 *
109 * <h4>Transforming</h4>
110 *
111 * To derive the next step by asynchronously applying a function to an input step's value, call
112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
113 * Executor)} on the input step.
114 *
115 * <h4>Catching</h4>
116 *
117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
119 *
120 * <h4>Combining</h4>
121 *
122 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
123 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads.
124 *
125 * <h3>Cancelling</h3>
126 *
127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
129 * successfully cancelled step will immediately start closing all objects captured for later closing
130 * by it and by its input steps.
131 *
132 * <h3>Ending a pipeline</h3>
133 *
134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
135 * close the captured objects automatically or manually.
136 *
137 * <h4>Automatically closing</h4>
138 *
139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
140 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
141 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
142 * completes before closing starts, rather than once it has finished.
143 *
144 * <pre>{@code
145 * FluentFuture<UserName> userName =
146 *     ClosingFuture.submit(
147 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
148 *             executor)
149 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
150 *         .transform((closer, result) -> result.get("userName"), directExecutor())
151 *         .catching(DBException.class, e -> "no user", directExecutor())
152 *         .finishToFuture();
153 * }</pre>
154 *
155 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
156 * result cursor will both be closed, even if the operation is cancelled or fails.
157 *
158 * <h4>Manually closing</h4>
159 *
160 * If you want to close the captured objects manually, after you've used the final result, call
161 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
162 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
163 *
164 * <pre>{@code
165 *     ClosingFuture.submit(
166 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
167 *             executor)
168 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
169 *     .transform((closer, result) -> result.get("userName"), directExecutor())
170 *     .catching(DBException.class, e -> "no user", directExecutor())
171 *     .finishToValueAndCloser(
172 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
173 *
174 * // later
175 * try { // get() will throw if the operation failed or was cancelled.
176 *   UserName userName = userNameValueAndCloser.get();
177 *   // do something with userName
178 * } finally {
179 *   userNameValueAndCloser.closeAsync();
180 * }
181 * }</pre>
182 *
183 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
184 * the query result cursor will both be closed, even if the operation is cancelled or fails.
185 *
186 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
187 * automatic-closing approach described above is safer.
188 *
189 * @param <V> the type of the value of this step
190 * @since 30.0
191 */
192// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
193@Beta // @Beta for one release.
194@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
195// TODO(dpb): GWT compatibility.
196public final class ClosingFuture<V> {
197
198  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
199
200  /**
201   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
202   * done.
203   */
204  public static final class DeferredCloser {
205    @RetainedWith private final CloseableList list;
206
207    DeferredCloser(CloseableList list) {
208      this.list = list;
209    }
210
211    /**
212     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
213     *
214     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
215     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
216     * Closeable}. (For more about the flavors, see <a
217     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
218     * build</a>.)
219     *
220     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
221     * building for Android): Ensure that any object you pass implements the interface not just in
222     * your current SDK version but also at the oldest version you support. For example, <a
223     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
224     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
225     * {@code Closeable} with a method reference like {@code cursor::close}.
226     *
227     * <p>Note that this method is still binary-compatible between flavors because the erasure of
228     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
229     *
230     * @param closeable the object to be closed (see notes above)
231     * @param closingExecutor the object will be closed on this executor
232     * @return the first argument
233     */
234    @CanIgnoreReturnValue
235    @NullableDecl
236    // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
237    public <C extends Object & Closeable> C eventuallyClose(
238        @NullableDecl C closeable, Executor closingExecutor) {
239      checkNotNull(closingExecutor);
240      if (closeable != null) {
241        list.add(closeable, closingExecutor);
242      }
243      return closeable;
244    }
245  }
246
247  /**
248   * An operation that computes a result.
249   *
250   * @param <V> the type of the result
251   */
252  public interface ClosingCallable<V extends Object> {
253    /**
254     * Computes a result, or throws an exception if unable to do so.
255     *
256     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
257     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
258     * not before this method completes), even if this method throws or the pipeline is cancelled.
259     */
260    @NullableDecl
261    V call(DeferredCloser closer) throws Exception;
262  }
263
264  /**
265   * An operation that computes a {@link ClosingFuture} of a result.
266   *
267   * @param <V> the type of the result
268   * @since 30.1
269   */
270  public interface AsyncClosingCallable<V extends Object> {
271    /**
272     * Computes a result, or throws an exception if unable to do so.
273     *
274     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
275     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
276     * not before this method completes), even if this method throws or the pipeline is cancelled.
277     */
278    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
279  }
280
281  /**
282   * A function from an input to a result.
283   *
284   * @param <T> the type of the input to the function
285   * @param <U> the type of the result of the function
286   */
287  public interface ClosingFunction<T extends Object, U extends Object> {
288
289    /**
290     * Applies this function to an input, or throws an exception if unable to do so.
291     *
292     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
293     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
294     * not before this method completes), even if this method throws or the pipeline is cancelled.
295     */
296    @NullableDecl
297    U apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
298  }
299
300  /**
301   * A function from an input to a {@link ClosingFuture} of a result.
302   *
303   * @param <T> the type of the input to the function
304   * @param <U> the type of the result of the function
305   */
306  public interface AsyncClosingFunction<T extends Object, U extends Object> {
307    /**
308     * Applies this function to an input, or throws an exception if unable to do so.
309     *
310     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
311     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
312     * not before this method completes), even if this method throws or the pipeline is cancelled.
313     */
314    ClosingFuture<U> apply(DeferredCloser closer, @NullableDecl T input) throws Exception;
315  }
316
317  /**
318   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
319   * allows the user to close all the closeable objects that were captured during it for later
320   * closing.
321   *
322   * <p>The asynchronous operation will have completed before this object is created.
323   *
324   * @param <V> the type of the value of a successful operation
325   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
326   */
327  public static final class ValueAndCloser<V> {
328
329    private final ClosingFuture<? extends V> closingFuture;
330
331    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
332      this.closingFuture = checkNotNull(closingFuture);
333    }
334
335    /**
336     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
337     * {@link Future#get()} would.
338     *
339     * <p>Because the asynchronous operation has already completed, this method is synchronous and
340     * returns immediately.
341     *
342     * @throws CancellationException if the computation was cancelled
343     * @throws ExecutionException if the computation threw an exception
344     */
345    @NullableDecl
346    public V get() throws ExecutionException {
347      return getDone(closingFuture.future);
348    }
349
350    /**
351     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
352     * operation on the {@link Executor}s specified by calls to {@link
353     * DeferredCloser#eventuallyClose(Closeable, Executor)}.
354     *
355     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
356     * closed synchronously.
357     *
358     * <p>Idempotent: objects will be closed at most once.
359     */
360    public void closeAsync() {
361      closingFuture.close();
362    }
363  }
364
365  /**
366   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
367   * ClosingFuture} pipeline.
368   *
369   * @param <V> the type of the final value of a successful pipeline
370   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
371   */
372  public interface ValueAndCloserConsumer<V> {
373
374    /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */
375    void accept(ValueAndCloser<V> valueAndCloser);
376  }
377
378  /**
379   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
380   *
381   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
382   *     execution
383   */
384  public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) {
385    return new ClosingFuture<>(callable, executor);
386  }
387
388  /**
389   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
390   *
391   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
392   *     execution
393   * @since 30.1
394   */
395  public static <V> ClosingFuture<V> submitAsync(
396      AsyncClosingCallable<V> callable, Executor executor) {
397    return new ClosingFuture<>(callable, executor);
398  }
399
400  /**
401   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
402   *
403   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
404   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
405   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
406   */
407  public static <V> ClosingFuture<V> from(ListenableFuture<V> future) {
408    return new ClosingFuture<V>(future);
409  }
410
411  /**
412   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
413   *
414   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)} when
415   * the pipeline is done, even if the pipeline is canceled or fails.
416   *
417   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
418   * value in order to close it.
419   *
420   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
421   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable,
422   *     Executor)}.
423   * @param closingExecutor the future's result will be closed on this executor
424   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
425   *     underlying value may never be closed if the {@link Future} is canceled after its operation
426   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
427   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
428   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
429   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
430   *     ClosingFuture#from}.
431   */
432  @Deprecated
433  // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
434  public static <C extends Object & Closeable> ClosingFuture<C> eventuallyClosing(
435      ListenableFuture<C> future, final Executor closingExecutor) {
436    checkNotNull(closingExecutor);
437    final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
438    Futures.addCallback(
439        future,
440        new FutureCallback<Closeable>() {
441          @Override
442          public void onSuccess(@NullableDecl Closeable result) {
443            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
444          }
445
446          @Override
447          public void onFailure(Throwable t) {}
448        },
449        directExecutor());
450    return closingFuture;
451  }
452
453  /**
454   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
455   *
456   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
457   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
458   */
459  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
460    return new Combiner(false, futures);
461  }
462
463  /**
464   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
465   *
466   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
467   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
468   */
469  public static Combiner whenAllComplete(
470      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
471    return whenAllComplete(asList(future1, moreFutures));
472  }
473
474  /**
475   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
476   * all succeed. If any fail, the resulting pipeline will fail.
477   *
478   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
479   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
480   */
481  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
482    return new Combiner(true, futures);
483  }
484
485  /**
486   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
487   * they all succeed. If any fail, the resulting pipeline will fail.
488   *
489   * <p>Calling this method allows you to use lambdas or method references typed with the types of
490   * the input {@link ClosingFuture}s.
491   *
492   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
493   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
494   */
495  public static <V1, V2> Combiner2<V1, V2> whenAllSucceed(
496      ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
497    return new Combiner2<>(future1, future2);
498  }
499
500  /**
501   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
502   * they all succeed. If any fail, the resulting pipeline will fail.
503   *
504   * <p>Calling this method allows you to use lambdas or method references typed with the types of
505   * the input {@link ClosingFuture}s.
506   *
507   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
508   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
509   */
510  public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed(
511      ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
512    return new Combiner3<>(future1, future2, future3);
513  }
514
515  /**
516   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
517   * they all succeed. If any fail, the resulting pipeline will fail.
518   *
519   * <p>Calling this method allows you to use lambdas or method references typed with the types of
520   * the input {@link ClosingFuture}s.
521   *
522   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
523   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
524   */
525  public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed(
526      ClosingFuture<V1> future1,
527      ClosingFuture<V2> future2,
528      ClosingFuture<V3> future3,
529      ClosingFuture<V4> future4) {
530    return new Combiner4<>(future1, future2, future3, future4);
531  }
532
533  /**
534   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
535   * they all succeed. If any fail, the resulting pipeline will fail.
536   *
537   * <p>Calling this method allows you to use lambdas or method references typed with the types of
538   * the input {@link ClosingFuture}s.
539   *
540   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
541   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
542   */
543  public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
544      ClosingFuture<V1> future1,
545      ClosingFuture<V2> future2,
546      ClosingFuture<V3> future3,
547      ClosingFuture<V4> future4,
548      ClosingFuture<V5> future5) {
549    return new Combiner5<>(future1, future2, future3, future4, future5);
550  }
551
552  /**
553   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
554   * all succeed. If any fail, the resulting pipeline will fail.
555   *
556   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
557   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
558   */
559  public static Combiner whenAllSucceed(
560      ClosingFuture<?> future1,
561      ClosingFuture<?> future2,
562      ClosingFuture<?> future3,
563      ClosingFuture<?> future4,
564      ClosingFuture<?> future5,
565      ClosingFuture<?> future6,
566      ClosingFuture<?>... moreFutures) {
567    return whenAllSucceed(
568        FluentIterable.of(future1, future2, future3, future4, future5, future6)
569            .append(moreFutures));
570  }
571
572  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
573  private final CloseableList closeables = new CloseableList();
574  private final FluentFuture<V> future;
575
576  private ClosingFuture(ListenableFuture<V> future) {
577    this.future = FluentFuture.from(future);
578  }
579
580  private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
581    checkNotNull(callable);
582    TrustedListenableFutureTask<V> task =
583        TrustedListenableFutureTask.create(
584            new Callable<V>() {
585              @Override
586              public V call() throws Exception {
587                return callable.call(closeables.closer);
588              }
589
590              @Override
591              public String toString() {
592                return callable.toString();
593              }
594            });
595    executor.execute(task);
596    this.future = task;
597  }
598
599  private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) {
600    checkNotNull(callable);
601    TrustedListenableFutureTask<V> task =
602        TrustedListenableFutureTask.create(
603            new AsyncCallable<V>() {
604              @Override
605              public ListenableFuture<V> call() throws Exception {
606                CloseableList newCloseables = new CloseableList();
607                try {
608                  ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
609                  closingFuture.becomeSubsumedInto(closeables);
610                  return closingFuture.future;
611                } finally {
612                  closeables.add(newCloseables, directExecutor());
613                }
614              }
615
616              @Override
617              public String toString() {
618                return callable.toString();
619              }
620            });
621    executor.execute(task);
622    this.future = task;
623  }
624
625  /**
626   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
627   * future returns {@code null} if the step is successful or throws the same exception that would
628   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
629   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
630   *
631   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
632   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
633   * a derivation method <i>on the same instance</i>. This is important because calling {@code
634   * statusFuture} alone does not provide a way to close the pipeline.
635   */
636  public ListenableFuture<?> statusFuture() {
637    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
638  }
639
640  /**
641   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
642   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
643   * when the pipeline is done.
644   *
645   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
646   * ClosingFuture} will be equivalent to this one.
647   *
648   * <p>If the function throws an exception, that exception is used as the result of the derived
649   * {@code ClosingFuture}.
650   *
651   * <p>Example usage:
652   *
653   * <pre>{@code
654   * ClosingFuture<List<Row>> rowsFuture =
655   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
656   * }</pre>
657   *
658   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
659   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
660   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
661   *
662   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
663   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
664   * this {@code ClosingFuture}.
665   *
666   * @param function transforms the value of this step to the value of the derived step
667   * @param executor executor to run the function in
668   * @return the derived step
669   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
670   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
671   *     finished}
672   */
673  public <U> ClosingFuture<U> transform(
674      final ClosingFunction<? super V, U> function, Executor executor) {
675    checkNotNull(function);
676    AsyncFunction<V, U> applyFunction =
677        new AsyncFunction<V, U>() {
678          @Override
679          public ListenableFuture<U> apply(V input) throws Exception {
680            return closeables.applyClosingFunction(function, input);
681          }
682
683          @Override
684          public String toString() {
685            return function.toString();
686          }
687        };
688    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
689    return derive(future.transformAsync(applyFunction, executor));
690  }
691
692  /**
693   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
694   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
695   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
696   * captured by the returned {@link ClosingFuture}).
697   *
698   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
699   * returned by the function.
700   *
701   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
702   * ClosingFuture} will be equivalent to this one.
703   *
704   * <p>If the function throws an exception, that exception is used as the result of the derived
705   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
706   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
707   * closed.
708   *
709   * <p>Usage guidelines for this method:
710   *
711   * <ul>
712   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
713   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
714   *       Executor)} instead, with a function that returns the next value directly.
715   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
716   *       for every closeable object this step creates in order to capture it for later closing.
717   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
718   *       ClosingFuture} call {@link #from(ListenableFuture)}.
719   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
720   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
721   *       {@link #withoutCloser(AsyncFunction)}
722   * </ul>
723   *
724   * <p>Example usage:
725   *
726   * <pre>{@code
727   * // Result.getRowsClosingFuture() returns a ClosingFuture.
728   * ClosingFuture<List<Row>> rowsFuture =
729   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
730   *
731   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
732   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
733   * // Closeable).
734   * ClosingFuture<Integer> rowsFuture2 =
735   *     queryFuture.transformAsync(
736   *         (closer, result) -> {
737   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
738   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
739   *      },
740   *      executor);
741   *
742   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
743   * ClosingFuture<List<Row>> rowsFuture3 =
744   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
745   *
746   * }</pre>
747   *
748   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
749   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
750   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
751   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
752   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
753   * responsible for completing the returned {@code ClosingFuture}.)
754   *
755   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
756   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
757   * this {@code ClosingFuture}.
758   *
759   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
760   *     the derived step
761   * @param executor executor to run the function in
762   * @return the derived step
763   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
764   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
765   *     finished}
766   */
767  public <U> ClosingFuture<U> transformAsync(
768      final AsyncClosingFunction<? super V, U> function, Executor executor) {
769    checkNotNull(function);
770    AsyncFunction<V, U> applyFunction =
771        new AsyncFunction<V, U>() {
772          @Override
773          public ListenableFuture<U> apply(V input) throws Exception {
774            return closeables.applyAsyncClosingFunction(function, input);
775          }
776
777          @Override
778          public String toString() {
779            return function.toString();
780          }
781        };
782    return derive(future.transformAsync(applyFunction, executor));
783  }
784
785  /**
786   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
787   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
788   * {@link ListenableFuture}.
789   *
790   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
791   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
792   * meets these conditions:
793   *
794   * <ul>
795   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
796   *       DeferredCloser#eventuallyClose(Closeable, Executor)}.
797   *   <li>It returns a {@link ListenableFuture}.
798   * </ul>
799   *
800   * <p>Example usage:
801   *
802   * <pre>{@code
803   * // Result.getRowsFuture() returns a ListenableFuture.
804   * ClosingFuture<List<Row>> rowsFuture =
805   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
806   * }</pre>
807   *
808   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
809   *     ListenableFuture} with the value of a derived step
810   */
811  public static <V, U> AsyncClosingFunction<V, U> withoutCloser(
812      final AsyncFunction<V, U> function) {
813    checkNotNull(function);
814    return new AsyncClosingFunction<V, U>() {
815      @Override
816      public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
817        return ClosingFuture.from(function.apply(input));
818      }
819    };
820  }
821
822  /**
823   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
824   * to its exception if it is an instance of a given exception type. The function can use a {@link
825   * DeferredCloser} to capture objects to be closed when the pipeline is done.
826   *
827   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
828   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
829   * one.
830   *
831   * <p>If the function throws an exception, that exception is used as the result of the derived
832   * {@code ClosingFuture}.
833   *
834   * <p>Example usage:
835   *
836   * <pre>{@code
837   * ClosingFuture<QueryResult> queryFuture =
838   *     queryFuture.catching(
839   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
840   * }</pre>
841   *
842   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
843   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
844   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
845   *
846   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
847   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
848   * this {@code ClosingFuture}.
849   *
850   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
851   *     type is matched against this step's exception. "This step's exception" means the cause of
852   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
853   *     underlying this step or, if {@code get()} throws a different kind of exception, that
854   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
855   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
856   * @param fallback the function to be called if this step fails with the expected exception type.
857   *     The function's argument is this step's exception. "This step's exception" means the cause
858   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
859   *     underlying this step or, if {@code get()} throws a different kind of exception, that
860   *     exception itself.
861   * @param executor the executor that runs {@code fallback} if the input fails
862   */
863  public <X extends Throwable> ClosingFuture<V> catching(
864      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
865    return catchingMoreGeneric(exceptionType, fallback, executor);
866  }
867
868  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
869  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
870      Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
871    checkNotNull(fallback);
872    AsyncFunction<X, W> applyFallback =
873        new AsyncFunction<X, W>() {
874          @Override
875          public ListenableFuture<W> apply(X exception) throws Exception {
876            return closeables.applyClosingFunction(fallback, exception);
877          }
878
879          @Override
880          public String toString() {
881            return fallback.toString();
882          }
883        };
884    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
885    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
886  }
887
888  /**
889   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
890   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
891   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
892   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
893   *
894   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
895   * ClosingFuture} will be equivalent to the one returned by the function.
896   *
897   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
898   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
899   * one.
900   *
901   * <p>If the function throws an exception, that exception is used as the result of the derived
902   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
903   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
904   * closed.
905   *
906   * <p>Usage guidelines for this method:
907   *
908   * <ul>
909   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
910   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
911   *       ClosingFunction, Executor)} instead, with a function that returns the next value
912   *       directly.
913   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
914   *       for every closeable object this step creates in order to capture it for later closing.
915   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
916   *       ClosingFuture} call {@link #from(ListenableFuture)}.
917   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
918   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
919   *       {@link #withoutCloser(AsyncFunction)}
920   * </ul>
921   *
922   * <p>Example usage:
923   *
924   * <pre>{@code
925   * // Fall back to a secondary input stream in case of IOException.
926   * ClosingFuture<InputStream> inputFuture =
927   *     firstInputFuture.catchingAsync(
928   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
929   * }
930   * }</pre>
931   *
932   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
933   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
934   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
935   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
936   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
937   * responsible for completing the returned {@code ClosingFuture}.)
938   *
939   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
940   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
941   * this {@code ClosingFuture}.
942   *
943   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
944   *     type is matched against this step's exception. "This step's exception" means the cause of
945   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
946   *     underlying this step or, if {@code get()} throws a different kind of exception, that
947   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
948   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
949   * @param fallback the function to be called if this step fails with the expected exception type.
950   *     The function's argument is this step's exception. "This step's exception" means the cause
951   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
952   *     underlying this step or, if {@code get()} throws a different kind of exception, that
953   *     exception itself.
954   * @param executor the executor that runs {@code fallback} if the input fails
955   */
956  // TODO(dpb): Should this do something special if the function throws CancellationException or
957  // ExecutionException?
958  public <X extends Throwable> ClosingFuture<V> catchingAsync(
959      Class<X> exceptionType,
960      AsyncClosingFunction<? super X, ? extends V> fallback,
961      Executor executor) {
962    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
963  }
964
965  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
966  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
967      Class<X> exceptionType,
968      final AsyncClosingFunction<? super X, W> fallback,
969      Executor executor) {
970    checkNotNull(fallback);
971    AsyncFunction<X, W> asyncFunction =
972        new AsyncFunction<X, W>() {
973          @Override
974          public ListenableFuture<W> apply(X exception) throws Exception {
975            return closeables.applyAsyncClosingFunction(fallback, exception);
976          }
977
978          @Override
979          public String toString() {
980            return fallback.toString();
981          }
982        };
983    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
984  }
985
986  /**
987   * Marks this step as the last step in the {@code ClosingFuture} pipeline.
988   *
989   * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when
990   * the pipeline is cancelled.
991   *
992   * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously
993   * <b>after</b> the returned {@code Future} is done: the future completes before closing starts,
994   * rather than once it has finished.
995   *
996   * <p>After calling this method, you may not call {@link
997   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
998   * derivation method on this {@code ClosingFuture}.
999   *
1000   * @return a {@link Future} that represents the final value or exception of the pipeline
1001   */
1002  public FluentFuture<V> finishToFuture() {
1003    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
1004      logger.log(FINER, "will close {0}", this);
1005      future.addListener(
1006          new Runnable() {
1007            @Override
1008            public void run() {
1009              checkAndUpdateState(WILL_CLOSE, CLOSING);
1010              close();
1011              checkAndUpdateState(CLOSING, CLOSED);
1012            }
1013          },
1014          directExecutor());
1015    } else {
1016      switch (state.get()) {
1017        case SUBSUMED:
1018          throw new IllegalStateException(
1019              "Cannot call finishToFuture() after deriving another step");
1020
1021        case WILL_CREATE_VALUE_AND_CLOSER:
1022          throw new IllegalStateException(
1023              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
1024
1025        case WILL_CLOSE:
1026        case CLOSING:
1027        case CLOSED:
1028          throw new IllegalStateException("Cannot call finishToFuture() twice");
1029
1030        case OPEN:
1031          throw new AssertionError();
1032      }
1033    }
1034    return future;
1035  }
1036
1037  /**
1038   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
1039   * {@code receiver} will be called with an object that contains the result of the operation. The
1040   * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use.
1041   *
1042   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
1043   * any other derivation method on this {@code ClosingFuture}.
1044   *
1045   * @param consumer a callback whose method will be called (using {@code executor}) when this
1046   *     operation is done
1047   */
1048  public void finishToValueAndCloser(
1049      final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
1050    checkNotNull(consumer);
1051    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
1052      switch (state.get()) {
1053        case SUBSUMED:
1054          throw new IllegalStateException(
1055              "Cannot call finishToValueAndCloser() after deriving another step");
1056
1057        case WILL_CLOSE:
1058        case CLOSING:
1059        case CLOSED:
1060          throw new IllegalStateException(
1061              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1062
1063        case WILL_CREATE_VALUE_AND_CLOSER:
1064          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1065
1066        case OPEN:
1067          break;
1068      }
1069      throw new AssertionError(state);
1070    }
1071    future.addListener(
1072        new Runnable() {
1073          @Override
1074          public void run() {
1075            provideValueAndCloser(consumer, ClosingFuture.this);
1076          }
1077        },
1078        executor);
1079  }
1080
1081  private static <C, V extends C> void provideValueAndCloser(
1082      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1083    consumer.accept(new ValueAndCloser<C>(closingFuture));
1084  }
1085
1086  /**
1087   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1088   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1089   * successful, and this step has not started when {@code cancel} is called, this step should never
1090   * run.
1091   *
1092   * <p>If successful, causes the objects captured by this step (if already started) and its input
1093   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1094   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1095   *
1096   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1097   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1098   *     cancelled regardless
1099   * @return {@code false} if the step could not be cancelled, typically because it has already
1100   *     completed normally; {@code true} otherwise
1101   */
1102  @CanIgnoreReturnValue
1103  public boolean cancel(boolean mayInterruptIfRunning) {
1104    logger.log(FINER, "cancelling {0}", this);
1105    boolean cancelled = future.cancel(mayInterruptIfRunning);
1106    if (cancelled) {
1107      close();
1108    }
1109    return cancelled;
1110  }
1111
1112  private void close() {
1113    logger.log(FINER, "closing {0}", this);
1114    closeables.close();
1115  }
1116
1117  private <U> ClosingFuture<U> derive(FluentFuture<U> future) {
1118    ClosingFuture<U> derived = new ClosingFuture<>(future);
1119    becomeSubsumedInto(derived.closeables);
1120    return derived;
1121  }
1122
1123  private void becomeSubsumedInto(CloseableList otherCloseables) {
1124    checkAndUpdateState(OPEN, SUBSUMED);
1125    otherCloseables.add(closeables, directExecutor());
1126  }
1127
1128  /**
1129   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1130   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1131   *
1132   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1133   */
1134  public static final class Peeker {
1135    private final ImmutableList<ClosingFuture<?>> futures;
1136    private volatile boolean beingCalled;
1137
1138    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1139      this.futures = checkNotNull(futures);
1140    }
1141
1142    /**
1143     * Returns the value of {@code closingFuture}.
1144     *
1145     * @throws ExecutionException if {@code closingFuture} is a failed step
1146     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1147     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1148     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1149     * @throws IllegalStateException if called outside of a call to {@link
1150     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1151     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1152     */
1153    @NullableDecl
1154    public final <D extends Object> D getDone(ClosingFuture<D> closingFuture)
1155        throws ExecutionException {
1156      checkState(beingCalled);
1157      checkArgument(futures.contains(closingFuture));
1158      return Futures.getDone(closingFuture.future);
1159    }
1160
1161    @NullableDecl
1162    private <V extends Object> V call(CombiningCallable<V> combiner, CloseableList closeables)
1163        throws Exception {
1164      beingCalled = true;
1165      CloseableList newCloseables = new CloseableList();
1166      try {
1167        return combiner.call(newCloseables.closer, this);
1168      } finally {
1169        closeables.add(newCloseables, directExecutor());
1170        beingCalled = false;
1171      }
1172    }
1173
1174    private <V extends Object> FluentFuture<V> callAsync(
1175        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1176      beingCalled = true;
1177      CloseableList newCloseables = new CloseableList();
1178      try {
1179        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1180        closingFuture.becomeSubsumedInto(closeables);
1181        return closingFuture.future;
1182      } finally {
1183        closeables.add(newCloseables, directExecutor());
1184        beingCalled = false;
1185      }
1186    }
1187  }
1188
1189  /**
1190   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1191   *
1192   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1193   * instantiate this class.
1194   *
1195   * <p>Example:
1196   *
1197   * <pre>{@code
1198   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1199   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1200   * ListenableFuture<Integer> numberOfDifferentLines =
1201   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1202   *           .call(
1203   *               (closer, peeker) -> {
1204   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1205   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1206   *                 return countDifferentLines(file1Reader, file2Reader);
1207   *               },
1208   *               executor)
1209   *           .closing(executor);
1210   * }</pre>
1211   */
1212  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
1213  @com.google.errorprone.annotations.DoNotMock(
1214      "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1215  public static class Combiner {
1216
1217    private final CloseableList closeables = new CloseableList();
1218
1219    /**
1220     * An operation that returns a result and may throw an exception.
1221     *
1222     * @param <V> the type of the result
1223     */
1224    public interface CombiningCallable<V extends Object> {
1225      /**
1226       * Computes a result, or throws an exception if unable to do so.
1227       *
1228       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1229       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1230       * is done (but not before this method completes), even if this method throws or the pipeline
1231       * is cancelled.
1232       *
1233       * @param peeker used to get the value of any of the input futures
1234       */
1235      @NullableDecl
1236      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1237    }
1238
1239    /**
1240     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1241     *
1242     * @param <V> the type of the result
1243     */
1244    public interface AsyncCombiningCallable<V extends Object> {
1245      /**
1246       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1247       *
1248       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1249       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1250       * is done (but not before this method completes), even if this method throws or the pipeline
1251       * is cancelled.
1252       *
1253       * @param peeker used to get the value of any of the input futures
1254       */
1255      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1256    }
1257
1258    private final boolean allMustSucceed;
1259    protected final ImmutableList<ClosingFuture<?>> inputs;
1260
1261    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1262      this.allMustSucceed = allMustSucceed;
1263      this.inputs = ImmutableList.copyOf(inputs);
1264      for (ClosingFuture<?> input : inputs) {
1265        input.becomeSubsumedInto(closeables);
1266      }
1267    }
1268
1269    /**
1270     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1271     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1272     * objects to be closed when the pipeline is done.
1273     *
1274     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1275     * fail, so will the returned step.
1276     *
1277     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1278     * cancelled.
1279     *
1280     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1281     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1282     */
1283    public <V> ClosingFuture<V> call(
1284        final CombiningCallable<V> combiningCallable, Executor executor) {
1285      Callable<V> callable =
1286          new Callable<V>() {
1287            @Override
1288            public V call() throws Exception {
1289              return new Peeker(inputs).call(combiningCallable, closeables);
1290            }
1291
1292            @Override
1293            public String toString() {
1294              return combiningCallable.toString();
1295            }
1296          };
1297      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1298      derived.closeables.add(closeables, directExecutor());
1299      return derived;
1300    }
1301
1302    /**
1303     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1304     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1305     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1306     * captured by the returned {@link ClosingFuture}).
1307     *
1308     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1309     * fail, so will the returned step.
1310     *
1311     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1312     * cancelled.
1313     *
1314     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1315     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1316     *
1317     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1318     * derived step.
1319     *
1320     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1321     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1322     *
1323     * <p>Usage guidelines for this method:
1324     *
1325     * <ul>
1326     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1327     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1328     *       Executor)} instead, with a function that returns the next value directly.
1329     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1330     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1331     *       capture it for later closing.
1332     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1333     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1334     * </ul>
1335     *
1336     * <p>The same warnings about doing heavyweight operations within {@link
1337     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1338     */
1339    public <V> ClosingFuture<V> callAsync(
1340        final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1341      AsyncCallable<V> asyncCallable =
1342          new AsyncCallable<V>() {
1343            @Override
1344            public ListenableFuture<V> call() throws Exception {
1345              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1346            }
1347
1348            @Override
1349            public String toString() {
1350              return combiningCallable.toString();
1351            }
1352          };
1353      ClosingFuture<V> derived =
1354          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1355      derived.closeables.add(closeables, directExecutor());
1356      return derived;
1357    }
1358
1359    private FutureCombiner<Object> futureCombiner() {
1360      return allMustSucceed
1361          ? Futures.whenAllSucceed(inputFutures())
1362          : Futures.whenAllComplete(inputFutures());
1363    }
1364
1365    private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE =
1366        new Function<ClosingFuture<?>, FluentFuture<?>>() {
1367          @Override
1368          public FluentFuture<?> apply(ClosingFuture<?> future) {
1369            return future.future;
1370          }
1371        };
1372
1373    private ImmutableList<FluentFuture<?>> inputFutures() {
1374      return FluentIterable.from(inputs).transform(INNER_FUTURE).toList();
1375    }
1376  }
1377
1378  /**
1379   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1380   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1381   * combination.
1382   *
1383   * @param <V1> the type returned by the first future
1384   * @param <V2> the type returned by the second future
1385   */
1386  public static final class Combiner2<V1 extends Object, V2 extends Object> extends Combiner {
1387
1388    /**
1389     * A function that returns a value when applied to the values of the two futures passed to
1390     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1391     *
1392     * @param <V1> the type returned by the first future
1393     * @param <V2> the type returned by the second future
1394     * @param <U> the type returned by the function
1395     */
1396    public interface ClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
1397
1398      /**
1399       * Applies this function to two inputs, or throws an exception if unable to do so.
1400       *
1401       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1402       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1403       * is done (but not before this method completes), even if this method throws or the pipeline
1404       * is cancelled.
1405       */
1406      @NullableDecl
1407      U apply(DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2)
1408          throws Exception;
1409    }
1410
1411    /**
1412     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1413     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1414     *
1415     * @param <V1> the type returned by the first future
1416     * @param <V2> the type returned by the second future
1417     * @param <U> the type returned by the function
1418     */
1419    public interface AsyncClosingFunction2<V1 extends Object, V2 extends Object, U extends Object> {
1420
1421      /**
1422       * Applies this function to two inputs, or throws an exception if unable to do so.
1423       *
1424       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1425       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1426       * is done (but not before this method completes), even if this method throws or the pipeline
1427       * is cancelled.
1428       */
1429      ClosingFuture<U> apply(
1430          DeferredCloser closer, @NullableDecl V1 value1, @NullableDecl V2 value2) throws Exception;
1431    }
1432
1433    private final ClosingFuture<V1> future1;
1434    private final ClosingFuture<V2> future2;
1435
1436    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1437      super(true, ImmutableList.of(future1, future2));
1438      this.future1 = future1;
1439      this.future2 = future2;
1440    }
1441
1442    /**
1443     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1444     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1445     * objects to be closed when the pipeline is done.
1446     *
1447     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1448     * any of the inputs fail, so will the returned step.
1449     *
1450     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1451     *
1452     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1453     * ExecutionException} will be extracted and used as the failure of the derived step.
1454     */
1455    public <U extends Object> ClosingFuture<U> call(
1456        final ClosingFunction2<V1, V2, U> function, Executor executor) {
1457      return call(
1458          new CombiningCallable<U>() {
1459            @Override
1460            @NullableDecl
1461            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1462              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1463            }
1464
1465            @Override
1466            public String toString() {
1467              return function.toString();
1468            }
1469          },
1470          executor);
1471    }
1472
1473    /**
1474     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1475     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1476     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1477     * captured by the returned {@link ClosingFuture}).
1478     *
1479     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1480     * any of the inputs fail, so will the returned step.
1481     *
1482     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1483     *
1484     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1485     * ExecutionException} will be extracted and used as the failure of the derived step.
1486     *
1487     * <p>If the function throws any other exception, it will be used as the failure of the derived
1488     * step.
1489     *
1490     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1491     * the closeable objects in that {@code ClosingFuture} will be closed.
1492     *
1493     * <p>Usage guidelines for this method:
1494     *
1495     * <ul>
1496     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1497     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1498     *       Executor)} instead, with a function that returns the next value directly.
1499     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1500     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1501     *       capture it for later closing.
1502     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1503     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1504     * </ul>
1505     *
1506     * <p>The same warnings about doing heavyweight operations within {@link
1507     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1508     */
1509    public <U extends Object> ClosingFuture<U> callAsync(
1510        final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1511      return callAsync(
1512          new AsyncCombiningCallable<U>() {
1513            @Override
1514            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1515              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1516            }
1517
1518            @Override
1519            public String toString() {
1520              return function.toString();
1521            }
1522          },
1523          executor);
1524    }
1525  }
1526
1527  /**
1528   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1529   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1530   * ClosingFuture)} to start this combination.
1531   *
1532   * @param <V1> the type returned by the first future
1533   * @param <V2> the type returned by the second future
1534   * @param <V3> the type returned by the third future
1535   */
1536  public static final class Combiner3<V1 extends Object, V2 extends Object, V3 extends Object>
1537      extends Combiner {
1538    /**
1539     * A function that returns a value when applied to the values of the three futures passed to
1540     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1541     *
1542     * @param <V1> the type returned by the first future
1543     * @param <V2> the type returned by the second future
1544     * @param <V3> the type returned by the third future
1545     * @param <U> the type returned by the function
1546     */
1547    public interface ClosingFunction3<
1548        V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
1549      /**
1550       * Applies this function to three inputs, or throws an exception if unable to do so.
1551       *
1552       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1553       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1554       * is done (but not before this method completes), even if this method throws or the pipeline
1555       * is cancelled.
1556       */
1557      @NullableDecl
1558      U apply(
1559          DeferredCloser closer,
1560          @NullableDecl V1 value1,
1561          @NullableDecl V2 value2,
1562          @NullableDecl V3 v3)
1563          throws Exception;
1564    }
1565
1566    /**
1567     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1568     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1569     *
1570     * @param <V1> the type returned by the first future
1571     * @param <V2> the type returned by the second future
1572     * @param <V3> the type returned by the third future
1573     * @param <U> the type returned by the function
1574     */
1575    public interface AsyncClosingFunction3<
1576        V1 extends Object, V2 extends Object, V3 extends Object, U extends Object> {
1577      /**
1578       * Applies this function to three inputs, or throws an exception if unable to do so.
1579       *
1580       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1581       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1582       * is done (but not before this method completes), even if this method throws or the pipeline
1583       * is cancelled.
1584       */
1585      ClosingFuture<U> apply(
1586          DeferredCloser closer,
1587          @NullableDecl V1 value1,
1588          @NullableDecl V2 value2,
1589          @NullableDecl V3 value3)
1590          throws Exception;
1591    }
1592
1593    private final ClosingFuture<V1> future1;
1594    private final ClosingFuture<V2> future2;
1595    private final ClosingFuture<V3> future3;
1596
1597    private Combiner3(
1598        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1599      super(true, ImmutableList.of(future1, future2, future3));
1600      this.future1 = future1;
1601      this.future2 = future2;
1602      this.future3 = future3;
1603    }
1604
1605    /**
1606     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1607     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1608     * objects to be closed when the pipeline is done.
1609     *
1610     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1611     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1612     *
1613     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1614     *
1615     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1616     * ExecutionException} will be extracted and used as the failure of the derived step.
1617     */
1618    public <U extends Object> ClosingFuture<U> call(
1619        final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1620      return call(
1621          new CombiningCallable<U>() {
1622            @Override
1623            @NullableDecl
1624            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1625              return function.apply(
1626                  closer,
1627                  peeker.getDone(future1),
1628                  peeker.getDone(future2),
1629                  peeker.getDone(future3));
1630            }
1631
1632            @Override
1633            public String toString() {
1634              return function.toString();
1635            }
1636          },
1637          executor);
1638    }
1639
1640    /**
1641     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1642     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1643     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1644     * captured by the returned {@link ClosingFuture}).
1645     *
1646     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1647     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1648     *
1649     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1650     *
1651     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1652     * ExecutionException} will be extracted and used as the failure of the derived step.
1653     *
1654     * <p>If the function throws any other exception, it will be used as the failure of the derived
1655     * step.
1656     *
1657     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1658     * the closeable objects in that {@code ClosingFuture} will be closed.
1659     *
1660     * <p>Usage guidelines for this method:
1661     *
1662     * <ul>
1663     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1664     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1665     *       Executor)} instead, with a function that returns the next value directly.
1666     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1667     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1668     *       capture it for later closing.
1669     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1670     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1671     * </ul>
1672     *
1673     * <p>The same warnings about doing heavyweight operations within {@link
1674     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1675     */
1676    public <U extends Object> ClosingFuture<U> callAsync(
1677        final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1678      return callAsync(
1679          new AsyncCombiningCallable<U>() {
1680            @Override
1681            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1682              return function.apply(
1683                  closer,
1684                  peeker.getDone(future1),
1685                  peeker.getDone(future2),
1686                  peeker.getDone(future3));
1687            }
1688
1689            @Override
1690            public String toString() {
1691              return function.toString();
1692            }
1693          },
1694          executor);
1695    }
1696  }
1697
1698  /**
1699   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1700   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1701   * ClosingFuture)} to start this combination.
1702   *
1703   * @param <V1> the type returned by the first future
1704   * @param <V2> the type returned by the second future
1705   * @param <V3> the type returned by the third future
1706   * @param <V4> the type returned by the fourth future
1707   */
1708  public static final class Combiner4<
1709          V1 extends Object, V2 extends Object, V3 extends Object, V4 extends Object>
1710      extends Combiner {
1711    /**
1712     * A function that returns a value when applied to the values of the four futures passed to
1713     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1714     *
1715     * @param <V1> the type returned by the first future
1716     * @param <V2> the type returned by the second future
1717     * @param <V3> the type returned by the third future
1718     * @param <V4> the type returned by the fourth future
1719     * @param <U> the type returned by the function
1720     */
1721    public interface ClosingFunction4<
1722        V1 extends Object,
1723        V2 extends Object,
1724        V3 extends Object,
1725        V4 extends Object,
1726        U extends Object> {
1727      /**
1728       * Applies this function to four inputs, or throws an exception if unable to do so.
1729       *
1730       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1731       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1732       * is done (but not before this method completes), even if this method throws or the pipeline
1733       * is cancelled.
1734       */
1735      @NullableDecl
1736      U apply(
1737          DeferredCloser closer,
1738          @NullableDecl V1 value1,
1739          @NullableDecl V2 value2,
1740          @NullableDecl V3 value3,
1741          @NullableDecl V4 value4)
1742          throws Exception;
1743    }
1744
1745    /**
1746     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1747     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1748     * ClosingFuture)}.
1749     *
1750     * @param <V1> the type returned by the first future
1751     * @param <V2> the type returned by the second future
1752     * @param <V3> the type returned by the third future
1753     * @param <V4> the type returned by the fourth future
1754     * @param <U> the type returned by the function
1755     */
1756    public interface AsyncClosingFunction4<
1757        V1 extends Object,
1758        V2 extends Object,
1759        V3 extends Object,
1760        V4 extends Object,
1761        U extends Object> {
1762      /**
1763       * Applies this function to four inputs, or throws an exception if unable to do so.
1764       *
1765       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1766       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1767       * is done (but not before this method completes), even if this method throws or the pipeline
1768       * is cancelled.
1769       */
1770      ClosingFuture<U> apply(
1771          DeferredCloser closer,
1772          @NullableDecl V1 value1,
1773          @NullableDecl V2 value2,
1774          @NullableDecl V3 value3,
1775          @NullableDecl V4 value4)
1776          throws Exception;
1777    }
1778
1779    private final ClosingFuture<V1> future1;
1780    private final ClosingFuture<V2> future2;
1781    private final ClosingFuture<V3> future3;
1782    private final ClosingFuture<V4> future4;
1783
1784    private Combiner4(
1785        ClosingFuture<V1> future1,
1786        ClosingFuture<V2> future2,
1787        ClosingFuture<V3> future3,
1788        ClosingFuture<V4> future4) {
1789      super(true, ImmutableList.of(future1, future2, future3, future4));
1790      this.future1 = future1;
1791      this.future2 = future2;
1792      this.future3 = future3;
1793      this.future4 = future4;
1794    }
1795
1796    /**
1797     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1798     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1799     * objects to be closed when the pipeline is done.
1800     *
1801     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1802     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1803     *
1804     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1805     *
1806     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1807     * ExecutionException} will be extracted and used as the failure of the derived step.
1808     */
1809    public <U extends Object> ClosingFuture<U> call(
1810        final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1811      return call(
1812          new CombiningCallable<U>() {
1813            @Override
1814            @NullableDecl
1815            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1816              return function.apply(
1817                  closer,
1818                  peeker.getDone(future1),
1819                  peeker.getDone(future2),
1820                  peeker.getDone(future3),
1821                  peeker.getDone(future4));
1822            }
1823
1824            @Override
1825            public String toString() {
1826              return function.toString();
1827            }
1828          },
1829          executor);
1830    }
1831
1832    /**
1833     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1834     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1835     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1836     * captured by the returned {@link ClosingFuture}).
1837     *
1838     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1839     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1840     *
1841     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1842     *
1843     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1844     * ExecutionException} will be extracted and used as the failure of the derived step.
1845     *
1846     * <p>If the function throws any other exception, it will be used as the failure of the derived
1847     * step.
1848     *
1849     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1850     * the closeable objects in that {@code ClosingFuture} will be closed.
1851     *
1852     * <p>Usage guidelines for this method:
1853     *
1854     * <ul>
1855     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1856     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1857     *       Executor)} instead, with a function that returns the next value directly.
1858     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1859     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1860     *       capture it for later closing.
1861     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1862     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1863     * </ul>
1864     *
1865     * <p>The same warnings about doing heavyweight operations within {@link
1866     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1867     */
1868    public <U extends Object> ClosingFuture<U> callAsync(
1869        final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1870      return callAsync(
1871          new AsyncCombiningCallable<U>() {
1872            @Override
1873            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1874              return function.apply(
1875                  closer,
1876                  peeker.getDone(future1),
1877                  peeker.getDone(future2),
1878                  peeker.getDone(future3),
1879                  peeker.getDone(future4));
1880            }
1881
1882            @Override
1883            public String toString() {
1884              return function.toString();
1885            }
1886          },
1887          executor);
1888    }
1889  }
1890
1891  /**
1892   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1893   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1894   * ClosingFuture, ClosingFuture)} to start this combination.
1895   *
1896   * @param <V1> the type returned by the first future
1897   * @param <V2> the type returned by the second future
1898   * @param <V3> the type returned by the third future
1899   * @param <V4> the type returned by the fourth future
1900   * @param <V5> the type returned by the fifth future
1901   */
1902  public static final class Combiner5<
1903          V1 extends Object,
1904          V2 extends Object,
1905          V3 extends Object,
1906          V4 extends Object,
1907          V5 extends Object>
1908      extends Combiner {
1909    /**
1910     * A function that returns a value when applied to the values of the five futures passed to
1911     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1912     * ClosingFuture)}.
1913     *
1914     * @param <V1> the type returned by the first future
1915     * @param <V2> the type returned by the second future
1916     * @param <V3> the type returned by the third future
1917     * @param <V4> the type returned by the fourth future
1918     * @param <V5> the type returned by the fifth future
1919     * @param <U> the type returned by the function
1920     */
1921    public interface ClosingFunction5<
1922        V1 extends Object,
1923        V2 extends Object,
1924        V3 extends Object,
1925        V4 extends Object,
1926        V5 extends Object,
1927        U extends Object> {
1928      /**
1929       * Applies this function to five inputs, or throws an exception if unable to do so.
1930       *
1931       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1932       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1933       * is done (but not before this method completes), even if this method throws or the pipeline
1934       * is cancelled.
1935       */
1936      @NullableDecl
1937      U apply(
1938          DeferredCloser closer,
1939          @NullableDecl V1 value1,
1940          @NullableDecl V2 value2,
1941          @NullableDecl V3 value3,
1942          @NullableDecl V4 value4,
1943          @NullableDecl V5 value5)
1944          throws Exception;
1945    }
1946
1947    /**
1948     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1949     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1950     * ClosingFuture, ClosingFuture)}.
1951     *
1952     * @param <V1> the type returned by the first future
1953     * @param <V2> the type returned by the second future
1954     * @param <V3> the type returned by the third future
1955     * @param <V4> the type returned by the fourth future
1956     * @param <V5> the type returned by the fifth future
1957     * @param <U> the type returned by the function
1958     */
1959    public interface AsyncClosingFunction5<
1960        V1 extends Object,
1961        V2 extends Object,
1962        V3 extends Object,
1963        V4 extends Object,
1964        V5 extends Object,
1965        U extends Object> {
1966      /**
1967       * Applies this function to five inputs, or throws an exception if unable to do so.
1968       *
1969       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1970       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1971       * is done (but not before this method completes), even if this method throws or the pipeline
1972       * is cancelled.
1973       */
1974      ClosingFuture<U> apply(
1975          DeferredCloser closer,
1976          @NullableDecl V1 value1,
1977          @NullableDecl V2 value2,
1978          @NullableDecl V3 value3,
1979          @NullableDecl V4 value4,
1980          @NullableDecl V5 value5)
1981          throws Exception;
1982    }
1983
1984    private final ClosingFuture<V1> future1;
1985    private final ClosingFuture<V2> future2;
1986    private final ClosingFuture<V3> future3;
1987    private final ClosingFuture<V4> future4;
1988    private final ClosingFuture<V5> future5;
1989
1990    private Combiner5(
1991        ClosingFuture<V1> future1,
1992        ClosingFuture<V2> future2,
1993        ClosingFuture<V3> future3,
1994        ClosingFuture<V4> future4,
1995        ClosingFuture<V5> future5) {
1996      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1997      this.future1 = future1;
1998      this.future2 = future2;
1999      this.future3 = future3;
2000      this.future4 = future4;
2001      this.future5 = future5;
2002    }
2003
2004    /**
2005     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2006     * combining function to their values. The function can use a {@link DeferredCloser} to capture
2007     * objects to be closed when the pipeline is done.
2008     *
2009     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2010     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2011     * returned step.
2012     *
2013     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2014     *
2015     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2016     * ExecutionException} will be extracted and used as the failure of the derived step.
2017     */
2018    public <U extends Object> ClosingFuture<U> call(
2019        final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2020      return call(
2021          new CombiningCallable<U>() {
2022            @Override
2023            @NullableDecl
2024            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
2025              return function.apply(
2026                  closer,
2027                  peeker.getDone(future1),
2028                  peeker.getDone(future2),
2029                  peeker.getDone(future3),
2030                  peeker.getDone(future4),
2031                  peeker.getDone(future5));
2032            }
2033
2034            @Override
2035            public String toString() {
2036              return function.toString();
2037            }
2038          },
2039          executor);
2040    }
2041
2042    /**
2043     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2044     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
2045     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
2046     * captured by the returned {@link ClosingFuture}).
2047     *
2048     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2049     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2050     * returned step.
2051     *
2052     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2053     *
2054     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2055     * ExecutionException} will be extracted and used as the failure of the derived step.
2056     *
2057     * <p>If the function throws any other exception, it will be used as the failure of the derived
2058     * step.
2059     *
2060     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2061     * the closeable objects in that {@code ClosingFuture} will be closed.
2062     *
2063     * <p>Usage guidelines for this method:
2064     *
2065     * <ul>
2066     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2067     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2068     *       Executor)} instead, with a function that returns the next value directly.
2069     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
2070     *       closer.eventuallyClose()} for every closeable object this step creates in order to
2071     *       capture it for later closing.
2072     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2073     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2074     * </ul>
2075     *
2076     * <p>The same warnings about doing heavyweight operations within {@link
2077     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2078     */
2079    public <U extends Object> ClosingFuture<U> callAsync(
2080        final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2081      return callAsync(
2082          new AsyncCombiningCallable<U>() {
2083            @Override
2084            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2085              return function.apply(
2086                  closer,
2087                  peeker.getDone(future1),
2088                  peeker.getDone(future2),
2089                  peeker.getDone(future3),
2090                  peeker.getDone(future4),
2091                  peeker.getDone(future5));
2092            }
2093
2094            @Override
2095            public String toString() {
2096              return function.toString();
2097            }
2098          },
2099          executor);
2100    }
2101  }
2102
2103  @Override
2104  public String toString() {
2105    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2106    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2107  }
2108
2109  @Override
2110  protected void finalize() {
2111    if (state.get().equals(OPEN)) {
2112      logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2113      FluentFuture<V> unused = finishToFuture();
2114    }
2115  }
2116
2117  private static void closeQuietly(final Closeable closeable, Executor executor) {
2118    if (closeable == null) {
2119      return;
2120    }
2121    try {
2122      executor.execute(
2123          new Runnable() {
2124            @Override
2125            public void run() {
2126              try {
2127                closeable.close();
2128              } catch (IOException | RuntimeException e) {
2129                logger.log(WARNING, "thrown by close()", e);
2130              }
2131            }
2132          });
2133    } catch (RejectedExecutionException e) {
2134      if (logger.isLoggable(WARNING)) {
2135        logger.log(
2136            WARNING, String.format("while submitting close to %s; will close inline", executor), e);
2137      }
2138      closeQuietly(closeable, directExecutor());
2139    }
2140  }
2141
2142  private void checkAndUpdateState(State oldState, State newState) {
2143    checkState(
2144        compareAndUpdateState(oldState, newState),
2145        "Expected state to be %s, but it was %s",
2146        oldState,
2147        newState);
2148  }
2149
2150  private boolean compareAndUpdateState(State oldState, State newState) {
2151    return state.compareAndSet(oldState, newState);
2152  }
2153
2154  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2155  private static final class CloseableList extends IdentityHashMap<Closeable, Executor>
2156      implements Closeable {
2157    private final DeferredCloser closer = new DeferredCloser(this);
2158    private volatile boolean closed;
2159    private volatile CountDownLatch whenClosed;
2160
2161    <V, U> ListenableFuture<U> applyClosingFunction(
2162        ClosingFunction<? super V, U> transformation, V input) throws Exception {
2163      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2164      CloseableList newCloseables = new CloseableList();
2165      try {
2166        return immediateFuture(transformation.apply(newCloseables.closer, input));
2167      } finally {
2168        add(newCloseables, directExecutor());
2169      }
2170    }
2171
2172    <V, U> FluentFuture<U> applyAsyncClosingFunction(
2173        AsyncClosingFunction<V, U> transformation, V input) throws Exception {
2174      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2175      CloseableList newCloseables = new CloseableList();
2176      try {
2177        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2178        closingFuture.becomeSubsumedInto(newCloseables);
2179        return closingFuture.future;
2180      } finally {
2181        add(newCloseables, directExecutor());
2182      }
2183    }
2184
2185    @Override
2186    public void close() {
2187      if (closed) {
2188        return;
2189      }
2190      synchronized (this) {
2191        if (closed) {
2192          return;
2193        }
2194        closed = true;
2195      }
2196      for (Map.Entry<Closeable, Executor> entry : entrySet()) {
2197        closeQuietly(entry.getKey(), entry.getValue());
2198      }
2199      clear();
2200      if (whenClosed != null) {
2201        whenClosed.countDown();
2202      }
2203    }
2204
2205    void add(@NullableDecl Closeable closeable, Executor executor) {
2206      checkNotNull(executor);
2207      if (closeable == null) {
2208        return;
2209      }
2210      synchronized (this) {
2211        if (!closed) {
2212          put(closeable, executor);
2213          return;
2214        }
2215      }
2216      closeQuietly(closeable, executor);
2217    }
2218
2219    /**
2220     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2221     */
2222    CountDownLatch whenClosedCountDown() {
2223      if (closed) {
2224        return new CountDownLatch(0);
2225      }
2226      synchronized (this) {
2227        if (closed) {
2228          return new CountDownLatch(0);
2229        }
2230        checkState(whenClosed == null);
2231        return whenClosed = new CountDownLatch(1);
2232      }
2233    }
2234  }
2235
2236  /**
2237   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2238   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2239   */
2240  @VisibleForTesting
2241  CountDownLatch whenClosedCountDown() {
2242    return closeables.whenClosedCountDown();
2243  }
2244
2245  /** The state of a {@link CloseableList}. */
2246  enum State {
2247    /** The {@link CloseableList} has not been subsumed or closed. */
2248    OPEN,
2249
2250    /**
2251     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
2252     * into any other.
2253     */
2254    SUBSUMED,
2255
2256    /**
2257     * Some {@link ListenableFuture} has a callback attached that will close the {@link
2258     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
2259     */
2260    WILL_CLOSE,
2261
2262    /**
2263     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
2264     * {@link CloseableList} may not be subsumed.
2265     */
2266    CLOSING,
2267
2268    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
2269    CLOSED,
2270
2271    /**
2272     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
2273     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
2274     */
2275    WILL_CREATE_VALUE_AND_CLOSER,
2276  }
2277}