001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024import static java.lang.Thread.currentThread;
025import static java.util.Arrays.asList;
026
027import com.google.common.annotations.Beta;
028import com.google.common.base.Function;
029import com.google.common.base.Optional;
030import com.google.common.base.Preconditions;
031import com.google.common.collect.ImmutableCollection;
032import com.google.common.collect.ImmutableList;
033import com.google.common.collect.Lists;
034import com.google.common.collect.Ordering;
035
036import java.lang.reflect.Constructor;
037import java.lang.reflect.InvocationTargetException;
038import java.lang.reflect.UndeclaredThrowableException;
039import java.util.Arrays;
040import java.util.Collections;
041import java.util.List;
042import java.util.concurrent.CancellationException;
043import java.util.concurrent.CountDownLatch;
044import java.util.concurrent.ExecutionException;
045import java.util.concurrent.Executor;
046import java.util.concurrent.Future;
047import java.util.concurrent.TimeUnit;
048import java.util.concurrent.TimeoutException;
049import java.util.concurrent.atomic.AtomicInteger;
050import java.util.logging.Level;
051import java.util.logging.Logger;
052
053import javax.annotation.Nullable;
054
055/**
056 * Static utility methods pertaining to the {@link Future} interface.
057 *
058 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
059 * Guava User Guide article on <a href=
060 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
061 * {@code ListenableFuture}</a>.
062 *
063 * @author Kevin Bourrillion
064 * @author Nishant Thakkar
065 * @author Sven Mawson
066 * @since 1.0
067 */
068@Beta
069public final class Futures {
070  private Futures() {}
071
072  /**
073   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
074   * and a {@link Function} that maps from {@link Exception} instances into the
075   * appropriate checked type.
076   *
077   * <p>The given mapping function will be applied to an
078   * {@link InterruptedException}, a {@link CancellationException}, or an
079   * {@link ExecutionException}.
080   * See {@link Future#get()} for details on the exceptions thrown.
081   *
082   * @since 9.0 (source-compatible since 1.0)
083   */
084  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
085      ListenableFuture<V> future, Function<Exception, X> mapper) {
086    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
087  }
088
089  private abstract static class ImmediateFuture<V>
090      implements ListenableFuture<V> {
091
092    private static final Logger log =
093        Logger.getLogger(ImmediateFuture.class.getName());
094
095    @Override
096    public void addListener(Runnable listener, Executor executor) {
097      checkNotNull(listener, "Runnable was null.");
098      checkNotNull(executor, "Executor was null.");
099      try {
100        executor.execute(listener);
101      } catch (RuntimeException e) {
102        // ListenableFuture's contract is that it will not throw unchecked
103        // exceptions, so log the bad runnable and/or executor and swallow it.
104        log.log(Level.SEVERE, "RuntimeException while executing runnable "
105            + listener + " with executor " + executor, e);
106      }
107    }
108
109    @Override
110    public boolean cancel(boolean mayInterruptIfRunning) {
111      return false;
112    }
113
114    @Override
115    public abstract V get() throws ExecutionException;
116
117    @Override
118    public V get(long timeout, TimeUnit unit) throws ExecutionException {
119      checkNotNull(unit);
120      return get();
121    }
122
123    @Override
124    public boolean isCancelled() {
125      return false;
126    }
127
128    @Override
129    public boolean isDone() {
130      return true;
131    }
132  }
133
134  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
135
136    @Nullable private final V value;
137
138    ImmediateSuccessfulFuture(@Nullable V value) {
139      this.value = value;
140    }
141
142    @Override
143    public V get() {
144      return value;
145    }
146  }
147
148  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
149      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
150
151    @Nullable private final V value;
152
153    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
154      this.value = value;
155    }
156
157    @Override
158    public V get() {
159      return value;
160    }
161
162    @Override
163    public V checkedGet() {
164      return value;
165    }
166
167    @Override
168    public V checkedGet(long timeout, TimeUnit unit) {
169      checkNotNull(unit);
170      return value;
171    }
172  }
173
174  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
175
176    private final Throwable thrown;
177
178    ImmediateFailedFuture(Throwable thrown) {
179      this.thrown = thrown;
180    }
181
182    @Override
183    public V get() throws ExecutionException {
184      throw new ExecutionException(thrown);
185    }
186  }
187
188  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
189
190    private final CancellationException thrown;
191
192    ImmediateCancelledFuture() {
193      this.thrown = new CancellationException("Immediate cancelled future.");
194    }
195
196    @Override
197    public boolean isCancelled() {
198      return true;
199    }
200
201    @Override
202    public V get() {
203      throw AbstractFuture.cancellationExceptionWithCause(
204          "Task was cancelled.", thrown);
205    }
206  }
207
208  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
209      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
210
211    private final X thrown;
212
213    ImmediateFailedCheckedFuture(X thrown) {
214      this.thrown = thrown;
215    }
216
217    @Override
218    public V get() throws ExecutionException {
219      throw new ExecutionException(thrown);
220    }
221
222    @Override
223    public V checkedGet() throws X {
224      throw thrown;
225    }
226
227    @Override
228    public V checkedGet(long timeout, TimeUnit unit) throws X {
229      checkNotNull(unit);
230      throw thrown;
231    }
232  }
233
234  /**
235   * Creates a {@code ListenableFuture} which has its value set immediately upon
236   * construction. The getters just return the value. This {@code Future} can't
237   * be canceled or timed out and its {@code isDone()} method always returns
238   * {@code true}.
239   */
240  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
241    return new ImmediateSuccessfulFuture<V>(value);
242  }
243
244  /**
245   * Returns a {@code CheckedFuture} which has its value set immediately upon
246   * construction.
247   *
248   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
249   * method always returns {@code true}. Calling {@code get()} or {@code
250   * checkedGet()} will immediately return the provided value.
251   */
252  public static <V, X extends Exception> CheckedFuture<V, X>
253      immediateCheckedFuture(@Nullable V value) {
254    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
255  }
256
257  /**
258   * Returns a {@code ListenableFuture} which has an exception set immediately
259   * upon construction.
260   *
261   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
262   * method always returns {@code true}. Calling {@code get()} will immediately
263   * throw the provided {@code Throwable} wrapped in an {@code
264   * ExecutionException}.
265   */
266  public static <V> ListenableFuture<V> immediateFailedFuture(
267      Throwable throwable) {
268    checkNotNull(throwable);
269    return new ImmediateFailedFuture<V>(throwable);
270  }
271
272  /**
273   * Creates a {@code ListenableFuture} which is cancelled immediately upon
274   * construction, so that {@code isCancelled()} always returns {@code true}.
275   *
276   * @since 14.0
277   */
278  public static <V> ListenableFuture<V> immediateCancelledFuture() {
279    return new ImmediateCancelledFuture<V>();
280  }
281
282  /**
283   * Returns a {@code CheckedFuture} which has an exception set immediately upon
284   * construction.
285   *
286   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
287   * method always returns {@code true}. Calling {@code get()} will immediately
288   * throw the provided {@code Exception} wrapped in an {@code
289   * ExecutionException}, and calling {@code checkedGet()} will throw the
290   * provided exception itself.
291   */
292  public static <V, X extends Exception> CheckedFuture<V, X>
293      immediateFailedCheckedFuture(X exception) {
294    checkNotNull(exception);
295    return new ImmediateFailedCheckedFuture<V, X>(exception);
296  }
297
298  /**
299   * Returns a {@code Future} whose result is taken from the given primary
300   * {@code input} or, if the primary input fails, from the {@code Future}
301   * provided by the {@code fallback}. {@link FutureFallback#create} is not
302   * invoked until the primary input has failed, so if the primary input
303   * succeeds, it is never invoked. If, during the invocation of {@code
304   * fallback}, an exception is thrown, this exception is used as the result of
305   * the output {@code Future}.
306   *
307   * <p>Below is an example of a fallback that returns a default value if an
308   * exception occurs:
309   *
310   * <pre>   {@code
311   *   ListenableFuture<Integer> fetchCounterFuture = ...;
312   *
313   *   // Falling back to a zero counter in case an exception happens when
314   *   // processing the RPC to fetch counters.
315   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
316   *       fetchCounterFuture, new FutureFallback<Integer>() {
317   *         public ListenableFuture<Integer> create(Throwable t) {
318   *           // Returning "0" as the default for the counter when the
319   *           // exception happens.
320   *           return immediateFuture(0);
321   *         }
322   *       });}</pre>
323   *
324   * <p>The fallback can also choose to propagate the original exception when
325   * desired:
326   *
327   * <pre>   {@code
328   *   ListenableFuture<Integer> fetchCounterFuture = ...;
329   *
330   *   // Falling back to a zero counter only in case the exception was a
331   *   // TimeoutException.
332   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
333   *       fetchCounterFuture, new FutureFallback<Integer>() {
334   *         public ListenableFuture<Integer> create(Throwable t) {
335   *           if (t instanceof TimeoutException) {
336   *             return immediateFuture(0);
337   *           }
338   *           return immediateFailedFuture(t);
339   *         }
340   *       });}</pre>
341   *
342   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
343   * (whether the {@code Future} itself is slow or heavyweight to complete is
344   * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
345   * FutureFallback, Executor) supplying an executor}. If you do not supply an
346   * executor, {@code withFallback} will use {@link
347   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
348   * caveats for heavier operations. For example, the call to {@code
349   * fallback.create} may run on an unpredictable or undesirable thread:
350   *
351   * <ul>
352   * <li>If the input {@code Future} is done at the time {@code withFallback}
353   * is called, {@code withFallback} will call {@code fallback.create} inline.
354   * <li>If the input {@code Future} is not yet done, {@code withFallback} will
355   * schedule {@code fallback.create} to be run by the thread that completes
356   * the input {@code Future}, which may be an internal system thread such as
357   * an RPC network thread.
358   * </ul>
359   *
360   * <p>Also note that, regardless of which thread executes the {@code
361   * sameThreadExecutor} {@code fallback.create}, all other registered but
362   * unexecuted listeners are prevented from running during its execution, even
363   * if those listeners are to run in other executors.
364   *
365   * @param input the primary input {@code Future}
366   * @param fallback the {@link FutureFallback} implementation to be called if
367   *     {@code input} fails
368   * @since 14.0
369   */
370  public static <V> ListenableFuture<V> withFallback(
371      ListenableFuture<? extends V> input,
372      FutureFallback<? extends V> fallback) {
373    return withFallback(input, fallback, sameThreadExecutor());
374  }
375
376  /**
377   * Returns a {@code Future} whose result is taken from the given primary
378   * {@code input} or, if the primary input fails, from the {@code Future}
379   * provided by the {@code fallback}. {@link FutureFallback#create} is not
380   * invoked until the primary input has failed, so if the primary input
381   * succeeds, it is never invoked. If, during the invocation of {@code
382   * fallback}, an exception is thrown, this exception is used as the result of
383   * the output {@code Future}.
384   *
385   * <p>Below is an example of a fallback that returns a default value if an
386   * exception occurs:
387   *
388   * <pre>   {@code
389   *   ListenableFuture<Integer> fetchCounterFuture = ...;
390   *
391   *   // Falling back to a zero counter in case an exception happens when
392   *   // processing the RPC to fetch counters.
393   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
394   *       fetchCounterFuture, new FutureFallback<Integer>() {
395   *         public ListenableFuture<Integer> create(Throwable t) {
396   *           // Returning "0" as the default for the counter when the
397   *           // exception happens.
398   *           return immediateFuture(0);
399   *         }
400   *       }, sameThreadExecutor());}</pre>
401   *
402   * <p>The fallback can also choose to propagate the original exception when
403   * desired:
404   *
405   * <pre>   {@code
406   *   ListenableFuture<Integer> fetchCounterFuture = ...;
407   *
408   *   // Falling back to a zero counter only in case the exception was a
409   *   // TimeoutException.
410   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
411   *       fetchCounterFuture, new FutureFallback<Integer>() {
412   *         public ListenableFuture<Integer> create(Throwable t) {
413   *           if (t instanceof TimeoutException) {
414   *             return immediateFuture(0);
415   *           }
416   *           return immediateFailedFuture(t);
417   *         }
418   *       }, sameThreadExecutor());}</pre>
419   *
420   * <p>When the execution of {@code fallback.create} is fast and lightweight
421   * (though the {@code Future} it returns need not meet these criteria),
422   * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
423   * omitting the executor} or explicitly specifying {@code
424   * sameThreadExecutor}. However, be aware of the caveats documented in the
425   * link above.
426   *
427   * @param input the primary input {@code Future}
428   * @param fallback the {@link FutureFallback} implementation to be called if
429   *     {@code input} fails
430   * @param executor the executor that runs {@code fallback} if {@code input}
431   *     fails
432   * @since 14.0
433   */
434  public static <V> ListenableFuture<V> withFallback(
435      ListenableFuture<? extends V> input,
436      FutureFallback<? extends V> fallback, Executor executor) {
437    checkNotNull(fallback);
438    return new FallbackFuture<V>(input, fallback, executor);
439  }
440
441  /**
442   * A future that falls back on a second, generated future, in case its
443   * original future fails.
444   */
445  private static class FallbackFuture<V> extends AbstractFuture<V> {
446
447    private volatile ListenableFuture<? extends V> running;
448
449    FallbackFuture(ListenableFuture<? extends V> input,
450        final FutureFallback<? extends V> fallback,
451        final Executor executor) {
452      running = input;
453      addCallback(running, new FutureCallback<V>() {
454        @Override
455        public void onSuccess(V value) {
456          set(value);
457        }
458
459        @Override
460        public void onFailure(Throwable t) {
461          if (isCancelled()) {
462            return;
463          }
464          try {
465            running = fallback.create(t);
466            if (isCancelled()) { // in case cancel called in the meantime
467              running.cancel(wasInterrupted());
468              return;
469            }
470            addCallback(running, new FutureCallback<V>() {
471              @Override
472              public void onSuccess(V value) {
473                set(value);
474              }
475
476              @Override
477              public void onFailure(Throwable t) {
478                if (running.isCancelled()) {
479                  cancel(false);
480                } else {
481                  setException(t);
482                }
483              }
484            }, sameThreadExecutor());
485          } catch (Throwable e) {
486            setException(e);
487          }
488        }
489      }, executor);
490    }
491
492    @Override
493    public boolean cancel(boolean mayInterruptIfRunning) {
494      if (super.cancel(mayInterruptIfRunning)) {
495        running.cancel(mayInterruptIfRunning);
496        return true;
497      }
498      return false;
499    }
500  }
501
502  /**
503   * Returns a new {@code ListenableFuture} whose result is asynchronously
504   * derived from the result of the given {@code Future}. More precisely, the
505   * returned {@code Future} takes its result from a {@code Future} produced by
506   * applying the given {@code AsyncFunction} to the result of the original
507   * {@code Future}. Example:
508   *
509   * <pre>   {@code
510   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
511   *   AsyncFunction<RowKey, QueryResult> queryFunction =
512   *       new AsyncFunction<RowKey, QueryResult>() {
513   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
514   *           return dataService.read(rowKey);
515   *         }
516   *       };
517   *   ListenableFuture<QueryResult> queryFuture =
518   *       transform(rowKeyFuture, queryFunction);}</pre>
519   *
520   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
521   * (whether the {@code Future} itself is slow or heavyweight to complete is
522   * irrelevant), consider {@linkplain #transform(ListenableFuture,
523   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
524   * executor, {@code transform} will use {@link
525   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
526   * caveats for heavier operations. For example, the call to {@code
527   * function.apply} may run on an unpredictable or undesirable thread:
528   *
529   * <ul>
530   * <li>If the input {@code Future} is done at the time {@code transform} is
531   * called, {@code transform} will call {@code function.apply} inline.
532   * <li>If the input {@code Future} is not yet done, {@code transform} will
533   * schedule {@code function.apply} to be run by the thread that completes the
534   * input {@code Future}, which may be an internal system thread such as an
535   * RPC network thread.
536   * </ul>
537   *
538   * <p>Also note that, regardless of which thread executes the {@code
539   * sameThreadExecutor} {@code function.apply}, all other registered but
540   * unexecuted listeners are prevented from running during its execution, even
541   * if those listeners are to run in other executors.
542   *
543   * <p>The returned {@code Future} attempts to keep its cancellation state in
544   * sync with that of the input future and that of the future returned by the
545   * function. That is, if the returned {@code Future} is cancelled, it will
546   * attempt to cancel the other two, and if either of the other two is
547   * cancelled, the returned {@code Future} will receive a callback in which it
548   * will attempt to cancel itself.
549   *
550   * @param input The future to transform
551   * @param function A function to transform the result of the input future
552   *     to the result of the output future
553   * @return A future that holds result of the function (if the input succeeded)
554   *     or the original input's failure (if not)
555   * @since 11.0
556   */
557  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
558      AsyncFunction<? super I, ? extends O> function) {
559    return transform(input, function, MoreExecutors.sameThreadExecutor());
560  }
561
562  /**
563   * Returns a new {@code ListenableFuture} whose result is asynchronously
564   * derived from the result of the given {@code Future}. More precisely, the
565   * returned {@code Future} takes its result from a {@code Future} produced by
566   * applying the given {@code AsyncFunction} to the result of the original
567   * {@code Future}. Example:
568   *
569   * <pre>   {@code
570   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
571   *   AsyncFunction<RowKey, QueryResult> queryFunction =
572   *       new AsyncFunction<RowKey, QueryResult>() {
573   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
574   *           return dataService.read(rowKey);
575   *         }
576   *       };
577   *   ListenableFuture<QueryResult> queryFuture =
578   *       transform(rowKeyFuture, queryFunction, executor);}</pre>
579   *
580   * <p>The returned {@code Future} attempts to keep its cancellation state in
581   * sync with that of the input future and that of the future returned by the
582   * chain function. That is, if the returned {@code Future} is cancelled, it
583   * will attempt to cancel the other two, and if either of the other two is
584   * cancelled, the returned {@code Future} will receive a callback in which it
585   * will attempt to cancel itself.
586   *
587   * <p>When the execution of {@code function.apply} is fast and lightweight
588   * (though the {@code Future} it returns need not meet these criteria),
589   * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
590   * the executor} or explicitly specifying {@code sameThreadExecutor}.
591   * However, be aware of the caveats documented in the link above.
592   *
593   * @param input The future to transform
594   * @param function A function to transform the result of the input future
595   *     to the result of the output future
596   * @param executor Executor to run the function in.
597   * @return A future that holds result of the function (if the input succeeded)
598   *     or the original input's failure (if not)
599   * @since 11.0
600   */
601  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
602      AsyncFunction<? super I, ? extends O> function,
603      Executor executor) {
604    ChainingListenableFuture<I, O> output =
605        new ChainingListenableFuture<I, O>(function, input);
606    input.addListener(output, executor);
607    return output;
608  }
609
610  /**
611   * Returns a new {@code ListenableFuture} whose result is the product of
612   * applying the given {@code Function} to the result of the given {@code
613   * Future}. Example:
614   *
615   * <pre>   {@code
616   *   ListenableFuture<QueryResult> queryFuture = ...;
617   *   Function<QueryResult, List<Row>> rowsFunction =
618   *       new Function<QueryResult, List<Row>>() {
619   *         public List<Row> apply(QueryResult queryResult) {
620   *           return queryResult.getRows();
621   *         }
622   *       };
623   *   ListenableFuture<List<Row>> rowsFuture =
624   *       transform(queryFuture, rowsFunction);}</pre>
625   *
626   * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
627   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
628   * If you do not supply an executor, {@code transform} will use {@link
629   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
630   * caveats for heavier operations.  For example, the call to {@code
631   * function.apply} may run on an unpredictable or undesirable thread:
632   *
633   * <ul>
634   * <li>If the input {@code Future} is done at the time {@code transform} is
635   * called, {@code transform} will call {@code function.apply} inline.
636   * <li>If the input {@code Future} is not yet done, {@code transform} will
637   * schedule {@code function.apply} to be run by the thread that completes the
638   * input {@code Future}, which may be an internal system thread such as an
639   * RPC network thread.
640   * </ul>
641   *
642   * <p>Also note that, regardless of which thread executes the {@code
643   * sameThreadExecutor} {@code function.apply}, all other registered but
644   * unexecuted listeners are prevented from running during its execution, even
645   * if those listeners are to run in other executors.
646   *
647   * <p>The returned {@code Future} attempts to keep its cancellation state in
648   * sync with that of the input future. That is, if the returned {@code Future}
649   * is cancelled, it will attempt to cancel the input, and if the input is
650   * cancelled, the returned {@code Future} will receive a callback in which it
651   * will attempt to cancel itself.
652   *
653   * <p>An example use of this method is to convert a serializable object
654   * returned from an RPC into a POJO.
655   *
656   * @param input The future to transform
657   * @param function A Function to transform the results of the provided future
658   *     to the results of the returned future.  This will be run in the thread
659   *     that notifies input it is complete.
660   * @return A future that holds result of the transformation.
661   * @since 9.0 (in 1.0 as {@code compose})
662   */
663  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
664      final Function<? super I, ? extends O> function) {
665    return transform(input, function, MoreExecutors.sameThreadExecutor());
666  }
667
668  /**
669   * Returns a new {@code ListenableFuture} whose result is the product of
670   * applying the given {@code Function} to the result of the given {@code
671   * Future}. Example:
672   *
673   * <pre>   {@code
674   *   ListenableFuture<QueryResult> queryFuture = ...;
675   *   Function<QueryResult, List<Row>> rowsFunction =
676   *       new Function<QueryResult, List<Row>>() {
677   *         public List<Row> apply(QueryResult queryResult) {
678   *           return queryResult.getRows();
679   *         }
680   *       };
681   *   ListenableFuture<List<Row>> rowsFuture =
682   *       transform(queryFuture, rowsFunction, executor);}</pre>
683   *
684   * <p>The returned {@code Future} attempts to keep its cancellation state in
685   * sync with that of the input future. That is, if the returned {@code Future}
686   * is cancelled, it will attempt to cancel the input, and if the input is
687   * cancelled, the returned {@code Future} will receive a callback in which it
688   * will attempt to cancel itself.
689   *
690   * <p>An example use of this method is to convert a serializable object
691   * returned from an RPC into a POJO.
692   *
693   * <p>When the transformation is fast and lightweight, consider {@linkplain
694   * #transform(ListenableFuture, Function) omitting the executor} or
695   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
696   * caveats documented in the link above.
697   *
698   * @param input The future to transform
699   * @param function A Function to transform the results of the provided future
700   *     to the results of the returned future.
701   * @param executor Executor to run the function in.
702   * @return A future that holds result of the transformation.
703   * @since 9.0 (in 2.0 as {@code compose})
704   */
705  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
706      final Function<? super I, ? extends O> function, Executor executor) {
707    checkNotNull(function);
708    AsyncFunction<I, O> wrapperFunction
709        = new AsyncFunction<I, O>() {
710            @Override public ListenableFuture<O> apply(I input) {
711              O output = function.apply(input);
712              return immediateFuture(output);
713            }
714        };
715    return transform(input, wrapperFunction, executor);
716  }
717
718  /**
719   * Like {@link #transform(ListenableFuture, Function)} except that the
720   * transformation {@code function} is invoked on each call to
721   * {@link Future#get() get()} on the returned future.
722   *
723   * <p>The returned {@code Future} reflects the input's cancellation
724   * state directly, and any attempt to cancel the returned Future is likewise
725   * passed through to the input Future.
726   *
727   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
728   * only apply the timeout to the execution of the underlying {@code Future},
729   * <em>not</em> to the execution of the transformation function.
730   *
731   * <p>The primary audience of this method is callers of {@code transform}
732   * who don't have a {@code ListenableFuture} available and
733   * do not mind repeated, lazy function evaluation.
734   *
735   * @param input The future to transform
736   * @param function A Function to transform the results of the provided future
737   *     to the results of the returned future.
738   * @return A future that returns the result of the transformation.
739   * @since 10.0
740   */
741  public static <I, O> Future<O> lazyTransform(final Future<I> input,
742      final Function<? super I, ? extends O> function) {
743    checkNotNull(input);
744    checkNotNull(function);
745    return new Future<O>() {
746
747      @Override
748      public boolean cancel(boolean mayInterruptIfRunning) {
749        return input.cancel(mayInterruptIfRunning);
750      }
751
752      @Override
753      public boolean isCancelled() {
754        return input.isCancelled();
755      }
756
757      @Override
758      public boolean isDone() {
759        return input.isDone();
760      }
761
762      @Override
763      public O get() throws InterruptedException, ExecutionException {
764        return applyTransformation(input.get());
765      }
766
767      @Override
768      public O get(long timeout, TimeUnit unit)
769          throws InterruptedException, ExecutionException, TimeoutException {
770        return applyTransformation(input.get(timeout, unit));
771      }
772
773      private O applyTransformation(I input) throws ExecutionException {
774        try {
775          return function.apply(input);
776        } catch (Throwable t) {
777          throw new ExecutionException(t);
778        }
779      }
780    };
781  }
782
783  /**
784   * An implementation of {@code ListenableFuture} that also implements
785   * {@code Runnable} so that it can be used to nest ListenableFutures.
786   * Once the passed-in {@code ListenableFuture} is complete, it calls the
787   * passed-in {@code Function} to generate the result.
788   *
789   * <p>If the function throws any checked exceptions, they should be wrapped
790   * in a {@code UndeclaredThrowableException} so that this class can get
791   * access to the cause.
792   */
793  private static class ChainingListenableFuture<I, O>
794      extends AbstractFuture<O> implements Runnable {
795
796    private AsyncFunction<? super I, ? extends O> function;
797    private ListenableFuture<? extends I> inputFuture;
798    private volatile ListenableFuture<? extends O> outputFuture;
799    private final CountDownLatch outputCreated = new CountDownLatch(1);
800
801    private ChainingListenableFuture(
802        AsyncFunction<? super I, ? extends O> function,
803        ListenableFuture<? extends I> inputFuture) {
804      this.function = checkNotNull(function);
805      this.inputFuture = checkNotNull(inputFuture);
806    }
807
808    @Override
809    public boolean cancel(boolean mayInterruptIfRunning) {
810      /*
811       * Our additional cancellation work needs to occur even if
812       * !mayInterruptIfRunning, so we can't move it into interruptTask().
813       */
814      if (super.cancel(mayInterruptIfRunning)) {
815        // This should never block since only one thread is allowed to cancel
816        // this Future.
817        cancel(inputFuture, mayInterruptIfRunning);
818        cancel(outputFuture, mayInterruptIfRunning);
819        return true;
820      }
821      return false;
822    }
823
824    private void cancel(@Nullable Future<?> future,
825        boolean mayInterruptIfRunning) {
826      if (future != null) {
827        future.cancel(mayInterruptIfRunning);
828      }
829    }
830
831    @Override
832    public void run() {
833      try {
834        I sourceResult;
835        try {
836          sourceResult = getUninterruptibly(inputFuture);
837        } catch (CancellationException e) {
838          // Cancel this future and return.
839          // At this point, inputFuture is cancelled and outputFuture doesn't
840          // exist, so the value of mayInterruptIfRunning is irrelevant.
841          cancel(false);
842          return;
843        } catch (ExecutionException e) {
844          // Set the cause of the exception as this future's exception
845          setException(e.getCause());
846          return;
847        }
848
849        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
850            function.apply(sourceResult);
851        if (isCancelled()) {
852          outputFuture.cancel(wasInterrupted());
853          this.outputFuture = null;
854          return;
855        }
856        outputFuture.addListener(new Runnable() {
857            @Override
858            public void run() {
859              try {
860                set(getUninterruptibly(outputFuture));
861              } catch (CancellationException e) {
862                // Cancel this future and return.
863                // At this point, inputFuture and outputFuture are done, so the
864                // value of mayInterruptIfRunning is irrelevant.
865                cancel(false);
866                return;
867              } catch (ExecutionException e) {
868                // Set the cause of the exception as this future's exception
869                setException(e.getCause());
870              } finally {
871                // Don't pin inputs beyond completion
872                ChainingListenableFuture.this.outputFuture = null;
873              }
874            }
875          }, MoreExecutors.sameThreadExecutor());
876      } catch (UndeclaredThrowableException e) {
877        // Set the cause of the exception as this future's exception
878        setException(e.getCause());
879      } catch (Throwable t) {
880        // This exception is irrelevant in this thread, but useful for the
881        // client
882        setException(t);
883      } finally {
884        // Don't pin inputs beyond completion
885        function = null;
886        inputFuture = null;
887        // Allow our get routines to examine outputFuture now.
888        outputCreated.countDown();
889      }
890    }
891  }
892
893  /**
894   * Returns a new {@code ListenableFuture} whose result is the product of
895   * calling {@code get()} on the {@code Future} nested within the given {@code
896   * Future}, effectively chaining the futures one after the other.  Example:
897   *
898   * <pre>   {@code
899   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
900   *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
901   *
902   * <p>This call has the same cancellation and execution semantics as {@link
903   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
904   * Future} attempts to keep its cancellation state in sync with both the
905   * input {@code Future} and the nested {@code Future}.  The transformation
906   * is very lightweight and therefore takes place in the thread that called
907   * {@code dereference}.
908   *
909   * @param nested The nested future to transform.
910   * @return A future that holds result of the inner future.
911   * @since 13.0
912   */
913  @SuppressWarnings({"rawtypes", "unchecked"})
914  public static <V> ListenableFuture<V> dereference(
915      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
916    return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
917  }
918
919  /**
920   * Helper {@code Function} for {@link #dereference}.
921   */
922  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
923      new AsyncFunction<ListenableFuture<Object>, Object>() {
924        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
925          return input;
926        }
927      };
928
929  /**
930   * Creates a new {@code ListenableFuture} whose value is a list containing the
931   * values of all its input futures, if all succeed. If any input fails, the
932   * returned future fails.
933   *
934   * <p>The list of results is in the same order as the input list.
935   *
936   * <p>Canceling this future will attempt to cancel all the component futures,
937   * and if any of the provided futures fails or is canceled, this one is,
938   * too.
939   *
940   * @param futures futures to combine
941   * @return a future that provides a list of the results of the component
942   *         futures
943   * @since 10.0
944   */
945  @Beta
946  public static <V> ListenableFuture<List<V>> allAsList(
947      ListenableFuture<? extends V>... futures) {
948    return listFuture(ImmutableList.copyOf(futures), true,
949        MoreExecutors.sameThreadExecutor());
950  }
951
952  /**
953   * Creates a new {@code ListenableFuture} whose value is a list containing the
954   * values of all its input futures, if all succeed. If any input fails, the
955   * returned future fails.
956   *
957   * <p>The list of results is in the same order as the input list.
958   *
959   * <p>Canceling this future will attempt to cancel all the component futures,
960   * and if any of the provided futures fails or is canceled, this one is,
961   * too.
962   *
963   * @param futures futures to combine
964   * @return a future that provides a list of the results of the component
965   *         futures
966   * @since 10.0
967   */
968  @Beta
969  public static <V> ListenableFuture<List<V>> allAsList(
970      Iterable<? extends ListenableFuture<? extends V>> futures) {
971    return listFuture(ImmutableList.copyOf(futures), true,
972        MoreExecutors.sameThreadExecutor());
973  }
974
975  /**
976   * Creates a new {@code ListenableFuture} whose result is set from the
977   * supplied future when it completes.  Cancelling the supplied future
978   * will also cancel the returned future, but cancelling the returned
979   * future will have no effect on the supplied future.
980   *
981   * @since 15.0
982   */
983  public static <V> ListenableFuture<V> nonCancellationPropagating(
984      ListenableFuture<V> future) {
985    return new NonCancellationPropagatingFuture<V>(future);
986  }
987
988  /**
989   * A wrapped future that does not propagate cancellation to its delegate.
990   */
991  private static class NonCancellationPropagatingFuture<V>
992      extends AbstractFuture<V> {
993    NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
994      checkNotNull(delegate);
995      addCallback(delegate, new FutureCallback<V>() {
996        @Override
997        public void onSuccess(V result) {
998          set(result);
999        }
1000
1001        @Override
1002        public void onFailure(Throwable t) {
1003          if (delegate.isCancelled()) {
1004            cancel(false);
1005          } else {
1006            setException(t);
1007          }
1008        }
1009      }, sameThreadExecutor());
1010    }
1011  }
1012
1013  /**
1014   * Creates a new {@code ListenableFuture} whose value is a list containing the
1015   * values of all its successful input futures. The list of results is in the
1016   * same order as the input list, and if any of the provided futures fails or
1017   * is canceled, its corresponding position will contain {@code null} (which is
1018   * indistinguishable from the future having a successful value of
1019   * {@code null}).
1020   *
1021   * <p>Canceling this future will attempt to cancel all the component futures.
1022   *
1023   * @param futures futures to combine
1024   * @return a future that provides a list of the results of the component
1025   *         futures
1026   * @since 10.0
1027   */
1028  @Beta
1029  public static <V> ListenableFuture<List<V>> successfulAsList(
1030      ListenableFuture<? extends V>... futures) {
1031    return listFuture(ImmutableList.copyOf(futures), false,
1032        MoreExecutors.sameThreadExecutor());
1033  }
1034
1035  /**
1036   * Creates a new {@code ListenableFuture} whose value is a list containing the
1037   * values of all its successful input futures. The list of results is in the
1038   * same order as the input list, and if any of the provided futures fails or
1039   * is canceled, its corresponding position will contain {@code null} (which is
1040   * indistinguishable from the future having a successful value of
1041   * {@code null}).
1042   *
1043   * <p>Canceling this future will attempt to cancel all the component futures.
1044   *
1045   * @param futures futures to combine
1046   * @return a future that provides a list of the results of the component
1047   *         futures
1048   * @since 10.0
1049   */
1050  @Beta
1051  public static <V> ListenableFuture<List<V>> successfulAsList(
1052      Iterable<? extends ListenableFuture<? extends V>> futures) {
1053    return listFuture(ImmutableList.copyOf(futures), false,
1054        MoreExecutors.sameThreadExecutor());
1055  }
1056
1057  /**
1058   * Registers separate success and failure callbacks to be run when the {@code
1059   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1060   * complete} or, if the computation is already complete, immediately.
1061   *
1062   * <p>There is no guaranteed ordering of execution of callbacks, but any
1063   * callback added through this method is guaranteed to be called once the
1064   * computation is complete.
1065   *
1066   * Example: <pre> {@code
1067   * ListenableFuture<QueryResult> future = ...;
1068   * addCallback(future,
1069   *     new FutureCallback<QueryResult> {
1070   *       public void onSuccess(QueryResult result) {
1071   *         storeInCache(result);
1072   *       }
1073   *       public void onFailure(Throwable t) {
1074   *         reportError(t);
1075   *       }
1076   *     });}</pre>
1077   *
1078   * <p>Note: If the callback is slow or heavyweight, consider {@linkplain
1079   * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1080   * executor}. If you do not supply an executor, {@code addCallback} will use
1081   * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
1082   * some caveats for heavier operations. For example, the callback may run on
1083   * an unpredictable or undesirable thread:
1084   *
1085   * <ul>
1086   * <li>If the input {@code Future} is done at the time {@code addCallback} is
1087   * called, {@code addCallback} will execute the callback inline.
1088   * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1089   * schedule the callback to be run by the thread that completes the input
1090   * {@code Future}, which may be an internal system thread such as an RPC
1091   * network thread.
1092   * </ul>
1093   *
1094   * <p>Also note that, regardless of which thread executes the {@code
1095   * sameThreadExecutor} callback, all other registered but unexecuted listeners
1096   * are prevented from running during its execution, even if those listeners
1097   * are to run in other executors.
1098   *
1099   * <p>For a more general interface to attach a completion listener to a
1100   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1101   *
1102   * @param future The future attach the callback to.
1103   * @param callback The callback to invoke when {@code future} is completed.
1104   * @since 10.0
1105   */
1106  public static <V> void addCallback(ListenableFuture<V> future,
1107      FutureCallback<? super V> callback) {
1108    addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1109  }
1110
1111  /**
1112   * Registers separate success and failure callbacks to be run when the {@code
1113   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1114   * complete} or, if the computation is already complete, immediately.
1115   *
1116   * <p>The callback is run in {@code executor}.
1117   * There is no guaranteed ordering of execution of callbacks, but any
1118   * callback added through this method is guaranteed to be called once the
1119   * computation is complete.
1120   *
1121   * Example: <pre> {@code
1122   * ListenableFuture<QueryResult> future = ...;
1123   * Executor e = ...
1124   * addCallback(future, e,
1125   *     new FutureCallback<QueryResult> {
1126   *       public void onSuccess(QueryResult result) {
1127   *         storeInCache(result);
1128   *       }
1129   *       public void onFailure(Throwable t) {
1130   *         reportError(t);
1131   *       }
1132   *     });}</pre>
1133   *
1134   * <p>When the callback is fast and lightweight, consider {@linkplain
1135   * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1136   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
1137   * caveats documented in the link above.
1138   *
1139   * <p>For a more general interface to attach a completion listener to a
1140   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1141   *
1142   * @param future The future attach the callback to.
1143   * @param callback The callback to invoke when {@code future} is completed.
1144   * @param executor The executor to run {@code callback} when the future
1145   *    completes.
1146   * @since 10.0
1147   */
1148  public static <V> void addCallback(final ListenableFuture<V> future,
1149      final FutureCallback<? super V> callback, Executor executor) {
1150    Preconditions.checkNotNull(callback);
1151    Runnable callbackListener = new Runnable() {
1152      @Override
1153      public void run() {
1154        final V value;
1155        try {
1156          // TODO(user): (Before Guava release), validate that this
1157          // is the thing for IE.
1158          value = getUninterruptibly(future);
1159        } catch (ExecutionException e) {
1160          callback.onFailure(e.getCause());
1161          return;
1162        } catch (RuntimeException e) {
1163          callback.onFailure(e);
1164          return;
1165        } catch (Error e) {
1166          callback.onFailure(e);
1167          return;
1168        }
1169        callback.onSuccess(value);
1170      }
1171    };
1172    future.addListener(callbackListener, executor);
1173  }
1174
1175  /**
1176   * Returns the result of {@link Future#get()}, converting most exceptions to a
1177   * new instance of the given checked exception type. This reduces boilerplate
1178   * for a common use of {@code Future} in which it is unnecessary to
1179   * programmatically distinguish between exception types or to extract other
1180   * information from the exception instance.
1181   *
1182   * <p>Exceptions from {@code Future.get} are treated as follows:
1183   * <ul>
1184   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1185   *     {@code X} if the cause is a checked exception, an {@link
1186   *     UncheckedExecutionException} if the cause is a {@code
1187   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1188   *     {@code Error}.
1189   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1190   *     restoring the interrupt).
1191   * <li>Any {@link CancellationException} is propagated untouched, as is any
1192   *     other {@link RuntimeException} (though {@code get} implementations are
1193   *     discouraged from throwing such exceptions).
1194   * </ul>
1195   *
1196   * <p>The overall principle is to continue to treat every checked exception as a
1197   * checked exception, every unchecked exception as an unchecked exception, and
1198   * every error as an error. In addition, the cause of any {@code
1199   * ExecutionException} is wrapped in order to ensure that the new stack trace
1200   * matches that of the current thread.
1201   *
1202   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1203   * public constructor that accepts zero or more arguments, all of type {@code
1204   * String} or {@code Throwable} (preferring constructors with at least one
1205   * {@code String}) and calling the constructor via reflection. If the
1206   * exception did not already have a cause, one is set by calling {@link
1207   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1208   * {@code IllegalArgumentException} is thrown.
1209   *
1210   * @throws X if {@code get} throws any checked exception except for an {@code
1211   *         ExecutionException} whose cause is not itself a checked exception
1212   * @throws UncheckedExecutionException if {@code get} throws an {@code
1213   *         ExecutionException} with a {@code RuntimeException} as its cause
1214   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1215   *         with an {@code Error} as its cause
1216   * @throws CancellationException if {@code get} throws a {@code
1217   *         CancellationException}
1218   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1219   *         RuntimeException} or does not have a suitable constructor
1220   * @since 10.0
1221   */
1222  public static <V, X extends Exception> V get(
1223      Future<V> future, Class<X> exceptionClass) throws X {
1224    checkNotNull(future);
1225    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1226        "Futures.get exception type (%s) must not be a RuntimeException",
1227        exceptionClass);
1228    try {
1229      return future.get();
1230    } catch (InterruptedException e) {
1231      currentThread().interrupt();
1232      throw newWithCause(exceptionClass, e);
1233    } catch (ExecutionException e) {
1234      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1235      throw new AssertionError();
1236    }
1237  }
1238
1239  /**
1240   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1241   * exceptions to a new instance of the given checked exception type. This
1242   * reduces boilerplate for a common use of {@code Future} in which it is
1243   * unnecessary to programmatically distinguish between exception types or to
1244   * extract other information from the exception instance.
1245   *
1246   * <p>Exceptions from {@code Future.get} are treated as follows:
1247   * <ul>
1248   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1249   *     {@code X} if the cause is a checked exception, an {@link
1250   *     UncheckedExecutionException} if the cause is a {@code
1251   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1252   *     {@code Error}.
1253   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1254   *     restoring the interrupt).
1255   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1256   * <li>Any {@link CancellationException} is propagated untouched, as is any
1257   *     other {@link RuntimeException} (though {@code get} implementations are
1258   *     discouraged from throwing such exceptions).
1259   * </ul>
1260   *
1261   * <p>The overall principle is to continue to treat every checked exception as a
1262   * checked exception, every unchecked exception as an unchecked exception, and
1263   * every error as an error. In addition, the cause of any {@code
1264   * ExecutionException} is wrapped in order to ensure that the new stack trace
1265   * matches that of the current thread.
1266   *
1267   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1268   * public constructor that accepts zero or more arguments, all of type {@code
1269   * String} or {@code Throwable} (preferring constructors with at least one
1270   * {@code String}) and calling the constructor via reflection. If the
1271   * exception did not already have a cause, one is set by calling {@link
1272   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1273   * {@code IllegalArgumentException} is thrown.
1274   *
1275   * @throws X if {@code get} throws any checked exception except for an {@code
1276   *         ExecutionException} whose cause is not itself a checked exception
1277   * @throws UncheckedExecutionException if {@code get} throws an {@code
1278   *         ExecutionException} with a {@code RuntimeException} as its cause
1279   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1280   *         with an {@code Error} as its cause
1281   * @throws CancellationException if {@code get} throws a {@code
1282   *         CancellationException}
1283   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1284   *         RuntimeException} or does not have a suitable constructor
1285   * @since 10.0
1286   */
1287  public static <V, X extends Exception> V get(
1288      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1289      throws X {
1290    checkNotNull(future);
1291    checkNotNull(unit);
1292    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1293        "Futures.get exception type (%s) must not be a RuntimeException",
1294        exceptionClass);
1295    try {
1296      return future.get(timeout, unit);
1297    } catch (InterruptedException e) {
1298      currentThread().interrupt();
1299      throw newWithCause(exceptionClass, e);
1300    } catch (TimeoutException e) {
1301      throw newWithCause(exceptionClass, e);
1302    } catch (ExecutionException e) {
1303      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1304      throw new AssertionError();
1305    }
1306  }
1307
1308  private static <X extends Exception> void wrapAndThrowExceptionOrError(
1309      Throwable cause, Class<X> exceptionClass) throws X {
1310    if (cause instanceof Error) {
1311      throw new ExecutionError((Error) cause);
1312    }
1313    if (cause instanceof RuntimeException) {
1314      throw new UncheckedExecutionException(cause);
1315    }
1316    throw newWithCause(exceptionClass, cause);
1317  }
1318
1319  /**
1320   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1321   * task known not to throw a checked exception. This makes {@code Future} more
1322   * suitable for lightweight, fast-running tasks that, barring bugs in the
1323   * code, will not fail. This gives it exception-handling behavior similar to
1324   * that of {@code ForkJoinTask.join}.
1325   *
1326   * <p>Exceptions from {@code Future.get} are treated as follows:
1327   * <ul>
1328   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1329   *     {@link UncheckedExecutionException} (if the cause is an {@code
1330   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1331   *     Error}).
1332   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1333   *     call. The interrupt is restored before {@code getUnchecked} returns.
1334   * <li>Any {@link CancellationException} is propagated untouched. So is any
1335   *     other {@link RuntimeException} ({@code get} implementations are
1336   *     discouraged from throwing such exceptions).
1337   * </ul>
1338   *
1339   * <p>The overall principle is to eliminate all checked exceptions: to loop to
1340   * avoid {@code InterruptedException}, to pass through {@code
1341   * CancellationException}, and to wrap any exception from the underlying
1342   * computation in an {@code UncheckedExecutionException} or {@code
1343   * ExecutionError}.
1344   *
1345   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1346   * {@link Uninterruptibles#getUninterruptibly(Future)}.
1347   *
1348   * @throws UncheckedExecutionException if {@code get} throws an {@code
1349   *         ExecutionException} with an {@code Exception} as its cause
1350   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1351   *         with an {@code Error} as its cause
1352   * @throws CancellationException if {@code get} throws a {@code
1353   *         CancellationException}
1354   * @since 10.0
1355   */
1356  public static <V> V getUnchecked(Future<V> future) {
1357    checkNotNull(future);
1358    try {
1359      return getUninterruptibly(future);
1360    } catch (ExecutionException e) {
1361      wrapAndThrowUnchecked(e.getCause());
1362      throw new AssertionError();
1363    }
1364  }
1365
1366  private static void wrapAndThrowUnchecked(Throwable cause) {
1367    if (cause instanceof Error) {
1368      throw new ExecutionError((Error) cause);
1369    }
1370    /*
1371     * It's a non-Error, non-Exception Throwable. From my survey of such
1372     * classes, I believe that most users intended to extend Exception, so we'll
1373     * treat it like an Exception.
1374     */
1375    throw new UncheckedExecutionException(cause);
1376  }
1377
1378  /*
1379   * TODO(user): FutureChecker interface for these to be static methods on? If
1380   * so, refer to it in the (static-method) Futures.get documentation
1381   */
1382
1383  /*
1384   * Arguably we don't need a timed getUnchecked because any operation slow
1385   * enough to require a timeout is heavyweight enough to throw a checked
1386   * exception and therefore be inappropriate to use with getUnchecked. Further,
1387   * it's not clear that converting the checked TimeoutException to a
1388   * RuntimeException -- especially to an UncheckedExecutionException, since it
1389   * wasn't thrown by the computation -- makes sense, and if we don't convert
1390   * it, the user still has to write a try-catch block.
1391   *
1392   * If you think you would use this method, let us know.
1393   */
1394
1395  private static <X extends Exception> X newWithCause(
1396      Class<X> exceptionClass, Throwable cause) {
1397    // getConstructors() guarantees this as long as we don't modify the array.
1398    @SuppressWarnings("unchecked")
1399    List<Constructor<X>> constructors =
1400        (List) Arrays.asList(exceptionClass.getConstructors());
1401    for (Constructor<X> constructor : preferringStrings(constructors)) {
1402      @Nullable X instance = newFromConstructor(constructor, cause);
1403      if (instance != null) {
1404        if (instance.getCause() == null) {
1405          instance.initCause(cause);
1406        }
1407        return instance;
1408      }
1409    }
1410    throw new IllegalArgumentException(
1411        "No appropriate constructor for exception of type " + exceptionClass
1412            + " in response to chained exception", cause);
1413  }
1414
1415  private static <X extends Exception> List<Constructor<X>>
1416      preferringStrings(List<Constructor<X>> constructors) {
1417    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1418  }
1419
1420  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1421      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1422        @Override public Boolean apply(Constructor<?> input) {
1423          return asList(input.getParameterTypes()).contains(String.class);
1424        }
1425      }).reverse();
1426
1427  @Nullable private static <X> X newFromConstructor(
1428      Constructor<X> constructor, Throwable cause) {
1429    Class<?>[] paramTypes = constructor.getParameterTypes();
1430    Object[] params = new Object[paramTypes.length];
1431    for (int i = 0; i < paramTypes.length; i++) {
1432      Class<?> paramType = paramTypes[i];
1433      if (paramType.equals(String.class)) {
1434        params[i] = cause.toString();
1435      } else if (paramType.equals(Throwable.class)) {
1436        params[i] = cause;
1437      } else {
1438        return null;
1439      }
1440    }
1441    try {
1442      return constructor.newInstance(params);
1443    } catch (IllegalArgumentException e) {
1444      return null;
1445    } catch (InstantiationException e) {
1446      return null;
1447    } catch (IllegalAccessException e) {
1448      return null;
1449    } catch (InvocationTargetException e) {
1450      return null;
1451    }
1452  }
1453
1454  private interface FutureCombiner<V, C> {
1455    C combine(List<Optional<V>> values);
1456  }
1457
1458  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1459    private static final Logger logger =
1460        Logger.getLogger(CombinedFuture.class.getName());
1461
1462    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1463    final boolean allMustSucceed;
1464    final AtomicInteger remaining;
1465    FutureCombiner<V, C> combiner;
1466    List<Optional<V>> values;
1467
1468    CombinedFuture(
1469        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1470        boolean allMustSucceed, Executor listenerExecutor,
1471        FutureCombiner<V, C> combiner) {
1472      this.futures = futures;
1473      this.allMustSucceed = allMustSucceed;
1474      this.remaining = new AtomicInteger(futures.size());
1475      this.combiner = combiner;
1476      this.values = Lists.newArrayListWithCapacity(futures.size());
1477      init(listenerExecutor);
1478    }
1479
1480    /**
1481     * Must be called at the end of the constructor.
1482     */
1483    protected void init(final Executor listenerExecutor) {
1484      // First, schedule cleanup to execute when the Future is done.
1485      addListener(new Runnable() {
1486        @Override
1487        public void run() {
1488          // Cancel all the component futures.
1489          if (CombinedFuture.this.isCancelled()) {
1490            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1491              future.cancel(CombinedFuture.this.wasInterrupted());
1492            }
1493          }
1494
1495          // Let go of the memory held by other futures
1496          CombinedFuture.this.futures = null;
1497
1498          // By now the values array has either been set as the Future's value,
1499          // or (in case of failure) is no longer useful.
1500          CombinedFuture.this.values = null;
1501
1502          // The combiner may also hold state, so free that as well
1503          CombinedFuture.this.combiner = null;
1504        }
1505      }, MoreExecutors.sameThreadExecutor());
1506
1507      // Now begin the "real" initialization.
1508
1509      // Corner case: List is empty.
1510      if (futures.isEmpty()) {
1511        set(combiner.combine(ImmutableList.<Optional<V>>of()));
1512        return;
1513      }
1514
1515      // Populate the results list with null initially.
1516      for (int i = 0; i < futures.size(); ++i) {
1517        values.add(null);
1518      }
1519
1520      // Register a listener on each Future in the list to update
1521      // the state of this future.
1522      // Note that if all the futures on the list are done prior to completing
1523      // this loop, the last call to addListener() will callback to
1524      // setOneValue(), transitively call our cleanup listener, and set
1525      // this.futures to null.
1526      // This is not actually a problem, since the foreach only needs
1527      // this.futures to be non-null at the beginning of the loop.
1528      int i = 0;
1529      for (final ListenableFuture<? extends V> listenable : futures) {
1530        final int index = i++;
1531        listenable.addListener(new Runnable() {
1532          @Override
1533          public void run() {
1534            setOneValue(index, listenable);
1535          }
1536        }, listenerExecutor);
1537      }
1538    }
1539
1540    /**
1541     * Fails this future with the given Throwable if {@link #allMustSucceed} is
1542     * true. Also, logs the throwable if it is an {@link Error} or if
1543     * {@link #allMustSucceed} is {@code true} and the throwable did not cause
1544     * this future to fail.
1545     */
1546    private void setExceptionAndMaybeLog(Throwable throwable) {
1547      boolean result = false;
1548      if (allMustSucceed) {
1549        // As soon as the first one fails, throw the exception up.
1550        // The result of all other inputs is then ignored.
1551        result = super.setException(throwable);
1552      }
1553      if (throwable instanceof Error || (allMustSucceed && !result)) {
1554        logger.log(Level.SEVERE, "input future failed.", throwable);
1555      }
1556    }
1557
1558    /**
1559     * Sets the value at the given index to that of the given future.
1560     */
1561    private void setOneValue(int index, Future<? extends V> future) {
1562      List<Optional<V>> localValues = values;
1563      // TODO(user): This check appears to be redundant since values is
1564      // assigned null only after the future completes.  However, values
1565      // is not volatile so it may be possible for us to observe the changes
1566      // to these two values in a different order... which I think is why
1567      // we need to check both.  Clear up this craziness either by making
1568      // values volatile or proving that it doesn't need to be for some other
1569      // reason.
1570      if (isDone() || localValues == null) {
1571        // Some other future failed or has been cancelled, causing this one to
1572        // also be cancelled or have an exception set. This should only happen
1573        // if allMustSucceed is true or if the output itself has been
1574        // cancelled.
1575        checkState(allMustSucceed || isCancelled(),
1576            "Future was done before all dependencies completed");
1577      }
1578
1579      try {
1580        checkState(future.isDone(),
1581            "Tried to set value from future which is not done");
1582        V returnValue = getUninterruptibly(future);
1583        if (localValues != null) {
1584          localValues.set(index, Optional.fromNullable(returnValue));
1585        }
1586      } catch (CancellationException e) {
1587        if (allMustSucceed) {
1588          // Set ourselves as cancelled. Let the input futures keep running
1589          // as some of them may be used elsewhere.
1590          cancel(false);
1591        }
1592      } catch (ExecutionException e) {
1593        setExceptionAndMaybeLog(e.getCause());
1594      } catch (Throwable t) {
1595        setExceptionAndMaybeLog(t);
1596      } finally {
1597        int newRemaining = remaining.decrementAndGet();
1598        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1599        if (newRemaining == 0) {
1600          FutureCombiner<V, C> localCombiner = combiner;
1601          if (localCombiner != null && localValues != null) {
1602            set(localCombiner.combine(localValues));
1603          } else {
1604            checkState(isDone());
1605          }
1606        }
1607      }
1608    }
1609
1610  }
1611
1612  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1613  private static <V> ListenableFuture<List<V>> listFuture(
1614      ImmutableList<ListenableFuture<? extends V>> futures,
1615      boolean allMustSucceed, Executor listenerExecutor) {
1616    return new CombinedFuture<V, List<V>>(
1617        futures, allMustSucceed, listenerExecutor,
1618        new FutureCombiner<V, List<V>>() {
1619          @Override
1620          public List<V> combine(List<Optional<V>> values) {
1621            List<V> result = Lists.newArrayList();
1622            for (Optional<V> element : values) {
1623              result.add(element != null ? element.orNull() : null);
1624            }
1625            return Collections.unmodifiableList(result);
1626          }
1627        });
1628  }
1629
1630  /**
1631   * A checked future that uses a function to map from exceptions to the
1632   * appropriate checked type.
1633   */
1634  private static class MappingCheckedFuture<V, X extends Exception> extends
1635      AbstractCheckedFuture<V, X> {
1636
1637    final Function<Exception, X> mapper;
1638
1639    MappingCheckedFuture(ListenableFuture<V> delegate,
1640        Function<Exception, X> mapper) {
1641      super(delegate);
1642
1643      this.mapper = checkNotNull(mapper);
1644    }
1645
1646    @Override
1647    protected X mapException(Exception e) {
1648      return mapper.apply(e);
1649    }
1650  }
1651}