001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException;
036import static java.util.logging.Level.FINER;
037import static java.util.logging.Level.SEVERE;
038import static java.util.logging.Level.WARNING;
039
040import com.google.common.annotations.J2ktIncompatible;
041import com.google.common.annotations.VisibleForTesting;
042import com.google.common.collect.FluentIterable;
043import com.google.common.collect.ImmutableList;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
046import com.google.common.util.concurrent.Futures.FutureCombiner;
047import com.google.errorprone.annotations.CanIgnoreReturnValue;
048import com.google.errorprone.annotations.DoNotMock;
049import com.google.j2objc.annotations.RetainedWith;
050import java.io.Closeable;
051import java.util.IdentityHashMap;
052import java.util.Map;
053import java.util.concurrent.Callable;
054import java.util.concurrent.CancellationException;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutionException;
057import java.util.concurrent.Executor;
058import java.util.concurrent.Future;
059import java.util.concurrent.RejectedExecutionException;
060import java.util.concurrent.atomic.AtomicReference;
061import org.jspecify.annotations.Nullable;
062
063/**
064 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
065 * complete, some objects captured during the computation are closed.
066 *
067 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
068 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
069 * cancellation of the operation so far. The only way to extract the value or exception from a step
070 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
071 * "value" of a successful step or the "result" (value or exception) of any step.
072 *
073 * <ol>
074 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
075 *       block or a {@link ListenableFuture}.
076 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
077 *       can be captured for later closing.
078 *   <li>There is one last step (the root of the tree), from which you can extract the final result
079 *       of the computation. After that result is available (or the computation fails), all objects
080 *       captured by any of the steps in the pipeline are closed.
081 * </ol>
082 *
083 * <h3>Starting a pipeline</h3>
084 *
085 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
086 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
087 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
088 * #from(ListenableFuture)} instead.
089 *
090 * <h3>Derived steps</h3>
091 *
092 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
093 * ways similar to {@link FluentFuture}s:
094 *
095 * <ul>
096 *   <li>by transforming the value from a successful input step,
097 *   <li>by catching the exception from a failed input step, or
098 *   <li>by combining the results of several input steps.
099 * </ul>
100 *
101 * Each derivation can capture the next value or any intermediate objects for later closing.
102 *
103 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
104 * exception, or combine it with others, you cannot do anything else with it, including declare it
105 * to be the last step of the pipeline.
106 *
107 * <h4>Transforming</h4>
108 *
109 * To derive the next step by asynchronously applying a function to an input step's value, call
110 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
111 * Executor)} on the input step.
112 *
113 * <h4>Catching</h4>
114 *
115 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
116 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
117 *
118 * <h4>Combining</h4>
119 *
 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)}, {@link #whenAllSucceed(Iterable)}, or one of their overloads.
122 *
123 * <h3>Cancelling</h3>
124 *
125 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
126 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
127 * successfully cancelled step will immediately start closing all objects captured for later closing
128 * by it and by its input steps.
129 *
130 * <h3>Ending a pipeline</h3>
131 *
132 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
133 * close the captured objects automatically or manually.
134 *
135 * <h4>Automatically closing</h4>
136 *
137 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
138 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
139 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
140 * completes before closing starts, rather than once it has finished.
141 *
142 * <pre>{@code
143 * FluentFuture<UserName> userName =
144 *     ClosingFuture.submit(
145 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
146 *             executor)
147 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
148 *         .transform((closer, result) -> result.get("userName"), directExecutor())
149 *         .catching(DBException.class, e -> "no user", directExecutor())
150 *         .finishToFuture();
151 * }</pre>
152 *
153 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
154 * result cursor will both be closed, even if the operation is cancelled or fails.
155 *
156 * <h4>Manually closing</h4>
157 *
158 * If you want to close the captured objects manually, after you've used the final result, call
159 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
160 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
161 *
162 * <pre>{@code
163 *     ClosingFuture.submit(
164 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
165 *             executor)
166 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
167 *     .transform((closer, result) -> result.get("userName"), directExecutor())
168 *     .catching(DBException.class, e -> "no user", directExecutor())
169 *     .finishToValueAndCloser(
170 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
171 *
172 * // later
173 * try { // get() will throw if the operation failed or was cancelled.
174 *   UserName userName = userNameValueAndCloser.get();
175 *   // do something with userName
176 * } finally {
177 *   userNameValueAndCloser.closeAsync();
178 * }
179 * }</pre>
180 *
181 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
182 * the query result cursor will both be closed, even if the operation is cancelled or fails.
183 *
184 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
185 * automatic-closing approach described above is safer.
186 *
187 * @param <V> the type of the value of this step
188 * @since 30.0
189 */
190// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
191@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
192@J2ktIncompatible
193// TODO(dpb): GWT compatibility.
194public final class ClosingFuture<V extends @Nullable Object> {
195
  // Logger for this class, created from the ClosingFuture class token.
  // NOTE(review): LazyLogger presumably defers creating the underlying logger until first use —
  // confirm in LazyLogger.
  private static final LazyLogger logger = new LazyLogger(ClosingFuture.class);
197
198  /**
199   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
200   * done.
201   */
202  public static final class DeferredCloser {
203    @RetainedWith private final CloseableList list;
204
205    DeferredCloser(CloseableList list) {
206      this.list = list;
207    }
208
209    /**
210     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
211     *
212     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
213     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
214     * Closeable}. (For more about the flavors, see <a
215     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
216     * build</a>.)
217     *
218     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
219     * building for Android): Ensure that any object you pass implements the interface not just in
220     * your current SDK version but also at the oldest version you support. For example, <a
221     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
222     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
223     * {@code Closeable} with a method reference like {@code cursor::close}.
224     *
225     * <p>Note that this method is still binary-compatible between flavors because the erasure of
226     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
227     *
228     * @param closeable the object to be closed (see notes above)
229     * @param closingExecutor the object will be closed on this executor
230     * @return the first argument
231     */
232    @CanIgnoreReturnValue
233    @ParametricNullness
234    public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose(
235        @ParametricNullness C closeable, Executor closingExecutor) {
236      checkNotNull(closingExecutor);
237      if (closeable != null) {
238        list.add(closeable, closingExecutor);
239      }
240      return closeable;
241    }
242  }
243
244  /**
245   * An operation that computes a result.
246   *
247   * @param <V> the type of the result
248   */
249  public interface ClosingCallable<V extends @Nullable Object> {
250    /**
251     * Computes a result, or throws an exception if unable to do so.
252     *
253     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
254     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
255     * not before this method completes), even if this method throws or the pipeline is cancelled.
256     */
257    @ParametricNullness
258    V call(DeferredCloser closer) throws Exception;
259  }
260
261  /**
262   * An operation that computes a {@link ClosingFuture} of a result.
263   *
264   * @param <V> the type of the result
265   * @since 30.1
266   */
267  public interface AsyncClosingCallable<V extends @Nullable Object> {
268    /**
269     * Computes a result, or throws an exception if unable to do so.
270     *
271     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
272     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
273     * not before this method completes), even if this method throws or the pipeline is cancelled.
274     */
275    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
276  }
277
278  /**
279   * A function from an input to a result.
280   *
281   * @param <T> the type of the input to the function
282   * @param <U> the type of the result of the function
283   */
284  public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
285
286    /**
287     * Applies this function to an input, or throws an exception if unable to do so.
288     *
289     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
290     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
291     * not before this method completes), even if this method throws or the pipeline is cancelled.
292     */
293    @ParametricNullness
294    U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
295  }
296
297  /**
298   * A function from an input to a {@link ClosingFuture} of a result.
299   *
300   * @param <T> the type of the input to the function
301   * @param <U> the type of the result of the function
302   */
303  public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
304    /**
305     * Applies this function to an input, or throws an exception if unable to do so.
306     *
307     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
308     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
309     * not before this method completes), even if this method throws or the pipeline is cancelled.
310     */
311    ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
312  }
313
314  /**
315   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
316   * allows the user to close all the closeable objects that were captured during it for later
317   * closing.
318   *
319   * <p>The asynchronous operation will have completed before this object is created.
320   *
321   * @param <V> the type of the value of a successful operation
322   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
323   */
324  public static final class ValueAndCloser<V extends @Nullable Object> {
325
326    private final ClosingFuture<? extends V> closingFuture;
327
328    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
329      this.closingFuture = checkNotNull(closingFuture);
330    }
331
332    /**
333     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
334     * {@link Future#get()} would.
335     *
336     * <p>Because the asynchronous operation has already completed, this method is synchronous and
337     * returns immediately.
338     *
339     * @throws CancellationException if the computation was cancelled
340     * @throws ExecutionException if the computation threw an exception
341     */
342    @ParametricNullness
343    public V get() throws ExecutionException {
344      return getDone(closingFuture.future);
345    }
346
347    /**
348     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
349     * operation on the {@link Executor}s specified by calls to {@link
350     * DeferredCloser#eventuallyClose(Object, Executor)}.
351     *
352     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
353     * closed synchronously.
354     *
355     * <p>Idempotent: objects will be closed at most once.
356     */
357    public void closeAsync() {
358      closingFuture.close();
359    }
360  }
361
362  /**
363   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
364   * ClosingFuture} pipeline.
365   *
366   * @param <V> the type of the final value of a successful pipeline
367   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
368   */
369  public interface ValueAndCloserConsumer<V extends @Nullable Object> {
370
371    /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */
372    void accept(ValueAndCloser<V> valueAndCloser);
373  }
374
375  /**
376   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
377   *
378   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
379   *     execution
380   */
381  public static <V extends @Nullable Object> ClosingFuture<V> submit(
382      ClosingCallable<V> callable, Executor executor) {
383    checkNotNull(callable);
384    CloseableList closeables = new CloseableList();
385    TrustedListenableFutureTask<V> task =
386        TrustedListenableFutureTask.create(
387            new Callable<V>() {
388              @Override
389              @ParametricNullness
390              public V call() throws Exception {
391                return callable.call(closeables.closer);
392              }
393
394              @Override
395              public String toString() {
396                return callable.toString();
397              }
398            });
399    executor.execute(task);
400    return new ClosingFuture<>(task, closeables);
401  }
402
403  /**
   * Starts a {@link ClosingFuture} pipeline by submitting a callable block that itself returns a
   * {@link ClosingFuture} to an executor.
405   *
406   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
407   *     execution
408   * @since 30.1
409   */
410  public static <V extends @Nullable Object> ClosingFuture<V> submitAsync(
411      AsyncClosingCallable<V> callable, Executor executor) {
412    checkNotNull(callable);
413    CloseableList closeables = new CloseableList();
414    TrustedListenableFutureTask<V> task =
415        TrustedListenableFutureTask.create(
416            new AsyncCallable<V>() {
417              @Override
418              public ListenableFuture<V> call() throws Exception {
419                CloseableList newCloseables = new CloseableList();
420                try {
421                  ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
422                  closingFuture.becomeSubsumedInto(closeables);
423                  return closingFuture.future;
424                } finally {
425                  closeables.add(newCloseables, directExecutor());
426                }
427              }
428
429              @Override
430              public String toString() {
431                return callable.toString();
432              }
433            });
434    executor.execute(task);
435    return new ClosingFuture<>(task, closeables);
436  }
437
438  /**
439   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
440   *
441   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
442   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
443   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
444   */
445  public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) {
446    return new ClosingFuture<>(future);
447  }
448
449  /**
450   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
451   *
   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor}) when
   * the pipeline is done, even if the pipeline is cancelled or fails.
454   *
455   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
456   * value in order to close it.
457   *
458   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
459   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object,
460   *     Executor)}.
461   * @param closingExecutor the future's result will be closed on this executor
462   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
463   *     underlying value may never be closed if the {@link Future} is canceled after its operation
464   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
465   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
466   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
467   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
468   *     ClosingFuture#from}.
469   */
470  @Deprecated
471  public static <C extends @Nullable Object & @Nullable AutoCloseable>
472      ClosingFuture<C> eventuallyClosing(ListenableFuture<C> future, Executor closingExecutor) {
473    checkNotNull(closingExecutor);
474    ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
475    Futures.addCallback(
476        future,
477        new FutureCallback<@Nullable AutoCloseable>() {
478          @Override
479          public void onSuccess(@Nullable AutoCloseable result) {
480            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
481          }
482
483          @Override
484          public void onFailure(Throwable t) {}
485        },
486        directExecutor());
487    return closingFuture;
488  }
489
490  /**
491   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
492   *
493   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
494   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
495   */
496  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
497    return new Combiner(false, futures);
498  }
499
500  /**
501   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
502   *
503   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
504   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
505   */
506  public static Combiner whenAllComplete(
507      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
508    return whenAllComplete(asList(future1, moreFutures));
509  }
510
511  /**
512   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
513   * all succeed. If any fail, the resulting pipeline will fail.
514   *
515   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
516   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
517   */
518  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
519    return new Combiner(true, futures);
520  }
521
522  /**
523   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
524   * they all succeed. If any fail, the resulting pipeline will fail.
525   *
526   * <p>Calling this method allows you to use lambdas or method references typed with the types of
527   * the input {@link ClosingFuture}s.
528   *
529   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
530   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
531   */
532  public static <V1 extends @Nullable Object, V2 extends @Nullable Object>
533      Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
534    return new Combiner2<>(future1, future2);
535  }
536
537  /**
538   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
539   * they all succeed. If any fail, the resulting pipeline will fail.
540   *
541   * <p>Calling this method allows you to use lambdas or method references typed with the types of
542   * the input {@link ClosingFuture}s.
543   *
544   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
545   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
546   */
547  public static <
548          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
549      Combiner3<V1, V2, V3> whenAllSucceed(
550          ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
551    return new Combiner3<>(future1, future2, future3);
552  }
553
554  /**
555   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
556   * they all succeed. If any fail, the resulting pipeline will fail.
557   *
558   * <p>Calling this method allows you to use lambdas or method references typed with the types of
559   * the input {@link ClosingFuture}s.
560   *
561   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
562   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
563   */
564  public static <
565          V1 extends @Nullable Object,
566          V2 extends @Nullable Object,
567          V3 extends @Nullable Object,
568          V4 extends @Nullable Object>
569      Combiner4<V1, V2, V3, V4> whenAllSucceed(
570          ClosingFuture<V1> future1,
571          ClosingFuture<V2> future2,
572          ClosingFuture<V3> future3,
573          ClosingFuture<V4> future4) {
574    return new Combiner4<>(future1, future2, future3, future4);
575  }
576
577  /**
578   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
579   * they all succeed. If any fail, the resulting pipeline will fail.
580   *
581   * <p>Calling this method allows you to use lambdas or method references typed with the types of
582   * the input {@link ClosingFuture}s.
583   *
584   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
585   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
586   */
587  public static <
588          V1 extends @Nullable Object,
589          V2 extends @Nullable Object,
590          V3 extends @Nullable Object,
591          V4 extends @Nullable Object,
592          V5 extends @Nullable Object>
593      Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
594          ClosingFuture<V1> future1,
595          ClosingFuture<V2> future2,
596          ClosingFuture<V3> future3,
597          ClosingFuture<V4> future4,
598          ClosingFuture<V5> future5) {
599    return new Combiner5<>(future1, future2, future3, future4, future5);
600  }
601
602  /**
603   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
604   * all succeed. If any fail, the resulting pipeline will fail.
605   *
606   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
607   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
608   */
609  public static Combiner whenAllSucceed(
610      ClosingFuture<?> future1,
611      ClosingFuture<?> future2,
612      ClosingFuture<?> future3,
613      ClosingFuture<?> future4,
614      ClosingFuture<?> future5,
615      ClosingFuture<?> future6,
616      ClosingFuture<?>... moreFutures) {
617    return whenAllSucceed(
618        FluentIterable.of(future1, future2, future3, future4, future5, future6)
619            .append(moreFutures));
620  }
621
  // Lifecycle state of this step; every step starts out OPEN.
  // NOTE(review): transitions to the other State values happen outside this section — confirm
  // the allowed transitions where `state` is updated.
  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
  // Objects captured for later closing by this step; its `closer` is what user callbacks receive.
  private final CloseableList closeables;
  // The future holding this step's result, kept as a FluentFuture.
  private final FluentFuture<V> future;
