001    /*
002     * Copyright (C) 2006 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkArgument;
020    import static com.google.common.base.Preconditions.checkNotNull;
021    import static com.google.common.base.Preconditions.checkState;
022    import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023    import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024    import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
025    import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
026    import static java.lang.Thread.currentThread;
027    import static java.util.Arrays.asList;
028    import static java.util.concurrent.TimeUnit.NANOSECONDS;
029    
030    import com.google.common.annotations.Beta;
031    import com.google.common.base.Function;
032    import com.google.common.base.Preconditions;
033    import com.google.common.collect.ImmutableList;
034    import com.google.common.collect.Lists;
035    import com.google.common.collect.Ordering;
036    
037    import java.lang.reflect.Constructor;
038    import java.lang.reflect.InvocationTargetException;
039    import java.lang.reflect.UndeclaredThrowableException;
040    import java.util.Arrays;
041    import java.util.List;
042    import java.util.concurrent.BlockingQueue;
043    import java.util.concurrent.CancellationException;
044    import java.util.concurrent.CountDownLatch;
045    import java.util.concurrent.ExecutionException;
046    import java.util.concurrent.Executor;
047    import java.util.concurrent.Future;
048    import java.util.concurrent.LinkedBlockingQueue;
049    import java.util.concurrent.TimeUnit;
050    import java.util.concurrent.TimeoutException;
051    import java.util.concurrent.atomic.AtomicInteger;
052    
053    import javax.annotation.Nullable;
054    
055    /**
056     * Static utility methods pertaining to the {@link Future} interface.
057     *
058     * @author Kevin Bourrillion
059     * @author Nishant Thakkar
060     * @author Sven Mawson
061     * @since 1.0
062     */
063    @Beta
064    public final class Futures {
065      private Futures() {}
066    
067      /**
068       * Returns an uninterruptible view of a {@code Future}. If a thread is
069       * interrupted during an attempt to {@code get()} from the returned future, it
070       * continues to wait on the result until it is available or the timeout
071       * elapses, and only then re-interrupts the thread.
072       * @deprecated Use
073       * {@link Uninterruptibles#getUninterruptibly(Future) getUninterruptibly}.
074       * <b>This method is scheduled for deletion in Guava Release 11.</b>
075       */
076      @Deprecated @SuppressWarnings("deprecation")
077      public
078      static <V> UninterruptibleFuture<V> makeUninterruptible(
079          final Future<V> future) {
080        checkNotNull(future);
081        if (future instanceof UninterruptibleFuture<?>) {
082          return (UninterruptibleFuture<V>) future;
083        }
084        return new UninterruptibleFuture<V>() {
085          @Override
086          public boolean cancel(boolean mayInterruptIfRunning) {
087            return future.cancel(mayInterruptIfRunning);
088          }
089          @Override
090          public boolean isCancelled() {
091            return future.isCancelled();
092          }
093          @Override
094          public boolean isDone() {
095            return future.isDone();
096          }
097    
098          @Override
099          public V get(long timeout, TimeUnit unit)
100              throws TimeoutException, ExecutionException {
101            return Uninterruptibles.getUninterruptibly(future, timeout, unit);
102          }
103    
104          @Override
105          public V get() throws ExecutionException {
106            return Uninterruptibles.getUninterruptibly(future);
107          }
108        };
109      }
110    
111      /**
   * Creates a {@link ListenableFuture} out of a normal {@link Future}. The
114       * returned future will create a thread to wait for the source future to
115       * complete before executing the listeners.
116       *
117       * <p><b>Warning:</b> If the input future does not already implement {@link
118       * ListenableFuture}, the returned future will emulate {@link
119       * ListenableFuture#addListener} by taking a thread from an internal,
120       * unbounded pool at the first call to {@code addListener} and holding it
121       * until the future is {@linkplain Future#isDone() done}.
122       *
123       * @deprecated Prefer to create {@code ListenableFuture} instances with {@link
124       *     SettableFuture}, {@link MoreExecutors#listeningDecorator(
125       *     java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
126       *     {@link AbstractFuture}, and other utilities over creating plain {@code
127       *     Future} instances to be upgraded to {@code ListenableFuture} after the
128       *     fact. If this is not possible, the functionality of {@code
129       *     makeListenable} is now available as {@link
130       *     JdkFutureAdapters#listenInPoolThread}. <b>This method is scheduled
131       *     for deletion from Guava in Guava release 11.0.</b>
132       */
133      @Deprecated
134      public
135      static <V> ListenableFuture<V> makeListenable(Future<V> future) {
136        return JdkFutureAdapters.listenInPoolThread(future);
137      }
138    
139      /**
140       * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
141       * {@link Function} that maps from {@link Exception} instances into the
142       * appropriate checked type.
143       *
144       * <p><b>Warning:</b> If the input future does not implement {@link
145       * ListenableFuture}, the returned future will emulate {@link
146       * ListenableFuture#addListener} by taking a thread from an internal,
147       * unbounded pool at the first call to {@code addListener} and holding it
148       * until the future is {@linkplain Future#isDone() done}.
149       *
150       * <p>The given mapping function will be applied to an
151       * {@link InterruptedException}, a {@link CancellationException}, or an
152       * {@link ExecutionException} with the actual cause of the exception.
153       * See {@link Future#get()} for details on the exceptions thrown.
154       *
155       * @deprecated Obtain a {@link ListenableFuture}, following the advice in its
156       *     documentation and use {@link #makeChecked(ListenableFuture, Function)}.
157       *     <b>This method is scheduled for deletion from Guava in Guava release
158       *     11.0.</b>
159       */
160      @Deprecated
161      public
162      static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
163          Future<V> future, Function<Exception, X> mapper) {
164        return new MappingCheckedFuture<V, X>(makeListenable(future), mapper);
165      }
166    
167      /**
168       * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
169       * and a {@link Function} that maps from {@link Exception} instances into the
170       * appropriate checked type.
171       *
172       * <p>The given mapping function will be applied to an
173       * {@link InterruptedException}, a {@link CancellationException}, or an
174       * {@link ExecutionException} with the actual cause of the exception.
175       * See {@link Future#get()} for details on the exceptions thrown.
176       *
177       * @since 9.0 (source-compatible since 1.0)
178       */
179      public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
180          ListenableFuture<V> future, Function<Exception, X> mapper) {
181        return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
182      }
183    
184      /**
185       * Creates a {@code ListenableFuture} which has its value set immediately upon
186       * construction. The getters just return the value. This {@code Future} can't
187       * be canceled or timed out and its {@code isDone()} method always returns
188       * {@code true}.
189       */
190      public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
191        SettableFuture<V> future = SettableFuture.create();
192        future.set(value);
193        return future;
194      }
195    
196      /**
197       * Returns a {@code CheckedFuture} which has its value set immediately upon
198       * construction.
199       *
200       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
201       * method always returns {@code true}. Calling {@code get()} or {@code
202       * checkedGet()} will immediately return the provided value.
203       */
204      public static <V, X extends Exception> CheckedFuture<V, X>
205          immediateCheckedFuture(@Nullable V value) {
206        SettableFuture<V> future = SettableFuture.create();
207        future.set(value);
208        return Futures.makeChecked(future, new Function<Exception, X>() {
209          @Override
210          public X apply(Exception e) {
211            throw new AssertionError("impossible");
212          }
213        });
214      }
215    
216      /**
217       * Returns a {@code ListenableFuture} which has an exception set immediately
218       * upon construction.
219       *
220       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
221       * method always returns {@code true}. Calling {@code get()} will immediately
222       * throw the provided {@code Throwable} wrapped in an {@code
223       * ExecutionException}.
224       *
225       * @throws Error if the throwable is an {@link Error}.
226       */
227      public static <V> ListenableFuture<V> immediateFailedFuture(
228          Throwable throwable) {
229        checkNotNull(throwable);
230        SettableFuture<V> future = SettableFuture.create();
231        future.setException(throwable);
232        return future;
233      }
234    
235      /**
236       * Returns a {@code CheckedFuture} which has an exception set immediately upon
237       * construction.
238       *
239       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
240       * method always returns {@code true}. Calling {@code get()} will immediately
241       * throw the provided {@code Throwable} wrapped in an {@code
242       * ExecutionException}, and calling {@code checkedGet()} will throw the
243       * provided exception itself.
244       *
245       * @throws Error if the throwable is an {@link Error}.
246       */
247      public static <V, X extends Exception> CheckedFuture<V, X>
248          immediateFailedCheckedFuture(final X exception) {
249        checkNotNull(exception);
250        return makeChecked(Futures.<V>immediateFailedFuture(exception),
251            new Function<Exception, X>() {
252              @Override
253              public X apply(Exception e) {
254                return exception;
255              }
256            });
257      }
258    
259      /**
260       * Returns a new {@code ListenableFuture} whose result is asynchronously
261       * derived from the result of the given {@code Future}. More precisely, the
262       * returned {@code Future} takes its result from a {@code Future} produced by
263       * applying the given {@code Function} to the result of the original {@code
264       * Future}. Example:
265       *
266       * <pre>   {@code
267       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
268       *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
269       *       new Function<RowKey, ListenableFuture<QueryResult>>() {
270       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
271       *           return dataService.read(rowKey);
272       *         }
273       *       };
274       *   ListenableFuture<QueryResult> queryFuture =
   *       chain(rowKeyFuture, queryFunction);
276       * }</pre>
277       *
278       * <p>Note: This overload of {@code chain} is designed for cases in which the
279       * work of creating the derived future is fast and lightweight, as the method
   * does not accept an {@code Executor} to perform the work in. For heavier
281       * derivations, this overload carries some caveats: First, the thread that the
282       * derivation runs in depends on whether the input {@code Future} is done at
283       * the time {@code chain} is called. In particular, if called late, {@code
284       * chain} will run the derivation in the thread that called {@code chain}.
285       * Second, derivations may run in an internal thread of the system responsible
286       * for the input {@code Future}, such as an RPC network thread. Finally,
287       * during the execution of a {@link MoreExecutors#sameThreadExecutor
288       * sameThreadExecutor} {@code chain} function, all other registered but
289       * unexecuted listeners are prevented from running, even if those listeners
290       * are to run in other executors.
291       *
292       * <p>The returned {@code Future} attempts to keep its cancellation state in
293       * sync with that of the input future and that of the future returned by the
294       * chain function. That is, if the returned {@code Future} is cancelled, it
295       * will attempt to cancel the other two, and if either of the other two is
296       * cancelled, the returned {@code Future} will receive a callback in which it
297       * will attempt to cancel itself.
298       *
299       * <p>The typical use for this method would be when a RPC call is dependent on
300       * the results of another RPC.  One would call the first RPC (input), create a
301       * function that calls another RPC based on input's result, and then call
302       * chain on input and that function to get a {@code ListenableFuture} of
303       * the result.
304       *
305       * @param input The future to chain
306       * @param function A function to chain the results of the provided future
307       *     to the results of the returned future.  This will be run in the thread
308       *     that notifies input it is complete.
309       * @return A future that holds result of the chain.
310       */
311      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
312          Function<? super I, ? extends ListenableFuture<? extends O>> function) {
313        return chain(input, function, MoreExecutors.sameThreadExecutor());
314      }
315    
316      /**
317       * Returns a new {@code ListenableFuture} whose result is asynchronously
318       * derived from the result of the given {@code Future}. More precisely, the
319       * returned {@code Future} takes its result from a {@code Future} produced by
320       * applying the given {@code Function} to the result of the original {@code
321       * Future}. Example:
322       *
323       * <pre>   {@code
324       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
325       *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
326       *       new Function<RowKey, ListenableFuture<QueryResult>>() {
327       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
328       *           return dataService.read(rowKey);
329       *         }
330       *       };
331       *   ListenableFuture<QueryResult> queryFuture =
   *       chain(rowKeyFuture, queryFunction, executor);
333       * }</pre>
334       *
335       * <p>The returned {@code Future} attempts to keep its cancellation state in
336       * sync with that of the input future and that of the future returned by the
337       * chain function. That is, if the returned {@code Future} is cancelled, it
338       * will attempt to cancel the other two, and if either of the other two is
339       * cancelled, the returned {@code Future} will receive a callback in which it
340       * will attempt to cancel itself.
341       *
342       * <p>Note: For cases in which the work of creating the derived future is fast
343       * and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
344       * Function) the other overload} or explicit use of {@link
345       * MoreExecutors#sameThreadExecutor}. For heavier derivations, this choice
346       * carries some caveats: First, the thread that the derivation runs in depends
347       * on whether the input {@code Future} is done at the time {@code chain} is
348       * called. In particular, if called late, {@code chain} will run the
349       * derivation in the thread that called {@code chain}. Second, derivations may
350       * run in an internal thread of the system responsible for the input {@code
351       * Future}, such as an RPC network thread. Finally, during the execution of a
352       * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor} {@code chain}
353       * function, all other registered but unexecuted listeners are prevented from
354       * running, even if those listeners are to run in other executors.
355       *
356       * @param input The future to chain
357       * @param function A function to chain the results of the provided future
358       *     to the results of the returned future.
359       * @param exec Executor to run the function in.
360       * @return A future that holds result of the chain.
361       */
362      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
363          Function<? super I, ? extends ListenableFuture<? extends O>> function,
364          Executor exec) {
365        ChainingListenableFuture<I, O> chain =
366            new ChainingListenableFuture<I, O>(function, input);
367        input.addListener(chain, exec);
368        return chain;
369      }
370    
371      /**
372       * Returns a new {@code ListenableFuture} whose result is the product of
373       * applying the given {@code Function} to the result of the given {@code
374       * Future}. Example:
375       *
376       * <pre>   {@code
377       *   ListenableFuture<QueryResult> queryFuture = ...;
378       *   Function<QueryResult, List<Row>> rowsFunction =
379       *       new Function<QueryResult, List<Row>>() {
380       *         public List<Row> apply(QueryResult queryResult) {
381       *           return queryResult.getRows();
382       *         }
383       *       };
384       *   ListenableFuture<List<Row>> rowsFuture =
385       *       transform(queryFuture, rowsFunction);
386       * }</pre>
387       *
388       * <p>Note: This overload of {@code transform} is designed for cases in which
389       * the transformation is fast and lightweight, as the method does not accept
   * an {@code Executor} to perform the work in. For heavier
391       * transformations, this overload carries some caveats: First, the thread that
392       * the transformation runs in depends on whether the input {@code Future} is
393       * done at the time {@code transform} is called. In particular, if called
394       * late, {@code transform} will perform the transformation in the thread that
395       * called {@code transform}. Second, transformations may run in an internal
396       * thread of the system responsible for the input {@code Future}, such as an
397       * RPC network thread. Finally, during the execution of a {@link
398       * MoreExecutors#sameThreadExecutor sameThreadExecutor} transformation, all
399       * other registered but unexecuted listeners are prevented from running, even
400       * if those listeners are to run in other executors.
401       *
402       * <p>The returned {@code Future} attempts to keep its cancellation state in
403       * sync with that of the input future. That is, if the returned {@code Future}
404       * is cancelled, it will attempt to cancel the input, and if the input is
405       * cancelled, the returned {@code Future} will receive a callback in which it
406       * will attempt to cancel itself.
407       *
408       * <p>An example use of this method is to convert a serializable object
409       * returned from an RPC into a POJO.
410       *
411       * @param future The future to transform
412       * @param function A Function to transform the results of the provided future
413       *     to the results of the returned future.  This will be run in the thread
414       *     that notifies input it is complete.
415       * @return A future that holds result of the transformation.
416       * @since 9.0 (in 1.0 as {@code compose})
417       */
418      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
419          final Function<? super I, ? extends O> function) {
420        return transform(future, function, MoreExecutors.sameThreadExecutor());
421      }
422    
423      /**
424       * Returns a new {@code ListenableFuture} whose result is the product of
425       * applying the given {@code Function} to the result of the given {@code
426       * Future}. Example:
427       *
428       * <pre>   {@code
429       *   ListenableFuture<QueryResult> queryFuture = ...;
430       *   Function<QueryResult, List<Row>> rowsFunction =
431       *       new Function<QueryResult, List<Row>>() {
432       *         public List<Row> apply(QueryResult queryResult) {
433       *           return queryResult.getRows();
434       *         }
435       *       };
436       *   ListenableFuture<List<Row>> rowsFuture =
437       *       transform(queryFuture, rowsFunction, executor);
438       * }</pre>
439       *
440       * <p>The returned {@code Future} attempts to keep its cancellation state in
441       * sync with that of the input future. That is, if the returned {@code Future}
442       * is cancelled, it will attempt to cancel the input, and if the input is
443       * cancelled, the returned {@code Future} will receive a callback in which it
444       * will attempt to cancel itself.
445       *
446       * <p>An example use of this method is to convert a serializable object
447       * returned from an RPC into a POJO.
448       *
449       * <p>Note: For cases in which the transformation is fast and lightweight,
450       * consider {@linkplain Futures#transform(ListenableFuture, Function) the
451       * other overload} or explicit use of {@link
452       * MoreExecutors#sameThreadExecutor}. For heavier transformations, this choice
453       * carries some caveats: First, the thread that the transformation runs in
454       * depends on whether the input {@code Future} is done at the time {@code
455       * transform} is called. In particular, if called late, {@code transform} will
456       * perform the transformation in the thread that called {@code transform}.
457       * Second, transformations may run in an internal thread of the system
458       * responsible for the input {@code Future}, such as an RPC network thread.
459       * Finally, during the execution of a {@link MoreExecutors#sameThreadExecutor
460       * sameThreadExecutor} transformation, all other registered but unexecuted
461       * listeners are prevented from running, even if those listeners are to run
462       * in other executors.
463       *
464       * @param future The future to transform
465       * @param function A Function to transform the results of the provided future
466       *     to the results of the returned future.
467       * @param exec Executor to run the function in.
468       * @return A future that holds result of the transformation.
469       * @since 9.0 (in 2.0 as {@code compose})
470       */
471      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
472          final Function<? super I, ? extends O> function, Executor exec) {
473        checkNotNull(function);
474        Function<I, ListenableFuture<O>> wrapperFunction
475            = new Function<I, ListenableFuture<O>>() {
476                @Override public ListenableFuture<O> apply(I input) {
477                  O output = function.apply(input);
478                  return immediateFuture(output);
479                }
480            };
481        return chain(future, wrapperFunction, exec);
482      }
483    
484      /**
485       * Like {@link #transform(ListenableFuture, Function)} except that the
486       * transformation {@code function} is invoked on each call to
487       * {@link Future#get() get()} on the returned future.
488       *
489       * <p>The returned {@code Future} reflects the input's cancellation
490       * state directly, and any attempt to cancel the returned Future is likewise
491       * passed through to the input Future.
492       *
493       * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
494       * only apply the timeout to the execution of the underlying {@code Future},
495       * <em>not</em> to the execution of the transformation function.
496       *
497       * <p>The primary audience of this method is callers of {@code transform}
498       * who don't have a {@code ListenableFuture} available and
499       * do not mind repeated, lazy function evaluation.
500       *
501       * @param future The future to transform
502       * @param function A Function to transform the results of the provided future
503       *     to the results of the returned future.
504       * @return A future that returns the result of the transformation.
505       * @since 10.0
506       */
507      @Beta
508      public static <I, O> Future<O> lazyTransform(final Future<I> future,
509          final Function<? super I, ? extends O> function) {
510        checkNotNull(future);
511        checkNotNull(function);
512        return new Future<O>() {
513    
514          @Override
515          public boolean cancel(boolean mayInterruptIfRunning) {
516            return future.cancel(mayInterruptIfRunning);
517          }
518    
519          @Override
520          public boolean isCancelled() {
521            return future.isCancelled();
522          }
523    
524          @Override
525          public boolean isDone() {
526            return future.isDone();
527          }
528    
529          @Override
530          public O get() throws InterruptedException, ExecutionException {
531            return applyTransformation(future.get());
532          }
533    
534          @Override
535          public O get(long timeout, TimeUnit unit)
536              throws InterruptedException, ExecutionException, TimeoutException {
537            return applyTransformation(future.get(timeout, unit));
538          }
539    
540          private O applyTransformation(I input) throws ExecutionException {
541            try {
542              return function.apply(input);
543            } catch (Throwable t) {
544              throw new ExecutionException(t);
545            }
546          }
547        };
548      }
549    
550      /**
551       * Returns a new {@code Future} whose result is the product of applying the
552       * given {@code Function} to the result of the given {@code Future}. Example:
553       *
554       * <pre>   {@code
555       *   Future<QueryResult> queryFuture = ...;
556       *   Function<QueryResult, List<Row>> rowsFunction =
557       *       new Function<QueryResult, List<Row>>() {
558       *         public List<Row> apply(QueryResult queryResult) {
559       *           return queryResult.getRows();
560       *         }
561       *       };
562       *   Future<List<Row>> rowsFuture = transform(queryFuture, rowsFunction);
563       * }</pre>
564       *
565       * <p>Each call to {@code Future<O>.get(*)} results in a call to
566       * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
567       * is assumed that {@code Future<I>.get(*)} is idempotent.
568       *
569       * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
570       * future, the timeout only applies to the future passed in to this method.
571       * Any additional time taken by applying {@code function} is not considered.
572       * (Exception: If the input future is a {@link ListenableFuture}, timeouts
573       * will be strictly enforced.)
574       *
575       * @param future The future to transform
576       * @param function A Function to transform the results of the provided future
577       *     to the results of the returned future.  This will be run in the thread
578       *     that calls one of the varieties of {@code get()}.
579       * @return A future that computes result of the transformation
580       * @since 9.0 (in 1.0 as {@code compose})
581       * @deprecated Obtain a {@code ListenableFuture} (following the advice in its
582       *     documentation) and use {@link #transform(ListenableFuture, Function)}
583       *     or use {@link #lazyTransform(Future, Function)}, which will apply the
584       *     transformation on each call to {@code get()}.
585       *     <b>This method is scheduled for deletion from Guava in Guava release
586       *     11.0.</b>
587       */
588      @Deprecated
589      public static <I, O> Future<O> transform(final Future<I> future,
590          final Function<? super I, ? extends O> function) {
591        if (future instanceof ListenableFuture) {
592          return transform((ListenableFuture<I>) future, function);
593        }
594        checkNotNull(future);
595        checkNotNull(function);
596        return new Future<O>() {
597    
598          /*
599           * Concurrency detail:
600           *
601           * <p>To preserve the idempotency of calls to this.get(*) calls to the
602           * function are only applied once. A lock is required to prevent multiple
603           * applications of the function. The calls to future.get(*) are performed
604           * outside the lock, as is required to prevent calls to
605           * get(long, TimeUnit) to persist beyond their timeout.
606           *
607           * <p>Calls to future.get(*) on every call to this.get(*) also provide
608           * the cancellation behavior for this.
609           *
610           * <p>(Consider: in thread A, call get(), in thread B call get(long,
611           * TimeUnit). Thread B may have to wait for Thread A to finish, which
612           * would be unacceptable.)
613           *
614           * <p>Note that each call to Future<O>.get(*) results in a call to
615           * Future<I>.get(*), but the function is only applied once, so
616           * Future<I>.get(*) is assumed to be idempotent.
617           */
618    
619          private final Object lock = new Object();
620          private boolean set = false;
621          private O value = null;
622          private ExecutionException exception = null;
623    
624          @Override
625          public O get() throws InterruptedException, ExecutionException {
626            return apply(future.get());
627          }
628    
629          @Override
630          public O get(long timeout, TimeUnit unit) throws InterruptedException,
631              ExecutionException, TimeoutException {
632            return apply(future.get(timeout, unit));
633          }
634    
635          private O apply(I raw) throws ExecutionException {
636            synchronized (lock) {
637              if (!set) {
638                try {
639                  value = function.apply(raw);
640                } catch (RuntimeException e) {
641                  exception = new ExecutionException(e);
642                } catch (Error e) {
643                  exception = new ExecutionException(e);
644                }
645                set = true;
646              }
647    
648              if (exception != null) {
649                throw exception;
650              }
651              return value;
652            }
653          }
654    
655          @Override
656          public boolean cancel(boolean mayInterruptIfRunning) {
657            return future.cancel(mayInterruptIfRunning);
658          }
659    
660          @Override
661          public boolean isCancelled() {
662            return future.isCancelled();
663          }
664    
665          @Override
666          public boolean isDone() {
667            return future.isDone();
668          }
669        };
670      }
671    
  /**
   * An implementation of {@code ListenableFuture} that also implements
   * {@code Runnable} so that it can be used to nest ListenableFutures.
   * Once the passed-in {@code ListenableFuture} is complete, it calls the
   * passed-in {@code Function} to generate the result.
   *
   * <p>The work happens in {@link #run()}: it reads the input future's value,
   * applies the function to obtain an output future, and attaches a listener
   * that copies the output future's outcome (value, failure or cancellation)
   * into this future.
   *
   * <p>If the function throws any checked exceptions, they should be wrapped
   * in a {@code UndeclaredThrowableException} so that this class can get
   * access to the cause.
   */
  private static class ChainingListenableFuture<I, O>
      extends AbstractFuture<O> implements Runnable {

    // Produces the output future from the input's value. Cleared at the end
    // of run() so that it is not pinned beyond completion.
    private Function<? super I, ? extends ListenableFuture<? extends O>>
        function;
    // The future whose value feeds the function. Cleared at the end of run().
    private ListenableFuture<? extends I> inputFuture;
    // Set by run() once the function has been applied; reset to null when the
    // output completes or when this future is found cancelled in run().
    private volatile ListenableFuture<? extends O> outputFuture;
    // Capacity-one hand-off of cancel()'s mayInterruptIfRunning argument to
    // run(), for the case where cancel() races with the function application.
    private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
        new LinkedBlockingQueue<Boolean>(1);
    // Counted down in run()'s finally block; the get() methods wait on it
    // before reading outputFuture.
    private final CountDownLatch outputCreated = new CountDownLatch(1);

    /**
     * Stores the function and input future; both must be non-null. This
     * constructor does not itself register any listener.
     */
    private ChainingListenableFuture(
        Function<? super I, ? extends ListenableFuture<? extends O>> function,
        ListenableFuture<? extends I> inputFuture) {
      this.function = checkNotNull(function);
      this.inputFuture = checkNotNull(inputFuture);
    }

    /**
     * Delegate the get() to the input and output futures, in case
     * their implementations defer starting computation until their
     * own get() is invoked.
     *
     * <p>When this future is not yet done, the call waits in three phases
     * (input future, creation of the output future, output future) before
     * falling through to {@code super.get()}.
     */
    @Override
    public O get() throws InterruptedException, ExecutionException {
      if (!isDone()) {
        // Invoking get on the inputFuture will ensure our own run()
        // method below is invoked as a listener when inputFuture sets
        // its value.  Therefore when get() returns we should then see
        // the outputFuture be created.
        // (Read the field once into a local: run() nulls it concurrently.)
        ListenableFuture<? extends I> inputFuture = this.inputFuture;
        if (inputFuture != null) {
          inputFuture.get();
        }

        // If our listener was scheduled to run on an executor we may
        // need to wait for our listener to finish running before the
        // outputFuture has been constructed by the function.
        outputCreated.await();

        // Like above with the inputFuture, we have a listener on
        // the outputFuture that will set our own value when its
        // value is set.  Invoking get will ensure the output can
        // complete and invoke our listener, so that we can later
        // get the result.
        // (outputFuture may legitimately be null here, e.g. when run()
        // finished with an exception or the output has already completed.)
        ListenableFuture<? extends O> outputFuture = this.outputFuture;
        if (outputFuture != null) {
          outputFuture.get();
        }
      }
      return super.get();
    }

    /**
     * Delegate the get() to the input and output futures, in case
     * their implementations defer starting computation until their
     * own get() is invoked.
     *
     * <p>The timeout is converted to nanoseconds and the time spent in each
     * waiting phase is subtracted from it before the next phase starts.
     *
     * @throws TimeoutException if any phase, including the wait for the
     *         output future to be created, does not finish in time
     */
    @Override
    public O get(long timeout, TimeUnit unit) throws TimeoutException,
        ExecutionException, InterruptedException {
      if (!isDone()) {
        // Use a single time unit so we can decrease remaining timeout
        // as we wait for various phases to complete.
        if (unit != NANOSECONDS) {
          timeout = NANOSECONDS.convert(timeout, unit);
          unit = NANOSECONDS;
        }

        // Invoking get on the inputFuture will ensure our own run()
        // method below is invoked as a listener when inputFuture sets
        // its value.  Therefore when get() returns we should then see
        // the outputFuture be created.
        ListenableFuture<? extends I> inputFuture = this.inputFuture;
        if (inputFuture != null) {
          long start = System.nanoTime();
          inputFuture.get(timeout, unit);
          // Charge the elapsed time against the remaining budget; the max()
          // guards against a negative elapsed reading.
          timeout -= Math.max(0, System.nanoTime() - start);
        }

        // If our listener was scheduled to run on an executor we may
        // need to wait for our listener to finish running before the
        // outputFuture has been constructed by the function.
        long start = System.nanoTime();
        if (!outputCreated.await(timeout, unit)) {
          throw new TimeoutException();
        }
        timeout -= Math.max(0, System.nanoTime() - start);

        // Like above with the inputFuture, we have a listener on
        // the outputFuture that will set our own value when its
        // value is set.  Invoking get will ensure the output can
        // complete and invoke our listener, so that we can later
        // get the result.
        ListenableFuture<? extends O> outputFuture = this.outputFuture;
        if (outputFuture != null) {
          outputFuture.get(timeout, unit);
        }
      }
      // Note: super.get receives whatever budget is left after the phases
      // above (or the caller's original arguments if already done).
      return super.get(timeout, unit);
    }

    /**
     * Cancels this future and, if that succeeds, propagates the cancellation
     * to the input and output futures (whichever are currently set).
     *
     * @return {@code true} if {@code super.cancel} succeeded, {@code false}
     *         otherwise (in which case nothing else is done)
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      /*
       * Our additional cancellation work needs to occur even if
       * !mayInterruptIfRunning, so we can't move it into interruptTask().
       */
      if (super.cancel(mayInterruptIfRunning)) {
        // This should never block since only one thread is allowed to cancel
        // this Future.
        // The flag is published for run(), which may be in the middle of
        // applying the function and needs it to cancel the new output future.
        putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
        cancel(inputFuture, mayInterruptIfRunning);
        cancel(outputFuture, mayInterruptIfRunning);
        return true;
      }
      return false;
    }

    /** Null-tolerant helper: cancels {@code future} if it is non-null. */
    private void cancel(@Nullable Future<?> future,
        boolean mayInterruptIfRunning) {
      if (future != null) {
        future.cancel(mayInterruptIfRunning);
      }
    }

    /**
     * Reads the input future's result, applies the function to create the
     * output future, and chains the output's outcome into this future.
     * Failures of the input or of the function are recorded via
     * {@code setException}; cancellation of the input cancels this future.
     * In every case the {@code finally} block releases the function and
     * input references and counts down {@code outputCreated}.
     */
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        // Keep a final local copy for the listener below; the field itself
        // is cleared once the output completes.
        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            function.apply(sourceResult);
        if (isCancelled()) {
          // Handles the case where cancel was called while the function was
          // being applied.
          // There is a gap in cancel(boolean) between calling sync.cancel()
          // and storing the value of mayInterruptIfRunning, so this thread
          // needs to block, waiting for that value.
          outputFuture.cancel(
              takeUninterruptibly(mayInterruptIfRunningChannel));
          this.outputFuture = null;
          return;
        }
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                // Here it would have been nice to have had an
                // UninterruptibleListenableFuture, but we don't want to start a
                // combinatorial explosion of interfaces, so we have to make do.
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, MoreExecutors.sameThreadExecutor());
      } catch (UndeclaredThrowableException e) {
        // Set the cause of the exception as this future's exception
        // (this is how a function can report a checked exception).
        setException(e.getCause());
      } catch (RuntimeException e) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(e);
      } catch (Error e) {
        // Propagate errors up ASAP - our superclass will rethrow the error
        setException(e);
      } finally {
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
        // Allow our get routines to examine outputFuture now.
        outputCreated.countDown();
      }
    }
  }
