001    /*
002     * Copyright (C) 2006 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkArgument;
020    import static com.google.common.base.Preconditions.checkNotNull;
021    import static com.google.common.base.Preconditions.checkState;
022    import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023    import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024    import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
025    import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
026    import static java.lang.Thread.currentThread;
027    import static java.util.Arrays.asList;
028    
029    import com.google.common.annotations.Beta;
030    import com.google.common.base.Function;
031    import com.google.common.base.Preconditions;
032    import com.google.common.collect.ImmutableList;
033    import com.google.common.collect.Lists;
034    import com.google.common.collect.Ordering;
035    
036    import java.lang.reflect.Constructor;
037    import java.lang.reflect.InvocationTargetException;
038    import java.lang.reflect.UndeclaredThrowableException;
039    import java.util.Arrays;
040    import java.util.List;
041    import java.util.concurrent.BlockingQueue;
042    import java.util.concurrent.CancellationException;
043    import java.util.concurrent.CountDownLatch;
044    import java.util.concurrent.ExecutionException;
045    import java.util.concurrent.Executor;
046    import java.util.concurrent.Future;
047    import java.util.concurrent.LinkedBlockingQueue;
048    import java.util.concurrent.TimeUnit;
049    import java.util.concurrent.TimeoutException;
050    import java.util.concurrent.atomic.AtomicInteger;
051    
052    import javax.annotation.Nullable;
053    
054    /**
055     * Static utility methods pertaining to the {@link Future} interface.
056     *
057     * <p>Many of these methods use the {@link ListenableFuture} API; consult the
058     * Guava User Guide article on <a href=
059     * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
060     * {@code ListenableFuture}</a>.
061     *
062     * @author Kevin Bourrillion
063     * @author Nishant Thakkar
064     * @author Sven Mawson
065     * @since 1.0
066     */
067    @Beta
068    public final class Futures {
069      private Futures() {}
070    
071      /**
072       * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
073       * and a {@link Function} that maps from {@link Exception} instances into the
074       * appropriate checked type.
075       *
076       * <p>The given mapping function will be applied to an
077       * {@link InterruptedException}, a {@link CancellationException}, or an
078       * {@link ExecutionException} with the actual cause of the exception.
079       * See {@link Future#get()} for details on the exceptions thrown.
080       *
081       * @since 9.0 (source-compatible since 1.0)
082       */
083      public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
084          ListenableFuture<V> future, Function<Exception, X> mapper) {
085        return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
086      }
087    
088      /**
089       * Creates a {@code ListenableFuture} which has its value set immediately upon
090       * construction. The getters just return the value. This {@code Future} can't
091       * be canceled or timed out and its {@code isDone()} method always returns
092       * {@code true}.
093       */
094      public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
095        SettableFuture<V> future = SettableFuture.create();
096        future.set(value);
097        return future;
098      }
099    
100      /**
101       * Returns a {@code CheckedFuture} which has its value set immediately upon
102       * construction.
103       *
104       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
105       * method always returns {@code true}. Calling {@code get()} or {@code
106       * checkedGet()} will immediately return the provided value.
107       */
108      public static <V, X extends Exception> CheckedFuture<V, X>
109          immediateCheckedFuture(@Nullable V value) {
110        SettableFuture<V> future = SettableFuture.create();
111        future.set(value);
112        return Futures.makeChecked(future, new Function<Exception, X>() {
113          @Override
114          public X apply(Exception e) {
115            throw new AssertionError("impossible");
116          }
117        });
118      }
119    
120      /**
121       * Returns a {@code ListenableFuture} which has an exception set immediately
122       * upon construction.
123       *
124       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
125       * method always returns {@code true}. Calling {@code get()} will immediately
126       * throw the provided {@code Throwable} wrapped in an {@code
127       * ExecutionException}.
128       *
129       * @throws Error if the throwable is an {@link Error}.
130       */
131      public static <V> ListenableFuture<V> immediateFailedFuture(
132          Throwable throwable) {
133        checkNotNull(throwable);
134        SettableFuture<V> future = SettableFuture.create();
135        future.setException(throwable);
136        return future;
137      }
138    
139      /**
140       * Returns a {@code CheckedFuture} which has an exception set immediately upon
141       * construction.
142       *
143       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
144       * method always returns {@code true}. Calling {@code get()} will immediately
145       * throw the provided {@code Throwable} wrapped in an {@code
146       * ExecutionException}, and calling {@code checkedGet()} will throw the
147       * provided exception itself.
148       *
   * @throws NullPointerException if {@code exception} is {@code null}.
150       */
151      public static <V, X extends Exception> CheckedFuture<V, X>
152          immediateFailedCheckedFuture(final X exception) {
153        checkNotNull(exception);
154        return makeChecked(Futures.<V>immediateFailedFuture(exception),
155            new Function<Exception, X>() {
156              @Override
157              public X apply(Exception e) {
158                return exception;
159              }
160            });
161      }
162    
163      /**
164       * Returns a new {@code ListenableFuture} whose result is asynchronously
165       * derived from the result of the given {@code Future}. More precisely, the
166       * returned {@code Future} takes its result from a {@code Future} produced by
167       * applying the given {@code AsyncFunction} to the result of the original
168       * {@code Future}. Example:
169       *
170       * <pre>   {@code
171       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
172       *   AsyncFunction<RowKey, QueryResult> queryFunction =
173       *       new AsyncFunction<RowKey, QueryResult>() {
174       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
175       *           return dataService.read(rowKey);
176       *         }
177       *       };
178       *   ListenableFuture<QueryResult> queryFuture =
179       *       transform(rowKeyFuture, queryFunction);
180       * }</pre>
181       *
182       * Note: If the derived {@code Future} is slow or heavyweight to create
183       * (whether the {@code Future} itself is slow or heavyweight to complete is
184       * irrelevant), consider {@linkplain #transform(ListenableFuture,
185       * AsyncFunction, Executor) supplying an executor}. If you do not supply an
186       * executor, {@code transform} will use {@link
187       * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
188       * caveats for heavier operations. For example, the call to {@code
189       * function.apply} may run on an unpredictable or undesirable thread:
190       *
191       * <ul>
192       * <li>If the input {@code Future} is done at the time {@code transform} is
193       * called, {@code transform} will call {@code function.apply} inline.
194       * <li>If the input {@code Future} is not yet done, {@code transform} will
195       * schedule {@code function.apply} to be run by the thread that completes the
196       * input {@code Future}, which may be an internal system thread such as an
197       * RPC network thread.
198       * </ul>
199       *
200       * Also note that, regardless of which thread executes {@code
201       * function.apply}, all other registered but unexecuted listeners are
202       * prevented from running during its execution, even if those listeners are
203       * to run in other executors.
204       *
205       * <p>The returned {@code Future} attempts to keep its cancellation state in
206       * sync with that of the input future and that of the future returned by the
207       * function. That is, if the returned {@code Future} is cancelled, it will
208       * attempt to cancel the other two, and if either of the other two is
209       * cancelled, the returned {@code Future} will receive a callback in which it
210       * will attempt to cancel itself.
211       *
212       * @param input The future to transform
213       * @param function A function to transform the result of the input future
214       *     to the result of the output future
215       * @return A future that holds result of the function (if the input succeeded)
216       *     or the original input's failure (if not)
217       * @since 11.0
218       */
219      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
220          AsyncFunction<? super I, ? extends O> function) {
221        return transform(input, function, MoreExecutors.sameThreadExecutor());
222      }
223    
224      /**
225       * Returns a new {@code ListenableFuture} whose result is asynchronously
226       * derived from the result of the given {@code Future}. More precisely, the
227       * returned {@code Future} takes its result from a {@code Future} produced by
228       * applying the given {@code AsyncFunction} to the result of the original
229       * {@code Future}. Example:
230       *
231       * <pre>   {@code
232       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
233       *   AsyncFunction<RowKey, QueryResult> queryFunction =
234       *       new AsyncFunction<RowKey, QueryResult>() {
235       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
236       *           return dataService.read(rowKey);
237       *         }
238       *       };
239       *   ListenableFuture<QueryResult> queryFuture =
240       *       transform(rowKeyFuture, queryFunction, executor);
241       * }</pre>
242       *
243       * <p>The returned {@code Future} attempts to keep its cancellation state in
244       * sync with that of the input future and that of the future returned by the
245       * chain function. That is, if the returned {@code Future} is cancelled, it
246       * will attempt to cancel the other two, and if either of the other two is
247       * cancelled, the returned {@code Future} will receive a callback in which it
248       * will attempt to cancel itself.
249       *
250       * <p>When the execution of {@code function.apply} is fast and lightweight
251       * (though the {@code Future} it returns need not meet these criteria),
252       * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting
253       * the executor} or explicitly specifying {@code sameThreadExecutor}.
254       * However, be aware of the caveats documented in the link above.
255       *
256       * @param input The future to transform
257       * @param function A function to transform the result of the input future
258       *     to the result of the output future
259       * @param executor Executor to run the function in.
260       * @return A future that holds result of the function (if the input succeeded)
261       *     or the original input's failure (if not)
262       * @since 11.0
263       */
264      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
265          AsyncFunction<? super I, ? extends O> function,
266          Executor executor) {
267        ChainingListenableFuture<I, O> output =
268            new ChainingListenableFuture<I, O>(function, input);
269        input.addListener(output, executor);
270        return output;
271      }
272    
273      /**
274       * Returns a new {@code ListenableFuture} whose result is the product of
275       * applying the given {@code Function} to the result of the given {@code
276       * Future}. Example:
277       *
278       * <pre>   {@code
279       *   ListenableFuture<QueryResult> queryFuture = ...;
280       *   Function<QueryResult, List<Row>> rowsFunction =
281       *       new Function<QueryResult, List<Row>>() {
282       *         public List<Row> apply(QueryResult queryResult) {
283       *           return queryResult.getRows();
284       *         }
285       *       };
286       *   ListenableFuture<List<Row>> rowsFuture =
287       *       transform(queryFuture, rowsFunction);
288       * }</pre>
289       *
290       * Note: If the transformation is slow or heavyweight, consider {@linkplain
291       * #transform(ListenableFuture, Function, Executor) supplying an executor}.
292       * If you do not supply an executor, {@code transform} will use {@link
293       * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
294       * caveats for heavier operations.  For example, the call to {@code
295       * function.apply} may run on an unpredictable or undesirable thread:
296       *
297       * <ul>
298       * <li>If the input {@code Future} is done at the time {@code transform} is
299       * called, {@code transform} will call {@code function.apply} inline.
300       * <li>If the input {@code Future} is not yet done, {@code transform} will
301       * schedule {@code function.apply} to be run by the thread that completes the
302       * input {@code Future}, which may be an internal system thread such as an
303       * RPC network thread.
304       * </ul>
305       *
306       * Also note that, regardless of which thread executes {@code
307       * function.apply}, all other registered but unexecuted listeners are
308       * prevented from running during its execution, even if those listeners are
309       * to run in other executors.
310       *
311       * <p>The returned {@code Future} attempts to keep its cancellation state in
312       * sync with that of the input future. That is, if the returned {@code Future}
313       * is cancelled, it will attempt to cancel the input, and if the input is
314       * cancelled, the returned {@code Future} will receive a callback in which it
315       * will attempt to cancel itself.
316       *
317       * <p>An example use of this method is to convert a serializable object
318       * returned from an RPC into a POJO.
319       *
320       * @param input The future to transform
321       * @param function A Function to transform the results of the provided future
322       *     to the results of the returned future.  This will be run in the thread
323       *     that notifies input it is complete.
324       * @return A future that holds result of the transformation.
325       * @since 9.0 (in 1.0 as {@code compose})
326       */
327      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
328          final Function<? super I, ? extends O> function) {
329        return transform(input, function, MoreExecutors.sameThreadExecutor());
330      }
331    
332      /**
333       * Returns a new {@code ListenableFuture} whose result is the product of
334       * applying the given {@code Function} to the result of the given {@code
335       * Future}. Example:
336       *
337       * <pre>   {@code
338       *   ListenableFuture<QueryResult> queryFuture = ...;
339       *   Function<QueryResult, List<Row>> rowsFunction =
340       *       new Function<QueryResult, List<Row>>() {
341       *         public List<Row> apply(QueryResult queryResult) {
342       *           return queryResult.getRows();
343       *         }
344       *       };
345       *   ListenableFuture<List<Row>> rowsFuture =
346       *       transform(queryFuture, rowsFunction, executor);
347       * }</pre>
348       *
349       * <p>The returned {@code Future} attempts to keep its cancellation state in
350       * sync with that of the input future. That is, if the returned {@code Future}
351       * is cancelled, it will attempt to cancel the input, and if the input is
352       * cancelled, the returned {@code Future} will receive a callback in which it
353       * will attempt to cancel itself.
354       *
355       * <p>An example use of this method is to convert a serializable object
356       * returned from an RPC into a POJO.
357       *
358       * <p>When the transformation is fast and lightweight, consider {@linkplain
359       * #transform(ListenableFuture, Function) omitting the executor} or
360       * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
361       * caveats documented in the link above.
362       *
363       * @param input The future to transform
364       * @param function A Function to transform the results of the provided future
365       *     to the results of the returned future.
366       * @param executor Executor to run the function in.
367       * @return A future that holds result of the transformation.
368       * @since 9.0 (in 2.0 as {@code compose})
369       */
370      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
371          final Function<? super I, ? extends O> function, Executor executor) {
372        checkNotNull(function);
373        AsyncFunction<I, O> wrapperFunction
374            = new AsyncFunction<I, O>() {
375                @Override public ListenableFuture<O> apply(I input) {
376                  O output = function.apply(input);
377                  return immediateFuture(output);
378                }
379            };
380        return transform(input, wrapperFunction, executor);
381      }
382    
383      /**
384       * Like {@link #transform(ListenableFuture, Function)} except that the
385       * transformation {@code function} is invoked on each call to
386       * {@link Future#get() get()} on the returned future.
387       *
388       * <p>The returned {@code Future} reflects the input's cancellation
389       * state directly, and any attempt to cancel the returned Future is likewise
390       * passed through to the input Future.
391       *
392       * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
393       * only apply the timeout to the execution of the underlying {@code Future},
394       * <em>not</em> to the execution of the transformation function.
395       *
396       * <p>The primary audience of this method is callers of {@code transform}
397       * who don't have a {@code ListenableFuture} available and
398       * do not mind repeated, lazy function evaluation.
399       *
400       * @param input The future to transform
401       * @param function A Function to transform the results of the provided future
402       *     to the results of the returned future.
403       * @return A future that returns the result of the transformation.
404       * @since 10.0
405       */
406      @Beta
407      public static <I, O> Future<O> lazyTransform(final Future<I> input,
408          final Function<? super I, ? extends O> function) {
409        checkNotNull(input);
410        checkNotNull(function);
411        return new Future<O>() {
412    
413          @Override
414          public boolean cancel(boolean mayInterruptIfRunning) {
415            return input.cancel(mayInterruptIfRunning);
416          }
417    
418          @Override
419          public boolean isCancelled() {
420            return input.isCancelled();
421          }
422    
423          @Override
424          public boolean isDone() {
425            return input.isDone();
426          }
427    
428          @Override
429          public O get() throws InterruptedException, ExecutionException {
430            return applyTransformation(input.get());
431          }
432    
433          @Override
434          public O get(long timeout, TimeUnit unit)
435              throws InterruptedException, ExecutionException, TimeoutException {
436            return applyTransformation(input.get(timeout, unit));
437          }
438    
439          private O applyTransformation(I input) throws ExecutionException {
440            try {
441              return function.apply(input);
442            } catch (Throwable t) {
443              throw new ExecutionException(t);
444            }
445          }
446        };
447      }
448    
449      /**
450       * An implementation of {@code ListenableFuture} that also implements
451       * {@code Runnable} so that it can be used to nest ListenableFutures.
   * Once the passed-in {@code ListenableFuture} is complete, it calls the
   * passed-in {@code AsyncFunction} to obtain the future that supplies the
   * result.
454       *
455       * <p>If the function throws any checked exceptions, they should be wrapped
   * in an {@code UndeclaredThrowableException} so that this class can get
457       * access to the cause.
458       */
  private static class ChainingListenableFuture<I, O>
      extends AbstractFuture<O> implements Runnable {

    // Function applied to the input's result; set to null at the end of run().
    private AsyncFunction<? super I, ? extends O> function;
    // Future whose result feeds the function; set to null at the end of run().
    private ListenableFuture<? extends I> inputFuture;
    // Future returned by the function. Null until run() has applied the
    // function, and reset to null once its outcome has been handled.
    // Presumably volatile because cancel() reads it while run() writes it.
    private volatile ListenableFuture<? extends O> outputFuture;
    // Capacity-one hand-off carrying the mayInterruptIfRunning flag from a
    // successful cancel() to run(), which takes it if it finds this future
    // cancelled right after applying the function.
    private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
        new LinkedBlockingQueue<Boolean>(1);
    // Counted down when run() finishes. NOTE(review): nothing visible in this
    // class awaits this latch; check for remaining users before relying on it.
    private final CountDownLatch outputCreated = new CountDownLatch(1);

    // Both arguments are required; null is rejected immediately.
    private ChainingListenableFuture(
        AsyncFunction<? super I, ? extends O> function,
        ListenableFuture<? extends I> inputFuture) {
      this.function = checkNotNull(function);
      this.inputFuture = checkNotNull(inputFuture);
    }

    // Cancels this future and, if that succeeds, also attempts to cancel the
    // input future and the function's output future (whichever are present).
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
      /*
       * Our additional cancellation work needs to occur even if
       * !mayInterruptIfRunning, so we can't move it into interruptTask().
       */
      if (super.cancel(mayInterruptIfRunning)) {
        // This should never block since only one thread is allowed to cancel
        // this Future.
        putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
        cancel(inputFuture, mayInterruptIfRunning);
        cancel(outputFuture, mayInterruptIfRunning);
        return true;
      }
      return false;
    }

    // Null-tolerant helper: the field passed in may not have been assigned
    // yet, or may already have been cleared by run().
    private void cancel(@Nullable Future<?> future,
        boolean mayInterruptIfRunning) {
      if (future != null) {
        future.cancel(mayInterruptIfRunning);
      }
    }

    // Listener body: reads the input's outcome, applies the function to a
    // successful result, and arranges for the resulting future's outcome to
    // become this future's outcome.
    @Override
    public void run() {
      try {
        I sourceResult;
        try {
          sourceResult = getUninterruptibly(inputFuture);
        } catch (CancellationException e) {
          // Cancel this future and return.
          // At this point, inputFuture is cancelled and outputFuture doesn't
          // exist, so the value of mayInterruptIfRunning is irrelevant.
          cancel(false);
          return;
        } catch (ExecutionException e) {
          // Set the cause of the exception as this future's exception
          setException(e.getCause());
          return;
        }

        // Publish the function's future through the field (so cancel() can
        // reach it) and keep a final local for the listener below.
        final ListenableFuture<? extends O> outputFuture = this.outputFuture =
            function.apply(sourceResult);
        if (isCancelled()) {
          // Handles the case where cancel was called while the function was
          // being applied.
          // There is a gap in cancel(boolean) between calling sync.cancel()
          // and storing the value of mayInterruptIfRunning, so this thread
          // needs to block, waiting for that value.
          outputFuture.cancel(
              takeUninterruptibly(mayInterruptIfRunningChannel));
          this.outputFuture = null;
          return;
        }
        // Mirror the output future's outcome (value, failure or cancellation)
        // onto this future as soon as it completes.
        outputFuture.addListener(new Runnable() {
            @Override
            public void run() {
              try {
                // Here it would have been nice to have had an
                // UninterruptibleListenableFuture, but we don't want to start a
                // combinatorial explosion of interfaces, so we have to make do.
                set(getUninterruptibly(outputFuture));
              } catch (CancellationException e) {
                // Cancel this future and return.
                // At this point, inputFuture and outputFuture are done, so the
                // value of mayInterruptIfRunning is irrelevant.
                cancel(false);
                return;
              } catch (ExecutionException e) {
                // Set the cause of the exception as this future's exception
                setException(e.getCause());
              } finally {
                // Don't pin inputs beyond completion
                ChainingListenableFuture.this.outputFuture = null;
              }
            }
          }, MoreExecutors.sameThreadExecutor());
      } catch (UndeclaredThrowableException e) {
        // Set the cause of the exception as this future's exception
        setException(e.getCause());
      } catch (Exception e) {
        // This exception is irrelevant in this thread, but useful for the
        // client
        setException(e);
      } catch (Error e) {
        // Propagate errors up ASAP - our superclass will rethrow the error
        setException(e);
      } finally {
        // Don't pin inputs beyond completion
        function = null;
        inputFuture = null;
        // Allow our get routines to examine outputFuture now.
        outputCreated.countDown();
      }
    }
  }
573    
574      /**
575       * Returns a new {@code ListenableFuture} whose result is the product of
576       * calling {@code get()} on the {@code Future} nested within the given {@code
577       * Future}, effectively chaining the futures one after the other.  Example:
578       *
579       * <pre>   {@code
580       *   SettableFuture<ListenableFuture<String>> nested = SettableFuture.create();
581       *   ListenableFuture<String> dereferenced = dereference(nested);
582       * }</pre>
583       *
584       * <p>This call has the same cancellation and execution semantics as {@link
585       * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code
586       * Future} attempts to keep its cancellation state in sync with both the
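   * input {@code Future} and the nested {@code Future}.  The transformation
   * is very lightweight and therefore runs directly in {@link
   * MoreExecutors#sameThreadExecutor sameThreadExecutor}: inline in the
   * thread that called {@code dereference} if the outer {@code Future} is
   * already done, otherwise in the thread that completes the outer
   * {@code Future}.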
590       *
591       * @param nested The nested future to transform.
592       * @return A future that holds result of the inner future.
593       * @since 13.0
594       */
595      @Beta
596      @SuppressWarnings({"rawtypes", "unchecked"})
597      public static <V> ListenableFuture<V> dereference(
598          ListenableFuture<? extends ListenableFuture<? extends V>> nested) {
599        return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER);
600      }
601    
602      /**
   * Helper {@code AsyncFunction} for {@link #dereference}.
604       */
605      private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER =
606          new AsyncFunction<ListenableFuture<Object>, Object>() {
607            @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) {
608              return input;
609            }
610          };
611    
612      /**
613       * Creates a new {@code ListenableFuture} whose value is a list containing the
614       * values of all its input futures, if all succeed. If any input fails, the
615       * returned future fails.
616       *
617       * <p>The list of results is in the same order as the input list.
618       *
619       * <p>Canceling this future does not cancel any of the component futures;
620       * however, if any of the provided futures fails or is canceled, this one is,
621       * too.
622       *
623       * @param futures futures to combine
624       * @return a future that provides a list of the results of the component
625       *         futures
626       * @since 10.0
627       */
628      @Beta
629      public static <V> ListenableFuture<List<V>> allAsList(
630          ListenableFuture<? extends V>... futures) {
631        return new ListFuture<V>(ImmutableList.copyOf(futures), true,
632            MoreExecutors.sameThreadExecutor());
633      }
634    
635      /**
636       * Creates a new {@code ListenableFuture} whose value is a list containing the
637       * values of all its input futures, if all succeed. If any input fails, the
638       * returned future fails.
639       *
640       * <p>The list of results is in the same order as the input list.
641       *
642       * <p>Canceling this future does not cancel any of the component futures;
643       * however, if any of the provided futures fails or is canceled, this one is,
644       * too.
645       *
646       * @param futures futures to combine
647       * @return a future that provides a list of the results of the component
648       *         futures
649       * @since 10.0
650       */
651      @Beta
652      public static <V> ListenableFuture<List<V>> allAsList(
653          Iterable<? extends ListenableFuture<? extends V>> futures) {
654        return new ListFuture<V>(ImmutableList.copyOf(futures), true,
655            MoreExecutors.sameThreadExecutor());
656      }
657    
658      /**
659       * Creates a new {@code ListenableFuture} whose value is a list containing the
660       * values of all its successful input futures. The list of results is in the
661       * same order as the input list, and if any of the provided futures fails or
662       * is canceled, its corresponding position will contain {@code null} (which is
663       * indistinguishable from the future having a successful value of
664       * {@code null}).
665       *
666       * @param futures futures to combine
667       * @return a future that provides a list of the results of the component
668       *         futures
669       * @since 10.0
670       */
671      @Beta
672      public static <V> ListenableFuture<List<V>> successfulAsList(
673          ListenableFuture<? extends V>... futures) {
674        return new ListFuture<V>(ImmutableList.copyOf(futures), false,
675            MoreExecutors.sameThreadExecutor());
676      }
677    
678      /**
679       * Creates a new {@code ListenableFuture} whose value is a list containing the
680       * values of all its successful input futures. The list of results is in the
681       * same order as the input list, and if any of the provided futures fails or
682       * is canceled, its corresponding position will contain {@code null} (which is
683       * indistinguishable from the future having a successful value of
684       * {@code null}).
685       *
686       * @param futures futures to combine
687       * @return a future that provides a list of the results of the component
688       *         futures
689       * @since 10.0
690       */
691      @Beta
692      public static <V> ListenableFuture<List<V>> successfulAsList(
693          Iterable<? extends ListenableFuture<? extends V>> futures) {
694        return new ListFuture<V>(ImmutableList.copyOf(futures), false,
695            MoreExecutors.sameThreadExecutor());
696      }
697    
698      /**
699       * Registers separate success and failure callbacks to be run when the {@code
700       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
701       * complete} or, if the computation is already complete, immediately.
702       *
703       * <p>There is no guaranteed ordering of execution of callbacks, but any
704       * callback added through this method is guaranteed to be called once the
705       * computation is complete.
706       *
707       * Example: <pre> {@code
708       * ListenableFuture<QueryResult> future = ...;
709       * addCallback(future,
710       *     new FutureCallback<QueryResult> {
711       *       public void onSuccess(QueryResult result) {
712       *         storeInCache(result);
713       *       }
714       *       public void onFailure(Throwable t) {
715       *         reportError(t);
716       *       }
717       *     });}</pre>
718       *
719       * Note: If the callback is slow or heavyweight, consider {@linkplain
720       * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an
721       * executor}. If you do not supply an executor, {@code addCallback} will use
722       * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries
723       * some caveats for heavier operations. For example, the callback may run on
724       * an unpredictable or undesirable thread:
725       *
726       * <ul>
727       * <li>If the input {@code Future} is done at the time {@code addCallback} is
728       * called, {@code addCallback} will execute the callback inline.
729       * <li>If the input {@code Future} is not yet done, {@code addCallback} will
730       * schedule the callback to be run by the thread that completes the input
731       * {@code Future}, which may be an internal system thread such as an RPC
732       * network thread.
733       * </ul>
734       *
735       * Also note that, regardless of which thread executes the callback, all
736       * other registered but unexecuted listeners are prevented from running
737       * during its execution, even if those listeners are to run in other
738       * executors.
739       *
740       * <p>For a more general interface to attach a completion listener to a
741       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
742       *
743       * @param future The future attach the callback to.
744       * @param callback The callback to invoke when {@code future} is completed.
745       * @since 10.0
746       */
747      public static <V> void addCallback(ListenableFuture<V> future,
748          FutureCallback<? super V> callback) {
749        addCallback(future, callback, MoreExecutors.sameThreadExecutor());
750      }
751    
752      /**
753       * Registers separate success and failure callbacks to be run when the {@code
754       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
755       * complete} or, if the computation is already complete, immediately.
756       *
757       * <p>The callback is run in {@code executor}.
758       * There is no guaranteed ordering of execution of callbacks, but any
759       * callback added through this method is guaranteed to be called once the
760       * computation is complete.
761       *
762       * Example: <pre> {@code
763       * ListenableFuture<QueryResult> future = ...;
764       * Executor e = ...
765       * addCallback(future, e,
766       *     new FutureCallback<QueryResult> {
767       *       public void onSuccess(QueryResult result) {
768       *         storeInCache(result);
769       *       }
770       *       public void onFailure(Throwable t) {
771       *         reportError(t);
772       *       }
773       *     });}</pre>
774       *
775       * When the callback is fast and lightweight, consider {@linkplain
776       * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or
777       * explicitly specifying {@code sameThreadExecutor}. However, be aware of the
778       * caveats documented in the link above.
779       *
780       * <p>For a more general interface to attach a completion listener to a
781       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
782       *
783       * @param future The future attach the callback to.
784       * @param callback The callback to invoke when {@code future} is completed.
785       * @param executor The executor to run {@code callback} when the future
786       *    completes.
787       * @since 10.0
788       */
789      public static <V> void addCallback(final ListenableFuture<V> future,
790          final FutureCallback<? super V> callback, Executor executor) {
791        Preconditions.checkNotNull(callback);
792        Runnable callbackListener = new Runnable() {
793          @Override
794          public void run() {
795            try {
796              // TODO(user): (Before Guava release), validate that this
797              // is the thing for IE.
798              V value = getUninterruptibly(future);
799              callback.onSuccess(value);
800            } catch (ExecutionException e) {
801              callback.onFailure(e.getCause());
802            } catch (RuntimeException e) {
803              callback.onFailure(e);
804            } catch (Error e) {
805              callback.onFailure(e);
806            }
807          }
808        };
809        future.addListener(callbackListener, executor);
810      }
811    
812      /**
813       * Returns the result of {@link Future#get()}, converting most exceptions to a
814       * new instance of the given checked exception type. This reduces boilerplate
815       * for a common use of {@code Future} in which it is unnecessary to
816       * programmatically distinguish between exception types or to extract other
817       * information from the exception instance.
818       *
819       * <p>Exceptions from {@code Future.get} are treated as follows:
820       * <ul>
821       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
822       *     {@code X} if the cause is a checked exception, an {@link
823       *     UncheckedExecutionException} if the cause is a {@code
824       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
825       *     {@code Error}.
826       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
827       *     restoring the interrupt).
828       * <li>Any {@link CancellationException} is propagated untouched, as is any
829       *     other {@link RuntimeException} (though {@code get} implementations are
830       *     discouraged from throwing such exceptions).
831       * </ul>
832       *
833       * The overall principle is to continue to treat every checked exception as a
834       * checked exception, every unchecked exception as an unchecked exception, and
835       * every error as an error. In addition, the cause of any {@code
836       * ExecutionException} is wrapped in order to ensure that the new stack trace
837       * matches that of the current thread.
838       *
839       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
840       * public constructor that accepts zero or more arguments, all of type {@code
841       * String} or {@code Throwable} (preferring constructors with at least one
842       * {@code String}) and calling the constructor via reflection. If the
843       * exception did not already have a cause, one is set by calling {@link
844       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
845       * {@code IllegalArgumentException} is thrown.
846       *
847       * @throws X if {@code get} throws any checked exception except for an {@code
848       *         ExecutionException} whose cause is not itself a checked exception
849       * @throws UncheckedExecutionException if {@code get} throws an {@code
850       *         ExecutionException} with a {@code RuntimeException} as its cause
851       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
852       *         with an {@code Error} as its cause
853       * @throws CancellationException if {@code get} throws a {@code
854       *         CancellationException}
855       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
856       *         RuntimeException} or does not have a suitable constructor
857       * @since 10.0
858       */
859      @Beta
860      public static <V, X extends Exception> V get(
861          Future<V> future, Class<X> exceptionClass) throws X {
862        checkNotNull(future);
863        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
864            "Futures.get exception type (%s) must not be a RuntimeException",
865            exceptionClass);
866        try {
867          return future.get();
868        } catch (InterruptedException e) {
869          currentThread().interrupt();
870          throw newWithCause(exceptionClass, e);
871        } catch (ExecutionException e) {
872          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
873          throw new AssertionError();
874        }
875      }
876    
877      /**
878       * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
879       * exceptions to a new instance of the given checked exception type. This
880       * reduces boilerplate for a common use of {@code Future} in which it is
881       * unnecessary to programmatically distinguish between exception types or to
882       * extract other information from the exception instance.
883       *
884       * <p>Exceptions from {@code Future.get} are treated as follows:
885       * <ul>
886       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
887       *     {@code X} if the cause is a checked exception, an {@link
888       *     UncheckedExecutionException} if the cause is a {@code
889       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
890       *     {@code Error}.
891       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
892       *     restoring the interrupt).
893       * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
894       * <li>Any {@link CancellationException} is propagated untouched, as is any
895       *     other {@link RuntimeException} (though {@code get} implementations are
896       *     discouraged from throwing such exceptions).
897       * </ul>
898       *
899       * The overall principle is to continue to treat every checked exception as a
900       * checked exception, every unchecked exception as an unchecked exception, and
901       * every error as an error. In addition, the cause of any {@code
902       * ExecutionException} is wrapped in order to ensure that the new stack trace
903       * matches that of the current thread.
904       *
905       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
906       * public constructor that accepts zero or more arguments, all of type {@code
907       * String} or {@code Throwable} (preferring constructors with at least one
908       * {@code String}) and calling the constructor via reflection. If the
909       * exception did not already have a cause, one is set by calling {@link
910       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
911       * {@code IllegalArgumentException} is thrown.
912       *
913       * @throws X if {@code get} throws any checked exception except for an {@code
914       *         ExecutionException} whose cause is not itself a checked exception
915       * @throws UncheckedExecutionException if {@code get} throws an {@code
916       *         ExecutionException} with a {@code RuntimeException} as its cause
917       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
918       *         with an {@code Error} as its cause
919       * @throws CancellationException if {@code get} throws a {@code
920       *         CancellationException}
921       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
922       *         RuntimeException} or does not have a suitable constructor
923       * @since 10.0
924       */
925      @Beta
926      public static <V, X extends Exception> V get(
927          Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
928          throws X {
929        checkNotNull(future);
930        checkNotNull(unit);
931        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
932            "Futures.get exception type (%s) must not be a RuntimeException",
933            exceptionClass);
934        try {
935          return future.get(timeout, unit);
936        } catch (InterruptedException e) {
937          currentThread().interrupt();
938          throw newWithCause(exceptionClass, e);
939        } catch (TimeoutException e) {
940          throw newWithCause(exceptionClass, e);
941        } catch (ExecutionException e) {
942          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
943          throw new AssertionError();
944        }
945      }
946    
947      private static <X extends Exception> void wrapAndThrowExceptionOrError(
948          Throwable cause, Class<X> exceptionClass) throws X {
949        if (cause instanceof Error) {
950          throw new ExecutionError((Error) cause);
951        }
952        if (cause instanceof RuntimeException) {
953          throw new UncheckedExecutionException(cause);
954        }
955        throw newWithCause(exceptionClass, cause);
956      }
957    
958      /**
959       * Returns the result of calling {@link Future#get()} uninterruptibly on a
960       * task known not to throw a checked exception. This makes {@code Future} more
961       * suitable for lightweight, fast-running tasks that, barring bugs in the
962       * code, will not fail. This gives it exception-handling behavior similar to
963       * that of {@code ForkJoinTask.join}.
964       *
965       * <p>Exceptions from {@code Future.get} are treated as follows:
966       * <ul>
967       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
968       *     {@link UncheckedExecutionException} (if the cause is an {@code
969       *     Exception}) or {@link ExecutionError} (if the cause is an {@code
970       *     Error}).
971       * <li>Any {@link InterruptedException} causes a retry of the {@code get}
972       *     call. The interrupt is restored before {@code getUnchecked} returns.
973       * <li>Any {@link CancellationException} is propagated untouched. So is any
974       *     other {@link RuntimeException} ({@code get} implementations are
975       *     discouraged from throwing such exceptions).
976       * </ul>
977       *
978       * The overall principle is to eliminate all checked exceptions: to loop to
979       * avoid {@code InterruptedException}, to pass through {@code
980       * CancellationException}, and to wrap any exception from the underlying
981       * computation in an {@code UncheckedExecutionException} or {@code
982       * ExecutionError}.
983       *
984       * <p>For an uninterruptible {@code get} that preserves other exceptions, see
985       * {@link Uninterruptibles#getUninterruptibly(Future)}.
986       *
987       * @throws UncheckedExecutionException if {@code get} throws an {@code
988       *         ExecutionException} with an {@code Exception} as its cause
989       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
990       *         with an {@code Error} as its cause
991       * @throws CancellationException if {@code get} throws a {@code
992       *         CancellationException}
993       * @since 10.0
994       */
995      @Beta
996      public static <V> V getUnchecked(Future<V> future) {
997        checkNotNull(future);
998        try {
999          return getUninterruptibly(future);
1000        } catch (ExecutionException e) {
1001          wrapAndThrowUnchecked(e.getCause());
1002          throw new AssertionError();
1003        }
1004      }
1005    
1006      private static void wrapAndThrowUnchecked(Throwable cause) {
1007        if (cause instanceof Error) {
1008          throw new ExecutionError((Error) cause);
1009        }
1010        /*
1011         * It's a non-Error, non-Exception Throwable. From my survey of such
1012         * classes, I believe that most users intended to extend Exception, so we'll
1013         * treat it like an Exception.
1014         */
1015        throw new UncheckedExecutionException(cause);
1016      }
1017    
1018      /*
1019       * TODO(user): FutureChecker interface for these to be static methods on? If
1020       * so, refer to it in the (static-method) Futures.get documentation
1021       */
1022    
1023      /*
1024       * Arguably we don't need a timed getUnchecked because any operation slow
1025       * enough to require a timeout is heavyweight enough to throw a checked
1026       * exception and therefore be inappropriate to use with getUnchecked. Further,
1027       * it's not clear that converting the checked TimeoutException to a
1028       * RuntimeException -- especially to an UncheckedExecutionException, since it
1029       * wasn't thrown by the computation -- makes sense, and if we don't convert
1030       * it, the user still has to write a try-catch block.
1031       *
1032       * If you think you would use this method, let us know.
1033       */
1034    
1035      private static <X extends Exception> X newWithCause(
1036          Class<X> exceptionClass, Throwable cause) {
1037        // getConstructors() guarantees this as long as we don't modify the array.
1038        @SuppressWarnings("unchecked")
1039        List<Constructor<X>> constructors =
1040            (List) Arrays.asList(exceptionClass.getConstructors());
1041        for (Constructor<X> constructor : preferringStrings(constructors)) {
1042          @Nullable X instance = newFromConstructor(constructor, cause);
1043          if (instance != null) {
1044            if (instance.getCause() == null) {
1045              instance.initCause(cause);
1046            }
1047            return instance;
1048          }
1049        }
1050        throw new IllegalArgumentException(
1051            "No appropriate constructor for exception of type " + exceptionClass
1052                + " in response to chained exception", cause);
1053      }
1054    
1055      private static <X extends Exception> List<Constructor<X>>
1056          preferringStrings(List<Constructor<X>> constructors) {
1057        return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1058      }
1059    
1060      private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1061          Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1062            @Override public Boolean apply(Constructor<?> input) {
1063              return asList(input.getParameterTypes()).contains(String.class);
1064            }
1065          }).reverse();
1066    
1067      @Nullable private static <X> X newFromConstructor(
1068          Constructor<X> constructor, Throwable cause) {
1069        Class<?>[] paramTypes = constructor.getParameterTypes();
1070        Object[] params = new Object[paramTypes.length];
1071        for (int i = 0; i < paramTypes.length; i++) {
1072          Class<?> paramType = paramTypes[i];
1073          if (paramType.equals(String.class)) {
1074            params[i] = cause.toString();
1075          } else if (paramType.equals(Throwable.class)) {
1076            params[i] = cause;
1077          } else {
1078            return null;
1079          }
1080        }
1081        try {
1082          return constructor.newInstance(params);
1083        } catch (IllegalArgumentException e) {
1084          return null;
1085        } catch (InstantiationException e) {
1086          return null;
1087        } catch (IllegalAccessException e) {
1088          return null;
1089        } catch (InvocationTargetException e) {
1090          return null;
1091        }
1092      }
1093    
1094      /**
1095       * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1096       * The idea is to create a (null-filled) List and register a listener with
1097       * each component future to fill out the value in the List when that future
1098       * completes.
1099       */
1100      private static class ListFuture<V> extends AbstractFuture<List<V>> {
1101        ImmutableList<? extends ListenableFuture<? extends V>> futures;
1102        final boolean allMustSucceed;
1103        final AtomicInteger remaining;
1104        List<V> values;
1105    
1106        /**
1107         * Constructor.
1108         *
1109         * @param futures all the futures to build the list from
1110         * @param allMustSucceed whether a single failure or cancellation should
1111         *        propagate to this future
1112         * @param listenerExecutor used to run listeners on all the passed in
1113         *        futures.
1114         */
1115        ListFuture(
1116            final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1117            final boolean allMustSucceed, final Executor listenerExecutor) {
1118          this.futures = futures;
1119          this.values = Lists.newArrayListWithCapacity(futures.size());
1120          this.allMustSucceed = allMustSucceed;
1121          this.remaining = new AtomicInteger(futures.size());
1122    
1123          init(listenerExecutor);
1124        }
1125    
1126        private void init(final Executor listenerExecutor) {
1127          // First, schedule cleanup to execute when the Future is done.
1128          addListener(new Runnable() {
1129            @Override
1130            public void run() {
1131              // By now the values array has either been set as the Future's value,
1132              // or (in case of failure) is no longer useful.
1133              ListFuture.this.values = null;
1134    
1135              // Let go of the memory held by other futures
1136              ListFuture.this.futures = null;
1137            }
1138          }, MoreExecutors.sameThreadExecutor());
1139    
1140          // Now begin the "real" initialization.
1141    
1142          // Corner case: List is empty.
1143          if (futures.isEmpty()) {
1144            set(Lists.newArrayList(values));
1145            return;
1146          }
1147    
1148          // Populate the results list with null initially.
1149          for (int i = 0; i < futures.size(); ++i) {
1150            values.add(null);
1151          }
1152    
1153          // Register a listener on each Future in the list to update
1154          // the state of this future.
1155          // Note that if all the futures on the list are done prior to completing
1156          // this loop, the last call to addListener() will callback to
1157          // setOneValue(), transitively call our cleanup listener, and set
1158          // this.futures to null.
1159          // We store a reference to futures to avoid the NPE.
1160          ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1161          for (int i = 0; i < localFutures.size(); i++) {
1162            final ListenableFuture<? extends V> listenable = localFutures.get(i);
1163            final int index = i;
1164            listenable.addListener(new Runnable() {
1165              @Override
1166              public void run() {
1167                setOneValue(index, listenable);
1168              }
1169            }, listenerExecutor);
1170          }
1171        }
1172    
1173        /**
1174         * Sets the value at the given index to that of the given future.
1175         */
1176        private void setOneValue(int index, Future<? extends V> future) {
1177          List<V> localValues = values;
1178          if (isDone() || localValues == null) {
1179            // Some other future failed or has been cancelled, causing this one to
1180            // also be cancelled or have an exception set. This should only happen
1181            // if allMustSucceed is true.
1182            checkState(allMustSucceed,
1183                "Future was done before all dependencies completed");
1184            return;
1185          }
1186    
1187          try {
1188            checkState(future.isDone(),
1189                "Tried to set value from future which is not done");
1190            localValues.set(index, getUninterruptibly(future));
1191          } catch (CancellationException e) {
1192            if (allMustSucceed) {
1193              // Set ourselves as cancelled. Let the input futures keep running
1194              // as some of them may be used elsewhere.
1195              // (Currently we don't override interruptTask, so
1196              // mayInterruptIfRunning==false isn't technically necessary.)
1197              cancel(false);
1198            }
1199          } catch (ExecutionException e) {
1200            if (allMustSucceed) {
1201              // As soon as the first one fails, throw the exception up.
1202              // The result of all other inputs is then ignored.
1203              setException(e.getCause());
1204            }
1205          } catch (RuntimeException e) {
1206            if (allMustSucceed) {
1207              setException(e);
1208            }
1209          } catch (Error e) {
1210            // Propagate errors up ASAP - our superclass will rethrow the error
1211            setException(e);
1212          } finally {
1213            int newRemaining = remaining.decrementAndGet();
1214            checkState(newRemaining >= 0, "Less than 0 remaining futures");
1215            if (newRemaining == 0) {
1216              localValues = values;
1217              if (localValues != null) {
1218                set(Lists.newArrayList(localValues));
1219              } else {
1220                checkState(isDone());
1221              }
1222            }
1223          }
1224        }
1225    
1226      }
1227    
1228      /**
1229       * A checked future that uses a function to map from exceptions to the
1230       * appropriate checked type.
1231       */
1232      private static class MappingCheckedFuture<V, X extends Exception> extends
1233          AbstractCheckedFuture<V, X> {
1234    
1235        final Function<Exception, X> mapper;
1236    
1237        MappingCheckedFuture(ListenableFuture<V> delegate,
1238            Function<Exception, X> mapper) {
1239          super(delegate);
1240    
1241          this.mapper = checkNotNull(mapper);
1242        }
1243    
1244        @Override
1245        protected X mapException(Exception e) {
1246          return mapper.apply(e);
1247        }
1248      }
1249    }