001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024import static java.lang.Thread.currentThread;
025import static java.util.Arrays.asList;
026
027import com.google.common.annotations.Beta;
028import com.google.common.base.Function;
029import com.google.common.base.Optional;
030import com.google.common.base.Preconditions;
031import com.google.common.collect.ImmutableCollection;
032import com.google.common.collect.ImmutableList;
033import com.google.common.collect.Lists;
034import com.google.common.collect.Ordering;
035
036import java.lang.reflect.Constructor;
037import java.lang.reflect.InvocationTargetException;
038import java.lang.reflect.UndeclaredThrowableException;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.CancellationException;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.Executor;
045import java.util.concurrent.Future;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.TimeoutException;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.logging.Level;
050import java.util.logging.Logger;
051
052import javax.annotation.Nullable;
053
054/**
055 * Static utility methods pertaining to the {@link Future} interface.
056 *
057 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
058 * Guava User Guide article on <a href=
059 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
060 * {@code ListenableFuture}</a>.
061 *
062 * @author Kevin Bourrillion
063 * @author Nishant Thakkar
064 * @author Sven Mawson
065 * @since 1.0
066 */
067@Beta
068public final class Futures {
069  private Futures() {}
070
071  /**
072   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
073   * and a {@link Function} that maps from {@link Exception} instances into the
074   * appropriate checked type.
075   *
076   * <p>The given mapping function will be applied to an
077   * {@link InterruptedException}, a {@link CancellationException}, or an
078   * {@link ExecutionException}.
079   * See {@link Future#get()} for details on the exceptions thrown.
080   *
081   * @since 9.0 (source-compatible since 1.0)
082   */
083  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
084      ListenableFuture<V> future, Function<Exception, X> mapper) {
085    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
086  }
087
088  private abstract static class ImmediateFuture<V>
089      implements ListenableFuture<V> {
090
091    private static final Logger log =
092        Logger.getLogger(ImmediateFuture.class.getName());
093
094    @Override
095    public void addListener(Runnable listener, Executor executor) {
096      checkNotNull(listener, "Runnable was null.");
097      checkNotNull(executor, "Executor was null.");
098      try {
099        executor.execute(listener);
100      } catch (RuntimeException e) {
101        // ListenableFuture's contract is that it will not throw unchecked
102        // exceptions, so log the bad runnable and/or executor and swallow it.
103        log.log(Level.SEVERE, "RuntimeException while executing runnable "
104            + listener + " with executor " + executor, e);
105      }
106    }
107
108    @Override
109    public boolean cancel(boolean mayInterruptIfRunning) {
110      return false;
111    }
112
113    @Override
114    public abstract V get() throws ExecutionException;
115
116    @Override
117    public V get(long timeout, TimeUnit unit) throws ExecutionException {
118      checkNotNull(unit);
119      return get();
120    }
121
122    @Override
123    public boolean isCancelled() {
124      return false;
125    }
126
127    @Override
128    public boolean isDone() {
129      return true;
130    }
131  }
132
133  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
134
135    @Nullable private final V value;
136
137    ImmediateSuccessfulFuture(@Nullable V value) {
138      this.value = value;
139    }
140
141    @Override
142    public V get() {
143      return value;
144    }
145  }
146
147  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
148      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
149
150    @Nullable private final V value;
151
152    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
153      this.value = value;
154    }
155
156    @Override
157    public V get() {
158      return value;
159    }
160
161    @Override
162    public V checkedGet() {
163      return value;
164    }
165
166    @Override
167    public V checkedGet(long timeout, TimeUnit unit) {
168      checkNotNull(unit);
169      return value;
170    }
171  }
172
173  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
174
175    private final Throwable thrown;
176
177    ImmediateFailedFuture(Throwable thrown) {
178      this.thrown = thrown;
179    }
180
181    @Override
182    public V get() throws ExecutionException {
183      throw new ExecutionException(thrown);
184    }
185  }
186
187  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
188
189    private final CancellationException thrown;
190
191    ImmediateCancelledFuture() {
192      this.thrown = new CancellationException("Immediate cancelled future.");
193    }
194
195    @Override
196    public boolean isCancelled() {
197      return true;
198    }
199
200    @Override
201    public V get() {
202      throw AbstractFuture.cancellationExceptionWithCause(
203          "Task was cancelled.", thrown);
204    }
205  }
206
207  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
208      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
209
210    private final X thrown;
211
212    ImmediateFailedCheckedFuture(X thrown) {
213      this.thrown = thrown;
214    }
215
216    @Override
217    public V get() throws ExecutionException {
218      throw new ExecutionException(thrown);
219    }
220
221    @Override
222    public V checkedGet() throws X {
223      throw thrown;
224    }
225
226    @Override
227    public V checkedGet(long timeout, TimeUnit unit) throws X {
228      checkNotNull(unit);
229      throw thrown;
230    }
231  }
232
233  /**
234   * Creates a {@code ListenableFuture} which has its value set immediately upon
235   * construction. The getters just return the value. This {@code Future} can't
236   * be canceled or timed out and its {@code isDone()} method always returns
237   * {@code true}.
238   */
239  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
240    return new ImmediateSuccessfulFuture<V>(value);
241  }
242
243  /**
244   * Returns a {@code CheckedFuture} which has its value set immediately upon
245   * construction.
246   *
247   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
248   * method always returns {@code true}. Calling {@code get()} or {@code
249   * checkedGet()} will immediately return the provided value.
250   */
251  public static <V, X extends Exception> CheckedFuture<V, X>
252      immediateCheckedFuture(@Nullable V value) {
253    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
254  }
255
256  /**
257   * Returns a {@code ListenableFuture} which has an exception set immediately
258   * upon construction.
259   *
260   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
261   * method always returns {@code true}. Calling {@code get()} will immediately
262   * throw the provided {@code Throwable} wrapped in an {@code
263   * ExecutionException}.
264   */
265  public static <V> ListenableFuture<V> immediateFailedFuture(
266      Throwable throwable) {
267    checkNotNull(throwable);
268    return new ImmediateFailedFuture<V>(throwable);
269  }
270
271  /**
272   * Creates a {@code ListenableFuture} which is cancelled immediately upon
273   * construction, so that {@code isCancelled()} always returns {@code true}.
274   *
275   * @since 14.0
276   */
277  public static <V> ListenableFuture<V> immediateCancelledFuture() {
278    return new ImmediateCancelledFuture<V>();
279  }
280
281  /**
282   * Returns a {@code CheckedFuture} which has an exception set immediately upon
283   * construction.
284   *
285   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
286   * method always returns {@code true}. Calling {@code get()} will immediately
287   * throw the provided {@code Exception} wrapped in an {@code
288   * ExecutionException}, and calling {@code checkedGet()} will throw the
289   * provided exception itself.
290   */
291  public static <V, X extends Exception> CheckedFuture<V, X>
292      immediateFailedCheckedFuture(X exception) {
293    checkNotNull(exception);
294    return new ImmediateFailedCheckedFuture<V, X>(exception);
295  }
296
297  /**
298   * Returns a {@code Future} whose result is taken from the given primary
299   * {@code input} or, if the primary input fails, from the {@code Future}
300   * provided by the {@code fallback}. {@link FutureFallback#create} is not
301   * invoked until the primary input has failed, so if the primary input
302   * succeeds, it is never invoked. If, during the invocation of {@code
303   * fallback}, an exception is thrown, this exception is used as the result of
304   * the output {@code Future}.
305   *
306   * <p>Below is an example of a fallback that returns a default value if an
307   * exception occurs:
308   *
309   * <pre>   {@code
310   *   ListenableFuture<Integer> fetchCounterFuture = ...;
311   *
312   *   // Falling back to a zero counter in case an exception happens when
313   *   // processing the RPC to fetch counters.
314   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
315   *       fetchCounterFuture, new FutureFallback<Integer>() {
316   *         public ListenableFuture<Integer> create(Throwable t) {
317   *           // Returning "0" as the default for the counter when the
318   *           // exception happens.
319   *           return immediateFuture(0);
320   *         }
321   *       });
322   * }</pre>
323   *
324   * The fallback can also choose to propagate the original exception when
325   * desired:
326   *
327   * <pre>   {@code
328   *   ListenableFuture<Integer> fetchCounterFuture = ...;
329   *
330   *   // Falling back to a zero counter only in case the exception was a
331   *   // TimeoutException.
332   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
333   *       fetchCounterFuture, new FutureFallback<Integer>() {
334   *         public ListenableFuture<Integer> create(Throwable t) {
335   *           if (t instanceof TimeoutException) {
336   *             return immediateFuture(0);
337   *           }
338   *           return immediateFailedFuture(t);
339   *         }
340   *       });
341   * }</pre>
342   *
343   * Note: If the derived {@code Future} is slow or heavyweight to create
344   * (whether the {@code Future} itself is slow or heavyweight to complete is
345   * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
346   * FutureFallback, Executor) supplying an executor}. If you do not supply an
347   * executor, {@code withFallback} will use {@link
348   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
349   * caveats for heavier operations. For example, the call to {@code
350   * fallback.create} may run on an unpredictable or undesirable thread:
351   *
352   * <ul>
353   * <li>If the input {@code Future} is done at the time {@code withFallback}
354   * is called, {@code withFallback} will call {@code fallback.create} inline.
355   * <li>If the input {@code Future} is not yet done, {@code withFallback} will
356   * schedule {@code fallback.create} to be run by the thread that completes
357   * the input {@code Future}, which may be an internal system thread such as
358   * an RPC network thread.
359   * </ul>
360   *
361   * Also note that, regardless of which thread executes {@code
362   * fallback.create}, all other registered but unexecuted listeners are
363   * prevented from running during its execution, even if those listeners are
364   * to run in other executors.
365   *
366   * @param input the primary input {@code Future}
367   * @param fallback the {@link FutureFallback} implementation to be called if
368   *     {@code input} fails
369   * @since 14.0
370   */
371  public static <V> ListenableFuture<V> withFallback(
372      ListenableFuture<? extends V> input,
373      FutureFallback<? extends V> fallback) {
374    return withFallback(input, fallback, sameThreadExecutor());
375  }
376
377  /**
378   * Returns a {@code Future} whose result is taken from the given primary
379   * {@code input} or, if the primary input fails, from the {@code Future}
380   * provided by the {@code fallback}. {@link FutureFallback#create} is not
381   * invoked until the primary input has failed, so if the primary input
382   * succeeds, it is never invoked. If, during the invocation of {@code
383   * fallback}, an exception is thrown, this exception is used as the result of
384   * the output {@code Future}.
385   *
386   * <p>Below is an example of a fallback that returns a default value if an
387   * exception occurs:
388   *
389   * <pre>   {@code
390   *   ListenableFuture<Integer> fetchCounterFuture = ...;
391   *
392   *   // Falling back to a zero counter in case an exception happens when
393   *   // processing the RPC to fetch counters.
394   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
395   *       fetchCounterFuture, new FutureFallback<Integer>() {
396   *         public ListenableFuture<Integer> create(Throwable t) {
397   *           // Returning "0" as the default for the counter when the
398   *           // exception happens.
399   *           return immediateFuture(0);
400   *         }
401   *       }, sameThreadExecutor());
402   * }</pre>
403   *
404   * The fallback can also choose to propagate the original exception when
405   * desired:
406   *
407   * <pre>   {@code
408   *   ListenableFuture<Integer> fetchCounterFuture = ...;
409   *
410   *   // Falling back to a zero counter only in case the exception was a
411   *   // TimeoutException.
412   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
413   *       fetchCounterFuture, new FutureFallback<Integer>() {
414   *         public ListenableFuture<Integer> create(Throwable t) {
415   *           if (t instanceof TimeoutException) {
416   *             return immediateFuture(0);
417   *           }
418   *           return immediateFailedFuture(t);
419   *         }
420   *       }, sameThreadExecutor());
421   * }</pre>
422   *
423   * When the execution of {@code fallback.create} is fast and lightweight
424   * (though the {@code Future} it returns need not meet these criteria),
425   * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
426   * omitting the executor} or explicitly specifying {@code
427   * sameThreadExecutor}. However, be aware of the caveats documented in the
428   * link above.
429   *
430   * @param input the primary input {@code Future}
431   * @param fallback the {@link FutureFallback} implementation to be called if
432   *     {@code input} fails
433   * @param executor the executor that runs {@code fallback} if {@code input}
434   *     fails
435   * @since 14.0
436   */
437  public static <V> ListenableFuture<V> withFallback(
438      ListenableFuture<? extends V> input,
439      FutureFallback<? extends V> fallback, Executor executor) {
440    checkNotNull(fallback);
441    return new FallbackFuture<V>(input, fallback, executor);
442  }
443
444  /**
445   * A future that falls back on a second, generated future, in case its
446   * original future fails.
447   */
448  private static class FallbackFuture<V> extends AbstractFuture<V> {
449
450    private volatile ListenableFuture<? extends V> running;
451
452    FallbackFuture(ListenableFuture<? extends V> input,
453        final FutureFallback<? extends V> fallback,
454        final Executor executor) {
455      running = input;
456      addCallback(running, new FutureCallback<V>() {
457        @Override
458        public void onSuccess(V value) {
459          set(value);
460        }
461
462        @Override
463        public void onFailure(Throwable t) {
464          if (isCancelled()) {
465            return;
466          }
467          try {
468            running = fallback.create(t);
469            if (isCancelled()) { // in case cancel called in the meantime
470              running.cancel(wasInterrupted());
471              return;
472            }
473            addCallback(running, new FutureCallback<V>() {
474              @Override
475              public void onSuccess(V value) {
476                set(value);
477              }
478
479              @Override
480              public void onFailure(Throwable t) {
481                if (running.isCancelled()) {
482                  cancel(false);
483                } else {
484                  setException(t);
485                }
486              }
487            }, sameThreadExecutor());
488          } catch (Exception e) {
489            setException(e);
490          } catch (Error e) {
491            setException(e); // note: rethrows
492          }
493        }
494      }, executor);
495    }
496
497    @Override
498    public boolean cancel(boolean mayInterruptIfRunning) {
499      if (super.cancel(mayInterruptIfRunning)) {
500        running.cancel(mayInterruptIfRunning);
501        return true;
502      }
503      return false;
504    }
505  }
506
507  /**
508   * Returns a new {@code ListenableFuture} whose result is asynchronously
509   * derived from the result of the given {@code Future}. More precisely, the
510   * returned {@code Future} takes its result from a {@code Future} produced by
511   * applying the given {@code AsyncFunction} to the result of the original
512   * {@code Future}. Example:
513   *
514   * <pre>   {@code
515   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
516   *   AsyncFunction<RowKey, QueryResult> queryFunction =
517   *       new AsyncFunction<RowKey, QueryResult>() {
518   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
519   *           return dataService.read(rowKey);
520   *         }
521   *       };
522   *   ListenableFuture<QueryResult> queryFuture =
523   *       transform(rowKeyFuture, queryFunction);
524   * }</pre>
525   *
526   * Note: If the derived {@code Future} is slow or heavyweight to create
527   * (whether the {@code Future} itself is slow or heavyweight to complete is
528   * irrelevant), consider {@linkplain #transform(ListenableFuture,
529   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
530   * executor, {@code transform} will use {@link
531   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
532   * caveats for heavier operations. For example, the call to {@code
533   * function.apply} may run on an unpredictable or undesirable thread:
534   *
535   * <ul>
536   * <li>If the input {@code Future} is done at the time {@code transform} is
537   * called, {@code transform} will call {@code function.apply} inline.
538   * <li>If the input {@code Future} is not yet done, {@code transform} will
539   * schedule {@code function.apply} to be run by the thread that completes the
540   * input {@code Future}, which may be an internal system thread such as an
541   * RPC network thread.
542   * </ul>
543   *
544   * Also note that, regardless of which thread executes {@code
545   * function.apply}, all other registered but unexecuted listeners are
546   * prevented from running during its execution, even if those listeners are
547   * to run in other executors.
548   *
549   * <p>The returned {@code Future} attempts to keep its cancellation state in
550   * sync with that of the input future and that of the future returned by the
551   * function. That is, if the returned {@code Future} is cancelled, it will
552   * attempt to cancel the other two, and if either of the other two is
553   * cancelled, the returned {@code Future} will receive a callback in which it
554   * will attempt to cancel itself.
555   *
556   * @param input The future to transform
557   * @param function A function to transform the result of the input future
558   *     to the result of the output future
559   * @return A future that holds result of the function (if the input succeeded)
560   *     or the original input's failure (if not)
561   * @since 11.0
562   */
563  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
564      AsyncFunction<? super I, ? extends O> function) {
565    return transform(input, function, MoreExecutors.sameThreadExecutor());
566  }
567
568  /**
569   * Returns a new {@code ListenableFuture} whose result is asynchronously
570   * derived from the result of the given {@code Future}. More precisely, the
571   * returned {@code Future} takes its result from a {@code Future} produced by
572   * applying the given {@code AsyncFunction} to the result of the original
573   * {@code Future}. Example:
574   *
575   * <pre>   {@code
576   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
577   *   AsyncFunction<RowKey, QueryResult> queryFunction =
578   *       new AsyncFunction<RowKey, QueryResult>() {
579   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
580   *           return dataService.read(rowKey);
581   *         }
582   *       };
583   *   ListenableFuture<QueryResult> queryFuture =
584   *       transform(rowKeyFuture, queryFunction, executor);
585   * }</pre>
586   *
587   * <p>The returned {@code Future} attempts to keep its cancellation state in
588   * sync with that of the input future and that of the future returned by the
589   * function. That is, if the returned {@code Future} is cancelled, it
590   * will attempt to cancel the other two, and if either of the other two is
591   * cancelled, the returned {@code Future} will receive a callback in which it
592   * will attempt to cancel itself.
593   *
594   * <p>When the execution of {@code function.apply} is fast and lightweight
595   * (though the {@code Future} it returns need not meet these criteria),
596   * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
597   * the executor} or explicitly specifying {@code sameThreadExecutor}.
598   * However, be aware of the caveats documented in the link above.
599   *
600   * @param input The future to transform
601   * @param function A function to transform the result of the input future
602   *     to the result of the output future
603   * @param executor Executor to run the function in.
604   * @return A future that holds result of the function (if the input succeeded)
605   *     or the original input's failure (if not)
606   * @since 11.0
607   */
608  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
609      AsyncFunction<? super I, ? extends O> function,
610      Executor executor) {
611    ChainingListenableFuture<I, O> output =
612        new ChainingListenableFuture<I, O>(function, input);
613    input.addListener(output, executor);
614    return output;
615  }
616
617  /**
618   * Returns a new {@code ListenableFuture} whose result is the product of
619   * applying the given {@code Function} to the result of the given {@code
620   * Future}. Example:
621   *
622   * <pre>   {@code
623   *   ListenableFuture<QueryResult> queryFuture = ...;
624   *   Function<QueryResult, List<Row>> rowsFunction =
625   *       new Function<QueryResult, List<Row>>() {
626   *         public List<Row> apply(QueryResult queryResult) {
627   *           return queryResult.getRows();
628   *         }
629   *       };
630   *   ListenableFuture<List<Row>> rowsFuture =
631   *       transform(queryFuture, rowsFunction);
632   * }</pre>
633   *
634   * Note: If the transformation is slow or heavyweight, consider {@linkplain
635   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
636   * If you do not supply an executor, {@code transform} will use {@link
637   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
638   * caveats for heavier operations.  For example, the call to {@code
639   * function.apply} may run on an unpredictable or undesirable thread:
640   *
641   * <ul>
642   * <li>If the input {@code Future} is done at the time {@code transform} is
643   * called, {@code transform} will call {@code function.apply} inline.
644   * <li>If the input {@code Future} is not yet done, {@code transform} will
645   * schedule {@code function.apply} to be run by the thread that completes the
646   * input {@code Future}, which may be an internal system thread such as an
647   * RPC network thread.
648   * </ul>
649   *
650   * Also note that, regardless of which thread executes {@code
651   * function.apply}, all other registered but unexecuted listeners are
652   * prevented from running during its execution, even if those listeners are
653   * to run in other executors.
654   *
655   * <p>The returned {@code Future} attempts to keep its cancellation state in
656   * sync with that of the input future. That is, if the returned {@code Future}
657   * is cancelled, it will attempt to cancel the input, and if the input is
658   * cancelled, the returned {@code Future} will receive a callback in which it
659   * will attempt to cancel itself.
660   *
661   * <p>An example use of this method is to convert a serializable object
662   * returned from an RPC into a POJO.
663   *
664   * @param input The future to transform
665   * @param function A Function to transform the results of the provided future
666   *     to the results of the returned future.  This will be run in the thread
667   *     that notifies input it is complete.
668   * @return A future that holds result of the transformation.
669   * @since 9.0 (in 1.0 as {@code compose})
670   */
671  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
672      final Function<? super I, ? extends O> function) {
673    return transform(input, function, MoreExecutors.sameThreadExecutor());
674  }
675
676  /**
677   * Returns a new {@code ListenableFuture} whose result is the product of
678   * applying the given {@code Function} to the result of the given {@code
679   * Future}. Example:
680   *
681   * <pre>   {@code
682   *   ListenableFuture<QueryResult> queryFuture = ...;
683   *   Function<QueryResult, List<Row>> rowsFunction =
684   *       new Function<QueryResult, List<Row>>() {
685   *         public List<Row> apply(QueryResult queryResult) {
686   *           return queryResult.getRows();
687   *         }
688   *       };
689   *   ListenableFuture<List<Row>> rowsFuture =
690   *       transform(queryFuture, rowsFunction, executor);
691   * }</pre>
692   *
693   * <p>The returned {@code Future} attempts to keep its cancellation state in
694   * sync with that of the input future. That is, if the returned {@code Future}
695   * is cancelled, it will attempt to cancel the input, and if the input is
696   * cancelled, the returned {@code Future} will receive a callback in which it
697   * will attempt to cancel itself.
698   *
699   * <p>An example use of this method is to convert a serializable object
700   * returned from an RPC into a POJO.
701   *
702   * <p>When the transformation is fast and lightweight, consider {@linkplain
703   * #transform(ListenableFuture, Function) omitting the executor} or
704   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
705   * caveats documented in the link above.
706   *
707   * @param input The future to transform
708   * @param function A Function to transform the results of the provided future
709   *     to the results of the returned future.
710   * @param executor Executor to run the function in.
711   * @return A future that holds result of the transformation.
712   * @since 9.0 (in 2.0 as {@code compose})
713   */
714  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
715      final Function<? super I, ? extends O> function, Executor executor) {
716    checkNotNull(function);
717    AsyncFunction<I, O> wrapperFunction
718        = new AsyncFunction<I, O>() {
719            @Override public ListenableFuture<O> apply(I input) {
720              O output = function.apply(input);
721              return immediateFuture(output);
722            }
723        };
724    return transform(input, wrapperFunction, executor);
725  }
726
727  /**
728   * Like {@link #transform(ListenableFuture, Function)} except that the
729   * transformation {@code function} is invoked on each call to
730   * {@link Future#get() get()} on the returned future.
731   *
732   * <p>The returned {@code Future} reflects the input's cancellation
733   * state directly, and any attempt to cancel the returned Future is likewise
734   * passed through to the input Future.
735   *
736   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
737   * only apply the timeout to the execution of the underlying {@code Future},
738   * <em>not</em> to the execution of the transformation function.
739   *
740   * <p>The primary audience of this method is callers of {@code transform}
741   * who don't have a {@code ListenableFuture} available and
742   * do not mind repeated, lazy function evaluation.
743   *
744   * @param input The future to transform
745   * @param function A Function to transform the results of the provided future
746   *     to the results of the returned future.
747   * @return A future that returns the result of the transformation.
748   * @since 10.0
749   */
750  @Beta
751  public static <I, O> Future<O> lazyTransform(final Future<I> input,
752      final Function<? super I, ? extends O> function) {
753    checkNotNull(input);
754    checkNotNull(function);
755    return new Future<O>() {
756
757      @Override
758      public boolean cancel(boolean mayInterruptIfRunning) {
759        return input.cancel(mayInterruptIfRunning);
760      }
761
762      @Override
763      public boolean isCancelled() {
764        return input.isCancelled();
765      }
766
767      @Override
768      public boolean isDone() {
769        return input.isDone();
770      }
771
772      @Override
773      public O get() throws InterruptedException, ExecutionException {
774        return applyTransformation(input.get());
775      }
776
777      @Override
778      public O get(long timeout, TimeUnit unit)
779          throws InterruptedException, ExecutionException, TimeoutException {
780        return applyTransformation(input.get(timeout, unit));
781      }
782
783      private O applyTransformation(I input) throws ExecutionException {
784        try {
785          return function.apply(input);
786        } catch (Throwable t) {
787          throw new ExecutionException(t);
788        }
789      }
790    };
791  }
792
793  /**
794   * An implementation of {@code ListenableFuture} that also implements
795   * {@code Runnable} so that it can be used to nest ListenableFutures.
796   * Once the passed-in {@code ListenableFuture} is complete, it calls the
797   * passed-in {@code Function} to generate the result.
798   *
799   * <p>If the function throws any checked exceptions, they should be wrapped
800   * in a {@code UndeclaredThrowableException} so that this class can get
801   * access to the cause.
802   */
803  private static class ChainingListenableFuture<I, O>
804      extends AbstractFuture<O> implements Runnable {
805
806    private AsyncFunction<? super I, ? extends O> function;
807    private ListenableFuture<? extends I> inputFuture;
808    private volatile ListenableFuture<? extends O> outputFuture;
809    private final CountDownLatch outputCreated = new CountDownLatch(1);
810
811    private ChainingListenableFuture(
812        AsyncFunction<? super I, ? extends O> function,
813        ListenableFuture<? extends I> inputFuture) {
814      this.function = checkNotNull(function);
815      this.inputFuture = checkNotNull(inputFuture);
816    }
817
818    @Override
819    public boolean cancel(boolean mayInterruptIfRunning) {
820      /*
821       * Our additional cancellation work needs to occur even if
822       * !mayInterruptIfRunning, so we can't move it into interruptTask().
823       */
824      if (super.cancel(mayInterruptIfRunning)) {
825        // This should never block since only one thread is allowed to cancel
826        // this Future.
827        cancel(inputFuture, mayInterruptIfRunning);
828        cancel(outputFuture, mayInterruptIfRunning);
829        return true;
830      }
831      return false;
832    }
833
834    private void cancel(@Nullable Future<?> future,
835        boolean mayInterruptIfRunning) {
836      if (future != null) {
837        future.cancel(mayInterruptIfRunning);
838      }
839    }
840
841    @Override
842    public void run() {
843      try {
844        I sourceResult;
845        try {
846          sourceResult = getUninterruptibly(inputFuture);
847        } catch (CancellationException e) {
848          // Cancel this future and return.
849          // At this point, inputFuture is cancelled and outputFuture doesn't
850          // exist, so the value of mayInterruptIfRunning is irrelevant.
851          cancel(false);
852          return;
853        } catch (ExecutionException e) {
854          // Set the cause of the exception as this future's exception
855          setException(e.getCause());
856          return;
857        }
858
859        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
860            function.apply(sourceResult);
861        if (isCancelled()) {
862          outputFuture.cancel(wasInterrupted());
863          this.outputFuture = null;
864          return;
865        }
866        outputFuture.addListener(new Runnable() {
867            @Override
868            public void run() {
869              try {
870                // Here it would have been nice to have had an
871                // UninterruptibleListenableFuture, but we don't want to start a
872                // combinatorial explosion of interfaces, so we have to make do.
873                set(getUninterruptibly(outputFuture));
874              } catch (CancellationException e) {
875                // Cancel this future and return.
876                // At this point, inputFuture and outputFuture are done, so the
877                // value of mayInterruptIfRunning is irrelevant.
878                cancel(false);
879                return;
880              } catch (ExecutionException e) {
881                // Set the cause of the exception as this future's exception
882                setException(e.getCause());
883              } finally {
884                // Don't pin inputs beyond completion
885                ChainingListenableFuture.this.outputFuture = null;
886              }
887            }
888          }, MoreExecutors.sameThreadExecutor());
889      } catch (UndeclaredThrowableException e) {
890        // Set the cause of the exception as this future's exception
891        setException(e.getCause());
892      } catch (Exception e) {
893        // This exception is irrelevant in this thread, but useful for the
894        // client
895        setException(e);
896      } catch (Error e) {
897        // Propagate errors up ASAP - our superclass will rethrow the error
898        setException(e);
899      } finally {
900        // Don't pin inputs beyond completion
901        function = null;
902        inputFuture = null;
903        // Allow our get routines to examine outputFuture now.
904        outputCreated.countDown();
905      }
906    }
907  }
908
909  /**
910   * Returns a new {@code ListenableFuture} whose result is the product of
911   * calling {@code get()} on the {@code Future} nested within the given {@code
912   * Future}, effectively chaining the futures one after the other.  Example:
913   *
914   * <pre>   {@code
915   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
916   *   ListenableFuture<String> dereferenced = dereference(nested);
917   * }</pre>
918   *
919   * <p>This call has the same cancellation and execution semantics as {@link
920   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
921   * Future} attempts to keep its cancellation state in sync with both the
922   * input {@code Future} and the nested {@code Future}.  The transformation
923   * is very lightweight and therefore takes place in the thread that called
924   * {@code dereference}.
925   *
926   * @param nested The nested future to transform.
927   * @return A future that holds result of the inner future.
928   * @since 13.0
929   */
930  @Beta
931  @SuppressWarnings({"rawtypes", "unchecked"})
932  public static <V> ListenableFuture<V> dereference(
933      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
934    return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
935  }
936
937  /**
938   * Helper {@code Function} for {@link #dereference}.
939   */
940  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
941      new AsyncFunction<ListenableFuture<Object>, Object>() {
942        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
943          return input;
944        }
945      };
946
947  /**
948   * Creates a new {@code ListenableFuture} whose value is a list containing the
949   * values of all its input futures, if all succeed. If any input fails, the
950   * returned future fails.
951   *
952   * <p>The list of results is in the same order as the input list.
953   *
954   * <p>Canceling this future will attempt to cancel all the component futures,
955   * and if any of the provided futures fails or is canceled, this one is,
956   * too.
957   *
958   * @param futures futures to combine
959   * @return a future that provides a list of the results of the component
960   *         futures
961   * @since 10.0
962   */
963  @Beta
964  public static <V> ListenableFuture<List<V>> allAsList(
965      ListenableFuture<? extends V>... futures) {
966    return listFuture(ImmutableList.copyOf(futures), true,
967        MoreExecutors.sameThreadExecutor());
968  }
969
970  /**
971   * Creates a new {@code ListenableFuture} whose value is a list containing the
972   * values of all its input futures, if all succeed. If any input fails, the
973   * returned future fails.
974   *
975   * <p>The list of results is in the same order as the input list.
976   *
977   * <p>Canceling this future will attempt to cancel all the component futures,
978   * and if any of the provided futures fails or is canceled, this one is,
979   * too.
980   *
981   * @param futures futures to combine
982   * @return a future that provides a list of the results of the component
983   *         futures
984   * @since 10.0
985   */
986  @Beta
987  public static <V> ListenableFuture<List<V>> allAsList(
988      Iterable<? extends ListenableFuture<? extends V>> futures) {
989    return listFuture(ImmutableList.copyOf(futures), true,
990        MoreExecutors.sameThreadExecutor());
991  }
992
993  /**
994   * Creates a new {@code ListenableFuture} whose value is a list containing the
995   * values of all its successful input futures. The list of results is in the
996   * same order as the input list, and if any of the provided futures fails or
997   * is canceled, its corresponding position will contain {@code null} (which is
998   * indistinguishable from the future having a successful value of
999   * {@code null}).
1000   *
1001   * <p>Canceling this future will attempt to cancel all the component futures.
1002   *
1003   * @param futures futures to combine
1004   * @return a future that provides a list of the results of the component
1005   *         futures
1006   * @since 10.0
1007   */
1008  @Beta
1009  public static <V> ListenableFuture<List<V>> successfulAsList(
1010      ListenableFuture<? extends V>... futures) {
1011    return listFuture(ImmutableList.copyOf(futures), false,
1012        MoreExecutors.sameThreadExecutor());
1013  }
1014
1015  /**
1016   * Creates a new {@code ListenableFuture} whose value is a list containing the
1017   * values of all its successful input futures. The list of results is in the
1018   * same order as the input list, and if any of the provided futures fails or
1019   * is canceled, its corresponding position will contain {@code null} (which is
1020   * indistinguishable from the future having a successful value of
1021   * {@code null}).
1022   *
1023   * <p>Canceling this future will attempt to cancel all the component futures.
1024   *
1025   * @param futures futures to combine
1026   * @return a future that provides a list of the results of the component
1027   *         futures
1028   * @since 10.0
1029   */
1030  @Beta
1031  public static <V> ListenableFuture<List<V>> successfulAsList(
1032      Iterable<? extends ListenableFuture<? extends V>> futures) {
1033    return listFuture(ImmutableList.copyOf(futures), false,
1034        MoreExecutors.sameThreadExecutor());
1035  }
1036
1037  /**
1038   * Registers separate success and failure callbacks to be run when the {@code
1039   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1040   * complete} or, if the computation is already complete, immediately.
1041   *
1042   * <p>There is no guaranteed ordering of execution of callbacks, but any
1043   * callback added through this method is guaranteed to be called once the
1044   * computation is complete.
1045   *
1046   * Example: <pre> {@code
1047   * ListenableFuture<QueryResult> future = ...;
1048   * addCallback(future,
1049   *     new FutureCallback<QueryResult> {
1050   *       public void onSuccess(QueryResult result) {
1051   *         storeInCache(result);
1052   *       }
1053   *       public void onFailure(Throwable t) {
1054   *         reportError(t);
1055   *       }
1056   *     });}</pre>
1057   *
1058   * Note: If the callback is slow or heavyweight, consider {@linkplain
1059   * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1060   * executor}. If you do not supply an executor, {@code addCallback} will use
1061   * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
1062   * some caveats for heavier operations. For example, the callback may run on
1063   * an unpredictable or undesirable thread:
1064   *
1065   * <ul>
1066   * <li>If the input {@code Future} is done at the time {@code addCallback} is
1067   * called, {@code addCallback} will execute the callback inline.
1068   * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1069   * schedule the callback to be run by the thread that completes the input
1070   * {@code Future}, which may be an internal system thread such as an RPC
1071   * network thread.
1072   * </ul>
1073   *
1074   * Also note that, regardless of which thread executes the callback, all
1075   * other registered but unexecuted listeners are prevented from running
1076   * during its execution, even if those listeners are to run in other
1077   * executors.
1078   *
1079   * <p>For a more general interface to attach a completion listener to a
1080   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1081   *
1082   * @param future The future attach the callback to.
1083   * @param callback The callback to invoke when {@code future} is completed.
1084   * @since 10.0
1085   */
1086  public static <V> void addCallback(ListenableFuture<V> future,
1087      FutureCallback<? super V> callback) {
1088    addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1089  }
1090
1091  /**
1092   * Registers separate success and failure callbacks to be run when the {@code
1093   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1094   * complete} or, if the computation is already complete, immediately.
1095   *
1096   * <p>The callback is run in {@code executor}.
1097   * There is no guaranteed ordering of execution of callbacks, but any
1098   * callback added through this method is guaranteed to be called once the
1099   * computation is complete.
1100   *
1101   * Example: <pre> {@code
1102   * ListenableFuture<QueryResult> future = ...;
1103   * Executor e = ...
1104   * addCallback(future, e,
1105   *     new FutureCallback<QueryResult> {
1106   *       public void onSuccess(QueryResult result) {
1107   *         storeInCache(result);
1108   *       }
1109   *       public void onFailure(Throwable t) {
1110   *         reportError(t);
1111   *       }
1112   *     });}</pre>
1113   *
1114   * When the callback is fast and lightweight, consider {@linkplain
1115   * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1116   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
1117   * caveats documented in the link above.
1118   *
1119   * <p>For a more general interface to attach a completion listener to a
1120   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1121   *
1122   * @param future The future attach the callback to.
1123   * @param callback The callback to invoke when {@code future} is completed.
1124   * @param executor The executor to run {@code callback} when the future
1125   *    completes.
1126   * @since 10.0
1127   */
1128  public static <V> void addCallback(final ListenableFuture<V> future,
1129      final FutureCallback<? super V> callback, Executor executor) {
1130    Preconditions.checkNotNull(callback);
1131    Runnable callbackListener = new Runnable() {
1132      @Override
1133      public void run() {
1134        final V value;
1135        try {
1136          // TODO(user): (Before Guava release), validate that this
1137          // is the thing for IE.
1138          value = getUninterruptibly(future);
1139        } catch (ExecutionException e) {
1140          callback.onFailure(e.getCause());
1141          return;
1142        } catch (RuntimeException e) {
1143          callback.onFailure(e);
1144          return;
1145        } catch (Error e) {
1146          callback.onFailure(e);
1147          return;
1148        }
1149        callback.onSuccess(value);
1150      }
1151    };
1152    future.addListener(callbackListener, executor);
1153  }
1154
1155  /**
1156   * Returns the result of {@link Future#get()}, converting most exceptions to a
1157   * new instance of the given checked exception type. This reduces boilerplate
1158   * for a common use of {@code Future} in which it is unnecessary to
1159   * programmatically distinguish between exception types or to extract other
1160   * information from the exception instance.
1161   *
1162   * <p>Exceptions from {@code Future.get} are treated as follows:
1163   * <ul>
1164   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1165   *     {@code X} if the cause is a checked exception, an {@link
1166   *     UncheckedExecutionException} if the cause is a {@code
1167   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1168   *     {@code Error}.
1169   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1170   *     restoring the interrupt).
1171   * <li>Any {@link CancellationException} is propagated untouched, as is any
1172   *     other {@link RuntimeException} (though {@code get} implementations are
1173   *     discouraged from throwing such exceptions).
1174   * </ul>
1175   *
1176   * The overall principle is to continue to treat every checked exception as a
1177   * checked exception, every unchecked exception as an unchecked exception, and
1178   * every error as an error. In addition, the cause of any {@code
1179   * ExecutionException} is wrapped in order to ensure that the new stack trace
1180   * matches that of the current thread.
1181   *
1182   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1183   * public constructor that accepts zero or more arguments, all of type {@code
1184   * String} or {@code Throwable} (preferring constructors with at least one
1185   * {@code String}) and calling the constructor via reflection. If the
1186   * exception did not already have a cause, one is set by calling {@link
1187   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1188   * {@code IllegalArgumentException} is thrown.
1189   *
1190   * @throws X if {@code get} throws any checked exception except for an {@code
1191   *         ExecutionException} whose cause is not itself a checked exception
1192   * @throws UncheckedExecutionException if {@code get} throws an {@code
1193   *         ExecutionException} with a {@code RuntimeException} as its cause
1194   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1195   *         with an {@code Error} as its cause
1196   * @throws CancellationException if {@code get} throws a {@code
1197   *         CancellationException}
1198   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1199   *         RuntimeException} or does not have a suitable constructor
1200   * @since 10.0
1201   */
1202  @Beta
1203  public static <V, X extends Exception> V get(
1204      Future<V> future, Class<X> exceptionClass) throws X {
1205    checkNotNull(future);
1206    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1207        "Futures.get exception type (%s) must not be a RuntimeException",
1208        exceptionClass);
1209    try {
1210      return future.get();
1211    } catch (InterruptedException e) {
1212      currentThread().interrupt();
1213      throw newWithCause(exceptionClass, e);
1214    } catch (ExecutionException e) {
1215      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1216      throw new AssertionError();
1217    }
1218  }
1219
1220  /**
1221   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1222   * exceptions to a new instance of the given checked exception type. This
1223   * reduces boilerplate for a common use of {@code Future} in which it is
1224   * unnecessary to programmatically distinguish between exception types or to
1225   * extract other information from the exception instance.
1226   *
1227   * <p>Exceptions from {@code Future.get} are treated as follows:
1228   * <ul>
1229   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1230   *     {@code X} if the cause is a checked exception, an {@link
1231   *     UncheckedExecutionException} if the cause is a {@code
1232   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1233   *     {@code Error}.
1234   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1235   *     restoring the interrupt).
1236   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1237   * <li>Any {@link CancellationException} is propagated untouched, as is any
1238   *     other {@link RuntimeException} (though {@code get} implementations are
1239   *     discouraged from throwing such exceptions).
1240   * </ul>
1241   *
1242   * The overall principle is to continue to treat every checked exception as a
1243   * checked exception, every unchecked exception as an unchecked exception, and
1244   * every error as an error. In addition, the cause of any {@code
1245   * ExecutionException} is wrapped in order to ensure that the new stack trace
1246   * matches that of the current thread.
1247   *
1248   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1249   * public constructor that accepts zero or more arguments, all of type {@code
1250   * String} or {@code Throwable} (preferring constructors with at least one
1251   * {@code String}) and calling the constructor via reflection. If the
1252   * exception did not already have a cause, one is set by calling {@link
1253   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1254   * {@code IllegalArgumentException} is thrown.
1255   *
1256   * @throws X if {@code get} throws any checked exception except for an {@code
1257   *         ExecutionException} whose cause is not itself a checked exception
1258   * @throws UncheckedExecutionException if {@code get} throws an {@code
1259   *         ExecutionException} with a {@code RuntimeException} as its cause
1260   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1261   *         with an {@code Error} as its cause
1262   * @throws CancellationException if {@code get} throws a {@code
1263   *         CancellationException}
1264   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1265   *         RuntimeException} or does not have a suitable constructor
1266   * @since 10.0
1267   */
1268  @Beta
1269  public static <V, X extends Exception> V get(
1270      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1271      throws X {
1272    checkNotNull(future);
1273    checkNotNull(unit);
1274    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1275        "Futures.get exception type (%s) must not be a RuntimeException",
1276        exceptionClass);
1277    try {
1278      return future.get(timeout, unit);
1279    } catch (InterruptedException e) {
1280      currentThread().interrupt();
1281      throw newWithCause(exceptionClass, e);
1282    } catch (TimeoutException e) {
1283      throw newWithCause(exceptionClass, e);
1284    } catch (ExecutionException e) {
1285      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1286      throw new AssertionError();
1287    }
1288  }
1289
1290  private static <X extends Exception> void wrapAndThrowExceptionOrError(
1291      Throwable cause, Class<X> exceptionClass) throws X {
1292    if (cause instanceof Error) {
1293      throw new ExecutionError((Error) cause);
1294    }
1295    if (cause instanceof RuntimeException) {
1296      throw new UncheckedExecutionException(cause);
1297    }
1298    throw newWithCause(exceptionClass, cause);
1299  }
1300
1301  /**
1302   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1303   * task known not to throw a checked exception. This makes {@code Future} more
1304   * suitable for lightweight, fast-running tasks that, barring bugs in the
1305   * code, will not fail. This gives it exception-handling behavior similar to
1306   * that of {@code ForkJoinTask.join}.
1307   *
1308   * <p>Exceptions from {@code Future.get} are treated as follows:
1309   * <ul>
1310   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1311   *     {@link UncheckedExecutionException} (if the cause is an {@code
1312   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1313   *     Error}).
1314   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1315   *     call. The interrupt is restored before {@code getUnchecked} returns.
1316   * <li>Any {@link CancellationException} is propagated untouched. So is any
1317   *     other {@link RuntimeException} ({@code get} implementations are
1318   *     discouraged from throwing such exceptions).
1319   * </ul>
1320   *
1321   * The overall principle is to eliminate all checked exceptions: to loop to
1322   * avoid {@code InterruptedException}, to pass through {@code
1323   * CancellationException}, and to wrap any exception from the underlying
1324   * computation in an {@code UncheckedExecutionException} or {@code
1325   * ExecutionError}.
1326   *
1327   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1328   * {@link Uninterruptibles#getUninterruptibly(Future)}.
1329   *
1330   * @throws UncheckedExecutionException if {@code get} throws an {@code
1331   *         ExecutionException} with an {@code Exception} as its cause
1332   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1333   *         with an {@code Error} as its cause
1334   * @throws CancellationException if {@code get} throws a {@code
1335   *         CancellationException}
1336   * @since 10.0
1337   */
1338  @Beta
1339  public static <V> V getUnchecked(Future<V> future) {
1340    checkNotNull(future);
1341    try {
1342      return getUninterruptibly(future);
1343    } catch (ExecutionException e) {
1344      wrapAndThrowUnchecked(e.getCause());
1345      throw new AssertionError();
1346    }
1347  }
1348
1349  private static void wrapAndThrowUnchecked(Throwable cause) {
1350    if (cause instanceof Error) {
1351      throw new ExecutionError((Error) cause);
1352    }
1353    /*
1354     * It's a non-Error, non-Exception Throwable. From my survey of such
1355     * classes, I believe that most users intended to extend Exception, so we'll
1356     * treat it like an Exception.
1357     */
1358    throw new UncheckedExecutionException(cause);
1359  }
1360
1361  /*
1362   * TODO(user): FutureChecker interface for these to be static methods on? If
1363   * so, refer to it in the (static-method) Futures.get documentation
1364   */
1365
1366  /*
1367   * Arguably we don't need a timed getUnchecked because any operation slow
1368   * enough to require a timeout is heavyweight enough to throw a checked
1369   * exception and therefore be inappropriate to use with getUnchecked. Further,
1370   * it's not clear that converting the checked TimeoutException to a
1371   * RuntimeException -- especially to an UncheckedExecutionException, since it
1372   * wasn't thrown by the computation -- makes sense, and if we don't convert
1373   * it, the user still has to write a try-catch block.
1374   *
1375   * If you think you would use this method, let us know.
1376   */
1377
1378  private static <X extends Exception> X newWithCause(
1379      Class<X> exceptionClass, Throwable cause) {
1380    // getConstructors() guarantees this as long as we don't modify the array.
1381    @SuppressWarnings("unchecked")
1382    List<Constructor<X>> constructors =
1383        (List) Arrays.asList(exceptionClass.getConstructors());
1384    for (Constructor<X> constructor : preferringStrings(constructors)) {
1385      @Nullable X instance = newFromConstructor(constructor, cause);
1386      if (instance != null) {
1387        if (instance.getCause() == null) {
1388          instance.initCause(cause);
1389        }
1390        return instance;
1391      }
1392    }
1393    throw new IllegalArgumentException(
1394        "No appropriate constructor for exception of type " + exceptionClass
1395            + " in response to chained exception", cause);
1396  }
1397
1398  private static <X extends Exception> List<Constructor<X>>
1399      preferringStrings(List<Constructor<X>> constructors) {
1400    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1401  }
1402
1403  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1404      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1405        @Override public Boolean apply(Constructor<?> input) {
1406          return asList(input.getParameterTypes()).contains(String.class);
1407        }
1408      }).reverse();
1409
1410  @Nullable private static <X> X newFromConstructor(
1411      Constructor<X> constructor, Throwable cause) {
1412    Class<?>[] paramTypes = constructor.getParameterTypes();
1413    Object[] params = new Object[paramTypes.length];
1414    for (int i = 0; i < paramTypes.length; i++) {
1415      Class<?> paramType = paramTypes[i];
1416      if (paramType.equals(String.class)) {
1417        params[i] = cause.toString();
1418      } else if (paramType.equals(Throwable.class)) {
1419        params[i] = cause;
1420      } else {
1421        return null;
1422      }
1423    }
1424    try {
1425      return constructor.newInstance(params);
1426    } catch (IllegalArgumentException e) {
1427      return null;
1428    } catch (InstantiationException e) {
1429      return null;
1430    } catch (IllegalAccessException e) {
1431      return null;
1432    } catch (InvocationTargetException e) {
1433      return null;
1434    }
1435  }
1436
1437  private interface FutureCombiner<V, C> {
        /*
         * Strategy used by CombinedFuture to turn the per-input results it
         * has collected into the combined future's own value of type C.
         */

        /**
         * Builds the combined value from {@code values}. CombinedFuture
         * calls this with an empty list when it has no input futures and
         * passes the result to {@code set}. It fills each slot with {@code
         * Optional.fromNullable(result)} as an input completes; slots start
         * out as {@code null}.
         */
1438    C combine(List<Optional<V>> values);
1439  }
1440
1441  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1442    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1443    final boolean allMustSucceed;
1444    final AtomicInteger remaining;
1445    FutureCombiner<V, C> combiner;
1446    List<Optional<V>> values;
1447
1448    CombinedFuture(
1449        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1450        boolean allMustSucceed, Executor listenerExecutor,
1451        FutureCombiner<V, C> combiner) {
1452      this.futures = futures;
1453      this.allMustSucceed = allMustSucceed;
1454      this.remaining = new AtomicInteger(futures.size());
1455      this.combiner = combiner;
1456      this.values = Lists.newArrayListWithCapacity(futures.size());
1457      init(listenerExecutor);
1458    }
1459
1460    /**
1461     * Must be called at the end of the constructor.
1462     */
1463    protected void init(final Executor listenerExecutor) {
1464      // First, schedule cleanup to execute when the Future is done.
1465      addListener(new Runnable() {
1466        @Override
1467        public void run() {
1468          // Cancel all the component futures.
1469          if (CombinedFuture.this.isCancelled()) {
1470            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1471              future.cancel(CombinedFuture.this.wasInterrupted());
1472            }
1473          }
1474
1475          // By now the values array has either been set as the Future's value,
1476          // or (in case of failure) is no longer useful.
1477          CombinedFuture.this.futures = null;
1478
1479          // Let go of the memory held by other futures
1480          CombinedFuture.this.values = null;
1481
1482          // The combiner may also hold state, so free that as well
1483          CombinedFuture.this.combiner = null;
1484        }
1485      }, MoreExecutors.sameThreadExecutor());
1486
1487      // Now begin the "real" initialization.
1488
1489      // Corner case: List is empty.
1490      if (futures.isEmpty()) {
1491        set(combiner.combine(ImmutableList.<Optional<V>>of()));
1492        return;
1493      }
1494
1495      // Populate the results list with null initially.
1496      for (int i = 0; i < futures.size(); ++i) {
1497        values.add(null);
1498      }
1499
1500      // Register a listener on each Future in the list to update
1501      // the state of this future.
1502      // Note that if all the futures on the list are done prior to completing
1503      // this loop, the last call to addListener() will callback to
1504      // setOneValue(), transitively call our cleanup listener, and set
1505      // this.futures to null.
1506      // This is not actually a problem, since the foreach only needs
1507      // this.futures to be non-null at the beginning of the loop.
1508      int i = 0;
1509      for (final ListenableFuture<? extends V> listenable : futures) {
1510        final int index = i++;
1511        listenable.addListener(new Runnable() {
1512          @Override
1513          public void run() {
1514            setOneValue(index, listenable);
1515          }
1516        }, listenerExecutor);
1517      }
1518    }
1519
1520    /**
1521     * Sets the value at the given index to that of the given future.
1522     */
1523    private void setOneValue(int index, Future<? extends V> future) {
1524      List<Optional<V>> localValues = values;
1525      if (isDone() || localValues == null) {
1526        // Some other future failed or has been cancelled, causing this one to
1527        // also be cancelled or have an exception set. This should only happen
1528        // if allMustSucceed is true or if the output itself has been cancelled.
1529        checkState(allMustSucceed || isCancelled(),
1530            "Future was done before all dependencies completed");
1531        return;
1532      }
1533
1534      try {
1535        checkState(future.isDone(),
1536            "Tried to set value from future which is not done");
1537        V returnValue = getUninterruptibly(future);
1538        localValues.set(index, Optional.fromNullable(returnValue));
1539      } catch (CancellationException e) {
1540        if (allMustSucceed) {
1541          // Set ourselves as cancelled. Let the input futures keep running
1542          // as some of them may be used elsewhere.
1543          // (Currently we don't override interruptTask, so
1544          // mayInterruptIfRunning==false isn't technically necessary.)
1545          cancel(false);
1546        }
1547      } catch (ExecutionException e) {
1548        if (allMustSucceed) {
1549          // As soon as the first one fails, throw the exception up.
1550          // The result of all other inputs is then ignored.
1551          setException(e.getCause());
1552        }
1553      } catch (RuntimeException e) {
1554        if (allMustSucceed) {
1555          setException(e);
1556        }
1557      } catch (Error e) {
1558        // Propagate errors up ASAP - our superclass will rethrow the error
1559        setException(e);
1560      } finally {
1561        int newRemaining = remaining.decrementAndGet();
1562        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1563        if (newRemaining == 0) {
1564          FutureCombiner<V, C> localCombiner = combiner;
1565          if (localCombiner != null) {
1566            set(localCombiner.combine(localValues));
1567          } else {
1568            checkState(isDone());
1569          }
1570        }
1571      }
1572    }
1573
1574  }
1575
1576  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1577  private static <V> ListenableFuture<List<V>> listFuture(
1578      ImmutableList<ListenableFuture<? extends V>> futures,
1579      boolean allMustSucceed, Executor listenerExecutor) {
1580    return new CombinedFuture<V, List<V>>(
1581        futures, allMustSucceed, listenerExecutor,
1582        new FutureCombiner<V, List<V>>() {
1583          @Override
1584          public List<V> combine(List<Optional<V>> values) {
1585            List<V> result = Lists.newArrayList();
1586            for (Optional<V> element : values) {
1587              result.add(element != null ? element.orNull() : null);
1588            }
1589            // TODO(user): This should ultimately return an unmodifiableList
1590            return result;
1591          }
1592        });
1593  }
1594
1595  /**
1596   * A checked future that uses a function to map from exceptions to the
1597   * appropriate checked type.
1598   */
1599  private static class MappingCheckedFuture<V, X extends Exception> extends
1600      AbstractCheckedFuture<V, X> {
1601
1602    final Function<Exception, X> mapper;
1603
1604    MappingCheckedFuture(ListenableFuture<V> delegate,
1605        Function<Exception, X> mapper) {
1606      super(delegate);
1607
1608      this.mapper = checkNotNull(mapper);
1609    }
1610
1611    @Override
1612    protected X mapException(Exception e) {
1613      return mapper.apply(e);
1614    }
1615  }
1616}