001    /*
002     * Copyright (C) 2006 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    import static java.util.concurrent.TimeUnit.NANOSECONDS;
021    
022    import com.google.common.annotations.Beta;
023    import com.google.common.base.Function;
024    
025    import java.lang.reflect.UndeclaredThrowableException;
026    import java.util.List;
027    import java.util.concurrent.BlockingQueue;
028    import java.util.concurrent.CancellationException;
029    import java.util.concurrent.CountDownLatch;
030    import java.util.concurrent.ExecutionException;
031    import java.util.concurrent.Executor;
032    import java.util.concurrent.Executors;
033    import java.util.concurrent.Future;
034    import java.util.concurrent.LinkedBlockingQueue;
035    import java.util.concurrent.ThreadFactory;
036    import java.util.concurrent.TimeUnit;
037    import java.util.concurrent.TimeoutException;
038    import java.util.concurrent.atomic.AtomicBoolean;
039    
040    import javax.annotation.Nullable;
041    
042    /**
043     * Static utility methods pertaining to the {@link Future} interface.
044     *
045     * @author Kevin Bourrillion
046     * @author Nishant Thakkar
047     * @author Sven Mawson
048     * @since 1
049     */
050    @Beta
051    public final class Futures {
052      private Futures() {}
053    
054      /**
055       * Returns an uninterruptible view of a {@code Future}. If a thread is
056       * interrupted during an attempt to {@code get()} from the returned future, it
057       * continues to wait on the result until it is available or the timeout
058       * elapses, and only then re-interrupts the thread.
059       */
060      public static <V> UninterruptibleFuture<V> makeUninterruptible(
061          final Future<V> future) {
062        checkNotNull(future);
063        if (future instanceof UninterruptibleFuture<?>) {
064          return (UninterruptibleFuture<V>) future;
065        }
066        return new UninterruptibleFuture<V>() {
067          @Override
068          public boolean cancel(boolean mayInterruptIfRunning) {
069            return future.cancel(mayInterruptIfRunning);
070          }
071          @Override
072          public boolean isCancelled() {
073            return future.isCancelled();
074          }
075          @Override
076          public boolean isDone() {
077            return future.isDone();
078          }
079    
080          @Override
081          public V get(long originalTimeout, TimeUnit originalUnit)
082              throws TimeoutException, ExecutionException {
083            boolean interrupted = false;
084            try {
085              long end = System.nanoTime() + originalUnit.toNanos(originalTimeout);
086              while (true) {
087                try {
088                  // Future treats negative timeouts just like zero.
089                  return future.get(end - System.nanoTime(), NANOSECONDS);
090                } catch (InterruptedException e) {
091                  interrupted = true;
092                }
093              }
094            } finally {
095              if (interrupted) {
096                Thread.currentThread().interrupt();
097              }
098            }
099          }
100    
101          @Override
102          public V get() throws ExecutionException {
103            boolean interrupted = false;
104            try {
105              while (true) {
106                try {
107                  return future.get();
108                } catch (InterruptedException ignored) {
109                  interrupted = true;
110                }
111              }
112            } finally {
113              if (interrupted) {
114                Thread.currentThread().interrupt();
115              }
116            }
117          }
118        };
119      }
120    
121      /**
122       *
123       * <p>Creates a {@link ListenableFuture} out of a normal {@link Future}. The
124       * returned future will create a thread to wait for the source future to
125       * complete before executing the listeners.
126       *
127       * <p><b>Warning:</b> If the input future does not already implement {@link
128       * ListenableFuture}, the returned future will emulate {@link
129       * ListenableFuture#addListener} by taking a thread from an internal,
130       * unbounded pool at the first call to {@code addListener} and holding it
131       * until the future is {@linkplain Future#isDone() done}.
132       *
133       * <p>Callers who have a future that subclasses
134       * {@link java.util.concurrent.FutureTask} may want to instead subclass
135       * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
136       * functionality to the standard {@code FutureTask} implementation.
137       */
138      public static <V> ListenableFuture<V> makeListenable(
139          Future<V> future) {
140        if (future instanceof ListenableFuture<?>) {
141          return (ListenableFuture<V>) future;
142        }
143        return new ListenableFutureAdapter<V>(future);
144      }
145    
146      static <V> ListenableFuture<V> makeListenable(
147          Future<V> future, Executor executor) {
148        checkNotNull(executor);
149        if (future instanceof ListenableFuture<?>) {
150          return (ListenableFuture<V>) future;
151        }
152        return new ListenableFutureAdapter<V>(future, executor);
153      }
154    
155      /**
156       * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
157       * {@link Function} that maps from {@link Exception} instances into the
158       * appropriate checked type.
159       *
160       * <p><b>Warning:</b> If the input future does not implement {@link
161       * ListenableFuture}, the returned future will emulate {@link
162       * ListenableFuture#addListener} by taking a thread from an internal,
163       * unbounded pool at the first call to {@code addListener} and holding it
164       * until the future is {@linkplain Future#isDone() done}.
165       *
166       * <p>The given mapping function will be applied to an
167       * {@link InterruptedException}, a {@link CancellationException}, or an
168       * {@link ExecutionException} with the actual cause of the exception.
169       * See {@link Future#get()} for details on the exceptions thrown.
170       */
171      public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
172          Future<V> future, Function<Exception, X> mapper) {
173        return new MappingCheckedFuture<V, X>(makeListenable(future), mapper);
174      }
175    
176      /**
177       * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
178       * and a {@link Function} that maps from {@link Exception} instances into the
179       * appropriate checked type.
180       *
181       * <p>The given mapping function will be applied to an
182       * {@link InterruptedException}, a {@link CancellationException}, or an
183       * {@link ExecutionException} with the actual cause of the exception.
184       * See {@link Future#get()} for details on the exceptions thrown.
185       *
186       * @since 9 (source-compatible since release 1)
187       */
188      public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
189          ListenableFuture<V> future, Function<Exception, X> mapper) {
190        return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
191      }
192    
193      /**
194       * Creates a {@code ListenableFuture} which has its value set immediately upon
195       * construction. The getters just return the value. This {@code Future} can't
196       * be canceled or timed out and its {@code isDone()} method always returns
197       * {@code true}.
198       */
199      public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
200        SettableFuture<V> future = SettableFuture.create();
201        future.set(value);
202        return future;
203      }
204    
205      /**
206       * Returns a {@code CheckedFuture} which has its value set immediately upon
207       * construction.
208       *
209       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
210       * method always returns {@code true}. Calling {@code get()} or {@code
211       * checkedGet()} will immediately return the provided value.
212       */
213      public static <V, X extends Exception> CheckedFuture<V, X>
214          immediateCheckedFuture(@Nullable V value) {
215        SettableFuture<V> future = SettableFuture.create();
216        future.set(value);
217        return Futures.makeChecked(future, new Function<Exception, X>() {
218          @Override
219          public X apply(Exception e) {
220            throw new AssertionError("impossible");
221          }
222        });
223      }
224    
225      /**
226       * Returns a {@code ListenableFuture} which has an exception set immediately
227       * upon construction.
228       *
229       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
230       * method always returns {@code true}. Calling {@code get()} will immediately
231       * throw the provided {@code Throwable} wrapped in an {@code
232       * ExecutionException}.
233       *
234       * @throws Error if the throwable is an {@link Error}.
235       */
236      public static <V> ListenableFuture<V> immediateFailedFuture(
237          Throwable throwable) {
238        checkNotNull(throwable);
239        SettableFuture<V> future = SettableFuture.create();
240        future.setException(throwable);
241        return future;
242      }
243    
244      /**
245       * Returns a {@code CheckedFuture} which has an exception set immediately upon
246       * construction.
247       *
248       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
249       * method always returns {@code true}. Calling {@code get()} will immediately
250       * throw the provided {@code Throwable} wrapped in an {@code
251       * ExecutionException}, and calling {@code checkedGet()} will throw the
252       * provided exception itself.
253       *
254       * @throws Error if the throwable is an {@link Error}.
255       */
256      public static <V, X extends Exception> CheckedFuture<V, X>
257          immediateFailedCheckedFuture(final X exception) {
258        checkNotNull(exception);
259        return makeChecked(Futures.<V>immediateFailedFuture(exception),
260            new Function<Exception, X>() {
261              @Override
262              public X apply(Exception e) {
263                return exception;
264              }
265            });
266      }
267    
268      /**
269       * Returns a new {@code ListenableFuture} whose result is asynchronously
270       * derived from the result of the given {@code Future}. More precisely, the
271       * returned {@code Future} takes its result from a {@code Future} produced by
272       * applying the given {@code Function} to the result of the original {@code
273       * Future}. Example:
274       *
275       * <pre>   {@code
276       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
277       *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
278       *       new Function<RowKey, ListenableFuture<QueryResult>>() {
279       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
280       *           return dataService.read(rowKey);
281       *         }
282       *       };
283       *   ListenableFuture<QueryResult> queryFuture =
284       *       chain(queryFuture, queryFunction);
285       * }</pre>
286       *
287       * <p>Successful cancellation of either the input future or the result of
288       * function application will cause the returned future to be cancelled.
289       * Cancelling the returned future will succeed if it is currently running.
290       * In this case, attempts will be made to cancel the input future and the
291       * result of the function, however there is no guarantee of success.
292       *
293       * <p>TODO: Add a version that accepts a normal {@code Future}
294       *
295       * <p>The typical use for this method would be when a RPC call is dependent on
296       * the results of another RPC.  One would call the first RPC (input), create a
297       * function that calls another RPC based on input's result, and then call
298       * chain on input and that function to get a {@code ListenableFuture} of
299       * the result.
300       *
301       * @param input The future to chain
302       * @param function A function to chain the results of the provided future
303       *     to the results of the returned future.  This will be run in the thread
304       *     that notifies input it is complete.
305       * @return A future that holds result of the chain.
306       */
307      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
308          Function<? super I, ? extends ListenableFuture<? extends O>> function) {
309        return chain(input, function, MoreExecutors.sameThreadExecutor());
310      }
311    
312      /**
313       * Returns a new {@code ListenableFuture} whose result is asynchronously
314       * derived from the result of the given {@code Future}. More precisely, the
315       * returned {@code Future} takes its result from a {@code Future} produced by
316       * applying the given {@code Function} to the result of the original {@code
317       * Future}. Example:
318       *
319       * <pre>   {@code
320       *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
321       *   Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
322       *       new Function<RowKey, ListenableFuture<QueryResult>>() {
323       *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
324       *           return dataService.read(rowKey);
325       *         }
326       *       };
327       *   ListenableFuture<QueryResult> queryFuture =
328       *       chain(queryFuture, queryFunction, executor);
329       * }</pre>
330       *
331       * <p>Successful cancellation of either the input future or the result of
332       * function application will cause the returned future to be cancelled.
333       * Cancelling the returned future will succeed if it is currently running.
334       * In this case, attempts will be made to cancel the input future and the
335       * result of the function, however there is no guarantee of success.
336       *
337       * <p>This version allows an arbitrary executor to be passed in for running
338       * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
339       * the thread chained Function executes in will be whichever thread set the
340       * result of the input Future, which may be the network thread in the case of
341       * RPC-based Futures.
342       *
343       * @param input The future to chain
344       * @param function A function to chain the results of the provided future
345       *     to the results of the returned future.
346       * @param exec Executor to run the function in.
347       * @return A future that holds result of the chain.
348       */
349      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
350          Function<? super I, ? extends ListenableFuture<? extends O>> function,
351          Executor exec) {
352        ChainingListenableFuture<I, O> chain =
353            new ChainingListenableFuture<I, O>(function, input);
354        input.addListener(chain, exec);
355        return chain;
356      }
357    
358      /**
359       * Returns a new {@code ListenableFuture} whose result is the product of
360       * applying the given {@code Function} to the result of the given {@code
361       * Future}. Example:
362       *
363       * <pre>   {@code
364       *   ListenableFuture<QueryResult> queryFuture = ...;
365       *   Function<QueryResult, List<Row>> rowsFunction =
366       *       new Function<QueryResult, List<Row>>() {
367       *         public List<Row> apply(QueryResult queryResult) {
368       *           return queryResult.getRows();
369       *         }
370       *       };
371       *   ListenableFuture<List<Row>> rowsFuture =
372       *       transform(queryFuture, rowsFunction);
373       * }</pre>
374       *
375       * <p>Successful cancellation of the input future will cause the returned
376       * future to be cancelled.  Cancelling the returned future will succeed if it
377       * is currently running.  In this case, an attempt will be made to cancel the
378       * input future, however there is no guarantee of success.
379       *
380       * <p>An example use of this method is to convert a serializable object
381       * returned from an RPC into a POJO.
382       *
383       * @param future The future to compose
384       * @param function A Function to compose the results of the provided future
385       *     to the results of the returned future.  This will be run in the thread
386       *     that notifies input it is complete.
387       * @return A future that holds result of the composition.
388       * @since 9 (in version 1 as {@code compose})
389       */
390      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
391          final Function<? super I, ? extends O> function) {
392        return transform(future, function, MoreExecutors.sameThreadExecutor());
393      }
394    
395      /**
396       * Returns a new {@code ListenableFuture} whose result is the product of
397       * applying the given {@code Function} to the result of the given {@code
398       * Future}. Example:
399       *
400       * <pre>   {@code
401       *   ListenableFuture<QueryResult> queryFuture = ...;
402       *   Function<QueryResult, List<Row>> rowsFunction =
403       *       new Function<QueryResult, List<Row>>() {
404       *         public List<Row> apply(QueryResult queryResult) {
405       *           return queryResult.getRows();
406       *         }
407       *       };
408       *   ListenableFuture<List<Row>> rowsFuture =
409       *       transform(queryFuture, rowsFunction, executor);
410       * }</pre>
411       *
412       * <p>Successful cancellation of the input future will cause the returned
413       * future to be cancelled.  Cancelling the returned future will succeed if it
414       * is currently running.  In this case, an attempt will be made to cancel the
415       * input future, however there is no guarantee of success.
416       *
417       * <p>An example use of this method is to convert a serializable object
418       * returned from an RPC into a POJO.
419       *
420       * <p>This version allows an arbitrary executor to be passed in for running
421       * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
422       * the thread chained Function executes in will be whichever thread set the
423       * result of the input Future, which may be the network thread in the case of
424       * RPC-based Futures.
425       *
426       * @param future The future to compose
427       * @param function A Function to compose the results of the provided future
428       *     to the results of the returned future.
429       * @param exec Executor to run the function in.
430       * @return A future that holds result of the composition.
431       * @since 9 (in version 2 as {@code compose})
432       */
433      public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
434          final Function<? super I, ? extends O> function, Executor exec) {
435        checkNotNull(function);
436        Function<I, ListenableFuture<O>> wrapperFunction
437            = new Function<I, ListenableFuture<O>>() {
438                @Override public ListenableFuture<O> apply(I input) {
439                  O output = function.apply(input);
440                  return immediateFuture(output);
441                }
442            };
443        return chain(future, wrapperFunction, exec);
444      }
445    
446      /**
447       * Returns a new {@code Future} whose result is the product of applying the
448       * given {@code Function} to the result of the given {@code Future}. Example:
449       *
450       * <pre>   {@code
451       *   Future<QueryResult> queryFuture = ...;
452       *   Function<QueryResult, List<Row>> rowsFunction =
453       *       new Function<QueryResult, List<Row>>() {
454       *         public List<Row> apply(QueryResult queryResult) {
455       *           return queryResult.getRows();
456       *         }
457       *       };
458       *   Future<List<Row>> rowsFuture = transform(queryFuture, rowsFunction);
459       * }</pre>
460       *
461       * <p>Each call to {@code Future<O>.get(*)} results in a call to
462       * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
463       * is assumed that {@code Future<I>.get(*)} is idempotent.
464       *
465       * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
466       * future, the timeout only applies to the future passed in to this method.
467       * Any additional time taken by applying {@code function} is not considered.
468       * (Exception: If the input future is a {@link ListenableFuture}, timeouts
469       * will be strictly enforced.)
470       *
471       * @param future The future to compose
472       * @param function A Function to compose the results of the provided future
473       *     to the results of the returned future.  This will be run in the thread
474       *     that calls one of the varieties of {@code get()}.
475       * @return A future that computes result of the composition.
476       * @since 9 (in version 1 as {@code compose})
477       */
478      public static <I, O> Future<O> transform(final Future<I> future,
479          final Function<? super I, ? extends O> function) {
480        if (future instanceof ListenableFuture) {
481          return transform((ListenableFuture<I>) future, function);
482        }
483        checkNotNull(future);
484        checkNotNull(function);
485        return new Future<O>() {
486    
487          /*
488           * Concurrency detail:
489           *
490           * <p>To preserve the idempotency of calls to this.get(*) calls to the
491           * function are only applied once. A lock is required to prevent multiple
492           * applications of the function. The calls to future.get(*) are performed
493           * outside the lock, as is required to prevent calls to
494           * get(long, TimeUnit) to persist beyond their timeout.
495           *
496           * <p>Calls to future.get(*) on every call to this.get(*) also provide
497           * the cancellation behavior for this.
498           *
499           * <p>(Consider: in thread A, call get(), in thread B call get(long,
500           * TimeUnit). Thread B may have to wait for Thread A to finish, which
501           * would be unacceptable.)
502           *
503           * <p>Note that each call to Future<O>.get(*) results in a call to
504           * Future<I>.get(*), but the function is only applied once, so
505           * Future<I>.get(*) is assumed to be idempotent.
506           */
507    
508          private final Object lock = new Object();
509          private boolean set = false;
510          private O value = null;
511          private ExecutionException exception = null;
512    
513          @Override
514          public O get() throws InterruptedException, ExecutionException {
515            return apply(future.get());
516          }
517    
518          @Override
519          public O get(long timeout, TimeUnit unit) throws InterruptedException,
520              ExecutionException, TimeoutException {
521            return apply(future.get(timeout, unit));
522          }
523    
524          private O apply(I raw) throws ExecutionException {
525            synchronized (lock) {
526              if (!set) {
527                try {
528                  value = function.apply(raw);
529                } catch (RuntimeException e) {
530                  exception = new ExecutionException(e);
531                } catch (Error e) {
532                  exception = new ExecutionException(e);
533                }
534                set = true;
535              }
536    
537              if (exception != null) {
538                throw exception;
539              }
540              return value;
541            }
542          }
543    
544          @Override
545          public boolean cancel(boolean mayInterruptIfRunning) {
546            return future.cancel(mayInterruptIfRunning);
547          }
548    
549          @Override
550          public boolean isCancelled() {
551            return future.isCancelled();
552          }
553    
554          @Override
555          public boolean isDone() {
556            return future.isDone();
557          }
558        };
559      }
560    
561      /**
562       * An implementation of {@code ListenableFuture} that also implements
563       * {@code Runnable} so that it can be used to nest ListenableFutures.
564       * Once the passed-in {@code ListenableFuture} is complete, it calls the
565       * passed-in {@code Function} to generate the result.
566       *
567       * <p>If the function throws any checked exceptions, they should be wrapped
568       * in an {@code UndeclaredThrowableException} so that this class can get
569       * access to the cause.
570       */
571      private static class ChainingListenableFuture<I, O>
572          extends AbstractListenableFuture<O> implements Runnable {
573    
574        private Function<? super I, ? extends ListenableFuture<? extends O>>
575            function;
576        private ListenableFuture<? extends I> inputFuture;
577        private volatile ListenableFuture<? extends O> outputFuture;
578        private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
579            new LinkedBlockingQueue<Boolean>(1);
580        private final CountDownLatch outputCreated = new CountDownLatch(1);
581    
/**
 * Creates a chaining future for the given function and input future.
 * Neither argument may be null.
 */
private ChainingListenableFuture(
    Function<? super I, ? extends ListenableFuture<? extends O>> function,
    ListenableFuture<? extends I> inputFuture) {
  checkNotNull(function);
  checkNotNull(inputFuture);
  this.function = function;
  this.inputFuture = inputFuture;
}
588    
589        /**
590         * Delegate the get() to the input and output futures, in case
591         * their implementations defer starting computation until their
592         * own get() is invoked.
593         */
594        @Override
595        public O get() throws InterruptedException, ExecutionException {
596          if (!isDone()) {
597            // Invoking get on the inputFuture will ensure our own run()
598            // method below is invoked as a listener when inputFuture sets
599            // its value.  Therefore when get() returns we should then see
600            // the outputFuture be created.
601            ListenableFuture<? extends I> inputFuture = this.inputFuture;
602            if (inputFuture != null) {
603              inputFuture.get();
604            }
605    
606            // If our listener was scheduled to run on an executor we may
607            // need to wait for our listener to finish running before the
608            // outputFuture has been constructed by the function.
609            outputCreated.await();
610    
611            // Like above with the inputFuture, we have a listener on
612            // the outputFuture that will set our own value when its
613            // value is set.  Invoking get will ensure the output can
614            // complete and invoke our listener, so that we can later
615            // get the result.
616            ListenableFuture<? extends O> outputFuture = this.outputFuture;
617            if (outputFuture != null) {
618              outputFuture.get();
619            }
620          }
621          return super.get();
622        }
623    
624        /**
625         * Delegate the get() to the input and output futures, in case
626         * their implementations defer starting computation until their
627         * own get() is invoked.
628         */
629        @Override
630        public O get(long timeout, TimeUnit unit) throws TimeoutException,
631            ExecutionException, InterruptedException {
632          if (!isDone()) {
633            // Use a single time unit so we can decrease remaining timeout
634            // as we wait for various phases to complete.
635            if (unit != NANOSECONDS) {
636              timeout = NANOSECONDS.convert(timeout, unit);
637              unit = NANOSECONDS;
638            }
639    
640            // Invoking get on the inputFuture will ensure our own run()
641            // method below is invoked as a listener when inputFuture sets
642            // its value.  Therefore when get() returns we should then see
643            // the outputFuture be created.
644            ListenableFuture<? extends I> inputFuture = this.inputFuture;
645            if (inputFuture != null) {
646              long start = System.nanoTime();
647              inputFuture.get(timeout, unit);
648              timeout -= Math.max(0, System.nanoTime() - start);
649            }
650    
651            // If our listener was scheduled to run on an executor we may
652            // need to wait for our listener to finish running before the
653            // outputFuture has been constructed by the function.
654            long start = System.nanoTime();
655            if (!outputCreated.await(timeout, unit)) {
656              throw new TimeoutException();
657            }
658            timeout -= Math.max(0, System.nanoTime() - start);
659    
660            // Like above with the inputFuture, we have a listener on
661            // the outputFuture that will set our own value when its
662            // value is set.  Invoking get will ensure the output can
663            // complete and invoke our listener, so that we can later
664            // get the result.
665            ListenableFuture<? extends O> outputFuture = this.outputFuture;
666            if (outputFuture != null) {
667              outputFuture.get(timeout, unit);
668            }
669          }
670          return super.get(timeout, unit);
671        }
672    
673        @Override
674        public boolean cancel(boolean mayInterruptIfRunning) {
675          if (cancel()) {
676            try {
677              // This should never block since only one thread is allowed to cancel
678              // this Future.
679              mayInterruptIfRunningChannel.put(mayInterruptIfRunning);
680            } catch (InterruptedException ignored) {
681              Thread.currentThread().interrupt();
682            }
683            cancel(inputFuture, mayInterruptIfRunning);
684            cancel(outputFuture, mayInterruptIfRunning);
685            return true;
686          }
687          return false;
688        }
689    
690        private void cancel(@Nullable Future<?> future, boolean mayInterruptIfRunning) {
691          if (future == null) {
692            return;  // no future to forward the cancellation to
693          }
694          future.cancel(mayInterruptIfRunning);
695        }
696    
697        @Override
698        public void run() {
             // Chains the two stages: obtains the input future's result, applies the
             // function to get the output future, and arranges for the output's
             // outcome (value, failure or cancellation) to become this future's own.
699          try {
700            I sourceResult;
701            try {
                 // Wait for the input's result.  Cancellation and failure of the
                 // input are handled by the two catch clauses below.
702              sourceResult = makeUninterruptible(inputFuture).get();
703            } catch (CancellationException e) {
704              // The input was cancelled: cancel this future as well and stop.
705              cancel();
706              return;
707            } catch (ExecutionException e) {
708              // The input failed: fail this future with the underlying cause
                 // rather than with the ExecutionException wrapper.
709              setException(e.getCause());
710              return;
711            }
712    
               // The field makes the output future visible to get and
               // cancel(boolean); the final local is what the listener below captures.
713            final ListenableFuture<? extends O> outputFuture = this.outputFuture =
714                function.apply(sourceResult);
715            if (isCancelled()) {
716              // Handles the case where cancel was called while the function was
717              // being applied.
718              try {
719                // There is a gap in cancel(boolean) between calling cancel() and
720                // storing the value of mayInterruptIfRunning, so this thread needs
721                // to block, waiting for that value.
722                outputFuture.cancel(mayInterruptIfRunningChannel.take());
723              } catch (InterruptedException ignored) {
                   // Restore the interrupt status; the output future is left
                   // uncancelled in this case because the flag was never received.
724                Thread.currentThread().interrupt();
725              }
                 // Don't pin the output future beyond completion.
726              this.outputFuture = null;
727              return;
728            }
               // Not cancelled: forward the output future's eventual outcome to this
               // future once the output completes.
729            outputFuture.addListener(new Runnable() {
730                @Override
731                public void run() {
732                  try {
733                    // Here it would have been nice to have had an
734                    // UninterruptibleListenableFuture, but we don't want to start a
735                    // combinatorial explosion of interfaces, so we have to make do.
736                    set(makeUninterruptible(outputFuture).get());
737                  } catch (CancellationException e) {
738                    // The output was cancelled: cancel this future and return
                       // (the finally block below still runs).
739                    cancel();
740                    return;
741                  } catch (ExecutionException e) {
742                    // Set the cause of the exception as this future's exception
743                    setException(e.getCause());
744                  } finally {
745                    // Don't pin inputs beyond completion
746                    ChainingListenableFuture.this.outputFuture = null;
747                  }
748                }
749              }, MoreExecutors.sameThreadExecutor());
750          } catch (UndeclaredThrowableException e) {
751            // Set the cause of the exception as this future's exception
752            setException(e.getCause());
753          } catch (RuntimeException e) {
754            // This exception is irrelevant in this thread, but useful for the
755            // client
756            setException(e);
757          } catch (Error e) {
758            // Propagate errors up ASAP - our superclass will rethrow the error
759            setException(e);
760          } finally {
               // Runs on every exit path, including the early returns above.
761            // Don't pin inputs beyond completion
762            function = null;
763            inputFuture = null;
764            // Allow our get routines to examine outputFuture now.
765            outputCreated.countDown();
766          }
767        }
768      }
769    
770      /**
771       * A checked future that uses a function to map from exceptions to the
772       * appropriate checked type.
773       */
774      private static class MappingCheckedFuture<V, X extends Exception> extends
775          AbstractCheckedFuture<V, X> {
776    
777        final Function<Exception, X> mapper;
778    
779        MappingCheckedFuture(ListenableFuture<V> delegate,
780            Function<Exception, X> mapper) {
781          super(delegate);
782    
783          this.mapper = checkNotNull(mapper);
784        }
785    
786        @Override
787        protected X mapException(Exception e) {
788          return mapper.apply(e);
789        }
790      }
791    
792      /**
793       * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
794       * will wait on the future to finish, and when it completes, run the
795       * listeners.  This implementation will wait on the source future
796       * indefinitely, so if the source future never completes, the adapter will
797       * never complete either.
798       *
799       * <p>If the thread waiting on the delegate is interrupted, or the delegate's
800       * {@code get} throws an {@link Error}, the listeners will not be invoked.
          * Any other exception thrown by {@code get} is treated as completion of the
          * delegate, and the listeners are run.
          *
          * <p>The waiting task is only submitted once the first listener is added.
          * The default executor uses daemon threads, so an adapter waiting on a
          * future that never completes does not keep the JVM from exiting.
801       */
802      private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
803          implements ListenableFuture<V> {
804    
           // Threads for the default executor.  They are daemon threads because each
           // one may block indefinitely in delegate.get(); as non-daemon threads they
           // would prevent the JVM from exiting for as long as they are alive.
805        private static final ThreadFactory threadFactory =
806            new ThreadFactoryBuilder()
                   .setDaemon(true)
807                .setNameFormat("ListenableFutureAdapter-thread-%d")
808                .build();
           // Shared by every adapter created without an explicit executor.
809        private static final Executor defaultAdapterExecutor =
810            Executors.newCachedThreadPool(threadFactory);
811    
           // The executor that runs the task waiting on the delegate.
812        private final Executor adapterExecutor;
813    
814        // The execution list to hold our listeners.
815        private final ExecutionList executionList = new ExecutionList();
816    
817        // This allows us to only start up a thread waiting on the delegate future
818        // when the first listener is added.
819        private final AtomicBoolean hasListeners = new AtomicBoolean(false);
820    
821        // The delegate future.
822        private final Future<V> delegate;
823    
           /**
            * Adapts {@code delegate}, waiting for it on the shared default executor.
            */
824        ListenableFutureAdapter(Future<V> delegate) {
825          this(delegate, defaultAdapterExecutor);
826        }
827    
           /**
            * Adapts {@code delegate}, running the task that waits for it on
            * {@code adapterExecutor}.
            *
            * @throws NullPointerException if either argument is null
            */
828        ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
829          this.delegate = checkNotNull(delegate);
830          this.adapterExecutor = checkNotNull(adapterExecutor);
831        }
832    
833        @Override
834        protected Future<V> delegate() {
835          return delegate;
836        }
837    
           /**
            * Adds {@code listener} and {@code exec} to the execution list.  The first
            * call also arranges for the list to be run: immediately on the calling
            * thread if the delegate is already done, otherwise from a task submitted
            * to the adapter executor that blocks until the delegate finishes.
            */
838        @Override
839        public void addListener(Runnable listener, Executor exec) {
840          executionList.add(listener, exec);
841    
842          // When a listener is first added, we run a task that will wait for
843          // the delegate to finish, and when it is done will run the listeners.
844          if (hasListeners.compareAndSet(false, true)) {
845            if (delegate.isDone()) {
846              // If the delegate is already done, run the execution list
847              // immediately on the current thread.
848              executionList.run();
849              return;
850            }
851    
852            adapterExecutor.execute(new Runnable() {
853              @Override
854              public void run() {
855                try {
856                  delegate.get();
857                } catch (Error e) {
                     // Errors are rethrown as-is; the listeners are not run.
858                  throw e;
859                } catch (InterruptedException e) {
860                  // The waiting thread was interrupted.  Restore the interrupt
861                  // status and fail this task; the listeners are not run.
862                  Thread.currentThread().interrupt();
863                  throw new IllegalStateException("Adapter thread interrupted!", e);
864                } catch (Throwable e) {
865                  // ExecutionException / CancellationException / RuntimeException
866                  // The task is done, run the listeners.
867                }
868                executionList.run();
869              }
870            });
871          }
872        }
873      }
874    }