001/*
002 * Copyright (C) 2017 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Functions.constant;
020import static com.google.common.base.MoreObjects.toStringHelper;
021import static com.google.common.base.Preconditions.checkArgument;
022import static com.google.common.base.Preconditions.checkNotNull;
023import static com.google.common.base.Preconditions.checkState;
024import static com.google.common.collect.Lists.asList;
025import static com.google.common.util.concurrent.ClosingFuture.State.CLOSED;
026import static com.google.common.util.concurrent.ClosingFuture.State.CLOSING;
027import static com.google.common.util.concurrent.ClosingFuture.State.OPEN;
028import static com.google.common.util.concurrent.ClosingFuture.State.SUBSUMED;
029import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CLOSE;
030import static com.google.common.util.concurrent.ClosingFuture.State.WILL_CREATE_VALUE_AND_CLOSER;
031import static com.google.common.util.concurrent.Futures.getDone;
032import static com.google.common.util.concurrent.Futures.immediateFuture;
033import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
034import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
035import static java.util.logging.Level.FINER;
036import static java.util.logging.Level.SEVERE;
037import static java.util.logging.Level.WARNING;
038
039import com.google.common.annotations.Beta;
040import com.google.common.annotations.VisibleForTesting;
041import com.google.common.base.Function;
042import com.google.common.collect.FluentIterable;
043import com.google.common.collect.ImmutableList;
044import com.google.common.util.concurrent.ClosingFuture.Combiner.AsyncCombiningCallable;
045import com.google.common.util.concurrent.ClosingFuture.Combiner.CombiningCallable;
046import com.google.common.util.concurrent.Futures.FutureCombiner;
047import com.google.errorprone.annotations.CanIgnoreReturnValue;
048import com.google.errorprone.annotations.DoNotMock;
049import com.google.j2objc.annotations.RetainedWith;
050import java.io.Closeable;
051import java.util.IdentityHashMap;
052import java.util.Map;
053import java.util.concurrent.Callable;
054import java.util.concurrent.CancellationException;
055import java.util.concurrent.CountDownLatch;
056import java.util.concurrent.ExecutionException;
057import java.util.concurrent.Executor;
058import java.util.concurrent.Future;
059import java.util.concurrent.RejectedExecutionException;
060import java.util.concurrent.atomic.AtomicReference;
061import java.util.logging.Logger;
062import org.checkerframework.checker.nullness.qual.Nullable;
063
064/**
065 * A step in a pipeline of an asynchronous computation. When the last step in the computation is
066 * complete, some objects captured during the computation are closed.
067 *
068 * <p>A pipeline of {@code ClosingFuture}s is a tree of steps. Each step represents either an
069 * asynchronously-computed intermediate value, or else an exception that indicates the failure or
070 * cancellation of the operation so far. The only way to extract the value or exception from a step
071 * is by declaring that step to be the last step of the pipeline. Nevertheless, we refer to the
072 * "value" of a successful step or the "result" (value or exception) of any step.
073 *
074 * <ol>
075 *   <li>A pipeline starts at its leaf step (or steps), which is created from either a callable
076 *       block or a {@link ListenableFuture}.
077 *   <li>Each other step is derived from one or more input steps. At each step, zero or more objects
078 *       can be captured for later closing.
079 *   <li>There is one last step (the root of the tree), from which you can extract the final result
080 *       of the computation. After that result is available (or the computation fails), all objects
081 *       captured by any of the steps in the pipeline are closed.
082 * </ol>
083 *
084 * <h3>Starting a pipeline</h3>
085 *
086 * Start a {@code ClosingFuture} pipeline {@linkplain #submit(ClosingCallable, Executor) from a
087 * callable block} that may capture objects for later closing. To start a pipeline from a {@link
088 * ListenableFuture} that doesn't create resources that should be closed later, you can use {@link
089 * #from(ListenableFuture)} instead.
090 *
091 * <h3>Derived steps</h3>
092 *
093 * A {@code ClosingFuture} step can be derived from one or more input {@code ClosingFuture} steps in
094 * ways similar to {@link FluentFuture}s:
095 *
096 * <ul>
097 *   <li>by transforming the value from a successful input step,
098 *   <li>by catching the exception from a failed input step, or
099 *   <li>by combining the results of several input steps.
100 * </ul>
101 *
102 * Each derivation can capture the next value or any intermediate objects for later closing.
103 *
104 * <p>A step can be the input to at most one derived step. Once you transform its value, catch its
105 * exception, or combine it with others, you cannot do anything else with it, including declare it
106 * to be the last step of the pipeline.
107 *
108 * <h4>Transforming</h4>
109 *
110 * To derive the next step by asynchronously applying a function to an input step's value, call
111 * {@link #transform(ClosingFunction, Executor)} or {@link #transformAsync(AsyncClosingFunction,
112 * Executor)} on the input step.
113 *
114 * <h4>Catching</h4>
115 *
116 * To derive the next step from a failed input step, call {@link #catching(Class, ClosingFunction,
117 * Executor)} or {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} on the input step.
118 *
119 * <h4>Combining</h4>
120 *
121 * To derive a {@code ClosingFuture} from two or more input steps, pass the input steps to {@link
 * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}, or to one of their overloads.
123 *
124 * <h3>Cancelling</h3>
125 *
126 * Any step in a pipeline can be {@linkplain #cancel(boolean) cancelled}, even after another step
127 * has been derived, with the same semantics as cancelling a {@link Future}. In addition, a
128 * successfully cancelled step will immediately start closing all objects captured for later closing
129 * by it and by its input steps.
130 *
131 * <h3>Ending a pipeline</h3>
132 *
133 * Each {@code ClosingFuture} pipeline must be ended. To end a pipeline, decide whether you want to
134 * close the captured objects automatically or manually.
135 *
136 * <h4>Automatically closing</h4>
137 *
138 * You can extract a {@link Future} that represents the result of the last step in the pipeline by
139 * calling {@link #finishToFuture()}. When that final {@link Future} is done, all objects captured
140 * by all steps in the pipeline will be closed.
141 *
142 * <pre>{@code
143 * FluentFuture<UserName> userName =
144 *     ClosingFuture.submit(
145 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
146 *             executor)
147 *         .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
148 *         .transform((closer, result) -> result.get("userName"), directExecutor())
 *         .catching(DBException.class, (closer, e) -> "no user", directExecutor())
150 *         .finishToFuture();
151 * }</pre>
152 *
153 * In this example, when the {@code userName} {@link Future} is done, the transaction and the query
154 * result cursor will both be closed, even if the operation is cancelled or fails.
155 *
156 * <h4>Manually closing</h4>
157 *
158 * If you want to close the captured objects manually, after you've used the final result, call
159 * {@link #finishToValueAndCloser(ValueAndCloserConsumer, Executor)} to get an object that holds the
160 * final result. You then call {@link ValueAndCloser#closeAsync()} to close the captured objects.
161 *
162 * <pre>{@code
163 *     ClosingFuture.submit(
164 *             closer -> closer.eventuallyClose(database.newTransaction(), closingExecutor),
165 *             executor)
166 *     .transformAsync((closer, transaction) -> transaction.queryClosingFuture("..."), executor)
167 *     .transform((closer, result) -> result.get("userName"), directExecutor())
 *     .catching(DBException.class, (closer, e) -> "no user", directExecutor())
169 *     .finishToValueAndCloser(
170 *         valueAndCloser -> this.userNameValueAndCloser = valueAndCloser, executor);
171 *
172 * // later
173 * try { // get() will throw if the operation failed or was cancelled.
174 *   UserName userName = userNameValueAndCloser.get();
175 *   // do something with userName
176 * } finally {
177 *   userNameValueAndCloser.closeAsync();
178 * }
179 * }</pre>
180 *
181 * In this example, when {@code userNameValueAndCloser.closeAsync()} is called, the transaction and
182 * the query result cursor will both be closed, even if the operation is cancelled or fails.
183 *
184 * <p>Note that if you don't call {@code closeAsync()}, the captured objects will not be closed. The
185 * automatic-closing approach described above is safer.
186 *
187 * @param <V> the type of the value of this step
188 * @since 30.0
189 */
190// TODO(dpb): Consider reusing one CloseableList for the entire pipeline, modulo combinations.
191@Beta // @Beta for one release.
192@DoNotMock("Use ClosingFuture.from(Futures.immediate*Future)")
193// TODO(dpb): GWT compatibility.
194public final class ClosingFuture<V> {
195
196  private static final Logger logger = Logger.getLogger(ClosingFuture.class.getName());
197
198  /**
199   * An object that can capture objects to be closed later, when a {@link ClosingFuture} pipeline is
200   * done.
201   */
202  public static final class DeferredCloser {
203    @RetainedWith private final CloseableList list;
204
205    DeferredCloser(CloseableList list) {
206      this.list = list;
207    }
208
209    /**
210     * Captures an object to be closed when a {@link ClosingFuture} pipeline is done.
211     *
212     * <p>For users of the {@code -jre} flavor of Guava, the object can be any {@code
213     * AutoCloseable}. For users of the {@code -android} flavor, the object must be a {@code
214     * Closeable}. (For more about the flavors, see <a
215     * href="https://github.com/google/guava#adding-guava-to-your-build">Adding Guava to your
216     * build</a>.)
217     *
218     * <p>Be careful when targeting an older SDK than you are building against (most commonly when
219     * building for Android): Ensure that any object you pass implements the interface not just in
220     * your current SDK version but also at the oldest version you support. For example, <a
221     * href="https://developer.android.com/sdk/api_diff/16/">API Level 16</a> is the first version
222     * in which {@code Cursor} is {@code Closeable}. To support older versions, pass a wrapper
223     * {@code Closeable} with a method reference like {@code cursor::close}.
224     *
225     * <p>Note that this method is still binary-compatible between flavors because the erasure of
226     * its parameter type is {@code Object}, not {@code AutoCloseable} or {@code Closeable}.
227     *
228     * @param closeable the object to be closed (see notes above)
229     * @param closingExecutor the object will be closed on this executor
230     * @return the first argument
231     */
232    @CanIgnoreReturnValue
233    public <C extends @Nullable Object & @Nullable AutoCloseable> C eventuallyClose(
234        C closeable, Executor closingExecutor) {
235      checkNotNull(closingExecutor);
236      if (closeable != null) {
237        list.add(closeable, closingExecutor);
238      }
239      return closeable;
240    }
241  }
242
243  /**
244   * An operation that computes a result.
245   *
246   * @param <V> the type of the result
247   */
  @FunctionalInterface
  public interface ClosingCallable<V extends @Nullable Object> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects that should be closed when the pipeline is done
     * @return the computed result
     * @throws Exception if unable to compute a result
     */
    V call(DeferredCloser closer) throws Exception;
  }
