001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024import static java.lang.Thread.currentThread;
025import static java.util.Arrays.asList;
026
027import com.google.common.annotations.Beta;
028import com.google.common.base.Function;
029import com.google.common.base.Optional;
030import com.google.common.base.Preconditions;
031import com.google.common.collect.ImmutableCollection;
032import com.google.common.collect.ImmutableList;
033import com.google.common.collect.Lists;
034import com.google.common.collect.Ordering;
035import com.google.common.collect.Queues;
036import com.google.common.collect.Sets;
037
038import java.lang.reflect.Constructor;
039import java.lang.reflect.InvocationTargetException;
040import java.lang.reflect.UndeclaredThrowableException;
041import java.util.Arrays;
042import java.util.Collections;
043import java.util.List;
044import java.util.Set;
045import java.util.concurrent.CancellationException;
046import java.util.concurrent.ConcurrentLinkedQueue;
047import java.util.concurrent.CountDownLatch;
048import java.util.concurrent.ExecutionException;
049import java.util.concurrent.Executor;
050import java.util.concurrent.Future;
051import java.util.concurrent.TimeUnit;
052import java.util.concurrent.TimeoutException;
053import java.util.concurrent.atomic.AtomicInteger;
054import java.util.logging.Level;
055import java.util.logging.Logger;
056
057import javax.annotation.Nullable;
058
059/**
060 * Static utility methods pertaining to the {@link Future} interface.
061 *
062 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
063 * Guava User Guide article on <a href=
064 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
065 * {@code ListenableFuture}</a>.
066 *
067 * @author Kevin Bourrillion
068 * @author Nishant Thakkar
069 * @author Sven Mawson
070 * @since 1.0
071 */
072@Beta
073public final class Futures {
074  private Futures() {}
075
076  /**
077   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
078   * and a {@link Function} that maps from {@link Exception} instances into the
079   * appropriate checked type.
080   *
081   * <p>The given mapping function will be applied to an
082   * {@link InterruptedException}, a {@link CancellationException}, or an
083   * {@link ExecutionException}.
084   * See {@link Future#get()} for details on the exceptions thrown.
085   *
086   * @since 9.0 (source-compatible since 1.0)
087   */
088  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
089      ListenableFuture<V> future, Function<Exception, X> mapper) {
090    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
091  }
092
093  private abstract static class ImmediateFuture<V>
094      implements ListenableFuture<V> {
095
096    private static final Logger log =
097        Logger.getLogger(ImmediateFuture.class.getName());
098
099    @Override
100    public void addListener(Runnable listener, Executor executor) {
101      checkNotNull(listener, "Runnable was null.");
102      checkNotNull(executor, "Executor was null.");
103      try {
104        executor.execute(listener);
105      } catch (RuntimeException e) {
106        // ListenableFuture's contract is that it will not throw unchecked
107        // exceptions, so log the bad runnable and/or executor and swallow it.
108        log.log(Level.SEVERE, "RuntimeException while executing runnable "
109            + listener + " with executor " + executor, e);
110      }
111    }
112
113    @Override
114    public boolean cancel(boolean mayInterruptIfRunning) {
115      return false;
116    }
117
118    @Override
119    public abstract V get() throws ExecutionException;
120
121    @Override
122    public V get(long timeout, TimeUnit unit) throws ExecutionException {
123      checkNotNull(unit);
124      return get();
125    }
126
127    @Override
128    public boolean isCancelled() {
129      return false;
130    }
131
132    @Override
133    public boolean isDone() {
134      return true;
135    }
136  }
137
138  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
139
140    @Nullable private final V value;
141
142    ImmediateSuccessfulFuture(@Nullable V value) {
143      this.value = value;
144    }
145
146    @Override
147    public V get() {
148      return value;
149    }
150  }
151
152  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
153      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
154
155    @Nullable private final V value;
156
157    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
158      this.value = value;
159    }
160
161    @Override
162    public V get() {
163      return value;
164    }
165
166    @Override
167    public V checkedGet() {
168      return value;
169    }
170
171    @Override
172    public V checkedGet(long timeout, TimeUnit unit) {
173      checkNotNull(unit);
174      return value;
175    }
176  }
177
178  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
179
180    private final Throwable thrown;
181
182    ImmediateFailedFuture(Throwable thrown) {
183      this.thrown = thrown;
184    }
185
186    @Override
187    public V get() throws ExecutionException {
188      throw new ExecutionException(thrown);
189    }
190  }
191
192  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
193
194    private final CancellationException thrown;
195
196    ImmediateCancelledFuture() {
197      this.thrown = new CancellationException("Immediate cancelled future.");
198    }
199
200    @Override
201    public boolean isCancelled() {
202      return true;
203    }
204
205    @Override
206    public V get() {
207      throw AbstractFuture.cancellationExceptionWithCause(
208          "Task was cancelled.", thrown);
209    }
210  }
211
212  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
213      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
214
215    private final X thrown;
216
217    ImmediateFailedCheckedFuture(X thrown) {
218      this.thrown = thrown;
219    }
220
221    @Override
222    public V get() throws ExecutionException {
223      throw new ExecutionException(thrown);
224    }
225
226    @Override
227    public V checkedGet() throws X {
228      throw thrown;
229    }
230
231    @Override
232    public V checkedGet(long timeout, TimeUnit unit) throws X {
233      checkNotNull(unit);
234      throw thrown;
235    }
236  }
237
238  /**
239   * Creates a {@code ListenableFuture} which has its value set immediately upon
240   * construction. The getters just return the value. This {@code Future} can't
241   * be canceled or timed out and its {@code isDone()} method always returns
242   * {@code true}.
243   */
244  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
245    return new ImmediateSuccessfulFuture<V>(value);
246  }
247
248  /**
249   * Returns a {@code CheckedFuture} which has its value set immediately upon
250   * construction.
251   *
252   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
253   * method always returns {@code true}. Calling {@code get()} or {@code
254   * checkedGet()} will immediately return the provided value.
255   */
256  public static <V, X extends Exception> CheckedFuture<V, X>
257      immediateCheckedFuture(@Nullable V value) {
258    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
259  }
260
261  /**
262   * Returns a {@code ListenableFuture} which has an exception set immediately
263   * upon construction.
264   *
265   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
266   * method always returns {@code true}. Calling {@code get()} will immediately
267   * throw the provided {@code Throwable} wrapped in an {@code
268   * ExecutionException}.
269   */
270  public static <V> ListenableFuture<V> immediateFailedFuture(
271      Throwable throwable) {
272    checkNotNull(throwable);
273    return new ImmediateFailedFuture<V>(throwable);
274  }
275
276  /**
277   * Creates a {@code ListenableFuture} which is cancelled immediately upon
278   * construction, so that {@code isCancelled()} always returns {@code true}.
279   *
280   * @since 14.0
281   */
282  public static <V> ListenableFuture<V> immediateCancelledFuture() {
283    return new ImmediateCancelledFuture<V>();
284  }
285
286  /**
287   * Returns a {@code CheckedFuture} which has an exception set immediately upon
288   * construction.
289   *
290   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
291   * method always returns {@code true}. Calling {@code get()} will immediately
292   * throw the provided {@code Exception} wrapped in an {@code
293   * ExecutionException}, and calling {@code checkedGet()} will throw the
294   * provided exception itself.
295   */
296  public static <V, X extends Exception> CheckedFuture<V, X>
297      immediateFailedCheckedFuture(X exception) {
298    checkNotNull(exception);
299    return new ImmediateFailedCheckedFuture<V, X>(exception);
300  }
301
302  /**
303   * Returns a {@code Future} whose result is taken from the given primary
304   * {@code input} or, if the primary input fails, from the {@code Future}
305   * provided by the {@code fallback}. {@link FutureFallback#create} is not
306   * invoked until the primary input has failed, so if the primary input
307   * succeeds, it is never invoked. If, during the invocation of {@code
308   * fallback}, an exception is thrown, this exception is used as the result of
309   * the output {@code Future}.
310   *
311   * <p>Below is an example of a fallback that returns a default value if an
312   * exception occurs:
313   *
314   * <pre>   {@code
315   *   ListenableFuture<Integer> fetchCounterFuture = ...;
316   *
317   *   // Falling back to a zero counter in case an exception happens when
318   *   // processing the RPC to fetch counters.
319   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
320   *       fetchCounterFuture, new FutureFallback<Integer>() {
321   *         public ListenableFuture<Integer> create(Throwable t) {
322   *           // Returning "0" as the default for the counter when the
323   *           // exception happens.
324   *           return immediateFuture(0);
325   *         }
326   *       });}</pre>
327   *
328   * <p>The fallback can also choose to propagate the original exception when
329   * desired:
330   *
331   * <pre>   {@code
332   *   ListenableFuture<Integer> fetchCounterFuture = ...;
333   *
334   *   // Falling back to a zero counter only in case the exception was a
335   *   // TimeoutException.
336   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
337   *       fetchCounterFuture, new FutureFallback<Integer>() {
338   *         public ListenableFuture<Integer> create(Throwable t) {
339   *           if (t instanceof TimeoutException) {
340   *             return immediateFuture(0);
341   *           }
342   *           return immediateFailedFuture(t);
343   *         }
344   *       });}</pre>
345   *
346   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
347   * (whether the {@code Future} itself is slow or heavyweight to complete is
348   * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
349   * FutureFallback, Executor) supplying an executor}. If you do not supply an
350   * executor, {@code withFallback} will use {@link
351   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
352   * caveats for heavier operations. For example, the call to {@code
353   * fallback.create} may run on an unpredictable or undesirable thread:
354   *
355   * <ul>
356   * <li>If the input {@code Future} is done at the time {@code withFallback}
357   * is called, {@code withFallback} will call {@code fallback.create} inline.
358   * <li>If the input {@code Future} is not yet done, {@code withFallback} will
359   * schedule {@code fallback.create} to be run by the thread that completes
360   * the input {@code Future}, which may be an internal system thread such as
361   * an RPC network thread.
362   * </ul>
363   *
364   * <p>Also note that, regardless of which thread executes the {@code
365   * sameThreadExecutor} {@code fallback.create}, all other registered but
366   * unexecuted listeners are prevented from running during its execution, even
367   * if those listeners are to run in other executors.
368   *
369   * @param input the primary input {@code Future}
370   * @param fallback the {@link FutureFallback} implementation to be called if
371   *     {@code input} fails
372   * @since 14.0
373   */
374  public static <V> ListenableFuture<V> withFallback(
375      ListenableFuture<? extends V> input,
376      FutureFallback<? extends V> fallback) {
377    return withFallback(input, fallback, sameThreadExecutor());
378  }
379
380  /**
381   * Returns a {@code Future} whose result is taken from the given primary
382   * {@code input} or, if the primary input fails, from the {@code Future}
383   * provided by the {@code fallback}. {@link FutureFallback#create} is not
384   * invoked until the primary input has failed, so if the primary input
385   * succeeds, it is never invoked. If, during the invocation of {@code
386   * fallback}, an exception is thrown, this exception is used as the result of
387   * the output {@code Future}.
388   *
389   * <p>Below is an example of a fallback that returns a default value if an
390   * exception occurs:
391   *
392   * <pre>   {@code
393   *   ListenableFuture<Integer> fetchCounterFuture = ...;
394   *
395   *   // Falling back to a zero counter in case an exception happens when
396   *   // processing the RPC to fetch counters.
397   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
398   *       fetchCounterFuture, new FutureFallback<Integer>() {
399   *         public ListenableFuture<Integer> create(Throwable t) {
400   *           // Returning "0" as the default for the counter when the
401   *           // exception happens.
402   *           return immediateFuture(0);
403   *         }
404   *       }, sameThreadExecutor());}</pre>
405   *
406   * <p>The fallback can also choose to propagate the original exception when
407   * desired:
408   *
409   * <pre>   {@code
410   *   ListenableFuture<Integer> fetchCounterFuture = ...;
411   *
412   *   // Falling back to a zero counter only in case the exception was a
413   *   // TimeoutException.
414   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
415   *       fetchCounterFuture, new FutureFallback<Integer>() {
416   *         public ListenableFuture<Integer> create(Throwable t) {
417   *           if (t instanceof TimeoutException) {
418   *             return immediateFuture(0);
419   *           }
420   *           return immediateFailedFuture(t);
421   *         }
422   *       }, sameThreadExecutor());}</pre>
423   *
424   * <p>When the execution of {@code fallback.create} is fast and lightweight
425   * (though the {@code Future} it returns need not meet these criteria),
426   * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
427   * omitting the executor} or explicitly specifying {@code
428   * sameThreadExecutor}. However, be aware of the caveats documented in the
429   * link above.
430   *
431   * @param input the primary input {@code Future}
432   * @param fallback the {@link FutureFallback} implementation to be called if
433   *     {@code input} fails
434   * @param executor the executor that runs {@code fallback} if {@code input}
435   *     fails
436   * @since 14.0
437   */
438  public static <V> ListenableFuture<V> withFallback(
439      ListenableFuture<? extends V> input,
440      FutureFallback<? extends V> fallback, Executor executor) {
441    checkNotNull(fallback);
442    return new FallbackFuture<V>(input, fallback, executor);
443  }
444
445  /**
446   * A future that falls back on a second, generated future, in case its
447   * original future fails.
448   */
449  private static class FallbackFuture<V> extends AbstractFuture<V> {
450
451    private volatile ListenableFuture<? extends V> running;
452
453    FallbackFuture(ListenableFuture<? extends V> input,
454        final FutureFallback<? extends V> fallback,
455        final Executor executor) {
456      running = input;
457      addCallback(running, new FutureCallback<V>() {
458        @Override
459        public void onSuccess(V value) {
460          set(value);
461        }
462
463        @Override
464        public void onFailure(Throwable t) {
465          if (isCancelled()) {
466            return;
467          }
468          try {
469            running = fallback.create(t);
470            if (isCancelled()) { // in case cancel called in the meantime
471              running.cancel(wasInterrupted());
472              return;
473            }
474            addCallback(running, new FutureCallback<V>() {
475              @Override
476              public void onSuccess(V value) {
477                set(value);
478              }
479
480              @Override
481              public void onFailure(Throwable t) {
482                if (running.isCancelled()) {
483                  cancel(false);
484                } else {
485                  setException(t);
486                }
487              }
488            }, sameThreadExecutor());
489          } catch (Throwable e) {
490            setException(e);
491          }
492        }
493      }, executor);
494    }
495
496    @Override
497    public boolean cancel(boolean mayInterruptIfRunning) {
498      if (super.cancel(mayInterruptIfRunning)) {
499        running.cancel(mayInterruptIfRunning);
500        return true;
501      }
502      return false;
503    }
504  }
505
506  /**
507   * Returns a new {@code ListenableFuture} whose result is asynchronously
508   * derived from the result of the given {@code Future}. More precisely, the
509   * returned {@code Future} takes its result from a {@code Future} produced by
510   * applying the given {@code AsyncFunction} to the result of the original
511   * {@code Future}. Example:
512   *
513   * <pre>   {@code
514   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
515   *   AsyncFunction<RowKey, QueryResult> queryFunction =
516   *       new AsyncFunction<RowKey, QueryResult>() {
517   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
518   *           return dataService.read(rowKey);
519   *         }
520   *       };
521   *   ListenableFuture<QueryResult> queryFuture =
522   *       transform(rowKeyFuture, queryFunction);}</pre>
523   *
524   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
525   * (whether the {@code Future} itself is slow or heavyweight to complete is
526   * irrelevant), consider {@linkplain #transform(ListenableFuture,
527   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
528   * executor, {@code transform} will use {@link
529   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
530   * caveats for heavier operations. For example, the call to {@code
531   * function.apply} may run on an unpredictable or undesirable thread:
532   *
533   * <ul>
534   * <li>If the input {@code Future} is done at the time {@code transform} is
535   * called, {@code transform} will call {@code function.apply} inline.
536   * <li>If the input {@code Future} is not yet done, {@code transform} will
537   * schedule {@code function.apply} to be run by the thread that completes the
538   * input {@code Future}, which may be an internal system thread such as an
539   * RPC network thread.
540   * </ul>
541   *
542   * <p>Also note that, regardless of which thread executes the {@code
543   * sameThreadExecutor} {@code function.apply}, all other registered but
544   * unexecuted listeners are prevented from running during its execution, even
545   * if those listeners are to run in other executors.
546   *
547   * <p>The returned {@code Future} attempts to keep its cancellation state in
548   * sync with that of the input future and that of the future returned by the
549   * function. That is, if the returned {@code Future} is cancelled, it will
550   * attempt to cancel the other two, and if either of the other two is
551   * cancelled, the returned {@code Future} will receive a callback in which it
552   * will attempt to cancel itself.
553   *
554   * @param input The future to transform
555   * @param function A function to transform the result of the input future
556   *     to the result of the output future
557   * @return A future that holds result of the function (if the input succeeded)
558   *     or the original input's failure (if not)
559   * @since 11.0
560   */
561  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
562      AsyncFunction<? super I, ? extends O> function) {
563    return transform(input, function, MoreExecutors.sameThreadExecutor());
564  }
565
566  /**
567   * Returns a new {@code ListenableFuture} whose result is asynchronously
568   * derived from the result of the given {@code Future}. More precisely, the
569   * returned {@code Future} takes its result from a {@code Future} produced by
570   * applying the given {@code AsyncFunction} to the result of the original
571   * {@code Future}. Example:
572   *
573   * <pre>   {@code
574   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
575   *   AsyncFunction<RowKey, QueryResult> queryFunction =
576   *       new AsyncFunction<RowKey, QueryResult>() {
577   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
578   *           return dataService.read(rowKey);
579   *         }
580   *       };
581   *   ListenableFuture<QueryResult> queryFuture =
582   *       transform(rowKeyFuture, queryFunction, executor);}</pre>
583   *
584   * <p>The returned {@code Future} attempts to keep its cancellation state in
585   * sync with that of the input future and that of the future returned by the
586   * chain function. That is, if the returned {@code Future} is cancelled, it
587   * will attempt to cancel the other two, and if either of the other two is
588   * cancelled, the returned {@code Future} will receive a callback in which it
589   * will attempt to cancel itself.
590   *
591   * <p>When the execution of {@code function.apply} is fast and lightweight
592   * (though the {@code Future} it returns need not meet these criteria),
593   * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
594   * the executor} or explicitly specifying {@code sameThreadExecutor}.
595   * However, be aware of the caveats documented in the link above.
596   *
597   * @param input The future to transform
598   * @param function A function to transform the result of the input future
599   *     to the result of the output future
600   * @param executor Executor to run the function in.
601   * @return A future that holds result of the function (if the input succeeded)
602   *     or the original input's failure (if not)
603   * @since 11.0
604   */
605  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
606      AsyncFunction<? super I, ? extends O> function,
607      Executor executor) {
608    ChainingListenableFuture<I, O> output =
609        new ChainingListenableFuture<I, O>(function, input);
610    input.addListener(output, executor);
611    return output;
612  }
613
614  /**
615   * Returns a new {@code ListenableFuture} whose result is the product of
616   * applying the given {@code Function} to the result of the given {@code
617   * Future}. Example:
618   *
619   * <pre>   {@code
620   *   ListenableFuture<QueryResult> queryFuture = ...;
621   *   Function<QueryResult, List<Row>> rowsFunction =
622   *       new Function<QueryResult, List<Row>>() {
623   *         public List<Row> apply(QueryResult queryResult) {
624   *           return queryResult.getRows();
625   *         }
626   *       };
627   *   ListenableFuture<List<Row>> rowsFuture =
628   *       transform(queryFuture, rowsFunction);}</pre>
629   *
630   * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
631   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
632   * If you do not supply an executor, {@code transform} will use {@link
633   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
634   * caveats for heavier operations.  For example, the call to {@code
635   * function.apply} may run on an unpredictable or undesirable thread:
636   *
637   * <ul>
638   * <li>If the input {@code Future} is done at the time {@code transform} is
639   * called, {@code transform} will call {@code function.apply} inline.
640   * <li>If the input {@code Future} is not yet done, {@code transform} will
641   * schedule {@code function.apply} to be run by the thread that completes the
642   * input {@code Future}, which may be an internal system thread such as an
643   * RPC network thread.
644   * </ul>
645   *
646   * <p>Also note that, regardless of which thread executes the {@code
647   * sameThreadExecutor} {@code function.apply}, all other registered but
648   * unexecuted listeners are prevented from running during its execution, even
649   * if those listeners are to run in other executors.
650   *
651   * <p>The returned {@code Future} attempts to keep its cancellation state in
652   * sync with that of the input future. That is, if the returned {@code Future}
653   * is cancelled, it will attempt to cancel the input, and if the input is
654   * cancelled, the returned {@code Future} will receive a callback in which it
655   * will attempt to cancel itself.
656   *
657   * <p>An example use of this method is to convert a serializable object
658   * returned from an RPC into a POJO.
659   *
660   * @param input The future to transform
661   * @param function A Function to transform the results of the provided future
662   *     to the results of the returned future.  This will be run in the thread
663   *     that notifies input it is complete.
664   * @return A future that holds result of the transformation.
665   * @since 9.0 (in 1.0 as {@code compose})
666   */
667  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
668      final Function<? super I, ? extends O> function) {
669    return transform(input, function, MoreExecutors.sameThreadExecutor());
670  }
671
672  /**
673   * Returns a new {@code ListenableFuture} whose result is the product of
674   * applying the given {@code Function} to the result of the given {@code
675   * Future}. Example:
676   *
677   * <pre>   {@code
678   *   ListenableFuture<QueryResult> queryFuture = ...;
679   *   Function<QueryResult, List<Row>> rowsFunction =
680   *       new Function<QueryResult, List<Row>>() {
681   *         public List<Row> apply(QueryResult queryResult) {
682   *           return queryResult.getRows();
683   *         }
684   *       };
685   *   ListenableFuture<List<Row>> rowsFuture =
686   *       transform(queryFuture, rowsFunction, executor);}</pre>
687   *
688   * <p>The returned {@code Future} attempts to keep its cancellation state in
689   * sync with that of the input future. That is, if the returned {@code Future}
690   * is cancelled, it will attempt to cancel the input, and if the input is
691   * cancelled, the returned {@code Future} will receive a callback in which it
692   * will attempt to cancel itself.
693   *
694   * <p>An example use of this method is to convert a serializable object
695   * returned from an RPC into a POJO.
696   *
697   * <p>When the transformation is fast and lightweight, consider {@linkplain
698   * #transform(ListenableFuture, Function) omitting the executor} or
699   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
700   * caveats documented in the link above.
701   *
702   * @param input The future to transform
703   * @param function A Function to transform the results of the provided future
704   *     to the results of the returned future.
705   * @param executor Executor to run the function in.
706   * @return A future that holds result of the transformation.
707   * @since 9.0 (in 2.0 as {@code compose})
708   */
709  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
710      final Function<? super I, ? extends O> function, Executor executor) {
711    checkNotNull(function);
712    AsyncFunction<I, O> wrapperFunction
713        = new AsyncFunction<I, O>() {
714            @Override public ListenableFuture<O> apply(I input) {
715              O output = function.apply(input);
716              return immediateFuture(output);
717            }
718        };
719    return transform(input, wrapperFunction, executor);
720  }
721
722  /**
723   * Like {@link #transform(ListenableFuture, Function)} except that the
724   * transformation {@code function} is invoked on each call to
725   * {@link Future#get() get()} on the returned future.
726   *
727   * <p>The returned {@code Future} reflects the input's cancellation
728   * state directly, and any attempt to cancel the returned Future is likewise
729   * passed through to the input Future.
730   *
731   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
732   * only apply the timeout to the execution of the underlying {@code Future},
733   * <em>not</em> to the execution of the transformation function.
734   *
735   * <p>The primary audience of this method is callers of {@code transform}
736   * who don't have a {@code ListenableFuture} available and
737   * do not mind repeated, lazy function evaluation.
738   *
739   * @param input The future to transform
740   * @param function A Function to transform the results of the provided future
741   *     to the results of the returned future.
742   * @return A future that returns the result of the transformation.
743   * @since 10.0
744   */
745  public static <I, O> Future<O> lazyTransform(final Future<I> input,
746      final Function<? super I, ? extends O> function) {
747    checkNotNull(input);
748    checkNotNull(function);
749    return new Future<O>() {
750
751      @Override
752      public boolean cancel(boolean mayInterruptIfRunning) {
753        return input.cancel(mayInterruptIfRunning);
754      }
755
756      @Override
757      public boolean isCancelled() {
758        return input.isCancelled();
759      }
760
761      @Override
762      public boolean isDone() {
763        return input.isDone();
764      }
765
766      @Override
767      public O get() throws InterruptedException, ExecutionException {
768        return applyTransformation(input.get());
769      }
770
771      @Override
772      public O get(long timeout, TimeUnit unit)
773          throws InterruptedException, ExecutionException, TimeoutException {
774        return applyTransformation(input.get(timeout, unit));
775      }
776
777      private O applyTransformation(I input) throws ExecutionException {
778        try {
779          return function.apply(input);
780        } catch (Throwable t) {
781          throw new ExecutionException(t);
782        }
783      }
784    };
785  }
786
787  /**
788   * An implementation of {@code ListenableFuture} that also implements
789   * {@code Runnable} so that it can be used to nest ListenableFutures.
790   * Once the passed-in {@code ListenableFuture} is complete, it calls the
791   * passed-in {@code Function} to generate the result.
792   *
793   * <p>For historical reasons, this class has a special case in its exception
794   * handling: If the given {@code AsyncFunction} throws an {@code
795   * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it
796   * and uses its <i>cause</i> as the output future's exception, rather than
797   * using the {@code UndeclaredThrowableException} itself as it would for other
798   * exception types. The reason for this is that {@code Futures.transform} used
799   * to require a {@code Function}, whose {@code apply} method is not allowed to
800   * throw checked exceptions. Nowadays, {@code Futures.transform} has an
801   * overload that accepts an {@code AsyncFunction}, whose {@code apply} method
802   * <i>is</i> allowed to throw checked exception. Users who wish to throw
803   * checked exceptions should use that overload instead, and <a
804   * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we
805   * should remove the {@code UndeclaredThrowableException} special case</a>.
806   */
807  private static class ChainingListenableFuture<I, O>
808      extends AbstractFuture<O> implements Runnable {
809
810    private AsyncFunction<? super I, ? extends O> function;
811    private ListenableFuture<? extends I> inputFuture;
812    private volatile ListenableFuture<? extends O> outputFuture;
813    private final CountDownLatch outputCreated = new CountDownLatch(1);
814
815    private ChainingListenableFuture(
816        AsyncFunction<? super I, ? extends O> function,
817        ListenableFuture<? extends I> inputFuture) {
818      this.function = checkNotNull(function);
819      this.inputFuture = checkNotNull(inputFuture);
820    }
821
822    @Override
823    public boolean cancel(boolean mayInterruptIfRunning) {
824      /*
825       * Our additional cancellation work needs to occur even if
826       * !mayInterruptIfRunning, so we can't move it into interruptTask().
827       */
828      if (super.cancel(mayInterruptIfRunning)) {
829        // This should never block since only one thread is allowed to cancel
830        // this Future.
831        cancel(inputFuture, mayInterruptIfRunning);
832        cancel(outputFuture, mayInterruptIfRunning);
833        return true;
834      }
835      return false;
836    }
837
838    private void cancel(@Nullable Future<?> future,
839        boolean mayInterruptIfRunning) {
840      if (future != null) {
841        future.cancel(mayInterruptIfRunning);
842      }
843    }
844
845    @Override
846    public void run() {
847      try {
848        I sourceResult;
849        try {
850          sourceResult = getUninterruptibly(inputFuture);
851        } catch (CancellationException e) {
852          // Cancel this future and return.
853          // At this point, inputFuture is cancelled and outputFuture doesn't
854          // exist, so the value of mayInterruptIfRunning is irrelevant.
855          cancel(false);
856          return;
857        } catch (ExecutionException e) {
858          // Set the cause of the exception as this future's exception
859          setException(e.getCause());
860          return;
861        }
862
863        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
864            Preconditions.checkNotNull(function.apply(sourceResult),
865                "AsyncFunction may not return null.");
866        if (isCancelled()) {
867          outputFuture.cancel(wasInterrupted());
868          this.outputFuture = null;
869          return;
870        }
871        outputFuture.addListener(new Runnable() {
872            @Override
873            public void run() {
874              try {
875                set(getUninterruptibly(outputFuture));
876              } catch (CancellationException e) {
877                // Cancel this future and return.
878                // At this point, inputFuture and outputFuture are done, so the
879                // value of mayInterruptIfRunning is irrelevant.
880                cancel(false);
881                return;
882              } catch (ExecutionException e) {
883                // Set the cause of the exception as this future's exception
884                setException(e.getCause());
885              } finally {
886                // Don't pin inputs beyond completion
887                ChainingListenableFuture.this.outputFuture = null;
888              }
889            }
890          }, MoreExecutors.sameThreadExecutor());
891      } catch (UndeclaredThrowableException e) {
892        // Set the cause of the exception as this future's exception
893        setException(e.getCause());
894      } catch (Throwable t) {
895        // This exception is irrelevant in this thread, but useful for the
896        // client
897        setException(t);
898      } finally {
899        // Don't pin inputs beyond completion
900        function = null;
901        inputFuture = null;
902        // Allow our get routines to examine outputFuture now.
903        outputCreated.countDown();
904      }
905    }
906  }
907
908  /**
909   * Returns a new {@code ListenableFuture} whose result is the product of
910   * calling {@code get()} on the {@code Future} nested within the given {@code
911   * Future}, effectively chaining the futures one after the other.  Example:
912   *
913   * <pre>   {@code
914   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
915   *   ListenableFuture<String> dereferenced = dereference(nested);}</pre>
916   *
917   * <p>This call has the same cancellation and execution semantics as {@link
918   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
919   * Future} attempts to keep its cancellation state in sync with both the
920   * input {@code Future} and the nested {@code Future}.  The transformation
921   * is very lightweight and therefore takes place in the same thread (either
922   * the thread that called {@code dereference}, or the thread in which the
923   * dereferenced future completes).
924   *
925   * @param nested The nested future to transform.
926   * @return A future that holds result of the inner future.
927   * @since 13.0
928   */
929  @SuppressWarnings({"rawtypes", "unchecked"})
930  public static <V> ListenableFuture<V> dereference(
931      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
932    return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
933  }
934
935  /**
936   * Helper {@code Function} for {@link #dereference}.
937   */
938  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
939      new AsyncFunction<ListenableFuture<Object>, Object>() {
940        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
941          return input;
942        }
943      };
944
945  /**
946   * Creates a new {@code ListenableFuture} whose value is a list containing the
947   * values of all its input futures, if all succeed. If any input fails, the
948   * returned future fails.
949   *
950   * <p>The list of results is in the same order as the input list.
951   *
952   * <p>Canceling this future will attempt to cancel all the component futures,
953   * and if any of the provided futures fails or is canceled, this one is,
954   * too.
955   *
956   * @param futures futures to combine
957   * @return a future that provides a list of the results of the component
958   *         futures
959   * @since 10.0
960   */
961  @Beta
962  public static <V> ListenableFuture<List<V>> allAsList(
963      ListenableFuture<? extends V>... futures) {
964    return listFuture(ImmutableList.copyOf(futures), true,
965        MoreExecutors.sameThreadExecutor());
966  }
967
968  /**
969   * Creates a new {@code ListenableFuture} whose value is a list containing the
970   * values of all its input futures, if all succeed. If any input fails, the
971   * returned future fails.
972   *
973   * <p>The list of results is in the same order as the input list.
974   *
975   * <p>Canceling this future will attempt to cancel all the component futures,
976   * and if any of the provided futures fails or is canceled, this one is,
977   * too.
978   *
979   * @param futures futures to combine
980   * @return a future that provides a list of the results of the component
981   *         futures
982   * @since 10.0
983   */
984  @Beta
985  public static <V> ListenableFuture<List<V>> allAsList(
986      Iterable<? extends ListenableFuture<? extends V>> futures) {
987    return listFuture(ImmutableList.copyOf(futures), true,
988        MoreExecutors.sameThreadExecutor());
989  }
990
991  /**
992   * Creates a new {@code ListenableFuture} whose result is set from the
993   * supplied future when it completes.  Cancelling the supplied future
994   * will also cancel the returned future, but cancelling the returned
995   * future will have no effect on the supplied future.
996   *
997   * @since 15.0
998   */
999  public static <V> ListenableFuture<V> nonCancellationPropagating(
1000      ListenableFuture<V> future) {
1001    return new NonCancellationPropagatingFuture<V>(future);
1002  }
1003
1004  /**
1005   * A wrapped future that does not propagate cancellation to its delegate.
1006   */
1007  private static class NonCancellationPropagatingFuture<V>
1008      extends AbstractFuture<V> {
1009    NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) {
1010      checkNotNull(delegate);
1011      addCallback(delegate, new FutureCallback<V>() {
1012        @Override
1013        public void onSuccess(V result) {
1014          set(result);
1015        }
1016
1017        @Override
1018        public void onFailure(Throwable t) {
1019          if (delegate.isCancelled()) {
1020            cancel(false);
1021          } else {
1022            setException(t);
1023          }
1024        }
1025      }, sameThreadExecutor());
1026    }
1027  }
1028
1029  /**
1030   * Creates a new {@code ListenableFuture} whose value is a list containing the
1031   * values of all its successful input futures. The list of results is in the
1032   * same order as the input list, and if any of the provided futures fails or
1033   * is canceled, its corresponding position will contain {@code null} (which is
1034   * indistinguishable from the future having a successful value of
1035   * {@code null}).
1036   *
1037   * <p>Canceling this future will attempt to cancel all the component futures.
1038   *
1039   * @param futures futures to combine
1040   * @return a future that provides a list of the results of the component
1041   *         futures
1042   * @since 10.0
1043   */
1044  @Beta
1045  public static <V> ListenableFuture<List<V>> successfulAsList(
1046      ListenableFuture<? extends V>... futures) {
1047    return listFuture(ImmutableList.copyOf(futures), false,
1048        MoreExecutors.sameThreadExecutor());
1049  }
1050
1051  /**
1052   * Creates a new {@code ListenableFuture} whose value is a list containing the
1053   * values of all its successful input futures. The list of results is in the
1054   * same order as the input list, and if any of the provided futures fails or
1055   * is canceled, its corresponding position will contain {@code null} (which is
1056   * indistinguishable from the future having a successful value of
1057   * {@code null}).
1058   *
1059   * <p>Canceling this future will attempt to cancel all the component futures.
1060   *
1061   * @param futures futures to combine
1062   * @return a future that provides a list of the results of the component
1063   *         futures
1064   * @since 10.0
1065   */
1066  @Beta
1067  public static <V> ListenableFuture<List<V>> successfulAsList(
1068      Iterable<? extends ListenableFuture<? extends V>> futures) {
1069    return listFuture(ImmutableList.copyOf(futures), false,
1070        MoreExecutors.sameThreadExecutor());
1071  }
1072
1073  /**
1074   * Returns a list of delegate futures that correspond to the futures received in the order
1075   * that they complete. Delegate futures return the same value or throw the same exception
1076   * as the corresponding input future returns/throws.
1077   *
1078   * <p>Cancelling a delegate future has no effect on any input future, since the delegate future
1079   * does not correspond to a specific input future until the appropriate number of input
1080   * futures have completed. At that point, it is too late to cancel the input future.
1081   * The input future's result, which cannot be stored into the cancelled delegate future,
1082   * is ignored.
1083   *
1084   * @since 17.0
1085   */
1086  @Beta
1087  public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
1088      Iterable<? extends ListenableFuture<? extends T>> futures) {
1089    // A CLQ may be overkill here.  We could save some pointers/memory by synchronizing on an
1090    // ArrayDeque
1091    final ConcurrentLinkedQueue<AsyncSettableFuture<T>> delegates =
1092        Queues.newConcurrentLinkedQueue();
1093    ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder();
1094    // Using SerializingExecutor here will ensure that each CompletionOrderListener executes
1095    // atomically and therefore that each returned future is guaranteed to be in completion order.
1096    // N.B. there are some cases where the use of this executor could have possibly surprising
1097    // effects when input futures finish at approximately the same time _and_ the output futures
1098    // have sameThreadExecutor listeners. In this situation, the listeners may end up running on a
1099    // different thread than if they were attached to the corresponding input future.  We believe
1100    // this to be a negligible cost since:
1101    // 1. Using the sameThreadExecutor implies that your callback is safe to run on any thread.
1102    // 2. This would likely only be noticeable if you were doing something expensive or blocking on
1103    //    a sameThreadExecutor listener on one of the output futures which is an antipattern anyway.
1104    SerializingExecutor executor = new SerializingExecutor(MoreExecutors.sameThreadExecutor());
1105    for (final ListenableFuture<? extends T> future : futures) {
1106      AsyncSettableFuture<T> delegate = AsyncSettableFuture.create();
1107      // Must make sure to add the delegate to the queue first in case the future is already done
1108      delegates.add(delegate);
1109      future.addListener(new Runnable() {
1110        @Override public void run() {
1111          delegates.remove().setFuture(future);
1112        }
1113      }, executor);
1114      listBuilder.add(delegate);
1115    }
1116    return listBuilder.build();
1117  }
1118
1119  /**
1120   * Registers separate success and failure callbacks to be run when the {@code
1121   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1122   * complete} or, if the computation is already complete, immediately.
1123   *
1124   * <p>There is no guaranteed ordering of execution of callbacks, but any
1125   * callback added through this method is guaranteed to be called once the
1126   * computation is complete.
1127   *
1128   * Example: <pre> {@code
1129   * ListenableFuture<QueryResult> future = ...;
1130   * addCallback(future,
1131   *     new FutureCallback<QueryResult> {
1132   *       public void onSuccess(QueryResult result) {
1133   *         storeInCache(result);
1134   *       }
1135   *       public void onFailure(Throwable t) {
1136   *         reportError(t);
1137   *       }
1138   *     });}</pre>
1139   *
1140   * <p>Note: If the callback is slow or heavyweight, consider {@linkplain
1141   * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1142   * executor}. If you do not supply an executor, {@code addCallback} will use
1143   * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
1144   * some caveats for heavier operations. For example, the callback may run on
1145   * an unpredictable or undesirable thread:
1146   *
1147   * <ul>
1148   * <li>If the input {@code Future} is done at the time {@code addCallback} is
1149   * called, {@code addCallback} will execute the callback inline.
1150   * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1151   * schedule the callback to be run by the thread that completes the input
1152   * {@code Future}, which may be an internal system thread such as an RPC
1153   * network thread.
1154   * </ul>
1155   *
1156   * <p>Also note that, regardless of which thread executes the {@code
1157   * sameThreadExecutor} callback, all other registered but unexecuted listeners
1158   * are prevented from running during its execution, even if those listeners
1159   * are to run in other executors.
1160   *
1161   * <p>For a more general interface to attach a completion listener to a
1162   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1163   *
1164   * @param future The future attach the callback to.
1165   * @param callback The callback to invoke when {@code future} is completed.
1166   * @since 10.0
1167   */
1168  public static <V> void addCallback(ListenableFuture<V> future,
1169      FutureCallback<? super V> callback) {
1170    addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1171  }
1172
1173  /**
1174   * Registers separate success and failure callbacks to be run when the {@code
1175   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1176   * complete} or, if the computation is already complete, immediately.
1177   *
1178   * <p>The callback is run in {@code executor}.
1179   * There is no guaranteed ordering of execution of callbacks, but any
1180   * callback added through this method is guaranteed to be called once the
1181   * computation is complete.
1182   *
1183   * Example: <pre> {@code
1184   * ListenableFuture<QueryResult> future = ...;
1185   * Executor e = ...
1186   * addCallback(future,
1187   *     new FutureCallback<QueryResult> {
1188   *       public void onSuccess(QueryResult result) {
1189   *         storeInCache(result);
1190   *       }
1191   *       public void onFailure(Throwable t) {
1192   *         reportError(t);
1193   *       }
1194   *     }, e);}</pre>
1195   *
1196   * <p>When the callback is fast and lightweight, consider {@linkplain
1197   * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1198   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
1199   * caveats documented in the link above.
1200   *
1201   * <p>For a more general interface to attach a completion listener to a
1202   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1203   *
1204   * @param future The future attach the callback to.
1205   * @param callback The callback to invoke when {@code future} is completed.
1206   * @param executor The executor to run {@code callback} when the future
1207   *    completes.
1208   * @since 10.0
1209   */
1210  public static <V> void addCallback(final ListenableFuture<V> future,
1211      final FutureCallback<? super V> callback, Executor executor) {
1212    Preconditions.checkNotNull(callback);
1213    Runnable callbackListener = new Runnable() {
1214      @Override
1215      public void run() {
1216        final V value;
1217        try {
1218          // TODO(user): (Before Guava release), validate that this
1219          // is the thing for IE.
1220          value = getUninterruptibly(future);
1221        } catch (ExecutionException e) {
1222          callback.onFailure(e.getCause());
1223          return;
1224        } catch (RuntimeException e) {
1225          callback.onFailure(e);
1226          return;
1227        } catch (Error e) {
1228          callback.onFailure(e);
1229          return;
1230        }
1231        callback.onSuccess(value);
1232      }
1233    };
1234    future.addListener(callbackListener, executor);
1235  }
1236
1237  /**
1238   * Returns the result of {@link Future#get()}, converting most exceptions to a
1239   * new instance of the given checked exception type. This reduces boilerplate
1240   * for a common use of {@code Future} in which it is unnecessary to
1241   * programmatically distinguish between exception types or to extract other
1242   * information from the exception instance.
1243   *
1244   * <p>Exceptions from {@code Future.get} are treated as follows:
1245   * <ul>
1246   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1247   *     {@code X} if the cause is a checked exception, an {@link
1248   *     UncheckedExecutionException} if the cause is a {@code
1249   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1250   *     {@code Error}.
1251   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1252   *     restoring the interrupt).
1253   * <li>Any {@link CancellationException} is propagated untouched, as is any
1254   *     other {@link RuntimeException} (though {@code get} implementations are
1255   *     discouraged from throwing such exceptions).
1256   * </ul>
1257   *
1258   * <p>The overall principle is to continue to treat every checked exception as a
1259   * checked exception, every unchecked exception as an unchecked exception, and
1260   * every error as an error. In addition, the cause of any {@code
1261   * ExecutionException} is wrapped in order to ensure that the new stack trace
1262   * matches that of the current thread.
1263   *
1264   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1265   * public constructor that accepts zero or more arguments, all of type {@code
1266   * String} or {@code Throwable} (preferring constructors with at least one
1267   * {@code String}) and calling the constructor via reflection. If the
1268   * exception did not already have a cause, one is set by calling {@link
1269   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1270   * {@code IllegalArgumentException} is thrown.
1271   *
1272   * @throws X if {@code get} throws any checked exception except for an {@code
1273   *         ExecutionException} whose cause is not itself a checked exception
1274   * @throws UncheckedExecutionException if {@code get} throws an {@code
1275   *         ExecutionException} with a {@code RuntimeException} as its cause
1276   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1277   *         with an {@code Error} as its cause
1278   * @throws CancellationException if {@code get} throws a {@code
1279   *         CancellationException}
1280   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1281   *         RuntimeException} or does not have a suitable constructor
1282   * @since 10.0
1283   */
1284  public static <V, X extends Exception> V get(
1285      Future<V> future, Class<X> exceptionClass) throws X {
1286    checkNotNull(future);
1287    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1288        "Futures.get exception type (%s) must not be a RuntimeException",
1289        exceptionClass);
1290    try {
1291      return future.get();
1292    } catch (InterruptedException e) {
1293      currentThread().interrupt();
1294      throw newWithCause(exceptionClass, e);
1295    } catch (ExecutionException e) {
1296      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1297      throw new AssertionError();
1298    }
1299  }
1300
1301  /**
1302   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1303   * exceptions to a new instance of the given checked exception type. This
1304   * reduces boilerplate for a common use of {@code Future} in which it is
1305   * unnecessary to programmatically distinguish between exception types or to
1306   * extract other information from the exception instance.
1307   *
1308   * <p>Exceptions from {@code Future.get} are treated as follows:
1309   * <ul>
1310   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1311   *     {@code X} if the cause is a checked exception, an {@link
1312   *     UncheckedExecutionException} if the cause is a {@code
1313   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1314   *     {@code Error}.
1315   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1316   *     restoring the interrupt).
1317   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1318   * <li>Any {@link CancellationException} is propagated untouched, as is any
1319   *     other {@link RuntimeException} (though {@code get} implementations are
1320   *     discouraged from throwing such exceptions).
1321   * </ul>
1322   *
1323   * <p>The overall principle is to continue to treat every checked exception as a
1324   * checked exception, every unchecked exception as an unchecked exception, and
1325   * every error as an error. In addition, the cause of any {@code
1326   * ExecutionException} is wrapped in order to ensure that the new stack trace
1327   * matches that of the current thread.
1328   *
1329   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1330   * public constructor that accepts zero or more arguments, all of type {@code
1331   * String} or {@code Throwable} (preferring constructors with at least one
1332   * {@code String}) and calling the constructor via reflection. If the
1333   * exception did not already have a cause, one is set by calling {@link
1334   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1335   * {@code IllegalArgumentException} is thrown.
1336   *
1337   * @throws X if {@code get} throws any checked exception except for an {@code
1338   *         ExecutionException} whose cause is not itself a checked exception
1339   * @throws UncheckedExecutionException if {@code get} throws an {@code
1340   *         ExecutionException} with a {@code RuntimeException} as its cause
1341   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1342   *         with an {@code Error} as its cause
1343   * @throws CancellationException if {@code get} throws a {@code
1344   *         CancellationException}
1345   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1346   *         RuntimeException} or does not have a suitable constructor
1347   * @since 10.0
1348   */
1349  public static <V, X extends Exception> V get(
1350      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1351      throws X {
1352    checkNotNull(future);
1353    checkNotNull(unit);
1354    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1355        "Futures.get exception type (%s) must not be a RuntimeException",
1356        exceptionClass);
1357    try {
1358      return future.get(timeout, unit);
1359    } catch (InterruptedException e) {
1360      currentThread().interrupt();
1361      throw newWithCause(exceptionClass, e);
1362    } catch (TimeoutException e) {
1363      throw newWithCause(exceptionClass, e);
1364    } catch (ExecutionException e) {
1365      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1366      throw new AssertionError();
1367    }
1368  }
1369
1370  private static <X extends Exception> void wrapAndThrowExceptionOrError(
1371      Throwable cause, Class<X> exceptionClass) throws X {
1372    if (cause instanceof Error) {
1373      throw new ExecutionError((Error) cause);
1374    }
1375    if (cause instanceof RuntimeException) {
1376      throw new UncheckedExecutionException(cause);
1377    }
1378    throw newWithCause(exceptionClass, cause);
1379  }
1380
1381  /**
1382   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1383   * task known not to throw a checked exception. This makes {@code Future} more
1384   * suitable for lightweight, fast-running tasks that, barring bugs in the
1385   * code, will not fail. This gives it exception-handling behavior similar to
1386   * that of {@code ForkJoinTask.join}.
1387   *
1388   * <p>Exceptions from {@code Future.get} are treated as follows:
1389   * <ul>
1390   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1391   *     {@link UncheckedExecutionException} (if the cause is an {@code
1392   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1393   *     Error}).
1394   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1395   *     call. The interrupt is restored before {@code getUnchecked} returns.
1396   * <li>Any {@link CancellationException} is propagated untouched. So is any
1397   *     other {@link RuntimeException} ({@code get} implementations are
1398   *     discouraged from throwing such exceptions).
1399   * </ul>
1400   *
1401   * <p>The overall principle is to eliminate all checked exceptions: to loop to
1402   * avoid {@code InterruptedException}, to pass through {@code
1403   * CancellationException}, and to wrap any exception from the underlying
1404   * computation in an {@code UncheckedExecutionException} or {@code
1405   * ExecutionError}.
1406   *
1407   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1408   * {@link Uninterruptibles#getUninterruptibly(Future)}.
1409   *
1410   * @throws UncheckedExecutionException if {@code get} throws an {@code
1411   *         ExecutionException} with an {@code Exception} as its cause
1412   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1413   *         with an {@code Error} as its cause
1414   * @throws CancellationException if {@code get} throws a {@code
1415   *         CancellationException}
1416   * @since 10.0
1417   */
1418  public static <V> V getUnchecked(Future<V> future) {
1419    checkNotNull(future);
1420    try {
1421      return getUninterruptibly(future);
1422    } catch (ExecutionException e) {
1423      wrapAndThrowUnchecked(e.getCause());
1424      throw new AssertionError();
1425    }
1426  }
1427
1428  private static void wrapAndThrowUnchecked(Throwable cause) {
1429    if (cause instanceof Error) {
1430      throw new ExecutionError((Error) cause);
1431    }
1432    /*
1433     * It's a non-Error, non-Exception Throwable. From my survey of such
1434     * classes, I believe that most users intended to extend Exception, so we'll
1435     * treat it like an Exception.
1436     */
1437    throw new UncheckedExecutionException(cause);
1438  }
1439
1440  /*
1441   * TODO(user): FutureChecker interface for these to be static methods on? If
1442   * so, refer to it in the (static-method) Futures.get documentation
1443   */
1444
1445  /*
1446   * Arguably we don't need a timed getUnchecked because any operation slow
1447   * enough to require a timeout is heavyweight enough to throw a checked
1448   * exception and therefore be inappropriate to use with getUnchecked. Further,
1449   * it's not clear that converting the checked TimeoutException to a
1450   * RuntimeException -- especially to an UncheckedExecutionException, since it
1451   * wasn't thrown by the computation -- makes sense, and if we don't convert
1452   * it, the user still has to write a try-catch block.
1453   *
1454   * If you think you would use this method, let us know.
1455   */
1456
1457  private static <X extends Exception> X newWithCause(
1458      Class<X> exceptionClass, Throwable cause) {
1459    // getConstructors() guarantees this as long as we don't modify the array.
1460    @SuppressWarnings("unchecked")
1461    List<Constructor<X>> constructors =
1462        (List) Arrays.asList(exceptionClass.getConstructors());
1463    for (Constructor<X> constructor : preferringStrings(constructors)) {
1464      @Nullable X instance = newFromConstructor(constructor, cause);
1465      if (instance != null) {
1466        if (instance.getCause() == null) {
1467          instance.initCause(cause);
1468        }
1469        return instance;
1470      }
1471    }
1472    throw new IllegalArgumentException(
1473        "No appropriate constructor for exception of type " + exceptionClass
1474            + " in response to chained exception", cause);
1475  }
1476
1477  private static <X extends Exception> List<Constructor<X>>
1478      preferringStrings(List<Constructor<X>> constructors) {
1479    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1480  }
1481
1482  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1483      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1484        @Override public Boolean apply(Constructor<?> input) {
1485          return asList(input.getParameterTypes()).contains(String.class);
1486        }
1487      }).reverse();
1488
1489  @Nullable private static <X> X newFromConstructor(
1490      Constructor<X> constructor, Throwable cause) {
1491    Class<?>[] paramTypes = constructor.getParameterTypes();
1492    Object[] params = new Object[paramTypes.length];
1493    for (int i = 0; i < paramTypes.length; i++) {
1494      Class<?> paramType = paramTypes[i];
1495      if (paramType.equals(String.class)) {
1496        params[i] = cause.toString();
1497      } else if (paramType.equals(Throwable.class)) {
1498        params[i] = cause;
1499      } else {
1500        return null;
1501      }
1502    }
1503    try {
1504      return constructor.newInstance(params);
1505    } catch (IllegalArgumentException e) {
1506      return null;
1507    } catch (InstantiationException e) {
1508      return null;
1509    } catch (IllegalAccessException e) {
1510      return null;
1511    } catch (InvocationTargetException e) {
1512      return null;
1513    }
1514  }
1515
1516  private interface FutureCombiner<V, C> {
1517    C combine(List<Optional<V>> values);
1518  }
1519
1520  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1521    private static final Logger logger =
1522        Logger.getLogger(CombinedFuture.class.getName());
1523
1524    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1525    final boolean allMustSucceed;
1526    final AtomicInteger remaining;
1527    FutureCombiner<V, C> combiner;
1528    List<Optional<V>> values;
1529    final Object seenExceptionsLock = new Object();
1530    Set<Throwable> seenExceptions;
1531
1532    CombinedFuture(
1533        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1534        boolean allMustSucceed, Executor listenerExecutor,
1535        FutureCombiner<V, C> combiner) {
1536      this.futures = futures;
1537      this.allMustSucceed = allMustSucceed;
1538      this.remaining = new AtomicInteger(futures.size());
1539      this.combiner = combiner;
1540      this.values = Lists.newArrayListWithCapacity(futures.size());
1541      init(listenerExecutor);
1542    }
1543
1544    /**
1545     * Must be called at the end of the constructor.
1546     */
1547    protected void init(final Executor listenerExecutor) {
1548      // First, schedule cleanup to execute when the Future is done.
1549      addListener(new Runnable() {
1550        @Override
1551        public void run() {
1552          // Cancel all the component futures.
1553          if (CombinedFuture.this.isCancelled()) {
1554            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1555              future.cancel(CombinedFuture.this.wasInterrupted());
1556            }
1557          }
1558
1559          // Let go of the memory held by other futures
1560          CombinedFuture.this.futures = null;
1561
1562          // By now the values array has either been set as the Future's value,
1563          // or (in case of failure) is no longer useful.
1564          CombinedFuture.this.values = null;
1565
1566          // The combiner may also hold state, so free that as well
1567          CombinedFuture.this.combiner = null;
1568        }
1569      }, MoreExecutors.sameThreadExecutor());
1570
1571      // Now begin the "real" initialization.
1572
1573      // Corner case: List is empty.
1574      if (futures.isEmpty()) {
1575        set(combiner.combine(ImmutableList.<Optional<V>>of()));
1576        return;
1577      }
1578
1579      // Populate the results list with null initially.
1580      for (int i = 0; i < futures.size(); ++i) {
1581        values.add(null);
1582      }
1583
1584      // Register a listener on each Future in the list to update
1585      // the state of this future.
1586      // Note that if all the futures on the list are done prior to completing
1587      // this loop, the last call to addListener() will callback to
1588      // setOneValue(), transitively call our cleanup listener, and set
1589      // this.futures to null.
1590      // This is not actually a problem, since the foreach only needs
1591      // this.futures to be non-null at the beginning of the loop.
1592      int i = 0;
1593      for (final ListenableFuture<? extends V> listenable : futures) {
1594        final int index = i++;
1595        listenable.addListener(new Runnable() {
1596          @Override
1597          public void run() {
1598            setOneValue(index, listenable);
1599          }
1600        }, listenerExecutor);
1601      }
1602    }
1603
1604    /**
1605     * Fails this future with the given Throwable if {@link #allMustSucceed} is
1606     * true. Also, logs the throwable if it is an {@link Error} or if
1607     * {@link #allMustSucceed} is {@code true}, the throwable did not cause
1608     * this future to fail, and it is the first time we've seen that particular Throwable.
1609     */
1610    private void setExceptionAndMaybeLog(Throwable throwable) {
1611      boolean visibleFromOutputFuture = false;
1612      boolean firstTimeSeeingThisException = true;
1613      if (allMustSucceed) {
1614        // As soon as the first one fails, throw the exception up.
1615        // The result of all other inputs is then ignored.
1616        visibleFromOutputFuture = super.setException(throwable);
1617
1618        synchronized (seenExceptionsLock) {
1619          if (seenExceptions == null) {
1620            seenExceptions = Sets.newHashSet();
1621          }
1622          firstTimeSeeingThisException = seenExceptions.add(throwable);
1623        }
1624      }
1625
1626      if (throwable instanceof Error
1627          || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) {
1628        logger.log(Level.SEVERE, "input future failed.", throwable);
1629      }
1630    }
1631
1632    /**
1633     * Sets the value at the given index to that of the given future.
1634     */
1635    private void setOneValue(int index, Future<? extends V> future) {
1636      List<Optional<V>> localValues = values;
1637      // TODO(user): This check appears to be redundant since values is
1638      // assigned null only after the future completes.  However, values
1639      // is not volatile so it may be possible for us to observe the changes
1640      // to these two values in a different order... which I think is why
1641      // we need to check both.  Clear up this craziness either by making
1642      // values volatile or proving that it doesn't need to be for some other
1643      // reason.
1644      if (isDone() || localValues == null) {
1645        // Some other future failed or has been cancelled, causing this one to
1646        // also be cancelled or have an exception set. This should only happen
1647        // if allMustSucceed is true or if the output itself has been
1648        // cancelled.
1649        checkState(allMustSucceed || isCancelled(),
1650            "Future was done before all dependencies completed");
1651      }
1652
1653      try {
1654        checkState(future.isDone(),
1655            "Tried to set value from future which is not done");
1656        V returnValue = getUninterruptibly(future);
1657        if (localValues != null) {
1658          localValues.set(index, Optional.fromNullable(returnValue));
1659        }
1660      } catch (CancellationException e) {
1661        if (allMustSucceed) {
1662          // Set ourselves as cancelled. Let the input futures keep running
1663          // as some of them may be used elsewhere.
1664          cancel(false);
1665        }
1666      } catch (ExecutionException e) {
1667        setExceptionAndMaybeLog(e.getCause());
1668      } catch (Throwable t) {
1669        setExceptionAndMaybeLog(t);
1670      } finally {
1671        int newRemaining = remaining.decrementAndGet();
1672        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1673        if (newRemaining == 0) {
1674          FutureCombiner<V, C> localCombiner = combiner;
1675          if (localCombiner != null && localValues != null) {
1676            set(localCombiner.combine(localValues));
1677          } else {
1678            checkState(isDone());
1679          }
1680        }
1681      }
1682    }
1683
1684  }
1685
1686  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1687  private static <V> ListenableFuture<List<V>> listFuture(
1688      ImmutableList<ListenableFuture<? extends V>> futures,
1689      boolean allMustSucceed, Executor listenerExecutor) {
1690    return new CombinedFuture<V, List<V>>(
1691        futures, allMustSucceed, listenerExecutor,
1692        new FutureCombiner<V, List<V>>() {
1693          @Override
1694          public List<V> combine(List<Optional<V>> values) {
1695            List<V> result = Lists.newArrayList();
1696            for (Optional<V> element : values) {
1697              result.add(element != null ? element.orNull() : null);
1698            }
1699            return Collections.unmodifiableList(result);
1700          }
1701        });
1702  }
1703
1704  /**
1705   * A checked future that uses a function to map from exceptions to the
1706   * appropriate checked type.
1707   */
1708  private static class MappingCheckedFuture<V, X extends Exception> extends
1709      AbstractCheckedFuture<V, X> {
1710
1711    final Function<Exception, X> mapper;
1712
1713    MappingCheckedFuture(ListenableFuture<V> delegate,
1714        Function<Exception, X> mapper) {
1715      super(delegate);
1716
1717      this.mapper = checkNotNull(mapper);
1718    }
1719
1720    @Override
1721    protected X mapException(Exception e) {
1722      return mapper.apply(e);
1723    }
1724  }
1725}