001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024import static java.lang.Thread.currentThread;
025import static java.util.Arrays.asList;
026
027import com.google.common.annotations.Beta;
028import com.google.common.base.Function;
029import com.google.common.base.Optional;
030import com.google.common.base.Preconditions;
031import com.google.common.collect.ImmutableCollection;
032import com.google.common.collect.ImmutableList;
033import com.google.common.collect.Lists;
034import com.google.common.collect.Ordering;
035import com.google.common.collect.Sets;
036
037import java.lang.reflect.Constructor;
038import java.lang.reflect.InvocationTargetException;
039import java.lang.reflect.UndeclaredThrowableException;
040import java.util.Arrays;
041import java.util.Collections;
042import java.util.List;
043import java.util.Set;
044import java.util.concurrent.CancellationException;
045import java.util.concurrent.CountDownLatch;
046import java.util.concurrent.ExecutionException;
047import java.util.concurrent.Executor;
048import java.util.concurrent.Future;
049import java.util.concurrent.TimeUnit;
050import java.util.concurrent.TimeoutException;
051import java.util.concurrent.atomic.AtomicInteger;
052import java.util.logging.Level;
053import java.util.logging.Logger;
054
055import javax.annotation.Nullable;
056
057/**
058 * Static utility methods pertaining to the {@link Future} interface.
059 *
060 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
061 * Guava User Guide article on <a href=
062 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
063 * {@code ListenableFuture}</a>.
064 *
065 * @author Kevin Bourrillion
066 * @author Nishant Thakkar
067 * @author Sven Mawson
068 * @since 1.0
069 */
070@Beta
071public final class Futures {
  /** Private constructor: this class only exposes static members. */
  private Futures() {}
073
074  /**
075   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
076   * and a {@link Function} that maps from {@link Exception} instances into the
077   * appropriate checked type.
078   *
079   * <p>The given mapping function will be applied to an
080   * {@link InterruptedException}, a {@link CancellationException}, or an
081   * {@link ExecutionException}.
082   * See {@link Future#get()} for details on the exceptions thrown.
083   *
084   * @since 9.0 (source-compatible since 1.0)
085   */
086  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
087      ListenableFuture<V> future, Function<Exception, X> mapper) {
088    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
089  }
090
091  private abstract static class ImmediateFuture<V>
092      implements ListenableFuture<V> {
093
094    private static final Logger log =
095        Logger.getLogger(ImmediateFuture.class.getName());
096
097    @Override
098    public void addListener(Runnable listener, Executor executor) {
099      checkNotNull(listener, "Runnable was null.");
100      checkNotNull(executor, "Executor was null.");
101      try {
102        executor.execute(listener);
103      } catch (RuntimeException e) {
104        // ListenableFuture's contract is that it will not throw unchecked
105        // exceptions, so log the bad runnable and/or executor and swallow it.
106        log.log(Level.SEVERE, "RuntimeException while executing runnable "
107            + listener + " with executor " + executor, e);
108      }
109    }
110
111    @Override
112    public boolean cancel(boolean mayInterruptIfRunning) {
113      return false;
114    }
115
116    @Override
117    public abstract V get() throws ExecutionException;
118
119    @Override
120    public V get(long timeout, TimeUnit unit) throws ExecutionException {
121      checkNotNull(unit);
122      return get();
123    }
124
125    @Override
126    public boolean isCancelled() {
127      return false;
128    }
129
130    @Override
131    public boolean isDone() {
132      return true;
133    }
134  }
135
136  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
137
138    @Nullable private final V value;
139
140    ImmediateSuccessfulFuture(@Nullable V value) {
141      this.value = value;
142    }
143
144    @Override
145    public V get() {
146      return value;
147    }
148  }
149
150  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
151      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
152
153    @Nullable private final V value;
154
155    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
156      this.value = value;
157    }
158
159    @Override
160    public V get() {
161      return value;
162    }
163
164    @Override
165    public V checkedGet() {
166      return value;
167    }
168
169    @Override
170    public V checkedGet(long timeout, TimeUnit unit) {
171      checkNotNull(unit);
172      return value;
173    }
174  }
175
176  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
177
178    private final Throwable thrown;
179
180    ImmediateFailedFuture(Throwable thrown) {
181      this.thrown = thrown;
182    }
183
184    @Override
185    public V get() throws ExecutionException {
186      throw new ExecutionException(thrown);
187    }
188  }
189
190  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
191
192    private final CancellationException thrown;
193
194    ImmediateCancelledFuture() {
195      this.thrown = new CancellationException("Immediate cancelled future.");
196    }
197
198    @Override
199    public boolean isCancelled() {
200      return true;
201    }
202
203    @Override
204    public V get() {
205      throw AbstractFuture.cancellationExceptionWithCause(
206          "Task was cancelled.", thrown);
207    }
208  }
209
210  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
211      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
212
213    private final X thrown;
214
215    ImmediateFailedCheckedFuture(X thrown) {
216      this.thrown = thrown;
217    }
218
219    @Override
220    public V get() throws ExecutionException {
221      throw new ExecutionException(thrown);
222    }
223
224    @Override
225    public V checkedGet() throws X {
226      throw thrown;
227    }
228
229    @Override
230    public V checkedGet(long timeout, TimeUnit unit) throws X {
231      checkNotNull(unit);
232      throw thrown;
233    }
234  }
235
236  /**
237   * Creates a {@code ListenableFuture} which has its value set immediately upon
238   * construction. The getters just return the value. This {@code Future} can't
239   * be canceled or timed out and its {@code isDone()} method always returns
240   * {@code true}.
241   */
242  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
243    return new ImmediateSuccessfulFuture<V>(value);
244  }
245
246  /**
247   * Returns a {@code CheckedFuture} which has its value set immediately upon
248   * construction.
249   *
250   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
251   * method always returns {@code true}. Calling {@code get()} or {@code
252   * checkedGet()} will immediately return the provided value.
253   */
254  public static <V, X extends Exception> CheckedFuture<V, X>
255      immediateCheckedFuture(@Nullable V value) {
256    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
257  }
258
259  /**
260   * Returns a {@code ListenableFuture} which has an exception set immediately
261   * upon construction.
262   *
263   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
264   * method always returns {@code true}. Calling {@code get()} will immediately
265   * throw the provided {@code Throwable} wrapped in an {@code
266   * ExecutionException}.
267   */
268  public static <V> ListenableFuture<V> immediateFailedFuture(
269      Throwable throwable) {
270    checkNotNull(throwable);
271    return new ImmediateFailedFuture<V>(throwable);
272  }
273
274  /**
275   * Creates a {@code ListenableFuture} which is cancelled immediately upon
276   * construction, so that {@code isCancelled()} always returns {@code true}.
277   *
278   * @since 14.0
279   */
280  public static <V> ListenableFuture<V> immediateCancelledFuture() {
281    return new ImmediateCancelledFuture<V>();
282  }
283
284  /**
285   * Returns a {@code CheckedFuture} which has an exception set immediately upon
286   * construction.
287   *
288   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
289   * method always returns {@code true}. Calling {@code get()} will immediately
290   * throw the provided {@code Exception} wrapped in an {@code
291   * ExecutionException}, and calling {@code checkedGet()} will throw the
292   * provided exception itself.
293   */
294  public static <V, X extends Exception> CheckedFuture<V, X>
295      immediateFailedCheckedFuture(X exception) {
296    checkNotNull(exception);
297    return new ImmediateFailedCheckedFuture<V, X>(exception);
298  }
299
300  /**
301   * Returns a {@code Future} whose result is taken from the given primary
302   * {@code input} or, if the primary input fails, from the {@code Future}
303   * provided by the {@code fallback}. {@link FutureFallback#create} is not
304   * invoked until the primary input has failed, so if the primary input
305   * succeeds, it is never invoked. If, during the invocation of {@code
306   * fallback}, an exception is thrown, this exception is used as the result of
307   * the output {@code Future}.
308   *
309   * <p>Below is an example of a fallback that returns a default value if an
310   * exception occurs:
311   *
312   * <pre>   {@code
313   *   ListenableFuture<Integer> fetchCounterFuture = ...;
314   *
315   *   // Falling back to a zero counter in case an exception happens when
316   *   // processing the RPC to fetch counters.
317   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
318   *       fetchCounterFuture, new FutureFallback<Integer>() {
319   *         public ListenableFuture<Integer> create(Throwable t) {
320   *           // Returning "0" as the default for the counter when the
321   *           // exception happens.
322   *           return immediateFuture(0);
323   *         }
324   *       });}</pre>
325   *
326   * <p>The fallback can also choose to propagate the original exception when
327   * desired:
328   *
329   * <pre>   {@code
330   *   ListenableFuture<Integer> fetchCounterFuture = ...;
331   *
332   *   // Falling back to a zero counter only in case the exception was a
333   *   // TimeoutException.
334   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
335   *       fetchCounterFuture, new FutureFallback<Integer>() {
336   *         public ListenableFuture<Integer> create(Throwable t) {
337   *           if (t instanceof TimeoutException) {
338   *             return immediateFuture(0);
339   *           }
340   *           return immediateFailedFuture(t);
341   *         }
342   *       });}</pre>
343   *
344   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
345   * (whether the {@code Future} itself is slow or heavyweight to complete is
346   * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
347   * FutureFallback, Executor) supplying an executor}. If you do not supply an
348   * executor, {@code withFallback} will use {@link
349   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
350   * caveats for heavier operations. For example, the call to {@code
351   * fallback.create} may run on an unpredictable or undesirable thread:
352   *
353   * <ul>
354   * <li>If the input {@code Future} is done at the time {@code withFallback}
355   * is called, {@code withFallback} will call {@code fallback.create} inline.
356   * <li>If the input {@code Future} is not yet done, {@code withFallback} will
357   * schedule {@code fallback.create} to be run by the thread that completes
358   * the input {@code Future}, which may be an internal system thread such as
359   * an RPC network thread.
360   * </ul>
361   *
362   * <p>Also note that, regardless of which thread executes the {@code
363   * sameThreadExecutor} {@code fallback.create}, all other registered but
364   * unexecuted listeners are prevented from running during its execution, even
365   * if those listeners are to run in other executors.
366   *
367   * @param input the primary input {@code Future}
368   * @param fallback the {@link FutureFallback} implementation to be called if
369   *     {@code input} fails
370   * @since 14.0
371   */
372  public static <V> ListenableFuture<V> withFallback(
373      ListenableFuture<? extends V> input,
374      FutureFallback<? extends V> fallback) {
375    return withFallback(input, fallback, sameThreadExecutor());
376  }
377
378  /**
379   * Returns a {@code Future} whose result is taken from the given primary
380   * {@code input} or, if the primary input fails, from the {@code Future}
381   * provided by the {@code fallback}. {@link FutureFallback#create} is not
382   * invoked until the primary input has failed, so if the primary input
383   * succeeds, it is never invoked. If, during the invocation of {@code
384   * fallback}, an exception is thrown, this exception is used as the result of
385   * the output {@code Future}.
386   *
387   * <p>Below is an example of a fallback that returns a default value if an
388   * exception occurs:
389   *
390   * <pre>   {@code
391   *   ListenableFuture<Integer> fetchCounterFuture = ...;
392   *
393   *   // Falling back to a zero counter in case an exception happens when
394   *   // processing the RPC to fetch counters.
395   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
396   *       fetchCounterFuture, new FutureFallback<Integer>() {
397   *         public ListenableFuture<Integer> create(Throwable t) {
398   *           // Returning "0" as the default for the counter when the
399   *           // exception happens.
400   *           return immediateFuture(0);
401   *         }
402   *       }, sameThreadExecutor());}</pre>
403   *
404   * <p>The fallback can also choose to propagate the original exception when
405   * desired:
406   *
407   * <pre>   {@code
408   *   ListenableFuture<Integer> fetchCounterFuture = ...;
409   *
410   *   // Falling back to a zero counter only in case the exception was a
411   *   // TimeoutException.
412   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
413   *       fetchCounterFuture, new FutureFallback<Integer>() {
414   *         public ListenableFuture<Integer> create(Throwable t) {
415   *           if (t instanceof TimeoutException) {
416   *             return immediateFuture(0);
417   *           }
418   *           return immediateFailedFuture(t);
419   *         }
420   *       }, sameThreadExecutor());}</pre>
421   *
422   * <p>When the execution of {@code fallback.create} is fast and lightweight
423   * (though the {@code Future} it returns need not meet these criteria),
424   * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
425   * omitting the executor} or explicitly specifying {@code
426   * sameThreadExecutor}. However, be aware of the caveats documented in the
427   * link above.
428   *
429   * @param input the primary input {@code Future}
430   * @param fallback the {@link FutureFallback} implementation to be called if
431   *     {@code input} fails
432   * @param executor the executor that runs {@code fallback} if {@code input}
433   *     fails
434   * @since 14.0
435   */
436  public static <V> ListenableFuture<V> withFallback(
437      ListenableFuture<? extends V> input,
438      FutureFallback<? extends V> fallback, Executor executor) {
439    checkNotNull(fallback);
440    return new FallbackFuture<V>(input, fallback, executor);
441  }
442
  /**
   * A future that falls back on a second, generated future, in case its
   * original future fails.
   *
   * <p>The {@code running} field always points at the future this instance is
   * currently waiting on: first the primary input, then (after a failure) the
   * future returned by {@code fallback.create}. Cancelling this future also
   * cancels whichever future {@code running} refers to at that moment.
   */
  private static class FallbackFuture<V> extends AbstractFuture<V> {

    // The future currently being waited on. Volatile because it is written
    // from the callback and read from cancel(), which may be called from
    // another thread.
    private volatile ListenableFuture<? extends V> running;

    /**
     * Registers a callback on {@code input} that either forwards its value
     * or, on failure, asks {@code fallback} for a replacement future and
     * chains onto that.
     *
     * @param input the primary future
     * @param fallback produces the replacement future from the input's failure
     * @param executor runs the callback registered on {@code input}, and
     *     therefore the call to {@code fallback.create}
     */
    FallbackFuture(ListenableFuture<? extends V> input,
        final FutureFallback<? extends V> fallback,
        final Executor executor) {
      running = input;
      addCallback(running, new FutureCallback<V>() {
        @Override
        public void onSuccess(V value) {
          // Primary input succeeded: the fallback is never consulted.
          set(value);
        }

        @Override
        public void onFailure(Throwable t) {
          // This future was already cancelled, so skip creating a fallback.
          if (isCancelled()) {
            return;
          }
          try {
            running = fallback.create(t);
            // cancel() may have run between the check above and the
            // assignment to running; in that case it cancelled the old
            // future, so the freshly created fallback must be cancelled here.
            // NOTE(review): wasInterrupted() presumably reports the
            // mayInterruptIfRunning flag of that cancel call - confirm in
            // AbstractFuture.
            if (isCancelled()) { // in case cancel called in the meantime
              running.cancel(wasInterrupted());
              return;
            }
            // Mirror the fallback future's outcome onto this future. This
            // inner callback runs on the same-thread executor rather than on
            // the caller-supplied executor.
            addCallback(running, new FutureCallback<V>() {
              @Override
              public void onSuccess(V value) {
                set(value);
              }

              @Override
              public void onFailure(Throwable t) {
                // A cancelled fallback cancels this future; any other
                // failure becomes this future's exception.
                if (running.isCancelled()) {
                  cancel(false);
                } else {
                  setException(t);
                }
              }
            }, sameThreadExecutor());
          } catch (Throwable e) {
            // Anything thrown inside the try block, including by
            // fallback.create itself, becomes this future's exception.
            setException(e);
          }
        }
      }, executor);
    }

    /**
     * Cancels this future and, only if that succeeds, also cancels the
     * future currently referenced by {@code running}.
     *
     * @return {@code true} if {@code super.cancel} reported success
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      if (super.cancel(mayInterruptIfRunning)) {
        running.cancel(mayInterruptIfRunning);
        return true;
      }
      return false;
    }
  }
503
504  /**
505   * Returns a new {@code ListenableFuture} whose result is asynchronously
506   * derived from the result of the given {@code Future}. More precisely, the
507   * returned {@code Future} takes its result from a {@code Future} produced by
508   * applying the given {@code AsyncFunction} to the result of the original
509   * {@code Future}. Example:
510   *
511   * <pre>   {@code
512   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
513   *   AsyncFunction<RowKey, QueryResult> queryFunction =
514   *       new AsyncFunction<RowKey, QueryResult>() {
515   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
516   *           return dataService.read(rowKey);
517   *         }
518   *       };
519   *   ListenableFuture<QueryResult> queryFuture =
520   *       transform(rowKeyFuture, queryFunction);}</pre>
521   *
522   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
523   * (whether the {@code Future} itself is slow or heavyweight to complete is
524   * irrelevant), consider {@linkplain #transform(ListenableFuture,
525   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
526   * executor, {@code transform} will use {@link
527   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
528   * caveats for heavier operations. For example, the call to {@code
529   * function.apply} may run on an unpredictable or undesirable thread:
530   *
531   * <ul>
532   * <li>If the input {@code Future} is done at the time {@code transform} is
533   * called, {@code transform} will call {@code function.apply} inline.
534   * <li>If the input {@code Future} is not yet done, {@code transform} will
535   * schedule {@code function.apply} to be run by the thread that completes the
536   * input {@code Future}, which may be an internal system thread such as an
537   * RPC network thread.
538   * </ul>
539   *
540   * <p>Also note that, regardless of which thread executes the {@code
541   * sameThreadExecutor} {@code function.apply}, all other registered but
542   * unexecuted listeners are prevented from running during its execution, even
543   * if those listeners are to run in other executors.
544   *
545   * <p>The returned {@code Future} attempts to keep its cancellation state in
546   * sync with that of the input future and that of the future returned by the
547   * function. That is, if the returned {@code Future} is cancelled, it will
548   * attempt to cancel the other two, and if either of the other two is
549   * cancelled, the returned {@code Future} will receive a callback in which it
550   * will attempt to cancel itself.
551   *
552   * @param input The future to transform
553   * @param function A function to transform the result of the input future
554   *     to the result of the output future
555   * @return A future that holds result of the function (if the input succeeded)
556   *     or the original input's failure (if not)
557   * @since 11.0
558   */
559  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
560      AsyncFunction<? super I, ? extends O> function) {
561    return transform(input, function, MoreExecutors.sameThreadExecutor());
562  }
563
564  /**
565   * Returns a new {@code ListenableFuture} whose result is asynchronously
566   * derived from the result of the given {@code Future}. More precisely, the
567   * returned {@code Future} takes its result from a {@code Future} produced by
568   * applying the given {@code AsyncFunction} to the result of the original
569   * {@code Future}. Example:
570   *
571   * <pre>   {@code
572   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
573   *   AsyncFunction<RowKey, QueryResult> queryFunction =
574   *       new AsyncFunction<RowKey, QueryResult>() {
575   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
576   *           return dataService.read(rowKey);
577   *         }
578   *       };
579   *   ListenableFuture<QueryResult> queryFuture =
580   *       transform(rowKeyFuture, queryFunction, executor);}</pre>
581   *
582   * <p>The returned {@code Future} attempts to keep its cancellation state in
583   * sync with that of the input future and that of the future returned by the
584   * chain function. That is, if the returned {@code Future} is cancelled, it
585   * will attempt to cancel the other two, and if either of the other two is
586   * cancelled, the returned {@code Future} will receive a callback in which it
587   * will attempt to cancel itself.
588   *
589   * <p>When the execution of {@code function.apply} is fast and lightweight
590   * (though the {@code Future} it returns need not meet these criteria),
591   * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
592   * the executor} or explicitly specifying {@code sameThreadExecutor}.
593   * However, be aware of the caveats documented in the link above.
594   *
595   * @param input The future to transform
596   * @param function A function to transform the result of the input future
597   *     to the result of the output future
598   * @param executor Executor to run the function in.
599   * @return A future that holds result of the function (if the input succeeded)
600   *     or the original input's failure (if not)
601   * @since 11.0
602   */
603  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
604      AsyncFunction<? super I, ? extends O> function,
605      Executor executor) {
606    ChainingListenableFuture<I, O> output =
607        new ChainingListenableFuture<I, O>(function, input);
608    input.addListener(output, executor);
609    return output;
610  }
611
612  /**
613   * Returns a new {@code ListenableFuture} whose result is the product of
614   * applying the given {@code Function} to the result of the given {@code
615   * Future}. Example:
616   *
617   * <pre>   {@code
618   *   ListenableFuture<QueryResult> queryFuture = ...;
619   *   Function<QueryResult, List<Row>> rowsFunction =
620   *       new Function<QueryResult, List<Row>>() {
621   *         public List<Row> apply(QueryResult queryResult) {
622   *           return queryResult.getRows();
623   *         }
624   *       };
625   *   ListenableFuture<List<Row>> rowsFuture =
626   *       transform(queryFuture, rowsFunction);}</pre>
627   *
628   * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
629   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
630   * If you do not supply an executor, {@code transform} will use {@link
631   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
632   * caveats for heavier operations.  For example, the call to {@code
633   * function.apply} may run on an unpredictable or undesirable thread:
634   *
635   * <ul>
636   * <li>If the input {@code Future} is done at the time {@code transform} is
637   * called, {@code transform} will call {@code function.apply} inline.
638   * <li>If the input {@code Future} is not yet done, {@code transform} will
639   * schedule {@code function.apply} to be run by the thread that completes the
640   * input {@code Future}, which may be an internal system thread such as an
641   * RPC network thread.
642   * </ul>
643   *
644   * <p>Also note that, regardless of which thread executes the {@code
645   * sameThreadExecutor} {@code function.apply}, all other registered but
646   * unexecuted listeners are prevented from running during its execution, even
647   * if those listeners are to run in other executors.
648   *
649   * <p>The returned {@code Future} attempts to keep its cancellation state in
650   * sync with that of the input future. That is, if the returned {@code Future}
651   * is cancelled, it will attempt to cancel the input, and if the input is
652   * cancelled, the returned {@code Future} will receive a callback in which it
653   * will attempt to cancel itself.
654   *
655   * <p>An example use of this method is to convert a serializable object
656   * returned from an RPC into a POJO.
657   *
658   * @param input The future to transform
659   * @param function A Function to transform the results of the provided future
660   *     to the results of the returned future.  This will be run in the thread
661   *     that notifies input it is complete.
662   * @return A future that holds result of the transformation.
663   * @since 9.0 (in 1.0 as {@code compose})
664   */
665  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
666      final Function<? super I, ? extends O> function) {
667    return transform(input, function, MoreExecutors.sameThreadExecutor());
668  }
669
670  /**
671   * Returns a new {@code ListenableFuture} whose result is the product of
672   * applying the given {@code Function} to the result of the given {@code
673   * Future}. Example:
674   *
675   * <pre>   {@code
676   *   ListenableFuture<QueryResult> queryFuture = ...;
677   *   Function<QueryResult, List<Row>> rowsFunction =
678   *       new Function<QueryResult, List<Row>>() {
679   *         public List<Row> apply(QueryResult queryResult) {
680   *           return queryResult.getRows();
681   *         }
682   *       };
683   *   ListenableFuture<List<Row>> rowsFuture =
684   *       transform(queryFuture, rowsFunction, executor);}</pre>
685   *
686   * <p>The returned {@code Future} attempts to keep its cancellation state in
687   * sync with that of the input future. That is, if the returned {@code Future}
688   * is cancelled, it will attempt to cancel the input, and if the input is
689   * cancelled, the returned {@code Future} will receive a callback in which it
690   * will attempt to cancel itself.
691   *
692   * <p>An example use of this method is to convert a serializable object
693   * returned from an RPC into a POJO.
694   *
695   * <p>When the transformation is fast and lightweight, consider {@linkplain
696   * #transform(ListenableFuture, Function) omitting the executor} or
697   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
698   * caveats documented in the link above.
699   *
700   * @param input The future to transform
701   * @param function A Function to transform the results of the provided future
702   *     to the results of the returned future.
703   * @param executor Executor to run the function in.
704   * @return A future that holds result of the transformation.
705   * @since 9.0 (in 2.0 as {@code compose})
706   */
707  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
708      final Function<? super I, ? extends O> function, Executor executor) {
709    checkNotNull(function);
710    AsyncFunction<I, O> wrapperFunction
711        = new AsyncFunction<I, O>() {
712            @Override public ListenableFuture<O> apply(I input) {
713              O output = function.apply(input);
714              return immediateFuture(output);
715            }
716        };
717    return transform(input, wrapperFunction, executor);
718  }
719
720  /**
721   * Like {@link #transform(ListenableFuture, Function)} except that the
722   * transformation {@code function} is invoked on each call to
723   * {@link Future#get() get()} on the returned future.
724   *
725   * <p>The returned {@code Future} reflects the input's cancellation
726   * state directly, and any attempt to cancel the returned Future is likewise
727   * passed through to the input Future.
728   *
729   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
730   * only apply the timeout to the execution of the underlying {@code Future},
731   * <em>not</em> to the execution of the transformation function.
732   *
733   * <p>The primary audience of this method is callers of {@code transform}
734   * who don't have a {@code ListenableFuture} available and
735   * do not mind repeated, lazy function evaluation.
736   *
737   * @param input The future to transform
738   * @param function A Function to transform the results of the provided future
739   *     to the results of the returned future.
740   * @return A future that returns the result of the transformation.
741   * @since 10.0
742   */
743  public static <I, O> Future<O> lazyTransform(final Future<I> input,
744      final Function<? super I, ? extends O> function) {
745    checkNotNull(input);
746    checkNotNull(function);
747    return new Future<O>() {
748
749      @Override
750      public boolean cancel(boolean mayInterruptIfRunning) {
751        return input.cancel(mayInterruptIfRunning);
752      }
753
754      @Override
755      public boolean isCancelled() {
756        return input.isCancelled();
757      }
758
759      @Override
760      public boolean isDone() {
761        return input.isDone();
762      }
763
764      @Override
765      public O get() throws InterruptedException, ExecutionException {
766        return applyTransformation(input.get());
767      }
768
769      @Override
770      public O get(long timeout, TimeUnit unit)
771          throws InterruptedException, ExecutionException, TimeoutException {
772        return applyTransformation(input.get(timeout, unit));
773      }
774
775      private O applyTransformation(I input) throws ExecutionException {
776        try {
777          return function.apply(input);
778        } catch (Throwable t) {
779          throw new ExecutionException(t);
780        }
781      }
782    };
783  }
784
785  /**
786   * An implementation of {@code ListenableFuture} that also implements
787   * {@code Runnable} so that it can be used to nest ListenableFutures.
788   * Once the passed-in {@code ListenableFuture} is complete, it calls the
   * passed-in {@code AsyncFunction} to generate the result.
790   *
791   * <p>For historical reasons, this class has a special case in its exception
792   * handling: If the given {@code AsyncFunction} throws an {@code
793   * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
794   * and uses its <i>cause</i> as the output future's exception, rather than
795   * using the {@code UndeclaredThrowableException} itself as it would for other
796   * exception types. The reason for this is that {@code Futures.transform} used
797   * to require a {@code Function}, whose {@code apply} method is not allowed to
798   * throw checked exceptions. Nowadays, {@code Futures.transform} has an
799   * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
   * <i>is</i> allowed to throw checked exceptions. Users who wish to throw
801   * checked exceptions should use that overload instead, and <a
802   * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
803   * should remove the {@code UndeclaredThrowableException} special case</a>.
804   */
  private static class ChainingListenableFuture<I, O>
      extends AbstractFuture<O> implements Runnable {

    // function and inputFuture are deliberately non-final: run() nulls them
    // out in its finally block so they are not pinned after completion.
    private AsyncFunction<? super I, ? extends O> function;
    private ListenableFuture<? extends I> inputFuture;
    // Assigned in run() once function.apply() returns, read by cancel(), and
    // cleared again when the output completes or this future was cancelled.
    private volatile ListenableFuture<? extends O> outputFuture;
    // Counted down at the very end of run(). Nothing inside this class awaits
    // it. NOTE(review): looks like a leftover from an earlier get()
    // implementation -- confirm before removing.
    private final CountDownLatch outputCreated = new CountDownLatch(1);

    /**
     * @param function produces the output future from the input's result
     * @param inputFuture the future whose result is fed to {@code function}
     * @throws NullPointerException if either argument is null
     */
    private ChainingListenableFuture(
        AsyncFunction<? super I, ? extends O> function,
        ListenableFuture<? extends I> inputFuture) {
      this.function = checkNotNull(function);
      this.inputFuture = checkNotNull(inputFuture);
    }

    /**
     * Cancels this future and, if that succeeds, also attempts to cancel the
     * input future and the output future (whichever of them are currently
     * referenced).
     *
     * @return {@code true} if {@code super.cancel} succeeded
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      /*
       * Our additional cancellation work needs to occur even if
       * !mayInterruptIfRunning, so we can't move it into interruptTask().
       */
      if (super.cancel(mayInterruptIfRunning)) {
        // This should never block since only one thread is allowed to cancel
        // this Future.
        cancel(inputFuture, mayInterruptIfRunning);
        cancel(outputFuture, mayInterruptIfRunning);
        return true;
      }
      return false;
    }

    /** Null-tolerant helper: cancels {@code future} unless it is null. */
    private void cancel(@Nullable Future<?> future,
        boolean mayInterruptIfRunning) {
      if (future != null) {
        future.cancel(mayInterruptIfRunning);
      }
    }

    /**
     * Reads the input future's result, applies {@code function} to obtain the
     * output future, and registers a listener that copies the output's
     * outcome (value, failure cause, or cancellation) into this future.
     * NOTE(review): presumably registered as a listener on the input future
     * by the caller, so the input is already done when this runs -- confirm.
     */
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        // Publish the output future through the volatile field so cancel()
        // can see it, and keep a final local copy for the listener below.
        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            function.apply(sourceResult);
        // cancel() may have run before the field above was assigned, in which
        // case it could not have cancelled the output; do so here instead.
        if (isCancelled()) {
          outputFuture.cancel(wasInterrupted());
          this.outputFuture = null;
          return;
        }
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, MoreExecutors.sameThreadExecutor());
      } catch (UndeclaredThrowableException e) {
        // Historical special case (see the class documentation): report the
        // wrapped cause rather than the UndeclaredThrowableException itself.
        setException(e.getCause());
      } catch (Throwable t) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(t);
      } finally {
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
        // Allow our get routines to examine outputFuture now.
        outputCreated.countDown();
      }
    }
  }