259
260  /**
261   * A function from an input to a result.
262   *
263   * @param <T> the type of the input to the function
264   * @param <U> the type of the result of the function
265   */
  @FunctionalInterface
  public interface ClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {

    /**
     * Applies this function to an input, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects that should be closed when the pipeline is done
     * @param input the value to apply this function to
     * @return the result of applying this function to {@code input}
     * @throws Exception if unable to apply this function
     */
    U apply(DeferredCloser closer, T input) throws Exception;
  }
278
279  /**
280   * A function from an input to a {@link ClosingFuture} of a result.
281   *
282   * @param <T> the type of the input to the function
283   * @param <U> the type of the result of the function
284   */
  @FunctionalInterface
  public interface AsyncClosingFunction<T extends @Nullable Object, U extends @Nullable Object> {
    /**
     * Applies this function to an input, or throws an exception if unable to do so.
     *
     * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable, Executor)
     * closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline is done (but
     * not before this method completes), even if this method throws or the pipeline is cancelled.
     *
     * @param closer captures objects that should be closed when the pipeline is done
     * @param input the value to apply this function to
     * @return a {@link ClosingFuture} of the result
     * @throws Exception if unable to apply this function
     */
    ClosingFuture<U> apply(DeferredCloser closer, T input) throws Exception;
  }
296
297  /**
298   * An object that holds the final result of an asynchronous {@link ClosingFuture} operation and
299   * allows the user to close all the closeable objects that were captured during it for later
300   * closing.
301   *
302   * <p>The asynchronous operation will have completed before this object is created.
303   *
304   * @param <V> the type of the value of a successful operation
305   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
306   */
307  public static final class ValueAndCloser<V> {
308
309    private final ClosingFuture<? extends V> closingFuture;
310
311    ValueAndCloser(ClosingFuture<? extends V> closingFuture) {
312      this.closingFuture = checkNotNull(closingFuture);
313    }
314
315    /**
316     * Returns the final value of the associated {@link ClosingFuture}, or throws an exception as
317     * {@link Future#get()} would.
318     *
319     * <p>Because the asynchronous operation has already completed, this method is synchronous and
320     * returns immediately.
321     *
322     * @throws CancellationException if the computation was cancelled
323     * @throws ExecutionException if the computation threw an exception
324     */
325    @Nullable
326    public V get() throws ExecutionException {
327      return getDone(closingFuture.future);
328    }
329
330    /**
331     * Starts closing all closeable objects captured during the {@link ClosingFuture}'s asynchronous
332     * operation on the {@link Executor}s specified by calls to {@link
333     * DeferredCloser#eventuallyClose(Closeable, Executor)}.
334     *
335     * <p>If any such calls specified {@link MoreExecutors#directExecutor()}, those objects will be
336     * closed synchronously.
337     *
338     * <p>Idempotent: objects will be closed at most once.
339     */
340    public void closeAsync() {
341      closingFuture.close();
342    }
343  }
344
345  /**
346   * Represents an operation that accepts a {@link ValueAndCloser} for the last step in a {@link
347   * ClosingFuture} pipeline.
348   *
349   * @param <V> the type of the final value of a successful pipeline
350   * @see ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)
351   */
  @FunctionalInterface
  public interface ValueAndCloserConsumer<V> {

    /**
     * Accepts a {@link ValueAndCloser} for the last step in a {@link ClosingFuture} pipeline.
     *
     * @param valueAndCloser holds the final result of the pipeline and allows its captured objects
     *     to be closed
     */
    void accept(ValueAndCloser<V> valueAndCloser);
  }
358
359  /**
360   * Starts a {@link ClosingFuture} pipeline by submitting a callable block to an executor.
361   *
362   * @throws java.util.concurrent.RejectedExecutionException if the task cannot be scheduled for
363   *     execution
364   */
365  public static <V> ClosingFuture<V> submit(ClosingCallable<V> callable, Executor executor) {
366    return new ClosingFuture<>(callable, executor);
367  }
368
369  // TODO(dpb, cpovirk): Do we need submitAsync?
370
371  /**
372   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
373   *
374   * <p>{@code future}'s value will not be closed when the pipeline is done even if {@code V}
375   * implements {@link Closeable}. In order to start a pipeline with a value that will be closed
376   * when the pipeline is done, use {@link #submit(ClosingCallable, Executor)} instead.
377   */
378  public static <V> ClosingFuture<V> from(ListenableFuture<V> future) {
379    return new ClosingFuture<V>(future);
380  }
381
382  /**
383   * Starts a {@link ClosingFuture} pipeline with a {@link ListenableFuture}.
384   *
   * <p>If {@code future} succeeds, its value will be closed (using {@code closingExecutor}) when
386   * the pipeline is done, even if the pipeline is canceled or fails.
387   *
388   * <p>Cancelling the pipeline will not cancel {@code future}, so that the pipeline can access its
389   * value in order to close it.
390   *
391   * @param future the future to create the {@code ClosingFuture} from. For discussion of the
392   *     future's result type {@code C}, see {@link DeferredCloser#eventuallyClose(Closeable,
393   *     Executor)}.
394   * @param closingExecutor the future's result will be closed on this executor
395   * @deprecated Creating {@link Future}s of closeable types is dangerous in general because the
396   *     underlying value may never be closed if the {@link Future} is canceled after its operation
397   *     begins. Consider replacing code that creates {@link ListenableFuture}s of closeable types,
398   *     including those that pass them to this method, with {@link #submit(ClosingCallable,
399   *     Executor)} in order to ensure that resources do not leak. Or, to start a pipeline with a
400   *     {@link ListenableFuture} that doesn't create values that should be closed, use {@link
401   *     ClosingFuture#from}.
402   */
403  @Deprecated
404  public static <C extends @Nullable Object & @Nullable AutoCloseable>
405      ClosingFuture<C> eventuallyClosing(
406          ListenableFuture<C> future, final Executor closingExecutor) {
407    checkNotNull(closingExecutor);
408    final ClosingFuture<C> closingFuture = new ClosingFuture<>(nonCancellationPropagating(future));
409    Futures.addCallback(
410        future,
411        new FutureCallback<@Nullable AutoCloseable>() {
412          @Override
413          public void onSuccess(@Nullable AutoCloseable result) {
414            closingFuture.closeables.closer.eventuallyClose(result, closingExecutor);
415          }
416
417          @Override
418          public void onFailure(Throwable t) {}
419        },
420        directExecutor());
421    return closingFuture;
422  }
423
424  /**
425   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
426   *
427   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
428   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
429   */
430  public static Combiner whenAllComplete(Iterable<? extends ClosingFuture<?>> futures) {
431    return new Combiner(false, futures);
432  }
433
434  /**
435   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline.
436   *
437   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
438   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
439   */
440  public static Combiner whenAllComplete(
441      ClosingFuture<?> future1, ClosingFuture<?>... moreFutures) {
442    return whenAllComplete(asList(future1, moreFutures));
443  }
444
445  /**
446   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
447   * all succeed. If any fail, the resulting pipeline will fail.
448   *
449   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
450   *     the {@code futures}, or if any has already been {@linkplain #finishToFuture() finished}
451   */
452  public static Combiner whenAllSucceed(Iterable<? extends ClosingFuture<?>> futures) {
453    return new Combiner(true, futures);
454  }
455
456  /**
457   * Starts specifying how to combine two {@link ClosingFuture}s into a single pipeline, assuming
458   * they all succeed. If any fail, the resulting pipeline will fail.
459   *
460   * <p>Calling this method allows you to use lambdas or method references typed with the types of
461   * the input {@link ClosingFuture}s.
462   *
463   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
464   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
465   */
466  public static <V1, V2> Combiner2<V1, V2> whenAllSucceed(
467      ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
468    return new Combiner2<>(future1, future2);
469  }
470
471  /**
472   * Starts specifying how to combine three {@link ClosingFuture}s into a single pipeline, assuming
473   * they all succeed. If any fail, the resulting pipeline will fail.
474   *
475   * <p>Calling this method allows you to use lambdas or method references typed with the types of
476   * the input {@link ClosingFuture}s.
477   *
478   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
479   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
480   */
481  public static <V1, V2, V3> Combiner3<V1, V2, V3> whenAllSucceed(
482      ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
483    return new Combiner3<>(future1, future2, future3);
484  }
485
486  /**
487   * Starts specifying how to combine four {@link ClosingFuture}s into a single pipeline, assuming
488   * they all succeed. If any fail, the resulting pipeline will fail.
489   *
490   * <p>Calling this method allows you to use lambdas or method references typed with the types of
491   * the input {@link ClosingFuture}s.
492   *
493   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
494   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
495   */
496  public static <V1, V2, V3, V4> Combiner4<V1, V2, V3, V4> whenAllSucceed(
497      ClosingFuture<V1> future1,
498      ClosingFuture<V2> future2,
499      ClosingFuture<V3> future3,
500      ClosingFuture<V4> future4) {
501    return new Combiner4<>(future1, future2, future3, future4);
502  }
503
504  /**
505   * Starts specifying how to combine five {@link ClosingFuture}s into a single pipeline, assuming
506   * they all succeed. If any fail, the resulting pipeline will fail.
507   *
508   * <p>Calling this method allows you to use lambdas or method references typed with the types of
509   * the input {@link ClosingFuture}s.
510   *
511   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
512   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
513   */
514  public static <V1, V2, V3, V4, V5> Combiner5<V1, V2, V3, V4, V5> whenAllSucceed(
515      ClosingFuture<V1> future1,
516      ClosingFuture<V2> future2,
517      ClosingFuture<V3> future3,
518      ClosingFuture<V4> future4,
519      ClosingFuture<V5> future5) {
520    return new Combiner5<>(future1, future2, future3, future4, future5);
521  }
522
523  /**
524   * Starts specifying how to combine {@link ClosingFuture}s into a single pipeline, assuming they
525   * all succeed. If any fail, the resulting pipeline will fail.
526   *
527   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from any of
528   *     the arguments, or if any has already been {@linkplain #finishToFuture() finished}
529   */
530  public static Combiner whenAllSucceed(
531      ClosingFuture<?> future1,
532      ClosingFuture<?> future2,
533      ClosingFuture<?> future3,
534      ClosingFuture<?> future4,
535      ClosingFuture<?> future5,
536      ClosingFuture<?> future6,
537      ClosingFuture<?>... moreFutures) {
538    return whenAllSucceed(
539        FluentIterable.of(future1, future2, future3, future4, future5, future6)
540            .append(moreFutures));
541  }
542
  // Lifecycle state of this step; every step starts out OPEN.
  private final AtomicReference<State> state = new AtomicReference<>(OPEN);
  // Objects captured by this step for later closing; its closer is what user callbacks receive.
  private final CloseableList closeables = new CloseableList();
  // The future that holds this step's result; assigned exactly once by each constructor.
  private final FluentFuture<V> future;
