001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
021import static com.google.common.util.concurrent.Platform.isInstanceOfThrowableClass;
022import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
023
024import com.google.common.annotations.Beta;
025import com.google.common.annotations.GwtCompatible;
026import com.google.common.annotations.GwtIncompatible;
027import com.google.common.base.Function;
028import com.google.common.base.Optional;
029import com.google.common.base.Preconditions;
030import com.google.common.collect.ImmutableCollection;
031import com.google.common.collect.ImmutableList;
032import com.google.common.collect.Lists;
033import com.google.common.collect.Queues;
034
035import java.lang.reflect.UndeclaredThrowableException;
036import java.util.Collections;
037import java.util.List;
038import java.util.concurrent.CancellationException;
039import java.util.concurrent.ConcurrentLinkedQueue;
040import java.util.concurrent.ExecutionException;
041import java.util.concurrent.Executor;
042import java.util.concurrent.Future;
043import java.util.concurrent.RejectedExecutionException;
044import java.util.concurrent.ScheduledExecutorService;
045import java.util.concurrent.TimeUnit;
046import java.util.concurrent.TimeoutException;
047import java.util.logging.Level;
048import java.util.logging.Logger;
049
050import javax.annotation.CheckReturnValue;
051import javax.annotation.Nullable;
052
053/**
054 * Static utility methods pertaining to the {@link Future} interface.
055 *
056 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
057 * Guava User Guide article on <a href=
058 * "https://github.com/google/guava/wiki/ListenableFutureExplained">
059 * {@code ListenableFuture}</a>.
060 *
061 * @author Kevin Bourrillion
062 * @author Nishant Thakkar
063 * @author Sven Mawson
064 * @since 1.0
065 */
066@Beta
067@GwtCompatible(emulated = true)
068public final class Futures extends GwtFuturesCatchingSpecialization {
069
070  // A note on memory visibility.
071  // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine)
072  // have two requirements that significantly complicate their design.
073  // 1. Cancellation should propagate from the returned future to the input future(s).
074  // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion.
075  //
076  // A consequence of these requirements is that the delegate futures cannot be stored in
077  // final fields.
078  //
079  // For simplicity the rest of this description will discuss Futures.catching since it is the
080  // simplest instance, though very similar descriptions apply to many other classes in this file.
081  //
082  // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field
083  // 'inputFuture'. That field is non-final and non-volatile.  There are 2 places where the
084  // 'inputFuture' field is read and where we will have to consider visibility of the write
085  // operation in the constructor.
086  //
087  // 1. In the listener that performs the callback.  In this case it is fine since inputFuture is
088  //    assigned prior to calling addListener, and addListener happens-before any invocation of the
089  //    listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible
090  //    to the listener.
091  //
092  // 2. In done() where we may propagate cancellation to the input.  In this case it is _not_ fine.
093  //    There is currently nothing that enforces that the write to inputFuture in the constructor is
094  //    visible to done().  This is because there is no happens before edge between the write and a
095  //    (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue,
096  //    it would just add an edge such that if done() observed non-null, then it would also
097  //    definitely observe all earlier writes, but we still have no guarantee that done() would see
  //    the initial write (just stronger guarantees if it does).
099  //
100  // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html
101  // For a (long) discussion about this specific issue and the general futility of life.
102  //
103  // For the time being we are OK with the problem discussed above since it requires a caller to
104  // introduce a very specific kind of data-race.  And given the other operations performed by these
105  // methods that involve volatile read/write operations, in practice there is no issue.  Also, the
  // way in which such a visibility issue would surface is most likely as a failure of cancel() to
107  // propagate to the input.  Cancellation propagation is fundamentally racy so this is fine.
108  //
109  // Future versions of the JMM may revise safe construction semantics in such a way that we can
110  // safely publish these objects and we won't need this whole discussion.
111  // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs
112  // that should resolve the issue.  This comes at the cost of adding more write barriers to the
113  // implementations.
114
115  private Futures() {}
116
117  /**
118   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} and a {@link Function}
119   * that maps from {@link Exception} instances into the appropriate checked type.
120   *
121   * <p><b>Warning:</b> We recommend against using {@code CheckedFuture} in new projects. {@code
122   * CheckedFuture} is difficult to build libraries atop. {@code CheckedFuture} ports of methods
123   * like {@link Futures#transformAsync} have historically had bugs, and some of these bugs are
124   * necessary, unavoidable consequences of the {@code CheckedFuture} API. Additionally, {@code
125   * CheckedFuture} encourages users to take exceptions from one thread and rethrow them in another,
126   * producing confusing stack traces.
127   *
128   * <p>The given mapping function will be applied to an
129   * {@link InterruptedException}, a {@link CancellationException}, or an
130   * {@link ExecutionException}.
131   * See {@link Future#get()} for details on the exceptions thrown.
132   *
133   * @since 9.0 (source-compatible since 1.0)
134   */
135  @GwtIncompatible("TODO")
136  @CheckReturnValue
137  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
138      ListenableFuture<V> future, Function<? super Exception, X> mapper) {
139    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
140  }
141
142  private abstract static class ImmediateFuture<V>
143      implements ListenableFuture<V> {
144
145    private static final Logger log =
146        Logger.getLogger(ImmediateFuture.class.getName());
147
148    @Override
149    public void addListener(Runnable listener, Executor executor) {
150      checkNotNull(listener, "Runnable was null.");
151      checkNotNull(executor, "Executor was null.");
152      try {
153        executor.execute(listener);
154      } catch (RuntimeException e) {
155        // ListenableFuture's contract is that it will not throw unchecked
156        // exceptions, so log the bad runnable and/or executor and swallow it.
157        log.log(Level.SEVERE, "RuntimeException while executing runnable "
158            + listener + " with executor " + executor, e);
159      }
160    }
161
162    @Override
163    public boolean cancel(boolean mayInterruptIfRunning) {
164      return false;
165    }
166
167    @Override
168    public abstract V get() throws ExecutionException;
169
170    @Override
171    public V get(long timeout, TimeUnit unit) throws ExecutionException {
172      checkNotNull(unit);
173      return get();
174    }
175
176    @Override
177    public boolean isCancelled() {
178      return false;
179    }
180
181    @Override
182    public boolean isDone() {
183      return true;
184    }
185  }
186
187  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
188    static final ImmediateSuccessfulFuture<Object> NULL =
189        new ImmediateSuccessfulFuture<Object>(null);
190
191    @Nullable private final V value;
192
193    ImmediateSuccessfulFuture(@Nullable V value) {
194      this.value = value;
195    }
196
197    @Override
198    public V get() {
199      return value;
200    }
201  }
202
203  @GwtIncompatible("TODO")
204  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
205      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
206
207    @Nullable private final V value;
208
209    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
210      this.value = value;
211    }
212
213    @Override
214    public V get() {
215      return value;
216    }
217
218    @Override
219    public V checkedGet() {
220      return value;
221    }
222
223    @Override
224    public V checkedGet(long timeout, TimeUnit unit) {
225      checkNotNull(unit);
226      return value;
227    }
228  }
229
230  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
231
232    private final Throwable thrown;
233
234    ImmediateFailedFuture(Throwable thrown) {
235      this.thrown = thrown;
236    }
237
238    @Override
239    public V get() throws ExecutionException {
240      throw new ExecutionException(thrown);
241    }
242  }
243
244  @GwtIncompatible("TODO")
245  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
246
247    private final CancellationException thrown;
248
249    ImmediateCancelledFuture() {
250      this.thrown = new CancellationException("Immediate cancelled future.");
251    }
252
253    @Override
254    public boolean isCancelled() {
255      return true;
256    }
257
258    @Override
259    public V get() {
260      throw AbstractFuture.cancellationExceptionWithCause(
261          "Task was cancelled.", thrown);
262    }
263  }
264
265  @GwtIncompatible("TODO")
266  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
267      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
268
269    private final X thrown;
270
271    ImmediateFailedCheckedFuture(X thrown) {
272      this.thrown = thrown;
273    }
274
275    @Override
276    public V get() throws ExecutionException {
277      throw new ExecutionException(thrown);
278    }
279
280    @Override
281    public V checkedGet() throws X {
282      throw thrown;
283    }
284
285    @Override
286    public V checkedGet(long timeout, TimeUnit unit) throws X {
287      checkNotNull(unit);
288      throw thrown;
289    }
290  }
291
292  /**
293   * Creates a {@code ListenableFuture} which has its value set immediately upon
294   * construction. The getters just return the value. This {@code Future} can't
295   * be canceled or timed out and its {@code isDone()} method always returns
296   * {@code true}.
297   */
298  @CheckReturnValue
299  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
300    if (value == null) {
301      // This cast is safe because null is assignable to V for all V (i.e. it is covariant)
302      @SuppressWarnings({"unchecked", "rawtypes"})
303      ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL;
304      return typedNull;
305    }
306    return new ImmediateSuccessfulFuture<V>(value);
307  }
308
309  /**
310   * Returns a {@code CheckedFuture} which has its value set immediately upon
311   * construction.
312   *
313   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
314   * method always returns {@code true}. Calling {@code get()} or {@code
315   * checkedGet()} will immediately return the provided value.
316   */
317  @GwtIncompatible("TODO")
318  @CheckReturnValue
319  public static <V, X extends Exception> CheckedFuture<V, X>
320      immediateCheckedFuture(@Nullable V value) {
321    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
322  }
323
324  /**
325   * Returns a {@code ListenableFuture} which has an exception set immediately
326   * upon construction.
327   *
328   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
329   * method always returns {@code true}. Calling {@code get()} will immediately
330   * throw the provided {@code Throwable} wrapped in an {@code
331   * ExecutionException}.
332   */
333  @CheckReturnValue
334  public static <V> ListenableFuture<V> immediateFailedFuture(
335      Throwable throwable) {
336    checkNotNull(throwable);
337    return new ImmediateFailedFuture<V>(throwable);
338  }
339
340  /**
341   * Creates a {@code ListenableFuture} which is cancelled immediately upon
342   * construction, so that {@code isCancelled()} always returns {@code true}.
343   *
344   * @since 14.0
345   */
346  @GwtIncompatible("TODO")
347  @CheckReturnValue
348  public static <V> ListenableFuture<V> immediateCancelledFuture() {
349    return new ImmediateCancelledFuture<V>();
350  }
351
352  /**
353   * Returns a {@code CheckedFuture} which has an exception set immediately upon
354   * construction.
355   *
356   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
357   * method always returns {@code true}. Calling {@code get()} will immediately
358   * throw the provided {@code Exception} wrapped in an {@code
359   * ExecutionException}, and calling {@code checkedGet()} will throw the
360   * provided exception itself.
361   */
362  @GwtIncompatible("TODO")
363  @CheckReturnValue
364  public static <V, X extends Exception> CheckedFuture<V, X>
365      immediateFailedCheckedFuture(X exception) {
366    checkNotNull(exception);
367    return new ImmediateFailedCheckedFuture<V, X>(exception);
368  }
369
370  /**
371   * Returns a {@code Future} whose result is taken from the given primary
372   * {@code input} or, if the primary input fails, from the {@code Future}
373   * provided by the {@code fallback}. {@link FutureFallback#create} is not
374   * invoked until the primary input has failed, so if the primary input
375   * succeeds, it is never invoked. If, during the invocation of {@code
376   * fallback}, an exception is thrown, this exception is used as the result of
377   * the output {@code Future}.
378   *
379   * <p>Below is an example of a fallback that returns a default value if an
380   * exception occurs:
381   *
382   * <pre>   {@code
383   *   ListenableFuture<Integer> fetchCounterFuture = ...;
384   *
385   *   // Falling back to a zero counter in case an exception happens when
386   *   // processing the RPC to fetch counters.
387   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
388   *       fetchCounterFuture, new FutureFallback<Integer>() {
389   *         public ListenableFuture<Integer> create(Throwable t) {
390   *           // Returning "0" as the default for the counter when the
391   *           // exception happens.
392   *           return immediateFuture(0);
393   *         }
394   *       });}</pre>
395   *
396   * <p>The fallback can also choose to propagate the original exception when
397   * desired:
398   *
399   * <pre>   {@code
400   *   ListenableFuture<Integer> fetchCounterFuture = ...;
401   *
402   *   // Falling back to a zero counter only in case the exception was a
403   *   // TimeoutException.
404   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
405   *       fetchCounterFuture, new FutureFallback<Integer>() {
406   *         public ListenableFuture<Integer> create(Throwable t) {
407   *           if (t instanceof TimeoutException) {
408   *             return immediateFuture(0);
409   *           }
410   *           return immediateFailedFuture(t);
411   *         }
412   *       });}</pre>
413   *
414   * <p>This overload, which does not accept an executor, uses {@code
415   * directExecutor}, a dangerous choice in some cases. See the discussion in
416   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
417   * documentation. The documentation's warnings about "lightweight listeners"
418   * refer here to the work done during {@code FutureFallback.create}, not to
419   * any work done to complete the returned {@code Future}.
420   *
421   * @param input the primary input {@code Future}
422   * @param fallback the {@link FutureFallback} implementation to be called if
423   *     {@code input} fails
424   * @since 14.0
425   * @deprecated Use {@link #catchingAsync(ListenableFuture, Class,
426   *     AsyncFunction) catchingAsync(input, Throwable.class,
427   *     fallbackImplementedAsAnAsyncFunction)}, usually replacing {@code
428   *     Throwable.class} with the specific type you want to handle. This method
429   *     will be removed in Guava release 20.0.
430   */
431  @Deprecated
432  @CheckReturnValue
433  public static <V> ListenableFuture<V> withFallback(
434      ListenableFuture<? extends V> input,
435      FutureFallback<? extends V> fallback) {
436    return withFallback(input, fallback, directExecutor());
437  }
438
439  /**
440   * Returns a {@code Future} whose result is taken from the given primary
441   * {@code input} or, if the primary input fails, from the {@code Future}
442   * provided by the {@code fallback}. {@link FutureFallback#create} is not
443   * invoked until the primary input has failed, so if the primary input
444   * succeeds, it is never invoked. If, during the invocation of {@code
445   * fallback}, an exception is thrown, this exception is used as the result of
446   * the output {@code Future}.
447   *
448   * <p>Below is an example of a fallback that returns a default value if an
449   * exception occurs:
450   *
451   * <pre>   {@code
452   *   ListenableFuture<Integer> fetchCounterFuture = ...;
453   *
454   *   // Falling back to a zero counter in case an exception happens when
455   *   // processing the RPC to fetch counters.
456   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
457   *       fetchCounterFuture, new FutureFallback<Integer>() {
458   *         public ListenableFuture<Integer> create(Throwable t) {
459   *           // Returning "0" as the default for the counter when the
460   *           // exception happens.
461   *           return immediateFuture(0);
462   *         }
463   *       }, directExecutor());}</pre>
464   *
465   * <p>The fallback can also choose to propagate the original exception when
466   * desired:
467   *
468   * <pre>   {@code
469   *   ListenableFuture<Integer> fetchCounterFuture = ...;
470   *
471   *   // Falling back to a zero counter only in case the exception was a
472   *   // TimeoutException.
473   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
474   *       fetchCounterFuture, new FutureFallback<Integer>() {
475   *         public ListenableFuture<Integer> create(Throwable t) {
476   *           if (t instanceof TimeoutException) {
477   *             return immediateFuture(0);
478   *           }
479   *           return immediateFailedFuture(t);
480   *         }
481   *       }, directExecutor());}</pre>
482   *
483   * <p>When selecting an executor, note that {@code directExecutor} is
484   * dangerous in some cases. See the discussion in the {@link
485   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
486   * The documentation's warnings about "lightweight listeners" refer here to
487   * the work done during {@code FutureFallback.create}, not to any work done to
488   * complete the returned {@code Future}.
489   *
490   * @param input the primary input {@code Future}
491   * @param fallback the {@link FutureFallback} implementation to be called if
492   *     {@code input} fails
493   * @param executor the executor that runs {@code fallback} if {@code input}
494   *     fails
495   * @since 14.0
496   * @deprecated Use {@link #catchingAsync(ListenableFuture, Class,
497   *     AsyncFunction, Executor) catchingAsync(input, Throwable.class,
498   *     fallbackImplementedAsAnAsyncFunction, executor)}, usually replacing
499   *     {@code Throwable.class} with the specific type you want to handle. This method
500   *     will be removed in Guava release 20.0.
501   */
502  @Deprecated
503  @CheckReturnValue
504  public static <V> ListenableFuture<V> withFallback(
505      ListenableFuture<? extends V> input,
506      FutureFallback<? extends V> fallback, Executor executor) {
507    return catchingAsync(
508        input, Throwable.class, asAsyncFunction(fallback), executor);
509  }
510
511  /**
512   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
513   * primary input fails with the given {@code exceptionType}, from the result provided by the
514   * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so
515   * if the primary input succeeds, it is never invoked. If, during the invocation of {@code
516   * fallback}, an exception is thrown, this exception is used as the result of the output {@code
517   * Future}.
518   *
519   * <p>Usage example:
520   *
521   * <pre>   {@code
522   *   ListenableFuture<Integer> fetchCounterFuture = ...;
523   *
524   *   // Falling back to a zero counter in case an exception happens when
525   *   // processing the RPC to fetch counters.
526   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catching(
527   *       fetchCounterFuture, FetchException.class,
528   *       new Function<FetchException, Integer>() {
529   *         public Integer apply(FetchException e) {
530   *           return 0;
531   *         }
532   *       });}</pre>
533   *
534   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
535   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
536   * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight
537   * listeners" refer here to the work done during {@code Function.apply}.
538   *
539   * @param input the primary input {@code Future}
540   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
541   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
542   *     {@code Throwable.class} in particular.
543   * @param fallback the {@link Function} implementation to be called if {@code input} fails with
544   *     the expected exception type
545   * @since 19.0
546   */
547  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
548  @CheckReturnValue
549  public static <V, X extends Throwable> ListenableFuture<V> catching(
550      ListenableFuture<? extends V> input, Class<X> exceptionType,
551      Function<? super X, ? extends V> fallback) {
552    CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback);
553    input.addListener(future, directExecutor());
554    return future;
555  }
556
557  /**
558   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
559   * primary input fails with the given {@code exceptionType}, from the result provided by the
560   * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so
561   * if the primary input succeeds, it is never invoked. If, during the invocation of {@code
562   * fallback}, an exception is thrown, this exception is used as the result of the output {@code
563   * Future}.
564   *
565   * <p>Usage example:
566   *
567   * <pre>   {@code
568   *   ListenableFuture<Integer> fetchCounterFuture = ...;
569   *
570   *   // Falling back to a zero counter in case an exception happens when
571   *   // processing the RPC to fetch counters.
572   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catching(
573   *       fetchCounterFuture, FetchException.class,
574   *       new Function<FetchException, Integer>() {
575   *         public Integer apply(FetchException e) {
576   *           return 0;
577   *         }
578   *       }, directExecutor());}</pre>
579   *
580   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
581   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
582   * documentation. The documentation's warnings about "lightweight listeners" refer here to the
583   * work done during {@code Function.apply}.
584   *
585   * @param input the primary input {@code Future}
586   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
587   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
588   *     {@code Throwable.class} in particular.
589   * @param fallback the {@link Function} implementation to be called if {@code input} fails with
590   *     the expected exception type
591   * @param executor the executor that runs {@code fallback} if {@code input} fails
592   * @since 19.0
593   */
594  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
595  @CheckReturnValue
596  public static <V, X extends Throwable> ListenableFuture<V> catching(
597      ListenableFuture<? extends V> input, Class<X> exceptionType,
598      Function<? super X, ? extends V> fallback, Executor executor) {
599    CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback);
600    input.addListener(future, rejectionPropagatingExecutor(executor, future));
601    return future;
602  }
603
604  /**
605   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
606   * primary input fails with the given {@code exceptionType}, from the result provided by the
607   * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has
608   * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of
609   * {@code fallback}, an exception is thrown, this exception is used as the result of the output
610   * {@code Future}.
611   *
612   * <p>Usage examples:
613   *
614   * <pre>   {@code
615   *   ListenableFuture<Integer> fetchCounterFuture = ...;
616   *
617   *   // Falling back to a zero counter in case an exception happens when
618   *   // processing the RPC to fetch counters.
619   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
620   *       fetchCounterFuture, FetchException.class,
621   *       new AsyncFunction<FetchException, Integer>() {
622   *         public ListenableFuture<Integer> apply(FetchException e) {
623   *           return immediateFuture(0);
624   *         }
625   *       });}</pre>
626   *
627   * <p>The fallback can also choose to propagate the original exception when desired:
628   *
629   * <pre>   {@code
630   *   ListenableFuture<Integer> fetchCounterFuture = ...;
631   *
   *   // Falling back to a zero counter only when omitDataOnFetchFailure is set;
   *   // otherwise the original FetchException is rethrown.
634   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
635   *       fetchCounterFuture, FetchException.class,
636   *       new AsyncFunction<FetchException, Integer>() {
637   *         public ListenableFuture<Integer> apply(FetchException e)
638   *             throws FetchException {
639   *           if (omitDataOnFetchFailure) {
640   *             return immediateFuture(0);
641   *           }
642   *           throw e;
643   *         }
644   *       });}</pre>
645   *
646   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
647   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
648   * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight
649   * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done
650   * to complete the returned {@code Future}.
651   *
652   * @param input the primary input {@code Future}
653   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
654   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
655   *     {@code Throwable.class} in particular.
656   * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails
657   *     with the expected exception type
658   * @since 19.0 (similar functionality in 14.0 as {@code withFallback})
659   */
660  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
661  // TODO(kak): @CheckReturnValue
662  public static <V, X extends Throwable> ListenableFuture<V> catchingAsync(
663      ListenableFuture<? extends V> input, Class<X> exceptionType,
664      AsyncFunction<? super X, ? extends V> fallback) {
665    AsyncCatchingFuture<V, X> future =
666        new AsyncCatchingFuture<V, X>(input, exceptionType, fallback);
667    input.addListener(future, directExecutor());
668    return future;
669  }
670
671  /**
672   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
673   * primary input fails with the given {@code exceptionType}, from the result provided by the
674   * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has
675   * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of
676   * {@code fallback}, an exception is thrown, this exception is used as the result of the output
677   * {@code Future}.
678   *
679   * <p>Usage examples:
680   *
681   * <pre>   {@code
682   *   ListenableFuture<Integer> fetchCounterFuture = ...;
683   *
684   *   // Falling back to a zero counter in case an exception happens when
685   *   // processing the RPC to fetch counters.
686   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
687   *       fetchCounterFuture, FetchException.class,
688   *       new AsyncFunction<FetchException, Integer>() {
689   *         public ListenableFuture<Integer> apply(FetchException e) {
690   *           return immediateFuture(0);
691   *         }
692   *       }, directExecutor());}</pre>
693   *
694   * <p>The fallback can also choose to propagate the original exception when desired:
695   *
696   * <pre>   {@code
697   *   ListenableFuture<Integer> fetchCounterFuture = ...;
698   *
   *   // Falling back to a zero counter only when omitDataOnFetchFailure is set;
   *   // otherwise the original FetchException is rethrown.
701   *   ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync(
702   *       fetchCounterFuture, FetchException.class,
703   *       new AsyncFunction<FetchException, Integer>() {
704   *         public ListenableFuture<Integer> apply(FetchException e)
705   *             throws FetchException {
706   *           if (omitDataOnFetchFailure) {
707   *             return immediateFuture(0);
708   *           }
709   *           throw e;
710   *         }
711   *       }, directExecutor());}</pre>
712   *
713   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
714   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
715   * documentation. The documentation's warnings about "lightweight listeners" refer here to the
716   * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned
717   * {@code Future}.
718   *
719   * @param input the primary input {@code Future}
720   * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding
721   *     bugs and other unrecoverable errors, callers should prefer more specific types, avoiding
722   *     {@code Throwable.class} in particular.
723   * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails
724   *     with the expected exception type
725   * @param executor the executor that runs {@code fallback} if {@code input} fails
726   * @since 19.0 (similar functionality in 14.0 as {@code withFallback})
727   */
728  @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
729  // TODO(kak): @CheckReturnValue
730  public static <V, X extends Throwable> ListenableFuture<V> catchingAsync(
731      ListenableFuture<? extends V> input, Class<X> exceptionType,
732      AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
733    AsyncCatchingFuture<V, X> future =
734        new AsyncCatchingFuture<V, X>(input, exceptionType, fallback);
735    input.addListener(future, rejectionPropagatingExecutor(executor, future));
736    return future;
737  }
738
739  @Deprecated
740  static <V> AsyncFunction<Throwable, V> asAsyncFunction(final FutureFallback<V> fallback) {
741    checkNotNull(fallback);
742    return new AsyncFunction<Throwable, V>() {
743      @Override
744      public ListenableFuture<V> apply(Throwable t) throws Exception {
745        return checkNotNull(fallback.create(t), "FutureFallback.create returned null instead of a "
746            + "Future. Did you mean to return immediateFuture(null)?");
747      }
748    };
749  }
750
751  private abstract static class AbstractCatchingFuture<V, X extends Throwable, F>
752      extends AbstractFuture.TrustedFuture<V> implements Runnable {
753    @Nullable ListenableFuture<? extends V> inputFuture;
754    @Nullable Class<X> exceptionType;
755    @Nullable F fallback;
756
757    AbstractCatchingFuture(
758        ListenableFuture<? extends V> inputFuture, Class<X> exceptionType, F fallback) {
759      this.inputFuture = checkNotNull(inputFuture);
760      this.exceptionType = checkNotNull(exceptionType);
761      this.fallback = checkNotNull(fallback);
762    }
763
764    @Override public final void run() {
765      ListenableFuture<? extends V> localInputFuture = inputFuture;
766      Class<X> localExceptionType = exceptionType;
767      F localFallback = fallback;
768      if (localInputFuture == null | localExceptionType == null | localFallback == null
769          | isCancelled()) {
770        return;
771      }
772      inputFuture = null;
773      exceptionType = null;
774      fallback = null;
775
776      Throwable throwable;
777      try {
778        set(getUninterruptibly(localInputFuture));
779        return;
780      } catch (ExecutionException e) {
781        throwable = e.getCause();
782      } catch (Throwable e) {  // this includes cancellation exception
783        throwable = e;
784      }
785      try {
786        if (isInstanceOfThrowableClass(throwable, localExceptionType)) {
787          @SuppressWarnings("unchecked") // verified safe by isInstance
788          X castThrowable = (X) throwable;
789          doFallback(localFallback, castThrowable);
790        } else {
791          setException(throwable);
792        }
793      } catch (Throwable e) {
794        setException(e);
795      }
796    }
797
798    /** Template method for subtypes to actually run the fallback. */
799    abstract void doFallback(F fallback, X throwable) throws Exception;
800
801    @Override final void done() {
802      maybePropagateCancellation(inputFuture);
803      this.inputFuture = null;
804      this.exceptionType = null;
805      this.fallback = null;
806    }
807  }
808
809  /**
810   * A {@link AbstractCatchingFuture} that delegates to an {@link AsyncFunction}
811   * and {@link #setFuture(ListenableFuture)} to implement {@link #doFallback}
812   */
813  static final class AsyncCatchingFuture<V, X extends Throwable>
814      extends AbstractCatchingFuture<V, X, AsyncFunction<? super X, ? extends V>> {
815
816    AsyncCatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType,
817        AsyncFunction<? super X, ? extends V> fallback) {
818      super(input, exceptionType, fallback);
819    }
820
821    @Override void doFallback(
822        AsyncFunction<? super X, ? extends V> fallback, X cause) throws Exception {
823      ListenableFuture<? extends V> replacement = fallback.apply(cause);
824      checkNotNull(replacement, "AsyncFunction.apply returned null instead of a Future. "
825          + "Did you mean to return immediateFuture(null)?");
826      setFuture(replacement);
827    }
828  }
829
830  /**
831   * A {@link AbstractCatchingFuture} that delegates to a {@link Function}
832   * and {@link #set(Object)} to implement {@link #doFallback}
833   */
834  static final class CatchingFuture<V, X extends Throwable>
835      extends AbstractCatchingFuture<V, X, Function<? super X, ? extends V>> {
836    CatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType,
837        Function<? super X, ? extends V> fallback) {
838      super(input, exceptionType, fallback);
839    }
840
841    @Override void doFallback(Function<? super X, ? extends V> fallback, X cause) throws Exception {
842      V replacement = fallback.apply(cause);
843      set(replacement);
844    }
845  }
846
847  /**
848   * Returns a future that delegates to another but will finish early (via a
849   * {@link TimeoutException} wrapped in an {@link ExecutionException}) if the
850   * specified duration expires.
851   *
852   * <p>The delegate future is interrupted and cancelled if it times out.
853   *
854   * @param delegate The future to delegate to.
855   * @param time when to timeout the future
856   * @param unit the time unit of the time parameter
857   * @param scheduledExecutor The executor service to enforce the timeout.
858   *
859   * @since 19.0
860   */
861  @GwtIncompatible("java.util.concurrent.ScheduledExecutorService")
862  @CheckReturnValue
863  public static <V> ListenableFuture<V> withTimeout(ListenableFuture<V> delegate,
864      long time, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
865    TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
866    TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
867    result.timer = scheduledExecutor.schedule(fire, time, unit);
868    delegate.addListener(fire, directExecutor());
869    return result;
870  }
871
872  /**
873   * Future that delegates to another but will finish early (via a {@link
874   * TimeoutException} wrapped in an {@link ExecutionException}) if the
875   * specified duration expires.
876   * The delegate future is interrupted and cancelled if it times out.
877   */
878  private static final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
879    // Memory visibility of these fields.
880    // There are two cases to consider.
881    // 1. visibility of the writes to these fields to Fire.run
882    //    The initial write to delegateRef is made definitely visible via the semantics of
883    //    addListener/SES.schedule.  The later racy write in cancel() is not guaranteed to be
884    //    observed, however that is fine since the correctness is based on the atomic state in
885    //    our base class.
886    //    The initial write to timer is never definitely visible to Fire.run since it is assigned
887    //    after SES.schedule is called. Therefore Fire.run has to check for null.  However, it
888    //    should be visible if Fire.run is called by delegate.addListener since addListener is
889    //    called after the assignment to timer, and importantly this is the main situation in which
890    //    we need to be able to see the write.
891    // 2. visibility of the writes to cancel
892    //    Since these fields are non-final that means that TimeoutFuture is not being 'safely
893    //    published', thus a motivated caller may be able to expose the reference to another thread
894    //    that would then call cancel() and be unable to cancel the delegate.
895    //    There are a number of ways to solve this, none of which are very pretty, and it is
896    //    currently believed to be a purely theoretical problem (since the other actions should
897    //    supply sufficient write-barriers).
898
899    @Nullable ListenableFuture<V> delegateRef;
900    @Nullable Future<?> timer;
901
902    TimeoutFuture(ListenableFuture<V> delegate) {
903      this.delegateRef = Preconditions.checkNotNull(delegate);
904    }
905
906    /** A runnable that is called when the delegate or the timer completes. */
907    private static final class Fire<V> implements Runnable {
908      @Nullable TimeoutFuture<V> timeoutFutureRef;
909
910      Fire(TimeoutFuture<V> timeoutFuture) {
911        this.timeoutFutureRef = timeoutFuture;
912      }
913
914      @Override public void run() {
915        // If either of these reads return null then we must be after a successful cancel
916        // or another call to this method.
917        TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
918        if (timeoutFuture == null) {
919          return;
920        }
921        ListenableFuture<V> delegate = timeoutFuture.delegateRef;
922        if (delegate == null) {
923          return;
924        }
925
926        /*
927         * If we're about to complete the TimeoutFuture, we want to release our reference to it.
928         * Otherwise, we'll pin it (and its result) in memory until the timeout task is GCed. (The
929         * need to clear our reference to the TimeoutFuture is the reason we use a *static* nested
930         * class with a manual reference back to the "containing" class.)
931         *
932         * This has the nice-ish side effect of limiting reentrancy: run() calls
933         * timeoutFuture.setException() calls run(). That reentrancy would already be harmless,
934         * since timeoutFuture can be set (and delegate cancelled) only once. (And "set only once"
935         * is important for other reasons: run() can still be invoked concurrently in different
936         * threads, even with the above null checks.)
937         */
938        timeoutFutureRef = null;
939        if (delegate.isDone()) {
940          timeoutFuture.setFuture(delegate);
941        } else {
942          try {
943            // TODO(lukes): this stack trace is particularly useless (all it does is point at the
944            // scheduledexecutorservice thread), consider eliminating it altogether?
945            timeoutFuture.setException(new TimeoutException("Future timed out: " + delegate));
946          } finally {
947            delegate.cancel(true);
948          }
949        }
950      }
951    }
952
953    @Override void done() {
954      maybePropagateCancellation(delegateRef);
955
956      Future<?> localTimer = timer;
957      // Try to cancel the timer as an optimization
958      // timer may be null if this call to run was by the timer task since there is no
959      // happens-before edge between the assignment to timer and an execution of the timer task.
960      if (localTimer != null) {
961        localTimer.cancel(false);
962      }
963
964      delegateRef = null;
965      timer = null;
966    }
967  }
968
969  /**
970   * Returns a new {@code ListenableFuture} whose result is asynchronously
971   * derived from the result of the given {@code Future}. More precisely, the
972   * returned {@code Future} takes its result from a {@code Future} produced by
973   * applying the given {@code AsyncFunction} to the result of the original
974   * {@code Future}. Example:
975   *
976   * <pre>   {@code
977   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
978   *   AsyncFunction<RowKey, QueryResult> queryFunction =
979   *       new AsyncFunction<RowKey, QueryResult>() {
980   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
981   *           return dataService.read(rowKey);
982   *         }
983   *       };
984   *   ListenableFuture<QueryResult> queryFuture =
985   *       transform(rowKeyFuture, queryFunction);}</pre>
986   *
987   * <p>This overload, which does not accept an executor, uses {@code
988   * directExecutor}, a dangerous choice in some cases. See the discussion in
989   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
990   * documentation. The documentation's warnings about "lightweight listeners"
991   * refer here to the work done during {@code AsyncFunction.apply}, not to any
992   * work done to complete the returned {@code Future}.
993   *
994   * <p>The returned {@code Future} attempts to keep its cancellation state in
995   * sync with that of the input future and that of the future returned by the
996   * function. That is, if the returned {@code Future} is cancelled, it will
997   * attempt to cancel the other two, and if either of the other two is
998   * cancelled, the returned {@code Future} will receive a callback in which it
999   * will attempt to cancel itself.
1000   *
1001   * @param input The future to transform
1002   * @param function A function to transform the result of the input future
1003   *     to the result of the output future
1004   * @return A future that holds result of the function (if the input succeeded)
1005   *     or the original input's failure (if not)
1006   * @since 11.0
1007   * @deprecated These {@code AsyncFunction} overloads of {@code transform} are
1008   *     being renamed to {@code transformAsync}. (The {@code Function}
1009   *     overloads are keeping the "transform" name.) This method will be removed in Guava release
1010   *     20.0.
1011   */
1012  @Deprecated
1013  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1014      AsyncFunction<? super I, ? extends O> function) {
1015    return transformAsync(input, function);
1016  }
1017
1018  /**
1019   * Returns a new {@code ListenableFuture} whose result is asynchronously
1020   * derived from the result of the given {@code Future}. More precisely, the
1021   * returned {@code Future} takes its result from a {@code Future} produced by
1022   * applying the given {@code AsyncFunction} to the result of the original
1023   * {@code Future}. Example:
1024   *
1025   * <pre>   {@code
1026   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
1027   *   AsyncFunction<RowKey, QueryResult> queryFunction =
1028   *       new AsyncFunction<RowKey, QueryResult>() {
1029   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
1030   *           return dataService.read(rowKey);
1031   *         }
1032   *       };
1033   *   ListenableFuture<QueryResult> queryFuture =
1034   *       transform(rowKeyFuture, queryFunction, executor);}</pre>
1035   *
1036   * <p>When selecting an executor, note that {@code directExecutor} is
1037   * dangerous in some cases. See the discussion in the {@link
1038   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
1039   * The documentation's warnings about "lightweight listeners" refer here to
1040   * the work done during {@code AsyncFunction.apply}, not to any work done to
1041   * complete the returned {@code Future}.
1042   *
1043   * <p>The returned {@code Future} attempts to keep its cancellation state in
1044   * sync with that of the input future and that of the future returned by the
1045   * chain function. That is, if the returned {@code Future} is cancelled, it
1046   * will attempt to cancel the other two, and if either of the other two is
1047   * cancelled, the returned {@code Future} will receive a callback in which it
1048   * will attempt to cancel itself.
1049   *
1050   * @param input The future to transform
1051   * @param function A function to transform the result of the input future
1052   *     to the result of the output future
1053   * @param executor Executor to run the function in.
1054   * @return A future that holds result of the function (if the input succeeded)
1055   *     or the original input's failure (if not)
1056   * @since 11.0
1057   * @deprecated These {@code AsyncFunction} overloads of {@code transform} are
1058   *     being renamed to {@code transformAsync}. (The {@code Function}
1059   *     overloads are keeping the "transform" name.) This method will be removed in Guava release
1060   *     20.0.
1061   */
1062  @Deprecated
1063  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1064      AsyncFunction<? super I, ? extends O> function,
1065      Executor executor) {
1066    return transformAsync(input, function, executor);
1067  }
1068
1069  /**
1070   * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result
1071   * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from
1072   * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the
1073   * original {@code Future}. Example:
1074   *
1075   * <pre>   {@code
1076   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
1077   *   AsyncFunction<RowKey, QueryResult> queryFunction =
1078   *       new AsyncFunction<RowKey, QueryResult>() {
1079   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
1080   *           return dataService.read(rowKey);
1081   *         }
1082   *       };
1083   *   ListenableFuture<QueryResult> queryFuture =
1084   *       transformAsync(rowKeyFuture, queryFunction);}</pre>
1085   *
1086   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
1087   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
1088   * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight
1089   * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done
1090   * to complete the returned {@code Future}.
1091   *
1092   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
1093   * input future and that of the future returned by the function. That is, if the returned {@code
1094   * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two
1095   * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to
1096   * cancel itself.
1097   *
1098   * @param input The future to transform
1099   * @param function A function to transform the result of the input future to the result of the
1100   *     output future
1101   * @return A future that holds result of the function (if the input succeeded) or the original
1102   *     input's failure (if not)
1103   * @since 19.0 (in 11.0 as {@code transform})
1104   */
1105  public static <I, O> ListenableFuture<O> transformAsync(
1106      ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) {
1107    AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function);
1108    input.addListener(output, directExecutor());
1109    return output;
1110  }
1111
1112  /**
1113   * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result
1114   * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from
1115   * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the
1116   * original {@code Future}. Example:
1117   *
1118   * <pre>   {@code
1119   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
1120   *   AsyncFunction<RowKey, QueryResult> queryFunction =
1121   *       new AsyncFunction<RowKey, QueryResult>() {
1122   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
1123   *           return dataService.read(rowKey);
1124   *         }
1125   *       };
1126   *   ListenableFuture<QueryResult> queryFuture =
1127   *       transformAsync(rowKeyFuture, queryFunction, executor);}</pre>
1128   *
1129   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
1130   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
1131   * documentation. The documentation's warnings about "lightweight listeners" refer here to the
1132   * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned
1133   * {@code Future}.
1134   *
1135   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
1136   * input future and that of the future returned by the chain function. That is, if the returned
1137   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
1138   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
1139   * attempt to cancel itself.
1140   *
1141   * @param input The future to transform
1142   * @param function A function to transform the result of the input future to the result of the
1143   *     output future
1144   * @param executor Executor to run the function in.
1145   * @return A future that holds result of the function (if the input succeeded) or the original
1146   *     input's failure (if not)
1147   * @since 19.0 (in 11.0 as {@code transform})
1148   */
1149  public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input,
1150      AsyncFunction<? super I, ? extends O> function, Executor executor) {
1151    checkNotNull(executor);
1152    AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function);
1153    input.addListener(output, rejectionPropagatingExecutor(executor, output));
1154    return output;
1155  }
1156
1157  /**
1158   * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate
1159   * executor to the given {@code future}.
1160   *
1161   * <p>Note, the returned executor can only be used once.
1162   */
1163  private static Executor rejectionPropagatingExecutor(
1164      final Executor delegate, final AbstractFuture<?> future) {
1165    checkNotNull(delegate);
1166    if (delegate == directExecutor()) {
1167      // directExecutor() cannot throw RejectedExecutionException
1168      return delegate;
1169    }
1170    return new Executor() {
1171      volatile boolean thrownFromDelegate = true;
1172      @Override public void execute(final Runnable command) {
1173        try {
1174          delegate.execute(new Runnable() {
1175            @Override public void run() {
1176              thrownFromDelegate = false;
1177              command.run();
1178            }
1179          });
1180        } catch (RejectedExecutionException e) {
1181          if (thrownFromDelegate) {
1182            // wrap exception?
1183            future.setException(e);
1184          }
1185          // otherwise it must have been thrown from a transitive call and the delegate runnable
1186          // should have handled it.
1187        }
1188      }
1189    };
1190  }
1191
1192  /**
1193   * Returns a new {@code ListenableFuture} whose result is the product of
1194   * applying the given {@code Function} to the result of the given {@code
1195   * Future}. Example:
1196   *
1197   * <pre>   {@code
1198   *   ListenableFuture<QueryResult> queryFuture = ...;
1199   *   Function<QueryResult, List<Row>> rowsFunction =
1200   *       new Function<QueryResult, List<Row>>() {
1201   *         public List<Row> apply(QueryResult queryResult) {
1202   *           return queryResult.getRows();
1203   *         }
1204   *       };
1205   *   ListenableFuture<List<Row>> rowsFuture =
1206   *       transform(queryFuture, rowsFunction);}</pre>
1207   *
1208   * <p>This overload, which does not accept an executor, uses {@code
1209   * directExecutor}, a dangerous choice in some cases. See the discussion in
1210   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
1211   * documentation. The documentation's warnings about "lightweight listeners"
1212   * refer here to the work done during {@code Function.apply}.
1213   *
1214   * <p>The returned {@code Future} attempts to keep its cancellation state in
1215   * sync with that of the input future. That is, if the returned {@code Future}
1216   * is cancelled, it will attempt to cancel the input, and if the input is
1217   * cancelled, the returned {@code Future} will receive a callback in which it
1218   * will attempt to cancel itself.
1219   *
1220   * <p>An example use of this method is to convert a serializable object
1221   * returned from an RPC into a POJO.
1222   *
1223   * @param input The future to transform
1224   * @param function A Function to transform the results of the provided future
1225   *     to the results of the returned future.  This will be run in the thread
1226   *     that notifies input it is complete.
1227   * @return A future that holds result of the transformation.
1228   * @since 9.0 (in 1.0 as {@code compose})
1229   */
1230  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1231      final Function<? super I, ? extends O> function) {
1232    checkNotNull(function);
1233    ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function);
1234    input.addListener(output, directExecutor());
1235    return output;
1236  }
1237
1238  /**
1239   * Returns a new {@code ListenableFuture} whose result is the product of
1240   * applying the given {@code Function} to the result of the given {@code
1241   * Future}. Example:
1242   *
1243   * <pre>   {@code
1244   *   ListenableFuture<QueryResult> queryFuture = ...;
1245   *   Function<QueryResult, List<Row>> rowsFunction =
1246   *       new Function<QueryResult, List<Row>>() {
1247   *         public List<Row> apply(QueryResult queryResult) {
1248   *           return queryResult.getRows();
1249   *         }
1250   *       };
1251   *   ListenableFuture<List<Row>> rowsFuture =
1252   *       transform(queryFuture, rowsFunction, executor);}</pre>
1253   *
1254   * <p>When selecting an executor, note that {@code directExecutor} is
1255   * dangerous in some cases. See the discussion in the {@link
1256   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
1257   * The documentation's warnings about "lightweight listeners" refer here to
1258   * the work done during {@code Function.apply}.
1259   *
1260   * <p>The returned {@code Future} attempts to keep its cancellation state in
1261   * sync with that of the input future. That is, if the returned {@code Future}
1262   * is cancelled, it will attempt to cancel the input, and if the input is
1263   * cancelled, the returned {@code Future} will receive a callback in which it
1264   * will attempt to cancel itself.
1265   *
1266   * <p>An example use of this method is to convert a serializable object
1267   * returned from an RPC into a POJO.
1268   *
1269   * @param input The future to transform
1270   * @param function A Function to transform the results of the provided future
1271   *     to the results of the returned future.
1272   * @param executor Executor to run the function in.
1273   * @return A future that holds result of the transformation.
1274   * @since 9.0 (in 2.0 as {@code compose})
1275   */
1276  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
1277      final Function<? super I, ? extends O> function, Executor executor) {
1278    checkNotNull(function);
1279    ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function);
1280    input.addListener(output, rejectionPropagatingExecutor(executor, output));
1281    return output;
1282  }
1283
1284  /**
1285   * Like {@link #transform(ListenableFuture, Function)} except that the
1286   * transformation {@code function} is invoked on each call to
1287   * {@link Future#get() get()} on the returned future.
1288   *
1289   * <p>The returned {@code Future} reflects the input's cancellation
1290   * state directly, and any attempt to cancel the returned Future is likewise
1291   * passed through to the input Future.
1292   *
1293   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
1294   * only apply the timeout to the execution of the underlying {@code Future},
1295   * <em>not</em> to the execution of the transformation function.
1296   *
1297   * <p>The primary audience of this method is callers of {@code transform}
1298   * who don't have a {@code ListenableFuture} available and
1299   * do not mind repeated, lazy function evaluation.
1300   *
1301   * @param input The future to transform
1302   * @param function A Function to transform the results of the provided future
1303   *     to the results of the returned future.
1304   * @return A future that returns the result of the transformation.
1305   * @since 10.0
1306   */
1307  @GwtIncompatible("TODO")
1308  @CheckReturnValue
1309  public static <I, O> Future<O> lazyTransform(final Future<I> input,
1310      final Function<? super I, ? extends O> function) {
1311    checkNotNull(input);
1312    checkNotNull(function);
1313    return new Future<O>() {
1314
1315      @Override
1316      public boolean cancel(boolean mayInterruptIfRunning) {
1317        return input.cancel(mayInterruptIfRunning);
1318      }
1319
1320      @Override
1321      public boolean isCancelled() {
1322        return input.isCancelled();
1323      }
1324
1325      @Override
1326      public boolean isDone() {
1327        return input.isDone();
1328      }
1329
1330      @Override
1331      public O get() throws InterruptedException, ExecutionException {
1332        return applyTransformation(input.get());
1333      }
1334
1335      @Override
1336      public O get(long timeout, TimeUnit unit)
1337          throws InterruptedException, ExecutionException, TimeoutException {
1338        return applyTransformation(input.get(timeout, unit));
1339      }
1340
1341      private O applyTransformation(I input) throws ExecutionException {
1342        try {
1343          return function.apply(input);
1344        } catch (Throwable t) {
1345          throw new ExecutionException(t);
1346        }
1347      }
1348    };
1349  }
1350
1351  /**
1352   * An implementation of {@code ListenableFuture} that also implements
1353   * {@code Runnable} so that it can be used to nest ListenableFutures.
1354   * Once the passed-in {@code ListenableFuture} is complete, it calls the
1355   * passed-in {@code Function} to generate the result.
1356   *
1357   * <p>For historical reasons, this class has a special case in its exception
1358   * handling: If the given {@code AsyncFunction} throws an {@code
1359   * UndeclaredThrowableException}, {@code AbstractChainingFuture} unwraps it
1360   * and uses its <i>cause</i> as the output future's exception, rather than
1361   * using the {@code UndeclaredThrowableException} itself as it would for other
1362   * exception types. The reason for this is that {@code Futures.transform} used
1363   * to require a {@code Function}, whose {@code apply} method is not allowed to
1364   * throw checked exceptions. Nowadays, {@code Futures.transform} has an
1365   * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
1366   * <i>is</i> allowed to throw checked exception. Users who wish to throw
1367   * checked exceptions should use that overload instead, and <a
1368   * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
1369   * should remove the {@code UndeclaredThrowableException} special case</a>.
1370   */
1371  private abstract static class AbstractChainingFuture<I, O, F>
1372      extends AbstractFuture.TrustedFuture<O> implements Runnable {
1373    // In theory, this field might not be visible to a cancel() call in certain circumstances. For
1374    // details, see the comments on the fields of TimeoutFuture.
1375    @Nullable ListenableFuture<? extends I> inputFuture;
1376    @Nullable F function;
1377
1378    AbstractChainingFuture(ListenableFuture<? extends I> inputFuture, F function) {
1379      this.inputFuture = checkNotNull(inputFuture);
1380      this.function = checkNotNull(function);
1381    }
1382
1383    @Override
1384    public final void run() {
1385      try {
1386        ListenableFuture<? extends I> localInputFuture = inputFuture;
1387        F localFunction = function;
1388        if (isCancelled() | localInputFuture == null | localFunction == null) {
1389          return;
1390        }
1391        inputFuture = null;
1392        function = null;
1393
1394        I sourceResult;
1395        try {
1396          sourceResult = getUninterruptibly(localInputFuture);
1397        } catch (CancellationException e) {
1398          // Cancel this future and return.
1399          // At this point, inputFuture is cancelled and outputFuture doesn't
1400          // exist, so the value of mayInterruptIfRunning is irrelevant.
1401          cancel(false);
1402          return;
1403        } catch (ExecutionException e) {
1404          // Set the cause of the exception as this future's exception
1405          setException(e.getCause());
1406          return;
1407        }
1408        doTransform(localFunction, sourceResult);
1409      } catch (UndeclaredThrowableException e) {
1410        // Set the cause of the exception as this future's exception
1411        setException(e.getCause());
1412      } catch (Throwable t) {
1413        // This exception is irrelevant in this thread, but useful for the
1414        // client
1415        setException(t);
1416      }
1417    }
1418
1419    /** Template method for subtypes to actually run the transform. */
1420    abstract void doTransform(F function, I result) throws Exception;
1421
1422    @Override final void done() {
1423      maybePropagateCancellation(inputFuture);
1424      this.inputFuture = null;
1425      this.function = null;
1426    }
1427  }
1428
1429  /**
1430   * A {@link AbstractChainingFuture} that delegates to an {@link AsyncFunction} and
1431   * {@link #setFuture(ListenableFuture)} to implement {@link #doTransform}.
1432   */
1433  private static final class AsyncChainingFuture<I, O>
1434      extends AbstractChainingFuture<I, O, AsyncFunction<? super I, ? extends O>> {
1435    AsyncChainingFuture(ListenableFuture<? extends I> inputFuture,
1436        AsyncFunction<? super I, ? extends O> function) {
1437      super(inputFuture, function);
1438    }
1439
1440    @Override
1441    void doTransform(AsyncFunction<? super I, ? extends O> function, I input) throws Exception {
1442      ListenableFuture<? extends O> outputFuture = function.apply(input);
1443      checkNotNull(outputFuture, "AsyncFunction.apply returned null instead of a Future. "
1444          + "Did you mean to return immediateFuture(null)?");
1445      setFuture(outputFuture);
1446    }
1447  }
1448
1449  /**
1450   * A {@link AbstractChainingFuture} that delegates to a {@link Function} and
1451   * {@link #set(Object)} to implement {@link #doTransform}.
1452   */
1453  private static final class ChainingFuture<I, O>
1454      extends AbstractChainingFuture<I, O, Function<? super I, ? extends O>> {
1455
1456    ChainingFuture(ListenableFuture<? extends I> inputFuture,
1457        Function<? super I, ? extends O> function) {
1458      super(inputFuture, function);
1459    }
1460
1461    @Override
1462    void doTransform(Function<? super I, ? extends O> function, I input) {
1463      // TODO(lukes): move the UndeclaredThrowable catch block here?
1464      set(function.apply(input));
1465    }
1466  }
1467
1468  /**
1469   * Returns a new {@code ListenableFuture} whose result is the product of
1470   * calling {@code get()} on the {@code Future} nested within the given {@code
1471   * Future}, effectively chaining the futures one after the other.  Example:
1472   *
1473   * <pre>   {@code
1474   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
1475   *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
1476   *
1477   * <p>This call has the same cancellation and execution semantics as {@link
1478   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
1479   * Future} attempts to keep its cancellation state in sync with both the
1480   * input {@code Future} and the nested {@code Future}.  The transformation
1481   * is very lightweight and therefore takes place in the same thread (either
1482   * the thread that called {@code dereference}, or the thread in which the
1483   * dereferenced future completes).
1484   *
1485   * @param nested The nested future to transform.
1486   * @return A future that holds result of the inner future.
1487   * @since 13.0
1488   */
1489  @SuppressWarnings({"rawtypes", "unchecked"})
1490  @CheckReturnValue
1491  public static <V> ListenableFuture<V> dereference(
1492      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
1493    return transformAsync((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
1494  }
1495
1496  /**
1497   * Helper {@code Function} for {@link #dereference}.
1498   */
1499  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
1500      new AsyncFunction<ListenableFuture<Object>, Object>() {
1501        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
1502          return input;
1503        }
1504      };
1505
1506  /**
1507   * Creates a new {@code ListenableFuture} whose value is a list containing the
1508   * values of all its input futures, if all succeed. If any input fails, the
1509   * returned future fails immediately.
1510   *
1511   * <p>The list of results is in the same order as the input list.
1512   *
1513   * <p>Canceling this future will attempt to cancel all the component futures,
1514   * and if any of the provided futures fails or is canceled, this one is,
1515   * too.
1516   *
1517   * @param futures futures to combine
1518   * @return a future that provides a list of the results of the component
1519   *         futures
1520   * @since 10.0
1521   */
1522  @Beta
1523  @SafeVarargs
1524  @CheckReturnValue
1525  public static <V> ListenableFuture<List<V>> allAsList(
1526      ListenableFuture<? extends V>... futures) {
1527    return new ListFuture<V>(ImmutableList.copyOf(futures), true);
1528  }
1529
1530  /**
1531   * Creates a new {@code ListenableFuture} whose value is a list containing the
1532   * values of all its input futures, if all succeed. If any input fails, the
1533   * returned future fails immediately.
1534   *
1535   * <p>The list of results is in the same order as the input list.
1536   *
1537   * <p>Canceling this future will attempt to cancel all the component futures,
1538   * and if any of the provided futures fails or is canceled, this one is,
1539   * too.
1540   *
1541   * @param futures futures to combine
1542   * @return a future that provides a list of the results of the component
1543   *         futures
1544   * @since 10.0
1545   */
1546  @Beta
1547  @CheckReturnValue
1548  public static <V> ListenableFuture<List<V>> allAsList(
1549      Iterable<? extends ListenableFuture<? extends V>> futures) {
1550    return new ListFuture<V>(ImmutableList.copyOf(futures), true);
1551  }
1552
1553  /**
1554   * Creates a new {@code ListenableFuture} whose result is set from the
1555   * supplied future when it completes.  Cancelling the supplied future
1556   * will also cancel the returned future, but cancelling the returned
1557   * future will have no effect on the supplied future.
1558   *
1559   * @since 15.0
1560   */
1561  @GwtIncompatible("TODO")
1562  @CheckReturnValue
1563  public static <V> ListenableFuture<V> nonCancellationPropagating(
1564      ListenableFuture<V> future) {
1565    return new NonCancellationPropagatingFuture<V>(future);
1566  }
1567
1568  /**
1569   * A wrapped future that does not propagate cancellation to its delegate.
1570   */
1571  @GwtIncompatible("TODO")
1572  private static final class NonCancellationPropagatingFuture<V>
1573      extends AbstractFuture.TrustedFuture<V> {
1574    NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1575      delegate.addListener(new Runnable() {
1576        @Override public void run() {
1577          // This prevents cancellation from propagating because we don't assign delegate until
1578          // delegate is already done, so calling cancel() on it is a no-op.
1579          setFuture(delegate);
1580        }
1581      }, directExecutor());
1582    }
1583  }
1584
1585  /**
1586   * Creates a new {@code ListenableFuture} whose value is a list containing the
1587   * values of all its successful input futures. The list of results is in the
1588   * same order as the input list, and if any of the provided futures fails or
1589   * is canceled, its corresponding position will contain {@code null} (which is
1590   * indistinguishable from the future having a successful value of
1591   * {@code null}).
1592   *
1593   * <p>Canceling this future will attempt to cancel all the component futures.
1594   *
1595   * @param futures futures to combine
1596   * @return a future that provides a list of the results of the component
1597   *         futures
1598   * @since 10.0
1599   */
1600  @Beta
1601  @SafeVarargs
1602  @CheckReturnValue
1603  public static <V> ListenableFuture<List<V>> successfulAsList(
1604      ListenableFuture<? extends V>... futures) {
1605    return new ListFuture<V>(ImmutableList.copyOf(futures), false);
1606  }
1607
1608  /**
1609   * Creates a new {@code ListenableFuture} whose value is a list containing the
1610   * values of all its successful input futures. The list of results is in the
1611   * same order as the input list, and if any of the provided futures fails or
1612   * is canceled, its corresponding position will contain {@code null} (which is
1613   * indistinguishable from the future having a successful value of
1614   * {@code null}).
1615   *
1616   * <p>Canceling this future will attempt to cancel all the component futures.
1617   *
1618   * @param futures futures to combine
1619   * @return a future that provides a list of the results of the component
1620   *         futures
1621   * @since 10.0
1622   */
1623  @Beta
1624  @CheckReturnValue
1625  public static <V> ListenableFuture<List<V>> successfulAsList(
1626      Iterable<? extends ListenableFuture<? extends V>> futures) {
1627    return new ListFuture<V>(ImmutableList.copyOf(futures), false);
1628  }
1629
1630  /**
1631   * Returns a list of delegate futures that correspond to the futures received in the order
1632   * that they complete. Delegate futures return the same value or throw the same exception
1633   * as the corresponding input future returns/throws.
1634   *
1635   * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
1636   * does not correspond to a specific input future until the appropriate number of input
1637   * futures have completed. At that point, it is too late to cancel the input future.
1638   * The input future's result, which cannot be stored into the cancelled delegate future,
1639   * is ignored.
1640   *
1641   * @since 17.0
1642   */
1643  @Beta
1644  @GwtIncompatible("TODO")
1645  @CheckReturnValue
1646  public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
1647      Iterable<? extends ListenableFuture<? extends T>> futures) {
1648    // A CLQ may be overkill here.  We could save some pointers/memory by synchronizing on an
1649    // ArrayDeque
1650    final ConcurrentLinkedQueue<SettableFuture<T>> delegates =
1651        Queues.newConcurrentLinkedQueue();
1652    ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
1653    // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
1654    // atomically and therefore that each returned future is guaranteed to be in completion order.
1655    // N.B. there are some cases where the use of this executor could have possibly surprising
1656    // effects when input futures finish at approximately the same time _and_ the output futures
1657    // have directExecutor listeners. In this situation, the listeners may end up running on a
1658    // different thread than if they were attached to the corresponding input future.  We believe
1659    // this to be a negligible cost since:
1660    // 1. Using the directExecutor implies that your callback is safe to run on any thread.
1661    // 2. This would likely only be noticeable if you were doing something expensive or blocking on
1662    //    a directExecutor listener on one of the output futures which is an antipattern anyway.
1663    SerializingExecutor executor = new SerializingExecutor(directExecutor());
1664    for (final ListenableFuture<? extends T> future : futures) {
1665      SettableFuture<T> delegate = SettableFuture.create();
1666      // Must make sure to add the delegate to the queue first in case the future is already done
1667      delegates.add(delegate);
1668      future.addListener(new Runnable() {
1669        @Override public void run() {
1670          delegates.remove().setFuture(future);
1671        }
1672      }, executor);
1673      listBuilder.add(delegate);
1674    }
1675    return listBuilder.build();
1676  }
1677
1678  /**
1679   * Registers separate success and failure callbacks to be run when the {@code
1680   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1681   * complete} or, if the computation is already complete, immediately.
1682   *
1683   * <p>There is no guaranteed ordering of execution of callbacks, but any
1684   * callback added through this method is guaranteed to be called once the
1685   * computation is complete.
1686   *
1687   * Example: <pre> {@code
1688   * ListenableFuture<QueryResult> future = ...;
1689   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
1691   *       public void onSuccess(QueryResult result) {
1692   *         storeInCache(result);
1693   *       }
1694   *       public void onFailure(Throwable t) {
1695   *         reportError(t);
1696   *       }
1697   *     });}</pre>
1698   *
1699   * <p>This overload, which does not accept an executor, uses {@code
1700   * directExecutor}, a dangerous choice in some cases. See the discussion in
1701   * the {@link ListenableFuture#addListener ListenableFuture.addListener}
1702   * documentation.
1703   *
1704   * <p>For a more general interface to attach a completion listener to a
1705   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1706   *
   * @param future The future to attach the callback to.
1708   * @param callback The callback to invoke when {@code future} is completed.
1709   * @since 10.0
1710   */
1711  public static <V> void addCallback(ListenableFuture<V> future,
1712      FutureCallback<? super V> callback) {
1713    addCallback(future, callback, directExecutor());
1714  }
1715
1716  /**
1717   * Registers separate success and failure callbacks to be run when the {@code
1718   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1719   * complete} or, if the computation is already complete, immediately.
1720   *
1721   * <p>The callback is run in {@code executor}.
1722   * There is no guaranteed ordering of execution of callbacks, but any
1723   * callback added through this method is guaranteed to be called once the
1724   * computation is complete.
1725   *
1726   * Example: <pre> {@code
1727   * ListenableFuture<QueryResult> future = ...;
1728   * Executor e = ...
1729   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
1731   *       public void onSuccess(QueryResult result) {
1732   *         storeInCache(result);
1733   *       }
1734   *       public void onFailure(Throwable t) {
1735   *         reportError(t);
1736   *       }
1737   *     }, e);}</pre>
1738   *
1739   * <p>When selecting an executor, note that {@code directExecutor} is
1740   * dangerous in some cases. See the discussion in the {@link
1741   * ListenableFuture#addListener ListenableFuture.addListener} documentation.
1742   *
1743   * <p>For a more general interface to attach a completion listener to a
1744   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1745   *
   * @param future The future to attach the callback to.
1747   * @param callback The callback to invoke when {@code future} is completed.
1748   * @param executor The executor to run {@code callback} when the future
1749   *    completes.
1750   * @since 10.0
1751   */
1752  public static <V> void addCallback(final ListenableFuture<V> future,
1753      final FutureCallback<? super V> callback, Executor executor) {
1754    Preconditions.checkNotNull(callback);
1755    Runnable callbackListener = new Runnable() {
1756      @Override
1757      public void run() {
1758        final V value;
1759        try {
1760          // TODO(user): (Before Guava release), validate that this
1761          // is the thing for IE.
1762          value = getUninterruptibly(future);
1763        } catch (ExecutionException e) {
1764          callback.onFailure(e.getCause());
1765          return;
1766        } catch (RuntimeException e) {
1767          callback.onFailure(e);
1768          return;
1769        } catch (Error e) {
1770          callback.onFailure(e);
1771          return;
1772        }
1773        callback.onSuccess(value);
1774      }
1775    };
1776    future.addListener(callbackListener, executor);
1777  }
1778
1779  /**
1780   * Returns the result of {@link Future#get()}, converting most exceptions to a
1781   * new instance of the given checked exception type. This reduces boilerplate
1782   * for a common use of {@code Future} in which it is unnecessary to
1783   * programmatically distinguish between exception types or to extract other
1784   * information from the exception instance.
1785   *
1786   * <p>Exceptions from {@code Future.get} are treated as follows:
1787   * <ul>
1788   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1789   *     {@code X} if the cause is a checked exception, an {@link
1790   *     UncheckedExecutionException} if the cause is a {@code
1791   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1792   *     {@code Error}.
1793   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1794   *     restoring the interrupt).
1795   * <li>Any {@link CancellationException} is propagated untouched, as is any
1796   *     other {@link RuntimeException} (though {@code get} implementations are
1797   *     discouraged from throwing such exceptions).
1798   * </ul>
1799   *
1800   * <p>The overall principle is to continue to treat every checked exception as a
1801   * checked exception, every unchecked exception as an unchecked exception, and
1802   * every error as an error. In addition, the cause of any {@code
1803   * ExecutionException} is wrapped in order to ensure that the new stack trace
1804   * matches that of the current thread.
1805   *
1806   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1807   * public constructor that accepts zero or more arguments, all of type {@code
1808   * String} or {@code Throwable} (preferring constructors with at least one
1809   * {@code String}) and calling the constructor via reflection. If the
1810   * exception did not already have a cause, one is set by calling {@link
1811   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1812   * {@code IllegalArgumentException} is thrown.
1813   *
1814   * @throws X if {@code get} throws any checked exception except for an {@code
1815   *         ExecutionException} whose cause is not itself a checked exception
1816   * @throws UncheckedExecutionException if {@code get} throws an {@code
1817   *         ExecutionException} with a {@code RuntimeException} as its cause
1818   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1819   *         with an {@code Error} as its cause
1820   * @throws CancellationException if {@code get} throws a {@code
1821   *         CancellationException}
1822   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1823   *         RuntimeException} or does not have a suitable constructor
1824   * @since 10.0
1825   * @deprecated Use {@link #getChecked(Future, Class)}. This method will be
1826   *     removed in Guava release 20.0.
1827   */
1828  @Deprecated
1829  @GwtIncompatible("reflection")
1830  public static <V, X extends Exception> V get(
1831      Future<V> future, Class<X> exceptionClass) throws X {
1832    return getChecked(future, exceptionClass);
1833  }
1834
1835  /**
1836   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1837   * exceptions to a new instance of the given checked exception type. This
1838   * reduces boilerplate for a common use of {@code Future} in which it is
1839   * unnecessary to programmatically distinguish between exception types or to
1840   * extract other information from the exception instance.
1841   *
1842   * <p>Exceptions from {@code Future.get} are treated as follows:
1843   * <ul>
1844   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1845   *     {@code X} if the cause is a checked exception, an {@link
1846   *     UncheckedExecutionException} if the cause is a {@code
1847   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1848   *     {@code Error}.
1849   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1850   *     restoring the interrupt).
1851   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1852   * <li>Any {@link CancellationException} is propagated untouched, as is any
1853   *     other {@link RuntimeException} (though {@code get} implementations are
1854   *     discouraged from throwing such exceptions).
1855   * </ul>
1856   *
1857   * <p>The overall principle is to continue to treat every checked exception as a
1858   * checked exception, every unchecked exception as an unchecked exception, and
1859   * every error as an error. In addition, the cause of any {@code
1860   * ExecutionException} is wrapped in order to ensure that the new stack trace
1861   * matches that of the current thread.
1862   *
1863   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1864   * public constructor that accepts zero or more arguments, all of type {@code
1865   * String} or {@code Throwable} (preferring constructors with at least one
1866   * {@code String}) and calling the constructor via reflection. If the
1867   * exception did not already have a cause, one is set by calling {@link
1868   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1869   * {@code IllegalArgumentException} is thrown.
1870   *
1871   * @throws X if {@code get} throws any checked exception except for an {@code
1872   *         ExecutionException} whose cause is not itself a checked exception
1873   * @throws UncheckedExecutionException if {@code get} throws an {@code
1874   *         ExecutionException} with a {@code RuntimeException} as its cause
1875   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1876   *         with an {@code Error} as its cause
1877   * @throws CancellationException if {@code get} throws a {@code
1878   *         CancellationException}
1879   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1880   *         RuntimeException} or does not have a suitable constructor
1881   * @since 10.0
1882   * @deprecated Use {@link #getChecked(Future, Class, long, TimeUnit)}, noting
1883   *     the change in parameter order. This method will be removed in Guava
1884   *     release 20.0.
1885   */
1886  @Deprecated
1887  @GwtIncompatible("reflection")
1888  public static <V, X extends Exception> V get(
1889      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1890      throws X {
1891    return getChecked(future, exceptionClass, timeout, unit);
1892  }
1893
1894  /**
1895   * Returns the result of {@link Future#get()}, converting most exceptions to a
1896   * new instance of the given checked exception type. This reduces boilerplate
1897   * for a common use of {@code Future} in which it is unnecessary to
1898   * programmatically distinguish between exception types or to extract other
1899   * information from the exception instance.
1900   *
1901   * <p>Exceptions from {@code Future.get} are treated as follows:
1902   * <ul>
1903   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1904   *     {@code X} if the cause is a checked exception, an {@link
1905   *     UncheckedExecutionException} if the cause is a {@code
1906   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1907   *     {@code Error}.
1908   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1909   *     restoring the interrupt).
1910   * <li>Any {@link CancellationException} is propagated untouched, as is any
1911   *     other {@link RuntimeException} (though {@code get} implementations are
1912   *     discouraged from throwing such exceptions).
1913   * </ul>
1914   *
1915   * <p>The overall principle is to continue to treat every checked exception as a
1916   * checked exception, every unchecked exception as an unchecked exception, and
1917   * every error as an error. In addition, the cause of any {@code
1918   * ExecutionException} is wrapped in order to ensure that the new stack trace
1919   * matches that of the current thread.
1920   *
1921   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1922   * public constructor that accepts zero or more arguments, all of type {@code
1923   * String} or {@code Throwable} (preferring constructors with at least one
1924   * {@code String}) and calling the constructor via reflection. If the
1925   * exception did not already have a cause, one is set by calling {@link
1926   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1927   * {@code IllegalArgumentException} is thrown.
1928   *
1929   * @throws X if {@code get} throws any checked exception except for an {@code
1930   *     ExecutionException} whose cause is not itself a checked exception
1931   * @throws UncheckedExecutionException if {@code get} throws an {@code
1932   *     ExecutionException} with a {@code RuntimeException} as its cause
1933   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1934   *     with an {@code Error} as its cause
1935   * @throws CancellationException if {@code get} throws a {@code
1936   *     CancellationException}
1937   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1938   *     RuntimeException} or does not have a suitable constructor
1939   * @since 19.0 (in 10.0 as {@code get})
1940   */
1941  @GwtIncompatible("reflection")
1942  public static <V, X extends Exception> V getChecked(
1943      Future<V> future, Class<X> exceptionClass) throws X {
1944    return FuturesGetChecked.getChecked(future, exceptionClass);
1945  }
1946
1947  /**
1948   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1949   * exceptions to a new instance of the given checked exception type. This
1950   * reduces boilerplate for a common use of {@code Future} in which it is
1951   * unnecessary to programmatically distinguish between exception types or to
1952   * extract other information from the exception instance.
1953   *
1954   * <p>Exceptions from {@code Future.get} are treated as follows:
1955   * <ul>
1956   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1957   *     {@code X} if the cause is a checked exception, an {@link
1958   *     UncheckedExecutionException} if the cause is a {@code
1959   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1960   *     {@code Error}.
1961   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1962   *     restoring the interrupt).
1963   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1964   * <li>Any {@link CancellationException} is propagated untouched, as is any
1965   *     other {@link RuntimeException} (though {@code get} implementations are
1966   *     discouraged from throwing such exceptions).
1967   * </ul>
1968   *
1969   * <p>The overall principle is to continue to treat every checked exception as a
1970   * checked exception, every unchecked exception as an unchecked exception, and
1971   * every error as an error. In addition, the cause of any {@code
1972   * ExecutionException} is wrapped in order to ensure that the new stack trace
1973   * matches that of the current thread.
1974   *
1975   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1976   * public constructor that accepts zero or more arguments, all of type {@code
1977   * String} or {@code Throwable} (preferring constructors with at least one
1978   * {@code String}) and calling the constructor via reflection. If the
1979   * exception did not already have a cause, one is set by calling {@link
1980   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1981   * {@code IllegalArgumentException} is thrown.
1982   *
1983   * @throws X if {@code get} throws any checked exception except for an {@code
1984   *     ExecutionException} whose cause is not itself a checked exception
1985   * @throws UncheckedExecutionException if {@code get} throws an {@code
1986   *     ExecutionException} with a {@code RuntimeException} as its cause
1987   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1988   *     with an {@code Error} as its cause
1989   * @throws CancellationException if {@code get} throws a {@code
1990   *     CancellationException}
1991   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1992   *     RuntimeException} or does not have a suitable constructor
1993   * @since 19.0 (in 10.0 as {@code get} and with different parameter order)
1994   */
1995  @GwtIncompatible("reflection")
1996  public static <V, X extends Exception> V getChecked(
1997      Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit)
1998      throws X {
1999    return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit);
2000  }
2001
2002  /**
2003   * Returns the result of calling {@link Future#get()} uninterruptibly on a
2004   * task known not to throw a checked exception. This makes {@code Future} more
2005   * suitable for lightweight, fast-running tasks that, barring bugs in the
2006   * code, will not fail. This gives it exception-handling behavior similar to
2007   * that of {@code ForkJoinTask.join}.
2008   *
2009   * <p>Exceptions from {@code Future.get} are treated as follows:
2010   * <ul>
2011   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
2012   *     {@link UncheckedExecutionException} (if the cause is an {@code
2013   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
2014   *     Error}).
2015   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
2016   *     call. The interrupt is restored before {@code getUnchecked} returns.
2017   * <li>Any {@link CancellationException} is propagated untouched. So is any
2018   *     other {@link RuntimeException} ({@code get} implementations are
2019   *     discouraged from throwing such exceptions).
2020   * </ul>
2021   *
2022   * <p>The overall principle is to eliminate all checked exceptions: to loop to
2023   * avoid {@code InterruptedException}, to pass through {@code
2024   * CancellationException}, and to wrap any exception from the underlying
2025   * computation in an {@code UncheckedExecutionException} or {@code
2026   * ExecutionError}.
2027   *
2028   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
2029   * {@link Uninterruptibles#getUninterruptibly(Future)}.
2030   *
2031   * @throws UncheckedExecutionException if {@code get} throws an {@code
2032   *         ExecutionException} with an {@code Exception} as its cause
2033   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
2034   *         with an {@code Error} as its cause
2035   * @throws CancellationException if {@code get} throws a {@code
2036   *         CancellationException}
2037   * @since 10.0
2038   */
2039  @GwtIncompatible("TODO")
2040  public static <V> V getUnchecked(Future<V> future) {
2041    checkNotNull(future);
2042    try {
2043      return getUninterruptibly(future);
2044    } catch (ExecutionException e) {
2045      wrapAndThrowUnchecked(e.getCause());
2046      throw new AssertionError();
2047    }
2048  }
2049
2050  @GwtIncompatible("TODO")
2051  private static void wrapAndThrowUnchecked(Throwable cause) {
2052    if (cause instanceof Error) {
2053      throw new ExecutionError((Error) cause);
2054    }
2055    /*
2056     * It's a non-Error, non-Exception Throwable. From my survey of such
2057     * classes, I believe that most users intended to extend Exception, so we'll
2058     * treat it like an Exception.
2059     */
2060    throw new UncheckedExecutionException(cause);
2061  }
2062
2063  /*
2064   * Arguably we don't need a timed getUnchecked because any operation slow
2065   * enough to require a timeout is heavyweight enough to throw a checked
2066   * exception and therefore be inappropriate to use with getUnchecked. Further,
2067   * it's not clear that converting the checked TimeoutException to a
2068   * RuntimeException -- especially to an UncheckedExecutionException, since it
2069   * wasn't thrown by the computation -- makes sense, and if we don't convert
2070   * it, the user still has to write a try-catch block.
2071   *
   * If you think you would use this method, let us know. You might also look
   * into the Fork-Join framework:
2074   * http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
2075   */
2076
2077  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
2078  private static final class ListFuture<V> extends CollectionFuture<V, List<V>> {
2079    ListFuture(ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
2080        boolean allMustSucceed) {
2081      init(new ListFutureRunningState(futures, allMustSucceed));
2082    }
2083
2084    private final class ListFutureRunningState extends CollectionFutureRunningState {
2085      ListFutureRunningState(ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
2086        boolean allMustSucceed) {
2087        super(futures, allMustSucceed);
2088      }
2089
2090      @Override
2091      public List<V> combine(List<Optional<V>> values) {
2092        List<V> result = Lists.newArrayList();
2093        for (Optional<V> element : values) {
2094          result.add(element != null ? element.orNull() : null);
2095        }
2096        return Collections.unmodifiableList(result);
2097      }
2098    }
2099  }
2100
2101  /**
2102   * A checked future that uses a function to map from exceptions to the
2103   * appropriate checked type.
2104   */
2105  @GwtIncompatible("TODO")
2106  private static class MappingCheckedFuture<V, X extends Exception> extends
2107      AbstractCheckedFuture<V, X> {
2108
2109    final Function<? super Exception, X> mapper;
2110
2111    MappingCheckedFuture(ListenableFuture<V> delegate,
2112        Function<? super Exception, X> mapper) {
2113      super(delegate);
2114
2115      this.mapper = checkNotNull(mapper);
2116    }
2117
2118    @Override
2119    protected X mapException(Exception e) {
2120      return mapper.apply(e);
2121    }
2122  }
2123}