904
905  /**
906   * Returns a new {@code ListenableFuture} whose result is the product of
907   * calling {@code get()} on the {@code Future} nested within the given {@code
908   * Future}, effectively chaining the futures one after the other.  Example:
909   *
910   * <pre>   {@code
911   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
912   *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
913   *
914   * <p>This call has the same cancellation and execution semantics as {@link
915   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
916   * Future} attempts to keep its cancellation state in sync with both the
917   * input {@code Future} and the nested {@code Future}.  The transformation
918   * is very lightweight and therefore takes place in the thread that called
919   * {@code dereference}.
920   *
921   * @param nested The nested future to transform.
   * @return A future that holds the result of the inner future.
923   * @since 13.0
924   */
925  @SuppressWarnings({"rawtypes", "unchecked"})
926  public static <V> ListenableFuture<V> dereference(
927      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
928    return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
929  }
930
931  /**
   * Helper {@code AsyncFunction} for {@link #dereference}.
933   */
934  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
935      new AsyncFunction<ListenableFuture<Object>, Object>() {
936        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
937          return input;
938        }
939      };
940
941  /**
942   * Creates a new {@code ListenableFuture} whose value is a list containing the
943   * values of all its input futures, if all succeed. If any input fails, the
944   * returned future fails.
945   *
946   * <p>The list of results is in the same order as the input list.
947   *
948   * <p>Canceling this future will attempt to cancel all the component futures,
949   * and if any of the provided futures fails or is canceled, this one is,
950   * too.
951   *
952   * @param futures futures to combine
953   * @return a future that provides a list of the results of the component
954   *         futures
955   * @since 10.0
956   */
957  @Beta
958  public static <V> ListenableFuture<List<V>> allAsList(
959      ListenableFuture<? extends V>... futures) {
960    return listFuture(ImmutableList.copyOf(futures), true,
961        MoreExecutors.sameThreadExecutor());
962  }
963
964  /**
965   * Creates a new {@code ListenableFuture} whose value is a list containing the
966   * values of all its input futures, if all succeed. If any input fails, the
967   * returned future fails.
968   *
969   * <p>The list of results is in the same order as the input list.
970   *
971   * <p>Canceling this future will attempt to cancel all the component futures,
972   * and if any of the provided futures fails or is canceled, this one is,
973   * too.
974   *
975   * @param futures futures to combine
976   * @return a future that provides a list of the results of the component
977   *         futures
978   * @since 10.0
979   */
980  @Beta
981  public static <V> ListenableFuture<List<V>> allAsList(
982      Iterable<? extends ListenableFuture<? extends V>> futures) {
983    return listFuture(ImmutableList.copyOf(futures), true,
984        MoreExecutors.sameThreadExecutor());
985  }
986
987  /**
988   * Creates a new {@code ListenableFuture} whose result is set from the
989   * supplied future when it completes.  Cancelling the supplied future
990   * will also cancel the returned future, but cancelling the returned
991   * future will have no effect on the supplied future.
992   *
993   * @since 15.0
994   */
995  public static <V> ListenableFuture<V> nonCancellationPropagating(
996      ListenableFuture<V> future) {
997    return new NonCancellationPropagatingFuture<V>(future);
998  }
999
1000  /**
1001   * A wrapped future that does not propagate cancellation to its delegate.
1002   */
1003  private static class NonCancellationPropagatingFuture<V>
1004      extends AbstractFuture<V> {
1005    NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1006      checkNotNull(delegate);
1007      addCallback(delegate, new FutureCallback<V>() {
1008        @Override
1009        public void onSuccess(V result) {
1010          set(result);
1011        }
1012
1013        @Override
1014        public void onFailure(Throwable t) {
1015          if (delegate.isCancelled()) {
1016            cancel(false);
1017          } else {
1018            setException(t);
1019          }
1020        }
1021      }, sameThreadExecutor());
1022    }
1023  }
1024
1025  /**
1026   * Creates a new {@code ListenableFuture} whose value is a list containing the
1027   * values of all its successful input futures. The list of results is in the
1028   * same order as the input list, and if any of the provided futures fails or
1029   * is canceled, its corresponding position will contain {@code null} (which is
1030   * indistinguishable from the future having a successful value of
1031   * {@code null}).
1032   *
1033   * <p>Canceling this future will attempt to cancel all the component futures.
1034   *
1035   * @param futures futures to combine
1036   * @return a future that provides a list of the results of the component
1037   *         futures
1038   * @since 10.0
1039   */
1040  @Beta
1041  public static <V> ListenableFuture<List<V>> successfulAsList(
1042      ListenableFuture<? extends V>... futures) {
1043    return listFuture(ImmutableList.copyOf(futures), false,
1044        MoreExecutors.sameThreadExecutor());
1045  }
1046
1047  /**
1048   * Creates a new {@code ListenableFuture} whose value is a list containing the
1049   * values of all its successful input futures. The list of results is in the
1050   * same order as the input list, and if any of the provided futures fails or
1051   * is canceled, its corresponding position will contain {@code null} (which is
1052   * indistinguishable from the future having a successful value of
1053   * {@code null}).
1054   *
1055   * <p>Canceling this future will attempt to cancel all the component futures.
1056   *
1057   * @param futures futures to combine
1058   * @return a future that provides a list of the results of the component
1059   *         futures
1060   * @since 10.0
1061   */
1062  @Beta
1063  public static <V> ListenableFuture<List<V>> successfulAsList(
1064      Iterable<? extends ListenableFuture<? extends V>> futures) {
1065    return listFuture(ImmutableList.copyOf(futures), false,
1066        MoreExecutors.sameThreadExecutor());
1067  }
1068
1069  /**
1070   * Registers separate success and failure callbacks to be run when the {@code
1071   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1072   * complete} or, if the computation is already complete, immediately.
1073   *
1074   * <p>There is no guaranteed ordering of execution of callbacks, but any
1075   * callback added through this method is guaranteed to be called once the
1076   * computation is complete.
1077   *
1078   * Example: <pre> {@code
1079   * ListenableFuture<QueryResult> future = ...;
1080   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
1082   *       public void onSuccess(QueryResult result) {
1083   *         storeInCache(result);
1084   *       }
1085   *       public void onFailure(Throwable t) {
1086   *         reportError(t);
1087   *       }
1088   *     });}</pre>
1089   *
1090   * <p>Note: If the callback is slow or heavyweight, consider {@linkplain
1091   * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1092   * executor}. If you do not supply an executor, {@code addCallback} will use
1093   * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
1094   * some caveats for heavier operations. For example, the callback may run on
1095   * an unpredictable or undesirable thread:
1096   *
1097   * <ul>
1098   * <li>If the input {@code Future} is done at the time {@code addCallback} is
1099   * called, {@code addCallback} will execute the callback inline.
1100   * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1101   * schedule the callback to be run by the thread that completes the input
1102   * {@code Future}, which may be an internal system thread such as an RPC
1103   * network thread.
1104   * </ul>
1105   *
1106   * <p>Also note that, regardless of which thread executes the {@code
1107   * sameThreadExecutor} callback, all other registered but unexecuted listeners
1108   * are prevented from running during its execution, even if those listeners
1109   * are to run in other executors.
1110   *
1111   * <p>For a more general interface to attach a completion listener to a
1112   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1113   *
   * @param future The future to attach the callback to.
1115   * @param callback The callback to invoke when {@code future} is completed.
1116   * @since 10.0
1117   */
1118  public static <V> void addCallback(ListenableFuture<V> future,
1119      FutureCallback<? super V> callback) {
1120    addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1121  }
1122
1123  /**
1124   * Registers separate success and failure callbacks to be run when the {@code
1125   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1126   * complete} or, if the computation is already complete, immediately.
1127   *
1128   * <p>The callback is run in {@code executor}.
1129   * There is no guaranteed ordering of execution of callbacks, but any
1130   * callback added through this method is guaranteed to be called once the
1131   * computation is complete.
1132   *
1133   * Example: <pre> {@code
1134   * ListenableFuture<QueryResult> future = ...;
1135   * Executor e = ...
1136   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
1138   *       public void onSuccess(QueryResult result) {
1139   *         storeInCache(result);
1140   *       }
1141   *       public void onFailure(Throwable t) {
1142   *         reportError(t);
1143   *       }
1144   *     }, e);}</pre>
1145   *
1146   * <p>When the callback is fast and lightweight, consider {@linkplain
1147   * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1148   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
1149   * caveats documented in the link above.
1150   *
1151   * <p>For a more general interface to attach a completion listener to a
1152   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1153   *
   * @param future The future to attach the callback to.
1155   * @param callback The callback to invoke when {@code future} is completed.
1156   * @param executor The executor to run {@code callback} when the future
1157   *    completes.
1158   * @since 10.0
1159   */
1160  public static <V> void addCallback(final ListenableFuture<V> future,
1161      final FutureCallback<? super V> callback, Executor executor) {
1162    Preconditions.checkNotNull(callback);
1163    Runnable callbackListener = new Runnable() {
1164      @Override
1165      public void run() {
1166        final V value;
1167        try {
1168          // TODO(user): (Before Guava release), validate that this
1169          // is the thing for IE.
1170          value = getUninterruptibly(future);
1171        } catch (ExecutionException e) {
1172          callback.onFailure(e.getCause());
1173          return;
1174        } catch (RuntimeException e) {
1175          callback.onFailure(e);
1176          return;
1177        } catch (Error e) {
1178          callback.onFailure(e);
1179          return;
1180        }
1181        callback.onSuccess(value);
1182      }
1183    };
1184    future.addListener(callbackListener, executor);
1185  }
1186
1187  /**
1188   * Returns the result of {@link Future#get()}, converting most exceptions to a
1189   * new instance of the given checked exception type. This reduces boilerplate
1190   * for a common use of {@code Future} in which it is unnecessary to
1191   * programmatically distinguish between exception types or to extract other
1192   * information from the exception instance.
1193   *
1194   * <p>Exceptions from {@code Future.get} are treated as follows:
1195   * <ul>
1196   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1197   *     {@code X} if the cause is a checked exception, an {@link
1198   *     UncheckedExecutionException} if the cause is a {@code
1199   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1200   *     {@code Error}.
1201   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1202   *     restoring the interrupt).
1203   * <li>Any {@link CancellationException} is propagated untouched, as is any
1204   *     other {@link RuntimeException} (though {@code get} implementations are
1205   *     discouraged from throwing such exceptions).
1206   * </ul>
1207   *
1208   * <p>The overall principle is to continue to treat every checked exception as a
1209   * checked exception, every unchecked exception as an unchecked exception, and
1210   * every error as an error. In addition, the cause of any {@code
1211   * ExecutionException} is wrapped in order to ensure that the new stack trace
1212   * matches that of the current thread.
1213   *
1214   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1215   * public constructor that accepts zero or more arguments, all of type {@code
1216   * String} or {@code Throwable} (preferring constructors with at least one
1217   * {@code String}) and calling the constructor via reflection. If the
1218   * exception did not already have a cause, one is set by calling {@link
1219   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1220   * {@code IllegalArgumentException} is thrown.
1221   *
1222   * @throws X if {@code get} throws any checked exception except for an {@code
1223   *         ExecutionException} whose cause is not itself a checked exception
1224   * @throws UncheckedExecutionException if {@code get} throws an {@code
1225   *         ExecutionException} with a {@code RuntimeException} as its cause
1226   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1227   *         with an {@code Error} as its cause
1228   * @throws CancellationException if {@code get} throws a {@code
1229   *         CancellationException}
1230   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1231   *         RuntimeException} or does not have a suitable constructor
1232   * @since 10.0
1233   */
1234  public static <V, X extends Exception> V get(
1235      Future<V> future, Class<X> exceptionClass) throws X {
1236    checkNotNull(future);
1237    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1238        "Futures.get exception type (%s) must not be a RuntimeException",
1239        exceptionClass);
1240    try {
1241      return future.get();
1242    } catch (InterruptedException e) {
1243      currentThread().interrupt();
1244      throw newWithCause(exceptionClass, e);
1245    } catch (ExecutionException e) {
1246      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1247      throw new AssertionError();
1248    }
1249  }
1250
1251  /**
1252   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1253   * exceptions to a new instance of the given checked exception type. This
1254   * reduces boilerplate for a common use of {@code Future} in which it is
1255   * unnecessary to programmatically distinguish between exception types or to
1256   * extract other information from the exception instance.
1257   *
1258   * <p>Exceptions from {@code Future.get} are treated as follows:
1259   * <ul>
1260   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1261   *     {@code X} if the cause is a checked exception, an {@link
1262   *     UncheckedExecutionException} if the cause is a {@code
1263   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1264   *     {@code Error}.
1265   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1266   *     restoring the interrupt).
1267   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1268   * <li>Any {@link CancellationException} is propagated untouched, as is any
1269   *     other {@link RuntimeException} (though {@code get} implementations are
1270   *     discouraged from throwing such exceptions).
1271   * </ul>
1272   *
1273   * <p>The overall principle is to continue to treat every checked exception as a
1274   * checked exception, every unchecked exception as an unchecked exception, and
1275   * every error as an error. In addition, the cause of any {@code
1276   * ExecutionException} is wrapped in order to ensure that the new stack trace
1277   * matches that of the current thread.
1278   *
1279   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1280   * public constructor that accepts zero or more arguments, all of type {@code
1281   * String} or {@code Throwable} (preferring constructors with at least one
1282   * {@code String}) and calling the constructor via reflection. If the
1283   * exception did not already have a cause, one is set by calling {@link
1284   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1285   * {@code IllegalArgumentException} is thrown.
1286   *
1287   * @throws X if {@code get} throws any checked exception except for an {@code
1288   *         ExecutionException} whose cause is not itself a checked exception
1289   * @throws UncheckedExecutionException if {@code get} throws an {@code
1290   *         ExecutionException} with a {@code RuntimeException} as its cause
1291   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1292   *         with an {@code Error} as its cause
1293   * @throws CancellationException if {@code get} throws a {@code
1294   *         CancellationException}
1295   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1296   *         RuntimeException} or does not have a suitable constructor
1297   * @since 10.0
1298   */
1299  public static <V, X extends Exception> V get(
1300      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1301      throws X {
1302    checkNotNull(future);
1303    checkNotNull(unit);
1304    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1305        "Futures.get exception type (%s) must not be a RuntimeException",
1306        exceptionClass);
1307    try {
1308      return future.get(timeout, unit);
1309    } catch (InterruptedException e) {
1310      currentThread().interrupt();
1311      throw newWithCause(exceptionClass, e);
1312    } catch (TimeoutException e) {
1313      throw newWithCause(exceptionClass, e);
1314    } catch (ExecutionException e) {
1315      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1316      throw new AssertionError();
1317    }
1318  }
1319
1320  private static <X extends Exception> void wrapAndThrowExceptionOrError(
1321      Throwable cause, Class<X> exceptionClass) throws X {
1322    if (cause instanceof Error) {
1323      throw new ExecutionError((Error) cause);
1324    }
1325    if (cause instanceof RuntimeException) {
1326      throw new UncheckedExecutionException(cause);
1327    }
1328    throw newWithCause(exceptionClass, cause);
1329  }
1330
1331  /**
1332   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1333   * task known not to throw a checked exception. This makes {@code Future} more
1334   * suitable for lightweight, fast-running tasks that, barring bugs in the
1335   * code, will not fail. This gives it exception-handling behavior similar to
1336   * that of {@code ForkJoinTask.join}.
1337   *
1338   * <p>Exceptions from {@code Future.get} are treated as follows:
1339   * <ul>
1340   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1341   *     {@link UncheckedExecutionException} (if the cause is an {@code
1342   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1343   *     Error}).
1344   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1345   *     call. The interrupt is restored before {@code getUnchecked} returns.
1346   * <li>Any {@link CancellationException} is propagated untouched. So is any
1347   *     other {@link RuntimeException} ({@code get} implementations are
1348   *     discouraged from throwing such exceptions).
1349   * </ul>
1350   *
1351   * <p>The overall principle is to eliminate all checked exceptions: to loop to
1352   * avoid {@code InterruptedException}, to pass through {@code
1353   * CancellationException}, and to wrap any exception from the underlying
1354   * computation in an {@code UncheckedExecutionException} or {@code
1355   * ExecutionError}.
1356   *
1357   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1358   * {@link Uninterruptibles#getUninterruptibly(Future)}.
1359   *
1360   * @throws UncheckedExecutionException if {@code get} throws an {@code
1361   *         ExecutionException} with an {@code Exception} as its cause
1362   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1363   *         with an {@code Error} as its cause
1364   * @throws CancellationException if {@code get} throws a {@code
1365   *         CancellationException}
1366   * @since 10.0
1367   */
1368  public static <V> V getUnchecked(Future<V> future) {
1369    checkNotNull(future);
1370    try {
1371      return getUninterruptibly(future);
1372    } catch (ExecutionException e) {
1373      wrapAndThrowUnchecked(e.getCause());
1374      throw new AssertionError();
1375    }
1376  }
1377
1378  private static void wrapAndThrowUnchecked(Throwable cause) {
1379    if (cause instanceof Error) {
1380      throw new ExecutionError((Error) cause);
1381    }
1382    /*
1383     * It's a non-Error, non-Exception Throwable. From my survey of such
1384     * classes, I believe that most users intended to extend Exception, so we'll
1385     * treat it like an Exception.
1386     */
1387    throw new UncheckedExecutionException(cause);
1388  }
1389
1390  /*
1391   * TODO(user): FutureChecker interface for these to be static methods on? If
1392   * so, refer to it in the (static-method) Futures.get documentation
1393   */
1394
1395  /*
1396   * Arguably we don't need a timed getUnchecked because any operation slow
1397   * enough to require a timeout is heavyweight enough to throw a checked
1398   * exception and therefore be inappropriate to use with getUnchecked. Further,
1399   * it's not clear that converting the checked TimeoutException to a
1400   * RuntimeException -- especially to an UncheckedExecutionException, since it
1401   * wasn't thrown by the computation -- makes sense, and if we don't convert
1402   * it, the user still has to write a try-catch block.
1403   *
1404   * If you think you would use this method, let us know.
1405   */
1406
1407  private static <X extends Exception> X newWithCause(
1408      Class<X> exceptionClass, Throwable cause) {
1409    // getConstructors() guarantees this as long as we don't modify the array.
1410    @SuppressWarnings("unchecked")
1411    List<Constructor<X>> constructors =
1412        (List) Arrays.asList(exceptionClass.getConstructors());
1413    for (Constructor<X> constructor : preferringStrings(constructors)) {
1414      @Nullable X instance = newFromConstructor(constructor, cause);
1415      if (instance != null) {
1416        if (instance.getCause() == null) {
1417          instance.initCause(cause);
1418        }
1419        return instance;
1420      }
1421    }
1422    throw new IllegalArgumentException(
1423        "No appropriate constructor for exception of type " + exceptionClass
1424            + " in response to chained exception", cause);
1425  }
1426
1427  private static <X extends Exception> List<Constructor<X>>
1428      preferringStrings(List<Constructor<X>> constructors) {
1429    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1430  }
1431
1432  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1433      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1434        @Override public Boolean apply(Constructor<?> input) {
1435          return asList(input.getParameterTypes()).contains(String.class);
1436        }
1437      }).reverse();
1438
1439  @Nullable private static <X> X newFromConstructor(
1440      Constructor<X> constructor, Throwable cause) {
1441    Class<?>[] paramTypes = constructor.getParameterTypes();
1442    Object[] params = new Object[paramTypes.length];
1443    for (int i = 0; i < paramTypes.length; i++) {
1444      Class<?> paramType = paramTypes[i];
1445      if (paramType.equals(String.class)) {
1446        params[i] = cause.toString();
1447      } else if (paramType.equals(Throwable.class)) {
1448        params[i] = cause;
1449      } else {
1450        return null;
1451      }
1452    }
1453    try {
1454      return constructor.newInstance(params);
1455    } catch (IllegalArgumentException e) {
1456      return null;
1457    } catch (InstantiationException e) {
1458      return null;
1459    } catch (IllegalAccessException e) {
1460      return null;
1461    } catch (InvocationTargetException e) {
1462      return null;
1463    }
1464  }
1465
  /**
   * Strategy used by {@code CombinedFuture} to turn the collected values of
   * type {@code V} into the combined result of type {@code C}.
   */
  private interface FutureCombiner<V, C> {
    /**
     * Builds the combined result from {@code values}.
     *
     * NOTE(review): presumably {@code values} holds one entry per input
     * future, in input order -- confirm against {@code CombinedFuture}.
     */
    C combine(List<Optional<V>> values);
  }