881    
882      /**
883       * Creates a new {@code ListenableFuture} whose value is a list containing the
884       * values of all its input futures, if all succeed. If any input fails, the
885       * returned future fails.
886       *
887       * <p>The list of results is in the same order as the input list.
888       *
889       * <p>Canceling this future does not cancel any of the component futures;
890       * however, if any of the provided futures fails or is canceled, this one is,
891       * too.
892       *
893       * @param futures futures to combine
894       * @return a future that provides a list of the results of the component
895       *         futures
896       * @since 10.0
897       */
898      @Beta
899      public static <V> ListenableFuture<List<V>> allAsList(
900          ListenableFuture<? extends V>... futures) {
901        return new ListFuture<V>(ImmutableList.copyOf(futures), true,
902            MoreExecutors.sameThreadExecutor());
903      }
904    
905      /**
906       * Creates a new {@code ListenableFuture} whose value is a list containing the
907       * values of all its input futures, if all succeed. If any input fails, the
908       * returned future fails.
909       *
910       * <p>The list of results is in the same order as the input list.
911       *
912       * <p>Canceling this future does not cancel any of the component futures;
913       * however, if any of the provided futures fails or is canceled, this one is,
914       * too.
915       *
916       * @param futures futures to combine
917       * @return a future that provides a list of the results of the component
918       *         futures
919       * @since 10.0
920       */
921      @Beta
922      public static <V> ListenableFuture<List<V>> allAsList(
923          Iterable<? extends ListenableFuture<? extends V>> futures) {
924        return new ListFuture<V>(ImmutableList.copyOf(futures), true,
925            MoreExecutors.sameThreadExecutor());
926      }
927    
928      /**
929       * Creates a new {@code ListenableFuture} whose value is a list containing the
930       * values of all its successful input futures. The list of results is in the
931       * same order as the input list, and if any of the provided futures fails or
932       * is canceled, its corresponding position will contain {@code null} (which is
933       * indistinguishable from the future having a successful value of
934       * {@code null}).
935       *
936       * @param futures futures to combine
937       * @return a future that provides a list of the results of the component
938       *         futures
939       * @since 10.0
940       */
941      @Beta
942      public static <V> ListenableFuture<List<V>> successfulAsList(
943          ListenableFuture<? extends V>... futures) {
944        return new ListFuture<V>(ImmutableList.copyOf(futures), false,
945            MoreExecutors.sameThreadExecutor());
946      }
947    
948      /**
949       * Creates a new {@code ListenableFuture} whose value is a list containing the
950       * values of all its successful input futures. The list of results is in the
951       * same order as the input list, and if any of the provided futures fails or
952       * is canceled, its corresponding position will contain {@code null} (which is
953       * indistinguishable from the future having a successful value of
954       * {@code null}).
955       *
956       * @param futures futures to combine
957       * @return a future that provides a list of the results of the component
958       *         futures
959       * @since 10.0
960       */
961      @Beta
962      public static <V> ListenableFuture<List<V>> successfulAsList(
963          Iterable<? extends ListenableFuture<? extends V>> futures) {
964        return new ListFuture<V>(ImmutableList.copyOf(futures), false,
965            MoreExecutors.sameThreadExecutor());
966      }
967    
968      /**
969       * Registers separate success and failure callbacks to be run when the {@code
970       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
971       * complete} or, if the computation is already complete, immediately.
972       *
973       * <p>There is no guaranteed ordering of execution of callbacks, but any
974       * callback added through this method is guaranteed to be called once the
975       * computation is complete.
976       *
977       * Example: <pre> {@code
978       * ListenableFuture<QueryResult> future = ...;
979       * addCallback(future,
980       *     new FutureCallback<QueryResult> {
981       *       public void onSuccess(QueryResult result) {
982       *         storeInCache(result);
983       *       }
984       *       public void onFailure(Throwable t) {
985       *         reportError(t);
986       *       }
987       *     });}</pre>
988       *
989       * <p>Note: This overload of {@code addCallback} is designed for cases in
990       * which the callack is fast and lightweight, as the method does not accept
991       * an {@code Executor} to perform the the work in. For heavier
992       * callbacks, this overload carries some caveats: First, the thread that
993       * the callback runs in depends on whether the input {@code Future} is
994       * done at the time {@code addCallback} is called. In particular, if called
995       * late, {@code addCallback} will execute the callback in the thread that
996       * called {@code addCallback}. Second, callbacks may run in an internal
997       * thread of the system responsible for the input {@code Future}, such as an
998       * RPC network thread. Finally, during the execution of a {@link
999       * MoreExecutors#sameThreadExecutor sameThreadExecutor} callback, all other
1000       * registered but unexecuted listeners are prevented from running, even if
1001       * those listeners are to run in other executors.
1002       *
1003       * <p>For a more general interface to attach a completion listener to a
1004       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1005       *
1006       * @param future The future attach the callback to.
1007       * @param callback The callback to invoke when {@code future} is completed.
1008       * @since 10.0
1009       */
1010      public static <V> void addCallback(ListenableFuture<V> future,
1011          FutureCallback<? super V> callback) {
1012        addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1013      }
1014    
1015      /**
1016       * Registers separate success and failure callbacks to be run when the {@code
1017       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1018       * complete} or, if the computation is already complete, immediately.
1019       *
1020       * <p>The callback is run in {@code executor}.
1021       * There is no guaranteed ordering of execution of callbacks, but any
1022       * callback added through this method is guaranteed to be called once the
1023       * computation is complete.
1024       *
1025       * Example: <pre> {@code
1026       * ListenableFuture<QueryResult> future = ...;
1027       * Executor e = ...
1028       * addCallback(future, e,
1029       *     new FutureCallback<QueryResult> {
1030       *       public void onSuccess(QueryResult result) {
1031       *         storeInCache(result);
1032       *       }
1033       *       public void onFailure(Throwable t) {
1034       *         reportError(t);
1035       *       }
1036       *     });}</pre>
1037       *
1038       * When the callback is fast and lightweight consider
1039       * {@linkplain Futures#addCallback(ListenableFuture, FutureCallback)
1040       * the other overload} or explicit use of
1041       * {@link MoreExecutors#sameThreadExecutor() sameThreadExecutor}. For heavier
1042       * callbacks, this choice carries some caveats: First, the thread that
1043       * the callback runs in depends on whether the input {@code Future} is
1044       * done at the time {@code addCallback} is called. In particular, if called
1045       * late, {@code addCallback} will execute the callback in the thread that
1046       * called {@code addCallback}. Second, callbacks may run in an internal
1047       * thread of the system responsible for the input {@code Future}, such as an
1048       * RPC network thread. Finally, during the execution of a {@link
1049       * MoreExecutors#sameThreadExecutor sameThreadExecutor} callback, all other
1050       * registered but unexecuted listeners are prevented from running, even if
1051       * those listeners are to run in other executors.
1052       *
1053       * <p>For a more general interface to attach a completion listener to a
1054       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1055       *
1056       * @param future The future attach the callback to.
1057       * @param callback The callback to invoke when {@code future} is completed.
1058       * @param executor The executor to run {@code callback} when the future
1059       *    completes.
1060       * @since 10.0
1061       */
1062      public static <V> void addCallback(final ListenableFuture<V> future,
1063          final FutureCallback<? super V> callback, Executor executor) {
1064        Preconditions.checkNotNull(callback);
1065        Runnable callbackListener = new Runnable() {
1066          @Override
1067          public void run() {
1068            try {
1069              // TODO(user): (Before Guava release), validate that this
1070              // is the thing for IE.
1071              V value = getUninterruptibly(future);
1072              callback.onSuccess(value);
1073            } catch (ExecutionException e) {
1074              callback.onFailure(e.getCause());
1075            } catch (RuntimeException e) {
1076              callback.onFailure(e);
1077            } catch (Error e) {
1078              callback.onFailure(e);
1079            }
1080          }
1081        };
1082        future.addListener(callbackListener, executor);
1083      }
1084    
1085      /**
1086       * Returns the result of {@link Future#get()}, converting most exceptions to a
1087       * new instance of the given checked exception type. This reduces boilerplate
1088       * for a common use of {@code Future} in which it is unnecessary to
1089       * programmatically distinguish between exception types or to extract other
1090       * information from the exception instance.
1091       *
1092       * <p>Exceptions from {@code Future.get} are treated as follows:
1093       * <ul>
1094       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1095       *     {@code X} if the cause is a checked exception, an {@link
1096       *     UncheckedExecutionException} if the cause is a {@code
1097       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1098       *     {@code Error}.
1099       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1100       *     restoring the interrupt).
1101       * <li>Any {@link CancellationException} is propagated untouched, as is any
1102       *     other {@link RuntimeException} (though {@code get} implementations are
1103       *     discouraged from throwing such exceptions).
1104       * </ul>
1105       *
1106       * The overall principle is to continue to treat every checked exception as a
1107       * checked exception, every unchecked exception as an unchecked exception, and
1108       * every error as an error. In addition, the cause of any {@code
1109       * ExecutionException} is wrapped in order to ensure that the new stack trace
1110       * matches that of the current thread.
1111       *
1112       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1113       * public constructor that accepts zero or more arguments, all of type {@code
1114       * String} or {@code Throwable} (preferring constructors with at least one
1115       * {@code String}) and calling the constructor via reflection. If the
1116       * exception did not already have a cause, one is set by calling {@link
1117       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1118       * {@code IllegalArgumentException} is thrown.
1119       *
1120       * @throws X if {@code get} throws any checked exception except for an {@code
1121       *         ExecutionException} whose cause is not itself a checked exception
1122       * @throws UncheckedExecutionException if {@code get} throws an {@code
1123       *         ExecutionException} with a {@code RuntimeException} as its cause
1124       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1125       *         with an {@code Error} as its cause
1126       * @throws CancellationException if {@code get} throws a {@code
1127       *         CancellationException}
1128       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1129       *         RuntimeException} or does not have a suitable constructor
1130       * @since 10.0
1131       */
1132      @Beta
1133      public static <V, X extends Exception> V get(
1134          Future<V> future, Class<X> exceptionClass) throws X {
1135        checkNotNull(future);
1136        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1137            "Futures.get exception type (%s) must not be a RuntimeException",
1138            exceptionClass);
1139        try {
1140          return future.get();
1141        } catch (InterruptedException e) {
1142          currentThread().interrupt();
1143          throw newWithCause(exceptionClass, e);
1144        } catch (ExecutionException e) {
1145          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1146          throw new AssertionError();
1147        }
1148      }
1149    
1150      /**
1151       * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1152       * exceptions to a new instance of the given checked exception type. This
1153       * reduces boilerplate for a common use of {@code Future} in which it is
1154       * unnecessary to programmatically distinguish between exception types or to
1155       * extract other information from the exception instance.
1156       *
1157       * <p>Exceptions from {@code Future.get} are treated as follows:
1158       * <ul>
1159       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1160       *     {@code X} if the cause is a checked exception, an {@link
1161       *     UncheckedExecutionException} if the cause is a {@code
1162       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1163       *     {@code Error}.
1164       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1165       *     restoring the interrupt).
1166       * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1167       * <li>Any {@link CancellationException} is propagated untouched, as is any
1168       *     other {@link RuntimeException} (though {@code get} implementations are
1169       *     discouraged from throwing such exceptions).
1170       * </ul>
1171       *
1172       * The overall principle is to continue to treat every checked exception as a
1173       * checked exception, every unchecked exception as an unchecked exception, and
1174       * every error as an error. In addition, the cause of any {@code
1175       * ExecutionException} is wrapped in order to ensure that the new stack trace
1176       * matches that of the current thread.
1177       *
1178       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1179       * public constructor that accepts zero or more arguments, all of type {@code
1180       * String} or {@code Throwable} (preferring constructors with at least one
1181       * {@code String}) and calling the constructor via reflection. If the
1182       * exception did not already have a cause, one is set by calling {@link
1183       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1184       * {@code IllegalArgumentException} is thrown.
1185       *
1186       * @throws X if {@code get} throws any checked exception except for an {@code
1187       *         ExecutionException} whose cause is not itself a checked exception
1188       * @throws UncheckedExecutionException if {@code get} throws an {@code
1189       *         ExecutionException} with a {@code RuntimeException} as its cause
1190       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1191       *         with an {@code Error} as its cause
1192       * @throws CancellationException if {@code get} throws a {@code
1193       *         CancellationException}
1194       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1195       *         RuntimeException} or does not have a suitable constructor
1196       * @since 10.0
1197       */
1198      @Beta
1199      public static <V, X extends Exception> V get(
1200          Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1201          throws X {
1202        checkNotNull(future);
1203        checkNotNull(unit);
1204        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1205            "Futures.get exception type (%s) must not be a RuntimeException",
1206            exceptionClass);
1207        try {
1208          return future.get(timeout, unit);
1209        } catch (InterruptedException e) {
1210          currentThread().interrupt();
1211          throw newWithCause(exceptionClass, e);
1212        } catch (TimeoutException e) {
1213          throw newWithCause(exceptionClass, e);
1214        } catch (ExecutionException e) {
1215          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1216          throw new AssertionError();
1217        }
1218      }
1219    
1220      private static <X extends Exception> void wrapAndThrowExceptionOrError(
1221          Throwable cause, Class<X> exceptionClass) throws X {
1222        if (cause instanceof Error) {
1223          throw new ExecutionError((Error) cause);
1224        }
1225        if (cause instanceof RuntimeException) {
1226          throw new UncheckedExecutionException(cause);
1227        }
1228        throw newWithCause(exceptionClass, cause);
1229      }
1230    
1231      /**
1232       * Returns the result of calling {@link Future#get()} uninterruptibly on a
1233       * task known not to throw a checked exception. This makes {@code Future} more
1234       * suitable for lightweight, fast-running tasks that, barring bugs in the
1235       * code, will not fail. This gives it exception-handling behavior similar to
1236       * that of {@code ForkJoinTask.join}.
1237       *
1238       * <p>Exceptions from {@code Future.get} are treated as follows:
1239       * <ul>
1240       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1241       *     {@link UncheckedExecutionException} (if the cause is an {@code
1242       *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1243       *     Error}).
1244       * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1245       *     call. The interrupt is restored before {@code getUnchecked} returns.
1246       * <li>Any {@link CancellationException} is propagated untouched. So is any
1247       *     other {@link RuntimeException} ({@code get} implementations are
1248       *     discouraged from throwing such exceptions).
1249       * </ul>
1250       *
1251       * The overall principle is to eliminate all checked exceptions: to loop to
1252       * avoid {@code InterruptedException}, to pass through {@code
1253       * CancellationException}, and to wrap any exception from the underlying
1254       * computation in an {@code UncheckedExecutionException} or {@code
1255       * ExecutionError}.
1256       *
1257       * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1258       * {@link Uninterruptibles#getUninterruptibly(Future)}.
1259       *
1260       * @throws UncheckedExecutionException if {@code get} throws an {@code
1261       *         ExecutionException} with an {@code Exception} as its cause
1262       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1263       *         with an {@code Error} as its cause
1264       * @throws CancellationException if {@code get} throws a {@code
1265       *         CancellationException}
1266       * @since 10.0
1267       */
1268      @Beta
1269      public static <V> V getUnchecked(Future<V> future) {
1270        checkNotNull(future);
1271        try {
1272          return getUninterruptibly(future);
1273        } catch (ExecutionException e) {
1274          wrapAndThrowUnchecked(e.getCause());
1275          throw new AssertionError();
1276        }
1277      }
1278    
1279      private static void wrapAndThrowUnchecked(Throwable cause) {
1280        if (cause instanceof Error) {
1281          throw new ExecutionError((Error) cause);
1282        }
1283        /*
1284         * It's a non-Error, non-Exception Throwable. From my survey of such
1285         * classes, I believe that most users intended to extend Exception, so we'll
1286         * treat it like an Exception.
1287         */
1288        throw new UncheckedExecutionException(cause);
1289      }
1290    
1291      /*
1292       * TODO(user): FutureChecker interface for these to be static methods on? If
1293       * so, refer to it in the (static-method) Futures.get documentation
1294       */
1295    
1296      /*
1297       * Arguably we don't need a timed getUnchecked because any operation slow
1298       * enough to require a timeout is heavyweight enough to throw a checked
1299       * exception and therefore be inappropriate to use with getUnchecked. Further,
1300       * it's not clear that converting the checked TimeoutException to a
1301       * RuntimeException -- especially to an UncheckedExecutionException, since it
1302       * wasn't thrown by the computation -- makes sense, and if we don't convert
1303       * it, the user still has to write a try-catch block.
1304       *
1305       * If you think you would use this method, let us know.
1306       */
1307    
1308      private static <X extends Exception> X newWithCause(
1309          Class<X> exceptionClass, Throwable cause) {
1310        // getConstructors() guarantees this as long as we don't modify the array.
1311        @SuppressWarnings("unchecked")
1312        List<Constructor<X>> constructors =
1313            (List) Arrays.asList(exceptionClass.getConstructors());
1314        for (Constructor<X> constructor : preferringStrings(constructors)) {
1315          @Nullable X instance = newFromConstructor(constructor, cause);
1316          if (instance != null) {
1317            if (instance.getCause() == null) {
1318              instance.initCause(cause);
1319            }
1320            return instance;
1321          }
1322        }
1323        throw new IllegalArgumentException(
1324            "No appropriate constructor for exception of type " + exceptionClass
1325                + " in response to chained exception", cause);
1326      }
1327    
1328      private static <X extends Exception> List<Constructor<X>>
1329          preferringStrings(List<Constructor<X>> constructors) {
1330        return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1331      }
1332    
1333      private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1334          Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1335            @Override public Boolean apply(Constructor<?> input) {
1336              return asList(input.getParameterTypes()).contains(String.class);
1337            }
1338          }).reverse();
1339    
1340      @Nullable private static <X> X newFromConstructor(
1341          Constructor<X> constructor, Throwable cause) {
1342        Class<?>[] paramTypes = constructor.getParameterTypes();
1343        Object[] params = new Object[paramTypes.length];
1344        for (int i = 0; i < paramTypes.length; i++) {
1345          Class<?> paramType = paramTypes[i];
1346          if (paramType.equals(String.class)) {
1347            params[i] = cause.toString();
1348          } else if (paramType.equals(Throwable.class)) {
1349            params[i] = cause;
1350          } else {
1351            return null;
1352          }
1353        }
1354        try {
1355          return constructor.newInstance(params);
1356        } catch (IllegalArgumentException e) {
1357          return null;
1358        } catch (InstantiationException e) {
1359          return null;
1360        } catch (IllegalAccessException e) {
1361          return null;
1362        } catch (InvocationTargetException e) {
1363          return null;
1364        }
1365      }
1366    
1367      /**
1368       * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1369       * The idea is to create a (null-filled) List and register a listener with
1370       * each component future to fill out the value in the List when that future
1371       * completes.
1372       */
1373      private static class ListFuture<V> extends AbstractFuture<List<V>> {
1374        ImmutableList<? extends ListenableFuture<? extends V>> futures;
1375        final boolean allMustSucceed;
1376        final AtomicInteger remaining;
1377        List<V> values;
1378    
1379        /**
1380         * Constructor.
1381         *
1382         * @param futures all the futures to build the list from
1383         * @param allMustSucceed whether a single failure or cancellation should
1384         *        propagate to this future
1385         * @param listenerExecutor used to run listeners on all the passed in
1386         *        futures.
1387         */
1388        ListFuture(
1389            final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1390            final boolean allMustSucceed, final Executor listenerExecutor) {
1391          this.futures = futures;
1392          this.values = Lists.newArrayListWithCapacity(futures.size());
1393          this.allMustSucceed = allMustSucceed;
1394          this.remaining = new AtomicInteger(futures.size());
1395    
1396          init(listenerExecutor);
1397        }
1398    
1399        private void init(final Executor listenerExecutor) {
1400          // First, schedule cleanup to execute when the Future is done.
1401          addListener(new Runnable() {
1402            @Override
1403            public void run() {
1404              // By now the values array has either been set as the Future's value,
1405              // or (in case of failure) is no longer useful.
1406              ListFuture.this.values = null;
1407    
1408              // Let go of the memory held by other futures
1409              ListFuture.this.futures = null;
1410            }
1411          }, MoreExecutors.sameThreadExecutor());
1412    
1413          // Now begin the "real" initialization.
1414    
1415          // Corner case: List is empty.
1416          if (futures.isEmpty()) {
1417            set(Lists.newArrayList(values));
1418            return;
1419          }
1420    
1421          // Populate the results list with null initially.
1422          for (int i = 0; i < futures.size(); ++i) {
1423            values.add(null);
1424          }
1425    
1426          // Register a listener on each Future in the list to update
1427          // the state of this future.
1428          // Note that if all the futures on the list are done prior to completing
1429          // this loop, the last call to addListener() will callback to
1430          // setOneValue(), transitively call our cleanup listener, and set
1431          // this.futures to null.
1432          // We store a reference to futures to avoid the NPE.
1433          ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1434          for (int i = 0; i < localFutures.size(); i++) {
1435            final ListenableFuture<? extends V> listenable = localFutures.get(i);
1436            final int index = i;
1437            listenable.addListener(new Runnable() {
1438              @Override
1439              public void run() {
1440                setOneValue(index, listenable);
1441              }
1442            }, listenerExecutor);
1443          }
1444        }
1445    
1446        /**
1447         * Sets the value at the given index to that of the given future.
1448         */
1449        private void setOneValue(int index, Future<? extends V> future) {
1450          List<V> localValues = values;
1451          if (isDone() || localValues == null) {
1452            // Some other future failed or has been cancelled, causing this one to
1453            // also be cancelled or have an exception set. This should only happen
1454            // if allMustSucceed is true.
1455            checkState(allMustSucceed,
1456                "Future was done before all dependencies completed");
1457            return;
1458          }
1459    
1460          try {
1461            checkState(future.isDone(),
1462                "Tried to set value from future which is not done");
1463            localValues.set(index, getUninterruptibly(future));
1464          } catch (CancellationException e) {
1465            if (allMustSucceed) {
1466              // Set ourselves as cancelled. Let the input futures keep running
1467              // as some of them may be used elsewhere.
1468              // (Currently we don't override interruptTask, so
1469              // mayInterruptIfRunning==false isn't technically necessary.)
1470              cancel(false);
1471            }
1472          } catch (ExecutionException e) {
1473            if (allMustSucceed) {
1474              // As soon as the first one fails, throw the exception up.
1475              // The result of all other inputs is then ignored.
1476              setException(e.getCause());
1477            }
1478          } catch (RuntimeException e) {
1479            if (allMustSucceed) {
1480              setException(e);
1481            }
1482          } catch (Error e) {
1483            // Propagate errors up ASAP - our superclass will rethrow the error
1484            setException(e);
1485          } finally {
1486            int newRemaining = remaining.decrementAndGet();
1487            checkState(newRemaining >= 0, "Less than 0 remaining futures");
1488            if (newRemaining == 0) {
1489              localValues = values;
1490              if (localValues != null) {
1491                set(Lists.newArrayList(localValues));
1492              } else {
1493                checkState(isDone());
1494              }
1495            }
1496          }
1497        }
1498    
1499        @Override
1500        public List<V> get() throws InterruptedException, ExecutionException {
1501          callAllGets();
1502    
1503          // This may still block in spite of the calls above, as the listeners may
1504          // be scheduled for execution in other threads.
1505          return super.get();
1506        }
1507    
1508        /**
1509         * Calls the get method of all dependency futures to work around a bug in
1510         * some ListenableFutures where the listeners aren't called until get() is
1511         * called.
1512         */
1513        private void callAllGets() throws InterruptedException {
1514          List<? extends ListenableFuture<? extends V>> oldFutures = futures;
1515          if (oldFutures != null && !isDone()) {
1516            for (ListenableFuture<? extends V> future : oldFutures) {
1517              // We wait for a little while for the future, but if it's not done,
1518              // we check that no other futures caused a cancellation or failure.
1519              // This can introduce a delay of up to 10ms in reporting an exception.
1520              while (!future.isDone()) {
1521                try {
1522                  future.get();
1523                } catch (Error e) {
1524                  throw e;
1525                } catch (InterruptedException e) {
1526                  throw e;
1527                } catch (Throwable e) {
1528                  // ExecutionException / CancellationException / RuntimeException
1529                  if (allMustSucceed) {
1530                    return;
1531                  } else {
1532                    continue;
1533                  }
1534                }
1535              }
1536            }
1537          }
1538        }
1539      }
1540    
1541      /**
1542       * A checked future that uses a function to map from exceptions to the
1543       * appropriate checked type.
1544       */
1545      private static class MappingCheckedFuture<V, X extends Exception> extends
1546          AbstractCheckedFuture<V, X> {
1547    
1548        final Function<Exception, X> mapper;
1549    
1550        MappingCheckedFuture(ListenableFuture<V> delegate,
1551            Function<Exception, X> mapper) {
1552          super(delegate);
1553    
1554          this.mapper = checkNotNull(mapper);
1555        }
1556    
1557        @Override
1558        protected X mapException(Exception e) {
1559          return mapper.apply(e);
1560        }
1561      }
1562    }