001    /*
002     * Copyright (C) 2006 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkArgument;
020    import static com.google.common.base.Preconditions.checkNotNull;
021    import static com.google.common.base.Preconditions.checkState;
022    import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023    import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024    import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
025    import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
026    import static java.lang.Thread.currentThread;
027    import static java.util.Arrays.asList;
028    
029    import com.google.common.annotations.Beta;
030    import com.google.common.base.Function;
031    import com.google.common.base.Preconditions;
032    import com.google.common.collect.ImmutableList;
033    import com.google.common.collect.Lists;
034    import com.google.common.collect.Ordering;
035    
036    import java.lang.reflect.Constructor;
037    import java.lang.reflect.InvocationTargetException;
038    import java.lang.reflect.UndeclaredThrowableException;
039    import java.util.Arrays;
040    import java.util.List;
041    import java.util.concurrent.BlockingQueue;
042    import java.util.concurrent.CancellationException;
043    import java.util.concurrent.CountDownLatch;
044    import java.util.concurrent.ExecutionException;
045    import java.util.concurrent.Executor;
046    import java.util.concurrent.Future;
047    import java.util.concurrent.LinkedBlockingQueue;
048    import java.util.concurrent.TimeUnit;
049    import java.util.concurrent.TimeoutException;
050    import java.util.concurrent.atomic.AtomicInteger;
051    
052    import javax.annotation.Nullable;
053    
054    /**
055     * Static utility methods pertaining to the {@link Future} interface.
056     *
057     * <p>Many of these methods use the {@link ListenableFuture} API; consult the
058     * Guava User Guide article on <a href=
059     * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained">
060     * {@code ListenableFuture}</a>.
061     *
062     * @author Kevin Bourrillion
063     * @author Nishant Thakkar
064     * @author Sven Mawson
065     * @since 1.0
066     */
067    @Beta
068    public final class Futures {
069      private Futures() {}
070    
071      /**
072       * Wraps a normal {@link ListenableFuture} in a {@link CheckedFuture}, using
073       * the given {@link Function} to map {@link Exception} instances into the
074       * appropriate checked type.
075       *
076       * <p>The mapper receives an {@link InterruptedException}, a
077       * {@link CancellationException}, or an {@link ExecutionException} with the
078       * actual cause of the exception. See {@link Future#get()} for details.
079       *
080       * @since 9.0 (source-compatible since 1.0)
081       */
082      public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
083          ListenableFuture<V> future, Function<Exception, X> mapper) {
084        ListenableFuture<V> delegate = checkNotNull(future);
085        return new MappingCheckedFuture<V, X>(delegate, mapper);
086      }
087    
088      /**
089       * Returns a {@code ListenableFuture} whose value is set immediately upon
090       * construction. Its getters just return that value. This {@code Future}
091       * can't be canceled or timed out, and {@code isDone()} is always true.
092       */
093      public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
094        // A SettableFuture that is completed before it is handed out.
095        SettableFuture<V> completed = SettableFuture.create();
096        completed.set(value);
097        return completed;
098      }
099    
100      /**
101       * Returns a {@code CheckedFuture} whose value is set immediately upon
102       * construction.
103       *
104       * <p>The returned {@code Future} can't be cancelled, and {@code isDone()}
105       * always returns {@code true}. Both {@code get()} and {@code checkedGet()}
106       * return the provided value immediately.
107       */
108      public static <V, X extends Exception> CheckedFuture<V, X>
109          immediateCheckedFuture(@Nullable V value) {
110        Function<Exception, X> unreachableMapper = new Function<Exception, X>() {
111          @Override public X apply(Exception unexpected) {
112            throw new AssertionError("impossible"); // mapper not expected to run
113          }
114        };
115        SettableFuture<V> completed = SettableFuture.create();
116        completed.set(value);
117        return makeChecked(completed, unreachableMapper);
118      }
119    
120      /**
121       * Returns a {@code ListenableFuture} that has the given exception set
122       * immediately upon construction.
123       *
124       * <p>The returned {@code Future} can't be cancelled, and its
125       * {@code isDone()} method always returns {@code true}. Calling {@code get()}
126       * immediately throws the provided {@code Throwable} wrapped in an
127       * {@code ExecutionException}.
128       *
129       * @throws Error if the throwable is an {@link Error}.
130       */
131      public static <V> ListenableFuture<V> immediateFailedFuture(
132          Throwable throwable) {
133        Throwable cause = checkNotNull(throwable);
134        SettableFuture<V> failed = SettableFuture.create();
135        failed.setException(cause);
136        return failed;
137      }
138    
139      /**
140       * Returns a {@code CheckedFuture} that has the given exception set
141       * immediately upon construction.
142       *
143       * <p>The returned {@code Future} can't be cancelled, and {@code isDone()}
144       * always returns {@code true}. Calling {@code get()} immediately throws the
145       * provided {@code Throwable} wrapped in an {@code ExecutionException};
146       * calling {@code checkedGet()} throws the provided exception itself.
147       *
148       * @throws Error if the throwable is an {@link Error}.
149       */
150      public static <V, X extends Exception> CheckedFuture<V, X>
151          immediateFailedCheckedFuture(final X exception) {
152        checkNotNull(exception);
153        ListenableFuture<V> failed = Futures.<V>immediateFailedFuture(exception);
154        // The mapper ignores what was caught and always hands back the original.
155        Function<Exception, X> alwaysOriginal = new Function<Exception, X>() {
156          @Override public X apply(Exception ignored) {
157            return exception;
158          }
159        };
160        return makeChecked(failed, alwaysOriginal);
161      }
162    
163      /**
164       * Returns a new {@code ListenableFuture} whose result is asynchronously
165       * derived from the result of the given {@code Future}. More precisely, the
166       * returned {@code Future} takes its result from the {@code Future} obtained
167       * by applying the given {@code AsyncFunction} to the result of the original
168       * {@code Future}. Example:
169       *
170       * <pre>   {@code
171       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
172       *   AsyncFunction<RowKey, QueryResult> queryFunction =
173       *       new AsyncFunction<RowKey, QueryResult>() {
174       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
175       *           return dataService.read(rowKey);
176       *         }
177       *       };
178       *   ListenableFuture<QueryResult> queryFuture =
179       *       transform(rowKeyFuture, queryFunction);
180       * }</pre>
181       *
182       * <p>Note: This overload of {@code transform} is meant for cases in which
183       * creating the derived {@code Future} is fast and lightweight, since the
184       * method accepts no {@code Executor} in which to perform the work. (The
185       * created {@code Future} itself need not complete quickly.) For heavier
186       * operations, this overload carries some caveats: First, the thread that
187       * {@code function.apply} runs in depends on whether the input {@code Future}
188       * is done at the time {@code transform} is called. In particular, if called
189       * late, {@code transform} will run the operation in the thread that called
190       * {@code transform}. Second, {@code function.apply} may run in an internal
191       * thread of the system responsible for the input {@code Future}, such as an
192       * RPC network thread. Finally, during the execution of a {@code
193       * sameThreadExecutor} {@code function.apply}, all other registered but
194       * unexecuted listeners are prevented from running, even if those listeners
195       * are to run in other executors.
196       *
197       * <p>The returned {@code Future} attempts to keep its cancellation state in
198       * sync with that of the input future and that of the future returned by the
199       * function. That is, if the returned {@code Future} is cancelled, it will
200       * attempt to cancel the other two, and if either of the other two is
201       * cancelled, the returned {@code Future} will receive a callback in which it
202       * will attempt to cancel itself.
203       *
204       * @param input The future to transform
205       * @param function A function mapping the input's result to the output future
206       * @return A future that holds result of the function (if the input succeeded)
207       *     or the original input's failure (if not)
208       * @since 11.0
209       */
210      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
211          AsyncFunction<? super I, ? extends O> function) {
212        // No executor supplied, so delegate with the same-thread executor.
213        return transform(input, function, sameThreadExecutor());
214      }
215    
216      /**
217       * Returns a new {@code ListenableFuture} whose result is asynchronously
218       * derived from the result of the given {@code Future}. More precisely, the
219       * returned {@code Future} takes its result from the {@code Future} obtained
220       * by applying the given {@code AsyncFunction} to the result of the original
221       * {@code Future}. Example:
222       *
223       * <pre>   {@code
224       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
225       *   AsyncFunction<RowKey, QueryResult> queryFunction =
226       *       new AsyncFunction<RowKey, QueryResult>() {
227       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
228       *           return dataService.read(rowKey);
229       *         }
230       *       };
231       *   ListenableFuture<QueryResult> queryFuture =
232       *       transform(rowKeyFuture, queryFunction, executor);
233       * }</pre>
234       *
235       * <p>The returned {@code Future} attempts to keep its cancellation state in
236       * sync with that of the input future and that of the future returned by the
237       * function. That is, if the returned {@code Future} is cancelled, it will
238       * attempt to cancel the other two, and if either of the other two is
239       * cancelled, the returned {@code Future} will receive a callback in which it
240       * will attempt to cancel itself.
241       *
242       * <p>Note: When the work of creating the derived future is fast and
243       * lightweight, consider {@linkplain
244       * Futures#transform(ListenableFuture, AsyncFunction) the other overload} or
245       * explicit use of {@code sameThreadExecutor}. For heavier derivations, that
246       * choice carries some caveats: First, the thread that {@code function.apply}
247       * runs in depends on whether the input {@code Future} is done at the time
248       * {@code transform} is called. In particular, if called late, {@code
249       * transform} will run the operation in the thread that called {@code
250       * transform}. Second, {@code function.apply} may run in an internal thread
251       * of the system responsible for the input {@code Future}, such as an RPC
252       * network thread. Finally, during the execution of a {@code
253       * sameThreadExecutor} {@code function.apply}, all other registered but
254       * unexecuted listeners are prevented from running, even if those listeners
255       * are to run in other executors.
256       *
257       * @param input The future to transform
258       * @param function A function to transform the result of the input future
259       *     to the result of the output future
260       * @param executor Executor to run the function in.
261       * @return A future that holds result of the function (if the input succeeded)
262       *     or the original input's failure (if not)
263       * @since 11.0
264       */
265      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
266          AsyncFunction<? super I, ? extends O> function, Executor executor) {
267        // The chaining future is itself the Runnable registered on the input.
268        ChainingListenableFuture<I, O> chained =
269            new ChainingListenableFuture<I, O>(function, input);
270        input.addListener(chained, executor);
271        return chained;
272      }
273    
274      /**
275       * Returns a new {@code ListenableFuture} whose result is the product of
276       * applying the given {@code Function} to the result of the given {@code
277       * Future}. Example:
278       *
279       * <pre>   {@code
280       *   ListenableFuture<QueryResult> queryFuture = ...;
281       *   Function<QueryResult, List<Row>> rowsFunction =
282       *       new Function<QueryResult, List<Row>>() {
283       *         public List<Row> apply(QueryResult queryResult) {
284       *           return queryResult.getRows();
285       *         }
286       *       };
287       *   ListenableFuture<List<Row>> rowsFuture =
288       *       transform(queryFuture, rowsFunction);
289       * }</pre>
290       *
291       * <p>Note: This overload of {@code transform} is meant for cases in which
292       * the transformation is fast and lightweight, since the method accepts no
293       * {@code Executor} in which to perform the work. For heavier
294       * transformations, this overload carries some caveats: First, the thread
295       * that the transformation runs in depends on whether the input {@code
296       * Future} is done at the time {@code transform} is called. In particular, if
297       * called late, {@code transform} will perform the transformation in the
298       * thread that called {@code transform}. Second, transformations may run in
299       * an internal thread of the system responsible for the input {@code Future},
300       * such as an RPC network thread. Finally, during the execution of a {@code
301       * sameThreadExecutor} transformation, all other registered but unexecuted
302       * listeners are prevented from running, even if those listeners are to run
303       * in other executors.
304       *
305       * <p>The returned {@code Future} attempts to keep its cancellation state in
306       * sync with that of the input future. That is, if the returned {@code Future}
307       * is cancelled, it will attempt to cancel the input, and if the input is
308       * cancelled, the returned {@code Future} will receive a callback in which it
309       * will attempt to cancel itself.
310       *
311       * <p>Example use: converting a serializable object from an RPC into a POJO.
312       *
313       * @param input The future to transform
314       * @param function A Function to transform the results of the provided future
315       *     to the results of the returned future.  This will be run in the thread
316       *     that notifies input it is complete.
317       * @return A future that holds result of the transformation.
318       * @since 9.0 (in 1.0 as {@code compose})
319       */
320      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
321          final Function<? super I, ? extends O> function) {
322        // No executor supplied, so delegate with the same-thread executor.
323        return transform(input, function, sameThreadExecutor());
324      }
325    
326      /**
327       * Returns a new {@code ListenableFuture} whose result is the product of
328       * applying the given {@code Function} to the result of the given {@code
329       * Future}. Example:
330       *
331       * <pre>   {@code
332       *   ListenableFuture<QueryResult> queryFuture = ...;
333       *   Function<QueryResult, List<Row>> rowsFunction =
334       *       new Function<QueryResult, List<Row>>() {
335       *         public List<Row> apply(QueryResult queryResult) {
336       *           return queryResult.getRows();
337       *         }
338       *       };
339       *   ListenableFuture<List<Row>> rowsFuture =
340       *       transform(queryFuture, rowsFunction, executor);
341       * }</pre>
342       *
343       * <p>The returned {@code Future} attempts to keep its cancellation state in
344       * sync with that of the input future. That is, if the returned {@code Future}
345       * is cancelled, it will attempt to cancel the input, and if the input is
346       * cancelled, the returned {@code Future} will receive a callback in which it
347       * will attempt to cancel itself.
348       *
349       * <p>An example use of this method is to convert a serializable object
350       * returned from an RPC into a POJO.
351       *
352       * <p>Note: When the transformation is fast and lightweight, consider
353       * {@linkplain Futures#transform(ListenableFuture, Function) the other
354       * overload} or explicit use of {@link MoreExecutors#sameThreadExecutor}.
355       * For heavier transformations, that choice carries some caveats: First, the
356       * thread that the transformation runs in depends on whether the input
357       * {@code Future} is done at the time {@code transform} is called. In
358       * particular, if called late, {@code transform} will perform the
359       * transformation in the thread that called {@code transform}. Second,
360       * transformations may run in an internal thread of the system responsible
361       * for the input {@code Future}, such as an RPC network thread. Finally,
362       * during the execution of a {@code sameThreadExecutor} transformation, all
363       * other registered but unexecuted listeners are prevented from running,
364       * even if those listeners are to run in other executors.
365       *
366       * @param input The future to transform
367       * @param function A Function to transform the results of the provided future
368       *     to the results of the returned future.
369       * @param executor Executor to run the function in.
370       * @return A future that holds result of the transformation.
371       * @since 9.0 (in 2.0 as {@code compose})
372       */
373      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
374          final Function<? super I, ? extends O> function, Executor executor) {
375        checkNotNull(function);
376        // Adapt the plain Function into an AsyncFunction that returns its value
377        // wrapped in an already-completed future.
378        AsyncFunction<I, O> asyncAdapter = new AsyncFunction<I, O>() {
379          @Override
380          public ListenableFuture<O> apply(I value) {
381            return Futures.<O>immediateFuture(function.apply(value));
382          }
383        };
384        return transform(input, asyncAdapter, executor);
385      }
386    
387      /**
388       * Like {@link #transform(ListenableFuture, Function)} except that the
389       * transformation {@code function} is invoked on each call to
390       * {@link Future#get() get()} on the returned future.
391       *
392       * <p>The returned {@code Future} mirrors the input's cancellation state
393       * directly, and any attempt to cancel the returned Future is likewise
394       * passed through to the input Future.
395       *
396       * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
397       * apply the timeout only to the execution of the underlying {@code Future},
398       * <em>not</em> to the execution of the transformation function.
399       *
400       * <p>This method is aimed primarily at callers of {@code transform} who
401       * don't have a {@code ListenableFuture} available and do not mind
402       * repeated, lazy function evaluation.
403       *
404       * @param input The future to transform
405       * @param function A Function to transform the results of the provided future
406       *     to the results of the returned future.
407       * @return A future that returns the result of the transformation.
408       * @since 10.0
409       */
410      @Beta
411      public static <I, O> Future<O> lazyTransform(final Future<I> input,
412          final Function<? super I, ? extends O> function) {
413        checkNotNull(input);
414        checkNotNull(function);
415        return new Future<O>() {
416    
417          // State queries and cancellation go straight to the input future.
418          @Override public boolean cancel(boolean mayInterruptIfRunning) {
419            return input.cancel(mayInterruptIfRunning);
420          }
421    
422          @Override public boolean isCancelled() {
423            return input.isCancelled();
424          }
425    
426          @Override public boolean isDone() {
427            return input.isDone();
428          }
429    
430          @Override
431          public O get() throws InterruptedException, ExecutionException {
432            return transformed(input.get());
433          }
434    
435          @Override
436          public O get(long timeout, TimeUnit unit)
437              throws InterruptedException, ExecutionException, TimeoutException {
438            // The timeout bounds only the wait on the input, not the function.
439            return transformed(input.get(timeout, unit));
440          }
441    
442          // Applies the function anew on every call, wrapping whatever it throws.
443          private O transformed(I rawValue) throws ExecutionException {
444            try {
445              return function.apply(rawValue);
446            } catch (Throwable failure) {
447              throw new ExecutionException(failure);
448            }
449          }
450        };
451      }
452    
453      /**
454       * An implementation of {@code ListenableFuture} that also implements
455       * {@code Runnable} so that it can be used to nest ListenableFutures. Once the
456       * passed-in {@code ListenableFuture} is complete, it applies the passed-in
457       * {@code AsyncFunction} and adopts the outcome of the future that returns.
458       *
459       * <p>If the function throws any checked exceptions, they should be wrapped
460       * in an {@code UndeclaredThrowableException} so that this class can get
461       * access to the cause.
462       */
463      private static class ChainingListenableFuture<I, O>
464          extends AbstractFuture<O> implements Runnable {
465    
466        private AsyncFunction<? super I, ? extends O> function; // nulled by run()
467        private ListenableFuture<? extends I> inputFuture; // nulled by run()
468        private volatile ListenableFuture<? extends O> outputFuture;
469        private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
470            new LinkedBlockingQueue<Boolean>(1); // passes cancel()'s flag to run()
471        private final CountDownLatch outputCreated = new CountDownLatch(1);
472    
473        private ChainingListenableFuture(
474            AsyncFunction<? super I, ? extends O> function,
475            ListenableFuture<? extends I> inputFuture) {
476          this.function = checkNotNull(function);
477          this.inputFuture = checkNotNull(inputFuture);
478        }
479    
480        @Override
481        public boolean cancel(boolean mayInterruptIfRunning) {
482          /*
483           * Propagating the cancellation to the other futures needs to occur even
484           * if !mayInterruptIfRunning, so we can't move it into interruptTask().
485           */
486          if (super.cancel(mayInterruptIfRunning)) {
487            // This should never block since only one thread is allowed to cancel
488            // this Future and the channel has room for exactly one value.
489            putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
490            cancel(inputFuture, mayInterruptIfRunning);
491            cancel(outputFuture, mayInterruptIfRunning);
492            return true;
493          }
494          return false;
495        }
496    
497        private void cancel(@Nullable Future<?> future,
498            boolean mayInterruptIfRunning) {
499          if (future != null) { // the fields may be unset or already cleared
500            future.cancel(mayInterruptIfRunning);
501          }
502        }
503    
504        @Override
505        public void run() {
506          try {
507            I sourceResult;
508            try {
509              sourceResult = getUninterruptibly(inputFuture);
510            } catch (CancellationException e) {
511              // The input was cancelled: cancel this future as well and return.
512              // At this point, inputFuture is cancelled and outputFuture doesn't
513              // exist, so the value of mayInterruptIfRunning is irrelevant.
514              cancel(false);
515              return;
516            } catch (ExecutionException e) {
517              // The input failed: adopt its cause as this future's exception.
518              setException(e.getCause());
519              return;
520            }
521    
522            final ListenableFuture<? extends O> outputFuture = this.outputFuture =
523                function.apply(sourceResult);
524            if (isCancelled()) {
525              // Handles the case where cancel was called while the function was
526              // being applied.
527              // There is a gap in cancel(boolean) between calling sync.cancel()
528              // and storing the value of mayInterruptIfRunning, so this thread
529              // needs to block, waiting for that value.
530              outputFuture.cancel(
531                  takeUninterruptibly(mayInterruptIfRunningChannel));
532              this.outputFuture = null;
533              return;
534            }
535            outputFuture.addListener(new Runnable() {
536                @Override
537                public void run() {
538                  try {
539                    // Here it would have been nice to have had an
540                    // UninterruptibleListenableFuture, but we don't want to start a
541                    // combinatorial explosion of interfaces, so we have to make do.
542                    set(getUninterruptibly(outputFuture));
543                  } catch (CancellationException e) {
544                    // The derived future was cancelled: cancel this one and return.
545                    // At this point, inputFuture and outputFuture are done, so the
546                    // value of mayInterruptIfRunning is irrelevant.
547                    cancel(false);
548                    return;
549                  } catch (ExecutionException e) {
550                    // The derived future failed: adopt its cause as our exception.
551                    setException(e.getCause());
552                  } finally {
553                    // Don't pin the derived future beyond completion
554                    ChainingListenableFuture.this.outputFuture = null;
555                  }
556                }
557              }, MoreExecutors.sameThreadExecutor());
558          } catch (UndeclaredThrowableException e) {
559            // Unwrap: report the wrapped cause as this future's exception
560            setException(e.getCause());
561          } catch (Exception e) {
562            // This exception is irrelevant in this thread, but useful for the
563            // client
564            setException(e);
565          } catch (Error e) {
566            // Propagate errors up ASAP - our superclass will rethrow the error
567            setException(e);
568          } finally {
569            // Don't pin inputs beyond completion
570            function = null;
571            inputFuture = null;
572            // Signal that run() is over. NOTE(review): no await() is visible here.
573            outputCreated.countDown();
574          }
575        }
576      }
577    
578      /**
579       * Creates a new {@code ListenableFuture} whose value is a list containing the
580       * values of all its input futures, if all succeed. If any input fails, the
581       * returned future fails.
582       *
583       * <p>The list of results is in the same order as the input list.
584       *
585       * <p>Canceling this future does not cancel any of the component futures;
586       * however, if any of the provided futures fails or is canceled, this one is,
587       * too.
588       *
589       * @param futures futures to combine
590       * @return a future providing the list of the component futures' results
591       * @since 10.0
592       */
593      @Beta
594      public static <V> ListenableFuture<List<V>> allAsList(
595          ListenableFuture<? extends V>... futures) {
596        // 'true' presumably asks ListFuture to fail when any input fails.
597        return new ListFuture<V>(
598            ImmutableList.copyOf(futures), true, sameThreadExecutor());
599      }
600    
601      /**
602       * Creates a new {@code ListenableFuture} whose value is a list containing the
603       * values of all its input futures, if all succeed. If any input fails, the
604       * returned future fails.
605       *
606       * <p>The results appear in the same order as the input futures.
607       *
608       * <p>Canceling this future does not cancel any of the component futures;
609       * however, if any of the provided futures fails or is canceled, this one is,
610       * too.
611       *
612       * @param futures futures to combine
613       * @return a future providing the list of the component futures' results
614       * @since 10.0
615       */
616      @Beta
617      public static <V> ListenableFuture<List<V>> allAsList(
618          Iterable<? extends ListenableFuture<? extends V>> futures) {
619        // 'true' presumably asks ListFuture to fail when any input fails.
620        return new ListFuture<V>(
621            ImmutableList.copyOf(futures), true, sameThreadExecutor());
622      }
623    
624      /**
625       * Creates a new {@code ListenableFuture} whose value is a list containing the
626       * values of all its successful input futures. The list of results is in the
627       * same order as the input list, and if any of the provided futures fails or
628       * is canceled, its corresponding position will contain {@code null} (which is
629       * indistinguishable from the future having a successful value of
630       * {@code null}).
631       *
632       * @param futures futures to combine
633       * @return a future providing the list of the component futures' results
634       * @since 10.0
635       */
636      @Beta
637      public static <V> ListenableFuture<List<V>> successfulAsList(
638          ListenableFuture<? extends V>... futures) {
639        // 'false' presumably lets ListFuture tolerate failed inputs (as null).
640        return new ListFuture<V>(
641            ImmutableList.copyOf(futures), false, sameThreadExecutor());
642      }
643    
644      /**
645       * Creates a new {@code ListenableFuture} whose value is a list containing the
646       * values of all its successful input futures. The results keep the order of
647       * the input futures, and if any of the provided futures fails or is
648       * canceled, its corresponding position will contain {@code null} (which is
649       * indistinguishable from the future having a successful value of
650       * {@code null}).
651       *
652       * @param futures futures to combine
653       * @return a future providing the list of the component futures' results
654       * @since 10.0
655       */
656      @Beta
657      public static <V> ListenableFuture<List<V>> successfulAsList(
658          Iterable<? extends ListenableFuture<? extends V>> futures) {
659        // 'false' presumably lets ListFuture tolerate failed inputs (as null).
660        return new ListFuture<V>(
661            ImmutableList.copyOf(futures), false, sameThreadExecutor());
662      }
663    
664      /**
665       * Registers separate success and failure callbacks to be run when the {@code
666       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
667       * complete} or, if the computation is already complete, immediately.
668       *
669       * <p>There is no guaranteed ordering of execution of callbacks, but any
670       * callback added through this method is guaranteed to be called once the
671       * computation is complete.
672       *
673       * Example: <pre> {@code
674       * ListenableFuture<QueryResult> future = ...;
675       * addCallback(future,
676       *     new FutureCallback<QueryResult> {
677       *       public void onSuccess(QueryResult result) {
678       *         storeInCache(result);
679       *       }
680       *       public void onFailure(Throwable t) {
681       *         reportError(t);
682       *       }
683       *     });}</pre>
684       *
685       * <p>Note: This overload of {@code addCallback} is designed for cases in
686       * which the callack is fast and lightweight, as the method does not accept
687       * an {@code Executor} in which to perform the the work. For heavier
688       * callbacks, this overload carries some caveats: First, the thread that the
689       * callback runs in depends on whether the input {@code Future} is done at the
690       * time {@code addCallback} is called and on whether the input {@code Future}
691       * is ever cancelled. In particular, {@code addCallback} may execute the
692       * callback in the thread that calls {@code addCallback} or {@code
693       * Future.cancel}. Second, callbacks may run in an internal thread of the
694       * system responsible for the input {@code Future}, such as an RPC network
695       * thread. Finally, during the execution of a {@code sameThreadExecutor}
696       * callback, all other registered but unexecuted listeners are prevented from
697       * running, even if those listeners are to run in other executors.
698       *
699       * <p>For a more general interface to attach a completion listener to a
700       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
701       *
702       * @param future The future attach the callback to.
703       * @param callback The callback to invoke when {@code future} is completed.
704       * @since 10.0
705       */
706      public static <V> void addCallback(ListenableFuture<V> future,
707          FutureCallback<? super V> callback) {
708        addCallback(future, callback, MoreExecutors.sameThreadExecutor());
709      }
710    
711      /**
712       * Registers separate success and failure callbacks to be run when the {@code
713       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
714       * complete} or, if the computation is already complete, immediately.
715       *
716       * <p>The callback is run in {@code executor}.
717       * There is no guaranteed ordering of execution of callbacks, but any
718       * callback added through this method is guaranteed to be called once the
719       * computation is complete.
720       *
721       * Example: <pre> {@code
722       * ListenableFuture<QueryResult> future = ...;
723       * Executor e = ...
724       * addCallback(future, e,
725       *     new FutureCallback<QueryResult> {
726       *       public void onSuccess(QueryResult result) {
727       *         storeInCache(result);
728       *       }
729       *       public void onFailure(Throwable t) {
730       *         reportError(t);
731       *       }
732       *     });}</pre>
733       *
734       * When the callback is fast and lightweight consider {@linkplain
735       * Futures#addCallback(ListenableFuture, FutureCallback) the other overload}
736       * or explicit use of {@link MoreExecutors#sameThreadExecutor
737       * sameThreadExecutor}. For heavier callbacks, this choice carries some
738       * caveats: First, the thread that the callback runs in depends on whether
739       * the input {@code Future} is done at the time {@code addCallback} is called
740       * and on whether the input {@code Future} is ever cancelled. In particular,
741       * {@code addCallback} may execute the callback in the thread that calls
742       * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in
743       * an internal thread of the system responsible for the input {@code Future},
744       * such as an RPC network thread. Finally, during the execution of a {@code
745       * sameThreadExecutor} callback, all other registered but unexecuted
746       * listeners are prevented from running, even if those listeners are to run
747       * in other executors.
748       *
749       * <p>For a more general interface to attach a completion listener to a
750       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
751       *
752       * @param future The future attach the callback to.
753       * @param callback The callback to invoke when {@code future} is completed.
754       * @param executor The executor to run {@code callback} when the future
755       *    completes.
756       * @since 10.0
757       */
758      public static <V> void addCallback(final ListenableFuture<V> future,
759          final FutureCallback<? super V> callback, Executor executor) {
760        Preconditions.checkNotNull(callback);
761        Runnable callbackListener = new Runnable() {
762          @Override
763          public void run() {
764            try {
765              // TODO(user): (Before Guava release), validate that this
766              // is the thing for IE.
767              V value = getUninterruptibly(future);
768              callback.onSuccess(value);
769            } catch (ExecutionException e) {
770              callback.onFailure(e.getCause());
771            } catch (RuntimeException e) {
772              callback.onFailure(e);
773            } catch (Error e) {
774              callback.onFailure(e);
775            }
776          }
777        };
778        future.addListener(callbackListener, executor);
779      }
780    
781      /**
782       * Returns the result of {@link Future#get()}, converting most exceptions to a
783       * new instance of the given checked exception type. This reduces boilerplate
784       * for a common use of {@code Future} in which it is unnecessary to
785       * programmatically distinguish between exception types or to extract other
786       * information from the exception instance.
787       *
788       * <p>Exceptions from {@code Future.get} are treated as follows:
789       * <ul>
790       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
791       *     {@code X} if the cause is a checked exception, an {@link
792       *     UncheckedExecutionException} if the cause is a {@code
793       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
794       *     {@code Error}.
795       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
796       *     restoring the interrupt).
797       * <li>Any {@link CancellationException} is propagated untouched, as is any
798       *     other {@link RuntimeException} (though {@code get} implementations are
799       *     discouraged from throwing such exceptions).
800       * </ul>
801       *
802       * The overall principle is to continue to treat every checked exception as a
803       * checked exception, every unchecked exception as an unchecked exception, and
804       * every error as an error. In addition, the cause of any {@code
805       * ExecutionException} is wrapped in order to ensure that the new stack trace
806       * matches that of the current thread.
807       *
808       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
809       * public constructor that accepts zero or more arguments, all of type {@code
810       * String} or {@code Throwable} (preferring constructors with at least one
811       * {@code String}) and calling the constructor via reflection. If the
812       * exception did not already have a cause, one is set by calling {@link
813       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
814       * {@code IllegalArgumentException} is thrown.
815       *
816       * @throws X if {@code get} throws any checked exception except for an {@code
817       *         ExecutionException} whose cause is not itself a checked exception
818       * @throws UncheckedExecutionException if {@code get} throws an {@code
819       *         ExecutionException} with a {@code RuntimeException} as its cause
820       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
821       *         with an {@code Error} as its cause
822       * @throws CancellationException if {@code get} throws a {@code
823       *         CancellationException}
824       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
825       *         RuntimeException} or does not have a suitable constructor
826       * @since 10.0
827       */
828      @Beta
829      public static <V, X extends Exception> V get(
830          Future<V> future, Class<X> exceptionClass) throws X {
831        checkNotNull(future);
832        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
833            "Futures.get exception type (%s) must not be a RuntimeException",
834            exceptionClass);
835        try {
836          return future.get();
837        } catch (InterruptedException e) {
838          currentThread().interrupt();
839          throw newWithCause(exceptionClass, e);
840        } catch (ExecutionException e) {
841          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
842          throw new AssertionError();
843        }
844      }
845    
846      /**
847       * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
848       * exceptions to a new instance of the given checked exception type. This
849       * reduces boilerplate for a common use of {@code Future} in which it is
850       * unnecessary to programmatically distinguish between exception types or to
851       * extract other information from the exception instance.
852       *
853       * <p>Exceptions from {@code Future.get} are treated as follows:
854       * <ul>
855       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
856       *     {@code X} if the cause is a checked exception, an {@link
857       *     UncheckedExecutionException} if the cause is a {@code
858       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
859       *     {@code Error}.
860       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
861       *     restoring the interrupt).
862       * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
863       * <li>Any {@link CancellationException} is propagated untouched, as is any
864       *     other {@link RuntimeException} (though {@code get} implementations are
865       *     discouraged from throwing such exceptions).
866       * </ul>
867       *
868       * The overall principle is to continue to treat every checked exception as a
869       * checked exception, every unchecked exception as an unchecked exception, and
870       * every error as an error. In addition, the cause of any {@code
871       * ExecutionException} is wrapped in order to ensure that the new stack trace
872       * matches that of the current thread.
873       *
874       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
875       * public constructor that accepts zero or more arguments, all of type {@code
876       * String} or {@code Throwable} (preferring constructors with at least one
877       * {@code String}) and calling the constructor via reflection. If the
878       * exception did not already have a cause, one is set by calling {@link
879       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
880       * {@code IllegalArgumentException} is thrown.
881       *
882       * @throws X if {@code get} throws any checked exception except for an {@code
883       *         ExecutionException} whose cause is not itself a checked exception
884       * @throws UncheckedExecutionException if {@code get} throws an {@code
885       *         ExecutionException} with a {@code RuntimeException} as its cause
886       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
887       *         with an {@code Error} as its cause
888       * @throws CancellationException if {@code get} throws a {@code
889       *         CancellationException}
890       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
891       *         RuntimeException} or does not have a suitable constructor
892       * @since 10.0
893       */
894      @Beta
895      public static <V, X extends Exception> V get(
896          Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
897          throws X {
898        checkNotNull(future);
899        checkNotNull(unit);
900        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
901            "Futures.get exception type (%s) must not be a RuntimeException",
902            exceptionClass);
903        try {
904          return future.get(timeout, unit);
905        } catch (InterruptedException e) {
906          currentThread().interrupt();
907          throw newWithCause(exceptionClass, e);
908        } catch (TimeoutException e) {
909          throw newWithCause(exceptionClass, e);
910        } catch (ExecutionException e) {
911          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
912          throw new AssertionError();
913        }
914      }
915    
916      private static <X extends Exception> void wrapAndThrowExceptionOrError(
917          Throwable cause, Class<X> exceptionClass) throws X {
918        if (cause instanceof Error) {
919          throw new ExecutionError((Error) cause);
920        }
921        if (cause instanceof RuntimeException) {
922          throw new UncheckedExecutionException(cause);
923        }
924        throw newWithCause(exceptionClass, cause);
925      }
926    
927      /**
928       * Returns the result of calling {@link Future#get()} uninterruptibly on a
929       * task known not to throw a checked exception. This makes {@code Future} more
930       * suitable for lightweight, fast-running tasks that, barring bugs in the
931       * code, will not fail. This gives it exception-handling behavior similar to
932       * that of {@code ForkJoinTask.join}.
933       *
934       * <p>Exceptions from {@code Future.get} are treated as follows:
935       * <ul>
936       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
937       *     {@link UncheckedExecutionException} (if the cause is an {@code
938       *     Exception}) or {@link ExecutionError} (if the cause is an {@code
939       *     Error}).
940       * <li>Any {@link InterruptedException} causes a retry of the {@code get}
941       *     call. The interrupt is restored before {@code getUnchecked} returns.
942       * <li>Any {@link CancellationException} is propagated untouched. So is any
943       *     other {@link RuntimeException} ({@code get} implementations are
944       *     discouraged from throwing such exceptions).
945       * </ul>
946       *
947       * The overall principle is to eliminate all checked exceptions: to loop to
948       * avoid {@code InterruptedException}, to pass through {@code
949       * CancellationException}, and to wrap any exception from the underlying
950       * computation in an {@code UncheckedExecutionException} or {@code
951       * ExecutionError}.
952       *
953       * <p>For an uninterruptible {@code get} that preserves other exceptions, see
954       * {@link Uninterruptibles#getUninterruptibly(Future)}.
955       *
956       * @throws UncheckedExecutionException if {@code get} throws an {@code
957       *         ExecutionException} with an {@code Exception} as its cause
958       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
959       *         with an {@code Error} as its cause
960       * @throws CancellationException if {@code get} throws a {@code
961       *         CancellationException}
962       * @since 10.0
963       */
964      @Beta
965      public static <V> V getUnchecked(Future<V> future) {
966        checkNotNull(future);
967        try {
968          return getUninterruptibly(future);
969        } catch (ExecutionException e) {
970          wrapAndThrowUnchecked(e.getCause());
971          throw new AssertionError();
972        }
973      }
974    
975      private static void wrapAndThrowUnchecked(Throwable cause) {
976        if (cause instanceof Error) {
977          throw new ExecutionError((Error) cause);
978        }
979        /*
980         * It's a non-Error, non-Exception Throwable. From my survey of such
981         * classes, I believe that most users intended to extend Exception, so we'll
982         * treat it like an Exception.
983         */
984        throw new UncheckedExecutionException(cause);
985      }
986    
987      /*
988       * TODO(user): FutureChecker interface for these to be static methods on? If
989       * so, refer to it in the (static-method) Futures.get documentation
990       */
991    
992      /*
993       * Arguably we don't need a timed getUnchecked because any operation slow
994       * enough to require a timeout is heavyweight enough to throw a checked
995       * exception and therefore be inappropriate to use with getUnchecked. Further,
996       * it's not clear that converting the checked TimeoutException to a
997       * RuntimeException -- especially to an UncheckedExecutionException, since it
998       * wasn't thrown by the computation -- makes sense, and if we don't convert
999       * it, the user still has to write a try-catch block.
1000       *
1001       * If you think you would use this method, let us know.
1002       */
1003    
1004      private static <X extends Exception> X newWithCause(
1005          Class<X> exceptionClass, Throwable cause) {
1006        // getConstructors() guarantees this as long as we don't modify the array.
1007        @SuppressWarnings("unchecked")
1008        List<Constructor<X>> constructors =
1009            (List) Arrays.asList(exceptionClass.getConstructors());
1010        for (Constructor<X> constructor : preferringStrings(constructors)) {
1011          @Nullable X instance = newFromConstructor(constructor, cause);
1012          if (instance != null) {
1013            if (instance.getCause() == null) {
1014              instance.initCause(cause);
1015            }
1016            return instance;
1017          }
1018        }
1019        throw new IllegalArgumentException(
1020            "No appropriate constructor for exception of type " + exceptionClass
1021                + " in response to chained exception", cause);
1022      }
1023    
1024      private static <X extends Exception> List<Constructor<X>>
1025          preferringStrings(List<Constructor<X>> constructors) {
1026        return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1027      }
1028    
1029      private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1030          Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1031            @Override public Boolean apply(Constructor<?> input) {
1032              return asList(input.getParameterTypes()).contains(String.class);
1033            }
1034          }).reverse();
1035    
1036      @Nullable private static <X> X newFromConstructor(
1037          Constructor<X> constructor, Throwable cause) {
1038        Class<?>[] paramTypes = constructor.getParameterTypes();
1039        Object[] params = new Object[paramTypes.length];
1040        for (int i = 0; i < paramTypes.length; i++) {
1041          Class<?> paramType = paramTypes[i];
1042          if (paramType.equals(String.class)) {
1043            params[i] = cause.toString();
1044          } else if (paramType.equals(Throwable.class)) {
1045            params[i] = cause;
1046          } else {
1047            return null;
1048          }
1049        }
1050        try {
1051          return constructor.newInstance(params);
1052        } catch (IllegalArgumentException e) {
1053          return null;
1054        } catch (InstantiationException e) {
1055          return null;
1056        } catch (IllegalAccessException e) {
1057          return null;
1058        } catch (InvocationTargetException e) {
1059          return null;
1060        }
1061      }
1062    
1063      /**
1064       * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1065       * The idea is to create a (null-filled) List and register a listener with
1066       * each component future to fill out the value in the List when that future
1067       * completes.
1068       */
1069      private static class ListFuture<V> extends AbstractFuture<List<V>> {
1070        ImmutableList<? extends ListenableFuture<? extends V>> futures;
1071        final boolean allMustSucceed;
1072        final AtomicInteger remaining;
1073        List<V> values;
1074    
1075        /**
1076         * Constructor.
1077         *
1078         * @param futures all the futures to build the list from
1079         * @param allMustSucceed whether a single failure or cancellation should
1080         *        propagate to this future
1081         * @param listenerExecutor used to run listeners on all the passed in
1082         *        futures.
1083         */
1084        ListFuture(
1085            final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1086            final boolean allMustSucceed, final Executor listenerExecutor) {
1087          this.futures = futures;
1088          this.values = Lists.newArrayListWithCapacity(futures.size());
1089          this.allMustSucceed = allMustSucceed;
1090          this.remaining = new AtomicInteger(futures.size());
1091    
1092          init(listenerExecutor);
1093        }
1094    
1095        private void init(final Executor listenerExecutor) {
1096          // First, schedule cleanup to execute when the Future is done.
1097          addListener(new Runnable() {
1098            @Override
1099            public void run() {
1100              // By now the values array has either been set as the Future's value,
1101              // or (in case of failure) is no longer useful.
1102              ListFuture.this.values = null;
1103    
1104              // Let go of the memory held by other futures
1105              ListFuture.this.futures = null;
1106            }
1107          }, MoreExecutors.sameThreadExecutor());
1108    
1109          // Now begin the "real" initialization.
1110    
1111          // Corner case: List is empty.
1112          if (futures.isEmpty()) {
1113            set(Lists.newArrayList(values));
1114            return;
1115          }
1116    
1117          // Populate the results list with null initially.
1118          for (int i = 0; i < futures.size(); ++i) {
1119            values.add(null);
1120          }
1121    
1122          // Register a listener on each Future in the list to update
1123          // the state of this future.
1124          // Note that if all the futures on the list are done prior to completing
1125          // this loop, the last call to addListener() will callback to
1126          // setOneValue(), transitively call our cleanup listener, and set
1127          // this.futures to null.
1128          // We store a reference to futures to avoid the NPE.
1129          ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1130          for (int i = 0; i < localFutures.size(); i++) {
1131            final ListenableFuture<? extends V> listenable = localFutures.get(i);
1132            final int index = i;
1133            listenable.addListener(new Runnable() {
1134              @Override
1135              public void run() {
1136                setOneValue(index, listenable);
1137              }
1138            }, listenerExecutor);
1139          }
1140        }
1141    
1142        /**
1143         * Sets the value at the given index to that of the given future.
1144         */
1145        private void setOneValue(int index, Future<? extends V> future) {
1146          List<V> localValues = values;
1147          if (isDone() || localValues == null) {
1148            // Some other future failed or has been cancelled, causing this one to
1149            // also be cancelled or have an exception set. This should only happen
1150            // if allMustSucceed is true.
1151            checkState(allMustSucceed,
1152                "Future was done before all dependencies completed");
1153            return;
1154          }
1155    
1156          try {
1157            checkState(future.isDone(),
1158                "Tried to set value from future which is not done");
1159            localValues.set(index, getUninterruptibly(future));
1160          } catch (CancellationException e) {
1161            if (allMustSucceed) {
1162              // Set ourselves as cancelled. Let the input futures keep running
1163              // as some of them may be used elsewhere.
1164              // (Currently we don't override interruptTask, so
1165              // mayInterruptIfRunning==false isn't technically necessary.)
1166              cancel(false);
1167            }
1168          } catch (ExecutionException e) {
1169            if (allMustSucceed) {
1170              // As soon as the first one fails, throw the exception up.
1171              // The result of all other inputs is then ignored.
1172              setException(e.getCause());
1173            }
1174          } catch (RuntimeException e) {
1175            if (allMustSucceed) {
1176              setException(e);
1177            }
1178          } catch (Error e) {
1179            // Propagate errors up ASAP - our superclass will rethrow the error
1180            setException(e);
1181          } finally {
1182            int newRemaining = remaining.decrementAndGet();
1183            checkState(newRemaining >= 0, "Less than 0 remaining futures");
1184            if (newRemaining == 0) {
1185              localValues = values;
1186              if (localValues != null) {
1187                set(Lists.newArrayList(localValues));
1188              } else {
1189                checkState(isDone());
1190              }
1191            }
1192          }
1193        }
1194    
1195      }
1196    
1197      /**
1198       * A checked future that uses a function to map from exceptions to the
1199       * appropriate checked type.
1200       */
1201      private static class MappingCheckedFuture<V, X extends Exception> extends
1202          AbstractCheckedFuture<V, X> {
1203    
1204        final Function<Exception, X> mapper;
1205    
1206        MappingCheckedFuture(ListenableFuture<V> delegate,
1207            Function<Exception, X> mapper) {
1208          super(delegate);
1209    
1210          this.mapper = checkNotNull(mapper);
1211        }
1212    
1213        @Override
1214        protected X mapException(Exception e) {
1215          return mapper.apply(e);
1216        }
1217      }
1218    }