1469
1470  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1471    private static final Logger logger =
1472        Logger.getLogger(CombinedFuture.class.getName());
1473
1474    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1475    final boolean allMustSucceed;
1476    final AtomicInteger remaining;
1477    FutureCombiner<V, C> combiner;
1478    List<Optional<V>> values;
1479    final Object seenExceptionsLock = new Object();
1480    Set<Throwable> seenExceptions;
1481
1482    CombinedFuture(
1483        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1484        boolean allMustSucceed, Executor listenerExecutor,
1485        FutureCombiner<V, C> combiner) {
1486      this.futures = futures;
1487      this.allMustSucceed = allMustSucceed;
1488      this.remaining = new AtomicInteger(futures.size());
1489      this.combiner = combiner;
1490      this.values = Lists.newArrayListWithCapacity(futures.size());
1491      init(listenerExecutor);
1492    }
1493
1494    /**
1495     * Must be called at the end of the constructor.
1496     */
1497    protected void init(final Executor listenerExecutor) {
1498      // First, schedule cleanup to execute when the Future is done.
1499      addListener(new Runnable() {
1500        @Override
1501        public void run() {
1502          // Cancel all the component futures.
1503          if (CombinedFuture.this.isCancelled()) {
1504            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1505              future.cancel(CombinedFuture.this.wasInterrupted());
1506            }
1507          }
1508
1509          // Let go of the memory held by other futures
1510          CombinedFuture.this.futures = null;
1511
1512          // By now the values array has either been set as the Future's value,
1513          // or (in case of failure) is no longer useful.
1514          CombinedFuture.this.values = null;
1515
1516          // The combiner may also hold state, so free that as well
1517          CombinedFuture.this.combiner = null;
1518        }
1519      }, MoreExecutors.sameThreadExecutor());
1520
1521      // Now begin the "real" initialization.
1522
1523      // Corner case: List is empty.
1524      if (futures.isEmpty()) {
1525        set(combiner.combine(ImmutableList.<Optional<V>>of()));
1526        return;
1527      }
1528
1529      // Populate the results list with null initially.
1530      for (int i = 0; i < futures.size(); ++i) {
1531        values.add(null);
1532      }
1533
1534      // Register a listener on each Future in the list to update
1535      // the state of this future.
1536      // Note that if all the futures on the list are done prior to completing
1537      // this loop, the last call to addListener() will callback to
1538      // setOneValue(), transitively call our cleanup listener, and set
1539      // this.futures to null.
1540      // This is not actually a problem, since the foreach only needs
1541      // this.futures to be non-null at the beginning of the loop.
1542      int i = 0;
1543      for (final ListenableFuture<? extends V> listenable : futures) {
1544        final int index = i++;
1545        listenable.addListener(new Runnable() {
1546          @Override
1547          public void run() {
1548            setOneValue(index, listenable);
1549          }
1550        }, listenerExecutor);
1551      }
1552    }
1553
1554    /**
1555     * Fails this future with the given Throwable if {@link #allMustSucceed} is
1556     * true. Also, logs the throwable if it is an {@link Error} or if
1557     * {@link #allMustSucceed} is {@code true}, the throwable did not cause
1558     * this future to fail, and it is the first time we've seen that particular Throwable.
1559     */
1560    private void setExceptionAndMaybeLog(Throwable throwable) {
1561      boolean visibleFromOutputFuture = false;
1562      boolean firstTimeSeeingThisException = true;
1563      if (allMustSucceed) {
1564        // As soon as the first one fails, throw the exception up.
1565        // The result of all other inputs is then ignored.
1566        visibleFromOutputFuture = super.setException(throwable);
1567
1568        synchronized (seenExceptionsLock) {
1569          if (seenExceptions == null) {
1570            seenExceptions = Sets.newHashSet();
1571          }
1572          firstTimeSeeingThisException = seenExceptions.add(throwable);
1573        }
1574      }
1575
1576      if (throwable instanceof Error
1577          || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) {
1578        logger.log(Level.SEVERE, "input future failed.", throwable);
1579      }
1580    }
1581
1582    /**
1583     * Sets the value at the given index to that of the given future.
1584     */
1585    private void setOneValue(int index, Future<? extends V> future) {
1586      List<Optional<V>> localValues = values;
1587      // TODO(user): This check appears to be redundant since values is
1588      // assigned null only after the future completes.  However, values
1589      // is not volatile so it may be possible for us to observe the changes
1590      // to these two values in a different order... which I think is why
1591      // we need to check both.  Clear up this craziness either by making
1592      // values volatile or proving that it doesn't need to be for some other
1593      // reason.
1594      if (isDone() || localValues == null) {
1595        // Some other future failed or has been cancelled, causing this one to
1596        // also be cancelled or have an exception set. This should only happen
1597        // if allMustSucceed is true or if the output itself has been
1598        // cancelled.
1599        checkState(allMustSucceed || isCancelled(),
1600            "Future was done before all dependencies completed");
1601      }
1602
1603      try {
1604        checkState(future.isDone(),
1605            "Tried to set value from future which is not done");
1606        V returnValue = getUninterruptibly(future);
1607        if (localValues != null) {
1608          localValues.set(index, Optional.fromNullable(returnValue));
1609        }
1610      } catch (CancellationException e) {
1611        if (allMustSucceed) {
1612          // Set ourselves as cancelled. Let the input futures keep running
1613          // as some of them may be used elsewhere.
1614          cancel(false);
1615        }
1616      } catch (ExecutionException e) {
1617        setExceptionAndMaybeLog(e.getCause());
1618      } catch (Throwable t) {
1619        setExceptionAndMaybeLog(t);
1620      } finally {
1621        int newRemaining = remaining.decrementAndGet();
1622        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1623        if (newRemaining == 0) {
1624          FutureCombiner<V, C> localCombiner = combiner;
1625          if (localCombiner != null && localValues != null) {
1626            set(localCombiner.combine(localValues));
1627          } else {
1628            checkState(isDone());
1629          }
1630        }
1631      }
1632    }
1633
1634  }
1635
1636  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1637  private static <V> ListenableFuture<List<V>> listFuture(
1638      ImmutableList<ListenableFuture<? extends V>> futures,
1639      boolean allMustSucceed, Executor listenerExecutor) {
1640    return new CombinedFuture<V, List<V>>(
1641        futures, allMustSucceed, listenerExecutor,
1642        new FutureCombiner<V, List<V>>() {
1643          @Override
1644          public List<V> combine(List<Optional<V>> values) {
1645            List<V> result = Lists.newArrayList();
1646            for (Optional<V> element : values) {
1647              result.add(element != null ? element.orNull() : null);
1648            }
1649            return Collections.unmodifiableList(result);
1650          }
1651        });
1652  }
1653
1654  /**
1655   * A checked future that uses a function to map from exceptions to the
1656   * appropriate checked type.
1657   */
1658  private static class MappingCheckedFuture<V, X extends Exception> extends
1659      AbstractCheckedFuture<V, X> {
1660
1661    final Function<Exception, X> mapper;
1662
1663    MappingCheckedFuture(ListenableFuture<V> delegate,
1664        Function<Exception, X> mapper) {
1665      super(delegate);
1666
1667      this.mapper = checkNotNull(mapper);
1668    }
1669
1670    @Override
1671    protected X mapException(Exception e) {
1672      return mapper.apply(e);
1673    }
1674  }
1675}