001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static java.util.logging.Level.FINER;
036import static java.util.logging.Level.SEVERE;
037import static java.util.logging.Level.WARNING;
038
039import com.google.common.annotations.J2ktIncompatible;
040import com.google.common.annotations.VisibleForTesting;
041import com.google.common.collect.FluentIterable;
042import com.google.common.collect.ImmutableList;
043import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
045import com.google.common.util.concurrent.Futures.FutureCombiner;
046import com.google.errorprone.annotations.CanIgnoreReturnValue;
047import com.google.errorprone.annotations.DoNotMock;
048import com.google.j2objc.annotations.RetainedWith;
049import java.io.Closeable;
050import java.io.IOException;
051import java.util.IdentityHashMap;
052import java.util.Map;
053import java.util.concurrent.Callable;
054import java.util.concurrent.CancellationException;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutionException;
057import java.util.concurrent.Executor;
058import java.util.concurrent.Future;
059import java.util.concurrent.RejectedExecutionException;
060import java.util.concurrent.atomic.AtomicReference;
061import java.util.logging.Logger;
062import javax.annotation.CheckForNull;
063import org.checkerframework.checker.nullness.qual.Nullable;
064
065/**
066 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
067 * complete, some objects captured during the computation are closed.
068 *
069 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
070 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
071 * cancellation of the operation so far. The only way to extract the value or exception from a step
072 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
073 * "value" of a successful step or the "result" (value or exception) of any step.
074 *
075 * <ol>
076 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
077 *       block or a {@link ListenableFuture}.
078 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
079 *       can be captured for later closing.
080 *   <li>There is one last step (the root of the tree), from which you can extract the final result
081 *       of the computation. After that result is available (or the computation fails), all objects
082 *       captured by any of the steps in the pipeline are closed.
083 * </ol>
084 *
085 * <h3>Starting a pipeline</h3>
086 *
087 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
088 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
089 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
090 * #from(ListenableFuture)} instead.
091 *
092 * <h3>Derived steps</h3>
093 *
094 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
095 * ways similar to {@link FluentFuture}s:
096 *
097 * <ul>
098 *   <li>by transforming the value from a successful input step,
099 *   <li>by catching the exception from a failed input step, or
100 *   <li>by combining the results of several input steps.
101 * </ul>
102 *
103 * Each derivation can capture the next value or any intermediate objects for later closing.
104 *
105 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
106 * exception, or combine it with others, you cannot do anything else with it, including declare it
107 * to be the last step of the pipeline.
108 *
109 * <h4>Transforming</h4>
110 *
111 * To derive the next step by asynchronously applying a function to an input step's value, call
112 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
113 * Executor)} on the input step.
114 *
115 * <h4>Catching</h4>
116 *
117 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
118 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
119 *
120 * <h4>Combining</h4>
121 *
 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or their overloads.
124 *
125 * <h3>Cancelling</h3>
126 *
127 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
128 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
129 * successfully cancelled step will immediately start closing all objects captured for later closing
130 * by it and by its input steps.
131 *
132 * <h3>Ending a pipeline</h3>
133 *
134 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
135 * close the captured objects automatically or manually.
136 *
137 * <h4>Automatically closing</h4>
138 *
139 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
140 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
141 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
142 * completes before closing starts, rather than once it has finished.
143 *
144 * <pre>{@code
145 * FluentFuture<UserName> userName =
146 *     ClosingFuture.submit(
147 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
148 *             executor)
149 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
150 *         .transform((closer, result) -> result.get("userName"), directExecutor())
151 *         .catching(DBException.class, e -> "no user", directExecutor())
152 *         .finishToFuture();
153 * }</pre>
154 *
155 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
156 * result cursor will both be closed, even if the operation is cancelled or fails.
157 *
158 * <h4>Manually closing</h4>
159 *
160 * If you want to close the captured objects manually, after you've used the final result, call
161 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
162 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
163 *
164 * <pre>{@code
165 *     ClosingFuture.submit(
166 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
167 *             executor)
168 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
169 *     .transform((closer, result) -> result.get("userName"), directExecutor())
170 *     .catching(DBException.class, e -> "no user", directExecutor())
171 *     .finishToValueAndCloser(
172 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
173 *
174 * // later
175 * try { // get() will throw if the operation failed or was cancelled.
176 *   UserName userName = userNameValueAndCloser.get();
177 *   // do something with userName
178 * } finally {
179 *   userNameValueAndCloser.closeAsync();
180 * }
181 * }</pre>
182 *
183 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
184 * the query result cursor will both be closed, even if the operation is cancelled or fails.
185 *
186 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
187 * automatic-closing approach described above is safer.
188 *
189 * @param <V> the type of the value of this step
190 * @since 30.0
191 */
192// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
193@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
194@J2ktIncompatible
195@ElementTypesAreNonnullByDefault
196// TODO(dpb): GWT compatibility.
197public final class ClosingFuture<V extends @Nullable Object> {
198
199  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
200
201  /**
202   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
203   * done.
204   */
205  public static final class DeferredCloser {
206    @RetainedWith private final CloseableList list;
207
208    DeferredCloser(CloseableList list) {
209      this.list = list;
210    }
211
212    /**
213     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
214     *
215     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
216     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
217     * Closeable}. (For more about the flavors, see <a
218     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
219     * build</a>.)
220     *
221     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
222     * building for Android): Ensure that any object you pass implements the interface not just in
223     * your current SDK version but also at the oldest version you support. For example, <a
224     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
225     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
226     * {@code Closeable} with a method reference like {@code cursor::close}.
227     *
228     * <p>Note that this method is still binary-compatible between flavors because the erasure of
229     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
230     *
231     * @param closeable the object to be closed (see notes above)
232     * @param closingExecutor the object will be closed on this executor
233     * @return the first argument
234     */
235    @CanIgnoreReturnValue
236    @ParametricNullness
237    // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
238    public <C extends @Nullable Object & @Nullable Closeable> C eventuallyClose(
239        @ParametricNullness C closeable, Executor closingExecutor) {
240      checkNotNull(closingExecutor);
241      if (closeable != null) {
242        list.add(closeable, closingExecutor);
243      }
244      return closeable;
245    }
246  }
247
248  /**
249   * An operation that computes a result.
250   *
251   * @param <V> the type of the result
252   */
253  public interface ClosingCallable<V extends @Nullable Object> {
254    /**
255     * Computes a result, or throws an exception if unable to do so.
256     *
257     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
258     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
259     * not before this method completes), even if this method throws or the pipeline is cancelled.
260     */
261    @ParametricNullness
262    V call(DeferredCloser closer) throws Exception;
263  }
264
265  /**
266   * An operation that computes a {@link ClosingFuture} of a result.
267   *
268   * @param <V> the type of the result
269   * @since 30.1
270   */
271  public interface AsyncClosingCallable<V extends @Nullable Object> {
272    /**
273     * Computes a result, or throws an exception if unable to do so.
274     *
275     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
276     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
277     * not before this method completes), even if this method throws or the pipeline is cancelled.
278     */
279    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
280  }
281
282  /**
283   * A function from an input to a result.
284   *
285   * @param <T> the type of the input to the function
286   * @param <U> the type of the result of the function
287   */
288  public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
289
290    /**
291     * Applies this function to an input, or throws an exception if unable to do so.
292     *
293     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
294     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
295     * not before this method completes), even if this method throws or the pipeline is cancelled.
296     */
297    @ParametricNullness
298    U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
299  }
300
301  /**
302   * A function from an input to a {@link ClosingFuture} of a result.
303   *
304   * @param <T> the type of the input to the function
305   * @param <U> the type of the result of the function
306   */
307  public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
308    /**
309     * Applies this function to an input, or throws an exception if unable to do so.
310     *
311     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
312     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
313     * not before this method completes), even if this method throws or the pipeline is cancelled.
314     */
315    ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
316  }
317
318  /**
319   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
320   * allows the user to close all the closeable objects that were captured during it for later
321   * closing.
322   *
323   * <p>The asynchronous operation will have completed before this object is created.
324   *
325   * @param <V> the type of the value of a successful operation
326   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
327   */
328  public static final class ValueAndCloser<V extends @Nullable Object> {
329
330    private final ClosingFuture<? extends V> closingFuture;
331
332    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
333      this.closingFuture = checkNotNull(closingFuture);
334    }
335
336    /**
337     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
338     * {@link Future#get()} would.
339     *
340     * <p>Because the asynchronous operation has already completed, this method is synchronous and
341     * returns immediately.
342     *
343     * @throws CancellationException if the computation was cancelled
344     * @throws ExecutionException if the computation threw an exception
345     */
346    @ParametricNullness
347    public V get() throws ExecutionException {
348      return getDone(closingFuture.future);
349    }
350
351    /**
352     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
353     * operation on the {@link Executor}s specified by calls to {@link
354     * DeferredCloser#eventuallyClose(Object, Executor)}.
355     *
356     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
357     * closed synchronously.
358     *
359     * <p>Idempotent: objects will be closed at most once.
360     */
361    public void closeAsync() {
362      closingFuture.close();
363    }
364  }
365
366  /**
367   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
368   * ClosingFuture} pipeline.
369   *
370   * @param <V> the type of the final value of a successful pipeline
371   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
372   */
373  public interface ValueAndCloserConsumer<V extends @Nullable Object> {
374
375    /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */
376    void accept(ValueAndCloser<V> valueAndCloser);
377  }
378
379  /**
380   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
381   *
382   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
383   *     execution
384   */
385  public static <V extends @Nullable Object> ClosingFuture<V> submit(
386      ClosingCallable<V> callable, Executor executor) {
387    return new ClosingFuture<>(callable, executor);
388  }
389
390  /**
391   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
392   *
393   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
394   *     execution
395   * @since 30.1
396   */
397  public static <V extends @Nullable Object> ClosingFuture<V> submitAsync(
398      AsyncClosingCallable<V> callable, Executor executor) {
399    return new ClosingFuture<>(callable, executor);
400  }
401
402  /**
403   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
404   *
405   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
406   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
407   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
408   */
409  public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) {
410    return new ClosingFuture<V>(future);
411  }
412
413  /**
414   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
415   *
416   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when
417   * the pipeline is done, even if the pipeline is canceled or fails.
418   *
419   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
420   * value in order to close it.
421   *
422   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
423   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object,
424   *     Executor)}.
425   * @param closingExecutor the future's result will be closed on this executor
426   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
427   *     underlying value may never be closed if the {@link Future} is canceled after its operation
428   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
429   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
430   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
431   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
432   *     ClosingFuture#from}.
433   */
434  @Deprecated
435  // TODO(b/163345357): Widen bound to AutoCloseable once we require API Level 19.
436  public static <C extends @Nullable Object & @Nullable Closeable>
437      ClosingFuture<C> eventuallyClosing(
438          ListenableFuture<C> future, final Executor closingExecutor) {
439    checkNotNull(closingExecutor);
440    final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
441    Futures.addCallback(
442        future,
443        new FutureCallback<@Nullable Closeable>() {
444          @Override
445          public void onSuccess(@CheckForNull Closeable result) {
446            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
447          }
448
449          @Override
450          public void onFailure(Throwable t) {}
451        },
452        directExecutor());
453    return closingFuture;
454  }
455
456  /**
457   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
458   *
459   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
460   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
461   */
462  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
463    return new Combiner(false, futures);
464  }
465
466  /**
467   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
468   *
469   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
470   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
471   */
472  public static Combiner whenAllComplete(
473      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
474    return whenAllComplete(asList(future1, moreFutures));
475  }
476
477  /**
478   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
479   * all succeed. If any fail, the resulting pipeline will fail.
480   *
481   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
482   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
483   */
484  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
485    return new Combiner(true, futures);
486  }
487
488  /**
489   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
490   * they all succeed. If any fail, the resulting pipeline will fail.
491   *
492   * <p>Calling this method allows you to use lambdas or method references typed with the types of
493   * the input {@link ClosingFuture}s.
494   *
495   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
496   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
497   */
498  public static <V1 extends @Nullable Object, V2 extends @Nullable Object>
499      Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
500    return new Combiner2<>(future1, future2);
501  }
502
503  /**
504   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
505   * they all succeed. If any fail, the resulting pipeline will fail.
506   *
507   * <p>Calling this method allows you to use lambdas or method references typed with the types of
508   * the input {@link ClosingFuture}s.
509   *
510   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
511   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
512   */
513  public static <
514          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
515      Combiner3<V1, V2, V3> whenAllSucceed(
516          ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
517    return new Combiner3<>(future1, future2, future3);
518  }
519
520  /**
521   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
522   * they all succeed. If any fail, the resulting pipeline will fail.
523   *
524   * <p>Calling this method allows you to use lambdas or method references typed with the types of
525   * the input {@link ClosingFuture}s.
526   *
527   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
528   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
529   */
530  public static <
531          V1 extends @Nullable Object,
532          V2 extends @Nullable Object,
533          V3 extends @Nullable Object,
534          V4 extends @Nullable Object>
535      Combiner4<V1, V2, V3, V4> whenAllSucceed(
536          ClosingFuture<V1> future1,
537          ClosingFuture<V2> future2,
538          ClosingFuture<V3> future3,
539          ClosingFuture<V4> future4) {
540    return new Combiner4<>(future1, future2, future3, future4);
541  }
542
543  /**
544   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
545   * they all succeed. If any fail, the resulting pipeline will fail.
546   *
547   * <p>Calling this method allows you to use lambdas or method references typed with the types of
548   * the input {@link ClosingFuture}s.
549   *
550   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
551   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
552   */
553  public static <
554          V1 extends @Nullable Object,
555          V2 extends @Nullable Object,
556          V3 extends @Nullable Object,
557          V4 extends @Nullable Object,
558          V5 extends @Nullable Object>
559      Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
560          ClosingFuture<V1> future1,
561          ClosingFuture<V2> future2,
562          ClosingFuture<V3> future3,
563          ClosingFuture<V4> future4,
564          ClosingFuture<V5> future5) {
565    return new Combiner5<>(future1, future2, future3, future4, future5);
566  }
567
568  /**
569   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
570   * all succeed. If any fail, the resulting pipeline will fail.
571   *
572   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
573   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
574   */
575  public static Combiner whenAllSucceed(
576      ClosingFuture<?> future1,
577      ClosingFuture<?> future2,
578      ClosingFuture<?> future3,
579      ClosingFuture<?> future4,
580      ClosingFuture<?> future5,
581      ClosingFuture<?> future6,
582      ClosingFuture<?>... moreFutures) {
583    return whenAllSucceed(
584        FluentIterable.of(future1, future2, future3, future4, future5, future6)
585            .append(moreFutures));
586  }
587
588  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
589  private final CloseableList closeables = new CloseableList();
590  private final FluentFuture<V> future;
591
/** Creates a step whose result is that of {@code future}, viewed as a {@link FluentFuture}. */
private ClosingFuture(ListenableFuture<V> future) {
  FluentFuture<V> fluent = FluentFuture.from(future);
  this.future = fluent;
}
595
/**
 * Creates a step that runs {@code callable} on {@code executor}, passing it this step's {@link
 * DeferredCloser}. The task is handed to the executor before the constructor returns.
 */
