001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException;
036import static java.util.logging.Level.FINER;
037import static java.util.logging.Level.SEVERE;
038import static java.util.logging.Level.WARNING;
039
040import com.google.common.annotations.J2ktIncompatible;
041import com.google.common.annotations.VisibleForTesting;
042import com.google.common.collect.FluentIterable;
043import com.google.common.collect.ImmutableList;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
046import com.google.common.util.concurrent.Futures.FutureCombiner;
047import com.google.errorprone.annotations.CanIgnoreReturnValue;
048import com.google.errorprone.annotations.DoNotMock;
049import com.google.j2objc.annotations.RetainedWith;
050import java.io.Closeable;
051import java.util.IdentityHashMap;
052import java.util.Map;
053import java.util.concurrent.Callable;
054import java.util.concurrent.CancellationException;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutionException;
057import java.util.concurrent.Executor;
058import java.util.concurrent.Future;
059import java.util.concurrent.RejectedExecutionException;
060import java.util.concurrent.atomic.AtomicReference;
061import org.jspecify.annotations.Nullable;
062
063/**
064 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
065 * complete, some objects captured during the computation are closed.
066 *
067 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
068 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
069 * cancellation of the operation so far. The only way to extract the value or exception from a step
070 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
071 * "value" of a successful step or the "result" (value or exception) of any step.
072 *
073 * <ol>
074 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
075 *       block or a {@link ListenableFuture}.
076 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
077 *       can be captured for later closing.
078 *   <li>There is one last step (the root of the tree), from which you can extract the final result
079 *       of the computation. After that result is available (or the computation fails), all objects
080 *       captured by any of the steps in the pipeline are closed.
081 * </ol>
082 *
083 * <h3>Starting a pipeline</h3>
084 *
085 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
086 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
087 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
088 * #from(ListenableFuture)} instead.
089 *
090 * <h3>Derived steps</h3>
091 *
092 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
093 * ways similar to {@link FluentFuture}s:
094 *
095 * <ul>
096 *   <li>by transforming the value from a successful input step,
097 *   <li>by catching the exception from a failed input step, or
098 *   <li>by combining the results of several input steps.
099 * </ul>
100 *
101 * Each derivation can capture the next value or any intermediate objects for later closing.
102 *
103 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
104 * exception, or combine it with others, you cannot do anything else with it, including declare it
105 * to be the last step of the pipeline.
106 *
107 * <h4>Transforming</h4>
108 *
109 * To derive the next step by asynchronously applying a function to an input step's value, call
110 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
111 * Executor)} on the input step.
112 *
113 * <h4>Catching</h4>
114 *
115 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
116 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
117 *
118 * <h4>Combining</h4>
119 *
120 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
121 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)} or its overloads.
122 *
123 * <h3>Cancelling</h3>
124 *
125 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
126 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
127 * successfully cancelled step will immediately start closing all objects captured for later closing
128 * by it and by its input steps.
129 *
130 * <h3>Ending a pipeline</h3>
131 *
132 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
133 * close the captured objects automatically or manually.
134 *
135 * <h4>Automatically closing</h4>
136 *
137 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
138 * calling {@link #finishToFuture()}. All objects the pipeline has captured for closing will begin
139 * to be closed asynchronously <b>after</b> the returned {@code Future} is done: the future
140 * completes before closing starts, rather than once it has finished.
141 *
142 * {@snippet :
143 * FluentFuture<UserName> userName =
144 *     ClosingFuture.submit(
145 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
146 *             executor)
147 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
148 *         .transform((closer, result) -> result.get("userName"), directExecutor())
149 *         .catching(DBException.class, e -> "no user", directExecutor())
150 *         .finishToFuture();
151 * }
152 *
153 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
154 * result cursor will both be closed, even if the operation is cancelled or fails.
155 *
156 * <h4>Manually closing</h4>
157 *
158 * If you want to close the captured objects manually, after you've used the final result, call
159 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
160 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
161 *
162 * {@snippet :
163 *     ClosingFuture.submit(
164 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
165 *             executor)
166 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
167 *     .transform((closer, result) -> result.get("userName"), directExecutor())
168 *     .catching(DBException.class, e -> "no user", directExecutor())
169 *     .finishToValueAndCloser(
170 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
171 *
172 * // later
173 * try { // get() will throw if the operation failed or was cancelled.
174 *   UserName userName = userNameValueAndCloser.get();
175 *   // do something with userName
176 * } finally {
177 *   userNameValueAndCloser.closeAsync();
178 * }
179 * }
180 *
181 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
182 * the query result cursor will both be closed, even if the operation is cancelled or fails.
183 *
184 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
185 * automatic-closing approach described above is safer.
186 *
187 * @param <V> the type of the value of this step
188 * @since 30.0
189 */
190// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
191@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
192@J2ktIncompatible
193// TODO(dpb): GWT compatibility.
194public final class ClosingFuture<V extends @Nullable Object> {
195
196  private static final LazyLogger logger = new LazyLogger(ClosingFuture.class);
197
198  /**
199   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
200   * done.
201   */
202  public static final class DeferredCloser {
203    @RetainedWith private final CloseableList list;
204
205    DeferredCloser(CloseableList list) {
206      this.list = list;
207    }
208
209    /**
210     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
211     *
212     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
213     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
214     * Closeable}. (For more about the flavors, see <a
215     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
216     * build</a>.)
217     *
218     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
219     * building for Android): Ensure that any object you pass implements the interface not just in
220     * your current SDK version but also at the oldest version you support. For example, <a
221     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
222     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
223     * {@code Closeable} with a method reference like {@code cursor::close}.
224     *
225     * <p>Note that this method is still binary-compatible between flavors because the erasure of
226     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
227     *
228     * @param closeable the object to be closed (see notes above)
229     * @param closingExecutor the object will be closed on this executor
230     * @return the first argument
231     */
232    @CanIgnoreReturnValue
233    @ParametricNullness
234    public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose(
235        @ParametricNullness C closeable, Executor closingExecutor) {
236      checkNotNull(closingExecutor);
237      if (closeable != null) {
238        list.add(closeable, closingExecutor);
239      }
240      return closeable;
241    }
242  }
243
244  /**
245   * An operation that computes a result.
246   *
247   * @param <V> the type of the result
248   */
249  public interface ClosingCallable<V extends @Nullable Object> {
250    /**
251     * Computes a result, or throws an exception if unable to do so.
252     *
253     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
254     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
255     * not before this method completes), even if this method throws or the pipeline is cancelled.
256     */
257    @ParametricNullness
258    V call(DeferredCloser closer) throws Exception;
259  }
260
261  /**
262   * An operation that computes a {@link ClosingFuture} of a result.
263   *
264   * @param <V> the type of the result
265   * @since 30.1
266   */
267  public interface AsyncClosingCallable<V extends @Nullable Object> {
268    /**
269     * Computes a result, or throws an exception if unable to do so.
270     *
271     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
272     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
273     * not before this method completes), even if this method throws or the pipeline is cancelled.
274     */
275    ClosingFuture<V> call(DeferredCloser closer) throws Exception;
276  }
277
278  /**
279   * A function from an input to a result.
280   *
281   * @param <T> the type of the input to the function
282   * @param <U> the type of the result of the function
283   */
284  public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
285
286    /**
287     * Applies this function to an input, or throws an exception if unable to do so.
288     *
289     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
290     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
291     * not before this method completes), even if this method throws or the pipeline is cancelled.
292     */
293    @ParametricNullness
294    U apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
295  }
296
297  /**
298   * A function from an input to a {@link ClosingFuture} of a result.
299   *
300   * @param <T> the type of the input to the function
301   * @param <U> the type of the result of the function
302   */
303  public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
304    /**
305     * Applies this function to an input, or throws an exception if unable to do so.
306     *
307     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
308     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
309     * not before this method completes), even if this method throws or the pipeline is cancelled.
310     */
311    ClosingFuture<U> apply(DeferredCloser closer, @ParametricNullness T input) throws Exception;
312  }
313
314  /**
315   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
316   * allows the user to close all the closeable objects that were captured during it for later
317   * closing.
318   *
319   * <p>The asynchronous operation will have completed before this object is created.
320   *
321   * @param <V> the type of the value of a successful operation
322   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
323   */
324  public static final class ValueAndCloser<V extends @Nullable Object> {
325
326    private final ClosingFuture<? extends V> closingFuture;
327
328    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
329      this.closingFuture = checkNotNull(closingFuture);
330    }
331
332    /**
333     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
334     * {@link Future#get()} would.
335     *
336     * <p>Because the asynchronous operation has already completed, this method is synchronous and
337     * returns immediately.
338     *
339     * @throws CancellationException if the computation was cancelled
340     * @throws ExecutionException if the computation threw an exception
341     */
342    @ParametricNullness
343    public V get() throws ExecutionException {
344      return getDone(closingFuture.future);
345    }
346
347    /**
348     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
349     * operation on the {@link Executor}s specified by calls to {@link
350     * DeferredCloser#eventuallyClose(Object, Executor)}.
351     *
352     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
353     * closed synchronously.
354     *
355     * <p>Idempotent: objects will be closed at most once.
356     */
357    public void closeAsync() {
358      closingFuture.close();
359    }
360  }
361
362  /**
363   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
364   * ClosingFuture} pipeline.
365   *
366   * @param <V> the type of the final value of a successful pipeline
367   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
368   */
369  public interface ValueAndCloserConsumer<V extends @Nullable Object> {
370
371    /** Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline. */
372    void accept(ValueAndCloser<V> valueAndCloser);
373  }
374
375  /**
376   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
377   *
378   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
379   *     execution
380   */
381  public static <V extends @Nullable Object> ClosingFuture<V> submit(
382      ClosingCallable<V> callable, Executor executor) {
383    checkNotNull(callable);
384    CloseableList closeables = new CloseableList();
385    TrustedListenableFutureTask<V> task =
386        TrustedListenableFutureTask.create(
387            new Callable<V>() {
388              @Override
389              @ParametricNullness
390              public V call() throws Exception {
391                return callable.call(closeables.closer);
392              }
393
394              @Override
395              public String toString() {
396                return callable.toString();
397              }
398            });
399    executor.execute(task);
400    return new ClosingFuture<>(task, closeables);
401  }
402
403  /**
404   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
405   *
406   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
407   *     execution
408   * @since 30.1
409   */
410  public static <V extends @Nullable Object> ClosingFuture<V> submitAsync(
411      AsyncClosingCallable<V> callable, Executor executor) {
412    checkNotNull(callable);
413    CloseableList closeables = new CloseableList();
414    TrustedListenableFutureTask<V> task =
415        TrustedListenableFutureTask.create(
416            new AsyncCallable<V>() {
417              @Override
418              public ListenableFuture<V> call() throws Exception {
419                CloseableList newCloseables = new CloseableList();
420                try {
421                  ClosingFuture<V> closingFuture = callable.call(newCloseables.closer);
422                  closingFuture.becomeSubsumedInto(closeables);
423                  return closingFuture.future;
424                } finally {
425                  closeables.add(newCloseables, directExecutor());
426                }
427              }
428
429              @Override
430              public String toString() {
431                return callable.toString();
432              }
433            });
434    executor.execute(task);
435    return new ClosingFuture<>(task, closeables);
436  }
437
438  /**
439   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
440   *
441   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
442   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
443   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
444   */
445  public static <V extends @Nullable Object> ClosingFuture<V> from(ListenableFuture<V> future) {
446    return new ClosingFuture<>(future);
447  }
448
449  /**
450   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
451   *
452   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor)}) when
453   * the pipeline is done, even if the pipeline is canceled or fails.
454   *
455   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
456   * value in order to close it.
457   *
458   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
459   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Object,
460   *     Executor)}.
461   * @param closingExecutor the future's result will be closed on this executor
462   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
463   *     underlying value may never be closed if the {@link Future} is canceled after its operation
464   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
465   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
466   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
467   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
468   *     ClosingFuture#from}.
469   */
470  @Deprecated
471  public static <C extends @Nullable Object & @Nullable AutoCloseable>
472      ClosingFuture<C> eventuallyClosing(ListenableFuture<C> future, Executor closingExecutor) {
473    checkNotNull(closingExecutor);
474    ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
475    Futures.addCallback(
476        future,
477        new FutureCallback<@Nullable AutoCloseable>() {
478          @Override
479          public void onSuccess(@Nullable AutoCloseable result) {
480            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
481          }
482
483          @Override
484          public void onFailure(Throwable t) {}
485        },
486        directExecutor());
487    return closingFuture;
488  }
489
490  /**
491   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
492   *
493   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
494   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
495   */
496  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
497    return new Combiner(false, futures);
498  }
499
500  /**
501   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
502   *
503   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
504   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
505   */
506  public static Combiner whenAllComplete(
507      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
508    return whenAllComplete(asList(future1, moreFutures));
509  }
510
511  /**
512   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
513   * all succeed. If any fail, the resulting pipeline will fail.
514   *
515   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
516   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
517   */
518  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
519    return new Combiner(true, futures);
520  }
521
522  /**
523   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
524   * they all succeed. If any fail, the resulting pipeline will fail.
525   *
526   * <p>Calling this method allows you to use lambdas or method references typed with the types of
527   * the input {@link ClosingFuture}s.
528   *
529   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
530   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
531   */
532  public static <V1 extends @Nullable Object, V2 extends @Nullable Object>
533      Combiner2<V1, V2> whenAllSucceed(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
534    return new Combiner2<>(future1, future2);
535  }
536
537  /**
538   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
539   * they all succeed. If any fail, the resulting pipeline will fail.
540   *
541   * <p>Calling this method allows you to use lambdas or method references typed with the types of
542   * the input {@link ClosingFuture}s.
543   *
544   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
545   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
546   */
547  public static <
548          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
549      Combiner3<V1, V2, V3> whenAllSucceed(
550          ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
551    return new Combiner3<>(future1, future2, future3);
552  }
553
554  /**
555   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
556   * they all succeed. If any fail, the resulting pipeline will fail.
557   *
558   * <p>Calling this method allows you to use lambdas or method references typed with the types of
559   * the input {@link ClosingFuture}s.
560   *
561   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
562   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
563   */
564  public static <
565          V1 extends @Nullable Object,
566          V2 extends @Nullable Object,
567          V3 extends @Nullable Object,
568          V4 extends @Nullable Object>
569      Combiner4<V1, V2, V3, V4> whenAllSucceed(
570          ClosingFuture<V1> future1,
571          ClosingFuture<V2> future2,
572          ClosingFuture<V3> future3,
573          ClosingFuture<V4> future4) {
574    return new Combiner4<>(future1, future2, future3, future4);
575  }
576
577  /**
578   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
579   * they all succeed. If any fail, the resulting pipeline will fail.
580   *
581   * <p>Calling this method allows you to use lambdas or method references typed with the types of
582   * the input {@link ClosingFuture}s.
583   *
584   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
585   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
586   */
587  public static <
588          V1 extends @Nullable Object,
589          V2 extends @Nullable Object,
590          V3 extends @Nullable Object,
591          V4 extends @Nullable Object,
592          V5 extends @Nullable Object>
593      Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
594          ClosingFuture<V1> future1,
595          ClosingFuture<V2> future2,
596          ClosingFuture<V3> future3,
597          ClosingFuture<V4> future4,
598          ClosingFuture<V5> future5) {
599    return new Combiner5<>(future1, future2, future3, future4, future5);
600  }
601
602  /**
603   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
604   * all succeed. If any fail, the resulting pipeline will fail.
605   *
606   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
607   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
608   */
609  public static Combiner whenAllSucceed(
610      ClosingFuture<?> future1,
611      ClosingFuture<?> future2,
612      ClosingFuture<?> future3,
613      ClosingFuture<?> future4,
614      ClosingFuture<?> future5,
615      ClosingFuture<?> future6,
616      ClosingFuture<?>... moreFutures) {
617    return whenAllSucceed(
618        FluentIterable.of(future1, future2, future3, future4, future5, future6)
619            .append(moreFutures));
620  }
621
622  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
623  private final CloseableList closeables;
624  private final FluentFuture<V> future;
625
626  private ClosingFuture(ListenableFuture<V> future) {
627    this(future, new CloseableList());
628  }
629
630  private ClosingFuture(ListenableFuture<V> future, CloseableList closeables) {
631    this.future = FluentFuture.from(future);
632    this.closeables = closeables;
633  }
634
635  /**
636   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
637   * future returns {@code null} if the step is successful or throws the same exception that would
638   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
639   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
640   *
641   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
642   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
643   * a derivation method <i>on the same instance</i>. This is important because calling {@code
644   * statusFuture} alone does not provide a way to close the pipeline.
645   */
646  public ListenableFuture<?> statusFuture() {
647    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
648  }
649
650  /**
651   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
652   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
653   * when the pipeline is done.
654   *
655   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
656   * ClosingFuture} will be equivalent to this one.
657   *
658   * <p>If the function throws an exception, that exception is used as the result of the derived
659   * {@code ClosingFuture}.
660   *
661   * <p>Example usage:
662   *
663   * {@snippet :
664   * ClosingFuture<List<Row>> rowsFuture =
665   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
666   * }
667   *
668   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
669   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
670   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
671   *
672   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
673   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
674   * the original {@code ClosingFuture} instance.
675   *
676   * @param function transforms the value of this step to the value of the derived step
677   * @param executor executor to run the function in
678   * @return the derived step
679   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
680   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
681   *     finished}
682   */
683  public <U extends @Nullable Object> ClosingFuture<U> transform(
684      ClosingFunction<? super V, U> function, Executor executor) {
685    checkNotNull(function);
686    AsyncFunction<V, U> applyFunction =
687        new AsyncFunction<V, U>() {
688          @Override
689          public ListenableFuture<U> apply(V input) throws Exception {
690            return closeables.applyClosingFunction(function, input);
691          }
692
693          @Override
694          public String toString() {
695            return function.toString();
696          }
697        };
698    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
699    return derive(future.transformAsync(applyFunction, executor));
700  }
701
702  /**
703   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
704   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
705   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
706   * captured by the returned {@link ClosingFuture}).
707   *
708   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
709   * returned by the function.
710   *
711   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
712   * ClosingFuture} will be equivalent to this one.
713   *
714   * <p>If the function throws an exception, that exception is used as the result of the derived
715   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
716   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
717   * closed.
718   *
719   * <p>Usage guidelines for this method:
720   *
721   * <ul>
722   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
723   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
724   *       Executor)} instead, with a function that returns the next value directly.
725   *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
726   *       for every closeable object this step creates in order to capture it for later closing.
727   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
728   *       ClosingFuture} call {@link #from(ListenableFuture)}.
729   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
730   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
731   *       {@link #withoutCloser(AsyncFunction)}
732   * </ul>
733   *
734   * <p>Example usage:
735   *
736   * {@snippet :
737   * // Result.getRowsClosingFuture() returns a ClosingFuture.
738   * ClosingFuture<List<Row>> rowsFuture =
739   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
740   *
741   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
742   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
743   * // Closeable).
744   * ClosingFuture<Integer> rowsFuture2 =
745   *     queryFuture.transformAsync(
746   *         (closer, result) -> {
747   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
748   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
749   *      },
750   *      executor);
751   *
752   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
753   * ClosingFuture<List<Row>> rowsFuture3 =
754   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
755   *
756   * }
757   *
758   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
759   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
760   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
761   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
762   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
763   * responsible for completing the returned {@code ClosingFuture}.)
764   *
765   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
766   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
767   * the original {@code ClosingFuture} instance.
768   *
769   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
770   *     the derived step
771   * @param executor executor to run the function in
772   * @return the derived step
773   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
774   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
775   *     finished}
776   */
777  public <U extends @Nullable Object> ClosingFuture<U> transformAsync(
778      AsyncClosingFunction<? super V, U> function, Executor executor) {
779    checkNotNull(function);
780    AsyncFunction<V, U> applyFunction =
781        new AsyncFunction<V, U>() {
782          @Override
783          public ListenableFuture<U> apply(V input) throws Exception {
784            return closeables.applyAsyncClosingFunction(function, input);
785          }
786
787          @Override
788          public String toString() {
789            return function.toString();
790          }
791        };
792    return derive(future.transformAsync(applyFunction, executor));
793  }
794
795  /**
796   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
797   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
798   * {@link ListenableFuture}.
799   *
800   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
801   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
802   * meets these conditions:
803   *
804   * <ul>
805   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
806   *       DeferredCloser#eventuallyClose(Object, Executor)}.
807   *   <li>It returns a {@link ListenableFuture}.
808   * </ul>
809   *
810   * <p>Example usage:
811   *
812   * {@snippet :
813   * // Result.getRowsFuture() returns a ListenableFuture.
814   * ClosingFuture<List<Row>> rowsFuture =
815   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
816   * }
817   *
818   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
819   *     ListenableFuture} with the value of a derived step
820   */
821  public static <V extends @Nullable Object, U extends @Nullable Object>
822      AsyncClosingFunction<V, U> withoutCloser(AsyncFunction<V, U> function) {
823    checkNotNull(function);
824    return (closer, input) -> ClosingFuture.from(function.apply(input));
825  }
826
827  /**
828   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
829   * to its exception if it is an instance of a given exception type. The function can use a {@link
830   * DeferredCloser} to capture objects to be closed when the pipeline is done.
831   *
832   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
833   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
834   * one.
835   *
836   * <p>If the function throws an exception, that exception is used as the result of the derived
837   * {@code ClosingFuture}.
838   *
839   * <p>Example usage:
840   *
841   * {@snippet :
842   * ClosingFuture<QueryResult> queryFuture =
843   *     queryFuture.catching(
844   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
845   * }
846   *
847   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
848   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
849   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
850   *
851   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
852   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
853   * the original {@code ClosingFuture} instance.
854   *
855   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
856   *     type is matched against this step's exception. "This step's exception" means the cause of
857   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
858   *     underlying this step or, if {@code get()} throws a different kind of exception, that
859   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
860   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
861   * @param fallback the function to be called if this step fails with the expected exception type.
862   *     The function's argument is this step's exception. "This step's exception" means the cause
863   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
864   *     underlying this step or, if {@code get()} throws a different kind of exception, that
865   *     exception itself.
866   * @param executor the executor that runs {@code fallback} if the input fails
867   */
868  public <X extends Throwable> ClosingFuture<V> catching(
869      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
870    return catchingMoreGeneric(exceptionType, fallback, executor);
871  }
872
873  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
874  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
875      Class<X> exceptionType, ClosingFunction<? super X, W> fallback, Executor executor) {
876    checkNotNull(fallback);
877    AsyncFunction<X, W> applyFallback =
878        new AsyncFunction<X, W>() {
879          @Override
880          public ListenableFuture<W> apply(X exception) throws Exception {
881            return closeables.applyClosingFunction(fallback, exception);
882          }
883
884          @Override
885          public String toString() {
886            return fallback.toString();
887          }
888        };
889    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
890    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
891  }
892
893  /**
894   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
895   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
896   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
897   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
898   *
899   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
900   * ClosingFuture} will be equivalent to the one returned by the function.
901   *
902   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
903   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
904   * one.
905   *
906   * <p>If the function throws an exception, that exception is used as the result of the derived
907   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
908   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
909   * closed.
910   *
911   * <p>Usage guidelines for this method:
912   *
913   * <ul>
914   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
915   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
916   *       ClosingFunction, Executor)} instead, with a function that returns the next value
917   *       directly.
918   *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
919   *       for every closeable object this step creates in order to capture it for later closing.
920   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
921   *       ClosingFuture} call {@link #from(ListenableFuture)}.
922   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
923   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
924   *       {@link #withoutCloser(AsyncFunction)}
925   * </ul>
926   *
927   * <p>Example usage:
928   *
929   * {@snippet :
930   * // Fall back to a secondary input stream in case of IOException.
931   * ClosingFuture<InputStream> inputFuture =
932   *     firstInputFuture.catchingAsync(
933   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
934   * }
935   *
936   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
937   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
938   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
939   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
940   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
941   * responsible for completing the returned {@code ClosingFuture}.)
942   *
943   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
944   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
945   * the original {@code ClosingFuture} instance.
946   *
947   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
948   *     type is matched against this step's exception. "This step's exception" means the cause of
949   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
950   *     underlying this step or, if {@code get()} throws a different kind of exception, that
951   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
952   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
953   * @param fallback the function to be called if this step fails with the expected exception type.
954   *     The function's argument is this step's exception. "This step's exception" means the cause
955   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
956   *     underlying this step or, if {@code get()} throws a different kind of exception, that
957   *     exception itself.
958   * @param executor the executor that runs {@code fallback} if the input fails
959   */
960  // TODO(dpb): Should this do something special if the function throws CancellationException or
961  // ExecutionException?
962  public <X extends Throwable> ClosingFuture<V> catchingAsync(
963      Class<X> exceptionType,
964      AsyncClosingFunction<? super X, ? extends V> fallback,
965      Executor executor) {
966    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
967  }
968
969  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
970  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
971      Class<X> exceptionType, AsyncClosingFunction<? super X, W> fallback, Executor executor) {
972    checkNotNull(fallback);
973    AsyncFunction<X, W> asyncFunction =
974        new AsyncFunction<X, W>() {
975          @Override
976          public ListenableFuture<W> apply(X exception) throws Exception {
977            return closeables.applyAsyncClosingFunction(fallback, exception);
978          }
979
980          @Override
981          public String toString() {
982            return fallback.toString();
983          }
984        };
985    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
986  }
987
988  /**
989   * Marks this step as the last step in the {@code ClosingFuture} pipeline.
990   *
991   * <p>The returned {@link Future} is completed when the pipeline's computation completes, or when
992   * the pipeline is cancelled.
993   *
994   * <p>All objects the pipeline has captured for closing will begin to be closed asynchronously
995   * <b>after</b> the returned {@code Future} is done: the future completes before closing starts,
996   * rather than once it has finished.
997   *
998   * <p>After calling this method, you may not call {@link
999   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
1000   * derivation method on the original {@code ClosingFuture} instance.
1001   *
1002   * @return a {@link Future} that represents the final value or exception of the pipeline
1003   */
1004  public FluentFuture<V> finishToFuture() {
1005    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
1006      logger.get().log(FINER, "will close {0}", this);
1007      future.addListener(
1008          () -> {
1009            checkAndUpdateState(WILL_CLOSE, CLOSING);
1010            close();
1011            checkAndUpdateState(CLOSING, CLOSED);
1012          },
1013          directExecutor());
1014    } else {
1015      switch (state.get()) {
1016        case SUBSUMED:
1017          throw new IllegalStateException(
1018              "Cannot call finishToFuture() after deriving another step");
1019
1020        case WILL_CREATE_VALUE_AND_CLOSER:
1021          throw new IllegalStateException(
1022              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
1023
1024        case WILL_CLOSE:
1025        case CLOSING:
1026        case CLOSED:
1027          throw new IllegalStateException("Cannot call finishToFuture() twice");
1028
1029        case OPEN:
1030          throw new AssertionError();
1031      }
1032    }
1033    return future;
1034  }
1035
1036  /**
1037   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
1038   * {@code receiver} will be called with an object that contains the result of the operation. The
1039   * receiver can store the {@link ValueAndCloser} outside the receiver for later synchronous use.
1040   *
1041   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
1042   * any other derivation method on the original {@code ClosingFuture} instance.
1043   *
1044   * @param consumer a callback whose method will be called (using {@code executor}) when this
1045   *     operation is done
1046   */
1047  public void finishToValueAndCloser(
1048      ValueAndCloserConsumer<? super V> consumer, Executor executor) {
1049    checkNotNull(consumer);
1050    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
1051      switch (state.get()) {
1052        case SUBSUMED:
1053          throw new IllegalStateException(
1054              "Cannot call finishToValueAndCloser() after deriving another step");
1055
1056        case WILL_CLOSE:
1057        case CLOSING:
1058        case CLOSED:
1059          throw new IllegalStateException(
1060              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1061
1062        case WILL_CREATE_VALUE_AND_CLOSER:
1063          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1064
1065        case OPEN:
1066          break;
1067      }
1068      throw new AssertionError(state);
1069    }
1070    future.addListener(() -> provideValueAndCloser(consumer, ClosingFuture.this), executor);
1071  }
1072
1073  private static <C extends @Nullable Object, V extends C> void provideValueAndCloser(
1074      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1075    consumer.accept(new ValueAndCloser<C>(closingFuture));
1076  }
1077
1078  /**
1079   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1080   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1081   * successful, and this step has not started when {@code cancel} is called, this step should never
1082   * run.
1083   *
1084   * <p>If successful, causes the objects captured by this step (if already started) and its input
1085   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1086   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1087   *
1088   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1089   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1090   *     cancelled regardless
1091   * @return {@code false} if the step could not be cancelled, typically because it has already
1092   *     completed normally; {@code true} otherwise
1093   */
1094  @CanIgnoreReturnValue
1095  @SuppressWarnings("Interruption") // We are propagating an interrupt from a caller.
1096  public boolean cancel(boolean mayInterruptIfRunning) {
1097    logger.get().log(FINER, "cancelling {0}", this);
1098    boolean cancelled = future.cancel(mayInterruptIfRunning);
1099    if (cancelled) {
1100      close();
1101    }
1102    return cancelled;
1103  }
1104
1105  private void close() {
1106    logger.get().log(FINER, "closing {0}", this);
1107    closeables.close();
1108  }
1109
1110  private <U extends @Nullable Object> ClosingFuture<U> derive(FluentFuture<U> future) {
1111    ClosingFuture<U> derived = new ClosingFuture<>(future);
1112    becomeSubsumedInto(derived.closeables);
1113    return derived;
1114  }
1115
1116  private void becomeSubsumedInto(CloseableList otherCloseables) {
1117    checkAndUpdateState(OPEN, SUBSUMED);
1118    otherCloseables.add(closeables, directExecutor());
1119  }
1120
1121  /**
1122   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1123   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1124   *
1125   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1126   */
1127  public static final class Peeker {
1128    private final ImmutableList<ClosingFuture<?>> futures;
1129    private volatile boolean beingCalled;
1130
1131    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1132      this.futures = checkNotNull(futures);
1133    }
1134
1135    /**
1136     * Returns the value of {@code closingFuture}.
1137     *
1138     * @throws ExecutionException if {@code closingFuture} is a failed step
1139     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1140     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1141     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1142     * @throws IllegalStateException if called outside of a call to {@link
1143     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1144     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1145     */
1146    @ParametricNullness
1147    public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture)
1148        throws ExecutionException {
1149      checkState(beingCalled);
1150      checkArgument(futures.contains(closingFuture));
1151      return Futures.getDone(closingFuture.future);
1152    }
1153
1154    @ParametricNullness
1155    private <V extends @Nullable Object> V call(
1156        CombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1157      beingCalled = true;
1158      CloseableList newCloseables = new CloseableList();
1159      try {
1160        return combiner.call(newCloseables.closer, this);
1161      } finally {
1162        closeables.add(newCloseables, directExecutor());
1163        beingCalled = false;
1164      }
1165    }
1166
1167    private <V extends @Nullable Object> FluentFuture<V> callAsync(
1168        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1169      beingCalled = true;
1170      CloseableList newCloseables = new CloseableList();
1171      try {
1172        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1173        closingFuture.becomeSubsumedInto(closeables);
1174        return closingFuture.future;
1175      } finally {
1176        closeables.add(newCloseables, directExecutor());
1177        beingCalled = false;
1178      }
1179    }
1180  }
1181
1182  /**
1183   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1184   *
1185   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1186   * instantiate this class.
1187   *
1188   * <p>Example:
1189   *
1190   * {@snippet :
1191   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1192   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1193   * ListenableFuture<Integer> numberOfDifferentLines =
1194   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1195   *           .call(
1196   *               (closer, peeker) -> {
1197   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1198   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1199   *                 return countDifferentLines(file1Reader, file2Reader);
1200   *               },
1201   *               executor)
1202   *           .closing(executor);
1203   * }
1204   */
1205  @DoNotMock("Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1206  public static class Combiner {
1207
1208    private final CloseableList closeables = new CloseableList();
1209
1210    /**
1211     * An operation that returns a result and may throw an exception.
1212     *
1213     * @param <V> the type of the result
1214     */
1215    public interface CombiningCallable<V extends @Nullable Object> {
1216      /**
1217       * Computes a result, or throws an exception if unable to do so.
1218       *
1219       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1220       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1221       * (but not before this method completes), even if this method throws or the pipeline is
1222       * cancelled.
1223       *
1224       * @param peeker used to get the value of any of the input futures
1225       */
1226      @ParametricNullness
1227      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1228    }
1229
1230    /**
1231     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1232     *
1233     * @param <V> the type of the result
1234     */
1235    public interface AsyncCombiningCallable<V extends @Nullable Object> {
1236      /**
1237       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1238       *
1239       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1240       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1241       * (but not before this method completes), even if this method throws or the pipeline is
1242       * cancelled.
1243       *
1244       * @param peeker used to get the value of any of the input futures
1245       */
1246      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1247    }
1248
1249    private final boolean allMustSucceed;
1250    protected final ImmutableList<ClosingFuture<?>> inputs;
1251
1252    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1253      this.allMustSucceed = allMustSucceed;
1254      this.inputs = ImmutableList.copyOf(inputs);
1255      for (ClosingFuture<?> input : inputs) {
1256        input.becomeSubsumedInto(closeables);
1257      }
1258    }
1259
1260    /**
1261     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1262     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1263     * objects to be closed when the pipeline is done.
1264     *
1265     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1266     * fail, so will the returned step.
1267     *
1268     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1269     * cancelled.
1270     *
1271     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1272     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1273     */
1274    public <V extends @Nullable Object> ClosingFuture<V> call(
1275        CombiningCallable<V> combiningCallable, Executor executor) {
1276      Callable<V> callable =
1277          new Callable<V>() {
1278            @Override
1279            @ParametricNullness
1280            public V call() throws Exception {
1281              return new Peeker(inputs).call(combiningCallable, closeables);
1282            }
1283
1284            @Override
1285            public String toString() {
1286              return combiningCallable.toString();
1287            }
1288          };
1289      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1290      derived.closeables.add(closeables, directExecutor());
1291      return derived;
1292    }
1293
1294    /**
1295     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1296     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1297     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1298     * captured by the returned {@link ClosingFuture}).
1299     *
1300     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1301     * fail, so will the returned step.
1302     *
1303     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1304     * cancelled.
1305     *
1306     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1307     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1308     *
1309     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1310     * derived step.
1311     *
1312     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1313     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1314     *
1315     * <p>Usage guidelines for this method:
1316     *
1317     * <ul>
1318     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1319     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1320     *       Executor)} instead, with a function that returns the next value directly.
1321     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1322     *       for every closeable object this step creates in order to capture it for later closing.
1323     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1324     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1325     * </ul>
1326     *
1327     * <p>The same warnings about doing heavyweight operations within {@link
1328     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1329     */
1330    public <V extends @Nullable Object> ClosingFuture<V> callAsync(
1331        AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1332      AsyncCallable<V> asyncCallable =
1333          new AsyncCallable<V>() {
1334            @Override
1335            public ListenableFuture<V> call() throws Exception {
1336              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1337            }
1338
1339            @Override
1340            public String toString() {
1341              return combiningCallable.toString();
1342            }
1343          };
1344      ClosingFuture<V> derived =
1345          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1346      derived.closeables.add(closeables, directExecutor());
1347      return derived;
1348    }
1349
1350    private FutureCombiner<@Nullable Object> futureCombiner() {
1351      return allMustSucceed
1352          ? Futures.whenAllSucceed(inputFutures())
1353          : Futures.whenAllComplete(inputFutures());
1354    }
1355
1356    private ImmutableList<FluentFuture<?>> inputFutures() {
1357      return FluentIterable.from(inputs)
1358          .<FluentFuture<?>>transform(future -> future.future)
1359          .toList();
1360    }
1361  }
1362
1363  /**
1364   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1365   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1366   * combination.
1367   *
1368   * @param <V1> the type returned by the first future
1369   * @param <V2> the type returned by the second future
1370   */
1371  public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object>
1372      extends Combiner {
1373
1374    /**
1375     * A function that returns a value when applied to the values of the two futures passed to
1376     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1377     *
1378     * @param <V1> the type returned by the first future
1379     * @param <V2> the type returned by the second future
1380     * @param <U> the type returned by the function
1381     */
1382    public interface ClosingFunction2<
1383        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1384
1385      /**
1386       * Applies this function to two inputs, or throws an exception if unable to do so.
1387       *
1388       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1389       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1390       * (but not before this method completes), even if this method throws or the pipeline is
1391       * cancelled.
1392       */
1393      @ParametricNullness
1394      U apply(DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2)
1395          throws Exception;
1396    }
1397
1398    /**
1399     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1400     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1401     *
1402     * @param <V1> the type returned by the first future
1403     * @param <V2> the type returned by the second future
1404     * @param <U> the type returned by the function
1405     */
1406    public interface AsyncClosingFunction2<
1407        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1408
1409      /**
1410       * Applies this function to two inputs, or throws an exception if unable to do so.
1411       *
1412       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1413       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1414       * (but not before this method completes), even if this method throws or the pipeline is
1415       * cancelled.
1416       */
1417      ClosingFuture<U> apply(
1418          DeferredCloser closer, @ParametricNullness V1 value1, @ParametricNullness V2 value2)
1419          throws Exception;
1420    }
1421
1422    private final ClosingFuture<V1> future1;
1423    private final ClosingFuture<V2> future2;
1424
1425    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1426      super(true, ImmutableList.of(future1, future2));
1427      this.future1 = future1;
1428      this.future2 = future2;
1429    }
1430
1431    /**
1432     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1433     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1434     * objects to be closed when the pipeline is done.
1435     *
1436     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1437     * any of the inputs fail, so will the returned step.
1438     *
1439     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1440     *
1441     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1442     * ExecutionException} will be extracted and used as the failure of the derived step.
1443     */
1444    public <U extends @Nullable Object> ClosingFuture<U> call(
1445        ClosingFunction2<V1, V2, U> function, Executor executor) {
1446      return call(
1447          new CombiningCallable<U>() {
1448            @Override
1449            @ParametricNullness
1450            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1451              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1452            }
1453
1454            @Override
1455            public String toString() {
1456              return function.toString();
1457            }
1458          },
1459          executor);
1460    }
1461
1462    /**
1463     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1464     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1465     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1466     * captured by the returned {@link ClosingFuture}).
1467     *
1468     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1469     * any of the inputs fail, so will the returned step.
1470     *
1471     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1472     *
1473     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1474     * ExecutionException} will be extracted and used as the failure of the derived step.
1475     *
1476     * <p>If the function throws any other exception, it will be used as the failure of the derived
1477     * step.
1478     *
1479     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1480     * the closeable objects in that {@code ClosingFuture} will be closed.
1481     *
1482     * <p>Usage guidelines for this method:
1483     *
1484     * <ul>
1485     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1486     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1487     *       Executor)} instead, with a function that returns the next value directly.
1488     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1489     *       for every closeable object this step creates in order to capture it for later closing.
1490     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1491     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1492     * </ul>
1493     *
1494     * <p>The same warnings about doing heavyweight operations within {@link
1495     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1496     */
1497    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1498        AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1499      return callAsync(
1500          new AsyncCombiningCallable<U>() {
1501            @Override
1502            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1503              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1504            }
1505
1506            @Override
1507            public String toString() {
1508              return function.toString();
1509            }
1510          },
1511          executor);
1512    }
1513  }
1514
1515  /**
1516   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1517   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1518   * ClosingFuture)} to start this combination.
1519   *
1520   * @param <V1> the type returned by the first future
1521   * @param <V2> the type returned by the second future
1522   * @param <V3> the type returned by the third future
1523   */
1524  public static final class Combiner3<
1525          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
1526      extends Combiner {
1527    /**
1528     * A function that returns a value when applied to the values of the three futures passed to
1529     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1530     *
1531     * @param <V1> the type returned by the first future
1532     * @param <V2> the type returned by the second future
1533     * @param <V3> the type returned by the third future
1534     * @param <U> the type returned by the function
1535     */
1536    public interface ClosingFunction3<
1537        V1 extends @Nullable Object,
1538        V2 extends @Nullable Object,
1539        V3 extends @Nullable Object,
1540        U extends @Nullable Object> {
1541      /**
1542       * Applies this function to three inputs, or throws an exception if unable to do so.
1543       *
1544       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1545       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1546       * (but not before this method completes), even if this method throws or the pipeline is
1547       * cancelled.
1548       */
1549      @ParametricNullness
1550      U apply(
1551          DeferredCloser closer,
1552          @ParametricNullness V1 value1,
1553          @ParametricNullness V2 value2,
1554          @ParametricNullness V3 value3)
1555          throws Exception;
1556    }
1557
1558    /**
1559     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1560     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1561     *
1562     * @param <V1> the type returned by the first future
1563     * @param <V2> the type returned by the second future
1564     * @param <V3> the type returned by the third future
1565     * @param <U> the type returned by the function
1566     */
1567    public interface AsyncClosingFunction3<
1568        V1 extends @Nullable Object,
1569        V2 extends @Nullable Object,
1570        V3 extends @Nullable Object,
1571        U extends @Nullable Object> {
1572      /**
1573       * Applies this function to three inputs, or throws an exception if unable to do so.
1574       *
1575       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1576       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1577       * (but not before this method completes), even if this method throws or the pipeline is
1578       * cancelled.
1579       */
1580      ClosingFuture<U> apply(
1581          DeferredCloser closer,
1582          @ParametricNullness V1 value1,
1583          @ParametricNullness V2 value2,
1584          @ParametricNullness V3 value3)
1585          throws Exception;
1586    }
1587
1588    private final ClosingFuture<V1> future1;
1589    private final ClosingFuture<V2> future2;
1590    private final ClosingFuture<V3> future3;
1591
1592    private Combiner3(
1593        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1594      super(true, ImmutableList.of(future1, future2, future3));
1595      this.future1 = future1;
1596      this.future2 = future2;
1597      this.future3 = future3;
1598    }
1599
1600    /**
1601     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1602     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1603     * objects to be closed when the pipeline is done.
1604     *
1605     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1606     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1607     *
1608     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1609     *
1610     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1611     * ExecutionException} will be extracted and used as the failure of the derived step.
1612     */
1613    public <U extends @Nullable Object> ClosingFuture<U> call(
1614        ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1615      return call(
1616          new CombiningCallable<U>() {
1617            @Override
1618            @ParametricNullness
1619            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1620              return function.apply(
1621                  closer,
1622                  peeker.getDone(future1),
1623                  peeker.getDone(future2),
1624                  peeker.getDone(future3));
1625            }
1626
1627            @Override
1628            public String toString() {
1629              return function.toString();
1630            }
1631          },
1632          executor);
1633    }
1634
1635    /**
1636     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1637     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1638     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1639     * captured by the returned {@link ClosingFuture}).
1640     *
1641     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1642     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1643     *
1644     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1645     *
1646     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1647     * ExecutionException} will be extracted and used as the failure of the derived step.
1648     *
1649     * <p>If the function throws any other exception, it will be used as the failure of the derived
1650     * step.
1651     *
1652     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1653     * the closeable objects in that {@code ClosingFuture} will be closed.
1654     *
1655     * <p>Usage guidelines for this method:
1656     *
1657     * <ul>
1658     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1659     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1660     *       Executor)} instead, with a function that returns the next value directly.
1661     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1662     *       for every closeable object this step creates in order to capture it for later closing.
1663     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1664     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1665     * </ul>
1666     *
1667     * <p>The same warnings about doing heavyweight operations within {@link
1668     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1669     */
1670    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1671        AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1672      return callAsync(
1673          new AsyncCombiningCallable<U>() {
1674            @Override
1675            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1676              return function.apply(
1677                  closer,
1678                  peeker.getDone(future1),
1679                  peeker.getDone(future2),
1680                  peeker.getDone(future3));
1681            }
1682
1683            @Override
1684            public String toString() {
1685              return function.toString();
1686            }
1687          },
1688          executor);
1689    }
1690  }
1691
1692  /**
1693   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1694   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1695   * ClosingFuture)} to start this combination.
1696   *
1697   * @param <V1> the type returned by the first future
1698   * @param <V2> the type returned by the second future
1699   * @param <V3> the type returned by the third future
1700   * @param <V4> the type returned by the fourth future
1701   */
1702  public static final class Combiner4<
1703          V1 extends @Nullable Object,
1704          V2 extends @Nullable Object,
1705          V3 extends @Nullable Object,
1706          V4 extends @Nullable Object>
1707      extends Combiner {
1708    /**
1709     * A function that returns a value when applied to the values of the four futures passed to
1710     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1711     *
1712     * @param <V1> the type returned by the first future
1713     * @param <V2> the type returned by the second future
1714     * @param <V3> the type returned by the third future
1715     * @param <V4> the type returned by the fourth future
1716     * @param <U> the type returned by the function
1717     */
1718    public interface ClosingFunction4<
1719        V1 extends @Nullable Object,
1720        V2 extends @Nullable Object,
1721        V3 extends @Nullable Object,
1722        V4 extends @Nullable Object,
1723        U extends @Nullable Object> {
1724      /**
1725       * Applies this function to four inputs, or throws an exception if unable to do so.
1726       *
1727       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1728       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1729       * (but not before this method completes), even if this method throws or the pipeline is
1730       * cancelled.
1731       */
1732      @ParametricNullness
1733      U apply(
1734          DeferredCloser closer,
1735          @ParametricNullness V1 value1,
1736          @ParametricNullness V2 value2,
1737          @ParametricNullness V3 value3,
1738          @ParametricNullness V4 value4)
1739          throws Exception;
1740    }
1741
1742    /**
1743     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1744     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1745     * ClosingFuture)}.
1746     *
1747     * @param <V1> the type returned by the first future
1748     * @param <V2> the type returned by the second future
1749     * @param <V3> the type returned by the third future
1750     * @param <V4> the type returned by the fourth future
1751     * @param <U> the type returned by the function
1752     */
1753    public interface AsyncClosingFunction4<
1754        V1 extends @Nullable Object,
1755        V2 extends @Nullable Object,
1756        V3 extends @Nullable Object,
1757        V4 extends @Nullable Object,
1758        U extends @Nullable Object> {
1759      /**
1760       * Applies this function to four inputs, or throws an exception if unable to do so.
1761       *
1762       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1763       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1764       * (but not before this method completes), even if this method throws or the pipeline is
1765       * cancelled.
1766       */
1767      ClosingFuture<U> apply(
1768          DeferredCloser closer,
1769          @ParametricNullness V1 value1,
1770          @ParametricNullness V2 value2,
1771          @ParametricNullness V3 value3,
1772          @ParametricNullness V4 value4)
1773          throws Exception;
1774    }
1775
1776    private final ClosingFuture<V1> future1;
1777    private final ClosingFuture<V2> future2;
1778    private final ClosingFuture<V3> future3;
1779    private final ClosingFuture<V4> future4;
1780
1781    private Combiner4(
1782        ClosingFuture<V1> future1,
1783        ClosingFuture<V2> future2,
1784        ClosingFuture<V3> future3,
1785        ClosingFuture<V4> future4) {
1786      super(true, ImmutableList.of(future1, future2, future3, future4));
1787      this.future1 = future1;
1788      this.future2 = future2;
1789      this.future3 = future3;
1790      this.future4 = future4;
1791    }
1792
1793    /**
1794     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1795     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1796     * objects to be closed when the pipeline is done.
1797     *
1798     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1799     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1800     *
1801     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1802     *
1803     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1804     * ExecutionException} will be extracted and used as the failure of the derived step.
1805     */
1806    public <U extends @Nullable Object> ClosingFuture<U> call(
1807        ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1808      return call(
1809          new CombiningCallable<U>() {
1810            @Override
1811            @ParametricNullness
1812            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1813              return function.apply(
1814                  closer,
1815                  peeker.getDone(future1),
1816                  peeker.getDone(future2),
1817                  peeker.getDone(future3),
1818                  peeker.getDone(future4));
1819            }
1820
1821            @Override
1822            public String toString() {
1823              return function.toString();
1824            }
1825          },
1826          executor);
1827    }
1828
1829    /**
1830     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1831     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1832     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1833     * captured by the returned {@link ClosingFuture}).
1834     *
1835     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1836     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1837     *
1838     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1839     *
1840     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1841     * ExecutionException} will be extracted and used as the failure of the derived step.
1842     *
1843     * <p>If the function throws any other exception, it will be used as the failure of the derived
1844     * step.
1845     *
1846     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1847     * the closeable objects in that {@code ClosingFuture} will be closed.
1848     *
1849     * <p>Usage guidelines for this method:
1850     *
1851     * <ul>
1852     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1853     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1854     *       Executor)} instead, with a function that returns the next value directly.
1855     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
1856     *       for every closeable object this step creates in order to capture it for later closing.
1857     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1858     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1859     * </ul>
1860     *
1861     * <p>The same warnings about doing heavyweight operations within {@link
1862     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1863     */
1864    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1865        AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1866      return callAsync(
1867          new AsyncCombiningCallable<U>() {
1868            @Override
1869            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1870              return function.apply(
1871                  closer,
1872                  peeker.getDone(future1),
1873                  peeker.getDone(future2),
1874                  peeker.getDone(future3),
1875                  peeker.getDone(future4));
1876            }
1877
1878            @Override
1879            public String toString() {
1880              return function.toString();
1881            }
1882          },
1883          executor);
1884    }
1885  }
1886
1887  /**
1888   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1889   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1890   * ClosingFuture, ClosingFuture)} to start this combination.
1891   *
1892   * @param <V1> the type returned by the first future
1893   * @param <V2> the type returned by the second future
1894   * @param <V3> the type returned by the third future
1895   * @param <V4> the type returned by the fourth future
1896   * @param <V5> the type returned by the fifth future
1897   */
1898  public static final class Combiner5<
1899          V1 extends @Nullable Object,
1900          V2 extends @Nullable Object,
1901          V3 extends @Nullable Object,
1902          V4 extends @Nullable Object,
1903          V5 extends @Nullable Object>
1904      extends Combiner {
1905    /**
1906     * A function that returns a value when applied to the values of the five futures passed to
1907     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1908     * ClosingFuture)}.
1909     *
1910     * @param <V1> the type returned by the first future
1911     * @param <V2> the type returned by the second future
1912     * @param <V3> the type returned by the third future
1913     * @param <V4> the type returned by the fourth future
1914     * @param <V5> the type returned by the fifth future
1915     * @param <U> the type returned by the function
1916     */
1917    public interface ClosingFunction5<
1918        V1 extends @Nullable Object,
1919        V2 extends @Nullable Object,
1920        V3 extends @Nullable Object,
1921        V4 extends @Nullable Object,
1922        V5 extends @Nullable Object,
1923        U extends @Nullable Object> {
1924      /**
1925       * Applies this function to five inputs, or throws an exception if unable to do so.
1926       *
1927       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1928       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1929       * (but not before this method completes), even if this method throws or the pipeline is
1930       * cancelled.
1931       */
1932      @ParametricNullness
1933      U apply(
1934          DeferredCloser closer,
1935          @ParametricNullness V1 value1,
1936          @ParametricNullness V2 value2,
1937          @ParametricNullness V3 value3,
1938          @ParametricNullness V4 value4,
1939          @ParametricNullness V5 value5)
1940          throws Exception;
1941    }
1942
1943    /**
1944     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1945     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1946     * ClosingFuture, ClosingFuture)}.
1947     *
1948     * @param <V1> the type returned by the first future
1949     * @param <V2> the type returned by the second future
1950     * @param <V3> the type returned by the third future
1951     * @param <V4> the type returned by the fourth future
1952     * @param <V5> the type returned by the fifth future
1953     * @param <U> the type returned by the function
1954     */
1955    public interface AsyncClosingFunction5<
1956        V1 extends @Nullable Object,
1957        V2 extends @Nullable Object,
1958        V3 extends @Nullable Object,
1959        V4 extends @Nullable Object,
1960        V5 extends @Nullable Object,
1961        U extends @Nullable Object> {
1962      /**
1963       * Applies this function to five inputs, or throws an exception if unable to do so.
1964       *
1965       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Object, Executor)
1966       * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done
1967       * (but not before this method completes), even if this method throws or the pipeline is
1968       * cancelled.
1969       */
1970      ClosingFuture<U> apply(
1971          DeferredCloser closer,
1972          @ParametricNullness V1 value1,
1973          @ParametricNullness V2 value2,
1974          @ParametricNullness V3 value3,
1975          @ParametricNullness V4 value4,
1976          @ParametricNullness V5 value5)
1977          throws Exception;
1978    }
1979
1980    private final ClosingFuture<V1> future1;
1981    private final ClosingFuture<V2> future2;
1982    private final ClosingFuture<V3> future3;
1983    private final ClosingFuture<V4> future4;
1984    private final ClosingFuture<V5> future5;
1985
1986    private Combiner5(
1987        ClosingFuture<V1> future1,
1988        ClosingFuture<V2> future2,
1989        ClosingFuture<V3> future3,
1990        ClosingFuture<V4> future4,
1991        ClosingFuture<V5> future5) {
1992      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1993      this.future1 = future1;
1994      this.future2 = future2;
1995      this.future3 = future3;
1996      this.future4 = future4;
1997      this.future5 = future5;
1998    }
1999
2000    /**
2001     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2002     * combining function to their values. The function can use a {@link DeferredCloser} to capture
2003     * objects to be closed when the pipeline is done.
2004     *
2005     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2006     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2007     * returned step.
2008     *
2009     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2010     *
2011     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2012     * ExecutionException} will be extracted and used as the failure of the derived step.
2013     */
2014    public <U extends @Nullable Object> ClosingFuture<U> call(
2015        ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2016      return call(
2017          new CombiningCallable<U>() {
2018            @Override
2019            @ParametricNullness
2020            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
2021              return function.apply(
2022                  closer,
2023                  peeker.getDone(future1),
2024                  peeker.getDone(future2),
2025                  peeker.getDone(future3),
2026                  peeker.getDone(future4),
2027                  peeker.getDone(future5));
2028            }
2029
2030            @Override
2031            public String toString() {
2032              return function.toString();
2033            }
2034          },
2035          executor);
2036    }
2037
2038    /**
2039     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
2040     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
2041     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
2042     * captured by the returned {@link ClosingFuture}).
2043     *
2044     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
2045     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
2046     * returned step.
2047     *
2048     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
2049     *
2050     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
2051     * ExecutionException} will be extracted and used as the failure of the derived step.
2052     *
2053     * <p>If the function throws any other exception, it will be used as the failure of the derived
2054     * step.
2055     *
2056     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
2057     * the closeable objects in that {@code ClosingFuture} will be closed.
2058     *
2059     * <p>Usage guidelines for this method:
2060     *
2061     * <ul>
2062     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
2063     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
2064     *       Executor)} instead, with a function that returns the next value directly.
2065     *   <li>Call {@link DeferredCloser#eventuallyClose(Object, Executor) closer.eventuallyClose()}
2066     *       for every closeable object this step creates in order to capture it for later closing.
2067     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
2068     *       ClosingFuture} call {@link #from(ListenableFuture)}.
2069     * </ul>
2070     *
2071     * <p>The same warnings about doing heavyweight operations within {@link
2072     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
2073     */
2074    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
2075        AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2076      return callAsync(
2077          new AsyncCombiningCallable<U>() {
2078            @Override
2079            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2080              return function.apply(
2081                  closer,
2082                  peeker.getDone(future1),
2083                  peeker.getDone(future2),
2084                  peeker.getDone(future3),
2085                  peeker.getDone(future4),
2086                  peeker.getDone(future5));
2087            }
2088
2089            @Override
2090            public String toString() {
2091              return function.toString();
2092            }
2093          },
2094          executor);
2095    }
2096  }
2097
2098  @Override
2099  public String toString() {
2100    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2101    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2102  }
2103
2104  @SuppressWarnings({"removal", "Finalize"}) // b/260137033
2105  @Override
2106  protected void finalize() {
2107    if (state.get().equals(OPEN)) {
2108      logger.get().log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2109      FluentFuture<V> unused = finishToFuture();
2110    }
2111  }
2112
2113  private static void closeQuietly(@Nullable AutoCloseable closeable, Executor executor) {
2114    if (closeable == null) {
2115      return;
2116    }
2117    try {
2118      executor.execute(
2119          () -> {
2120            try {
2121              closeable.close();
2122            } catch (Exception e) {
2123              /*
2124               * In guava-jre, any kind of Exception may be thrown because `closeable` has type
2125               * `AutoCloseable`.
2126               *
2127               * In guava-android, the only kinds of Exception that may be thrown are
2128               * RuntimeException and IOException because `closeable` has type `Closeable`—except
2129               * that we have to account for sneaky checked exception.
2130               */
2131              restoreInterruptIfIsInterruptedException(e);
2132              logger.get().log(WARNING, "thrown by close()", e);
2133            }
2134          });
2135    } catch (RejectedExecutionException e) {
2136      if (logger.get().isLoggable(WARNING)) {
2137        logger
2138            .get()
2139            .log(
2140                WARNING,
2141                String.format("while submitting close to %s; will close inline", executor),
2142                e);
2143      }
2144      closeQuietly(closeable, directExecutor());
2145    }
2146  }
2147
2148  private void checkAndUpdateState(State oldState, State newState) {
2149    checkState(
2150        compareAndUpdateState(oldState, newState),
2151        "Expected state to be %s, but it was %s",
2152        oldState,
2153        newState);
2154  }
2155
2156  private boolean compareAndUpdateState(State oldState, State newState) {
2157    return state.compareAndSet(oldState, newState);
2158  }
2159
2160  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2161  private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor>
2162      implements Closeable {
2163    private final DeferredCloser closer = new DeferredCloser(this);
2164    private volatile boolean closed;
2165    private volatile @Nullable CountDownLatch whenClosed;
2166
2167    <V extends @Nullable Object, U extends @Nullable Object>
2168        ListenableFuture<U> applyClosingFunction(
2169            ClosingFunction<? super V, U> transformation, @ParametricNullness V input)
2170            throws Exception {
2171      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2172      CloseableList newCloseables = new CloseableList();
2173      try {
2174        return immediateFuture(transformation.apply(newCloseables.closer, input));
2175      } finally {
2176        add(newCloseables, directExecutor());
2177      }
2178    }
2179
2180    <V extends @Nullable Object, U extends @Nullable Object>
2181        FluentFuture<U> applyAsyncClosingFunction(
2182            AsyncClosingFunction<V, U> transformation, @ParametricNullness V input)
2183            throws Exception {
2184      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2185      CloseableList newCloseables = new CloseableList();
2186      try {
2187        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2188        closingFuture.becomeSubsumedInto(newCloseables);
2189        return closingFuture.future;
2190      } finally {
2191        add(newCloseables, directExecutor());
2192      }
2193    }
2194
2195    @Override
2196    public void close() {
2197      if (closed) {
2198        return;
2199      }
2200      synchronized (this) {
2201        if (closed) {
2202          return;
2203        }
2204        closed = true;
2205      }
2206      for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) {
2207        closeQuietly(entry.getKey(), entry.getValue());
2208      }
2209      clear();
2210      if (whenClosed != null) {
2211        whenClosed.countDown();
2212      }
2213    }
2214
2215    void add(@Nullable AutoCloseable closeable, Executor executor) {
2216      checkNotNull(executor);
2217      if (closeable == null) {
2218        return;
2219      }
2220      synchronized (this) {
2221        if (!closed) {
2222          put(closeable, executor);
2223          return;
2224        }
2225      }
2226      closeQuietly(closeable, executor);
2227    }
2228
2229    /**
2230     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2231     */
2232    CountDownLatch whenClosedCountDown() {
2233      if (closed) {
2234        return new CountDownLatch(0);
2235      }
2236      synchronized (this) {
2237        if (closed) {
2238          return new CountDownLatch(0);
2239        }
2240        checkState(whenClosed == null);
2241        return whenClosed = new CountDownLatch(1);
2242      }
2243    }
2244  }
2245
2246  /**
2247   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2248   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2249   */
2250  @VisibleForTesting
2251  CountDownLatch whenClosedCountDown() {
2252    return closeables.whenClosedCountDown();
2253  }
2254
2255  /** The state of a {@link CloseableList}. */
2256  enum State {
2257    /** The {@link CloseableList} has not been subsumed or closed. */
2258    OPEN,
2259
2260    /**
2261     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
2262     * into any other.
2263     */
2264    SUBSUMED,
2265
2266    /**
2267     * Some {@link ListenableFuture} has a callback attached that will close the {@link
2268     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
2269     */
2270    WILL_CLOSE,
2271
2272    /**
2273     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
2274     * {@link CloseableList} may not be subsumed.
2275     */
2276    CLOSING,
2277
2278    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
2279    CLOSED,
2280
2281    /**
2282     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
2283     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
2284     */
2285    WILL_CREATE_VALUE_AND_CLOSER,
2286  }
2287}