625
  /** Creates a step backed by {@code future} with a freshly created {@link CloseableList}. */
  private ClosingFuture(ListenableFuture<V> future) {
    this(future, new CloseableList());
  }
629
630  private ClosingFuture(ListenableFuture<V> future, CloseableList closeables) {
631    this.future = FluentFuture.from(future);
632    this.closeables = closeables;
633  }
634
635  /**
636   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
637   * future returns {@code null} if the step is successful or throws the same exception that would
638   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
639   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
640   *
641   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
642   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
643   * a derivation method <i>on the same instance</i>. This is important because calling {@code
644   * statusFuture} alone does not provide a way to close the pipeline.
645   */
646  public ListenableFuture<?> statusFuture() {
647    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
648  }
649
650  /**
651   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
652   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
653   * when the pipeline is done.
654   *
655   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
656   * ClosingFuture} will be equivalent to this one.
657   *
658   * <p>If the function throws an exception, that exception is used as the result of the derived
659   * {@code ClosingFuture}.
660   *
661   * <p>Example usage:
662   *
663   * <pre>{@code
664   * ClosingFuture<List<Row>> rowsFuture =
665   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
666   * }</pre>
667   *
668   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
669   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
670   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
671   *
672   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
673   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
674   * the original {@code ClosingFuture} instance.
675   *
676   * @param function transforms the value of this step to the value of the derived step
677   * @param executor executor to run the function in
678   * @return the derived step
679   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
680   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
681   *     finished}
682   */
683  public <U extends @Nullable Object> ClosingFuture<U> transform(
684      ClosingFunction<? super V, U> function, Executor executor) {
685    checkNotNull(function);
686    AsyncFunction<V, U> applyFunction =
687        new AsyncFunction<V, U>() {
688          @Override
689          public ListenableFuture<U> apply(V input) throws Exception {
690            return closeables.applyClosingFunction(function, input);
691          }
692
693          @Override
694          public String toString() {
695            return function.toString();
696          }
697        };
698    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
699    return derive(future.transformAsync(applyFunction, executor));
700  }
701
702  /**
703   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
704   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
705   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
706   * captured by the returned {@link ClosingFuture}).
707   *
708   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
709   * returned by the function.
710   *
711   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
712   * ClosingFuture} will be equivalent to this one.
713   *
714   * <p>If the function throws an exception, that exception is used as the result of the derived
715   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
716   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
717   * closed.
718   *
719   * <p>Usage guidelines for this method:
720   *
721   * <ul>
722   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
723   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
724   *       Executor)} instead, with a function that returns the next value directly.
725   *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
726   *       for every closeable object this step creates in order to capture it for later closing.
727   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
728   *       ClosingFuture} call {@link #from(ListenableFuture)}.
729   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
730   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
731   *       {@link #withoutCloser(AsyncFunction)}
732   * </ul>
733   *
734   * <p>Example usage:
735   *
736   * <pre>{@code
737   * // Result.getRowsClosingFuture() returns a ClosingFuture.
738   * ClosingFuture<List<Row>> rowsFuture =
739   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
740   *
741   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
742   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
743   * // Closeable).
744   * ClosingFuture<Integer> rowsFuture2 =
745   *     queryFuture.transformAsync(
746   *         (closer, result) -> {
747   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
748   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
749   *      },
750   *      executor);
751   *
752   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
753   * ClosingFuture<List<Row>> rowsFuture3 =
754   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
755   *
756   * }</pre>
757   *
758   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
759   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
760   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
761   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
762   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
763   * responsible for completing the returned {@code ClosingFuture}.)
764   *
765   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
766   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
767   * the original {@code ClosingFuture} instance.
768   *
769   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
770   *     the derived step
771   * @param executor executor to run the function in
772   * @return the derived step
773   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
774   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
775   *     finished}
776   */
777  public <U extends @Nullable Object> ClosingFuture<U> transformAsync(
778      AsyncClosingFunction<? super V, U> function, Executor executor) {
779    checkNotNull(function);
780    AsyncFunction<V, U> applyFunction =
781        new AsyncFunction<V, U>() {
782          @Override
783          public ListenableFuture<U> apply(V input) throws Exception {
784            return closeables.applyAsyncClosingFunction(function, input);
785          }
786
787          @Override
788          public String toString() {
789            return function.toString();
790          }
791        };
792    return derive(future.transformAsync(applyFunction, executor));
793  }
794
795  /**
796   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
797   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
798   * {@link ListenableFuture}.
799   *
800   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
801   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
802   * meets these conditions:
803   *
804   * <ul>
805   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
806   *       DeferredCloser#eventuallyClose(Object, Executor)}.
807   *   <li>It returns a {@link ListenableFuture}.
808   * </ul>
809   *
810   * <p>Example usage:
811   *
812   * <pre>{@code
813   * // Result.getRowsFuture() returns a ListenableFuture.
814   * ClosingFuture<List<Row>> rowsFuture =
815   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
816   * }</pre>
817   *
818   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
819   *     ListenableFuture} with the value of a derived step
820   */
821  public static <V extends @Nullable Object, U extends @Nullable Object>
822      AsyncClosingFunction<V, U> withoutCloser(AsyncFunction<V, U> function) {
823    checkNotNull(function);
824    return (closer, input) -> ClosingFuture.from(function.apply(input));
825  }
826
827  /**
828   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
829   * to its exception if it is an instance of a given exception type. The function can use a {@link
830   * DeferredCloser} to capture objects to be closed when the pipeline is done.
831   *
832   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
833   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
834   * one.
835   *
836   * <p>If the function throws an exception, that exception is used as the result of the derived
837   * {@code ClosingFuture}.
838   *
839   * <p>Example usage:
840   *
841   * <pre>{@code
   * ClosingFuture<QueryResult> recoveredFuture =
843   *     queryFuture.catching(
844   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
845   * }</pre>
846   *
847   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
848   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
849   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
850   *
851   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
852   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
853   * the original {@code ClosingFuture} instance.
854   *
855   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
856   *     type is matched against this step's exception. "This step's exception" means the cause of
857   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
858   *     underlying this step or, if {@code get()} throws a different kind of exception, that
859   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
860   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
861   * @param fallback the function to be called if this step fails with the expected exception type.
862   *     The function's argument is this step's exception. "This step's exception" means the cause
863   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
864   *     underlying this step or, if {@code get()} throws a different kind of exception, that
865   *     exception itself.
866   * @param executor the executor that runs {@code fallback} if the input fails
867   */
868  public <X extends Throwable> ClosingFuture<V> catching(
869      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
870    return catchingMoreGeneric(exceptionType, fallback, executor);
871  }
872
873  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
874  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
875      Class<X> exceptionType, ClosingFunction<? super X, W> fallback, Executor executor) {
876    checkNotNull(fallback);
877    AsyncFunction<X, W> applyFallback =
878        new AsyncFunction<X, W>() {
879          @Override
880          public ListenableFuture<W> apply(X exception) throws Exception {
881            return closeables.applyClosingFunction(fallback, exception);
882          }
883
884          @Override
885          public String toString() {
886            return fallback.toString();
887          }
888        };
889    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
890    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
891  }
892
893  /**
894   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
895   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
896   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
897   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
898   *
899   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
900   * ClosingFuture} will be equivalent to the one returned by the function.
901   *
902   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
903   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
904   * one.
905   *
906   * <p>If the function throws an exception, that exception is used as the result of the derived
907   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
908   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
909   * closed.
910   *
911   * <p>Usage guidelines for this method:
912   *
913   * <ul>
914   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
915   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
916   *       ClosingFunction, Executor)} instead, with a function that returns the next value
917   *       directly.
918   *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
919   *       for every closeable object this step creates in order to capture it for later closing.
920   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
921   *       ClosingFuture} call {@link #from(ListenableFuture)}.
922   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
923   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
924   *       {@link #withoutCloser(AsyncFunction)}
925   * </ul>
926   *
927   * <p>Example usage:
928   *
929   * <pre>{@code
930   * // Fall back to a secondary input stream in case of IOException.
931   * ClosingFuture<InputStream> inputFuture =
932   *     firstInputFuture.catchingAsync(
933   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
   * }</pre>
936   *
937   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
938   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
939   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
940   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
941   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
942   * responsible for completing the returned {@code ClosingFuture}.)
943   *
944   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
945   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
946   * the original {@code ClosingFuture} instance.
947   *
948   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
949   *     type is matched against this step's exception. "This step's exception" means the cause of
950   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
951   *     underlying this step or, if {@code get()} throws a different kind of exception, that
952   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
953   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
954   * @param fallback the function to be called if this step fails with the expected exception type.
955   *     The function's argument is this step's exception. "This step's exception" means the cause
956   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
957   *     underlying this step or, if {@code get()} throws a different kind of exception, that
958   *     exception itself.
959   * @param executor the executor that runs {@code fallback} if the input fails
960   */
961  // TODO(dpb): Should this do something special if the function throws CancellationException or
962  // ExecutionException?
963  public <X extends Throwable> ClosingFuture<V> catchingAsync(
964      Class<X> exceptionType,
965      AsyncClosingFunction<? super X, ? extends V> fallback,
966      Executor executor) {
967    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
968  }
969
970  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
971  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
972      Class<X> exceptionType, AsyncClosingFunction<? super X, W> fallback, Executor executor) {
973    checkNotNull(fallback);
974    AsyncFunction<X, W> asyncFunction =
975        new AsyncFunction<X, W>() {
976          @Override
977          public ListenableFuture<W> apply(X exception) throws Exception {
978            return closeables.applyAsyncClosingFunction(fallback, exception);
979          }
980
981          @Override
982          public String toString() {
983            return fallback.toString();
984          }
985        };
986    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
987  }
988
989  /**
990   * Marks this step as the last step in the {@code ClosingFuture} pipeline.
991   *
992   * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when
993   * the pipeline is cancelled.
994   *
995   * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously
996   * <b>after</b> the returned {@code Future} is done: the future completes before closing starts,
997   * rather than once it has finished.
998   *
999   * <p>After calling this method, you may not call {@link
1000   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
1001   * derivation method on the original {@code ClosingFuture} instance.
1002   *
1003   * @return a {@link Future} that represents the final value or exception of the pipeline
1004   */
1005  public FluentFuture<V> finishToFuture() {
1006    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
1007      logger.get().log(FINER, "will close {0}", this);
1008      future.addListener(
1009          () -> {
1010            checkAndUpdateState(WILL_CLOSE, CLOSING);
1011            close();
1012            checkAndUpdateState(CLOSING, CLOSED);
1013          },
1014          directExecutor());
1015    } else {
1016      switch (state.get()) {
1017        case SUBSUMED:
1018          throw new IllegalStateException(
1019              "Cannot call finishToFuture() after deriving another step");
1020
1021        case WILL_CREATE_VALUE_AND_CLOSER:
1022          throw new IllegalStateException(
1023              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
1024
1025        case WILL_CLOSE:
1026        case CLOSING:
1027        case CLOSED:
1028          throw new IllegalStateException("Cannot call finishToFuture() twice");
1029
1030        case OPEN:
1031          throw new AssertionError();
1032      }
1033    }
1034    return future;
1035  }
1036
1037  /**
   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
   * {@code consumer} will be called with an object that contains the result of the operation. The
   * consumer can store the {@link ValueAndCloser} outside the consumer for later synchronous use.
1041   *
1042   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
1043   * any other derivation method on the original {@code ClosingFuture} instance.
1044   *
1045   * @param consumer a callback whose method will be called (using {@code executor}) when this
1046   *     operation is done
1047   */
1048  public void finishToValueAndCloser(
1049      ValueAndCloserConsumer<? super V> consumer, Executor executor) {
1050    checkNotNull(consumer);
1051    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
1052      switch (state.get()) {
1053        case SUBSUMED:
1054          throw new IllegalStateException(
1055              "Cannot call finishToValueAndCloser() after deriving another step");
1056
1057        case WILL_CLOSE:
1058        case CLOSING:
1059        case CLOSED:
1060          throw new IllegalStateException(
1061              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1062
1063        case WILL_CREATE_VALUE_AND_CLOSER:
1064          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1065
1066        case OPEN:
1067          break;
1068      }
1069      throw new AssertionError(state);
1070    }
1071    future.addListener(() -> provideValueAndCloser(consumer, ClosingFuture.this), executor);
1072  }
1073
1074  private static <C extends @Nullable Object, V extends C> void provideValueAndCloser(
1075      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1076    consumer.accept(new ValueAndCloser<C>(closingFuture));
1077  }
1078
1079  /**
1080   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1081   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1082   * successful, and this step has not started when {@code cancel} is called, this step should never
1083   * run.
1084   *
1085   * <p>If successful, causes the objects captured by this step (if already started) and its input
1086   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1087   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1088   *
1089   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1090   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1091   *     cancelled regardless
1092   * @return {@code false} if the step could not be cancelled, typically because it has already
1093   *     completed normally; {@code true} otherwise
1094   */
1095  @CanIgnoreReturnValue
1096  @SuppressWarnings("Interruption") // We are propagating an interrupt from a caller.
1097  public boolean cancel(boolean mayInterruptIfRunning) {
1098    logger.get().log(FINER, "cancelling {0}", this);
1099    boolean cancelled = future.cancel(mayInterruptIfRunning);
1100    if (cancelled) {
1101      close();
1102    }
1103    return cancelled;
1104  }
1105
1106  private void close() {
1107    logger.get().log(FINER, "closing {0}", this);
1108    closeables.close();
1109  }
1110
1111  private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) {
1112    ClosingFuture<U> derived = new ClosingFuture<>(future);
1113    becomeSubsumedInto(derived.closeables);
1114    return derived;
1115  }
1116
1117  private void becomeSubsumedInto(CloseableList otherCloseables) {
1118    checkAndUpdateState(OPEN, SUBSUMED);
1119    otherCloseables.add(closeables, directExecutor());
1120  }
1121
1122  /**
1123   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1124   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1125   *
1126   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1127   */
1128  public static final class Peeker {
1129    private final ImmutableList<ClosingFuture<?>> futures;
1130    private volatile boolean beingCalled;
1131
1132    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1133      this.futures = checkNotNull(futures);
1134    }
1135
1136    /**
1137     * Returns the value of {@code closingFuture}.
1138     *
1139     * @throws ExecutionException if {@code closingFuture} is a failed step
1140     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1141     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1142     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1143     * @throws IllegalStateException if called outside of a call to {@link
1144     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1145     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1146     */
1147    @ParametricNullness
1148    public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture)
1149        throws ExecutionException {
1150      checkState(beingCalled);
1151      checkArgument(futures.contains(closingFuture));
1152      return Futures.getDone(closingFuture.future);
1153    }
1154
1155    @ParametricNullness
1156    private <V extends @Nullable Object> V call(
1157        CombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1158      beingCalled = true;
1159      CloseableList newCloseables = new CloseableList();
1160      try {
1161        return combiner.call(newCloseables.closer, this);
1162      } finally {
1163        closeables.add(newCloseables, directExecutor());
1164        beingCalled = false;
1165      }
1166    }
1167
1168    private <V extends @Nullable Object> FluentFuture<V> callAsync(
1169        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1170      beingCalled = true;
1171      CloseableList newCloseables = new CloseableList();
1172      try {
1173        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1174        closingFuture.becomeSubsumedInto(closeables);
1175        return closingFuture.future;
1176      } finally {
1177        closeables.add(newCloseables, directExecutor());
1178        beingCalled = false;
1179      }
1180    }
1181  }
1182
1183  /**
1184   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1185   *
1186   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1187   * instantiate this class.
1188   *
1189   * <p>Example:
1190   *
1191   * <pre>{@code
1192   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1193   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1194   * ListenableFuture<Integer> numberOfDifferentLines =
1195   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1196   *           .call(
1197   *               (closer, peeker) -> {
1198   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1199   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1200   *                 return countDifferentLines(file1Reader, file2Reader);
1201   *               },
1202   *               executor)
   *           .finishToFuture();
1204   * }</pre>
1205   */
1206  @DoNotMock("Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1207  public static class Combiner {
1208
1209    private final CloseableList closeables = new CloseableList();
1210
1211    /**
1212     * An operation that returns a result and may throw an exception.
1213     *
1214     * @param <V> the type of the result
1215     */
1216    public interface CombiningCallable<V extends @Nullable Object> {
1217      /**
1218       * Computes a result, or throws an exception if unable to do so.
1219       *
1220       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1221       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1222       * (but not before this method completes), even if this method throws or the pipeline is
1223       * cancelled.
1224       *
1225       * @param peeker used to get the value of any of the input futures
1226       */
1227      @ParametricNullness
1228      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1229    }
1230
1231    /**
1232     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1233     *
1234     * @param <V> the type of the result
1235     */
1236    public interface AsyncCombiningCallable<V extends @Nullable Object> {
1237      /**
1238       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1239       *
1240       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1241       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1242       * (but not before this method completes), even if this method throws or the pipeline is
1243       * cancelled.
1244       *
1245       * @param peeker used to get the value of any of the input futures
1246       */
1247      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1248    }
1249
1250    private final boolean allMustSucceed;
1251    protected final ImmutableList<ClosingFuture<?>> inputs;
1252
1253    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1254      this.allMustSucceed = allMustSucceed;
1255      this.inputs = ImmutableList.copyOf(inputs);
1256      for (ClosingFuture<?> input : inputs) {
1257        input.becomeSubsumedInto(closeables);
1258      }
1259    }
1260
1261    /**
1262     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1263     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1264     * objects to be closed when the pipeline is done.
1265     *
1266     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1267     * fail, so will the returned step.
1268     *
1269     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1270     * cancelled.
1271     *
1272     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1273     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1274     */
1275    public <V extends @Nullable Object> ClosingFuture<V> call(
1276        CombiningCallable<V> combiningCallable, Executor executor) {
1277      Callable<V> callable =
1278          new Callable<V>() {
1279            @Override
1280            @ParametricNullness
1281            public V call() throws Exception {
1282              return new Peeker(inputs).call(combiningCallable, closeables);
1283            }
1284
1285            @Override
1286            public String toString() {
1287              return combiningCallable.toString();
1288            }
1289          };
1290      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1291      derived.closeables.add(closeables, directExecutor());
1292      return derived;
1293    }
1294
1295    /**
1296     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1297     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1298     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1299     * captured by the returned {@link ClosingFuture}).
1300     *
1301     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1302     * fail, so will the returned step.
1303     *
1304     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1305     * cancelled.
1306     *
1307     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1308     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1309     *
1310     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1311     * derived step.
1312     *
1313     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1314     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1315     *
1316     * <p>Usage guidelines for this method:
1317     *
1318     * <ul>
1319     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1320     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1321     *       Executor)} instead, with a function that returns the next value directly.
1322     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1323     *       for every closeable object this step creates in order to capture it for later closing.
1324     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1325     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1326     * </ul>
1327     *
1328     * <p>The same warnings about doing heavyweight operations within {@link
1329     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1330     */
1331    public <V extends @Nullable Object> ClosingFuture<V> callAsync(
1332        AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1333      AsyncCallable<V> asyncCallable =
1334          new AsyncCallable<V>() {
1335            @Override
1336            public ListenableFuture<V> call() throws Exception {
1337              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1338            }
1339
1340            @Override
1341            public String toString() {
1342              return combiningCallable.toString();
1343            }
1344          };
1345      ClosingFuture<V> derived =
1346          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1347      derived.closeables.add(closeables, directExecutor());
1348      return derived;
1349    }
1350
1351    private FutureCombiner<@Nullable Object> futureCombiner() {
1352      return allMustSucceed
1353          ? Futures.whenAllSucceed(inputFutures())
1354          : Futures.whenAllComplete(inputFutures());
1355    }
1356
1357    private ImmutableList<FluentFuture<?>> inputFutures() {
1358      return FluentIterable.from(inputs)
1359          .<FluentFuture<?>>transform(future -> future.future)
1360          .toList();
1361    }
1362  }
1363
1364  /**
1365   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1366   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1367   * combination.
1368   *
1369   * @param <V1> the type returned by the first future
1370   * @param <V2> the type returned by the second future
1371   */
1372  public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object>
1373      extends Combiner {
1374
1375    /**
1376     * A function that returns a value when applied to the values of the two futures passed to
1377     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1378     *
1379     * @param <V1> the type returned by the first future
1380     * @param <V2> the type returned by the second future
1381     * @param <U> the type returned by the function
1382     */
1383    public interface ClosingFunction2<
1384        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1385
1386      /**
1387       * Applies this function to two inputs, or throws an exception if unable to do so.
1388       *
1389       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1390       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1391       * (but not before this method completes), even if this method throws or the pipeline is
1392       * cancelled.
1393       */
1394      @ParametricNullness
1395      U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2)
1396          throws Exception;
1397    }
1398
1399    /**
1400     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1401     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1402     *
1403     * @param <V1> the type returned by the first future
1404     * @param <V2> the type returned by the second future
1405     * @param <U> the type returned by the function
1406     */
1407    public interface AsyncClosingFunction2<
1408        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1409
1410      /**
1411       * Applies this function to two inputs, or throws an exception if unable to do so.
1412       *
1413       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1414       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1415       * (but not before this method completes), even if this method throws or the pipeline is
1416       * cancelled.
1417       */
1418      ClosingFuture<U> apply(
1419          DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2)
1420          throws Exception;
1421    }
1422
1423    private final ClosingFuture<V1> future1;
1424    private final ClosingFuture<V2> future2;
1425
1426    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1427      super(true, ImmutableList.of(future1, future2));
1428      this.future1 = future1;
1429      this.future2 = future2;
1430    }
1431
1432    /**
1433     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1434     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1435     * objects to be closed when the pipeline is done.
1436     *
1437     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1438     * any of the inputs fail, so will the returned step.
1439     *
1440     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1441     *
1442     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1443     * ExecutionException} will be extracted and used as the failure of the derived step.
1444     */
1445    public <U extends @Nullable Object> ClosingFuture<U> call(
1446        ClosingFunction2<V1, V2, U> function, Executor executor) {
1447      return call(
1448          new CombiningCallable<U>() {
1449            @Override
1450            @ParametricNullness
1451            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1452              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1453            }
1454
1455            @Override
1456            public String toString() {
1457              return function.toString();
1458            }
1459          },
1460          executor);
1461    }
1462
1463    /**
1464     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1465     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1466     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1467     * captured by the returned {@link ClosingFuture}).
1468     *
1469     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1470     * any of the inputs fail, so will the returned step.
1471     *
1472     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1473     *
1474     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1475     * ExecutionException} will be extracted and used as the failure of the derived step.
1476     *
1477     * <p>If the function throws any other exception, it will be used as the failure of the derived
1478     * step.
1479     *
1480     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1481     * the closeable objects in that {@code ClosingFuture} will be closed.
1482     *
1483     * <p>Usage guidelines for this method:
1484     *
1485     * <ul>
1486     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1487     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1488     *       Executor)} instead, with a function that returns the next value directly.
1489     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1490     *       for every closeable object this step creates in order to capture it for later closing.
1491     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1492     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1493     * </ul>
1494     *
1495     * <p>The same warnings about doing heavyweight operations within {@link
1496     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1497     */
1498    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1499        AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1500      return callAsync(
1501          new AsyncCombiningCallable<U>() {
1502            @Override
1503            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1504              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1505            }
1506
1507            @Override
1508            public String toString() {
1509              return function.toString();
1510            }
1511          },
1512          executor);
1513    }
1514  }
1515
1516  /**
1517   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1518   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1519   * ClosingFuture)} to start this combination.
1520   *
1521   * @param <V1> the type returned by the first future
1522   * @param <V2> the type returned by the second future
1523   * @param <V3> the type returned by the third future
1524   */
1525  public static final class Combiner3<
1526          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
1527      extends Combiner {
1528    /**
1529     * A function that returns a value when applied to the values of the three futures passed to
1530     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1531     *
1532     * @param <V1> the type returned by the first future
1533     * @param <V2> the type returned by the second future
1534     * @param <V3> the type returned by the third future
1535     * @param <U> the type returned by the function
1536     */
1537    public interface ClosingFunction3<
1538        V1 extends @Nullable Object,
1539        V2 extends @Nullable Object,
1540        V3 extends @Nullable Object,
1541        U extends @Nullable Object> {
1542      /**
1543       * Applies this function to three inputs, or throws an exception if unable to do so.
1544       *
1545       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1546       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1547       * (but not before this method completes), even if this method throws or the pipeline is
1548       * cancelled.
1549       */
1550      @ParametricNullness
1551      U apply(
1552          DeferredCloser closer,
1553          @ParametricNullness V1 value1,
1554          @ParametricNullness V2 value2,
1555          @ParametricNullness V3 value3)
1556          throws Exception;
1557    }
1558
1559    /**
1560     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1561     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1562     *
1563     * @param <V1> the type returned by the first future
1564     * @param <V2> the type returned by the second future
1565     * @param <V3> the type returned by the third future
1566     * @param <U> the type returned by the function
1567     */
1568    public interface AsyncClosingFunction3<
1569        V1 extends @Nullable Object,
1570        V2 extends @Nullable Object,
1571        V3 extends @Nullable Object,
1572        U extends @Nullable Object> {
1573      /**
1574       * Applies this function to three inputs, or throws an exception if unable to do so.
1575       *
1576       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1577       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1578       * (but not before this method completes), even if this method throws or the pipeline is
1579       * cancelled.
1580       */
1581      ClosingFuture<U> apply(
1582          DeferredCloser closer,
1583          @ParametricNullness V1 value1,
1584          @ParametricNullness V2 value2,
1585          @ParametricNullness V3 value3)
1586          throws Exception;
1587    }
1588
1589    private final ClosingFuture<V1> future1;
1590    private final ClosingFuture<V2> future2;
1591    private final ClosingFuture<V3> future3;
1592
1593    private Combiner3(
1594        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1595      super(true, ImmutableList.of(future1, future2, future3));
1596      this.future1 = future1;
1597      this.future2 = future2;
1598      this.future3 = future3;
1599    }
1600
1601    /**
1602     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1603     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1604     * objects to be closed when the pipeline is done.
1605     *
1606     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1607     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1608     *
1609     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1610     *
1611     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1612     * ExecutionException} will be extracted and used as the failure of the derived step.
1613     */
1614    public <U extends @Nullable Object> ClosingFuture<U> call(
1615        ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1616      return call(
1617          new CombiningCallable<U>() {
1618            @Override
1619            @ParametricNullness
1620            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1621              return function.apply(
1622                  closer,
1623                  peeker.getDone(future1),
1624                  peeker.getDone(future2),
1625                  peeker.getDone(future3));
1626            }
1627
1628            @Override
1629            public String toString() {
1630              return function.toString();
1631            }
1632          },
1633          executor);
1634    }
1635
1636    /**
1637     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1638     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1639     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1640     * captured by the returned {@link ClosingFuture}).
1641     *
1642     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1643     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1644     *
1645     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1646     *
1647     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1648     * ExecutionException} will be extracted and used as the failure of the derived step.
1649     *
1650     * <p>If the function throws any other exception, it will be used as the failure of the derived
1651     * step.
1652     *
1653     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1654     * the closeable objects in that {@code ClosingFuture} will be closed.
1655     *
1656     * <p>Usage guidelines for this method:
1657     *
1658     * <ul>
1659     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1660     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1661     *       Executor)} instead, with a function that returns the next value directly.
1662     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1663     *       for every closeable object this step creates in order to capture it for later closing.
1664     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1665     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1666     * </ul>
1667     *
1668     * <p>The same warnings about doing heavyweight operations within {@link
1669     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1670     */
1671    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1672        AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1673      return callAsync(
1674          new AsyncCombiningCallable<U>() {
1675            @Override
1676            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1677              return function.apply(
1678                  closer,
1679                  peeker.getDone(future1),
1680                  peeker.getDone(future2),
1681                  peeker.getDone(future3));
1682            }
1683
1684            @Override
1685            public String toString() {
1686              return function.toString();
1687            }
1688          },
1689          executor);
1690    }
1691  }
1692
1693  /**
1694   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1695   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1696   * ClosingFuture)} to start this combination.
1697   *
1698   * @param <V1> the type returned by the first future
1699   * @param <V2> the type returned by the second future
1700   * @param <V3> the type returned by the third future
1701   * @param <V4> the type returned by the fourth future
1702   */
1703  public static final class Combiner4<
1704          V1 extends @Nullable Object,
1705          V2 extends @Nullable Object,
1706          V3 extends @Nullable Object,
1707          V4 extends @Nullable Object>
1708      extends Combiner {
1709    /**
1710     * A function that returns a value when applied to the values of the four futures passed to
1711     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1712     *
1713     * @param <V1> the type returned by the first future
1714     * @param <V2> the type returned by the second future
1715     * @param <V3> the type returned by the third future
1716     * @param <V4> the type returned by the fourth future
1717     * @param <U> the type returned by the function
1718     */
1719    public interface ClosingFunction4<
1720        V1 extends @Nullable Object,
1721        V2 extends @Nullable Object,
1722        V3 extends @Nullable Object,
1723        V4 extends @Nullable Object,
1724        U extends @Nullable Object> {
1725      /**
1726       * Applies this function to four inputs, or throws an exception if unable to do so.
1727       *
1728       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1729       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1730       * (but not before this method completes), even if this method throws or the pipeline is
1731       * cancelled.
1732       */
1733      @ParametricNullness
1734      U apply(
1735          DeferredCloser closer,
1736          @ParametricNullness V1 value1,
1737          @ParametricNullness V2 value2,
1738          @ParametricNullness V3 value3,
1739          @ParametricNullness V4 value4)
1740          throws Exception;
1741    }
1742
1743    /**
1744     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1745     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1746     * ClosingFuture)}.
1747     *
1748     * @param <V1> the type returned by the first future
1749     * @param <V2> the type returned by the second future
1750     * @param <V3> the type returned by the third future
1751     * @param <V4> the type returned by the fourth future
1752     * @param <U> the type returned by the function
1753     */
1754    public interface AsyncClosingFunction4<
1755        V1 extends @Nullable Object,
1756        V2 extends @Nullable Object,
1757        V3 extends @Nullable Object,
1758        V4 extends @Nullable Object,
1759        U extends @Nullable Object> {
1760      /**
1761       * Applies this function to four inputs, or throws an exception if unable to do so.
1762       *
1763       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1764       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1765       * (but not before this method completes), even if this method throws or the pipeline is
1766       * cancelled.
1767       */
1768      ClosingFuture<U> apply(
1769          DeferredCloser closer,
1770          @ParametricNullness V1 value1,
1771          @ParametricNullness V2 value2,
1772          @ParametricNullness V3 value3,
1773          @ParametricNullness V4 value4)
1774          throws Exception;
1775    }
1776
1777    private final ClosingFuture<V1> future1;
1778    private final ClosingFuture<V2> future2;
1779    private final ClosingFuture<V3> future3;
1780    private final ClosingFuture<V4> future4;
1781
1782    private Combiner4(
1783        ClosingFuture<V1> future1,
1784        ClosingFuture<V2> future2,
1785        ClosingFuture<V3> future3,
1786        ClosingFuture<V4> future4) {
1787      super(true, ImmutableList.of(future1, future2, future3, future4));
1788      this.future1 = future1;
1789      this.future2 = future2;
1790      this.future3 = future3;
1791      this.future4 = future4;
1792    }
1793
1794    /**
1795     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1796     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1797     * objects to be closed when the pipeline is done.
1798     *
1799     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1800     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1801     *
1802     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1803     *
1804     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1805     * ExecutionException} will be extracted and used as the failure of the derived step.
1806     */
1807    public <U extends @Nullable Object> ClosingFuture<U> call(
1808        ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1809      return call(
1810          new CombiningCallable<U>() {
1811            @Override
1812            @ParametricNullness
1813            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1814              return function.apply(
1815                  closer,
1816                  peeker.getDone(future1),
1817                  peeker.getDone(future2),
1818                  peeker.getDone(future3),
1819                  peeker.getDone(future4));
1820            }
1821
1822            @Override
1823            public String toString() {
1824              return function.toString();
1825            }
1826          },
1827          executor);
1828    }
1829
1830    /**
1831     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1832     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1833     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1834     * captured by the returned {@link ClosingFuture}).
1835     *
1836     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1837     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1838     *
1839     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1840     *
1841     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1842     * ExecutionException} will be extracted and used as the failure of the derived step.
1843     *
1844     * <p>If the function throws any other exception, it will be used as the failure of the derived
1845     * step.
1846     *
1847     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1848     * the closeable objects in that {@code ClosingFuture} will be closed.
1849     *
1850     * <p>Usage guidelines for this method:
1851     *
1852     * <ul>
1853     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1854     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1855     *       Executor)} instead, with a function that returns the next value directly.
1856     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1857     *       for every closeable object this step creates in order to capture it for later closing.
1858     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1859     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1860     * </ul>
1861     *
1862     * <p>The same warnings about doing heavyweight operations within {@link
1863     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1864     */
1865    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1866        AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1867      return callAsync(
1868          new AsyncCombiningCallable<U>() {
1869            @Override
1870            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1871              return function.apply(
1872                  closer,
1873                  peeker.getDone(future1),
1874                  peeker.getDone(future2),
1875                  peeker.getDone(future3),
1876                  peeker.getDone(future4));
1877            }
1878
1879            @Override
1880            public String toString() {
1881              return function.toString();
1882            }
1883          },
1884          executor);
1885    }
1886  }
1887
1888  /**
1889   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1890   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1891   * ClosingFuture, ClosingFuture)} to start this combination.
1892   *
1893   * @param <V1> the type returned by the first future
1894   * @param <V2> the type returned by the second future
1895   * @param <V3> the type returned by the third future
1896   * @param <V4> the type returned by the fourth future
1897   * @param <V5> the type returned by the fifth future
1898   */
1899  public static final class Combiner5<
1900          V1 extends @Nullable Object,
1901          V2 extends @Nullable Object,
1902          V3 extends @Nullable Object,
1903          V4 extends @Nullable Object,
1904          V5 extends @Nullable Object>
1905      extends Combiner {
1906    /**
1907     * A function that returns a value when applied to the values of the five futures passed to
1908     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1909     * ClosingFuture)}.
1910     *
1911     * @param <V1> the type returned by the first future
1912     * @param <V2> the type returned by the second future
1913     * @param <V3> the type returned by the third future
1914     * @param <V4> the type returned by the fourth future
1915     * @param <V5> the type returned by the fifth future
1916     * @param <U> the type returned by the function
1917     */
1918    public interface ClosingFunction5<
1919        V1 extends @Nullable Object,
1920        V2 extends @Nullable Object,
1921        V3 extends @Nullable Object,
1922        V4 extends @Nullable Object,
1923        V5 extends @Nullable Object,
1924        U extends @Nullable Object> {
1925      /**
1926       * Applies this function to five inputs, or throws an exception if unable to do so.
1927       *
1928       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1929       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1930       * (but not before this method completes), even if this method throws or the pipeline is
1931       * cancelled.
1932       */
1933      @ParametricNullness
1934      U apply(
1935          DeferredCloser closer,
1936          @ParametricNullness V1 value1,
1937          @ParametricNullness V2 value2,
1938          @ParametricNullness V3 value3,
1939          @ParametricNullness V4 value4,
1940          @ParametricNullness V5 value5)
1941          throws Exception;
1942    }
1943
1944    /**
1945     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1946     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1947     * ClosingFuture, ClosingFuture)}.
1948     *
1949     * @param <V1> the type returned by the first future
1950     * @param <V2> the type returned by the second future
1951     * @param <V3> the type returned by the third future
1952     * @param <V4> the type returned by the fourth future
1953     * @param <V5> the type returned by the fifth future
1954     * @param <U> the type returned by the function
1955     */
1956    public interface AsyncClosingFunction5<
1957        V1 extends @Nullable Object,
1958        V2 extends @Nullable Object,
1959        V3 extends @Nullable Object,
1960        V4 extends @Nullable Object,
1961        V5 extends @Nullable Object,
1962        U extends @Nullable Object> {
1963      /**
1964       * Applies this function to five inputs, or throws an exception if unable to do so.
1965       *
1966       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1967       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1968       * (but not before this method completes), even if this method throws or the pipeline is
1969       * cancelled.
1970       */
1971      ClosingFuture<U> apply(
1972          DeferredCloser closer,
1973          @ParametricNullness V1 value1,
1974          @ParametricNullness V2 value2,
1975          @ParametricNullness V3 value3,
1976          @ParametricNullness V4 value4,
1977          @ParametricNullness V5 value5)
1978          throws Exception;
1979    }
1980
1981    private final ClosingFuture<V1> future1;
1982    private final ClosingFuture<V2> future2;
1983    private final ClosingFuture<V3> future3;
1984    private final ClosingFuture<V4> future4;
1985    private final ClosingFuture<V5> future5;
1986
1987    private Combiner5(
1988        ClosingFuture<V1> future1,
1989        ClosingFuture<V2> future2,
1990        ClosingFuture<V3> future3,
1991        ClosingFuture<V4> future4,
1992        ClosingFuture<V5> future5) {
1993      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1994      this.future1 = future1;
1995      this.future2 = future2;
1996      this.future3 = future3;
1997      this.future4 = future4;
1998      this.future5 = future5;
1999    }
2000
2001    /**
2002     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2003     * combining function to their values. The function can use a {@link DeferredCloser} to capture
2004     * objects to be closed when the pipeline is done.
2005     *
2006     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2007     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2008     * returned step.
2009     *
2010     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2011     *
2012     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2013     * ExecutionException} will be extracted and used as the failure of the derived step.
2014     */
2015    public <U extends @Nullable Object> ClosingFuture<U> call(
2016        ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2017      return call(
2018          new CombiningCallable<U>() {
2019            @Override
2020            @ParametricNullness
2021            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
2022              return function.apply(
2023                  closer,
2024                  peeker.getDone(future1),
2025                  peeker.getDone(future2),
2026                  peeker.getDone(future3),
2027                  peeker.getDone(future4),
2028                  peeker.getDone(future5));
2029            }
2030
2031            @Override
2032            public String toString() {
2033              return function.toString();
2034            }
2035          },
2036          executor);
2037    }
2038
2039    /**
2040     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2041     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
2042     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
2043     * captured by the returned {@link ClosingFuture}).
2044     *
2045     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2046     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2047     * returned step.
2048     *
2049     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2050     *
2051     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2052     * ExecutionException} will be extracted and used as the failure of the derived step.
2053     *
2054     * <p>If the function throws any other exception, it will be used as the failure of the derived
2055     * step.
2056     *
2057     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2058     * the closeable objects in that {@code ClosingFuture} will be closed.
2059     *
2060     * <p>Usage guidelines for this method:
2061     *
2062     * <ul>
2063     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2064     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2065     *       Executor)} instead, with a function that returns the next value directly.
2066     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
2067     *       for every closeable object this step creates in order to capture it for later closing.
2068     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2069     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2070     * </ul>
2071     *
2072     * <p>The same warnings about doing heavyweight operations within {@link
2073     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2074     */
2075    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
2076        AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2077      return callAsync(
2078          new AsyncCombiningCallable<U>() {
2079            @Override
2080            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2081              return function.apply(
2082                  closer,
2083                  peeker.getDone(future1),
2084                  peeker.getDone(future2),
2085                  peeker.getDone(future3),
2086                  peeker.getDone(future4),
2087                  peeker.getDone(future5));
2088            }
2089
2090            @Override
2091            public String toString() {
2092              return function.toString();
2093            }
2094          },
2095          executor);
2096    }
2097  }
2098
2099  @Override
2100  public String toString() {
2101    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2102    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2103  }
2104
2105  @SuppressWarnings({"removal", "Finalize"}) // b/260137033
2106  @Override
2107  protected void finalize() {
2108    if (state.get().equals(OPEN)) {
2109      logger.get().log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2110      FluentFuture<V> unused = finishToFuture();
2111    }
2112  }
2113
2114  private static void closeQuietly(@Nullable AutoCloseable closeable, Executor executor) {
2115    if (closeable == null) {
2116      return;
2117    }
2118    try {
2119      executor.execute(
2120          () -> {
2121            try {
2122              closeable.close();
2123            } catch (Exception e) {
2124              /*
2125               * In guava-jre, any kind of Exception may be thrown because `closeable` has type
2126               * `AutoCloseable`.
2127               *
2128               * In guava-android, the only kinds of Exception that may be thrown are
2129               * RuntimeException and IOException because `closeable` has type `Closeable`—except
2130               * that we have to account for sneaky checked exception.
2131               */
2132              restoreInterruptIfIsInterruptedException(e);
2133              logger.get().log(WARNING, "thrown by close()", e);
2134            }
2135          });
2136    } catch (RejectedExecutionException e) {
2137      if (logger.get().isLoggable(WARNING)) {
2138        logger
2139            .get()
2140            .log(
2141                WARNING,
2142                String.format("while submitting close to %s; will close inline", executor),
2143                e);
2144      }
2145      closeQuietly(closeable, directExecutor());
2146    }
2147  }
2148
2149  private void checkAndUpdateState(State oldState, State newState) {
2150    checkState(
2151        compareAndUpdateState(oldState, newState),
2152        "Expected state to be %s, but it was %s",
2153        oldState,
2154        newState);
2155  }
2156
2157  private boolean compareAndUpdateState(State oldState, State newState) {
2158    return state.compareAndSet(oldState, newState);
2159  }
2160
2161  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2162  private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor>
2163      implements Closeable {
2164    private final DeferredCloser closer = new DeferredCloser(this);
2165    private volatile boolean closed;
2166    private volatile @Nullable CountDownLatch whenClosed;
2167
2168    <V extends @Nullable Object, U extends @Nullable Object>
2169        ListenableFuture<U> applyClosingFunction(
2170            ClosingFunction<? super V, U> transformation, @ParametricNullness V input)
2171            throws Exception {
2172      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2173      CloseableList newCloseables = new CloseableList();
2174      try {
2175        return immediateFuture(transformation.apply(newCloseables.closer, input));
2176      } finally {
2177        add(newCloseables, directExecutor());
2178      }
2179    }
2180
2181    <V extends @Nullable Object, U extends @Nullable Object>
2182        FluentFuture<U> applyAsyncClosingFunction(
2183            AsyncClosingFunction<V, U> transformation, @ParametricNullness V input)
2184            throws Exception {
2185      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2186      CloseableList newCloseables = new CloseableList();
2187      try {
2188        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2189        closingFuture.becomeSubsumedInto(newCloseables);
2190        return closingFuture.future;
2191      } finally {
2192        add(newCloseables, directExecutor());
2193      }
2194    }
2195
2196    @Override
2197    public void close() {
2198      if (closed) {
2199        return;
2200      }
2201      synchronized (this) {
2202        if (closed) {
2203          return;
2204        }
2205        closed = true;
2206      }
2207      for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) {
2208        closeQuietly(entry.getKey(), entry.getValue());
2209      }
2210      clear();
2211      if (whenClosed != null) {
2212        whenClosed.countDown();
2213      }
2214    }
2215
2216    void add(@Nullable AutoCloseable closeable, Executor executor) {
2217      checkNotNull(executor);
2218      if (closeable == null) {
2219        return;
2220      }
2221      synchronized (this) {
2222        if (!closed) {
2223          put(closeable, executor);
2224          return;
2225        }
2226      }
2227      closeQuietly(closeable, executor);
2228    }
2229
2230    /**
2231     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2232     */
2233    CountDownLatch whenClosedCountDown() {
2234      if (closed) {
2235        return new CountDownLatch(0);
2236      }
2237      synchronized (this) {
2238        if (closed) {
2239          return new CountDownLatch(0);
2240        }
2241        checkState(whenClosed == null);
2242        return whenClosed = new CountDownLatch(1);
2243      }
2244    }
2245  }
2246
2247  /**
2248   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2249   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2250   */
2251  @VisibleForTesting
2252  CountDownLatch whenClosedCountDown() {
2253    return closeables.whenClosedCountDown();
2254  }
2255
  /**
   * The state of a {@link CloseableList}.
   *
   * <p>A {@link ClosingFuture} keeps its current value in its {@code state} reference and changes
   * it with an atomic compare-and-set, so a transition takes effect only if the state is still
   * the one the caller expected.
   */
  enum State {
    /**
     * The {@link CloseableList} has not been subsumed or closed. A {@link ClosingFuture} that is
     * still in this state when it is finalized is logged as leaked.
     */
    OPEN,

    /**
     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
     * into any other.
     */
    SUBSUMED,

    /**
     * Some {@link ListenableFuture} has a callback attached that will close the {@link
     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
     */
    WILL_CLOSE,

    /**
     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
     * {@link CloseableList} may not be subsumed.
     */
    CLOSING,

    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
    CLOSED,

    /**
     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
     */
    WILL_CREATE_VALUE_AND_CLOSER,
  }
2288}