001    /*
002     * Copyright (C) 2006 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkArgument;
020    import static com.google.common.base.Preconditions.checkNotNull;
021    import static com.google.common.base.Preconditions.checkState;
022    import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023    import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024    import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
025    import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
026    import static java.lang.Thread.currentThread;
027    import static java.util.Arrays.asList;
028    import static java.util.concurrent.TimeUnit.NANOSECONDS;
029    
030    import com.google.common.annotations.Beta;
031    import com.google.common.base.Function;
032    import com.google.common.base.Preconditions;
033    import com.google.common.collect.ImmutableList;
034    import com.google.common.collect.Lists;
035    import com.google.common.collect.Ordering;
036    
037    import java.lang.reflect.Constructor;
038    import java.lang.reflect.InvocationTargetException;
039    import java.lang.reflect.UndeclaredThrowableException;
040    import java.util.Arrays;
041    import java.util.List;
042    import java.util.concurrent.BlockingQueue;
043    import java.util.concurrent.CancellationException;
044    import java.util.concurrent.CountDownLatch;
045    import java.util.concurrent.ExecutionException;
046    import java.util.concurrent.Executor;
047    import java.util.concurrent.Future;
048    import java.util.concurrent.LinkedBlockingQueue;
049    import java.util.concurrent.TimeUnit;
050    import java.util.concurrent.TimeoutException;
051    import java.util.concurrent.atomic.AtomicInteger;
052    
053    import javax.annotation.Nullable;
054    
055    /**
056     * Static utility methods pertaining to the {@link Future} interface.
057     *
058     * @author Kevin Bourrillion
059     * @author Nishant Thakkar
060     * @author Sven Mawson
061     * @since 1.0
062     */
063    @Beta
064    public final class Futures {
  // Private constructor: this class only hosts static utility methods and is
  // not meant to be instantiated.
  private Futures() {}
066    
067      /**
068       * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
069       * and a {@link Function} that maps from {@link Exception} instances into the
070       * appropriate checked type.
071       *
072       * <p>The given mapping function will be applied to an
073       * {@link InterruptedException}, a {@link CancellationException}, or an
074       * {@link ExecutionException} with the actual cause of the exception.
075       * See {@link Future#get()} for details on the exceptions thrown.
076       *
077       * @since 9.0 (source-compatible since 1.0)
078       */
079      public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
080          ListenableFuture<V> future, Function<Exception, X> mapper) {
081        return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
082      }
083    
084      /**
085       * Creates a {@code ListenableFuture} which has its value set immediately upon
086       * construction. The getters just return the value. This {@code Future} can't
087       * be canceled or timed out and its {@code isDone()} method always returns
088       * {@code true}.
089       */
090      public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
091        SettableFuture<V> future = SettableFuture.create();
092        future.set(value);
093        return future;
094      }
095    
096      /**
097       * Returns a {@code CheckedFuture} which has its value set immediately upon
098       * construction.
099       *
100       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
101       * method always returns {@code true}. Calling {@code get()} or {@code
102       * checkedGet()} will immediately return the provided value.
103       */
104      public static <V, X extends Exception> CheckedFuture<V, X>
105          immediateCheckedFuture(@Nullable V value) {
106        SettableFuture<V> future = SettableFuture.create();
107        future.set(value);
108        return Futures.makeChecked(future, new Function<Exception, X>() {
109          @Override
110          public X apply(Exception e) {
111            throw new AssertionError("impossible");
112          }
113        });
114      }
115    
116      /**
117       * Returns a {@code ListenableFuture} which has an exception set immediately
118       * upon construction.
119       *
120       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
121       * method always returns {@code true}. Calling {@code get()} will immediately
122       * throw the provided {@code Throwable} wrapped in an {@code
123       * ExecutionException}.
124       *
125       * @throws Error if the throwable is an {@link Error}.
126       */
127      public static <V> ListenableFuture<V> immediateFailedFuture(
128          Throwable throwable) {
129        checkNotNull(throwable);
130        SettableFuture<V> future = SettableFuture.create();
131        future.setException(throwable);
132        return future;
133      }
134    
135      /**
136       * Returns a {@code CheckedFuture} which has an exception set immediately upon
137       * construction.
138       *
139       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
140       * method always returns {@code true}. Calling {@code get()} will immediately
141       * throw the provided {@code Throwable} wrapped in an {@code
142       * ExecutionException}, and calling {@code checkedGet()} will throw the
143       * provided exception itself.
144       *
145       * @throws Error if the throwable is an {@link Error}.
146       */
147      public static <V, X extends Exception> CheckedFuture<V, X>
148          immediateFailedCheckedFuture(final X exception) {
149        checkNotNull(exception);
150        return makeChecked(Futures.<V>immediateFailedFuture(exception),
151            new Function<Exception, X>() {
152              @Override
153              public X apply(Exception e) {
154                return exception;
155              }
156            });
157      }
158    
159      /**
160       * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
161       * derived from the result of the given {@code Future}. More precisely, the
162       * returned {@code Future} takes its result from a {@code Future} produced by
163       * applying the given {@code Function} to the result of the original {@code
164       * Future}. Example:
165       *
166       * <pre>   {@code
167       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
168       *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
169       *       new Function<RowKey, ListenableFuture<QueryResult>>() {
170       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
171       *           return dataService.read(rowKey);
172       *         }
173       *       };
174       *   ListenableFuture<QueryResult> queryFuture =
175       *       chain(rowKeyFuture, queryFunction);
176       * }</pre>
177       *
178       * <p>Note: This overload of {@code chain} is designed for cases in which the
179       * work of creating the derived future is fast and lightweight, as the method
180       * does not accept an {@code Executor} in which to perform the the work. For
181       * heavier derivations, this overload carries some caveats: First, the thread
182       * that the derivation runs in depends on whether the input {@code Future} is
183       * done at the time {@code chain} is called. In particular, if called late,
184       * {@code chain} will run the derivation in the thread that called {@code
185       * chain}.  Second, derivations may run in an internal thread of the system
186       * responsible for the input {@code Future}, such as an RPC network thread.
187       * Finally, during the execution of a {@code sameThreadExecutor} {@code
188       * chain} function, all other registered but unexecuted listeners are
189       * prevented from running, even if those listeners are to run in other
190       * executors.
191       *
192       * <p>The returned {@code Future} attempts to keep its cancellation state in
193       * sync with that of the input future and that of the future returned by the
194       * chain function. That is, if the returned {@code Future} is cancelled, it
195       * will attempt to cancel the other two, and if either of the other two is
196       * cancelled, the returned {@code Future} will receive a callback in which it
197       * will attempt to cancel itself.
198       *
199       * @param input The future to chain
200       * @param function A function to chain the results of the provided future
201       *     to the results of the returned future.  This will be run in the thread
202       *     that notifies input it is complete.
203       * @return A future that holds result of the chain.
204       * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
205       *     use {@link #transform(ListenableFuture, AsyncFunction)}. This method is
206       *     scheduled to be removed from Guava in Guava release 12.0.
207       */
208      @Deprecated
209      public static <I, O> ListenableFuture<O> chain(
210          ListenableFuture<I> input,
211          Function<? super I, ? extends ListenableFuture<? extends O>> function) {
212        return chain(input, function, MoreExecutors.sameThreadExecutor());
213      }
214    
215      /**
216       * <p>Returns a new {@code ListenableFuture} whose result is asynchronously
217       * derived from the result of the given {@code Future}. More precisely, the
218       * returned {@code Future} takes its result from a {@code Future} produced by
219       * applying the given {@code Function} to the result of the original {@code
220       * Future}. Example:
221       *
222       * <pre>   {@code
223       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
224       *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
225       *       new Function<RowKey, ListenableFuture<QueryResult>>() {
226       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
227       *           return dataService.read(rowKey);
228       *         }
229       *       };
230       *   ListenableFuture<QueryResult> queryFuture =
231       *       chain(rowKeyFuture, queryFunction, executor);
232       * }</pre>
233       *
234       * <p>The returned {@code Future} attempts to keep its cancellation state in
235       * sync with that of the input future and that of the future returned by the
236       * chain function. That is, if the returned {@code Future} is cancelled, it
237       * will attempt to cancel the other two, and if either of the other two is
238       * cancelled, the returned {@code Future} will receive a callback in which it
239       * will attempt to cancel itself.
240       *
241       * <p>Note: For cases in which the work of creating the derived future is
242       * fast and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
243       * Function) the other overload} or explicit use of {@code
244       * sameThreadExecutor}. For heavier derivations, this choice carries some
245       * caveats: First, the thread that the derivation runs in depends on whether
246       * the input {@code Future} is done at the time {@code chain} is called. In
247       * particular, if called late, {@code chain} will run the derivation in the
248       * thread that called {@code chain}. Second, derivations may run in an
249       * internal thread of the system responsible for the input {@code Future},
250       * such as an RPC network thread. Finally, during the execution of a {@code
251       * sameThreadExecutor} {@code chain} function, all other registered but
252       * unexecuted listeners are prevented from running, even if those listeners
253       * are to run in other executors.
254       *
255       * @param input The future to chain
256       * @param function A function to chain the results of the provided future
257       *     to the results of the returned future.
258       * @param executor Executor to run the function in.
259       * @return A future that holds result of the chain.
260       * @deprecated Convert your {@code Function} to a {@code AsyncFunction}, and
261       *     use {@link #transform(ListenableFuture, AsyncFunction, Executor)}. This
262       *     method is scheduled to be removed from Guava in Guava release 12.0.
263       */
264      @Deprecated
265      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
266          final Function<? super I, ? extends ListenableFuture<? extends O>>
267              function,
268          Executor executor) {
269        checkNotNull(function);
270        ChainingListenableFuture<I, O> chain =
271            new ChainingListenableFuture<I, O>(new AsyncFunction<I, O>() {
272              @Override
273              /*
274               * All methods of ListenableFuture are covariant, and we don't expose
275               * the object anywhere that would allow it to be downcast.
276               */
277              @SuppressWarnings("unchecked")
278              public ListenableFuture<O> apply(I input) {
279                return (ListenableFuture) function.apply(input);
280              }
281            }, input);
282        input.addListener(chain, executor);
283        return chain;
284      }
285    
286      /**
287       * Returns a new {@code ListenableFuture} whose result is asynchronously
288       * derived from the result of the given {@code Future}. More precisely, the
289       * returned {@code Future} takes its result from a {@code Future} produced by
290       * applying the given {@code AsyncFunction} to the result of the original
291       * {@code Future}. Example:
292       *
293       * <pre>   {@code
294       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
295       *   AsyncFunction<RowKey, QueryResult> queryFunction =
296       *       new AsyncFunction<RowKey, QueryResult>() {
297       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
298       *           return dataService.read(rowKey);
299       *         }
300       *       };
301       *   ListenableFuture<QueryResult> queryFuture =
302       *       transform(rowKeyFuture, queryFunction);
303       * }</pre>
304       *
305       * <p>Note: This overload of {@code transform} is designed for cases in which
306       * the work of creating the derived {@code Future} is fast and lightweight,
307       * as the method does not accept an {@code Executor} in which to perform the
308       * the work. (The created {@code Future} itself need not complete quickly.)
309       * For heavier operations, this overload carries some caveats: First, the
310       * thread that {@code function.apply} runs in depends on whether the input
311       * {@code Future} is done at the time {@code transform} is called. In
312       * particular, if called late, {@code transform} will run the operation in
313       * the thread that called {@code transform}.  Second, {@code function.apply}
314       * may run in an internal thread of the system responsible for the input
315       * {@code Future}, such as an RPC network thread.  Finally, during the
316       * execution of a {@code sameThreadExecutor} {@code function.apply}, all
317       * other registered but unexecuted listeners are prevented from running, even
318       * if those listeners are to run in other executors.
319       *
320       * <p>The returned {@code Future} attempts to keep its cancellation state in
321       * sync with that of the input future and that of the future returned by the
322       * function. That is, if the returned {@code Future} is cancelled, it will
323       * attempt to cancel the other two, and if either of the other two is
324       * cancelled, the returned {@code Future} will receive a callback in which it
325       * will attempt to cancel itself.
326       *
327       * @param input The future to transform
328       * @param function A function to transform the result of the input future
329       *     to the result of the output future
330       * @return A future that holds result of the function (if the input succeeded)
331       *     or the original input's failure (if not)
332       * @since 11.0
333       */
334      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
335          AsyncFunction<? super I, ? extends O> function) {
336        return transform(input, function, MoreExecutors.sameThreadExecutor());
337      }
338    
339      /**
340       * Returns a new {@code ListenableFuture} whose result is asynchronously
341       * derived from the result of the given {@code Future}. More precisely, the
342       * returned {@code Future} takes its result from a {@code Future} produced by
343       * applying the given {@code AsyncFunction} to the result of the original
344       * {@code Future}. Example:
345       *
346       * <pre>   {@code
347       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
348       *   AsyncFunction<RowKey, QueryResult> queryFunction =
349       *       new AsyncFunction<RowKey, QueryResult>() {
350       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
351       *           return dataService.read(rowKey);
352       *         }
353       *       };
354       *   ListenableFuture<QueryResult> queryFuture =
355       *       transform(rowKeyFuture, queryFunction, executor);
356       * }</pre>
357       *
358       * <p>The returned {@code Future} attempts to keep its cancellation state in
359       * sync with that of the input future and that of the future returned by the
360       * chain function. That is, if the returned {@code Future} is cancelled, it
361       * will attempt to cancel the other two, and if either of the other two is
362       * cancelled, the returned {@code Future} will receive a callback in which it
363       * will attempt to cancel itself.
364       *
365       * <p>Note: For cases in which the work of creating the derived future is
366       * fast and lightweight, consider {@linkplain
367       * Futures#transform(ListenableFuture, Function) the other overload} or
368       * explicit use of {@code sameThreadExecutor}. For heavier derivations, this
369       * choice carries some caveats: First, the thread that {@code function.apply}
370       * runs in depends on whether the input {@code Future} is done at the time
371       * {@code transform} is called. In particular, if called late, {@code
372       * transform} will run the operation in the thread that called {@code
373       * transform}.  Second, {@code function.apply} may run in an internal thread
374       * of the system responsible for the input {@code Future}, such as an RPC
375       * network thread.  Finally, during the execution of a {@code
376       * sameThreadExecutor} {@code function.apply}, all other registered but
377       * unexecuted listeners are prevented from running, even if those listeners
378       * are to run in other executors.
379       *
380       * @param input The future to transform
381       * @param function A function to transform the result of the input future
382       *     to the result of the output future
383       * @param executor Executor to run the function in.
384       * @return A future that holds result of the function (if the input succeeded)
385       *     or the original input's failure (if not)
386       * @since 11.0
387       */
388      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input,
389          AsyncFunction<? super I, ? extends O> function,
390          Executor executor) {
391        ChainingListenableFuture<I, O> output =
392            new ChainingListenableFuture<I, O>(function, input);
393        input.addListener(output, executor);
394        return output;
395      }
396    
397      /**
398       * Returns a new {@code ListenableFuture} whose result is the product of
399       * applying the given {@code Function} to the result of the given {@code
400       * Future}. Example:
401       *
402       * <pre>   {@code
403       *   ListenableFuture<QueryResult> queryFuture = ...;
404       *   Function<QueryResult, List<Row>> rowsFunction =
405       *       new Function<QueryResult, List<Row>>() {
406       *         public List<Row> apply(QueryResult queryResult) {
407       *           return queryResult.getRows();
408       *         }
409       *       };
410       *   ListenableFuture<List<Row>> rowsFuture =
411       *       transform(queryFuture, rowsFunction);
412       * }</pre>
413       *
414       * <p>Note: This overload of {@code transform} is designed for cases in which
415       * the transformation is fast and lightweight, as the method does not accept
416       * an {@code Executor} in which to perform the the work. For heavier
417       * transformations, this overload carries some caveats: First, the thread
418       * that the transformation runs in depends on whether the input {@code
419       * Future} is done at the time {@code transform} is called. In particular, if
420       * called late, {@code transform} will perform the transformation in the
421       * thread that called {@code transform}. Second, transformations may run in
422       * an internal thread of the system responsible for the input {@code Future},
423       * such as an RPC network thread. Finally, during the execution of a {@code
424       * sameThreadExecutor} transformation, all other registered but unexecuted
425       * listeners are prevented from running, even if those listeners are to run
426       * in other executors.
427       *
428       * <p>The returned {@code Future} attempts to keep its cancellation state in
429       * sync with that of the input future. That is, if the returned {@code Future}
430       * is cancelled, it will attempt to cancel the input, and if the input is
431       * cancelled, the returned {@code Future} will receive a callback in which it
432       * will attempt to cancel itself.
433       *
434       * <p>An example use of this method is to convert a serializable object
435       * returned from an RPC into a POJO.
436       *
437       * @param future The future to transform
438       * @param function A Function to transform the results of the provided future
439       *     to the results of the returned future.  This will be run in the thread
440       *     that notifies input it is complete.
441       * @return A future that holds result of the transformation.
442       * @since 9.0 (in 1.0 as {@code compose})
443       */
444      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
445          final Function<? super I, ? extends O> function) {
446        return transform(future, function, MoreExecutors.sameThreadExecutor());
447      }
448    
449      /**
450       * Returns a new {@code ListenableFuture} whose result is the product of
451       * applying the given {@code Function} to the result of the given {@code
452       * Future}. Example:
453       *
454       * <pre>   {@code
455       *   ListenableFuture<QueryResult> queryFuture = ...;
456       *   Function<QueryResult, List<Row>> rowsFunction =
457       *       new Function<QueryResult, List<Row>>() {
458       *         public List<Row> apply(QueryResult queryResult) {
459       *           return queryResult.getRows();
460       *         }
461       *       };
462       *   ListenableFuture<List<Row>> rowsFuture =
463       *       transform(queryFuture, rowsFunction, executor);
464       * }</pre>
465       *
466       * <p>The returned {@code Future} attempts to keep its cancellation state in
467       * sync with that of the input future. That is, if the returned {@code Future}
468       * is cancelled, it will attempt to cancel the input, and if the input is
469       * cancelled, the returned {@code Future} will receive a callback in which it
470       * will attempt to cancel itself.
471       *
472       * <p>An example use of this method is to convert a serializable object
473       * returned from an RPC into a POJO.
474       *
475       * <p>Note: For cases in which the transformation is fast and lightweight,
476       * consider {@linkplain Futures#transform(ListenableFuture, Function) the
477       * other overload} or explicit use of {@link
478       * MoreExecutors#sameThreadExecutor}. For heavier transformations, this
479       * choice carries some caveats: First, the thread that the transformation
480       * runs in depends on whether the input {@code Future} is done at the time
481       * {@code transform} is called. In particular, if called late, {@code
482       * transform} will perform the transformation in the thread that called
483       * {@code transform}.  Second, transformations may run in an internal thread
484       * of the system responsible for the input {@code Future}, such as an RPC
485       * network thread.  Finally, during the execution of a {@code
486       * sameThreadExecutor} transformation, all other registered but unexecuted
487       * listeners are prevented from running, even if those listeners are to run
488       * in other executors.
489       *
490       * @param future The future to transform
491       * @param function A Function to transform the results of the provided future
492       *     to the results of the returned future.
493       * @param executor Executor to run the function in.
494       * @return A future that holds result of the transformation.
495       * @since 9.0 (in 2.0 as {@code compose})
496       */
497      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
498          final Function<? super I, ? extends O> function, Executor executor) {
499        checkNotNull(function);
500        Function<I, ListenableFuture<O>> wrapperFunction
501            = new Function<I, ListenableFuture<O>>() {
502                @Override public ListenableFuture<O> apply(I input) {
503                  O output = function.apply(input);
504                  return immediateFuture(output);
505                }
506            };
507        return chain(future, wrapperFunction, executor);
508      }
509    
510      /**
511       * Like {@link #transform(ListenableFuture, Function)} except that the
512       * transformation {@code function} is invoked on each call to
513       * {@link Future#get() get()} on the returned future.
514       *
515       * <p>The returned {@code Future} reflects the input's cancellation
516       * state directly, and any attempt to cancel the returned Future is likewise
517       * passed through to the input Future.
518       *
519       * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
520       * only apply the timeout to the execution of the underlying {@code Future},
521       * <em>not</em> to the execution of the transformation function.
522       *
523       * <p>The primary audience of this method is callers of {@code transform}
524       * who don't have a {@code ListenableFuture} available and
525       * do not mind repeated, lazy function evaluation.
526       *
527       * @param future The future to transform
528       * @param function A Function to transform the results of the provided future
529       *     to the results of the returned future.
530       * @return A future that returns the result of the transformation.
531       * @since 10.0
532       */
533      @Beta
534      public static <I, O> Future<O> lazyTransform(final Future<I> future,
535          final Function<? super I, ? extends O> function) {
536        checkNotNull(future);
537        checkNotNull(function);
538        return new Future<O>() {
539    
540          @Override
541          public boolean cancel(boolean mayInterruptIfRunning) {
542            return future.cancel(mayInterruptIfRunning);
543          }
544    
545          @Override
546          public boolean isCancelled() {
547            return future.isCancelled();
548          }
549    
550          @Override
551          public boolean isDone() {
552            return future.isDone();
553          }
554    
555          @Override
556          public O get() throws InterruptedException, ExecutionException {
557            return applyTransformation(future.get());
558          }
559    
560          @Override
561          public O get(long timeout, TimeUnit unit)
562              throws InterruptedException, ExecutionException, TimeoutException {
563            return applyTransformation(future.get(timeout, unit));
564          }
565    
566          private O applyTransformation(I input) throws ExecutionException {
567            try {
568              return function.apply(input);
569            } catch (Throwable t) {
570              throw new ExecutionException(t);
571            }
572          }
573        };
574      }
575    
  /**
   * An implementation of {@code ListenableFuture} that also implements
   * {@code Runnable} so that it can be used to nest ListenableFutures.
   * Once the passed-in {@code ListenableFuture} is complete, it calls the
   * passed-in {@code AsyncFunction} to generate the result.
   *
   * <p>If the function throws any checked exceptions, they should be wrapped
   * in an {@code UndeclaredThrowableException} so that this class can get
   * access to the cause.
   */
