001    /*
002     * Copyright (C) 2006 Google Inc.
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    import static java.util.concurrent.TimeUnit.NANOSECONDS;
021    
022    import com.google.common.annotations.Beta;
023    import com.google.common.base.Function;
024    
025    import java.lang.reflect.UndeclaredThrowableException;
026    import java.util.concurrent.BlockingQueue;
027    import java.util.concurrent.CancellationException;
028    import java.util.concurrent.CountDownLatch;
029    import java.util.concurrent.ExecutionException;
030    import java.util.concurrent.Executor;
031    import java.util.concurrent.Executors;
032    import java.util.concurrent.Future;
033    import java.util.concurrent.LinkedBlockingQueue;
034    import java.util.concurrent.ThreadFactory;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.TimeoutException;
037    import java.util.concurrent.atomic.AtomicBoolean;
038    
039    import javax.annotation.Nullable;
040    
041    /**
042     * Static utility methods pertaining to the {@link Future} interface.
043     *
044     * @author Kevin Bourrillion
045     * @author Nishant Thakkar
046     * @author Sven Mawson
047     * @since 1
048     */
049    @Beta
050    public final class Futures {
051      private Futures() {}
052    
053      /**
054       * Returns an uninterruptible view of a {@code Future}. If a thread is
055       * interrupted during an attempt to {@code get()} from the returned future, it
056       * continues to wait on the result until it is available or the timeout
057       * elapses, and only then re-interrupts the thread.
058       */
059      public static <V> UninterruptibleFuture<V> makeUninterruptible(
060          final Future<V> future) {
061        checkNotNull(future);
062        if (future instanceof UninterruptibleFuture<?>) {
063          return (UninterruptibleFuture<V>) future;
064        }
065        return new UninterruptibleFuture<V>() {
066          public boolean cancel(boolean mayInterruptIfRunning) {
067            return future.cancel(mayInterruptIfRunning);
068          }
069          public boolean isCancelled() {
070            return future.isCancelled();
071          }
072          public boolean isDone() {
073            return future.isDone();
074          }
075    
076          public V get(long originalTimeout, TimeUnit originalUnit)
077              throws TimeoutException, ExecutionException {
078            boolean interrupted = false;
079            try {
080              long end = System.nanoTime() + originalUnit.toNanos(originalTimeout);
081              while (true) {
082                try {
083                  // Future treats negative timeouts just like zero.
084                  return future.get(end - System.nanoTime(), NANOSECONDS);
085                } catch (InterruptedException e) {
086                  interrupted = true;
087                }
088              }
089            } finally {
090              if (interrupted) {
091                Thread.currentThread().interrupt();
092              }
093            }
094          }
095    
096          public V get() throws ExecutionException {
097            boolean interrupted = false;
098            try {
099              while (true) {
100                try {
101                  return future.get();
102                } catch (InterruptedException ignored) {
103                  interrupted = true;
104                }
105              }
106            } finally {
107              if (interrupted) {
108                Thread.currentThread().interrupt();
109              }
110            }
111          }
112        };
113      }
114    
115      /**
116       * Creates a {@link ListenableFuture} out of a normal {@link Future}. The
117       * returned future will create a thread to wait for the source future to
118       * complete before executing the listeners.
119       *
120       * <p>Callers who have a future that subclasses
121       * {@link java.util.concurrent.FutureTask} may want to instead subclass
122       * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
123       * functionality to the standard {@code FutureTask} implementation.
124       */
125      public static <V> ListenableFuture<V> makeListenable(Future<V> future) {
126        if (future instanceof ListenableFuture<?>) {
127          return (ListenableFuture<V>) future;
128        }
129        return new ListenableFutureAdapter<V>(future);
130      }
131    
132      /**
133       * Creates a {@link ListenableFuture} out of a normal {@link Future} and uses
134       * the given {@link Executor} to get the value of the Future. The
135       * returned future will create a thread using the given executor to wait for
136       * the source future to complete before executing the listeners.
137       *
138       * <p>Callers who have a future that subclasses
139       * {@link java.util.concurrent.FutureTask} may want to instead subclass
140       * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
141       * functionality to the standard {@code FutureTask} implementation.
142       */
143      static <V> ListenableFuture<V> makeListenable(
144          Future<V> future, Executor executor) {
145        checkNotNull(executor);
146        if (future instanceof ListenableFuture<?>) {
147          return (ListenableFuture<V>) future;
148        }
149        return new ListenableFutureAdapter<V>(future, executor);
150      }
151    
152      /**
153       * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
154       * {@link Function} that maps from {@link Exception} instances into the
155       * appropriate checked type.
156       *
157       * <p>The given mapping function will be applied to an
158       * {@link InterruptedException}, a {@link CancellationException}, or an
159       * {@link ExecutionException} with the actual cause of the exception.
160       * See {@link Future#get()} for details on the exceptions thrown.
161       */
162      public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
163          Future<V> future, Function<Exception, X> mapper) {
164        return new MappingCheckedFuture<V, X>(makeListenable(future), mapper);
165      }
166    
167      /**
168       * Creates a {@code ListenableFuture} which has its value set immediately upon
169       * construction. The getters just return the value. This {@code Future} can't
170       * be canceled or timed out and its {@code isDone()} method always returns
171       * {@code true}. It's useful for returning something that implements the
172       * {@code ListenableFuture} interface but already has the result.
173       */
174      public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
175        ValueFuture<V> future = ValueFuture.create();
176        future.set(value);
177        return future;
178      }
179    
180      /**
181       * Returns a {@code CheckedFuture} which has its value set immediately upon
182       * construction.
183       *
184       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
185       * method always returns {@code true}. Calling {@code get()} or {@code
186       * checkedGet()} will immediately return the provided value.
187       */
188      public static <V, X extends Exception> CheckedFuture<V, X>
189          immediateCheckedFuture(@Nullable V value) {
190        ValueFuture<V> future = ValueFuture.create();
191        future.set(value);
192        return Futures.makeChecked(future, new Function<Exception, X>() {
193          public X apply(Exception e) {
194            throw new AssertionError("impossible");
195          }
196        });
197      }
198    
199      /**
200       * Returns a {@code ListenableFuture} which has an exception set immediately
201       * upon construction.
202       *
203       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
204       * method always returns {@code true}. Calling {@code get()} will immediately
205       * throw the provided {@code Throwable} wrapped in an {@code
206       * ExecutionException}.
207       *
208       * @throws Error if the throwable is an {@link Error}.
209       */
210      public static <V> ListenableFuture<V> immediateFailedFuture(
211          Throwable throwable) {
212        checkNotNull(throwable);
213        ValueFuture<V> future = ValueFuture.create();
214        future.setException(throwable);
215        return future;
216      }
217    
218      /**
219       * Returns a {@code CheckedFuture} which has an exception set immediately upon
220       * construction.
221       *
222       * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
223       * method always returns {@code true}. Calling {@code get()} will immediately
224       * throw the provided {@code Throwable} wrapped in an {@code
225       * ExecutionException}, and calling {@code checkedGet()} will throw the
226       * provided exception itself.
227       *
228       * @throws Error if the throwable is an {@link Error}.
229       */
230      public static <V, X extends Exception> CheckedFuture<V, X>
231          immediateFailedCheckedFuture(final X exception) {
232        checkNotNull(exception);
233        return makeChecked(Futures.<V>immediateFailedFuture(exception),
234            new Function<Exception, X>() {
235              public X apply(Exception e) {
236                return exception;
237              }
238            });
239      }
240    
241      /**
242       * Returns a new {@code ListenableFuture} whose result is asynchronously
243       * derived from the result of the given {@code Future}. More precisely, the
244       * returned {@code Future} takes its result from a {@code Future} produced by
245       * applying the given {@code Function} to the result of the original {@code
246       * Future}.
247       *
248       * <p>Successful cancellation of either the input future or the result of
249       * function application will cause the returned future to be cancelled.
250       * Cancelling the returned future will succeed if it is currently running.
251       * In this case, attempts will be made to cancel the input future and the
252       * result of the function, however there is no guarantee of success.
253       *
254       * <p>TODO: Add a version that accepts a normal {@code Future}
255       *
256       * <p>The typical use for this method would be when a RPC call is dependent on
257       * the results of another RPC.  One would call the first RPC (input), create a
258       * function that calls another RPC based on input's result, and then call
259       * chain on input and that function to get a {@code ListenableFuture} of
260       * the result.
261       *
262       * @param input The future to chain
263       * @param function A function to chain the results of the provided future
264       *     to the results of the returned future.  This will be run in the thread
265       *     that notifies input it is complete.
266       * @return A future that holds result of the chain.
267       */
268      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
269          Function<? super I, ? extends ListenableFuture<? extends O>> function) {
270        return chain(input, function, MoreExecutors.sameThreadExecutor());
271      }
272    
273      /**
274       * Returns a new {@code ListenableFuture} whose result is asynchronously
275       * derived from the result of the given {@code Future}. More precisely, the
276       * returned {@code Future} takes its result from a {@code Future} produced by
277       * applying the given {@code Function} to the result of the original {@code
278       * Future}.
279       *
280       * <p>Successful cancellation of either the input future or the result of
281       * function application will cause the returned future to be cancelled.
282       * Cancelling the returned future will succeed if it is currently running.
283       * In this case, attempts will be made to cancel the input future and the
284       * result of the function, however there is no guarantee of success.
285       *
286       * <p>This version allows an arbitrary executor to be passed in for running
287       * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
288       * the thread chained Function executes in will be whichever thread set the
289       * result of the input Future, which may be the network thread in the case of
290       * RPC-based Futures.
291       *
292       * @param input The future to chain
293       * @param function A function to chain the results of the provided future
294       *     to the results of the returned future.
295       * @param exec Executor to run the function in.
296       * @return A future that holds result of the chain.
297       */
298      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
299          Function<? super I, ? extends ListenableFuture<? extends O>> function,
300          Executor exec) {
301        ChainingListenableFuture<I, O> chain =
302            new ChainingListenableFuture<I, O>(function, input);
303        input.addListener(chain, exec);
304        return chain;
305      }
306    
307      /**
308       * Returns a new {@code ListenableFuture} whose result is the product of
309       * applying the given {@code Function} to the result of the given {@code
310       * Future}.
311       *
312       * <p>Successful cancellation of the input future will cause the returned
313       * future to be cancelled.  Cancelling the returned future will succeed if it
314       * is currently running.  In this case, an attempt will be made to cancel the
315       * input future, however there is no guarantee of success.
316       *
317       * <p>An example use of this method is to convert a serializable object
318       * returned from an RPC into a POJO.
319       *
320       * @param future The future to compose
321       * @param function A Function to compose the results of the provided future
322       *     to the results of the returned future.  This will be run in the thread
323       *     that notifies input it is complete.
324       * @return A future that holds result of the composition.
325       */
326      public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
327          final Function<? super I, ? extends O> function) {
328        return compose(future, function, MoreExecutors.sameThreadExecutor());
329      }
330    
331      /**
332       * Returns a new {@code ListenableFuture} whose result is the product of
333       * applying the given {@code Function} to the result of the given {@code
334       * Future}.
335       *
336       * <p>Successful cancellation of the input future will cause the returned
337       * future to be cancelled.  Cancelling the returned future will succeed if it
338       * is currently running.  In this case, an attempt will be made to cancel the
339       * input future, however there is no guarantee of success.
340       *
341       * <p>An example use of this method is to convert a serializable object
342       * returned from an RPC into a POJO.
343       *
344       * <p>This version allows an arbitrary executor to be passed in for running
345       * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
346       * the thread chained Function executes in will be whichever thread set the
347       * result of the input Future, which may be the network thread in the case of
348       * RPC-based Futures.
349       *
350       * @param future The future to compose
351       * @param function A Function to compose the results of the provided future
352       *     to the results of the returned future.
353       * @param exec Executor to run the function in.
354       * @return A future that holds result of the composition.
355       * @since 2
356       */
357      public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
358          final Function<? super I, ? extends O> function, Executor exec) {
359        checkNotNull(function);
360        Function<I, ListenableFuture<O>> wrapperFunction
361            = new Function<I, ListenableFuture<O>>() {
362                @Override public ListenableFuture<O> apply(I input) {
363                  O output = function.apply(input);
364                  return immediateFuture(output);
365                }
366            };
367        return chain(future, wrapperFunction, exec);
368      }
369    
370      /**
371       * Returns a new {@code Future} whose result is the product of applying the
372       * given {@code Function} to the result of the given {@code Future}.
373       *
374       * <p>An example use of this method is to convert a Future that produces a
375       * handle to an object to a future that produces the object itself.
376       *
377       * <p>Each call to {@code Future<O>.get(*)} results in a call to
378       * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
379       * is assumed that {@code Future<I>.get(*)} is idempotent.
380       *
381       * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
382       * future, the timeout only applies to the future passed in to this method.
383       * Any additional time taken by applying {@code function} is not considered.
384       * (Exception: If the input future is a {@link ListenableFuture}, timeouts
385       * will be strictly enforced.)
386       *
387       * @param future The future to compose
388       * @param function A Function to compose the results of the provided future
389       *     to the results of the returned future.  This will be run in the thread
390       *     that calls one of the varieties of {@code get()}.
391       * @return A future that computes result of the composition.
392       */
393      public static <I, O> Future<O> compose(final Future<I> future,
394          final Function<? super I, ? extends O> function) {
395        if (future instanceof ListenableFuture) {
396          return compose((ListenableFuture<I>) future, function);
397        }
398        checkNotNull(future);
399        checkNotNull(function);
400        return new Future<O>() {
401    
402          /*
403           * Concurrency detail:
404           *
405           * <p>To preserve the idempotency of calls to this.get(*) calls to the
406           * function are only applied once. A lock is required to prevent multiple
407           * applications of the function. The calls to future.get(*) are performed
408           * outside the lock, as is required to prevent calls to
409           * get(long, TimeUnit) to persist beyond their timeout.
410           *
411           * <p>Calls to future.get(*) on every call to this.get(*) also provide
412           * the cancellation behavior for this.
413           *
414           * <p>(Consider: in thread A, call get(), in thread B call get(long,
415           * TimeUnit). Thread B may have to wait for Thread A to finish, which
416           * would be unacceptable.)
417           *
418           * <p>Note that each call to Future<O>.get(*) results in a call to
419           * Future<I>.get(*), but the function is only applied once, so
420           * Future<I>.get(*) is assumed to be idempotent.
421           */
422    
423          private final Object lock = new Object();
424          private boolean set = false;
425          private O value = null;
426          private ExecutionException exception = null;
427    
428          @Override
429          public O get() throws InterruptedException, ExecutionException {
430            return apply(future.get());
431          }
432    
433          @Override
434          public O get(long timeout, TimeUnit unit) throws InterruptedException,
435              ExecutionException, TimeoutException {
436            return apply(future.get(timeout, unit));
437          }
438    
439          private O apply(I raw) throws ExecutionException {
440            synchronized (lock) {
441              if (!set) {
442                try {
443                  value = function.apply(raw);
444                } catch (RuntimeException e) {
445                  exception = new ExecutionException(e);
446                } catch (Error e) {
447                  exception = new ExecutionException(e);
448                }
449                set = true;
450              }
451    
452              if (exception != null) {
453                throw exception;
454              }
455              return value;
456            }
457          }
458    
459          @Override
460          public boolean cancel(boolean mayInterruptIfRunning) {
461            return future.cancel(mayInterruptIfRunning);
462          }
463    
464          @Override
465          public boolean isCancelled() {
466            return future.isCancelled();
467          }
468    
469          @Override
470          public boolean isDone() {
471            return future.isDone();
472          }
473        };
474      }
475    
476      /**
477       * An implementation of {@code ListenableFuture} that also implements
478       * {@code Runnable} so that it can be used to nest ListenableFutures.
479       * Once the passed-in {@code ListenableFuture} is complete, it calls the
480       * passed-in {@code Function} to generate the result.
481       *
482       * <p>If the function throws any checked exceptions, they should be wrapped
483       * in a {@code UndeclaredThrowableException} so that this class can get
484       * access to the cause.
485       */
486      private static class ChainingListenableFuture<I, O>
487          extends AbstractListenableFuture<O> implements Runnable {
488    
489        private Function<? super I, ? extends ListenableFuture<? extends O>>
490            function;
491        private ListenableFuture<? extends I> inputFuture;
492        private volatile ListenableFuture<? extends O> outputFuture;
493        private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
494            new LinkedBlockingQueue<Boolean>(1);
495        private final CountDownLatch outputCreated = new CountDownLatch(1);
496    
497        private ChainingListenableFuture(
498            Function<? super I, ? extends ListenableFuture<? extends O>> function,
499            ListenableFuture<? extends I> inputFuture) {
500          this.function = checkNotNull(function);
501          this.inputFuture = checkNotNull(inputFuture);
502        }
503    
504        /**
505         * Delegate the get() to the input and output futures, in case
506         * their implementations defer starting computation until their
507         * own get() is invoked.
508         */
509        @Override
510        public O get() throws InterruptedException, ExecutionException {
511          if (!isDone()) {
512            // Invoking get on the inputFuture will ensure our own run()
513            // method below is invoked as a listener when inputFuture sets
514            // its value.  Therefore when get() returns we should then see
515            // the outputFuture be created.
516            ListenableFuture<? extends I> inputFuture = this.inputFuture;
517            if (inputFuture != null) {
518              inputFuture.get();
519            }
520    
521            // If our listener was scheduled to run on an executor we may
522            // need to wait for our listener to finish running before the
523            // outputFuture has been constructed by the function.
524            outputCreated.await();
525    
526            // Like above with the inputFuture, we have a listener on
527            // the outputFuture that will set our own value when its
528            // value is set.  Invoking get will ensure the output can
529            // complete and invoke our listener, so that we can later
530            // get the result.
531            ListenableFuture<? extends O> outputFuture = this.outputFuture;
532            if (outputFuture != null) {
533              outputFuture.get();
534            }
535          }
536          return super.get();
537        }
538    
539        /**
540         * Delegate the get() to the input and output futures, in case
541         * their implementations defer starting computation until their
542         * own get() is invoked.
543         */
544        @Override
545        public O get(long timeout, TimeUnit unit) throws TimeoutException,
546            ExecutionException, InterruptedException {
547          if (!isDone()) {
548            // Use a single time unit so we can decrease remaining timeout
549            // as we wait for various phases to complete.
550            if (unit != NANOSECONDS) {
551              timeout = NANOSECONDS.convert(timeout, unit);
552              unit = NANOSECONDS;
553            }
554    
555            // Invoking get on the inputFuture will ensure our own run()
556            // method below is invoked as a listener when inputFuture sets
557            // its value.  Therefore when get() returns we should then see
558            // the outputFuture be created.
559            ListenableFuture<? extends I> inputFuture = this.inputFuture;
560            if (inputFuture != null) {
561              long start = System.nanoTime();
562              inputFuture.get(timeout, unit);
563              timeout -= Math.max(0, System.nanoTime() - start);
564            }
565    
566            // If our listener was scheduled to run on an executor we may
567            // need to wait for our listener to finish running before the
568            // outputFuture has been constructed by the function.
569            long start = System.nanoTime();
570            if (!outputCreated.await(timeout, unit)) {
571              throw new TimeoutException();
572            }
573            timeout -= Math.max(0, System.nanoTime() - start);
574    
575            // Like above with the inputFuture, we have a listener on
576            // the outputFuture that will set our own value when its
577            // value is set.  Invoking get will ensure the output can
578            // complete and invoke our listener, so that we can later
579            // get the result.
580            ListenableFuture<? extends O> outputFuture = this.outputFuture;
581            if (outputFuture != null) {
582              outputFuture.get(timeout, unit);
583            }
584          }
585          return super.get(timeout, unit);
586        }
587    
588        @Override
589        public boolean cancel(boolean mayInterruptIfRunning) {
590          if (cancel()) {
591            try {
592              // This should never block since only one thread is allowed to cancel
593              // this Future.
594              mayInterruptIfRunningChannel.put(mayInterruptIfRunning);
595            } catch (InterruptedException ignored) {
596              Thread.currentThread().interrupt();
597            }
598            cancel(inputFuture, mayInterruptIfRunning);
599            cancel(outputFuture, mayInterruptIfRunning);
600            return true;
601          }
602          return false;
603        }
604    
605        private void cancel(@Nullable Future<?> future,
606            boolean mayInterruptIfRunning) {
607          if (future != null) {
608            future.cancel(mayInterruptIfRunning);
609          }
610        }
611    
612        public void run() {
613          try {
614            I sourceResult;
615            try {
616              sourceResult = makeUninterruptible(inputFuture).get();
617            } catch (CancellationException e) {
618              // Cancel this future and return.
619              cancel();
620              return;
621            } catch (ExecutionException e) {
622              // Set the cause of the exception as this future's exception
623              setException(e.getCause());
624              return;
625            }
626    
627            final ListenableFuture<? extends O> outputFuture = this.outputFuture =
628                function.apply(sourceResult);
629            if (isCancelled()) {
630              // Handles the case where cancel was called while the function was
631              // being applied.
632              try {
633                // There is a gap in cancel(boolean) between calling cancel() and
634                // storing the value of mayInterruptIfRunning, so this thread needs
635                // to block, waiting for that value.
636                outputFuture.cancel(mayInterruptIfRunningChannel.take());
637              } catch (InterruptedException ignored) {
638                Thread.currentThread().interrupt();
639              }
640              this.outputFuture = null;
641              return;
642            }
643            outputFuture.addListener(new Runnable() {
644                public void run() {
645                  try {
646                    // Here it would have been nice to have had an
647                    // UninterruptibleListenableFuture, but we don't want to start a
648                    // combinatorial explosion of interfaces, so we have to make do.
649                    set(makeUninterruptible(outputFuture).get());
650                  } catch (CancellationException e) {
651                    // Cancel this future and return.
652                    cancel();
653                    return;
654                  } catch (ExecutionException e) {
655                    // Set the cause of the exception as this future's exception
656                    setException(e.getCause());
657                  } finally {
658                    // Don't pin inputs beyond completion
659                    ChainingListenableFuture.this.outputFuture = null;
660                  }
661                }
662              }, MoreExecutors.sameThreadExecutor());
663          } catch (UndeclaredThrowableException e) {
664            // Set the cause of the exception as this future's exception
665            setException(e.getCause());
666          } catch (RuntimeException e) {
667            // This exception is irrelevant in this thread, but useful for the
668            // client
669            setException(e);
670          } catch (Error e) {
671            // Propagate errors up ASAP - our superclass will rethrow the error
672            setException(e);
673          } finally {
674            // Don't pin inputs beyond completion
675            function = null;
676            inputFuture = null;
677            // Allow our get routines to examine outputFuture now.
678            outputCreated.countDown();
679          }
680        }
681      }
682    
683      /**
684       * A checked future that uses a function to map from exceptions to the
685       * appropriate checked type.
686       */
687      private static class MappingCheckedFuture<V, X extends Exception> extends
688          AbstractCheckedFuture<V, X> {
689    
690        final Function<Exception, X> mapper;
691    
692        MappingCheckedFuture(ListenableFuture<V> delegate,
693            Function<Exception, X> mapper) {
694          super(delegate);
695    
696          this.mapper = checkNotNull(mapper);
697        }
698    
699        @Override
700        protected X mapException(Exception e) {
701          return mapper.apply(e);
702        }
703      }
704    
705      /**
706       * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
707       * will wait on the future to finish, and when it completes, run the
708       * listeners.  This implementation will wait on the source future
709       * indefinitely, so if the source future never completes, the adapter will
710       * never complete either.
711       *
712       * <p>If the delegate future is interrupted or throws an unexpected unchecked
713       * exception, the listeners will not be invoked.
714       */
715      private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
716          implements ListenableFuture<V> {
717    
718        private static final ThreadFactory threadFactory =
719            new ThreadFactoryBuilder()
720                .setNameFormat("ListenableFutureAdapter-thread-%d")
721                .build();
722        private static final Executor defaultAdapterExecutor =
723            Executors.newCachedThreadPool(threadFactory);
724    
725        private final Executor adapterExecutor;
726    
727        // The execution list to hold our listeners.
728        private final ExecutionList executionList = new ExecutionList();
729    
730        // This allows us to only start up a thread waiting on the delegate future
731        // when the first listener is added.
732        private final AtomicBoolean hasListeners = new AtomicBoolean(false);
733    
734        // The delegate future.
735        private final Future<V> delegate;
736    
737        ListenableFutureAdapter(Future<V> delegate) {
738          this(delegate, defaultAdapterExecutor);
739        }
740    
741        ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
742          this.delegate = checkNotNull(delegate);
743          this.adapterExecutor = checkNotNull(adapterExecutor);
744        }
745    
746        @Override
747        protected Future<V> delegate() {
748          return delegate;
749        }
750    
751        @Override
752        public void addListener(Runnable listener, Executor exec) {
753          executionList.add(listener, exec);
754    
755          // When a listener is first added, we run a task that will wait for
756          // the delegate to finish, and when it is done will run the listeners.
757          if (hasListeners.compareAndSet(false, true)) {
758            if (delegate.isDone()) {
759              // If the delegate is already done, run the execution list
760              // immediately on the current thread.
761              executionList.run();
762              return;
763            }
764    
765            adapterExecutor.execute(new Runnable() {
766              @Override
767              public void run() {
768                try {
769                  delegate.get();
770                } catch (Error e) {
771                  throw e;
772                } catch (InterruptedException e) {
773                  // This thread was interrupted.  This should never happen, so we
774                  // throw an IllegalStateException.
775                  Thread.currentThread().interrupt();
776                  throw new IllegalStateException("Adapter thread interrupted!", e);
777                } catch (Throwable e) {
778                  // ExecutionException / CancellationException / RuntimeException
779                  // The task is done, run the listeners.
780                }
781                executionList.run();
782              }
783            });
784          }
785        }
786      }
787    }