001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024import static java.lang.Thread.currentThread;
025import static java.util.Arrays.asList;
026
027import com.google.common.annotations.Beta;
028import com.google.common.base.Function;
029import com.google.common.base.Optional;
030import com.google.common.base.Preconditions;
031import com.google.common.collect.ImmutableCollection;
032import com.google.common.collect.ImmutableList;
033import com.google.common.collect.Lists;
034import com.google.common.collect.Ordering;
035
036import java.lang.reflect.Constructor;
037import java.lang.reflect.InvocationTargetException;
038import java.lang.reflect.UndeclaredThrowableException;
039import java.util.Arrays;
040import java.util.List;
041import java.util.concurrent.CancellationException;
042import java.util.concurrent.CountDownLatch;
043import java.util.concurrent.ExecutionException;
044import java.util.concurrent.Executor;
045import java.util.concurrent.Future;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.TimeoutException;
048import java.util.concurrent.atomic.AtomicInteger;
049import java.util.logging.Level;
050import java.util.logging.Logger;
051
052import javax.annotation.Nullable;
053
054/**
055 * Static utility methods pertaining to the {@link Future} interface.
056 *
057 * <p>Many of these methods use the {@link ListenableFuture} API; consult the
058 * Guava User Guide article on <a href=
059 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
060 * {@code ListenableFuture}</a>.
061 *
062 * @author Kevin Bourrillion
063 * @author Nishant Thakkar
064 * @author Sven Mawson
065 * @since 1.0
066 */
067public final class Futures {
  /** Not instantiable: this class only hosts static utility methods. */
  private Futures() {}
069
070  /**
071   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
072   * and a {@link Function} that maps from {@link Exception} instances into the
073   * appropriate checked type.
074   *
075   * <p>The given mapping function will be applied to an
076   * {@link InterruptedException}, a {@link CancellationException}, or an
077   * {@link ExecutionException}.
078   * See {@link Future#get()} for details on the exceptions thrown.
079   *
080   * @since 9.0 (source-compatible since 1.0)
081   */
082  @Beta
083  public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
084      ListenableFuture<V> future, Function<Exception, X> mapper) {
085    return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
086  }
087
088  private abstract static class ImmediateFuture<V>
089      implements ListenableFuture<V> {
090
091    private static final Logger log =
092        Logger.getLogger(ImmediateFuture.class.getName());
093
094    @Override
095    public void addListener(Runnable listener, Executor executor) {
096      checkNotNull(listener, "Runnable was null.");
097      checkNotNull(executor, "Executor was null.");
098      try {
099        executor.execute(listener);
100      } catch (RuntimeException e) {
101        // ListenableFuture's contract is that it will not throw unchecked
102        // exceptions, so log the bad runnable and/or executor and swallow it.
103        log.log(Level.SEVERE, "RuntimeException while executing runnable "
104            + listener + " with executor " + executor, e);
105      }
106    }
107
108    @Override
109    public boolean cancel(boolean mayInterruptIfRunning) {
110      return false;
111    }
112
113    @Override
114    public abstract V get() throws ExecutionException;
115
116    @Override
117    public V get(long timeout, TimeUnit unit) throws ExecutionException {
118      checkNotNull(unit);
119      return get();
120    }
121
122    @Override
123    public boolean isCancelled() {
124      return false;
125    }
126
127    @Override
128    public boolean isDone() {
129      return true;
130    }
131  }
132
133  private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> {
134
135    @Nullable private final V value;
136
137    ImmediateSuccessfulFuture(@Nullable V value) {
138      this.value = value;
139    }
140
141    @Override
142    public V get() {
143      return value;
144    }
145  }
146
147  private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception>
148      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
149
150    @Nullable private final V value;
151
152    ImmediateSuccessfulCheckedFuture(@Nullable V value) {
153      this.value = value;
154    }
155
156    @Override
157    public V get() {
158      return value;
159    }
160
161    @Override
162    public V checkedGet() {
163      return value;
164    }
165
166    @Override
167    public V checkedGet(long timeout, TimeUnit unit) {
168      checkNotNull(unit);
169      return value;
170    }
171  }
172
173  private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> {
174
175    private final Throwable thrown;
176
177    ImmediateFailedFuture(Throwable thrown) {
178      this.thrown = thrown;
179    }
180
181    @Override
182    public V get() throws ExecutionException {
183      throw new ExecutionException(thrown);
184    }
185  }
186
187  private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> {
188
189    private final CancellationException thrown;
190
191    ImmediateCancelledFuture() {
192      this.thrown = new CancellationException("Immediate cancelled future.");
193    }
194
195    @Override
196    public boolean isCancelled() {
197      return true;
198    }
199
200    @Override
201    public V get() {
202      throw AbstractFuture.cancellationExceptionWithCause(
203          "Task was cancelled.", thrown);
204    }
205  }
206
207  private static class ImmediateFailedCheckedFuture<V, X extends Exception>
208      extends ImmediateFuture<V> implements CheckedFuture<V, X> {
209
210    private final X thrown;
211
212    ImmediateFailedCheckedFuture(X thrown) {
213      this.thrown = thrown;
214    }
215
216    @Override
217    public V get() throws ExecutionException {
218      throw new ExecutionException(thrown);
219    }
220
221    @Override
222    public V checkedGet() throws X {
223      throw thrown;
224    }
225
226    @Override
227    public V checkedGet(long timeout, TimeUnit unit) throws X {
228      checkNotNull(unit);
229      throw thrown;
230    }
231  }
232
233  /**
234   * Creates a {@code ListenableFuture} which has its value set immediately upon
235   * construction. The getters just return the value. This {@code Future} can't
236   * be canceled or timed out and its {@code isDone()} method always returns
237   * {@code true}.
238   */
239  public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
240    return new ImmediateSuccessfulFuture<V>(value);
241  }
242
243  /**
244   * Returns a {@code CheckedFuture} which has its value set immediately upon
245   * construction.
246   *
247   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
248   * method always returns {@code true}. Calling {@code get()} or {@code
249   * checkedGet()} will immediately return the provided value.
250   */
251  @Beta
252  public static <V, X extends Exception> CheckedFuture<V, X>
253      immediateCheckedFuture(@Nullable V value) {
254    return new ImmediateSuccessfulCheckedFuture<V, X>(value);
255  }
256
257  /**
258   * Returns a {@code ListenableFuture} which has an exception set immediately
259   * upon construction.
260   *
261   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
262   * method always returns {@code true}. Calling {@code get()} will immediately
263   * throw the provided {@code Throwable} wrapped in an {@code
264   * ExecutionException}.
265   */
266  public static <V> ListenableFuture<V> immediateFailedFuture(
267      Throwable throwable) {
268    checkNotNull(throwable);
269    return new ImmediateFailedFuture<V>(throwable);
270  }
271
272  /**
273   * Creates a {@code ListenableFuture} which is cancelled immediately upon
274   * construction, so that {@code isCancelled()} always returns {@code true}.
275   *
276   * @since 14.0
277   */
278  @Beta
279  public static <V> ListenableFuture<V> immediateCancelledFuture() {
280    return new ImmediateCancelledFuture<V>();
281  }
282
283  /**
284   * Returns a {@code CheckedFuture} which has an exception set immediately upon
285   * construction.
286   *
287   * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
288   * method always returns {@code true}. Calling {@code get()} will immediately
289   * throw the provided {@code Exception} wrapped in an {@code
290   * ExecutionException}, and calling {@code checkedGet()} will throw the
291   * provided exception itself.
292   */
293  @Beta
294  public static <V, X extends Exception> CheckedFuture<V, X>
295      immediateFailedCheckedFuture(X exception) {
296    checkNotNull(exception);
297    return new ImmediateFailedCheckedFuture<V, X>(exception);
298  }
299
300  /**
301   * Returns a {@code Future} whose result is taken from the given primary
302   * {@code input} or, if the primary input fails, from the {@code Future}
303   * provided by the {@code fallback}. {@link FutureFallback#create} is not
304   * invoked until the primary input has failed, so if the primary input
305   * succeeds, it is never invoked. If, during the invocation of {@code
306   * fallback}, an exception is thrown, this exception is used as the result of
307   * the output {@code Future}.
308   *
309   * <p>Below is an example of a fallback that returns a default value if an
310   * exception occurs:
311   *
312   * <pre>   {@code
313   *   ListenableFuture<Integer> fetchCounterFuture = ...;
314   *
315   *   // Falling back to a zero counter in case an exception happens when
316   *   // processing the RPC to fetch counters.
317   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
318   *       fetchCounterFuture, new FutureFallback<Integer>() {
319   *         public ListenableFuture<Integer> create(Throwable t) {
320   *           // Returning "0" as the default for the counter when the
321   *           // exception happens.
322   *           return immediateFuture(0);
323   *         }
324   *       });
325   * }</pre>
326   *
327   * The fallback can also choose to propagate the original exception when
328   * desired:
329   *
330   * <pre>   {@code
331   *   ListenableFuture<Integer> fetchCounterFuture = ...;
332   *
333   *   // Falling back to a zero counter only in case the exception was a
334   *   // TimeoutException.
335   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
336   *       fetchCounterFuture, new FutureFallback<Integer>() {
337   *         public ListenableFuture<Integer> create(Throwable t) {
338   *           if (t instanceof TimeoutException) {
339   *             return immediateFuture(0);
340   *           }
341   *           return immediateFailedFuture(t);
342   *         }
343   *       });
344   * }</pre>
345   *
346   * Note: If the derived {@code Future} is slow or heavyweight to create
347   * (whether the {@code Future} itself is slow or heavyweight to complete is
348   * irrelevant), consider {@linkplain #withFallback(ListenableFuture,
349   * FutureFallback, Executor) supplying an executor}. If you do not supply an
350   * executor, {@code withFallback} will use {@link
351   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
352   * caveats for heavier operations. For example, the call to {@code
353   * fallback.create} may run on an unpredictable or undesirable thread:
354   *
355   * <ul>
356   * <li>If the input {@code Future} is done at the time {@code withFallback}
357   * is called, {@code withFallback} will call {@code fallback.create} inline.
358   * <li>If the input {@code Future} is not yet done, {@code withFallback} will
359   * schedule {@code fallback.create} to be run by the thread that completes
360   * the input {@code Future}, which may be an internal system thread such as
361   * an RPC network thread.
362   * </ul>
363   *
364   * Also note that, regardless of which thread executes {@code
365   * fallback.create}, all other registered but unexecuted listeners are
366   * prevented from running during its execution, even if those listeners are
367   * to run in other executors.
368   *
369   * @param input the primary input {@code Future}
370   * @param fallback the {@link FutureFallback} implementation to be called if
371   *     {@code input} fails
372   * @since 14.0
373   */
374  @Beta
375  public static <V> ListenableFuture<V> withFallback(
376      ListenableFuture<? extends V> input,
377      FutureFallback<? extends V> fallback) {
378    return withFallback(input, fallback, sameThreadExecutor());
379  }
380
381  /**
382   * Returns a {@code Future} whose result is taken from the given primary
383   * {@code input} or, if the primary input fails, from the {@code Future}
384   * provided by the {@code fallback}. {@link FutureFallback#create} is not
385   * invoked until the primary input has failed, so if the primary input
386   * succeeds, it is never invoked. If, during the invocation of {@code
387   * fallback}, an exception is thrown, this exception is used as the result of
388   * the output {@code Future}.
389   *
390   * <p>Below is an example of a fallback that returns a default value if an
391   * exception occurs:
392   *
393   * <pre>   {@code
394   *   ListenableFuture<Integer> fetchCounterFuture = ...;
395   *
396   *   // Falling back to a zero counter in case an exception happens when
397   *   // processing the RPC to fetch counters.
398   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
399   *       fetchCounterFuture, new FutureFallback<Integer>() {
400   *         public ListenableFuture<Integer> create(Throwable t) {
401   *           // Returning "0" as the default for the counter when the
402   *           // exception happens.
403   *           return immediateFuture(0);
404   *         }
405   *       }, sameThreadExecutor());
406   * }</pre>
407   *
408   * The fallback can also choose to propagate the original exception when
409   * desired:
410   *
411   * <pre>   {@code
412   *   ListenableFuture<Integer> fetchCounterFuture = ...;
413   *
414   *   // Falling back to a zero counter only in case the exception was a
415   *   // TimeoutException.
416   *   ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback(
417   *       fetchCounterFuture, new FutureFallback<Integer>() {
418   *         public ListenableFuture<Integer> create(Throwable t) {
419   *           if (t instanceof TimeoutException) {
420   *             return immediateFuture(0);
421   *           }
422   *           return immediateFailedFuture(t);
423   *         }
424   *       }, sameThreadExecutor());
425   * }</pre>
426   *
427   * When the execution of {@code fallback.create} is fast and lightweight
428   * (though the {@code Future} it returns need not meet these criteria),
429   * consider {@linkplain #withFallback(ListenableFuture, FutureFallback)
430   * omitting the executor} or explicitly specifying {@code
431   * sameThreadExecutor}. However, be aware of the caveats documented in the
432   * link above.
433   *
434   * @param input the primary input {@code Future}
435   * @param fallback the {@link FutureFallback} implementation to be called if
436   *     {@code input} fails
437   * @param executor the executor that runs {@code fallback} if {@code input}
438   *     fails
439   * @since 14.0
440   */
441  @Beta
442  public static <V> ListenableFuture<V> withFallback(
443      ListenableFuture<? extends V> input,
444      FutureFallback<? extends V> fallback, Executor executor) {
445    checkNotNull(fallback);
446    return new FallbackFuture<V>(input, fallback, executor);
447  }
448
  /**
   * A future that falls back on a second, generated future, in case its
   * original future fails.
   *
   * <p>{@code running} refers to the future currently being waited on:
   * initially the input and, once the fallback has been invoked, the future
   * it produced. {@link #cancel} forwards to whichever of the two is stored
   * at that moment.
   */
  private static class FallbackFuture<V> extends AbstractFuture<V> {

    // Reassigned inside the failure callback and read by cancel();
    // presumably volatile because those can run on different threads.
    private volatile ListenableFuture<? extends V> running;

    /**
     * Registers a callback on {@code input} that either copies its result
     * into this future or, on failure, asks {@code fallback} for a
     * replacement future and chains onto that instead.
     *
     * @param input the primary future
     * @param fallback produces the replacement future when {@code input} fails
     * @param executor the executor the first-level callback is registered
     *     with; the callback on the replacement future is registered with
     *     {@code sameThreadExecutor()}
     */
    FallbackFuture(ListenableFuture<? extends V> input,
        final FutureFallback<? extends V> fallback,
        final Executor executor) {
      running = input;
      addCallback(running, new FutureCallback<V>() {
        @Override
        public void onSuccess(V value) {
          set(value);
        }

        @Override
        public void onFailure(Throwable t) {
          // If this future was already cancelled, do not invoke the fallback.
          if (isCancelled()) {
            return;
          }
          try {
            running = fallback.create(t);
            if (isCancelled()) { // in case cancel called in the meantime
              // cancel() may have acted on the old value of running, so the
              // freshly created future is cancelled here explicitly.
              running.cancel(wasInterrupted());
              return;
            }
            addCallback(running, new FutureCallback<V>() {
              @Override
              public void onSuccess(V value) {
                set(value);
              }

              @Override
              public void onFailure(Throwable t) {
                // Mirror cancellation of the fallback future as cancellation
                // of this future; any other failure becomes our exception.
                if (running.isCancelled()) {
                  cancel(false);
                } else {
                  setException(t);
                }
              }
            }, sameThreadExecutor());
          } catch (Exception e) {
            // Anything thrown while creating or chaining the fallback
            // becomes the result of this future.
            setException(e);
          } catch (Error e) {
            setException(e); // note: rethrows
          }
        }
      }, executor);
    }

    /**
     * Cancels this future and, only if that succeeded, also attempts to
     * cancel the future currently stored in {@code running}.
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      if (super.cancel(mayInterruptIfRunning)) {
        running.cancel(mayInterruptIfRunning);
        return true;
      }
      return false;
    }
  }
511
512  /**
513   * Returns a new {@code ListenableFuture} whose result is asynchronously
514   * derived from the result of the given {@code Future}. More precisely, the
515   * returned {@code Future} takes its result from a {@code Future} produced by
516   * applying the given {@code AsyncFunction} to the result of the original
517   * {@code Future}. Example:
518   *
519   * <pre>   {@code
520   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
521   *   AsyncFunction<RowKey, QueryResult> queryFunction =
522   *       new AsyncFunction<RowKey, QueryResult>() {
523   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
524   *           return dataService.read(rowKey);
525   *         }
526   *       };
527   *   ListenableFuture<QueryResult> queryFuture =
528   *       transform(rowKeyFuture, queryFunction);
529   * }</pre>
530   *
531   * Note: If the derived {@code Future} is slow or heavyweight to create
532   * (whether the {@code Future} itself is slow or heavyweight to complete is
533   * irrelevant), consider {@linkplain #transform(ListenableFuture,
534   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
535   * executor, {@code transform} will use {@link
536   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
537   * caveats for heavier operations. For example, the call to {@code
538   * function.apply} may run on an unpredictable or undesirable thread:
539   *
540   * <ul>
541   * <li>If the input {@code Future} is done at the time {@code transform} is
542   * called, {@code transform} will call {@code function.apply} inline.
543   * <li>If the input {@code Future} is not yet done, {@code transform} will
544   * schedule {@code function.apply} to be run by the thread that completes the
545   * input {@code Future}, which may be an internal system thread such as an
546   * RPC network thread.
547   * </ul>
548   *
549   * Also note that, regardless of which thread executes {@code
550   * function.apply}, all other registered but unexecuted listeners are
551   * prevented from running during its execution, even if those listeners are
552   * to run in other executors.
553   *
554   * <p>The returned {@code Future} attempts to keep its cancellation state in
555   * sync with that of the input future and that of the future returned by the
556   * function. That is, if the returned {@code Future} is cancelled, it will
557   * attempt to cancel the other two, and if either of the other two is
558   * cancelled, the returned {@code Future} will receive a callback in which it
559   * will attempt to cancel itself.
560   *
561   * @param input The future to transform
562   * @param function A function to transform the result of the input future
563   *     to the result of the output future
564   * @return A future that holds result of the function (if the input succeeded)
565   *     or the original input's failure (if not)
566   * @since 11.0
567   */
568  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
569      AsyncFunction<? super I, ? extends O> function) {
570    return transform(input, function, MoreExecutors.sameThreadExecutor());
571  }
572
573  /**
574   * Returns a new {@code ListenableFuture} whose result is asynchronously
575   * derived from the result of the given {@code Future}. More precisely, the
576   * returned {@code Future} takes its result from a {@code Future} produced by
577   * applying the given {@code AsyncFunction} to the result of the original
578   * {@code Future}. Example:
579   *
580   * <pre>   {@code
581   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
582   *   AsyncFunction<RowKey, QueryResult> queryFunction =
583   *       new AsyncFunction<RowKey, QueryResult>() {
584   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
585   *           return dataService.read(rowKey);
586   *         }
587   *       };
588   *   ListenableFuture<QueryResult> queryFuture =
589   *       transform(rowKeyFuture, queryFunction, executor);
590   * }</pre>
591   *
592   * <p>The returned {@code Future} attempts to keep its cancellation state in
593   * sync with that of the input future and that of the future returned by the
594   * chain function. That is, if the returned {@code Future} is cancelled, it
595   * will attempt to cancel the other two, and if either of the other two is
596   * cancelled, the returned {@code Future} will receive a callback in which it
597   * will attempt to cancel itself.
598   *
599   * <p>When the execution of {@code function.apply} is fast and lightweight
600   * (though the {@code Future} it returns need not meet these criteria),
601   * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
602   * the executor} or explicitly specifying {@code sameThreadExecutor}.
603   * However, be aware of the caveats documented in the link above.
604   *
605   * @param input The future to transform
606   * @param function A function to transform the result of the input future
607   *     to the result of the output future
608   * @param executor Executor to run the function in.
609   * @return A future that holds result of the function (if the input succeeded)
610   *     or the original input's failure (if not)
611   * @since 11.0
612   */
613  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
614      AsyncFunction<? super I, ? extends O> function,
615      Executor executor) {
616    ChainingListenableFuture<I, O> output =
617        new ChainingListenableFuture<I, O>(function, input);
618    input.addListener(output, executor);
619    return output;
620  }
621
622  /**
623   * Returns a new {@code ListenableFuture} whose result is the product of
624   * applying the given {@code Function} to the result of the given {@code
625   * Future}. Example:
626   *
627   * <pre>   {@code
628   *   ListenableFuture<QueryResult> queryFuture = ...;
629   *   Function<QueryResult, List<Row>> rowsFunction =
630   *       new Function<QueryResult, List<Row>>() {
631   *         public List<Row> apply(QueryResult queryResult) {
632   *           return queryResult.getRows();
633   *         }
634   *       };
635   *   ListenableFuture<List<Row>> rowsFuture =
636   *       transform(queryFuture, rowsFunction);
637   * }</pre>
638   *
639   * Note: If the transformation is slow or heavyweight, consider {@linkplain
640   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
641   * If you do not supply an executor, {@code transform} will use {@link
642   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
643   * caveats for heavier operations.  For example, the call to {@code
644   * function.apply} may run on an unpredictable or undesirable thread:
645   *
646   * <ul>
647   * <li>If the input {@code Future} is done at the time {@code transform} is
648   * called, {@code transform} will call {@code function.apply} inline.
649   * <li>If the input {@code Future} is not yet done, {@code transform} will
650   * schedule {@code function.apply} to be run by the thread that completes the
651   * input {@code Future}, which may be an internal system thread such as an
652   * RPC network thread.
653   * </ul>
654   *
655   * Also note that, regardless of which thread executes {@code
656   * function.apply}, all other registered but unexecuted listeners are
657   * prevented from running during its execution, even if those listeners are
658   * to run in other executors.
659   *
660   * <p>The returned {@code Future} attempts to keep its cancellation state in
661   * sync with that of the input future. That is, if the returned {@code Future}
662   * is cancelled, it will attempt to cancel the input, and if the input is
663   * cancelled, the returned {@code Future} will receive a callback in which it
664   * will attempt to cancel itself.
665   *
666   * <p>An example use of this method is to convert a serializable object
667   * returned from an RPC into a POJO.
668   *
669   * @param input The future to transform
670   * @param function A Function to transform the results of the provided future
671   *     to the results of the returned future.  This will be run in the thread
672   *     that notifies input it is complete.
673   * @return A future that holds result of the transformation.
674   * @since 9.0 (in 1.0 as {@code compose})
675   */
676  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
677      final Function<? super I, ? extends O> function) {
678    return transform(input, function, MoreExecutors.sameThreadExecutor());
679  }
680
681  /**
682   * Returns a new {@code ListenableFuture} whose result is the product of
683   * applying the given {@code Function} to the result of the given {@code
684   * Future}. Example:
685   *
686   * <pre>   {@code
687   *   ListenableFuture<QueryResult> queryFuture = ...;
688   *   Function<QueryResult, List<Row>> rowsFunction =
689   *       new Function<QueryResult, List<Row>>() {
690   *         public List<Row> apply(QueryResult queryResult) {
691   *           return queryResult.getRows();
692   *         }
693   *       };
694   *   ListenableFuture<List<Row>> rowsFuture =
695   *       transform(queryFuture, rowsFunction, executor);
696   * }</pre>
697   *
698   * <p>The returned {@code Future} attempts to keep its cancellation state in
699   * sync with that of the input future. That is, if the returned {@code Future}
700   * is cancelled, it will attempt to cancel the input, and if the input is
701   * cancelled, the returned {@code Future} will receive a callback in which it
702   * will attempt to cancel itself.
703   *
704   * <p>An example use of this method is to convert a serializable object
705   * returned from an RPC into a POJO.
706   *
707   * <p>When the transformation is fast and lightweight, consider {@linkplain
708   * #transform(ListenableFuture, Function) omitting the executor} or
709   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
710   * caveats documented in the link above.
711   *
712   * @param input The future to transform
713   * @param function A Function to transform the results of the provided future
714   *     to the results of the returned future.
715   * @param executor Executor to run the function in.
716   * @return A future that holds result of the transformation.
717   * @since 9.0 (in 2.0 as {@code compose})
718   */
719  public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
720      final Function<? super I, ? extends O> function, Executor executor) {
721    checkNotNull(function);
722    AsyncFunction<I, O> wrapperFunction
723        = new AsyncFunction<I, O>() {
724            @Override public ListenableFuture<O> apply(I input) {
725              O output = function.apply(input);
726              return immediateFuture(output);
727            }
728        };
729    return transform(input, wrapperFunction, executor);
730  }
731
732  /**
733   * Like {@link #transform(ListenableFuture, Function)} except that the
734   * transformation {@code function} is invoked on each call to
735   * {@link Future#get() get()} on the returned future.
736   *
737   * <p>The returned {@code Future} reflects the input's cancellation
738   * state directly, and any attempt to cancel the returned Future is likewise
739   * passed through to the input Future.
740   *
741   * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
742   * only apply the timeout to the execution of the underlying {@code Future},
743   * <em>not</em> to the execution of the transformation function.
744   *
745   * <p>The primary audience of this method is callers of {@code transform}
746   * who don't have a {@code ListenableFuture} available and
747   * do not mind repeated, lazy function evaluation.
748   *
749   * @param input The future to transform
750   * @param function A Function to transform the results of the provided future
751   *     to the results of the returned future.
752   * @return A future that returns the result of the transformation.
753   * @since 10.0
754   */
755  @Beta
756  public static <I, O> Future<O> lazyTransform(final Future<I> input,
757      final Function<? super I, ? extends O> function) {
758    checkNotNull(input);
759    checkNotNull(function);
760    return new Future<O>() {
761
762      @Override
763      public boolean cancel(boolean mayInterruptIfRunning) {
764        return input.cancel(mayInterruptIfRunning);
765      }
766
767      @Override
768      public boolean isCancelled() {
769        return input.isCancelled();
770      }
771
772      @Override
773      public boolean isDone() {
774        return input.isDone();
775      }
776
777      @Override
778      public O get() throws InterruptedException, ExecutionException {
779        return applyTransformation(input.get());
780      }
781
782      @Override
783      public O get(long timeout, TimeUnit unit)
784          throws InterruptedException, ExecutionException, TimeoutException {
785        return applyTransformation(input.get(timeout, unit));
786      }
787
788      private O applyTransformation(I input) throws ExecutionException {
789        try {
790          return function.apply(input);
791        } catch (Throwable t) {
792          throw new ExecutionException(t);
793        }
794      }
795    };
796  }
797
  /**
   * An implementation of {@code ListenableFuture} that also implements
   * {@code Runnable} so that it can be used to nest ListenableFutures.
   * Once the passed-in {@code ListenableFuture} is complete, {@link #run}
   * calls the passed-in {@code AsyncFunction} on its result to obtain a second
   * future, whose outcome then becomes the outcome of this future.
   *
   * <p>If the function throws any checked exceptions, they should be wrapped
   * in an {@code UndeclaredThrowableException} so that this class can get
   * access to the cause.
   */
  private static class ChainingListenableFuture<I, O>
      extends AbstractFuture<O> implements Runnable {

    // Both references are cleared at the end of run() so that the inputs are
    // not pinned after this future has been wired up.
    private AsyncFunction<? super I, ? extends O> function;
    private ListenableFuture<? extends I> inputFuture;
    // Assigned in run() once the function has produced its future; volatile
    // because cancel() reads it and may be called from another thread.
    private volatile ListenableFuture<? extends O> outputFuture;
    // Counted down at the end of run(). NOTE(review): nothing inside this
    // class awaits the latch -- confirm whether it is still needed.
    private final CountDownLatch outputCreated = new CountDownLatch(1);

    /**
     * @param function the function applied to the input's result; must not be
     *     null
     * @param inputFuture the future whose result feeds {@code function}; must
     *     not be null
     */
    private ChainingListenableFuture(
        AsyncFunction<? super I, ? extends O> function,
        ListenableFuture<? extends I> inputFuture) {
      this.function = checkNotNull(function);
      this.inputFuture = checkNotNull(inputFuture);
    }

    /**
     * Cancels this future and, if that succeeds, forwards the cancellation to
     * the input future and to the output future (whichever of them is
     * currently set).
     *
     * @return {@code true} if this future was cancelled by this call
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      /*
       * Our additional cancellation work needs to occur even if
       * !mayInterruptIfRunning, so we can't move it into interruptTask().
       */
      if (super.cancel(mayInterruptIfRunning)) {
        // This should never block since only one thread is allowed to cancel
        // this Future.
        cancel(inputFuture, mayInterruptIfRunning);
        cancel(outputFuture, mayInterruptIfRunning);
        return true;
      }
      return false;
    }

    /**
     * Null-tolerant helper: cancels {@code future} unless it is {@code null}.
     */
    private void cancel(@Nullable Future<?> future,
        boolean mayInterruptIfRunning) {
      if (future != null) {
        future.cancel(mayInterruptIfRunning);
      }
    }

    /**
     * Reads the input future's result, applies the function to it, and
     * registers a listener on the resulting future that copies its outcome
     * (value, failure cause, or cancellation) into this future.
     */
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        // Publish the new future to the volatile field before checking for
        // cancellation: a cancel() that ran too early to see the field is
        // then caught by the isCancelled() check below.
        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            function.apply(sourceResult);
        if (isCancelled()) {
          outputFuture.cancel(wasInterrupted());
          this.outputFuture = null;
          return;
        }
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                // Here it would have been nice to have had an
                // UninterruptibleListenableFuture, but we don't want to start a
                // combinatorial explosion of interfaces, so we have to make do.
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, MoreExecutors.sameThreadExecutor());
      } catch (UndeclaredThrowableException e) {
        // Set the cause of the exception as this future's exception
        setException(e.getCause());
      } catch (Exception e) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(e);
      } catch (Error e) {
        // Propagate errors up ASAP - our superclass will rethrow the error
        setException(e);
      } finally {
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
        // Allow our get routines to examine outputFuture now.
        outputCreated.countDown();
      }
    }
  }