private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
  checkNotNull(callable);
  Callable<V> callWithCloser =
      new Callable<V>() {
        @Override
        @ParametricNullness
        public V call() throws Exception {
          return callable.call(closeables.closer);
        }

        @Override
        public String toString() {
          // Report the wrapped callable's description rather than this adapter's.
          return callable.toString();
        }
      };
  TrustedListenableFutureTask<V> task = TrustedListenableFutureTask.create(callWithCloser);
  executor.execute(task);
  this.future = task;
}
615
/**
 * Creates a step that runs {@code callable} on {@code executor} and takes its result from the
 * {@code ClosingFuture} that the callable returns. The task is handed to the executor before
 * the constructor returns.
 */
private ClosingFuture(final AsyncClosingCallable<V> callable, Executor executor) {
  checkNotNull(callable);
  AsyncCallable<V> callWithFreshCloser =
      new AsyncCallable<V>() {
        @Override
        public ListenableFuture<V> call() throws Exception {
          // The callable captures into a separate list for the duration of the call.
          CloseableList capturedDuringCall = new CloseableList();
          try {
            ClosingFuture<V> returnedStep = callable.call(capturedDuringCall.closer);
            returnedStep.becomeSubsumedInto(closeables);
            return returnedStep.future;
          } finally {
            // Runs whether or not the callable threw: the separate list is added to this
            // step's own list.
            closeables.add(capturedDuringCall, directExecutor());
          }
        }

        @Override
        public String toString() {
          // Report the wrapped callable's description rather than this adapter's.
          return callable.toString();
        }
      };
  TrustedListenableFutureTask<V> task = TrustedListenableFutureTask.create(callWithFreshCloser);
  executor.execute(task);
  this.future = task;
}
641
642  /**
643   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
644   * future returns {@code null} if the step is successful or throws the same exception that would
645   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
646   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
647   *
648   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
649   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
650   * a derivation method <i>on the same instance</i>. This is important because calling {@code
651   * statusFuture} alone does not provide a way to close the pipeline.
652   */
653  public ListenableFuture<?> statusFuture() {
654    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
655  }
656
657  /**
658   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
659   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
660   * when the pipeline is done.
661   *
662   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
663   * ClosingFuture} will be equivalent to this one.
664   *
665   * <p>If the function throws an exception, that exception is used as the result of the derived
666   * {@code ClosingFuture}.
667   *
668   * <p>Example usage:
669   *
670   * <pre>{@code
671   * ClosingFuture<List<Row>> rowsFuture =
672   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
673   * }</pre>
674   *
675   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
676   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
677   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
678   *
679   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
680   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
681   * this {@code ClosingFuture}.
682   *
683   * @param function transforms the value of this step to the value of the derived step
684   * @param executor executor to run the function in
685   * @return the derived step
686   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
687   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
688   *     finished}
689   */
690  public <U extends @Nullable Object> ClosingFuture<U> transform(
691      final ClosingFunction<? super V, U> function, Executor executor) {
692    checkNotNull(function);
693    AsyncFunction<V, U> applyFunction =
694        new AsyncFunction<V, U>() {
695          @Override
696          public ListenableFuture<U> apply(V input) throws Exception {
697            return closeables.applyClosingFunction(function, input);
698          }
699
700          @Override
701          public String toString() {
702            return function.toString();
703          }
704        };
705    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
706    return derive(future.transformAsync(applyFunction, executor));
707  }
708
709  /**
710   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
711   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
712   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
713   * captured by the returned {@link ClosingFuture}).
714   *
715   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
716   * returned by the function.
717   *
718   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
719   * ClosingFuture} will be equivalent to this one.
720   *
721   * <p>If the function throws an exception, that exception is used as the result of the derived
722   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
723   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
724   * closed.
725   *
726   * <p>Usage guidelines for this method:
727   *
728   * <ul>
729   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
730   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
731   *       Executor)} instead, with a function that returns the next value directly.
732   *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
733   *       for every closeable object this step creates in order to capture it for later closing.
734   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
735   *       ClosingFuture} call {@link #from(ListenableFuture)}.
736   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
737   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
738   *       {@link #withoutCloser(AsyncFunction)}
739   * </ul>
740   *
741   * <p>Example usage:
742   *
743   * <pre>{@code
744   * // Result.getRowsClosingFuture() returns a ClosingFuture.
745   * ClosingFuture<List<Row>> rowsFuture =
746   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
747   *
748   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
749   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
750   * // Closeable).
751   * ClosingFuture<Integer> rowsFuture2 =
752   *     queryFuture.transformAsync(
753   *         (closer, result) -> {
754   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
755   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
   *         },
   *         executor);
758   *
759   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
760   * ClosingFuture<List<Row>> rowsFuture3 =
761   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
762   *
763   * }</pre>
764   *
765   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
766   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
767   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
768   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
769   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
770   * responsible for completing the returned {@code ClosingFuture}.)
771   *
772   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
773   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
774   * this {@code ClosingFuture}.
775   *
776   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
777   *     the derived step
778   * @param executor executor to run the function in
779   * @return the derived step
780   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
781   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
782   *     finished}
783   */
784  public <U extends @Nullable Object> ClosingFuture<U> transformAsync(
785      final AsyncClosingFunction<? super V, U> function, Executor executor) {
786    checkNotNull(function);
787    AsyncFunction<V, U> applyFunction =
788        new AsyncFunction<V, U>() {
789          @Override
790          public ListenableFuture<U> apply(V input) throws Exception {
791            return closeables.applyAsyncClosingFunction(function, input);
792          }
793
794          @Override
795          public String toString() {
796            return function.toString();
797          }
798        };
799    return derive(future.transformAsync(applyFunction, executor));
800  }
801
802  /**
803   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
804   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
805   * {@link ListenableFuture}.
806   *
807   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
808   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
809   * meets these conditions:
810   *
811   * <ul>
812   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
813   *       DeferredCloser#eventuallyClose(Object, Executor)}.
814   *   <li>It returns a {@link ListenableFuture}.
815   * </ul>
816   *
817   * <p>Example usage:
818   *
819   * <pre>{@code
820   * // Result.getRowsFuture() returns a ListenableFuture.
821   * ClosingFuture<List<Row>> rowsFuture =
822   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
823   * }</pre>
824   *
825   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
826   *     ListenableFuture} with the value of a derived step
827   */
828  public static <V extends @Nullable Object, U extends @Nullable Object>
829      AsyncClosingFunction<V, U> withoutCloser(final AsyncFunction<V, U> function) {
830    checkNotNull(function);
831    return new AsyncClosingFunction<V, U>() {
832      @Override
833      public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
834        return ClosingFuture.from(function.apply(input));
835      }
836    };
837  }
838
839  /**
840   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
841   * to its exception if it is an instance of a given exception type. The function can use a {@link
842   * DeferredCloser} to capture objects to be closed when the pipeline is done.
843   *
844   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
845   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
846   * one.
847   *
848   * <p>If the function throws an exception, that exception is used as the result of the derived
849   * {@code ClosingFuture}.
850   *
851   * <p>Example usage:
852   *
853   * <pre>{@code
   * ClosingFuture<QueryResult> recoveredQueryFuture =
855   *     queryFuture.catching(
856   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
857   * }</pre>
858   *
859   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
860   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
861   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
862   *
863   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
864   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
865   * this {@code ClosingFuture}.
866   *
867   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
868   *     type is matched against this step's exception. "This step's exception" means the cause of
869   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
870   *     underlying this step or, if {@code get()} throws a different kind of exception, that
871   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
872   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
873   * @param fallback the function to be called if this step fails with the expected exception type.
874   *     The function's argument is this step's exception. "This step's exception" means the cause
875   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
876   *     underlying this step or, if {@code get()} throws a different kind of exception, that
877   *     exception itself.
878   * @param executor the executor that runs {@code fallback} if the input fails
879   */
880  public <X extends Throwable> ClosingFuture<V> catching(
881      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
882    return catchingMoreGeneric(exceptionType, fallback, executor);
883  }
884
885  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
886  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
887      Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
888    checkNotNull(fallback);
889    AsyncFunction<X, W> applyFallback =
890        new AsyncFunction<X, W>() {
891          @Override
892          public ListenableFuture<W> apply(X exception) throws Exception {
893            return closeables.applyClosingFunction(fallback, exception);
894          }
895
896          @Override
897          public String toString() {
898            return fallback.toString();
899          }
900        };
901    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
902    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
903  }
904
905  /**
906   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
907   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
908   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
909   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
910   *
911   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
912   * ClosingFuture} will be equivalent to the one returned by the function.
913   *
914   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
915   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
916   * one.
917   *
918   * <p>If the function throws an exception, that exception is used as the result of the derived
919   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
920   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
921   * closed.
922   *
923   * <p>Usage guidelines for this method:
924   *
925   * <ul>
926   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
927   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
928   *       ClosingFunction, Executor)} instead, with a function that returns the next value
929   *       directly.
930   *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
931   *       for every closeable object this step creates in order to capture it for later closing.
932   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
933   *       ClosingFuture} call {@link #from(ListenableFuture)}.
934   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
935   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
936   *       {@link #withoutCloser(AsyncFunction)}
937   * </ul>
938   *
939   * <p>Example usage:
940   *
941   * <pre>{@code
942   * // Fall back to a secondary input stream in case of IOException.
943   * ClosingFuture<InputStream> inputFuture =
944   *     firstInputFuture.catchingAsync(
   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
   * }</pre>
948   *
949   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
950   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
951   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
952   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
953   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
954   * responsible for completing the returned {@code ClosingFuture}.)
955   *
956   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
957   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
958   * this {@code ClosingFuture}.
959   *
960   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
961   *     type is matched against this step's exception. "This step's exception" means the cause of
962   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
963   *     underlying this step or, if {@code get()} throws a different kind of exception, that
964   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
965   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
966   * @param fallback the function to be called if this step fails with the expected exception type.
967   *     The function's argument is this step's exception. "This step's exception" means the cause
968   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
969   *     underlying this step or, if {@code get()} throws a different kind of exception, that
970   *     exception itself.
971   * @param executor the executor that runs {@code fallback} if the input fails
972   */
973  // TODO(dpb): Should this do something special if the function throws CancellationException or
974  // ExecutionException?
975  public <X extends Throwable> ClosingFuture<V> catchingAsync(
976      Class<X> exceptionType,
977      AsyncClosingFunction<? super X, ? extends V> fallback,
978      Executor executor) {
979    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
980  }
981
982  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
983  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
984      Class<X> exceptionType,
985      final AsyncClosingFunction<? super X, W> fallback,
986      Executor executor) {
987    checkNotNull(fallback);
988    AsyncFunction<X, W> asyncFunction =
989        new AsyncFunction<X, W>() {
990          @Override
991          public ListenableFuture<W> apply(X exception) throws Exception {
992            return closeables.applyAsyncClosingFunction(fallback, exception);
993          }
994
995          @Override
996          public String toString() {
997            return fallback.toString();
998          }
999        };
1000    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
1001  }
1002
1003  /**
1004   * Marks this step as the last step in the {@code ClosingFuture} pipeline.
1005   *
1006   * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when
1007   * the pipeline is cancelled.
1008   *
1009   * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously
1010   * <b>after</b> the returned {@code Future} is done: the future completes before closing starts,
1011   * rather than once it has finished.
1012   *
1013   * <p>After calling this method, you may not call {@link
1014   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
1015   * derivation method on this {@code ClosingFuture}.
1016   *
1017   * @return a {@link Future} that represents the final value or exception of the pipeline
1018   */
1019  public FluentFuture<V> finishToFuture() {
1020    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
1021      logger.log(FINER, "will close {0}", this);
1022      future.addListener(
1023          new Runnable() {
1024            @Override
1025            public void run() {
1026              checkAndUpdateState(WILL_CLOSE, CLOSING);
1027              close();
1028              checkAndUpdateState(CLOSING, CLOSED);
1029            }
1030          },
1031          directExecutor());
1032    } else {
1033      switch (state.get()) {
1034        case SUBSUMED:
1035          throw new IllegalStateException(
1036              "Cannot call finishToFuture() after deriving another step");
1037
1038        case WILL_CREATE_VALUE_AND_CLOSER:
1039          throw new IllegalStateException(
1040              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
1041
1042        case WILL_CLOSE:
1043        case CLOSING:
1044        case CLOSED:
1045          throw new IllegalStateException("Cannot call finishToFuture() twice");
1046
1047        case OPEN:
1048          throw new AssertionError();
1049      }
1050    }
1051    return future;
1052  }
1053
1054  /**
   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
   * {@code consumer} will be called with an object that contains the result of the operation. The
   * consumer can store the {@link ValueAndCloser} outside the consumer for later synchronous use.
1058   *
1059   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
1060   * any other derivation method on this {@code ClosingFuture}.
1061   *
1062   * @param consumer a callback whose method will be called (using {@code executor}) when this
1063   *     operation is done
1064   */
1065  public void finishToValueAndCloser(
1066      final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
1067    checkNotNull(consumer);
1068    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
1069      switch (state.get()) {
1070        case SUBSUMED:
1071          throw new IllegalStateException(
1072              "Cannot call finishToValueAndCloser() after deriving another step");
1073
1074        case WILL_CLOSE:
1075        case CLOSING:
1076        case CLOSED:
1077          throw new IllegalStateException(
1078              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1079
1080        case WILL_CREATE_VALUE_AND_CLOSER:
1081          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1082
1083        case OPEN:
1084          break;
1085      }
1086      throw new AssertionError(state);
1087    }
1088    future.addListener(
1089        new Runnable() {
1090          @Override
1091          public void run() {
1092            provideValueAndCloser(consumer, ClosingFuture.this);
1093          }
1094        },
1095        executor);
1096  }
1097
1098  private static <C extends @Nullable Object, V extends C> void provideValueAndCloser(
1099      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1100    consumer.accept(new ValueAndCloser<C>(closingFuture));
1101  }
1102
1103  /**
1104   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1105   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1106   * successful, and this step has not started when {@code cancel} is called, this step should never
1107   * run.
1108   *
1109   * <p>If successful, causes the objects captured by this step (if already started) and its input
1110   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1111   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1112   *
1113   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1114   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1115   *     cancelled regardless
1116   * @return {@code false} if the step could not be cancelled, typically because it has already
1117   *     completed normally; {@code true} otherwise
1118   */
1119  @CanIgnoreReturnValue
1120  public boolean cancel(boolean mayInterruptIfRunning) {
1121    logger.log(FINER, "cancelling {0}", this);
1122    boolean cancelled = future.cancel(mayInterruptIfRunning);
1123    if (cancelled) {
1124      close();
1125    }
1126    return cancelled;
1127  }
1128
1129  private void close() {
1130    logger.log(FINER, "closing {0}", this);
1131    closeables.close();
1132  }
1133
1134  private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) {
1135    ClosingFuture<U> derived = new ClosingFuture<>(future);
1136    becomeSubsumedInto(derived.closeables);
1137    return derived;
1138  }
1139
1140  private void becomeSubsumedInto(CloseableList otherCloseables) {
1141    checkAndUpdateState(OPEN, SUBSUMED);
1142    otherCloseables.add(closeables, directExecutor());
1143  }
1144
1145  /**
1146   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1147   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1148   *
1149   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1150   */
1151  public static final class Peeker {
1152    private final ImmutableList<ClosingFuture<?>> futures;
1153    private volatile boolean beingCalled;
1154
1155    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1156      this.futures = checkNotNull(futures);
1157    }
1158
1159    /**
1160     * Returns the value of {@code closingFuture}.
1161     *
1162     * @throws ExecutionException if {@code closingFuture} is a failed step
1163     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1164     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1165     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1166     * @throws IllegalStateException if called outside of a call to {@link
1167     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1168     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1169     */
1170    @ParametricNullness
1171    public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture)
1172        throws ExecutionException {
1173      checkState(beingCalled);
1174      checkArgument(futures.contains(closingFuture));
1175      return Futures.getDone(closingFuture.future);
1176    }
1177
1178    @ParametricNullness
1179    private <V extends @Nullable Object> V call(
1180        CombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1181      beingCalled = true;
1182      CloseableList newCloseables = new CloseableList();
1183      try {
1184        return combiner.call(newCloseables.closer, this);
1185      } finally {
1186        closeables.add(newCloseables, directExecutor());
1187        beingCalled = false;
1188      }
1189    }
1190
1191    private <V extends @Nullable Object> FluentFuture<V> callAsync(
1192        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1193      beingCalled = true;
1194      CloseableList newCloseables = new CloseableList();
1195      try {
1196        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1197        closingFuture.becomeSubsumedInto(closeables);
1198        return closingFuture.future;
1199      } finally {
1200        closeables.add(newCloseables, directExecutor());
1201        beingCalled = false;
1202      }
1203    }
1204  }
1205
1206  /**
1207   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1208   *
1209   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1210   * instantiate this class.
1211   *
1212   * <p>Example:
1213   *
1214   * <pre>{@code
1215   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1216   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1217   * ListenableFuture<Integer> numberOfDifferentLines =
1218   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1219   *           .call(
1220   *               (closer, peeker) -> {
1221   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1222   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1223   *                 return countDifferentLines(file1Reader, file2Reader);
1224   *               },
1225   *               executor)
   *           .finishToFuture();
1227   * }</pre>
1228   */
1229  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
1230  @com.google.errorprone.annotations.DoNotMock(
1231      "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1232  public static class Combiner {
1233
1234    private final CloseableList closeables = new CloseableList();
1235
1236    /**
1237     * An operation that returns a result and may throw an exception.
1238     *
1239     * @param <V> the type of the result
1240     */
    public interface CombiningCallable<V extends @Nullable Object> {
      /**
       * Computes a result, or throws an exception if unable to do so.
       *
       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
       * (but not before this method completes), even if this method throws or the pipeline is
       * cancelled.
       *
       * @param closer used to capture objects that should be closed when the pipeline is done
       * @param peeker used to get the value of any of the input futures
       * @return the computed result
       * @throws Exception if unable to compute a result
       */
      @ParametricNullness
      V call(DeferredCloser closer, Peeker peeker) throws Exception;
    }
1255
1256    /**
1257     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1258     *
1259     * @param <V> the type of the result
1260     */
    public interface AsyncCombiningCallable<V extends @Nullable Object> {
      /**
       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
       *
       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
       * (but not before this method completes), even if this method throws or the pipeline is
       * cancelled.
       *
       * @param closer used to capture objects that should be closed when the pipeline is done
       * @param peeker used to get the value of any of the input futures
       * @return a {@link ClosingFuture} representing the result
       * @throws Exception if unable to compute a result
       */
      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
    }
1274
1275    private final boolean allMustSucceed;
1276    protected final ImmutableList<ClosingFuture<?>> inputs;
1277
1278    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1279      this.allMustSucceed = allMustSucceed;
1280      this.inputs = ImmutableList.copyOf(inputs);
1281      for (ClosingFuture<?> input : inputs) {
1282        input.becomeSubsumedInto(closeables);
1283      }
1284    }
1285
1286    /**
1287     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1288     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1289     * objects to be closed when the pipeline is done.
1290     *
1291     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1292     * fail, so will the returned step.
1293     *
1294     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1295     * cancelled.
1296     *
1297     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1298     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1299     */
1300    public <V extends @Nullable Object> ClosingFuture<V> call(
1301        final CombiningCallable<V> combiningCallable, Executor executor) {
1302      Callable<V> callable =
1303          new Callable<V>() {
1304            @Override
1305            @ParametricNullness
1306            public V call() throws Exception {
1307              return new Peeker(inputs).call(combiningCallable, closeables);
1308            }
1309
1310            @Override
1311            public String toString() {
1312              return combiningCallable.toString();
1313            }
1314          };
1315      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1316      derived.closeables.add(closeables, directExecutor());
1317      return derived;
1318    }
1319
1320    /**
1321     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1322     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1323     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1324     * captured by the returned {@link ClosingFuture}).
1325     *
1326     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1327     * fail, so will the returned step.
1328     *
1329     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1330     * cancelled.
1331     *
1332     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1333     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1334     *
1335     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1336     * derived step.
1337     *
1338     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1339     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1340     *
1341     * <p>Usage guidelines for this method:
1342     *
1343     * <ul>
1344     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1345     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1346     *       Executor)} instead, with a function that returns the next value directly.
1347     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1348     *       for every closeable object this step creates in order to capture it for later closing.
1349     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1350     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1351     * </ul>
1352     *
1353     * <p>The same warnings about doing heavyweight operations within {@link
1354     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1355     */
1356    public <V extends @Nullable Object> ClosingFuture<V> callAsync(
1357        final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1358      AsyncCallable<V> asyncCallable =
1359          new AsyncCallable<V>() {
1360            @Override
1361            public ListenableFuture<V> call() throws Exception {
1362              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1363            }
1364
1365            @Override
1366            public String toString() {
1367              return combiningCallable.toString();
1368            }
1369          };
1370      ClosingFuture<V> derived =
1371          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1372      derived.closeables.add(closeables, directExecutor());
1373      return derived;
1374    }
1375
1376    private FutureCombiner<@Nullable Object> futureCombiner() {
1377      return allMustSucceed
1378          ? Futures.whenAllSucceed(inputFutures())
1379          : Futures.whenAllComplete(inputFutures());
1380    }
1381
1382
1383    private ImmutableList<FluentFuture<?>> inputFutures() {
1384      return FluentIterable.from(inputs)
1385          .<FluentFuture<?>>transform(future -> future.future)
1386          .toList();
1387    }
1388  }
1389
1390  /**
1391   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1392   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1393   * combination.
1394   *
1395   * @param <V1> the type returned by the first future
1396   * @param <V2> the type returned by the second future
1397   */
1398  public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object>
1399      extends Combiner {
1400
1401    /**
1402     * A function that returns a value when applied to the values of the two futures passed to
1403     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1404     *
1405     * @param <V1> the type returned by the first future
1406     * @param <V2> the type returned by the second future
1407     * @param <U> the type returned by the function
1408     */
1409    public interface ClosingFunction2<
1410        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1411
1412      /**
1413       * Applies this function to two inputs, or throws an exception if unable to do so.
1414       *
1415       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1416       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1417       * (but not before this method completes), even if this method throws or the pipeline is
1418       * cancelled.
1419       */
1420      @ParametricNullness
1421      U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2)
1422          throws Exception;
1423    }
1424
1425    /**
1426     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1427     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1428     *
1429     * @param <V1> the type returned by the first future
1430     * @param <V2> the type returned by the second future
1431     * @param <U> the type returned by the function
1432     */
1433    public interface AsyncClosingFunction2<
1434        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1435
1436      /**
1437       * Applies this function to two inputs, or throws an exception if unable to do so.
1438       *
1439       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1440       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1441       * (but not before this method completes), even if this method throws or the pipeline is
1442       * cancelled.
1443       */
1444      ClosingFuture<U> apply(
1445          DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2)
1446          throws Exception;
1447    }
1448
1449    private final ClosingFuture<V1> future1;
1450    private final ClosingFuture<V2> future2;
1451
1452    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1453      super(true, ImmutableList.of(future1, future2));
1454      this.future1 = future1;
1455      this.future2 = future2;
1456    }
1457
1458    /**
1459     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1460     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1461     * objects to be closed when the pipeline is done.
1462     *
1463     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1464     * any of the inputs fail, so will the returned step.
1465     *
1466     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1467     *
1468     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1469     * ExecutionException} will be extracted and used as the failure of the derived step.
1470     */
1471    public <U extends @Nullable Object> ClosingFuture<U> call(
1472        final ClosingFunction2<V1, V2, U> function, Executor executor) {
1473      return call(
1474          new CombiningCallable<U>() {
1475            @Override
1476            @ParametricNullness
1477            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1478              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1479            }
1480
1481            @Override
1482            public String toString() {
1483              return function.toString();
1484            }
1485          },
1486          executor);
1487    }
1488
1489    /**
1490     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1491     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1492     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1493     * captured by the returned {@link ClosingFuture}).
1494     *
1495     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1496     * any of the inputs fail, so will the returned step.
1497     *
1498     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1499     *
1500     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1501     * ExecutionException} will be extracted and used as the failure of the derived step.
1502     *
1503     * <p>If the function throws any other exception, it will be used as the failure of the derived
1504     * step.
1505     *
1506     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1507     * the closeable objects in that {@code ClosingFuture} will be closed.
1508     *
1509     * <p>Usage guidelines for this method:
1510     *
1511     * <ul>
1512     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1513     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1514     *       Executor)} instead, with a function that returns the next value directly.
1515     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1516     *       for every closeable object this step creates in order to capture it for later closing.
1517     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1518     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1519     * </ul>
1520     *
1521     * <p>The same warnings about doing heavyweight operations within {@link
1522     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1523     */
1524    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1525        final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1526      return callAsync(
1527          new AsyncCombiningCallable<U>() {
1528            @Override
1529            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1530              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1531            }
1532
1533            @Override
1534            public String toString() {
1535              return function.toString();
1536            }
1537          },
1538          executor);
1539    }
1540  }
1541
1542  /**
1543   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1544   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1545   * ClosingFuture)} to start this combination.
1546   *
1547   * @param <V1> the type returned by the first future
1548   * @param <V2> the type returned by the second future
1549   * @param <V3> the type returned by the third future
1550   */
1551  public static final class Combiner3<
1552          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
1553      extends Combiner {
1554    /**
1555     * A function that returns a value when applied to the values of the three futures passed to
1556     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1557     *
1558     * @param <V1> the type returned by the first future
1559     * @param <V2> the type returned by the second future
1560     * @param <V3> the type returned by the third future
1561     * @param <U> the type returned by the function
1562     */
1563    public interface ClosingFunction3<
1564        V1 extends @Nullable Object,
1565        V2 extends @Nullable Object,
1566        V3 extends @Nullable Object,
1567        U extends @Nullable Object> {
1568      /**
1569       * Applies this function to three inputs, or throws an exception if unable to do so.
1570       *
1571       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1572       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1573       * (but not before this method completes), even if this method throws or the pipeline is
1574       * cancelled.
1575       */
1576      @ParametricNullness
1577      U apply(
1578          DeferredCloser closer,
1579          @ParametricNullness V1 value1,
1580          @ParametricNullness V2 value2,
1581          @ParametricNullness V3 value3)
1582          throws Exception;
1583    }
1584
1585    /**
1586     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1587     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1588     *
1589     * @param <V1> the type returned by the first future
1590     * @param <V2> the type returned by the second future
1591     * @param <V3> the type returned by the third future
1592     * @param <U> the type returned by the function
1593     */
1594    public interface AsyncClosingFunction3<
1595        V1 extends @Nullable Object,
1596        V2 extends @Nullable Object,
1597        V3 extends @Nullable Object,
1598        U extends @Nullable Object> {
1599      /**
1600       * Applies this function to three inputs, or throws an exception if unable to do so.
1601       *
1602       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1603       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1604       * (but not before this method completes), even if this method throws or the pipeline is
1605       * cancelled.
1606       */
1607      ClosingFuture<U> apply(
1608          DeferredCloser closer,
1609          @ParametricNullness V1 value1,
1610          @ParametricNullness V2 value2,
1611          @ParametricNullness V3 value3)
1612          throws Exception;
1613    }
1614
1615    private final ClosingFuture<V1> future1;
1616    private final ClosingFuture<V2> future2;
1617    private final ClosingFuture<V3> future3;
1618
1619    private Combiner3(
1620        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1621      super(true, ImmutableList.of(future1, future2, future3));
1622      this.future1 = future1;
1623      this.future2 = future2;
1624      this.future3 = future3;
1625    }
1626
1627    /**
1628     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1629     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1630     * objects to be closed when the pipeline is done.
1631     *
1632     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1633     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1634     *
1635     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1636     *
1637     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1638     * ExecutionException} will be extracted and used as the failure of the derived step.
1639     */
1640    public <U extends @Nullable Object> ClosingFuture<U> call(
1641        final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1642      return call(
1643          new CombiningCallable<U>() {
1644            @Override
1645            @ParametricNullness
1646            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1647              return function.apply(
1648                  closer,
1649                  peeker.getDone(future1),
1650                  peeker.getDone(future2),
1651                  peeker.getDone(future3));
1652            }
1653
1654            @Override
1655            public String toString() {
1656              return function.toString();
1657            }
1658          },
1659          executor);
1660    }
1661
1662    /**
1663     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1664     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1665     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1666     * captured by the returned {@link ClosingFuture}).
1667     *
1668     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1669     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1670     *
1671     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1672     *
1673     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1674     * ExecutionException} will be extracted and used as the failure of the derived step.
1675     *
1676     * <p>If the function throws any other exception, it will be used as the failure of the derived
1677     * step.
1678     *
1679     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1680     * the closeable objects in that {@code ClosingFuture} will be closed.
1681     *
1682     * <p>Usage guidelines for this method:
1683     *
1684     * <ul>
1685     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1686     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1687     *       Executor)} instead, with a function that returns the next value directly.
1688     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1689     *       for every closeable object this step creates in order to capture it for later closing.
1690     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1691     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1692     * </ul>
1693     *
1694     * <p>The same warnings about doing heavyweight operations within {@link
1695     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1696     */
1697    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1698        final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1699      return callAsync(
1700          new AsyncCombiningCallable<U>() {
1701            @Override
1702            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1703              return function.apply(
1704                  closer,
1705                  peeker.getDone(future1),
1706                  peeker.getDone(future2),
1707                  peeker.getDone(future3));
1708            }
1709
1710            @Override
1711            public String toString() {
1712              return function.toString();
1713            }
1714          },
1715          executor);
1716    }
1717  }
1718
1719  /**
1720   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1721   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1722   * ClosingFuture)} to start this combination.
1723   *
1724   * @param <V1> the type returned by the first future
1725   * @param <V2> the type returned by the second future
1726   * @param <V3> the type returned by the third future
1727   * @param <V4> the type returned by the fourth future
1728   */
1729  public static final class Combiner4<
1730          V1 extends @Nullable Object,
1731          V2 extends @Nullable Object,
1732          V3 extends @Nullable Object,
1733          V4 extends @Nullable Object>
1734      extends Combiner {
1735    /**
1736     * A function that returns a value when applied to the values of the four futures passed to
1737     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1738     *
1739     * @param <V1> the type returned by the first future
1740     * @param <V2> the type returned by the second future
1741     * @param <V3> the type returned by the third future
1742     * @param <V4> the type returned by the fourth future
1743     * @param <U> the type returned by the function
1744     */
1745    public interface ClosingFunction4<
1746        V1 extends @Nullable Object,
1747        V2 extends @Nullable Object,
1748        V3 extends @Nullable Object,
1749        V4 extends @Nullable Object,
1750        U extends @Nullable Object> {
1751      /**
1752       * Applies this function to four inputs, or throws an exception if unable to do so.
1753       *
1754       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1755       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1756       * (but not before this method completes), even if this method throws or the pipeline is
1757       * cancelled.
1758       */
1759      @ParametricNullness
1760      U apply(
1761          DeferredCloser closer,
1762          @ParametricNullness V1 value1,
1763          @ParametricNullness V2 value2,
1764          @ParametricNullness V3 value3,
1765          @ParametricNullness V4 value4)
1766          throws Exception;
1767    }
1768
1769    /**
1770     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1771     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1772     * ClosingFuture)}.
1773     *
1774     * @param <V1> the type returned by the first future
1775     * @param <V2> the type returned by the second future
1776     * @param <V3> the type returned by the third future
1777     * @param <V4> the type returned by the fourth future
1778     * @param <U> the type returned by the function
1779     */
1780    public interface AsyncClosingFunction4<
1781        V1 extends @Nullable Object,
1782        V2 extends @Nullable Object,
1783        V3 extends @Nullable Object,
1784        V4 extends @Nullable Object,
1785        U extends @Nullable Object> {
1786      /**
1787       * Applies this function to four inputs, or throws an exception if unable to do so.
1788       *
1789       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1790       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1791       * (but not before this method completes), even if this method throws or the pipeline is
1792       * cancelled.
1793       */
1794      ClosingFuture<U> apply(
1795          DeferredCloser closer,
1796          @ParametricNullness V1 value1,
1797          @ParametricNullness V2 value2,
1798          @ParametricNullness V3 value3,
1799          @ParametricNullness V4 value4)
1800          throws Exception;
1801    }
1802
1803    private final ClosingFuture<V1> future1;
1804    private final ClosingFuture<V2> future2;
1805    private final ClosingFuture<V3> future3;
1806    private final ClosingFuture<V4> future4;
1807
1808    private Combiner4(
1809        ClosingFuture<V1> future1,
1810        ClosingFuture<V2> future2,
1811        ClosingFuture<V3> future3,
1812        ClosingFuture<V4> future4) {
1813      super(true, ImmutableList.of(future1, future2, future3, future4));
1814      this.future1 = future1;
1815      this.future2 = future2;
1816      this.future3 = future3;
1817      this.future4 = future4;
1818    }
1819
1820    /**
1821     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1822     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1823     * objects to be closed when the pipeline is done.
1824     *
1825     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1826     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1827     *
1828     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1829     *
1830     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1831     * ExecutionException} will be extracted and used as the failure of the derived step.
1832     */
1833    public <U extends @Nullable Object> ClosingFuture<U> call(
1834        final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1835      return call(
1836          new CombiningCallable<U>() {
1837            @Override
1838            @ParametricNullness
1839            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1840              return function.apply(
1841                  closer,
1842                  peeker.getDone(future1),
1843                  peeker.getDone(future2),
1844                  peeker.getDone(future3),
1845                  peeker.getDone(future4));
1846            }
1847
1848            @Override
1849            public String toString() {
1850              return function.toString();
1851            }
1852          },
1853          executor);
1854    }
1855
1856    /**
1857     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1858     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1859     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1860     * captured by the returned {@link ClosingFuture}).
1861     *
1862     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1863     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1864     *
1865     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1866     *
1867     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1868     * ExecutionException} will be extracted and used as the failure of the derived step.
1869     *
1870     * <p>If the function throws any other exception, it will be used as the failure of the derived
1871     * step.
1872     *
1873     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1874     * the closeable objects in that {@code ClosingFuture} will be closed.
1875     *
1876     * <p>Usage guidelines for this method:
1877     *
1878     * <ul>
1879     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1880     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1881     *       Executor)} instead, with a function that returns the next value directly.
1882     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1883     *       for every closeable object this step creates in order to capture it for later closing.
1884     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1885     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1886     * </ul>
1887     *
1888     * <p>The same warnings about doing heavyweight operations within {@link
1889     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1890     */
1891    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1892        final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1893      return callAsync(
1894          new AsyncCombiningCallable<U>() {
1895            @Override
1896            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1897              return function.apply(
1898                  closer,
1899                  peeker.getDone(future1),
1900                  peeker.getDone(future2),
1901                  peeker.getDone(future3),
1902                  peeker.getDone(future4));
1903            }
1904
1905            @Override
1906            public String toString() {
1907              return function.toString();
1908            }
1909          },
1910          executor);
1911    }
1912  }
1913
1914  /**
1915   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1916   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1917   * ClosingFuture, ClosingFuture)} to start this combination.
1918   *
1919   * @param <V1> the type returned by the first future
1920   * @param <V2> the type returned by the second future
1921   * @param <V3> the type returned by the third future
1922   * @param <V4> the type returned by the fourth future
1923   * @param <V5> the type returned by the fifth future
1924   */
1925  public static final class Combiner5<
1926          V1 extends @Nullable Object,
1927          V2 extends @Nullable Object,
1928          V3 extends @Nullable Object,
1929          V4 extends @Nullable Object,
1930          V5 extends @Nullable Object>
1931      extends Combiner {
1932    /**
1933     * A function that returns a value when applied to the values of the five futures passed to
1934     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1935     * ClosingFuture)}.
1936     *
1937     * @param <V1> the type returned by the first future
1938     * @param <V2> the type returned by the second future
1939     * @param <V3> the type returned by the third future
1940     * @param <V4> the type returned by the fourth future
1941     * @param <V5> the type returned by the fifth future
1942     * @param <U> the type returned by the function
1943     */
1944    public interface ClosingFunction5<
1945        V1 extends @Nullable Object,
1946        V2 extends @Nullable Object,
1947        V3 extends @Nullable Object,
1948        V4 extends @Nullable Object,
1949        V5 extends @Nullable Object,
1950        U extends @Nullable Object> {
1951      /**
1952       * Applies this function to five inputs, or throws an exception if unable to do so.
1953       *
1954       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1955       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1956       * (but not before this method completes), even if this method throws or the pipeline is
1957       * cancelled.
1958       */
1959      @ParametricNullness
1960      U apply(
1961          DeferredCloser closer,
1962          @ParametricNullness V1 value1,
1963          @ParametricNullness V2 value2,
1964          @ParametricNullness V3 value3,
1965          @ParametricNullness V4 value4,
1966          @ParametricNullness V5 value5)
1967          throws Exception;
1968    }
1969
1970    /**
1971     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1972     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1973     * ClosingFuture, ClosingFuture)}.
1974     *
1975     * @param <V1> the type returned by the first future
1976     * @param <V2> the type returned by the second future
1977     * @param <V3> the type returned by the third future
1978     * @param <V4> the type returned by the fourth future
1979     * @param <V5> the type returned by the fifth future
1980     * @param <U> the type returned by the function
1981     */
1982    public interface AsyncClosingFunction5<
1983        V1 extends @Nullable Object,
1984        V2 extends @Nullable Object,
1985        V3 extends @Nullable Object,
1986        V4 extends @Nullable Object,
1987        V5 extends @Nullable Object,
1988        U extends @Nullable Object> {
1989      /**
1990       * Applies this function to five inputs, or throws an exception if unable to do so.
1991       *
1992       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1993       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1994       * (but not before this method completes), even if this method throws or the pipeline is
1995       * cancelled.
1996       */
1997      ClosingFuture<U> apply(
1998          DeferredCloser closer,
1999          @ParametricNullness V1 value1,
2000          @ParametricNullness V2 value2,
2001          @ParametricNullness V3 value3,
2002          @ParametricNullness V4 value4,
2003          @ParametricNullness V5 value5)
2004          throws Exception;
2005    }
2006
2007    private final ClosingFuture<V1> future1;
2008    private final ClosingFuture<V2> future2;
2009    private final ClosingFuture<V3> future3;
2010    private final ClosingFuture<V4> future4;
2011    private final ClosingFuture<V5> future5;
2012
2013    private Combiner5(
2014        ClosingFuture<V1> future1,
2015        ClosingFuture<V2> future2,
2016        ClosingFuture<V3> future3,
2017        ClosingFuture<V4> future4,
2018        ClosingFuture<V5> future5) {
2019      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
2020      this.future1 = future1;
2021      this.future2 = future2;
2022      this.future3 = future3;
2023      this.future4 = future4;
2024      this.future5 = future5;
2025    }
2026
2027    /**
2028     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2029     * combining function to their values. The function can use a {@link DeferredCloser} to capture
2030     * objects to be closed when the pipeline is done.
2031     *
2032     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2033     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2034     * returned step.
2035     *
2036     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2037     *
2038     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2039     * ExecutionException} will be extracted and used as the failure of the derived step.
2040     */
2041    public <U extends @Nullable Object> ClosingFuture<U> call(
2042        final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2043      return call(
2044          new CombiningCallable<U>() {
2045            @Override
2046            @ParametricNullness
2047            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
2048              return function.apply(
2049                  closer,
2050                  peeker.getDone(future1),
2051                  peeker.getDone(future2),
2052                  peeker.getDone(future3),
2053                  peeker.getDone(future4),
2054                  peeker.getDone(future5));
2055            }
2056
2057            @Override
2058            public String toString() {
2059              return function.toString();
2060            }
2061          },
2062          executor);
2063    }
2064
2065    /**
2066     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2067     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
2068     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
2069     * captured by the returned {@link ClosingFuture}).
2070     *
2071     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2072     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2073     * returned step.
2074     *
2075     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2076     *
2077     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2078     * ExecutionException} will be extracted and used as the failure of the derived step.
2079     *
2080     * <p>If the function throws any other exception, it will be used as the failure of the derived
2081     * step.
2082     *
2083     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2084     * the closeable objects in that {@code ClosingFuture} will be closed.
2085     *
2086     * <p>Usage guidelines for this method:
2087     *
2088     * <ul>
2089     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2090     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2091     *       Executor)} instead, with a function that returns the next value directly.
2092     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
2093     *       for every closeable object this step creates in order to capture it for later closing.
2094     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2095     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2096     * </ul>
2097     *
2098     * <p>The same warnings about doing heavyweight operations within {@link
2099     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2100     */
2101    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
2102        final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2103      return callAsync(
2104          new AsyncCombiningCallable<U>() {
2105            @Override
2106            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2107              return function.apply(
2108                  closer,
2109                  peeker.getDone(future1),
2110                  peeker.getDone(future2),
2111                  peeker.getDone(future3),
2112                  peeker.getDone(future4),
2113                  peeker.getDone(future5));
2114            }
2115
2116            @Override
2117            public String toString() {
2118              return function.toString();
2119            }
2120          },
2121          executor);
2122    }
2123  }
2124
2125  @Override
2126  public String toString() {
2127    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2128    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2129  }
2130
2131  @Override
2132  protected void finalize() {
2133    if (state.get().equals(OPEN)) {
2134      logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2135      FluentFuture<V> unused = finishToFuture();
2136    }
2137  }
2138
2139  private static void closeQuietly(@CheckForNull final Closeable closeable, Executor executor) {
2140    if (closeable == null) {
2141      return;
2142    }
2143    try {
2144      executor.execute(
2145          () -> {
2146            try {
2147              closeable.close();
2148            } catch (IOException | RuntimeException e) {
2149              logger.log(WARNING, "thrown by close()", e);
2150            }
2151          });
2152    } catch (RejectedExecutionException e) {
2153      if (logger.isLoggable(WARNING)) {
2154        logger.log(
2155            WARNING, String.format("while submitting close to %s; will close inline", executor), e);
2156      }
2157      closeQuietly(closeable, directExecutor());
2158    }
2159  }
2160
2161  private void checkAndUpdateState(State oldState, State newState) {
2162    checkState(
2163        compareAndUpdateState(oldState, newState),
2164        "Expected state to be %s, but it was %s",
2165        oldState,
2166        newState);
2167  }
2168
2169  private boolean compareAndUpdateState(State oldState, State newState) {
2170    return state.compareAndSet(oldState, newState);
2171  }
2172
2173  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2174  private static final class CloseableList extends IdentityHashMap<Closeable, Executor>
2175      implements Closeable {
2176    private final DeferredCloser closer = new DeferredCloser(this);
2177    private volatile boolean closed;
2178    @CheckForNull private volatile CountDownLatch whenClosed;
2179
2180    <V extends @Nullable Object, U extends @Nullable Object>
2181        ListenableFuture<U> applyClosingFunction(
2182            ClosingFunction<? super V, U> transformation, @ParametricNullness V input)
2183            throws Exception {
2184      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2185      CloseableList newCloseables = new CloseableList();
2186      try {
2187        return immediateFuture(transformation.apply(newCloseables.closer, input));
2188      } finally {
2189        add(newCloseables, directExecutor());
2190      }
2191    }
2192
2193    <V extends @Nullable Object, U extends @Nullable Object>
2194        FluentFuture<U> applyAsyncClosingFunction(
2195            AsyncClosingFunction<V, U> transformation, @ParametricNullness V input)
2196            throws Exception {
2197      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2198      CloseableList newCloseables = new CloseableList();
2199      try {
2200        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2201        closingFuture.becomeSubsumedInto(newCloseables);
2202        return closingFuture.future;
2203      } finally {
2204        add(newCloseables, directExecutor());
2205      }
2206    }
2207
2208    @Override
2209    public void close() {
2210      if (closed) {
2211        return;
2212      }
2213      synchronized (this) {
2214        if (closed) {
2215          return;
2216        }
2217        closed = true;
2218      }
2219      for (Map.Entry<Closeable, Executor> entry : entrySet()) {
2220        closeQuietly(entry.getKey(), entry.getValue());
2221      }
2222      clear();
2223      if (whenClosed != null) {
2224        whenClosed.countDown();
2225      }
2226    }
2227
2228    void add(@CheckForNull Closeable closeable, Executor executor) {
2229      checkNotNull(executor);
2230      if (closeable == null) {
2231        return;
2232      }
2233      synchronized (this) {
2234        if (!closed) {
2235          put(closeable, executor);
2236          return;
2237        }
2238      }
2239      closeQuietly(closeable, executor);
2240    }
2241
2242    /**
2243     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2244     */
2245    CountDownLatch whenClosedCountDown() {
2246      if (closed) {
2247        return new CountDownLatch(0);
2248      }
2249      synchronized (this) {
2250        if (closed) {
2251          return new CountDownLatch(0);
2252        }
2253        checkState(whenClosed == null);
2254        return whenClosed = new CountDownLatch(1);
2255      }
2256    }
2257  }
2258
2259  /**
2260   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2261   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2262   */
2263  @VisibleForTesting
2264  CountDownLatch whenClosedCountDown() {
2265    return closeables.whenClosedCountDown();
2266  }
2267
2268  /** The state of a {@link CloseableList}. */
2269  enum State {
2270    /** The {@link CloseableList} has not been subsumed or closed. */
2271    OPEN,
2272
2273    /**
2274     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
2275     * into any other.
2276     */
2277    SUBSUMED,
2278
2279    /**
2280     * Some {@link ListenableFuture} has a callback attached that will close the {@link
2281     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
2282     */
2283    WILL_CLOSE,
2284
2285    /**
2286     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
2287     * {@link CloseableList} may not be subsumed.
2288     */
2289    CLOSING,
2290
2291    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
2292    CLOSED,
2293
2294    /**
2295     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
2296     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
2297     */
2298    WILL_CREATE_VALUE_AND_CLOSER,
2299  }
2300}