586      private static class ChainingListenableFuture<I, O>
587          extends AbstractFuture<O> implements Runnable {
588    
    // Function used to derive the output future; set in the constructor
    // after a null check.
    private AsyncFunction<? super I, ? extends O> function;
    // Future whose result feeds the function; set in the constructor after a
    // null check. get() copies it to a local and tolerates null, which
    // suggests it is cleared elsewhere in this class -- TODO confirm.
    private ListenableFuture<? extends I> inputFuture;
    // Future produced by the function. Volatile; get() reads it only after
    // outputCreated has been released, and tolerates it being null.
    private volatile ListenableFuture<? extends O> outputFuture;
    // Queue bounded to a single element. NOTE(review): by its name it looks
    // like it hands a cancel(mayInterruptIfRunning) flag to another thread;
    // its users are outside this view -- confirm.
    private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
        new LinkedBlockingQueue<Boolean>(1);
    // Latch that get() awaits before it looks at outputFuture.
    private final CountDownLatch outputCreated = new CountDownLatch(1);
595    
596        private ChainingListenableFuture(
597            AsyncFunction<? super I, ? extends O> function,
598            ListenableFuture<? extends I> inputFuture) {
599          this.function = checkNotNull(function);
600          this.inputFuture = checkNotNull(inputFuture);
601        }
602    
603        /**
604         * Delegate the get() to the input and output futures, in case
605         * their implementations defer starting computation until their
606         * own get() is invoked.
607         */
608        @Override
609        public O get() throws InterruptedException, ExecutionException {
610          if (!isDone()) {
611            // Invoking get on the inputFuture will ensure our own run()
612            // method below is invoked as a listener when inputFuture sets
613            // its value.  Therefore when get() returns we should then see
614            // the outputFuture be created.
615            ListenableFuture<? extends I> inputFuture = this.inputFuture;
616            if (inputFuture != null) {
617              inputFuture.get();
618            }
619    
620            // If our listener was scheduled to run on an executor we may
621            // need to wait for our listener to finish running before the
622            // outputFuture has been constructed by the function.
623            outputCreated.await();
624    
625            // Like above with the inputFuture, we have a listener on
626            // the outputFuture that will set our own value when its
627            // value is set.  Invoking get will ensure the output can
628            // complete and invoke our listener, so that we can later
629            // get the result.
630            ListenableFuture<? extends O> outputFuture = this.outputFuture;
631            if (outputFuture != null) {
632              outputFuture.get();
633            }
634          }
635          return super.get();
636        }
637    
638        /**
639         * Delegate the get() to the input and output futures, in case
640         * their implementations defer starting computation until their
641         * own get() is invoked.
642         */
643        @Override
644        public O get(long timeout, TimeUnit unit) throws TimeoutException,
645            ExecutionException, InterruptedException {
646          if (!isDone()) {
647            // Use a single time unit so we can decrease remaining timeout
648            // as we wait for various phases to complete.
649            if (unit != NANOSECONDS) {
650              timeout = NANOSECONDS.convert(timeout, unit);
651              unit = NANOSECONDS;
652            }
653    
654            // Invoking get on the inputFuture will ensure our own run()
655            // method below is invoked as a listener when inputFuture sets
656            // its value.  Therefore when get() returns we should then see
657            // the outputFuture be created.
658            ListenableFuture<? extends I> inputFuture = this.inputFuture;
659            if (inputFuture != null) {
660              long start = System.nanoTime();
661              inputFuture.get(timeout, unit);
662              timeout -= Math.max(0, System.nanoTime() - start);
663            }
664    
665            // If our listener was scheduled to run on an executor we may
666            // need to wait for our listener to finish running before the
667            // outputFuture has been constructed by the function.
668            long start = System.nanoTime();
669            if (!outputCreated.await(timeout, unit)) {
670              throw new TimeoutException();
671            }
672            timeout -= Math.max(0, System.nanoTime() - start);
673    
674            // Like above with the inputFuture, we have a listener on
675            // the outputFuture that will set our own value when its
676            // value is set.  Invoking get will ensure the output can
677            // complete and invoke our listener, so that we can later
678            // get the result.
679            ListenableFuture<? extends O> outputFuture = this.outputFuture;
680            if (outputFuture != null) {
681              outputFuture.get(timeout, unit);
682            }
683          }
684          return super.get(timeout, unit);
685        }
686    
687        @Override
688        public boolean cancel(boolean mayInterruptIfRunning) {
689          /*
690           * Our additional cancellation work needs to occur even if
691           * !mayInterruptIfRunning, so we can't move it into interruptTask().
692           */
693          if (super.cancel(mayInterruptIfRunning)) {
694            // This should never block since only one thread is allowed to cancel
695            // this Future.
696            putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
697            cancel(inputFuture, mayInterruptIfRunning);
698            cancel(outputFuture, mayInterruptIfRunning);
699            return true;
700          }
701          return false;
702        }
703    
704        private void cancel(@Nullable Future<?> future,
705            boolean mayInterruptIfRunning) {
706          if (future != null) {
707            future.cancel(mayInterruptIfRunning);
708          }
709        }
710    
711        @Override
712        public void run() {
713          try {
714            I sourceResult;
715            try {
716              sourceResult = getUninterruptibly(inputFuture);
717            } catch (CancellationException e) {
718              // Cancel this future and return.
719              // At this point, inputFuture is cancelled and outputFuture doesn't
720              // exist, so the value of mayInterruptIfRunning is irrelevant.
721              cancel(false);
722              return;
723            } catch (ExecutionException e) {
724              // Set the cause of the exception as this future's exception
725              setException(e.getCause());
726              return;
727            }
728    
729            final ListenableFuture<? extends O> outputFuture = this.outputFuture =
730                function.apply(sourceResult);
731            if (isCancelled()) {
732              // Handles the case where cancel was called while the function was
733              // being applied.
734              // There is a gap in cancel(boolean) between calling sync.cancel()
735              // and storing the value of mayInterruptIfRunning, so this thread
736              // needs to block, waiting for that value.
737              outputFuture.cancel(
738                  takeUninterruptibly(mayInterruptIfRunningChannel));
739              this.outputFuture = null;
740              return;
741            }
742            outputFuture.addListener(new Runnable() {
743                @Override
744                public void run() {
745                  try {
746                    // Here it would have been nice to have had an
747                    // UninterruptibleListenableFuture, but we don't want to start a
748                    // combinatorial explosion of interfaces, so we have to make do.
749                    set(getUninterruptibly(outputFuture));
750                  } catch (CancellationException e) {
751                    // Cancel this future and return.
752                    // At this point, inputFuture and outputFuture are done, so the
753                    // value of mayInterruptIfRunning is irrelevant.
754                    cancel(false);
755                    return;
756                  } catch (ExecutionException e) {
757                    // Set the cause of the exception as this future's exception
758                    setException(e.getCause());
759                  } finally {
760                    // Don't pin inputs beyond completion
761                    ChainingListenableFuture.this.outputFuture = null;
762                  }
763                }
764              }, MoreExecutors.sameThreadExecutor());
765          } catch (UndeclaredThrowableException e) {
766            // Set the cause of the exception as this future's exception
767            setException(e.getCause());
768          } catch (Exception e) {
769            // This exception is irrelevant in this thread, but useful for the
770            // client
771            setException(e);
772          } catch (Error e) {
773            // Propagate errors up ASAP - our superclass will rethrow the error
774            setException(e);
775          } finally {
776            // Don't pin inputs beyond completion
777            function = null;
778            inputFuture = null;
779            // Allow our get routines to examine outputFuture now.
780            outputCreated.countDown();
781          }
782        }
783      }
784    
785      /**
786       * Creates a new {@code ListenableFuture} whose value is a list containing the
787       * values of all its input futures, if all succeed. If any input fails, the
788       * returned future fails.
789       *
790       * <p>The list of results is in the same order as the input list.
791       *
792       * <p>Canceling this future does not cancel any of the component futures;
793       * however, if any of the provided futures fails or is canceled, this one is,
794       * too.
795       *
796       * @param futures futures to combine
797       * @return a future that provides a list of the results of the component
798       *         futures
799       * @since 10.0
800       */
801      @Beta
802      public static <V> ListenableFuture<List<V>> allAsList(
803          ListenableFuture<? extends V>... futures) {
804        return new ListFuture<V>(ImmutableList.copyOf(futures), true,
805            MoreExecutors.sameThreadExecutor());
806      }
807    
808      /**
809       * Creates a new {@code ListenableFuture} whose value is a list containing the
810       * values of all its input futures, if all succeed. If any input fails, the
811       * returned future fails.
812       *
813       * <p>The list of results is in the same order as the input list.
814       *
815       * <p>Canceling this future does not cancel any of the component futures;
816       * however, if any of the provided futures fails or is canceled, this one is,
817       * too.
818       *
819       * @param futures futures to combine
820       * @return a future that provides a list of the results of the component
821       *         futures
822       * @since 10.0
823       */
824      @Beta
825      public static <V> ListenableFuture<List<V>> allAsList(
826          Iterable<? extends ListenableFuture<? extends V>> futures) {
827        return new ListFuture<V>(ImmutableList.copyOf(futures), true,
828            MoreExecutors.sameThreadExecutor());
829      }
830    
831      /**
832       * Creates a new {@code ListenableFuture} whose value is a list containing the
833       * values of all its successful input futures. The list of results is in the
834       * same order as the input list, and if any of the provided futures fails or
835       * is canceled, its corresponding position will contain {@code null} (which is
836       * indistinguishable from the future having a successful value of
837       * {@code null}).
838       *
839       * @param futures futures to combine
840       * @return a future that provides a list of the results of the component
841       *         futures
842       * @since 10.0
843       */
844      @Beta
845      public static <V> ListenableFuture<List<V>> successfulAsList(
846          ListenableFuture<? extends V>... futures) {
847        return new ListFuture<V>(ImmutableList.copyOf(futures), false,
848            MoreExecutors.sameThreadExecutor());
849      }
850    
851      /**
852       * Creates a new {@code ListenableFuture} whose value is a list containing the
853       * values of all its successful input futures. The list of results is in the
854       * same order as the input list, and if any of the provided futures fails or
855       * is canceled, its corresponding position will contain {@code null} (which is
856       * indistinguishable from the future having a successful value of
857       * {@code null}).
858       *
859       * @param futures futures to combine
860       * @return a future that provides a list of the results of the component
861       *         futures
862       * @since 10.0
863       */
864      @Beta
865      public static <V> ListenableFuture<List<V>> successfulAsList(
866          Iterable<? extends ListenableFuture<? extends V>> futures) {
867        return new ListFuture<V>(ImmutableList.copyOf(futures), false,
868            MoreExecutors.sameThreadExecutor());
869      }
870    
871      /**
872       * Registers separate success and failure callbacks to be run when the {@code
873       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
874       * complete} or, if the computation is already complete, immediately.
875       *
876       * <p>There is no guaranteed ordering of execution of callbacks, but any
877       * callback added through this method is guaranteed to be called once the
878       * computation is complete.
879       *
880       * Example: <pre> {@code
881       * ListenableFuture<QueryResult> future = ...;
882       * addCallback(future,
883       *     new FutureCallback<QueryResult> {
884       *       public void onSuccess(QueryResult result) {
885       *         storeInCache(result);
886       *       }
887       *       public void onFailure(Throwable t) {
888       *         reportError(t);
889       *       }
890       *     });}</pre>
891       *
892       * <p>Note: This overload of {@code addCallback} is designed for cases in
893       * which the callack is fast and lightweight, as the method does not accept
894       * an {@code Executor} in which to perform the the work. For heavier
895       * callbacks, this overload carries some caveats: First, the thread that the
896       * callback runs in depends on whether the input {@code Future} is done at the
897       * time {@code addCallback} is called and on whether the input {@code Future}
898       * is ever cancelled. In particular, {@code addCallback} may execute the
899       * callback in the thread that calls {@code addCallback} or {@code
900       * Future.cancel}. Second, callbacks may run in an internal thread of the
901       * system responsible for the input {@code Future}, such as an RPC network
902       * thread. Finally, during the execution of a {@code sameThreadExecutor}
903       * callback, all other registered but unexecuted listeners are prevented from
904       * running, even if those listeners are to run in other executors.
905       *
906       * <p>For a more general interface to attach a completion listener to a
907       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
908       *
909       * @param future The future attach the callback to.
910       * @param callback The callback to invoke when {@code future} is completed.
911       * @since 10.0
912       */
913      public static <V> void addCallback(ListenableFuture<V> future,
914          FutureCallback<? super V> callback) {
915        addCallback(future, callback, MoreExecutors.sameThreadExecutor());
916      }
917    
918      /**
919       * Registers separate success and failure callbacks to be run when the {@code
920       * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
921       * complete} or, if the computation is already complete, immediately.
922       *
923       * <p>The callback is run in {@code executor}.
924       * There is no guaranteed ordering of execution of callbacks, but any
925       * callback added through this method is guaranteed to be called once the
926       * computation is complete.
927       *
928       * Example: <pre> {@code
929       * ListenableFuture<QueryResult> future = ...;
930       * Executor e = ...
931       * addCallback(future, e,
932       *     new FutureCallback<QueryResult> {
933       *       public void onSuccess(QueryResult result) {
934       *         storeInCache(result);
935       *       }
936       *       public void onFailure(Throwable t) {
937       *         reportError(t);
938       *       }
939       *     });}</pre>
940       *
941       * When the callback is fast and lightweight consider {@linkplain
942       * Futures#addCallback(ListenableFuture, FutureCallback) the other overload}
943       * or explicit use of {@link MoreExecutors#sameThreadExecutor
944       * sameThreadExecutor}. For heavier callbacks, this choice carries some
945       * caveats: First, the thread that the callback runs in depends on whether
946       * the input {@code Future} is done at the time {@code addCallback} is called
947       * and on whether the input {@code Future} is ever cancelled. In particular,
948       * {@code addCallback} may execute the callback in the thread that calls
949       * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in
950       * an internal thread of the system responsible for the input {@code Future},
951       * such as an RPC network thread. Finally, during the execution of a {@code
952       * sameThreadExecutor} callback, all other registered but unexecuted
953       * listeners are prevented from running, even if those listeners are to run
954       * in other executors.
955       *
956       * <p>For a more general interface to attach a completion listener to a
957       * {@code Future}, see {@link ListenableFuture#addListener addListener}.
958       *
959       * @param future The future attach the callback to.
960       * @param callback The callback to invoke when {@code future} is completed.
961       * @param executor The executor to run {@code callback} when the future
962       *    completes.
963       * @since 10.0
964       */
965      public static <V> void addCallback(final ListenableFuture<V> future,
966          final FutureCallback<? super V> callback, Executor executor) {
967        Preconditions.checkNotNull(callback);
968        Runnable callbackListener = new Runnable() {
969          @Override
970          public void run() {
971            try {
972              // TODO(user): (Before Guava release), validate that this
973              // is the thing for IE.
974              V value = getUninterruptibly(future);
975              callback.onSuccess(value);
976            } catch (ExecutionException e) {
977              callback.onFailure(e.getCause());
978            } catch (RuntimeException e) {
979              callback.onFailure(e);
980            } catch (Error e) {
981              callback.onFailure(e);
982            }
983          }
984        };
985        future.addListener(callbackListener, executor);
986      }
987    
988      /**
989       * Returns the result of {@link Future#get()}, converting most exceptions to a
990       * new instance of the given checked exception type. This reduces boilerplate
991       * for a common use of {@code Future} in which it is unnecessary to
992       * programmatically distinguish between exception types or to extract other
993       * information from the exception instance.
994       *
995       * <p>Exceptions from {@code Future.get} are treated as follows:
996       * <ul>
997       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
998       *     {@code X} if the cause is a checked exception, an {@link
999       *     UncheckedExecutionException} if the cause is a {@code
1000       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1001       *     {@code Error}.
1002       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1003       *     restoring the interrupt).
1004       * <li>Any {@link CancellationException} is propagated untouched, as is any
1005       *     other {@link RuntimeException} (though {@code get} implementations are
1006       *     discouraged from throwing such exceptions).
1007       * </ul>
1008       *
1009       * The overall principle is to continue to treat every checked exception as a
1010       * checked exception, every unchecked exception as an unchecked exception, and
1011       * every error as an error. In addition, the cause of any {@code
1012       * ExecutionException} is wrapped in order to ensure that the new stack trace
1013       * matches that of the current thread.
1014       *
1015       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1016       * public constructor that accepts zero or more arguments, all of type {@code
1017       * String} or {@code Throwable} (preferring constructors with at least one
1018       * {@code String}) and calling the constructor via reflection. If the
1019       * exception did not already have a cause, one is set by calling {@link
1020       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1021       * {@code IllegalArgumentException} is thrown.
1022       *
1023       * @throws X if {@code get} throws any checked exception except for an {@code
1024       *         ExecutionException} whose cause is not itself a checked exception
1025       * @throws UncheckedExecutionException if {@code get} throws an {@code
1026       *         ExecutionException} with a {@code RuntimeException} as its cause
1027       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1028       *         with an {@code Error} as its cause
1029       * @throws CancellationException if {@code get} throws a {@code
1030       *         CancellationException}
1031       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1032       *         RuntimeException} or does not have a suitable constructor
1033       * @since 10.0
1034       */
1035      @Beta
1036      public static <V, X extends Exception> V get(
1037          Future<V> future, Class<X> exceptionClass) throws X {
1038        checkNotNull(future);
1039        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1040            "Futures.get exception type (%s) must not be a RuntimeException",
1041            exceptionClass);
1042        try {
1043          return future.get();
1044        } catch (InterruptedException e) {
1045          currentThread().interrupt();
1046          throw newWithCause(exceptionClass, e);
1047        } catch (ExecutionException e) {
1048          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1049          throw new AssertionError();
1050        }
1051      }
1052    
1053      /**
1054       * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1055       * exceptions to a new instance of the given checked exception type. This
1056       * reduces boilerplate for a common use of {@code Future} in which it is
1057       * unnecessary to programmatically distinguish between exception types or to
1058       * extract other information from the exception instance.
1059       *
1060       * <p>Exceptions from {@code Future.get} are treated as follows:
1061       * <ul>
1062       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1063       *     {@code X} if the cause is a checked exception, an {@link
1064       *     UncheckedExecutionException} if the cause is a {@code
1065       *     RuntimeException}, or an {@link ExecutionError} if the cause is an
1066       *     {@code Error}.
1067       * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1068       *     restoring the interrupt).
1069       * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1070       * <li>Any {@link CancellationException} is propagated untouched, as is any
1071       *     other {@link RuntimeException} (though {@code get} implementations are
1072       *     discouraged from throwing such exceptions).
1073       * </ul>
1074       *
1075       * The overall principle is to continue to treat every checked exception as a
1076       * checked exception, every unchecked exception as an unchecked exception, and
1077       * every error as an error. In addition, the cause of any {@code
1078       * ExecutionException} is wrapped in order to ensure that the new stack trace
1079       * matches that of the current thread.
1080       *
1081       * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1082       * public constructor that accepts zero or more arguments, all of type {@code
1083       * String} or {@code Throwable} (preferring constructors with at least one
1084       * {@code String}) and calling the constructor via reflection. If the
1085       * exception did not already have a cause, one is set by calling {@link
1086       * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1087       * {@code IllegalArgumentException} is thrown.
1088       *
1089       * @throws X if {@code get} throws any checked exception except for an {@code
1090       *         ExecutionException} whose cause is not itself a checked exception
1091       * @throws UncheckedExecutionException if {@code get} throws an {@code
1092       *         ExecutionException} with a {@code RuntimeException} as its cause
1093       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1094       *         with an {@code Error} as its cause
1095       * @throws CancellationException if {@code get} throws a {@code
1096       *         CancellationException}
1097       * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1098       *         RuntimeException} or does not have a suitable constructor
1099       * @since 10.0
1100       */
1101      @Beta
1102      public static <V, X extends Exception> V get(
1103          Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1104          throws X {
1105        checkNotNull(future);
1106        checkNotNull(unit);
1107        checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1108            "Futures.get exception type (%s) must not be a RuntimeException",
1109            exceptionClass);
1110        try {
1111          return future.get(timeout, unit);
1112        } catch (InterruptedException e) {
1113          currentThread().interrupt();
1114          throw newWithCause(exceptionClass, e);
1115        } catch (TimeoutException e) {
1116          throw newWithCause(exceptionClass, e);
1117        } catch (ExecutionException e) {
1118          wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1119          throw new AssertionError();
1120        }
1121      }
1122    
1123      private static <X extends Exception> void wrapAndThrowExceptionOrError(
1124          Throwable cause, Class<X> exceptionClass) throws X {
1125        if (cause instanceof Error) {
1126          throw new ExecutionError((Error) cause);
1127        }
1128        if (cause instanceof RuntimeException) {
1129          throw new UncheckedExecutionException(cause);
1130        }
1131        throw newWithCause(exceptionClass, cause);
1132      }
1133    
1134      /**
1135       * Returns the result of calling {@link Future#get()} uninterruptibly on a
1136       * task known not to throw a checked exception. This makes {@code Future} more
1137       * suitable for lightweight, fast-running tasks that, barring bugs in the
1138       * code, will not fail. This gives it exception-handling behavior similar to
1139       * that of {@code ForkJoinTask.join}.
1140       *
1141       * <p>Exceptions from {@code Future.get} are treated as follows:
1142       * <ul>
1143       * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1144       *     {@link UncheckedExecutionException} (if the cause is an {@code
1145       *     Exception}) or {@link ExecutionError} (if the cause is an {@code
1146       *     Error}).
1147       * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1148       *     call. The interrupt is restored before {@code getUnchecked} returns.
1149       * <li>Any {@link CancellationException} is propagated untouched. So is any
1150       *     other {@link RuntimeException} ({@code get} implementations are
1151       *     discouraged from throwing such exceptions).
1152       * </ul>
1153       *
1154       * The overall principle is to eliminate all checked exceptions: to loop to
1155       * avoid {@code InterruptedException}, to pass through {@code
1156       * CancellationException}, and to wrap any exception from the underlying
1157       * computation in an {@code UncheckedExecutionException} or {@code
1158       * ExecutionError}.
1159       *
1160       * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1161       * {@link Uninterruptibles#getUninterruptibly(Future)}.
1162       *
1163       * @throws UncheckedExecutionException if {@code get} throws an {@code
1164       *         ExecutionException} with an {@code Exception} as its cause
1165       * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1166       *         with an {@code Error} as its cause
1167       * @throws CancellationException if {@code get} throws a {@code
1168       *         CancellationException}
1169       * @since 10.0
1170       */
1171      @Beta
1172      public static <V> V getUnchecked(Future<V> future) {
1173        checkNotNull(future);
1174        try {
1175          return getUninterruptibly(future);
1176        } catch (ExecutionException e) {
1177          wrapAndThrowUnchecked(e.getCause());
1178          throw new AssertionError();
1179        }
1180      }
1181    
1182      private static void wrapAndThrowUnchecked(Throwable cause) {
1183        if (cause instanceof Error) {
1184          throw new ExecutionError((Error) cause);
1185        }
1186        /*
1187         * It's a non-Error, non-Exception Throwable. From my survey of such
1188         * classes, I believe that most users intended to extend Exception, so we'll
1189         * treat it like an Exception.
1190         */
1191        throw new UncheckedExecutionException(cause);
1192      }
1193    
1194      /*
1195       * TODO(user): FutureChecker interface for these to be static methods on? If
1196       * so, refer to it in the (static-method) Futures.get documentation
1197       */
1198    
1199      /*
1200       * Arguably we don't need a timed getUnchecked because any operation slow
1201       * enough to require a timeout is heavyweight enough to throw a checked
1202       * exception and therefore be inappropriate to use with getUnchecked. Further,
1203       * it's not clear that converting the checked TimeoutException to a
1204       * RuntimeException -- especially to an UncheckedExecutionException, since it
1205       * wasn't thrown by the computation -- makes sense, and if we don't convert
1206       * it, the user still has to write a try-catch block.
1207       *
1208       * If you think you would use this method, let us know.
1209       */
1210    
1211      private static <X extends Exception> X newWithCause(
1212          Class<X> exceptionClass, Throwable cause) {
1213        // getConstructors() guarantees this as long as we don't modify the array.
1214        @SuppressWarnings("unchecked")
1215        List<Constructor<X>> constructors =
1216            (List) Arrays.asList(exceptionClass.getConstructors());
1217        for (Constructor<X> constructor : preferringStrings(constructors)) {
1218          @Nullable X instance = newFromConstructor(constructor, cause);
1219          if (instance != null) {
1220            if (instance.getCause() == null) {
1221              instance.initCause(cause);
1222            }
1223            return instance;
1224          }
1225        }
1226        throw new IllegalArgumentException(
1227            "No appropriate constructor for exception of type " + exceptionClass
1228                + " in response to chained exception", cause);
1229      }
1230    
1231      private static <X extends Exception> List<Constructor<X>>
1232          preferringStrings(List<Constructor<X>> constructors) {
1233        return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1234      }
1235    
1236      private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1237          Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1238            @Override public Boolean apply(Constructor<?> input) {
1239              return asList(input.getParameterTypes()).contains(String.class);
1240            }
1241          }).reverse();
1242    
1243      @Nullable private static <X> X newFromConstructor(
1244          Constructor<X> constructor, Throwable cause) {
1245        Class<?>[] paramTypes = constructor.getParameterTypes();
1246        Object[] params = new Object[paramTypes.length];
1247        for (int i = 0; i < paramTypes.length; i++) {
1248          Class<?> paramType = paramTypes[i];
1249          if (paramType.equals(String.class)) {
1250            params[i] = cause.toString();
1251          } else if (paramType.equals(Throwable.class)) {
1252            params[i] = cause;
1253          } else {
1254            return null;
1255          }
1256        }
1257        try {
1258          return constructor.newInstance(params);
1259        } catch (IllegalArgumentException e) {
1260          return null;
1261        } catch (InstantiationException e) {
1262          return null;
1263        } catch (IllegalAccessException e) {
1264          return null;
1265        } catch (InvocationTargetException e) {
1266          return null;
1267        }
1268      }
1269    
1270      /**
1271       * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1272       * The idea is to create a (null-filled) List and register a listener with
1273       * each component future to fill out the value in the List when that future
1274       * completes.
1275       */
1276      private static class ListFuture<V> extends AbstractFuture<List<V>> {
1277        ImmutableList<? extends ListenableFuture<? extends V>> futures;
1278        final boolean allMustSucceed;
1279        final AtomicInteger remaining;
1280        List<V> values;
1281    
1282        /**
1283         * Constructor.
1284         *
1285         * @param futures all the futures to build the list from
1286         * @param allMustSucceed whether a single failure or cancellation should
1287         *        propagate to this future
1288         * @param listenerExecutor used to run listeners on all the passed in
1289         *        futures.
1290         */
1291        ListFuture(
1292            final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1293            final boolean allMustSucceed, final Executor listenerExecutor) {
1294          this.futures = futures;
1295          this.values = Lists.newArrayListWithCapacity(futures.size());
1296          this.allMustSucceed = allMustSucceed;
1297          this.remaining = new AtomicInteger(futures.size());
1298    
1299          init(listenerExecutor);
1300        }
1301    
1302        private void init(final Executor listenerExecutor) {
1303          // First, schedule cleanup to execute when the Future is done.
1304          addListener(new Runnable() {
1305            @Override
1306            public void run() {
1307              // By now the values array has either been set as the Future's value,
1308              // or (in case of failure) is no longer useful.
1309              ListFuture.this.values = null;
1310    
1311              // Let go of the memory held by other futures
1312              ListFuture.this.futures = null;
1313            }
1314          }, MoreExecutors.sameThreadExecutor());
1315    
1316          // Now begin the "real" initialization.
1317    
1318          // Corner case: List is empty.
1319          if (futures.isEmpty()) {
1320            set(Lists.newArrayList(values));
1321            return;
1322          }
1323    
1324          // Populate the results list with null initially.
1325          for (int i = 0; i < futures.size(); ++i) {
1326            values.add(null);
1327          }
1328    
1329          // Register a listener on each Future in the list to update
1330          // the state of this future.
1331          // Note that if all the futures on the list are done prior to completing
1332          // this loop, the last call to addListener() will callback to
1333          // setOneValue(), transitively call our cleanup listener, and set
1334          // this.futures to null.
1335          // We store a reference to futures to avoid the NPE.
1336          ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1337          for (int i = 0; i < localFutures.size(); i++) {
1338            final ListenableFuture<? extends V> listenable = localFutures.get(i);
1339            final int index = i;
1340            listenable.addListener(new Runnable() {
1341              @Override
1342              public void run() {
1343                setOneValue(index, listenable);
1344              }
1345            }, listenerExecutor);
1346          }
1347        }
1348    
1349        /**
1350         * Sets the value at the given index to that of the given future.
1351         */
1352        private void setOneValue(int index, Future<? extends V> future) {
1353          List<V> localValues = values;
1354          if (isDone() || localValues == null) {
1355            // Some other future failed or has been cancelled, causing this one to
1356            // also be cancelled or have an exception set. This should only happen
1357            // if allMustSucceed is true.
1358            checkState(allMustSucceed,
1359                "Future was done before all dependencies completed");
1360            return;
1361          }
1362    
1363          try {
1364            checkState(future.isDone(),
1365                "Tried to set value from future which is not done");
1366            localValues.set(index, getUninterruptibly(future));
1367          } catch (CancellationException e) {
1368            if (allMustSucceed) {
1369              // Set ourselves as cancelled. Let the input futures keep running
1370              // as some of them may be used elsewhere.
1371              // (Currently we don't override interruptTask, so
1372              // mayInterruptIfRunning==false isn't technically necessary.)
1373              cancel(false);
1374            }
1375          } catch (ExecutionException e) {
1376            if (allMustSucceed) {
1377              // As soon as the first one fails, throw the exception up.
1378              // The result of all other inputs is then ignored.
1379              setException(e.getCause());
1380            }
1381          } catch (RuntimeException e) {
1382            if (allMustSucceed) {
1383              setException(e);
1384            }
1385          } catch (Error e) {
1386            // Propagate errors up ASAP - our superclass will rethrow the error
1387            setException(e);
1388          } finally {
1389            int newRemaining = remaining.decrementAndGet();
1390            checkState(newRemaining >= 0, "Less than 0 remaining futures");
1391            if (newRemaining == 0) {
1392              localValues = values;
1393              if (localValues != null) {
1394                set(Lists.newArrayList(localValues));
1395              } else {
1396                checkState(isDone());
1397              }
1398            }
1399          }
1400        }
1401    
1402        @Override
1403        public List<V> get() throws InterruptedException, ExecutionException {
1404          callAllGets();
1405    
1406          // This may still block in spite of the calls above, as the listeners may
1407          // be scheduled for execution in other threads.
1408          return super.get();
1409        }
1410    
1411        /**
1412         * Calls the get method of all dependency futures to work around a bug in
1413         * some ListenableFutures where the listeners aren't called until get() is
1414         * called.
1415         */
1416        private void callAllGets() throws InterruptedException {
1417          List<? extends ListenableFuture<? extends V>> oldFutures = futures;
1418          if (oldFutures != null && !isDone()) {
1419            for (ListenableFuture<? extends V> future : oldFutures) {
1420              // We wait for a little while for the future, but if it's not done,
1421              // we check that no other futures caused a cancellation or failure.
1422              // This can introduce a delay of up to 10ms in reporting an exception.
1423              while (!future.isDone()) {
1424                try {
1425                  future.get();
1426                } catch (Error e) {
1427                  throw e;
1428                } catch (InterruptedException e) {
1429                  throw e;
1430                } catch (Throwable e) {
1431                  // ExecutionException / CancellationException / RuntimeException
1432                  if (allMustSucceed) {
1433                    return;
1434                  } else {
1435                    continue;
1436                  }
1437                }
1438              }
1439            }
1440          }
1441        }
1442      }
1443    
1444      /**
1445       * A checked future that uses a function to map from exceptions to the
1446       * appropriate checked type.
1447       */
1448      private static class MappingCheckedFuture<V, X extends Exception> extends
1449          AbstractCheckedFuture<V, X> {
1450    
1451        final Function<Exception, X> mapper;
1452    
1453        MappingCheckedFuture(ListenableFuture<V> delegate,
1454            Function<Exception, X> mapper) {
1455          super(delegate);
1456    
1457          this.mapper = checkNotNull(mapper);
1458        }
1459    
1460        @Override
1461        protected X mapException(Exception e) {
1462          return mapper.apply(e);
1463        }
1464      }
1465    }