913
914  /**
915   * Returns a new {@code ListenableFuture} whose result is the product of
916   * calling {@code get()} on the {@code Future} nested within the given {@code
917   * Future}, effectively chaining the futures one after the other.  Example:
918   *
919   * <pre>   {@code
920   *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
921   *   ListenableFuture<String> dereferenced = dereference(nested);
922   * }</pre>
923   *
924   * <p>This call has the same cancellation and execution semantics as {@link
925   * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
926   * Future} attempts to keep its cancellation state in sync with both the
927   * input {@code Future} and the nested {@code Future}.  The transformation
928   * is very lightweight and therefore takes place in the thread that called
929   * {@code dereference}.
930   *
931   * @param nested The nested future to transform.
932   * @return A future that holds result of the inner future.
933   * @since 13.0
934   */
935  @Beta
936  @SuppressWarnings({"rawtypes", "unchecked"})
937  public static <V> ListenableFuture<V> dereference(
938      ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
939    return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
940  }
941
942  /**
943   * Helper {@code Function} for {@link #dereference}.
944   */
945  private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
946      new AsyncFunction<ListenableFuture<Object>, Object>() {
947        @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
948          return input;
949        }
950      };
951
952  /**
953   * Creates a new {@code ListenableFuture} whose value is a list containing the
954   * values of all its input futures, if all succeed. If any input fails, the
955   * returned future fails.
956   *
957   * <p>The list of results is in the same order as the input list.
958   *
959   * <p>Canceling this future will attempt to cancel all the component futures,
960   * and if any of the provided futures fails or is canceled, this one is,
961   * too.
962   *
963   * @param futures futures to combine
964   * @return a future that provides a list of the results of the component
965   *         futures
966   * @since 10.0
967   */
968  @Beta
969  public static <V> ListenableFuture<List<V>> allAsList(
970      ListenableFuture<? extends V>... futures) {
971    return listFuture(ImmutableList.copyOf(futures), true,
972        MoreExecutors.sameThreadExecutor());
973  }
974
975  /**
976   * Creates a new {@code ListenableFuture} whose value is a list containing the
977   * values of all its input futures, if all succeed. If any input fails, the
978   * returned future fails.
979   *
980   * <p>The list of results is in the same order as the input list.
981   *
982   * <p>Canceling this future will attempt to cancel all the component futures,
983   * and if any of the provided futures fails or is canceled, this one is,
984   * too.
985   *
986   * @param futures futures to combine
987   * @return a future that provides a list of the results of the component
988   *         futures
989   * @since 10.0
990   */
991  @Beta
992  public static <V> ListenableFuture<List<V>> allAsList(
993      Iterable<? extends ListenableFuture<? extends V>> futures) {
994    return listFuture(ImmutableList.copyOf(futures), true,
995        MoreExecutors.sameThreadExecutor());
996  }
997
998  /**
999   * Creates a new {@code ListenableFuture} whose value is a list containing the
1000   * values of all its successful input futures. The list of results is in the
1001   * same order as the input list, and if any of the provided futures fails or
1002   * is canceled, its corresponding position will contain {@code null} (which is
1003   * indistinguishable from the future having a successful value of
1004   * {@code null}).
1005   *
1006   * <p>Canceling this future will attempt to cancel all the component futures.
1007   *
1008   * @param futures futures to combine
1009   * @return a future that provides a list of the results of the component
1010   *         futures
1011   * @since 10.0
1012   */
1013  @Beta
1014  public static <V> ListenableFuture<List<V>> successfulAsList(
1015      ListenableFuture<? extends V>... futures) {
1016    return listFuture(ImmutableList.copyOf(futures), false,
1017        MoreExecutors.sameThreadExecutor());
1018  }
1019
1020  /**
1021   * Creates a new {@code ListenableFuture} whose value is a list containing the
1022   * values of all its successful input futures. The list of results is in the
1023   * same order as the input list, and if any of the provided futures fails or
1024   * is canceled, its corresponding position will contain {@code null} (which is
1025   * indistinguishable from the future having a successful value of
1026   * {@code null}).
1027   *
1028   * <p>Canceling this future will attempt to cancel all the component futures.
1029   *
1030   * @param futures futures to combine
1031   * @return a future that provides a list of the results of the component
1032   *         futures
1033   * @since 10.0
1034   */
1035  @Beta
1036  public static <V> ListenableFuture<List<V>> successfulAsList(
1037      Iterable<? extends ListenableFuture<? extends V>> futures) {
1038    return listFuture(ImmutableList.copyOf(futures), false,
1039        MoreExecutors.sameThreadExecutor());
1040  }
1041
1042  /**
1043   * Registers separate success and failure callbacks to be run when the {@code
1044   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1045   * complete} or, if the computation is already complete, immediately.
1046   *
1047   * <p>There is no guaranteed ordering of execution of callbacks, but any
1048   * callback added through this method is guaranteed to be called once the
1049   * computation is complete.
1050   *
1051   * Example: <pre> {@code
1052   * ListenableFuture<QueryResult> future = ...;
1053   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
1055   *       public void onSuccess(QueryResult result) {
1056   *         storeInCache(result);
1057   *       }
1058   *       public void onFailure(Throwable t) {
1059   *         reportError(t);
1060   *       }
1061   *     });}</pre>
1062   *
1063   * Note: If the callback is slow or heavyweight, consider {@linkplain
1064   * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
1065   * executor}. If you do not supply an executor, {@code addCallback} will use
1066   * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
1067   * some caveats for heavier operations. For example, the callback may run on
1068   * an unpredictable or undesirable thread:
1069   *
1070   * <ul>
1071   * <li>If the input {@code Future} is done at the time {@code addCallback} is
1072   * called, {@code addCallback} will execute the callback inline.
1073   * <li>If the input {@code Future} is not yet done, {@code addCallback} will
1074   * schedule the callback to be run by the thread that completes the input
1075   * {@code Future}, which may be an internal system thread such as an RPC
1076   * network thread.
1077   * </ul>
1078   *
1079   * Also note that, regardless of which thread executes the callback, all
1080   * other registered but unexecuted listeners are prevented from running
1081   * during its execution, even if those listeners are to run in other
1082   * executors.
1083   *
1084   * <p>For a more general interface to attach a completion listener to a
1085   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1086   *
   * @param future The future to attach the callback to.
1088   * @param callback The callback to invoke when {@code future} is completed.
1089   * @since 10.0
1090   */
1091  public static <V> void addCallback(ListenableFuture<V> future,
1092      FutureCallback<? super V> callback) {
1093    addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1094  }
1095
1096  /**
1097   * Registers separate success and failure callbacks to be run when the {@code
1098   * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1099   * complete} or, if the computation is already complete, immediately.
1100   *
1101   * <p>The callback is run in {@code executor}.
1102   * There is no guaranteed ordering of execution of callbacks, but any
1103   * callback added through this method is guaranteed to be called once the
1104   * computation is complete.
1105   *
1106   * Example: <pre> {@code
1107   * ListenableFuture<QueryResult> future = ...;
1108   * Executor e = ...
   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
   *       public void onSuccess(QueryResult result) {
   *         storeInCache(result);
   *       }
   *       public void onFailure(Throwable t) {
   *         reportError(t);
   *       }
   *     }, e);}</pre>
1118   *
1119   * When the callback is fast and lightweight, consider {@linkplain
1120   * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
1121   * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
1122   * caveats documented in the link above.
1123   *
1124   * <p>For a more general interface to attach a completion listener to a
1125   * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1126   *
   * @param future The future to attach the callback to.
1128   * @param callback The callback to invoke when {@code future} is completed.
1129   * @param executor The executor to run {@code callback} when the future
1130   *    completes.
1131   * @since 10.0
1132   */
1133  public static <V> void addCallback(final ListenableFuture<V> future,
1134      final FutureCallback<? super V> callback, Executor executor) {
1135    Preconditions.checkNotNull(callback);
1136    Runnable callbackListener = new Runnable() {
1137      @Override
1138      public void run() {
1139        final V value;
1140        try {
1141          // TODO(user): (Before Guava release), validate that this
1142          // is the thing for IE.
1143          value = getUninterruptibly(future);
1144        } catch (ExecutionException e) {
1145          callback.onFailure(e.getCause());
1146          return;
1147        } catch (RuntimeException e) {
1148          callback.onFailure(e);
1149          return;
1150        } catch (Error e) {
1151          callback.onFailure(e);
1152          return;
1153        }
1154        callback.onSuccess(value);
1155      }
1156    };
1157    future.addListener(callbackListener, executor);
1158  }
1159
1160  /**
1161   * Returns the result of {@link Future#get()}, converting most exceptions to a
1162   * new instance of the given checked exception type. This reduces boilerplate
1163   * for a common use of {@code Future} in which it is unnecessary to
1164   * programmatically distinguish between exception types or to extract other
1165   * information from the exception instance.
1166   *
1167   * <p>Exceptions from {@code Future.get} are treated as follows:
1168   * <ul>
1169   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1170   *     {@code X} if the cause is a checked exception, an {@link
1171   *     UncheckedExecutionException} if the cause is a {@code
1172   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1173   *     {@code Error}.
1174   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1175   *     restoring the interrupt).
1176   * <li>Any {@link CancellationException} is propagated untouched, as is any
1177   *     other {@link RuntimeException} (though {@code get} implementations are
1178   *     discouraged from throwing such exceptions).
1179   * </ul>
1180   *
1181   * The overall principle is to continue to treat every checked exception as a
1182   * checked exception, every unchecked exception as an unchecked exception, and
1183   * every error as an error. In addition, the cause of any {@code
1184   * ExecutionException} is wrapped in order to ensure that the new stack trace
1185   * matches that of the current thread.
1186   *
1187   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1188   * public constructor that accepts zero or more arguments, all of type {@code
1189   * String} or {@code Throwable} (preferring constructors with at least one
1190   * {@code String}) and calling the constructor via reflection. If the
1191   * exception did not already have a cause, one is set by calling {@link
1192   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1193   * {@code IllegalArgumentException} is thrown.
1194   *
1195   * @throws X if {@code get} throws any checked exception except for an {@code
1196   *         ExecutionException} whose cause is not itself a checked exception
1197   * @throws UncheckedExecutionException if {@code get} throws an {@code
1198   *         ExecutionException} with a {@code RuntimeException} as its cause
1199   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1200   *         with an {@code Error} as its cause
1201   * @throws CancellationException if {@code get} throws a {@code
1202   *         CancellationException}
1203   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1204   *         RuntimeException} or does not have a suitable constructor
1205   * @since 10.0
1206   */
1207  @Beta
1208  public static <V, X extends Exception> V get(
1209      Future<V> future, Class<X> exceptionClass) throws X {
1210    checkNotNull(future);
1211    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1212        "Futures.get exception type (%s) must not be a RuntimeException",
1213        exceptionClass);
1214    try {
1215      return future.get();
1216    } catch (InterruptedException e) {
1217      currentThread().interrupt();
1218      throw newWithCause(exceptionClass, e);
1219    } catch (ExecutionException e) {
1220      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1221      throw new AssertionError();
1222    }
1223  }
1224
1225  /**
1226   * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1227   * exceptions to a new instance of the given checked exception type. This
1228   * reduces boilerplate for a common use of {@code Future} in which it is
1229   * unnecessary to programmatically distinguish between exception types or to
1230   * extract other information from the exception instance.
1231   *
1232   * <p>Exceptions from {@code Future.get} are treated as follows:
1233   * <ul>
1234   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1235   *     {@code X} if the cause is a checked exception, an {@link
1236   *     UncheckedExecutionException} if the cause is a {@code
1237   *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1238   *     {@code Error}.
1239   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1240   *     restoring the interrupt).
1241   * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1242   * <li>Any {@link CancellationException} is propagated untouched, as is any
1243   *     other {@link RuntimeException} (though {@code get} implementations are
1244   *     discouraged from throwing such exceptions).
1245   * </ul>
1246   *
1247   * The overall principle is to continue to treat every checked exception as a
1248   * checked exception, every unchecked exception as an unchecked exception, and
1249   * every error as an error. In addition, the cause of any {@code
1250   * ExecutionException} is wrapped in order to ensure that the new stack trace
1251   * matches that of the current thread.
1252   *
1253   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1254   * public constructor that accepts zero or more arguments, all of type {@code
1255   * String} or {@code Throwable} (preferring constructors with at least one
1256   * {@code String}) and calling the constructor via reflection. If the
1257   * exception did not already have a cause, one is set by calling {@link
1258   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1259   * {@code IllegalArgumentException} is thrown.
1260   *
1261   * @throws X if {@code get} throws any checked exception except for an {@code
1262   *         ExecutionException} whose cause is not itself a checked exception
1263   * @throws UncheckedExecutionException if {@code get} throws an {@code
1264   *         ExecutionException} with a {@code RuntimeException} as its cause
1265   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1266   *         with an {@code Error} as its cause
1267   * @throws CancellationException if {@code get} throws a {@code
1268   *         CancellationException}
1269   * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1270   *         RuntimeException} or does not have a suitable constructor
1271   * @since 10.0
1272   */
1273  @Beta
1274  public static <V, X extends Exception> V get(
1275      Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1276      throws X {
1277    checkNotNull(future);
1278    checkNotNull(unit);
1279    checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1280        "Futures.get exception type (%s) must not be a RuntimeException",
1281        exceptionClass);
1282    try {
1283      return future.get(timeout, unit);
1284    } catch (InterruptedException e) {
1285      currentThread().interrupt();
1286      throw newWithCause(exceptionClass, e);
1287    } catch (TimeoutException e) {
1288      throw newWithCause(exceptionClass, e);
1289    } catch (ExecutionException e) {
1290      wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1291      throw new AssertionError();
1292    }
1293  }
1294
1295  private static <X extends Exception> void wrapAndThrowExceptionOrError(
1296      Throwable cause, Class<X> exceptionClass) throws X {
1297    if (cause instanceof Error) {
1298      throw new ExecutionError((Error) cause);
1299    }
1300    if (cause instanceof RuntimeException) {
1301      throw new UncheckedExecutionException(cause);
1302    }
1303    throw newWithCause(exceptionClass, cause);
1304  }
1305
1306  /**
1307   * Returns the result of calling {@link Future#get()} uninterruptibly on a
1308   * task known not to throw a checked exception. This makes {@code Future} more
1309   * suitable for lightweight, fast-running tasks that, barring bugs in the
1310   * code, will not fail. This gives it exception-handling behavior similar to
1311   * that of {@code ForkJoinTask.join}.
1312   *
1313   * <p>Exceptions from {@code Future.get} are treated as follows:
1314   * <ul>
1315   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1316   *     {@link UncheckedExecutionException} (if the cause is an {@code
1317   *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1318   *     Error}).
1319   * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1320   *     call. The interrupt is restored before {@code getUnchecked} returns.
1321   * <li>Any {@link CancellationException} is propagated untouched. So is any
1322   *     other {@link RuntimeException} ({@code get} implementations are
1323   *     discouraged from throwing such exceptions).
1324   * </ul>
1325   *
1326   * The overall principle is to eliminate all checked exceptions: to loop to
1327   * avoid {@code InterruptedException}, to pass through {@code
1328   * CancellationException}, and to wrap any exception from the underlying
1329   * computation in an {@code UncheckedExecutionException} or {@code
1330   * ExecutionError}.
1331   *
1332   * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1333   * {@link Uninterruptibles#getUninterruptibly(Future)}.
1334   *
1335   * @throws UncheckedExecutionException if {@code get} throws an {@code
1336   *         ExecutionException} with an {@code Exception} as its cause
1337   * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1338   *         with an {@code Error} as its cause
1339   * @throws CancellationException if {@code get} throws a {@code
1340   *         CancellationException}
1341   * @since 10.0
1342   */
1343  @Beta
1344  public static <V> V getUnchecked(Future<V> future) {
1345    checkNotNull(future);
1346    try {
1347      return getUninterruptibly(future);
1348    } catch (ExecutionException e) {
1349      wrapAndThrowUnchecked(e.getCause());
1350      throw new AssertionError();
1351    }
1352  }
1353
1354  private static void wrapAndThrowUnchecked(Throwable cause) {
1355    if (cause instanceof Error) {
1356      throw new ExecutionError((Error) cause);
1357    }
1358    /*
1359     * It's a non-Error, non-Exception Throwable. From my survey of such
1360     * classes, I believe that most users intended to extend Exception, so we'll
1361     * treat it like an Exception.
1362     */
1363    throw new UncheckedExecutionException(cause);
1364  }
1365
1366  /*
1367   * TODO(user): FutureChecker interface for these to be static methods on? If
1368   * so, refer to it in the (static-method) Futures.get documentation
1369   */
1370
1371  /*
1372   * Arguably we don't need a timed getUnchecked because any operation slow
1373   * enough to require a timeout is heavyweight enough to throw a checked
1374   * exception and therefore be inappropriate to use with getUnchecked. Further,
1375   * it's not clear that converting the checked TimeoutException to a
1376   * RuntimeException -- especially to an UncheckedExecutionException, since it
1377   * wasn't thrown by the computation -- makes sense, and if we don't convert
1378   * it, the user still has to write a try-catch block.
1379   *
1380   * If you think you would use this method, let us know.
1381   */
1382
1383  private static <X extends Exception> X newWithCause(
1384      Class<X> exceptionClass, Throwable cause) {
1385    // getConstructors() guarantees this as long as we don't modify the array.
1386    @SuppressWarnings("unchecked")
1387    List<Constructor<X>> constructors =
1388        (List) Arrays.asList(exceptionClass.getConstructors());
1389    for (Constructor<X> constructor : preferringStrings(constructors)) {
1390      @Nullable X instance = newFromConstructor(constructor, cause);
1391      if (instance != null) {
1392        if (instance.getCause() == null) {
1393          instance.initCause(cause);
1394        }
1395        return instance;
1396      }
1397    }
1398    throw new IllegalArgumentException(
1399        "No appropriate constructor for exception of type " + exceptionClass
1400            + " in response to chained exception", cause);
1401  }
1402
1403  private static <X extends Exception> List<Constructor<X>>
1404      preferringStrings(List<Constructor<X>> constructors) {
1405    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1406  }
1407
1408  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1409      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1410        @Override public Boolean apply(Constructor<?> input) {
1411          return asList(input.getParameterTypes()).contains(String.class);
1412        }
1413      }).reverse();
1414
1415  @Nullable private static <X> X newFromConstructor(
1416      Constructor<X> constructor, Throwable cause) {
1417    Class<?>[] paramTypes = constructor.getParameterTypes();
1418    Object[] params = new Object[paramTypes.length];
1419    for (int i = 0; i < paramTypes.length; i++) {
1420      Class<?> paramType = paramTypes[i];
1421      if (paramType.equals(String.class)) {
1422        params[i] = cause.toString();
1423      } else if (paramType.equals(Throwable.class)) {
1424        params[i] = cause;
1425      } else {
1426        return null;
1427      }
1428    }
1429    try {
1430      return constructor.newInstance(params);
1431    } catch (IllegalArgumentException e) {
1432      return null;
1433    } catch (InstantiationException e) {
1434      return null;
1435    } catch (IllegalAccessException e) {
1436      return null;
1437    } catch (InvocationTargetException e) {
1438      return null;
1439    }
1440  }
1441
  /**
   * Strategy used by {@code CombinedFuture} to turn the per-input results
   * into the combined future's value.
   *
   * @param <V> element type of the individual results
   * @param <C> type of the combined value
   */
  private interface FutureCombiner<V, C> {
    /**
     * Builds the combined value from {@code values}, one entry per input.
     * NOTE(review): presumably an absent {@code Optional} stands for a
     * {@code null} result -- confirm against the implementations.
     */
    C combine(List<Optional<V>> values);
  }