546
547  private ClosingFuture(ListenableFuture<V> future) {
548    this.future = FluentFuture.from(future);
549  }
550
551  private ClosingFuture(final ClosingCallable<V> callable, Executor executor) {
552    checkNotNull(callable);
553    TrustedListenableFutureTask<V> task =
554        TrustedListenableFutureTask.create(
555            new Callable<V>() {
556              @Override
557              public V call() throws Exception {
558                return callable.call(closeables.closer);
559              }
560
561              @Override
562              public String toString() {
563                return callable.toString();
564              }
565            });
566    executor.execute(task);
567    this.future = task;
568  }
569
570  /**
571   * Returns a future that finishes when this step does. Calling {@code get()} on the returned
572   * future returns {@code null} if the step is successful or throws the same exception that would
573   * be thrown by calling {@code finishToFuture().get()} if this were the last step. Calling {@code
574   * cancel()} on the returned future has no effect on the {@code ClosingFuture} pipeline.
575   *
576   * <p>{@code statusFuture} differs from most methods on {@code ClosingFuture}: You can make calls
577   * to {@code statusFuture} <i>in addition to</i> the call you make to {@link #finishToFuture()} or
578   * a derivation method <i>on the same instance</i>. This is important because calling {@code
579   * statusFuture} alone does not provide a way to close the pipeline.
580   */
581  public ListenableFuture<?> statusFuture() {
582    return nonCancellationPropagating(future.transform(constant(null), directExecutor()));
583  }
584
585  /**
586   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
587   * to its value. The function can use a {@link DeferredCloser} to capture objects to be closed
588   * when the pipeline is done.
589   *
590   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
591   * ClosingFuture} will be equivalent to this one.
592   *
593   * <p>If the function throws an exception, that exception is used as the result of the derived
594   * {@code ClosingFuture}.
595   *
596   * <p>Example usage:
597   *
598   * <pre>{@code
599   * ClosingFuture<List<Row>> rowsFuture =
600   *     queryFuture.transform((closer, result) -> result.getRows(), executor);
601   * }</pre>
602   *
603   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
604   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
605   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
606   *
607   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
608   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
609   * this {@code ClosingFuture}.
610   *
611   * @param function transforms the value of this step to the value of the derived step
612   * @param executor executor to run the function in
613   * @return the derived step
614   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
615   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
616   *     finished}
617   */
618  public <U> ClosingFuture<U> transform(
619      final ClosingFunction<? super V, U> function, Executor executor) {
620    checkNotNull(function);
621    AsyncFunction<V, U> applyFunction =
622        new AsyncFunction<V, U>() {
623          @Override
624          public ListenableFuture<U> apply(V input) throws Exception {
625            return closeables.applyClosingFunction(function, input);
626          }
627
628          @Override
629          public String toString() {
630            return function.toString();
631          }
632        };
633    // TODO(dpb): Switch to future.transformSync when that exists (passing a throwing function).
634    return derive(future.transformAsync(applyFunction, executor));
635  }
636
637  /**
638   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
639   * that returns a {@code ClosingFuture} to its value. The function can use a {@link
640   * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
641   * captured by the returned {@link ClosingFuture}).
642   *
643   * <p>If this {@code ClosingFuture} succeeds, the derived one will be equivalent to the one
644   * returned by the function.
645   *
646   * <p>If this {@code ClosingFuture} fails, the function will not be called, and the derived {@code
647   * ClosingFuture} will be equivalent to this one.
648   *
649   * <p>If the function throws an exception, that exception is used as the result of the derived
650   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
651   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
652   * closed.
653   *
654   * <p>Usage guidelines for this method:
655   *
656   * <ul>
657   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
658   *       {@code ClosingFuture}. If possible, prefer calling {@link #transform(ClosingFunction,
659   *       Executor)} instead, with a function that returns the next value directly.
660   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
661   *       for every closeable object this step creates in order to capture it for later closing.
662   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
663   *       ClosingFuture} call {@link #from(ListenableFuture)}.
664   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
665   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
666   *       {@link #withoutCloser(AsyncFunction)}
667   * </ul>
668   *
669   * <p>Example usage:
670   *
671   * <pre>{@code
672   * // Result.getRowsClosingFuture() returns a ClosingFuture.
673   * ClosingFuture<List<Row>> rowsFuture =
674   *     queryFuture.transformAsync((closer, result) -> result.getRowsClosingFuture(), executor);
675   *
676   * // Result.writeRowsToOutputStreamFuture() returns a ListenableFuture that resolves to the
677   * // number of written rows. openOutputFile() returns a FileOutputStream (which implements
678   * // Closeable).
679   * ClosingFuture<Integer> rowsFuture2 =
680   *     queryFuture.transformAsync(
681   *         (closer, result) -> {
682   *           FileOutputStream fos = closer.eventuallyClose(openOutputFile(), closingExecutor);
683   *           return ClosingFuture.from(result.writeRowsToOutputStreamFuture(fos));
684   *      },
685   *      executor);
686   *
687   * // Result.getRowsFuture() returns a ListenableFuture (no new closeables are created).
688   * ClosingFuture<List<Row>> rowsFuture3 =
689   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
690   *
691   * }</pre>
692   *
693   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
694   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
695   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
696   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
697   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
698   * responsible for completing the returned {@code ClosingFuture}.)
699   *
700   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
701   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
702   * this {@code ClosingFuture}.
703   *
704   * @param function transforms the value of this step to a {@code ClosingFuture} with the value of
705   *     the derived step
706   * @param executor executor to run the function in
707   * @return the derived step
708   * @throws IllegalStateException if a {@code ClosingFuture} has already been derived from this
709   *     one, or if this {@code ClosingFuture} has already been {@linkplain #finishToFuture()
710   *     finished}
711   */
712  public <U> ClosingFuture<U> transformAsync(
713      final AsyncClosingFunction<? super V, U> function, Executor executor) {
714    checkNotNull(function);
715    AsyncFunction<V, U> applyFunction =
716        new AsyncFunction<V, U>() {
717          @Override
718          public ListenableFuture<U> apply(V input) throws Exception {
719            return closeables.applyAsyncClosingFunction(function, input);
720          }
721
722          @Override
723          public String toString() {
724            return function.toString();
725          }
726        };
727    return derive(future.transformAsync(applyFunction, executor));
728  }
729
730  /**
731   * Returns an {@link AsyncClosingFunction} that applies an {@link AsyncFunction} to an input,
732   * ignoring the DeferredCloser and returning a {@code ClosingFuture} derived from the returned
733   * {@link ListenableFuture}.
734   *
735   * <p>Use this method to pass a transformation to {@link #transformAsync(AsyncClosingFunction,
736   * Executor)} or to {@link #catchingAsync(Class, AsyncClosingFunction, Executor)} as long as it
737   * meets these conditions:
738   *
739   * <ul>
740   *   <li>It does not need to capture any {@link Closeable} objects by calling {@link
741   *       DeferredCloser#eventuallyClose(Closeable, Executor)}.
742   *   <li>It returns a {@link ListenableFuture}.
743   * </ul>
744   *
745   * <p>Example usage:
746   *
747   * <pre>{@code
748   * // Result.getRowsFuture() returns a ListenableFuture.
749   * ClosingFuture<List<Row>> rowsFuture =
750   *     queryFuture.transformAsync(withoutCloser(Result::getRowsFuture), executor);
751   * }</pre>
752   *
753   * @param function transforms the value of a {@code ClosingFuture} step to a {@link
754   *     ListenableFuture} with the value of a derived step
755   */
756  public static <V, U> AsyncClosingFunction<V, U> withoutCloser(
757      final AsyncFunction<V, U> function) {
758    checkNotNull(function);
759    return new AsyncClosingFunction<V, U>() {
760      @Override
761      public ClosingFuture<U> apply(DeferredCloser closer, V input) throws Exception {
762        return ClosingFuture.from(function.apply(input));
763      }
764    };
765  }
766
767  /**
768   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
769   * to its exception if it is an instance of a given exception type. The function can use a {@link
770   * DeferredCloser} to capture objects to be closed when the pipeline is done.
771   *
772   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
773   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
774   * one.
775   *
776   * <p>If the function throws an exception, that exception is used as the result of the derived
777   * {@code ClosingFuture}.
778   *
779   * <p>Example usage:
780   *
781   * <pre>{@code
782   * ClosingFuture<QueryResult> queryFuture =
783   *     queryFuture.catching(
784   *         QueryException.class, (closer, x) -> Query.emptyQueryResult(), executor);
785   * }</pre>
786   *
787   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
788   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
789   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
790   *
791   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
792   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
793   * this {@code ClosingFuture}.
794   *
795   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
796   *     type is matched against this step's exception. "This step's exception" means the cause of
797   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
798   *     underlying this step or, if {@code get()} throws a different kind of exception, that
799   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
800   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
801   * @param fallback the function to be called if this step fails with the expected exception type.
802   *     The function's argument is this step's exception. "This step's exception" means the cause
803   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
804   *     underlying this step or, if {@code get()} throws a different kind of exception, that
805   *     exception itself.
806   * @param executor the executor that runs {@code fallback} if the input fails
807   */
808  public <X extends Throwable> ClosingFuture<V> catching(
809      Class<X> exceptionType, ClosingFunction<? super X, ? extends V> fallback, Executor executor) {
810    return catchingMoreGeneric(exceptionType, fallback, executor);
811  }
812
813  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
814  private <X extends Throwable, W extends V> ClosingFuture<V> catchingMoreGeneric(
815      Class<X> exceptionType, final ClosingFunction<? super X, W> fallback, Executor executor) {
816    checkNotNull(fallback);
817    AsyncFunction<X, W> applyFallback =
818        new AsyncFunction<X, W>() {
819          @Override
820          public ListenableFuture<W> apply(X exception) throws Exception {
821            return closeables.applyClosingFunction(fallback, exception);
822          }
823
824          @Override
825          public String toString() {
826            return fallback.toString();
827          }
828        };
829    // TODO(dpb): Switch to future.catchingSync when that exists (passing a throwing function).
830    return derive(future.catchingAsync(exceptionType, applyFallback, executor));
831  }
832
833  /**
834   * Returns a new {@code ClosingFuture} pipeline step derived from this one by applying a function
835   * that returns a {@code ClosingFuture} to its exception if it is an instance of a given exception
836   * type. The function can use a {@link DeferredCloser} to capture objects to be closed when the
837   * pipeline is done (other than those captured by the returned {@link ClosingFuture}).
838   *
839   * <p>If this {@code ClosingFuture} fails with an exception of the given type, the derived {@code
840   * ClosingFuture} will be equivalent to the one returned by the function.
841   *
842   * <p>If this {@code ClosingFuture} succeeds or fails with a different exception type, the
843   * function will not be called, and the derived {@code ClosingFuture} will be equivalent to this
844   * one.
845   *
846   * <p>If the function throws an exception, that exception is used as the result of the derived
847   * {@code ClosingFuture}. But if the exception is thrown after the function creates a {@code
848   * ClosingFuture}, then none of the closeable objects in that {@code ClosingFuture} will be
849   * closed.
850   *
851   * <p>Usage guidelines for this method:
852   *
853   * <ul>
854   *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
855   *       {@code ClosingFuture}. If possible, prefer calling {@link #catching(Class,
856   *       ClosingFunction, Executor)} instead, with a function that returns the next value
857   *       directly.
858   *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor) closer.eventuallyClose()}
859   *       for every closeable object this step creates in order to capture it for later closing.
860   *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
861   *       ClosingFuture} call {@link #from(ListenableFuture)}.
862   *   <li>In case this step doesn't create new closeables, you can adapt an API that returns a
863   *       {@link ListenableFuture} to return a {@code ClosingFuture} by wrapping it with a call to
864   *       {@link #withoutCloser(AsyncFunction)}
865   * </ul>
866   *
867   * <p>Example usage:
868   *
869   * <pre>{@code
870   * // Fall back to a secondary input stream in case of IOException.
871   * ClosingFuture<InputStream> inputFuture =
872   *     firstInputFuture.catchingAsync(
873   *         IOException.class, (closer, x) -> secondaryInputStreamClosingFuture(), executor);
874   * }
875   * }</pre>
876   *
877   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
878   * the discussion in the {@link ListenableFuture#addListener} documentation. All its warnings
879   * about heavyweight listeners are also applicable to heavyweight functions passed to this method.
880   * (Specifically, {@code directExecutor} functions should avoid heavyweight operations inside
881   * {@code AsyncClosingFunction.apply}. Any heavyweight operations should occur in other threads
882   * responsible for completing the returned {@code ClosingFuture}.)
883   *
884   * <p>After calling this method, you may not call {@link #finishToFuture()}, {@link
885   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, or any other derivation method on
886   * this {@code ClosingFuture}.
887   *
888   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
889   *     type is matched against this step's exception. "This step's exception" means the cause of
890   *     the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
891   *     underlying this step or, if {@code get()} throws a different kind of exception, that
892   *     exception itself. To avoid hiding bugs and other unrecoverable errors, callers should
893   *     prefer more specific types, avoiding {@code Throwable.class} in particular.
894   * @param fallback the function to be called if this step fails with the expected exception type.
895   *     The function's argument is this step's exception. "This step's exception" means the cause
896   *     of the {@link ExecutionException} thrown by {@link Future#get()} on the {@link Future}
897   *     underlying this step or, if {@code get()} throws a different kind of exception, that
898   *     exception itself.
899   * @param executor the executor that runs {@code fallback} if the input fails
900   */
901  // TODO(dpb): Should this do something special if the function throws CancellationException or
902  // ExecutionException?
903  public <X extends Throwable> ClosingFuture<V> catchingAsync(
904      Class<X> exceptionType,
905      AsyncClosingFunction<? super X, ? extends V> fallback,
906      Executor executor) {
907    return catchingAsyncMoreGeneric(exceptionType, fallback, executor);
908  }
909
910  // Avoids generic type capture inconsistency problems where |? extends V| is incompatible with V.
911  private <X extends Throwable, W extends V> ClosingFuture<V> catchingAsyncMoreGeneric(
912      Class<X> exceptionType,
913      final AsyncClosingFunction<? super X, W> fallback,
914      Executor executor) {
915    checkNotNull(fallback);
916    AsyncFunction<X, W> asyncFunction =
917        new AsyncFunction<X, W>() {
918          @Override
919          public ListenableFuture<W> apply(X exception) throws Exception {
920            return closeables.applyAsyncClosingFunction(fallback, exception);
921          }
922
923          @Override
924          public String toString() {
925            return fallback.toString();
926          }
927        };
928    return derive(future.catchingAsync(exceptionType, asyncFunction, executor));
929  }
930
931  /**
932   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When the returned
933   * {@link Future} is done, all objects captured for closing during the pipeline's computation will
934   * be closed.
935   *
936   * <p>After calling this method, you may not call {@link
937   * #finishToValueAndCloser(ValueAndCloserConsumer, Executor)}, this method, or any other
938   * derivation method on this {@code ClosingFuture}.
939   *
940   * @return a {@link Future} that represents the final value or exception of the pipeline
941   */
942  public FluentFuture<V> finishToFuture() {
943    if (compareAndUpdateState(OPEN, WILL_CLOSE)) {
944      logger.log(FINER, "will close {0}", this);
945      future.addListener(
946          new Runnable() {
947            @Override
948            public void run() {
949              checkAndUpdateState(WILL_CLOSE, CLOSING);
950              close();
951              checkAndUpdateState(CLOSING, CLOSED);
952            }
953          },
954          directExecutor());
955    } else {
956      switch (state.get()) {
957        case SUBSUMED:
958          throw new IllegalStateException(
959              "Cannot call finishToFuture() after deriving another step");
960
961        case WILL_CREATE_VALUE_AND_CLOSER:
962          throw new IllegalStateException(
963              "Cannot call finishToFuture() after calling finishToValueAndCloser()");
964
965        case WILL_CLOSE:
966        case CLOSING:
967        case CLOSED:
968          throw new IllegalStateException("Cannot call finishToFuture() twice");
969
970        case OPEN:
971          throw new AssertionError();
972      }
973    }
974    return future;
975  }
976
977  /**
   * Marks this step as the last step in the {@code ClosingFuture} pipeline. When this step is done,
   * {@code consumer} will be called with an object that contains the result of the operation. The
   * consumer can store the {@link ValueAndCloser} outside the consumer for later synchronous use.
981   *
982   * <p>After calling this method, you may not call {@link #finishToFuture()}, this method again, or
983   * any other derivation method on this {@code ClosingFuture}.
984   *
985   * @param consumer a callback whose method will be called (using {@code executor}) when this
986   *     operation is done
987   */
988  public void finishToValueAndCloser(
989      final ValueAndCloserConsumer<? super V> consumer, Executor executor) {
990    checkNotNull(consumer);
991    if (!compareAndUpdateState(OPEN, WILL_CREATE_VALUE_AND_CLOSER)) {
992      switch (state.get()) {
993        case SUBSUMED:
994          throw new IllegalStateException(
995              "Cannot call finishToValueAndCloser() after deriving another step");
996
997        case WILL_CLOSE:
998        case CLOSING:
999        case CLOSED:
1000          throw new IllegalStateException(
1001              "Cannot call finishToValueAndCloser() after calling finishToFuture()");
1002
1003        case WILL_CREATE_VALUE_AND_CLOSER:
1004          throw new IllegalStateException("Cannot call finishToValueAndCloser() twice");
1005
1006        case OPEN:
1007          break;
1008      }
1009      throw new AssertionError(state);
1010    }
1011    future.addListener(
1012        new Runnable() {
1013          @Override
1014          public void run() {
1015            provideValueAndCloser(consumer, ClosingFuture.this);
1016          }
1017        },
1018        executor);
1019  }
1020
1021  private static <C, V extends C> void provideValueAndCloser(
1022      ValueAndCloserConsumer<C> consumer, ClosingFuture<V> closingFuture) {
1023    consumer.accept(new ValueAndCloser<C>(closingFuture));
1024  }
1025
1026  /**
1027   * Attempts to cancel execution of this step. This attempt will fail if the step has already
1028   * completed, has already been cancelled, or could not be cancelled for some other reason. If
1029   * successful, and this step has not started when {@code cancel} is called, this step should never
1030   * run.
1031   *
1032   * <p>If successful, causes the objects captured by this step (if already started) and its input
1033   * step(s) for later closing to be closed on their respective {@link Executor}s. If any such calls
1034   * specified {@link MoreExecutors#directExecutor()}, those objects will be closed synchronously.
1035   *
1036   * @param mayInterruptIfRunning {@code true} if the thread executing this task should be
1037   *     interrupted; otherwise, in-progress tasks are allowed to complete, but the step will be
1038   *     cancelled regardless
1039   * @return {@code false} if the step could not be cancelled, typically because it has already
1040   *     completed normally; {@code true} otherwise
1041   */
1042  @CanIgnoreReturnValue
1043  public boolean cancel(boolean mayInterruptIfRunning) {
1044    logger.log(FINER, "cancelling {0}", this);
1045    boolean cancelled = future.cancel(mayInterruptIfRunning);
1046    if (cancelled) {
1047      close();
1048    }
1049    return cancelled;
1050  }
1051
1052  private void close() {
1053    logger.log(FINER, "closing {0}", this);
1054    closeables.close();
1055  }
1056
1057  private <U> ClosingFuture<U> derive(FluentFuture<U> future) {
1058    ClosingFuture<U> derived = new ClosingFuture<>(future);
1059    becomeSubsumedInto(derived.closeables);
1060    return derived;
1061  }
1062
1063  private void becomeSubsumedInto(CloseableList otherCloseables) {
1064    checkAndUpdateState(OPEN, SUBSUMED);
1065    otherCloseables.add(closeables, directExecutor());
1066  }
1067
1068  /**
1069   * An object that can return the value of the {@link ClosingFuture}s that are passed to {@link
1070   * #whenAllComplete(Iterable)} or {@link #whenAllSucceed(Iterable)}.
1071   *
1072   * <p>Only for use by a {@link CombiningCallable} or {@link AsyncCombiningCallable} object.
1073   */
1074  public static final class Peeker {
1075    private final ImmutableList<ClosingFuture<?>> futures;
1076    private volatile boolean beingCalled;
1077
1078    private Peeker(ImmutableList<ClosingFuture<?>> futures) {
1079      this.futures = checkNotNull(futures);
1080    }
1081
1082    /**
1083     * Returns the value of {@code closingFuture}.
1084     *
1085     * @throws ExecutionException if {@code closingFuture} is a failed step
1086     * @throws CancellationException if the {@code closingFuture}'s future was cancelled
1087     * @throws IllegalArgumentException if {@code closingFuture} is not one of the futures passed to
1088     *     {@link #whenAllComplete(Iterable)} or {@link #whenAllComplete(Iterable)}
1089     * @throws IllegalStateException if called outside of a call to {@link
1090     *     CombiningCallable#call(DeferredCloser, Peeker)} or {@link
1091     *     AsyncCombiningCallable#call(DeferredCloser, Peeker)}
1092     */
1093    public final <D extends @Nullable Object> D getDone(ClosingFuture<D> closingFuture)
1094        throws ExecutionException {
1095      checkState(beingCalled);
1096      checkArgument(futures.contains(closingFuture));
1097      return Futures.getDone(closingFuture.future);
1098    }
1099
1100    private <V extends @Nullable Object> V call(
1101        CombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1102      beingCalled = true;
1103      CloseableList newCloseables = new CloseableList();
1104      try {
1105        return combiner.call(newCloseables.closer, this);
1106      } finally {
1107        closeables.add(newCloseables, directExecutor());
1108        beingCalled = false;
1109      }
1110    }
1111
1112    private <V extends @Nullable Object> FluentFuture<V> callAsync(
1113        AsyncCombiningCallable<V> combiner, CloseableList closeables) throws Exception {
1114      beingCalled = true;
1115      CloseableList newCloseables = new CloseableList();
1116      try {
1117        ClosingFuture<V> closingFuture = combiner.call(newCloseables.closer, this);
1118        closingFuture.becomeSubsumedInto(closeables);
1119        return closingFuture.future;
1120      } finally {
1121        closeables.add(newCloseables, directExecutor());
1122        beingCalled = false;
1123      }
1124    }
1125  }
1126
1127  /**
1128   * A builder of a {@link ClosingFuture} step that is derived from more than one input step.
1129   *
1130   * <p>See {@link #whenAllComplete(Iterable)} and {@link #whenAllSucceed(Iterable)} for how to
1131   * instantiate this class.
1132   *
1133   * <p>Example:
1134   *
1135   * <pre>{@code
1136   * final ClosingFuture<BufferedReader> file1ReaderFuture = ...;
1137   * final ClosingFuture<BufferedReader> file2ReaderFuture = ...;
1138   * ListenableFuture<Integer> numberOfDifferentLines =
1139   *       ClosingFuture.whenAllSucceed(file1ReaderFuture, file2ReaderFuture)
1140   *           .call(
1141   *               (closer, peeker) -> {
1142   *                 BufferedReader file1Reader = peeker.getDone(file1ReaderFuture);
1143   *                 BufferedReader file2Reader = peeker.getDone(file2ReaderFuture);
1144   *                 return countDifferentLines(file1Reader, file2Reader);
1145   *               },
1146   *               executor)
   *           .finishToFuture();
1148   * }</pre>
1149   */
1150  // TODO(cpovirk): Use simple name instead of fully qualified after we stop building with JDK 8.
1151  @com.google.errorprone.annotations.DoNotMock(
1152      "Use ClosingFuture.whenAllSucceed() or .whenAllComplete() instead.")
1153  public static class Combiner {
1154
1155    private final CloseableList closeables = new CloseableList();
1156
1157    /**
1158     * An operation that returns a result and may throw an exception.
1159     *
1160     * @param <V> the type of the result
1161     */
1162    @FunctionalInterface
1163    public interface CombiningCallable<V extends @Nullable Object> {
1164      /**
1165       * Computes a result, or throws an exception if unable to do so.
1166       *
1167       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1168       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1169       * is done (but not before this method completes), even if this method throws or the pipeline
1170       * is cancelled.
1171       *
1172       * @param peeker used to get the value of any of the input futures
1173       */
1174      V call(DeferredCloser closer, Peeker peeker) throws Exception;
1175    }
1176
1177    /**
1178     * An operation that returns a {@link ClosingFuture} result and may throw an exception.
1179     *
1180     * @param <V> the type of the result
1181     */
1182    @FunctionalInterface
1183    public interface AsyncCombiningCallable<V extends @Nullable Object> {
1184      /**
1185       * Computes a {@link ClosingFuture} result, or throws an exception if unable to do so.
1186       *
1187       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1188       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1189       * is done (but not before this method completes), even if this method throws or the pipeline
1190       * is cancelled.
1191       *
1192       * @param peeker used to get the value of any of the input futures
1193       */
1194      ClosingFuture<V> call(DeferredCloser closer, Peeker peeker) throws Exception;
1195    }
1196
1197    private final boolean allMustSucceed;
1198    protected final ImmutableList<ClosingFuture<?>> inputs;
1199
1200    private Combiner(boolean allMustSucceed, Iterable<? extends ClosingFuture<?>> inputs) {
1201      this.allMustSucceed = allMustSucceed;
1202      this.inputs = ImmutableList.copyOf(inputs);
1203      for (ClosingFuture<?> input : inputs) {
1204        input.becomeSubsumedInto(closeables);
1205      }
1206    }
1207
1208    /**
1209     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1210     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1211     * objects to be closed when the pipeline is done.
1212     *
1213     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1214     * fail, so will the returned step.
1215     *
1216     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1217     * cancelled.
1218     *
1219     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1220     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1221     */
1222    public <V> ClosingFuture<V> call(
1223        final CombiningCallable<V> combiningCallable, Executor executor) {
1224      Callable<V> callable =
1225          new Callable<V>() {
1226            @Override
1227            public V call() throws Exception {
1228              return new Peeker(inputs).call(combiningCallable, closeables);
1229            }
1230
1231            @Override
1232            public String toString() {
1233              return combiningCallable.toString();
1234            }
1235          };
1236      ClosingFuture<V> derived = new ClosingFuture<>(futureCombiner().call(callable, executor));
1237      derived.closeables.add(closeables, directExecutor());
1238      return derived;
1239    }
1240
1241    /**
1242     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1243     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1244     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1245     * captured by the returned {@link ClosingFuture}).
1246     *
1247     * <p>If this combiner was returned by a {@link #whenAllSucceed} method and any of the inputs
1248     * fail, so will the returned step.
1249     *
1250     * <p>If the combiningCallable throws a {@code CancellationException}, the pipeline will be
1251     * cancelled.
1252     *
1253     * <p>If the combiningCallable throws an {@code ExecutionException}, the cause of the thrown
1254     * {@code ExecutionException} will be extracted and used as the failure of the derived step.
1255     *
1256     * <p>If the combiningCallable throws any other exception, it will be used as the failure of the
1257     * derived step.
1258     *
1259     * <p>If an exception is thrown after the combiningCallable creates a {@code ClosingFuture},
1260     * then none of the closeable objects in that {@code ClosingFuture} will be closed.
1261     *
1262     * <p>Usage guidelines for this method:
1263     *
1264     * <ul>
1265     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1266     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1267     *       Executor)} instead, with a function that returns the next value directly.
1268     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1269     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1270     *       capture it for later closing.
1271     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1272     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1273     * </ul>
1274     *
1275     * <p>The same warnings about doing heavyweight operations within {@link
1276     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1277     */
1278    public <V> ClosingFuture<V> callAsync(
1279        final AsyncCombiningCallable<V> combiningCallable, Executor executor) {
1280      AsyncCallable<V> asyncCallable =
1281          new AsyncCallable<V>() {
1282            @Override
1283            public ListenableFuture<V> call() throws Exception {
1284              return new Peeker(inputs).callAsync(combiningCallable, closeables);
1285            }
1286
1287            @Override
1288            public String toString() {
1289              return combiningCallable.toString();
1290            }
1291          };
1292      ClosingFuture<V> derived =
1293          new ClosingFuture<>(futureCombiner().callAsync(asyncCallable, executor));
1294      derived.closeables.add(closeables, directExecutor());
1295      return derived;
1296    }
1297
1298    private FutureCombiner<Object> futureCombiner() {
1299      return allMustSucceed
1300          ? Futures.whenAllSucceed(inputFutures())
1301          : Futures.whenAllComplete(inputFutures());
1302    }
1303
1304    private static final Function<ClosingFuture<?>, FluentFuture<?>> INNER_FUTURE =
1305        new Function<ClosingFuture<?>, FluentFuture<?>>() {
1306          @Override
1307          public FluentFuture<?> apply(ClosingFuture<?> future) {
1308            return future.future;
1309          }
1310        };
1311
1312    private ImmutableList<FluentFuture<?>> inputFutures() {
1313      return FluentIterable.from(inputs).transform(INNER_FUTURE).toList();
1314    }
1315  }
1316
1317  /**
1318   * A generic {@link Combiner} that lets you use a lambda or method reference to combine two {@link
1319   * ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} to start this
1320   * combination.
1321   *
1322   * @param <V1> the type returned by the first future
1323   * @param <V2> the type returned by the second future
1324   */
1325  public static final class Combiner2<V1 extends @Nullable Object, V2 extends @Nullable Object>
1326      extends Combiner {
1327
1328    /**
1329     * A function that returns a value when applied to the values of the two futures passed to
1330     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1331     *
1332     * @param <V1> the type returned by the first future
1333     * @param <V2> the type returned by the second future
1334     * @param <U> the type returned by the function
1335     */
1336    @FunctionalInterface
1337    public interface ClosingFunction2<
1338        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1339
1340      /**
1341       * Applies this function to two inputs, or throws an exception if unable to do so.
1342       *
1343       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1344       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1345       * is done (but not before this method completes), even if this method throws or the pipeline
1346       * is cancelled.
1347       */
1348      U apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception;
1349    }
1350
1351    /**
1352     * A function that returns a {@link ClosingFuture} when applied to the values of the two futures
1353     * passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture)}.
1354     *
1355     * @param <V1> the type returned by the first future
1356     * @param <V2> the type returned by the second future
1357     * @param <U> the type returned by the function
1358     */
1359    @FunctionalInterface
1360    public interface AsyncClosingFunction2<
1361        V1 extends @Nullable Object, V2 extends @Nullable Object, U extends @Nullable Object> {
1362
1363      /**
1364       * Applies this function to two inputs, or throws an exception if unable to do so.
1365       *
1366       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1367       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1368       * is done (but not before this method completes), even if this method throws or the pipeline
1369       * is cancelled.
1370       */
1371      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2) throws Exception;
1372    }
1373
1374    private final ClosingFuture<V1> future1;
1375    private final ClosingFuture<V2> future2;
1376
1377    private Combiner2(ClosingFuture<V1> future1, ClosingFuture<V2> future2) {
1378      super(true, ImmutableList.of(future1, future2));
1379      this.future1 = future1;
1380      this.future2 = future2;
1381    }
1382
1383    /**
1384     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1385     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1386     * objects to be closed when the pipeline is done.
1387     *
1388     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1389     * any of the inputs fail, so will the returned step.
1390     *
1391     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1392     *
1393     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1394     * ExecutionException} will be extracted and used as the failure of the derived step.
1395     */
1396    public <U extends @Nullable Object> ClosingFuture<U> call(
1397        final ClosingFunction2<V1, V2, U> function, Executor executor) {
1398      return call(
1399          new CombiningCallable<U>() {
1400            @Override
1401            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1402              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1403            }
1404
1405            @Override
1406            public String toString() {
1407              return function.toString();
1408            }
1409          },
1410          executor);
1411    }
1412
1413    /**
1414     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1415     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1416     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1417     * captured by the returned {@link ClosingFuture}).
1418     *
1419     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture)} and
1420     * any of the inputs fail, so will the returned step.
1421     *
1422     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1423     *
1424     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1425     * ExecutionException} will be extracted and used as the failure of the derived step.
1426     *
1427     * <p>If the function throws any other exception, it will be used as the failure of the derived
1428     * step.
1429     *
1430     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1431     * the closeable objects in that {@code ClosingFuture} will be closed.
1432     *
1433     * <p>Usage guidelines for this method:
1434     *
1435     * <ul>
1436     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1437     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1438     *       Executor)} instead, with a function that returns the next value directly.
1439     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1440     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1441     *       capture it for later closing.
1442     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1443     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1444     * </ul>
1445     *
1446     * <p>The same warnings about doing heavyweight operations within {@link
1447     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1448     */
1449    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1450        final AsyncClosingFunction2<V1, V2, U> function, Executor executor) {
1451      return callAsync(
1452          new AsyncCombiningCallable<U>() {
1453            @Override
1454            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1455              return function.apply(closer, peeker.getDone(future1), peeker.getDone(future2));
1456            }
1457
1458            @Override
1459            public String toString() {
1460              return function.toString();
1461            }
1462          },
1463          executor);
1464    }
1465  }
1466
1467  /**
1468   * A generic {@link Combiner} that lets you use a lambda or method reference to combine three
1469   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1470   * ClosingFuture)} to start this combination.
1471   *
1472   * @param <V1> the type returned by the first future
1473   * @param <V2> the type returned by the second future
1474   * @param <V3> the type returned by the third future
1475   */
1476  public static final class Combiner3<
1477          V1 extends @Nullable Object, V2 extends @Nullable Object, V3 extends @Nullable Object>
1478      extends Combiner {
1479    /**
1480     * A function that returns a value when applied to the values of the three futures passed to
1481     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1482     *
1483     * @param <V1> the type returned by the first future
1484     * @param <V2> the type returned by the second future
1485     * @param <V3> the type returned by the third future
1486     * @param <U> the type returned by the function
1487     */
1488    @FunctionalInterface
1489    public interface ClosingFunction3<
1490        V1 extends @Nullable Object,
1491        V2 extends @Nullable Object,
1492        V3 extends @Nullable Object,
1493        U extends @Nullable Object> {
1494      /**
1495       * Applies this function to three inputs, or throws an exception if unable to do so.
1496       *
1497       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1498       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1499       * is done (but not before this method completes), even if this method throws or the pipeline
1500       * is cancelled.
1501       */
1502      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 v3) throws Exception;
1503    }
1504
1505    /**
1506     * A function that returns a {@link ClosingFuture} when applied to the values of the three
1507     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture)}.
1508     *
1509     * @param <V1> the type returned by the first future
1510     * @param <V2> the type returned by the second future
1511     * @param <V3> the type returned by the third future
1512     * @param <U> the type returned by the function
1513     */
1514    @FunctionalInterface
1515    public interface AsyncClosingFunction3<
1516        V1 extends @Nullable Object,
1517        V2 extends @Nullable Object,
1518        V3 extends @Nullable Object,
1519        U extends @Nullable Object> {
1520      /**
1521       * Applies this function to three inputs, or throws an exception if unable to do so.
1522       *
1523       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1524       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1525       * is done (but not before this method completes), even if this method throws or the pipeline
1526       * is cancelled.
1527       */
1528      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3)
1529          throws Exception;
1530    }
1531
1532    private final ClosingFuture<V1> future1;
1533    private final ClosingFuture<V2> future2;
1534    private final ClosingFuture<V3> future3;
1535
1536    private Combiner3(
1537        ClosingFuture<V1> future1, ClosingFuture<V2> future2, ClosingFuture<V3> future3) {
1538      super(true, ImmutableList.of(future1, future2, future3));
1539      this.future1 = future1;
1540      this.future2 = future2;
1541      this.future3 = future3;
1542    }
1543
1544    /**
1545     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1546     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1547     * objects to be closed when the pipeline is done.
1548     *
1549     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1550     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1551     *
1552     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1553     *
1554     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1555     * ExecutionException} will be extracted and used as the failure of the derived step.
1556     */
1557    public <U extends @Nullable Object> ClosingFuture<U> call(
1558        final ClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1559      return call(
1560          new CombiningCallable<U>() {
1561            @Override
1562            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1563              return function.apply(
1564                  closer,
1565                  peeker.getDone(future1),
1566                  peeker.getDone(future2),
1567                  peeker.getDone(future3));
1568            }
1569
1570            @Override
1571            public String toString() {
1572              return function.toString();
1573            }
1574          },
1575          executor);
1576    }
1577
1578    /**
1579     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1580     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1581     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1582     * captured by the returned {@link ClosingFuture}).
1583     *
1584     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1585     * ClosingFuture)} and any of the inputs fail, so will the returned step.
1586     *
1587     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1588     *
1589     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1590     * ExecutionException} will be extracted and used as the failure of the derived step.
1591     *
1592     * <p>If the function throws any other exception, it will be used as the failure of the derived
1593     * step.
1594     *
1595     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1596     * the closeable objects in that {@code ClosingFuture} will be closed.
1597     *
1598     * <p>Usage guidelines for this method:
1599     *
1600     * <ul>
1601     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1602     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1603     *       Executor)} instead, with a function that returns the next value directly.
1604     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1605     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1606     *       capture it for later closing.
1607     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1608     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1609     * </ul>
1610     *
1611     * <p>The same warnings about doing heavyweight operations within {@link
1612     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1613     */
1614    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1615        final AsyncClosingFunction3<V1, V2, V3, U> function, Executor executor) {
1616      return callAsync(
1617          new AsyncCombiningCallable<U>() {
1618            @Override
1619            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1620              return function.apply(
1621                  closer,
1622                  peeker.getDone(future1),
1623                  peeker.getDone(future2),
1624                  peeker.getDone(future3));
1625            }
1626
1627            @Override
1628            public String toString() {
1629              return function.toString();
1630            }
1631          },
1632          executor);
1633    }
1634  }
1635
1636  /**
1637   * A generic {@link Combiner} that lets you use a lambda or method reference to combine four
1638   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1639   * ClosingFuture)} to start this combination.
1640   *
1641   * @param <V1> the type returned by the first future
1642   * @param <V2> the type returned by the second future
1643   * @param <V3> the type returned by the third future
1644   * @param <V4> the type returned by the fourth future
1645   */
1646  public static final class Combiner4<
1647          V1 extends @Nullable Object,
1648          V2 extends @Nullable Object,
1649          V3 extends @Nullable Object,
1650          V4 extends @Nullable Object>
1651      extends Combiner {
1652    /**
1653     * A function that returns a value when applied to the values of the four futures passed to
1654     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture)}.
1655     *
1656     * @param <V1> the type returned by the first future
1657     * @param <V2> the type returned by the second future
1658     * @param <V3> the type returned by the third future
1659     * @param <V4> the type returned by the fourth future
1660     * @param <U> the type returned by the function
1661     */
1662    @FunctionalInterface
1663    public interface ClosingFunction4<
1664        V1 extends @Nullable Object,
1665        V2 extends @Nullable Object,
1666        V3 extends @Nullable Object,
1667        V4 extends @Nullable Object,
1668        U extends @Nullable Object> {
1669      /**
1670       * Applies this function to four inputs, or throws an exception if unable to do so.
1671       *
1672       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1673       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1674       * is done (but not before this method completes), even if this method throws or the pipeline
1675       * is cancelled.
1676       */
1677      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4) throws Exception;
1678    }
1679
1680    /**
1681     * A function that returns a {@link ClosingFuture} when applied to the values of the four
1682     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1683     * ClosingFuture)}.
1684     *
1685     * @param <V1> the type returned by the first future
1686     * @param <V2> the type returned by the second future
1687     * @param <V3> the type returned by the third future
1688     * @param <V4> the type returned by the fourth future
1689     * @param <U> the type returned by the function
1690     */
1691    @FunctionalInterface
1692    public interface AsyncClosingFunction4<
1693        V1 extends @Nullable Object,
1694        V2 extends @Nullable Object,
1695        V3 extends @Nullable Object,
1696        V4 extends @Nullable Object,
1697        U extends @Nullable Object> {
1698      /**
1699       * Applies this function to four inputs, or throws an exception if unable to do so.
1700       *
1701       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1702       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1703       * is done (but not before this method completes), even if this method throws or the pipeline
1704       * is cancelled.
1705       */
1706      ClosingFuture<U> apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4)
1707          throws Exception;
1708    }
1709
1710    private final ClosingFuture<V1> future1;
1711    private final ClosingFuture<V2> future2;
1712    private final ClosingFuture<V3> future3;
1713    private final ClosingFuture<V4> future4;
1714
1715    private Combiner4(
1716        ClosingFuture<V1> future1,
1717        ClosingFuture<V2> future2,
1718        ClosingFuture<V3> future3,
1719        ClosingFuture<V4> future4) {
1720      super(true, ImmutableList.of(future1, future2, future3, future4));
1721      this.future1 = future1;
1722      this.future2 = future2;
1723      this.future3 = future3;
1724      this.future4 = future4;
1725    }
1726
1727    /**
1728     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1729     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1730     * objects to be closed when the pipeline is done.
1731     *
1732     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1733     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1734     *
1735     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1736     *
1737     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1738     * ExecutionException} will be extracted and used as the failure of the derived step.
1739     */
1740    public <U extends @Nullable Object> ClosingFuture<U> call(
1741        final ClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1742      return call(
1743          new CombiningCallable<U>() {
1744            @Override
1745            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1746              return function.apply(
1747                  closer,
1748                  peeker.getDone(future1),
1749                  peeker.getDone(future2),
1750                  peeker.getDone(future3),
1751                  peeker.getDone(future4));
1752            }
1753
1754            @Override
1755            public String toString() {
1756              return function.toString();
1757            }
1758          },
1759          executor);
1760    }
1761
1762    /**
1763     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1764     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1765     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1766     * captured by the returned {@link ClosingFuture}).
1767     *
1768     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1769     * ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the returned step.
1770     *
1771     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1772     *
1773     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1774     * ExecutionException} will be extracted and used as the failure of the derived step.
1775     *
1776     * <p>If the function throws any other exception, it will be used as the failure of the derived
1777     * step.
1778     *
1779     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1780     * the closeable objects in that {@code ClosingFuture} will be closed.
1781     *
1782     * <p>Usage guidelines for this method:
1783     *
1784     * <ul>
1785     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1786     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1787     *       Executor)} instead, with a function that returns the next value directly.
1788     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1789     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1790     *       capture it for later closing.
1791     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1792     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1793     * </ul>
1794     *
1795     * <p>The same warnings about doing heavyweight operations within {@link
1796     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1797     */
1798    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1799        final AsyncClosingFunction4<V1, V2, V3, V4, U> function, Executor executor) {
1800      return callAsync(
1801          new AsyncCombiningCallable<U>() {
1802            @Override
1803            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
1804              return function.apply(
1805                  closer,
1806                  peeker.getDone(future1),
1807                  peeker.getDone(future2),
1808                  peeker.getDone(future3),
1809                  peeker.getDone(future4));
1810            }
1811
1812            @Override
1813            public String toString() {
1814              return function.toString();
1815            }
1816          },
1817          executor);
1818    }
1819  }
1820
1821  /**
1822   * A generic {@link Combiner} that lets you use a lambda or method reference to combine five
1823   * {@link ClosingFuture}s. Use {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1824   * ClosingFuture, ClosingFuture)} to start this combination.
1825   *
1826   * @param <V1> the type returned by the first future
1827   * @param <V2> the type returned by the second future
1828   * @param <V3> the type returned by the third future
1829   * @param <V4> the type returned by the fourth future
1830   * @param <V5> the type returned by the fifth future
1831   */
1832  public static final class Combiner5<
1833          V1 extends @Nullable Object,
1834          V2 extends @Nullable Object,
1835          V3 extends @Nullable Object,
1836          V4 extends @Nullable Object,
1837          V5 extends @Nullable Object>
1838      extends Combiner {
1839    /**
1840     * A function that returns a value when applied to the values of the five futures passed to
1841     * {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture, ClosingFuture,
1842     * ClosingFuture)}.
1843     *
1844     * @param <V1> the type returned by the first future
1845     * @param <V2> the type returned by the second future
1846     * @param <V3> the type returned by the third future
1847     * @param <V4> the type returned by the fourth future
1848     * @param <V5> the type returned by the fifth future
1849     * @param <U> the type returned by the function
1850     */
1851    @FunctionalInterface
1852    public interface ClosingFunction5<
1853        V1 extends @Nullable Object,
1854        V2 extends @Nullable Object,
1855        V3 extends @Nullable Object,
1856        V4 extends @Nullable Object,
1857        V5 extends @Nullable Object,
1858        U extends @Nullable Object> {
1859      /**
1860       * Applies this function to five inputs, or throws an exception if unable to do so.
1861       *
1862       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1863       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1864       * is done (but not before this method completes), even if this method throws or the pipeline
1865       * is cancelled.
1866       */
1867      U apply(DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5)
1868          throws Exception;
1869    }
1870
1871    /**
1872     * A function that returns a {@link ClosingFuture} when applied to the values of the five
1873     * futures passed to {@link #whenAllSucceed(ClosingFuture, ClosingFuture, ClosingFuture,
1874     * ClosingFuture, ClosingFuture)}.
1875     *
1876     * @param <V1> the type returned by the first future
1877     * @param <V2> the type returned by the second future
1878     * @param <V3> the type returned by the third future
1879     * @param <V4> the type returned by the fourth future
1880     * @param <V5> the type returned by the fifth future
1881     * @param <U> the type returned by the function
1882     */
1883    @FunctionalInterface
1884    public interface AsyncClosingFunction5<
1885        V1 extends @Nullable Object,
1886        V2 extends @Nullable Object,
1887        V3 extends @Nullable Object,
1888        V4 extends @Nullable Object,
1889        V5 extends @Nullable Object,
1890        U extends @Nullable Object> {
1891      /**
1892       * Applies this function to five inputs, or throws an exception if unable to do so.
1893       *
1894       * <p>Any objects that are passed to {@link DeferredCloser#eventuallyClose(Closeable,
1895       * Executor) closer.eventuallyClose()} will be closed when the {@link ClosingFuture} pipeline
1896       * is done (but not before this method completes), even if this method throws or the pipeline
1897       * is cancelled.
1898       */
1899      ClosingFuture<U> apply(
1900          DeferredCloser closer, V1 value1, V2 value2, V3 value3, V4 value4, V5 value5)
1901          throws Exception;
1902    }
1903
1904    private final ClosingFuture<V1> future1;
1905    private final ClosingFuture<V2> future2;
1906    private final ClosingFuture<V3> future3;
1907    private final ClosingFuture<V4> future4;
1908    private final ClosingFuture<V5> future5;
1909
1910    private Combiner5(
1911        ClosingFuture<V1> future1,
1912        ClosingFuture<V2> future2,
1913        ClosingFuture<V3> future3,
1914        ClosingFuture<V4> future4,
1915        ClosingFuture<V5> future5) {
1916      super(true, ImmutableList.of(future1, future2, future3, future4, future5));
1917      this.future1 = future1;
1918      this.future2 = future2;
1919      this.future3 = future3;
1920      this.future4 = future4;
1921      this.future5 = future5;
1922    }
1923
1924    /**
1925     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1926     * combining function to their values. The function can use a {@link DeferredCloser} to capture
1927     * objects to be closed when the pipeline is done.
1928     *
1929     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1930     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
1931     * returned step.
1932     *
1933     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1934     *
1935     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1936     * ExecutionException} will be extracted and used as the failure of the derived step.
1937     */
1938    public <U extends @Nullable Object> ClosingFuture<U> call(
1939        final ClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
1940      return call(
1941          new CombiningCallable<U>() {
1942            @Override
1943            public U call(DeferredCloser closer, Peeker peeker) throws Exception {
1944              return function.apply(
1945                  closer,
1946                  peeker.getDone(future1),
1947                  peeker.getDone(future2),
1948                  peeker.getDone(future3),
1949                  peeker.getDone(future4),
1950                  peeker.getDone(future5));
1951            }
1952
1953            @Override
1954            public String toString() {
1955              return function.toString();
1956            }
1957          },
1958          executor);
1959    }
1960
1961    /**
1962     * Returns a new {@code ClosingFuture} pipeline step derived from the inputs by applying a
1963     * {@code ClosingFuture}-returning function to their values. The function can use a {@link
1964     * DeferredCloser} to capture objects to be closed when the pipeline is done (other than those
1965     * captured by the returned {@link ClosingFuture}).
1966     *
1967     * <p>If this combiner was returned by {@link #whenAllSucceed(ClosingFuture, ClosingFuture,
1968     * ClosingFuture, ClosingFuture, ClosingFuture)} and any of the inputs fail, so will the
1969     * returned step.
1970     *
1971     * <p>If the function throws a {@code CancellationException}, the pipeline will be cancelled.
1972     *
1973     * <p>If the function throws an {@code ExecutionException}, the cause of the thrown {@code
1974     * ExecutionException} will be extracted and used as the failure of the derived step.
1975     *
1976     * <p>If the function throws any other exception, it will be used as the failure of the derived
1977     * step.
1978     *
1979     * <p>If an exception is thrown after the function creates a {@code ClosingFuture}, then none of
1980     * the closeable objects in that {@code ClosingFuture} will be closed.
1981     *
1982     * <p>Usage guidelines for this method:
1983     *
1984     * <ul>
1985     *   <li>Use this method only when calling an API that returns a {@link ListenableFuture} or a
1986     *       {@code ClosingFuture}. If possible, prefer calling {@link #call(CombiningCallable,
1987     *       Executor)} instead, with a function that returns the next value directly.
1988     *   <li>Call {@link DeferredCloser#eventuallyClose(Closeable, Executor)
1989     *       closer.eventuallyClose()} for every closeable object this step creates in order to
1990     *       capture it for later closing.
1991     *   <li>Return a {@code ClosingFuture}. To turn a {@link ListenableFuture} into a {@code
1992     *       ClosingFuture} call {@link #from(ListenableFuture)}.
1993     * </ul>
1994     *
1995     * <p>The same warnings about doing heavyweight operations within {@link
1996     * ClosingFuture#transformAsync(AsyncClosingFunction, Executor)} apply here.
1997     */
1998    public <U extends @Nullable Object> ClosingFuture<U> callAsync(
1999        final AsyncClosingFunction5<V1, V2, V3, V4, V5, U> function, Executor executor) {
2000      return callAsync(
2001          new AsyncCombiningCallable<U>() {
2002            @Override
2003            public ClosingFuture<U> call(DeferredCloser closer, Peeker peeker) throws Exception {
2004              return function.apply(
2005                  closer,
2006                  peeker.getDone(future1),
2007                  peeker.getDone(future2),
2008                  peeker.getDone(future3),
2009                  peeker.getDone(future4),
2010                  peeker.getDone(future5));
2011            }
2012
2013            @Override
2014            public String toString() {
2015              return function.toString();
2016            }
2017          },
2018          executor);
2019    }
2020  }
2021
2022  @Override
2023  public String toString() {
2024    // TODO(dpb): Better toString, in the style of Futures.transform etc.
2025    return toStringHelper(this).add("state", state.get()).addValue(future).toString();
2026  }
2027
2028  @Override
2029  protected void finalize() {
2030    if (state.get().equals(OPEN)) {
2031      logger.log(SEVERE, "Uh oh! An open ClosingFuture has leaked and will close: {0}", this);
2032      FluentFuture<V> unused = finishToFuture();
2033    }
2034  }
2035
2036  private static void closeQuietly(final AutoCloseable closeable, Executor executor) {
2037    if (closeable == null) {
2038      return;
2039    }
2040    try {
2041      executor.execute(
2042          new Runnable() {
2043            @Override
2044            public void run() {
2045              try {
2046                closeable.close();
2047              } catch (Exception e) {
2048                logger.log(WARNING, "thrown by close()", e);
2049              }
2050            }
2051          });
2052    } catch (RejectedExecutionException e) {
2053      if (logger.isLoggable(WARNING)) {
2054        logger.log(
2055            WARNING, String.format("while submitting close to %s; will close inline", executor), e);
2056      }
2057      closeQuietly(closeable, directExecutor());
2058    }
2059  }
2060
2061  private void checkAndUpdateState(State oldState, State newState) {
2062    checkState(
2063        compareAndUpdateState(oldState, newState),
2064        "Expected state to be %s, but it was %s",
2065        oldState,
2066        newState);
2067  }
2068
2069  private boolean compareAndUpdateState(State oldState, State newState) {
2070    return state.compareAndSet(oldState, newState);
2071  }
2072
2073  // TODO(dpb): Should we use a pair of ArrayLists instead of an IdentityHashMap?
2074  private static final class CloseableList extends IdentityHashMap<AutoCloseable, Executor>
2075      implements Closeable {
2076    private final DeferredCloser closer = new DeferredCloser(this);
2077    private volatile boolean closed;
2078    private volatile CountDownLatch whenClosed;
2079
2080    <V, U> ListenableFuture<U> applyClosingFunction(
2081        ClosingFunction<? super V, U> transformation, V input) throws Exception {
2082      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2083      CloseableList newCloseables = new CloseableList();
2084      try {
2085        return immediateFuture(transformation.apply(newCloseables.closer, input));
2086      } finally {
2087        add(newCloseables, directExecutor());
2088      }
2089    }
2090
2091    <V, U> FluentFuture<U> applyAsyncClosingFunction(
2092        AsyncClosingFunction<V, U> transformation, V input) throws Exception {
2093      // TODO(dpb): Consider ways to defer closing without creating a separate CloseableList.
2094      CloseableList newCloseables = new CloseableList();
2095      try {
2096        ClosingFuture<U> closingFuture = transformation.apply(newCloseables.closer, input);
2097        closingFuture.becomeSubsumedInto(newCloseables);
2098        return closingFuture.future;
2099      } finally {
2100        add(newCloseables, directExecutor());
2101      }
2102    }
2103
2104    @Override
2105    public void close() {
2106      if (closed) {
2107        return;
2108      }
2109      synchronized (this) {
2110        if (closed) {
2111          return;
2112        }
2113        closed = true;
2114      }
2115      for (Map.Entry<AutoCloseable, Executor> entry : entrySet()) {
2116        closeQuietly(entry.getKey(), entry.getValue());
2117      }
2118      clear();
2119      if (whenClosed != null) {
2120        whenClosed.countDown();
2121      }
2122    }
2123
2124    void add(@Nullable AutoCloseable closeable, Executor executor) {
2125      checkNotNull(executor);
2126      if (closeable == null) {
2127        return;
2128      }
2129      synchronized (this) {
2130        if (!closed) {
2131          put(closeable, executor);
2132          return;
2133        }
2134      }
2135      closeQuietly(closeable, executor);
2136    }
2137
2138    /**
2139     * Returns a latch that reaches zero when this objects' deferred closeables have been closed.
2140     */
2141    CountDownLatch whenClosedCountDown() {
2142      if (closed) {
2143        return new CountDownLatch(0);
2144      }
2145      synchronized (this) {
2146        if (closed) {
2147          return new CountDownLatch(0);
2148        }
2149        checkState(whenClosed == null);
2150        return whenClosed = new CountDownLatch(1);
2151      }
2152    }
2153  }
2154
2155  /**
2156   * Returns an object that can be used to wait until this objects' deferred closeables have all had
2157   * {@link Runnable}s that close them submitted to each one's closing {@link Executor}.
2158   */
2159  @VisibleForTesting
2160  CountDownLatch whenClosedCountDown() {
2161    return closeables.whenClosedCountDown();
2162  }
2163
  /**
   * The state of a {@link CloseableList}.
   *
   * <p>NOTE(review): in the code visible here the value is read and updated through the enclosing
   * class's {@code state} reference (see {@code compareAndUpdateState}), so it appears to track
   * the lifecycle of one {@code ClosingFuture} step together with its closeables -- confirm
   * against the field declaration.
   */
  enum State {
    /** The {@link CloseableList} has not been subsumed or closed. */
    OPEN,

    /**
     * The {@link CloseableList} has been subsumed into another. It may not be closed or subsumed
     * into any other.
     */
    SUBSUMED,

    /**
     * Some {@link ListenableFuture} has a callback attached that will close the {@link
     * CloseableList}, but it has not yet run. The {@link CloseableList} may not be subsumed.
     */
    WILL_CLOSE,

    /**
     * The callback that closes the {@link CloseableList} is running, but it has not completed. The
     * {@link CloseableList} may not be subsumed.
     */
    CLOSING,

    /** The {@link CloseableList} has been closed. It may not be further subsumed. */
    CLOSED,

    /**
     * {@link ClosingFuture#finishToValueAndCloser(ValueAndCloserConsumer, Executor)} has been
     * called. The step may not be further subsumed, nor may {@link #finishToFuture()} be called.
     */
    WILL_CREATE_VALUE_AND_CLOSER,
  }
2196}