1445
1446  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
1447    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
1448    final boolean allMustSucceed;
1449    final AtomicInteger remaining;
1450    FutureCombiner<V, C> combiner;
1451    List<Optional<V>> values;
1452
1453    CombinedFuture(
1454        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
1455        boolean allMustSucceed, Executor listenerExecutor,
1456        FutureCombiner<V, C> combiner) {
1457      this.futures = futures;
1458      this.allMustSucceed = allMustSucceed;
1459      this.remaining = new AtomicInteger(futures.size());
1460      this.combiner = combiner;
1461      this.values = Lists.newArrayListWithCapacity(futures.size());
1462      init(listenerExecutor);
1463    }
1464
1465    /**
1466     * Must be called at the end of the constructor.
1467     */
1468    protected void init(final Executor listenerExecutor) {
1469      // First, schedule cleanup to execute when the Future is done.
1470      addListener(new Runnable() {
1471        @Override
1472        public void run() {
1473          // Cancel all the component futures.
1474          if (CombinedFuture.this.isCancelled()) {
1475            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
1476              future.cancel(CombinedFuture.this.wasInterrupted());
1477            }
1478          }
1479
1480          // By now the values array has either been set as the Future's value,
1481          // or (in case of failure) is no longer useful.
1482          CombinedFuture.this.futures = null;
1483
1484          // Let go of the memory held by other futures
1485          CombinedFuture.this.values = null;
1486
1487          // The combiner may also hold state, so free that as well
1488          CombinedFuture.this.combiner = null;
1489        }
1490      }, MoreExecutors.sameThreadExecutor());
1491
1492      // Now begin the "real" initialization.
1493
1494      // Corner case: List is empty.
1495      if (futures.isEmpty()) {
1496        set(combiner.combine(ImmutableList.<Optional<V>>of()));
1497        return;
1498      }
1499
1500      // Populate the results list with null initially.
1501      for (int i = 0; i < futures.size(); ++i) {
1502        values.add(null);
1503      }
1504
1505      // Register a listener on each Future in the list to update
1506      // the state of this future.
1507      // Note that if all the futures on the list are done prior to completing
1508      // this loop, the last call to addListener() will callback to
1509      // setOneValue(), transitively call our cleanup listener, and set
1510      // this.futures to null.
1511      // This is not actually a problem, since the foreach only needs
1512      // this.futures to be non-null at the beginning of the loop.
1513      int i = 0;
1514      for (final ListenableFuture<? extends V> listenable : futures) {
1515        final int index = i++;
1516        listenable.addListener(new Runnable() {
1517          @Override
1518          public void run() {
1519            setOneValue(index, listenable);
1520          }
1521        }, listenerExecutor);
1522      }
1523    }
1524
1525    /**
1526     * Sets the value at the given index to that of the given future.
1527     */
1528    private void setOneValue(int index, Future<? extends V> future) {
1529      List<Optional<V>> localValues = values;
1530      if (isDone() || localValues == null) {
1531        // Some other future failed or has been cancelled, causing this one to
1532        // also be cancelled or have an exception set. This should only happen
1533        // if allMustSucceed is true or if the output itself has been cancelled.
1534        checkState(allMustSucceed || isCancelled(),
1535            "Future was done before all dependencies completed");
1536        return;
1537      }
1538
1539      try {
1540        checkState(future.isDone(),
1541            "Tried to set value from future which is not done");
1542        V returnValue = getUninterruptibly(future);
1543        localValues.set(index, Optional.fromNullable(returnValue));
1544      } catch (CancellationException e) {
1545        if (allMustSucceed) {
1546          // Set ourselves as cancelled. Let the input futures keep running
1547          // as some of them may be used elsewhere.
1548          // (Currently we don't override interruptTask, so
1549          // mayInterruptIfRunning==false isn't technically necessary.)
1550          cancel(false);
1551        }
1552      } catch (ExecutionException e) {
1553        if (allMustSucceed) {
1554          // As soon as the first one fails, throw the exception up.
1555          // The result of all other inputs is then ignored.
1556          setException(e.getCause());
1557        }
1558      } catch (RuntimeException e) {
1559        if (allMustSucceed) {
1560          setException(e);
1561        }
1562      } catch (Error e) {
1563        // Propagate errors up ASAP - our superclass will rethrow the error
1564        setException(e);
1565      } finally {
1566        int newRemaining = remaining.decrementAndGet();
1567        checkState(newRemaining >= 0, "Less than 0 remaining futures");
1568        if (newRemaining == 0) {
1569          FutureCombiner<V, C> localCombiner = combiner;
1570          if (localCombiner != null) {
1571            set(localCombiner.combine(localValues));
1572          } else {
1573            checkState(isDone());
1574          }
1575        }
1576      }
1577    }
1578
1579  }
1580
1581  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
1582  private static <V> ListenableFuture<List<V>> listFuture(
1583      ImmutableList<ListenableFuture<? extends V>> futures,
1584      boolean allMustSucceed, Executor listenerExecutor) {
1585    return new CombinedFuture<V, List<V>>(
1586        futures, allMustSucceed, listenerExecutor,
1587        new FutureCombiner<V, List<V>>() {
1588          @Override
1589          public List<V> combine(List<Optional<V>> values) {
1590            List<V> result = Lists.newArrayList();
1591            for (Optional<V> element : values) {
1592              result.add(element != null ? element.orNull() : null);
1593            }
1594            // TODO(user): This should ultimately return an unmodifiableList
1595            return result;
1596          }
1597        });
1598  }
1599
1600  /**
1601   * A checked future that uses a function to map from exceptions to the
1602   * appropriate checked type.
1603   */
1604  private static class MappingCheckedFuture<V, X extends Exception> extends
1605      AbstractCheckedFuture<V, X> {
1606
1607    final Function<Exception, X> mapper;
1608
1609    MappingCheckedFuture(ListenableFuture<V> delegate,
1610        Function<Exception, X> mapper) {
1611      super(delegate);
1612
1613      this.mapper = checkNotNull(mapper);
1614    }
1615
1616    @Override
1617    protected X mapException(Exception e) {
1618      return mapper.apply(e);
1619    }
1620  }
1621}