001 /*
002 * Copyright (C) 2006 Google Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkNotNull;
020 import static java.util.concurrent.TimeUnit.NANOSECONDS;
021
022 import com.google.common.annotations.Beta;
023 import com.google.common.base.Function;
024
025 import java.lang.reflect.UndeclaredThrowableException;
026 import java.util.concurrent.BlockingQueue;
027 import java.util.concurrent.CancellationException;
028 import java.util.concurrent.CountDownLatch;
029 import java.util.concurrent.ExecutionException;
030 import java.util.concurrent.Executor;
031 import java.util.concurrent.Executors;
032 import java.util.concurrent.Future;
033 import java.util.concurrent.LinkedBlockingQueue;
034 import java.util.concurrent.ThreadFactory;
035 import java.util.concurrent.TimeUnit;
036 import java.util.concurrent.TimeoutException;
037 import java.util.concurrent.atomic.AtomicBoolean;
038
039 import javax.annotation.Nullable;
040
041 /**
042 * Static utility methods pertaining to the {@link Future} interface.
043 *
044 * @author Kevin Bourrillion
045 * @author Nishant Thakkar
046 * @author Sven Mawson
047 * @since 1
048 */
049 @Beta
050 public final class Futures {
051 private Futures() {}
052
053 /**
054 * Returns an uninterruptible view of a {@code Future}. If a thread is
055 * interrupted during an attempt to {@code get()} from the returned future, it
056 * continues to wait on the result until it is available or the timeout
057 * elapses, and only then re-interrupts the thread.
058 */
059 public static <V> UninterruptibleFuture<V> makeUninterruptible(
060 final Future<V> future) {
061 checkNotNull(future);
062 if (future instanceof UninterruptibleFuture<?>) {
063 return (UninterruptibleFuture<V>) future;
064 }
065 return new UninterruptibleFuture<V>() {
066 public boolean cancel(boolean mayInterruptIfRunning) {
067 return future.cancel(mayInterruptIfRunning);
068 }
069 public boolean isCancelled() {
070 return future.isCancelled();
071 }
072 public boolean isDone() {
073 return future.isDone();
074 }
075
076 public V get(long originalTimeout, TimeUnit originalUnit)
077 throws TimeoutException, ExecutionException {
078 boolean interrupted = false;
079 try {
080 long end = System.nanoTime() + originalUnit.toNanos(originalTimeout);
081 while (true) {
082 try {
083 // Future treats negative timeouts just like zero.
084 return future.get(end - System.nanoTime(), NANOSECONDS);
085 } catch (InterruptedException e) {
086 interrupted = true;
087 }
088 }
089 } finally {
090 if (interrupted) {
091 Thread.currentThread().interrupt();
092 }
093 }
094 }
095
096 public V get() throws ExecutionException {
097 boolean interrupted = false;
098 try {
099 while (true) {
100 try {
101 return future.get();
102 } catch (InterruptedException ignored) {
103 interrupted = true;
104 }
105 }
106 } finally {
107 if (interrupted) {
108 Thread.currentThread().interrupt();
109 }
110 }
111 }
112 };
113 }
114
115 /**
116 * Creates a {@link ListenableFuture} out of a normal {@link Future}. The
117 * returned future will create a thread to wait for the source future to
118 * complete before executing the listeners.
119 *
120 * <p>Callers who have a future that subclasses
121 * {@link java.util.concurrent.FutureTask} may want to instead subclass
122 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
123 * functionality to the standard {@code FutureTask} implementation.
124 */
125 public static <V> ListenableFuture<V> makeListenable(Future<V> future) {
126 if (future instanceof ListenableFuture<?>) {
127 return (ListenableFuture<V>) future;
128 }
129 return new ListenableFutureAdapter<V>(future);
130 }
131
132 /**
133 * Creates a {@link ListenableFuture} out of a normal {@link Future} and uses
134 * the given {@link Executor} to get the value of the Future. The
135 * returned future will create a thread using the given executor to wait for
136 * the source future to complete before executing the listeners.
137 *
138 * <p>Callers who have a future that subclasses
139 * {@link java.util.concurrent.FutureTask} may want to instead subclass
140 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
141 * functionality to the standard {@code FutureTask} implementation.
142 */
143 static <V> ListenableFuture<V> makeListenable(
144 Future<V> future, Executor executor) {
145 checkNotNull(executor);
146 if (future instanceof ListenableFuture<?>) {
147 return (ListenableFuture<V>) future;
148 }
149 return new ListenableFutureAdapter<V>(future, executor);
150 }
151
152 /**
153 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
154 * {@link Function} that maps from {@link Exception} instances into the
155 * appropriate checked type.
156 *
157 * <p>The given mapping function will be applied to an
158 * {@link InterruptedException}, a {@link CancellationException}, or an
159 * {@link ExecutionException} with the actual cause of the exception.
160 * See {@link Future#get()} for details on the exceptions thrown.
161 */
162 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
163 Future<V> future, Function<Exception, X> mapper) {
164 return new MappingCheckedFuture<V, X>(makeListenable(future), mapper);
165 }
166
167 /**
168 * Creates a {@code ListenableFuture} which has its value set immediately upon
169 * construction. The getters just return the value. This {@code Future} can't
170 * be canceled or timed out and its {@code isDone()} method always returns
171 * {@code true}. It's useful for returning something that implements the
172 * {@code ListenableFuture} interface but already has the result.
173 */
174 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
175 ValueFuture<V> future = ValueFuture.create();
176 future.set(value);
177 return future;
178 }
179
180 /**
181 * Returns a {@code CheckedFuture} which has its value set immediately upon
182 * construction.
183 *
184 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
185 * method always returns {@code true}. Calling {@code get()} or {@code
186 * checkedGet()} will immediately return the provided value.
187 */
188 public static <V, X extends Exception> CheckedFuture<V, X>
189 immediateCheckedFuture(@Nullable V value) {
190 ValueFuture<V> future = ValueFuture.create();
191 future.set(value);
192 return Futures.makeChecked(future, new Function<Exception, X>() {
193 public X apply(Exception e) {
194 throw new AssertionError("impossible");
195 }
196 });
197 }
198
199 /**
200 * Returns a {@code ListenableFuture} which has an exception set immediately
201 * upon construction.
202 *
203 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
204 * method always returns {@code true}. Calling {@code get()} will immediately
205 * throw the provided {@code Throwable} wrapped in an {@code
206 * ExecutionException}.
207 *
208 * @throws Error if the throwable is an {@link Error}.
209 */
210 public static <V> ListenableFuture<V> immediateFailedFuture(
211 Throwable throwable) {
212 checkNotNull(throwable);
213 ValueFuture<V> future = ValueFuture.create();
214 future.setException(throwable);
215 return future;
216 }
217
218 /**
219 * Returns a {@code CheckedFuture} which has an exception set immediately upon
220 * construction.
221 *
222 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
223 * method always returns {@code true}. Calling {@code get()} will immediately
224 * throw the provided {@code Throwable} wrapped in an {@code
225 * ExecutionException}, and calling {@code checkedGet()} will throw the
226 * provided exception itself.
227 *
228 * @throws Error if the throwable is an {@link Error}.
229 */
230 public static <V, X extends Exception> CheckedFuture<V, X>
231 immediateFailedCheckedFuture(final X exception) {
232 checkNotNull(exception);
233 return makeChecked(Futures.<V>immediateFailedFuture(exception),
234 new Function<Exception, X>() {
235 public X apply(Exception e) {
236 return exception;
237 }
238 });
239 }
240
241 /**
242 * Returns a new {@code ListenableFuture} whose result is asynchronously
243 * derived from the result of the given {@code Future}. More precisely, the
244 * returned {@code Future} takes its result from a {@code Future} produced by
245 * applying the given {@code Function} to the result of the original {@code
246 * Future}.
247 *
248 * <p>Successful cancellation of either the input future or the result of
249 * function application will cause the returned future to be cancelled.
250 * Cancelling the returned future will succeed if it is currently running.
251 * In this case, attempts will be made to cancel the input future and the
252 * result of the function, however there is no guarantee of success.
253 *
254 * <p>TODO: Add a version that accepts a normal {@code Future}
255 *
256 * <p>The typical use for this method would be when a RPC call is dependent on
257 * the results of another RPC. One would call the first RPC (input), create a
258 * function that calls another RPC based on input's result, and then call
259 * chain on input and that function to get a {@code ListenableFuture} of
260 * the result.
261 *
262 * @param input The future to chain
263 * @param function A function to chain the results of the provided future
264 * to the results of the returned future. This will be run in the thread
265 * that notifies input it is complete.
266 * @return A future that holds result of the chain.
267 */
268 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
269 Function<? super I, ? extends ListenableFuture<? extends O>> function) {
270 return chain(input, function, MoreExecutors.sameThreadExecutor());
271 }
272
273 /**
274 * Returns a new {@code ListenableFuture} whose result is asynchronously
275 * derived from the result of the given {@code Future}. More precisely, the
276 * returned {@code Future} takes its result from a {@code Future} produced by
277 * applying the given {@code Function} to the result of the original {@code
278 * Future}.
279 *
280 * <p>Successful cancellation of either the input future or the result of
281 * function application will cause the returned future to be cancelled.
282 * Cancelling the returned future will succeed if it is currently running.
283 * In this case, attempts will be made to cancel the input future and the
284 * result of the function, however there is no guarantee of success.
285 *
286 * <p>This version allows an arbitrary executor to be passed in for running
287 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
288 * the thread chained Function executes in will be whichever thread set the
289 * result of the input Future, which may be the network thread in the case of
290 * RPC-based Futures.
291 *
292 * @param input The future to chain
293 * @param function A function to chain the results of the provided future
294 * to the results of the returned future.
295 * @param exec Executor to run the function in.
296 * @return A future that holds result of the chain.
297 */
298 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
299 Function<? super I, ? extends ListenableFuture<? extends O>> function,
300 Executor exec) {
301 ChainingListenableFuture<I, O> chain =
302 new ChainingListenableFuture<I, O>(function, input);
303 input.addListener(chain, exec);
304 return chain;
305 }
306
307 /**
308 * Returns a new {@code ListenableFuture} whose result is the product of
309 * applying the given {@code Function} to the result of the given {@code
310 * Future}.
311 *
312 * <p>Successful cancellation of the input future will cause the returned
313 * future to be cancelled. Cancelling the returned future will succeed if it
314 * is currently running. In this case, an attempt will be made to cancel the
315 * input future, however there is no guarantee of success.
316 *
317 * <p>An example use of this method is to convert a serializable object
318 * returned from an RPC into a POJO.
319 *
320 * @param future The future to compose
321 * @param function A Function to compose the results of the provided future
322 * to the results of the returned future. This will be run in the thread
323 * that notifies input it is complete.
324 * @return A future that holds result of the composition.
325 */
326 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
327 final Function<? super I, ? extends O> function) {
328 return compose(future, function, MoreExecutors.sameThreadExecutor());
329 }
330
331 /**
332 * Returns a new {@code ListenableFuture} whose result is the product of
333 * applying the given {@code Function} to the result of the given {@code
334 * Future}.
335 *
336 * <p>Successful cancellation of the input future will cause the returned
337 * future to be cancelled. Cancelling the returned future will succeed if it
338 * is currently running. In this case, an attempt will be made to cancel the
339 * input future, however there is no guarantee of success.
340 *
341 * <p>An example use of this method is to convert a serializable object
342 * returned from an RPC into a POJO.
343 *
344 * <p>This version allows an arbitrary executor to be passed in for running
345 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
346 * the thread chained Function executes in will be whichever thread set the
347 * result of the input Future, which may be the network thread in the case of
348 * RPC-based Futures.
349 *
350 * @param future The future to compose
351 * @param function A Function to compose the results of the provided future
352 * to the results of the returned future.
353 * @param exec Executor to run the function in.
354 * @return A future that holds result of the composition.
355 * @since 2
356 */
357 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
358 final Function<? super I, ? extends O> function, Executor exec) {
359 checkNotNull(function);
360 Function<I, ListenableFuture<O>> wrapperFunction
361 = new Function<I, ListenableFuture<O>>() {
362 @Override public ListenableFuture<O> apply(I input) {
363 O output = function.apply(input);
364 return immediateFuture(output);
365 }
366 };
367 return chain(future, wrapperFunction, exec);
368 }
369
370 /**
371 * Returns a new {@code Future} whose result is the product of applying the
372 * given {@code Function} to the result of the given {@code Future}.
373 *
374 * <p>An example use of this method is to convert a Future that produces a
375 * handle to an object to a future that produces the object itself.
376 *
377 * <p>Each call to {@code Future<O>.get(*)} results in a call to
378 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
379 * is assumed that {@code Future<I>.get(*)} is idempotent.
380 *
381 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
382 * future, the timeout only applies to the future passed in to this method.
383 * Any additional time taken by applying {@code function} is not considered.
384 * (Exception: If the input future is a {@link ListenableFuture}, timeouts
385 * will be strictly enforced.)
386 *
387 * @param future The future to compose
388 * @param function A Function to compose the results of the provided future
389 * to the results of the returned future. This will be run in the thread
390 * that calls one of the varieties of {@code get()}.
391 * @return A future that computes result of the composition.
392 */
393 public static <I, O> Future<O> compose(final Future<I> future,
394 final Function<? super I, ? extends O> function) {
395 if (future instanceof ListenableFuture) {
396 return compose((ListenableFuture<I>) future, function);
397 }
398 checkNotNull(future);
399 checkNotNull(function);
400 return new Future<O>() {
401
402 /*
403 * Concurrency detail:
404 *
405 * <p>To preserve the idempotency of calls to this.get(*) calls to the
406 * function are only applied once. A lock is required to prevent multiple
407 * applications of the function. The calls to future.get(*) are performed
408 * outside the lock, as is required to prevent calls to
409 * get(long, TimeUnit) to persist beyond their timeout.
410 *
411 * <p>Calls to future.get(*) on every call to this.get(*) also provide
412 * the cancellation behavior for this.
413 *
414 * <p>(Consider: in thread A, call get(), in thread B call get(long,
415 * TimeUnit). Thread B may have to wait for Thread A to finish, which
416 * would be unacceptable.)
417 *
418 * <p>Note that each call to Future<O>.get(*) results in a call to
419 * Future<I>.get(*), but the function is only applied once, so
420 * Future<I>.get(*) is assumed to be idempotent.
421 */
422
423 private final Object lock = new Object();
424 private boolean set = false;
425 private O value = null;
426 private ExecutionException exception = null;
427
428 @Override
429 public O get() throws InterruptedException, ExecutionException {
430 return apply(future.get());
431 }
432
433 @Override
434 public O get(long timeout, TimeUnit unit) throws InterruptedException,
435 ExecutionException, TimeoutException {
436 return apply(future.get(timeout, unit));
437 }
438
439 private O apply(I raw) throws ExecutionException {
440 synchronized (lock) {
441 if (!set) {
442 try {
443 value = function.apply(raw);
444 } catch (RuntimeException e) {
445 exception = new ExecutionException(e);
446 } catch (Error e) {
447 exception = new ExecutionException(e);
448 }
449 set = true;
450 }
451
452 if (exception != null) {
453 throw exception;
454 }
455 return value;
456 }
457 }
458
459 @Override
460 public boolean cancel(boolean mayInterruptIfRunning) {
461 return future.cancel(mayInterruptIfRunning);
462 }
463
464 @Override
465 public boolean isCancelled() {
466 return future.isCancelled();
467 }
468
469 @Override
470 public boolean isDone() {
471 return future.isDone();
472 }
473 };
474 }
475
476 /**
477 * An implementation of {@code ListenableFuture} that also implements
478 * {@code Runnable} so that it can be used to nest ListenableFutures.
479 * Once the passed-in {@code ListenableFuture} is complete, it calls the
480 * passed-in {@code Function} to generate the result.
481 *
482 * <p>If the function throws any checked exceptions, they should be wrapped
483 * in a {@code UndeclaredThrowableException} so that this class can get
484 * access to the cause.
485 */
486 private static class ChainingListenableFuture<I, O>
487 extends AbstractListenableFuture<O> implements Runnable {
488
489 private Function<? super I, ? extends ListenableFuture<? extends O>>
490 function;
491 private ListenableFuture<? extends I> inputFuture;
492 private volatile ListenableFuture<? extends O> outputFuture;
493 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
494 new LinkedBlockingQueue<Boolean>(1);
495 private final CountDownLatch outputCreated = new CountDownLatch(1);
496
497 private ChainingListenableFuture(
498 Function<? super I, ? extends ListenableFuture<? extends O>> function,
499 ListenableFuture<? extends I> inputFuture) {
500 this.function = checkNotNull(function);
501 this.inputFuture = checkNotNull(inputFuture);
502 }
503
504 /**
505 * Delegate the get() to the input and output futures, in case
506 * their implementations defer starting computation until their
507 * own get() is invoked.
508 */
509 @Override
510 public O get() throws InterruptedException, ExecutionException {
511 if (!isDone()) {
512 // Invoking get on the inputFuture will ensure our own run()
513 // method below is invoked as a listener when inputFuture sets
514 // its value. Therefore when get() returns we should then see
515 // the outputFuture be created.
516 ListenableFuture<? extends I> inputFuture = this.inputFuture;
517 if (inputFuture != null) {
518 inputFuture.get();
519 }
520
521 // If our listener was scheduled to run on an executor we may
522 // need to wait for our listener to finish running before the
523 // outputFuture has been constructed by the function.
524 outputCreated.await();
525
526 // Like above with the inputFuture, we have a listener on
527 // the outputFuture that will set our own value when its
528 // value is set. Invoking get will ensure the output can
529 // complete and invoke our listener, so that we can later
530 // get the result.
531 ListenableFuture<? extends O> outputFuture = this.outputFuture;
532 if (outputFuture != null) {
533 outputFuture.get();
534 }
535 }
536 return super.get();
537 }
538
539 /**
540 * Delegate the get() to the input and output futures, in case
541 * their implementations defer starting computation until their
542 * own get() is invoked.
543 */
544 @Override
545 public O get(long timeout, TimeUnit unit) throws TimeoutException,
546 ExecutionException, InterruptedException {
547 if (!isDone()) {
548 // Use a single time unit so we can decrease remaining timeout
549 // as we wait for various phases to complete.
550 if (unit != NANOSECONDS) {
551 timeout = NANOSECONDS.convert(timeout, unit);
552 unit = NANOSECONDS;
553 }
554
555 // Invoking get on the inputFuture will ensure our own run()
556 // method below is invoked as a listener when inputFuture sets
557 // its value. Therefore when get() returns we should then see
558 // the outputFuture be created.
559 ListenableFuture<? extends I> inputFuture = this.inputFuture;
560 if (inputFuture != null) {
561 long start = System.nanoTime();
562 inputFuture.get(timeout, unit);
563 timeout -= Math.max(0, System.nanoTime() - start);
564 }
565
566 // If our listener was scheduled to run on an executor we may
567 // need to wait for our listener to finish running before the
568 // outputFuture has been constructed by the function.
569 long start = System.nanoTime();
570 if (!outputCreated.await(timeout, unit)) {
571 throw new TimeoutException();
572 }
573 timeout -= Math.max(0, System.nanoTime() - start);
574
575 // Like above with the inputFuture, we have a listener on
576 // the outputFuture that will set our own value when its
577 // value is set. Invoking get will ensure the output can
578 // complete and invoke our listener, so that we can later
579 // get the result.
580 ListenableFuture<? extends O> outputFuture = this.outputFuture;
581 if (outputFuture != null) {
582 outputFuture.get(timeout, unit);
583 }
584 }
585 return super.get(timeout, unit);
586 }
587
588 @Override
589 public boolean cancel(boolean mayInterruptIfRunning) {
590 if (cancel()) {
591 try {
592 // This should never block since only one thread is allowed to cancel
593 // this Future.
594 mayInterruptIfRunningChannel.put(mayInterruptIfRunning);
595 } catch (InterruptedException ignored) {
596 Thread.currentThread().interrupt();
597 }
598 cancel(inputFuture, mayInterruptIfRunning);
599 cancel(outputFuture, mayInterruptIfRunning);
600 return true;
601 }
602 return false;
603 }
604
605 private void cancel(@Nullable Future<?> future,
606 boolean mayInterruptIfRunning) {
607 if (future != null) {
608 future.cancel(mayInterruptIfRunning);
609 }
610 }
611
612 public void run() {
613 try {
614 I sourceResult;
615 try {
616 sourceResult = makeUninterruptible(inputFuture).get();
617 } catch (CancellationException e) {
618 // Cancel this future and return.
619 cancel();
620 return;
621 } catch (ExecutionException e) {
622 // Set the cause of the exception as this future's exception
623 setException(e.getCause());
624 return;
625 }
626
627 final ListenableFuture<? extends O> outputFuture = this.outputFuture =
628 function.apply(sourceResult);
629 if (isCancelled()) {
630 // Handles the case where cancel was called while the function was
631 // being applied.
632 try {
633 // There is a gap in cancel(boolean) between calling cancel() and
634 // storing the value of mayInterruptIfRunning, so this thread needs
635 // to block, waiting for that value.
636 outputFuture.cancel(mayInterruptIfRunningChannel.take());
637 } catch (InterruptedException ignored) {
638 Thread.currentThread().interrupt();
639 }
640 this.outputFuture = null;
641 return;
642 }
643 outputFuture.addListener(new Runnable() {
644 public void run() {
645 try {
646 // Here it would have been nice to have had an
647 // UninterruptibleListenableFuture, but we don't want to start a
648 // combinatorial explosion of interfaces, so we have to make do.
649 set(makeUninterruptible(outputFuture).get());
650 } catch (CancellationException e) {
651 // Cancel this future and return.
652 cancel();
653 return;
654 } catch (ExecutionException e) {
655 // Set the cause of the exception as this future's exception
656 setException(e.getCause());
657 } finally {
658 // Don't pin inputs beyond completion
659 ChainingListenableFuture.this.outputFuture = null;
660 }
661 }
662 }, MoreExecutors.sameThreadExecutor());
663 } catch (UndeclaredThrowableException e) {
664 // Set the cause of the exception as this future's exception
665 setException(e.getCause());
666 } catch (RuntimeException e) {
667 // This exception is irrelevant in this thread, but useful for the
668 // client
669 setException(e);
670 } catch (Error e) {
671 // Propagate errors up ASAP - our superclass will rethrow the error
672 setException(e);
673 } finally {
674 // Don't pin inputs beyond completion
675 function = null;
676 inputFuture = null;
677 // Allow our get routines to examine outputFuture now.
678 outputCreated.countDown();
679 }
680 }
681 }
682
683 /**
684 * A checked future that uses a function to map from exceptions to the
685 * appropriate checked type.
686 */
687 private static class MappingCheckedFuture<V, X extends Exception> extends
688 AbstractCheckedFuture<V, X> {
689
690 final Function<Exception, X> mapper;
691
692 MappingCheckedFuture(ListenableFuture<V> delegate,
693 Function<Exception, X> mapper) {
694 super(delegate);
695
696 this.mapper = checkNotNull(mapper);
697 }
698
699 @Override
700 protected X mapException(Exception e) {
701 return mapper.apply(e);
702 }
703 }
704
705 /**
706 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This
707 * will wait on the future to finish, and when it completes, run the
708 * listeners. This implementation will wait on the source future
709 * indefinitely, so if the source future never completes, the adapter will
710 * never complete either.
711 *
712 * <p>If the delegate future is interrupted or throws an unexpected unchecked
713 * exception, the listeners will not be invoked.
714 */
715 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
716 implements ListenableFuture<V> {
717
718 private static final ThreadFactory threadFactory =
719 new ThreadFactoryBuilder()
720 .setNameFormat("ListenableFutureAdapter-thread-%d")
721 .build();
722 private static final Executor defaultAdapterExecutor =
723 Executors.newCachedThreadPool(threadFactory);
724
725 private final Executor adapterExecutor;
726
727 // The execution list to hold our listeners.
728 private final ExecutionList executionList = new ExecutionList();
729
730 // This allows us to only start up a thread waiting on the delegate future
731 // when the first listener is added.
732 private final AtomicBoolean hasListeners = new AtomicBoolean(false);
733
734 // The delegate future.
735 private final Future<V> delegate;
736
737 ListenableFutureAdapter(Future<V> delegate) {
738 this(delegate, defaultAdapterExecutor);
739 }
740
741 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
742 this.delegate = checkNotNull(delegate);
743 this.adapterExecutor = checkNotNull(adapterExecutor);
744 }
745
746 @Override
747 protected Future<V> delegate() {
748 return delegate;
749 }
750
751 @Override
752 public void addListener(Runnable listener, Executor exec) {
753 executionList.add(listener, exec);
754
755 // When a listener is first added, we run a task that will wait for
756 // the delegate to finish, and when it is done will run the listeners.
757 if (hasListeners.compareAndSet(false, true)) {
758 if (delegate.isDone()) {
759 // If the delegate is already done, run the execution list
760 // immediately on the current thread.
761 executionList.run();
762 return;
763 }
764
765 adapterExecutor.execute(new Runnable() {
766 @Override
767 public void run() {
768 try {
769 delegate.get();
770 } catch (Error e) {
771 throw e;
772 } catch (InterruptedException e) {
773 // This thread was interrupted. This should never happen, so we
774 // throw an IllegalStateException.
775 Thread.currentThread().interrupt();
776 throw new IllegalStateException("Adapter thread interrupted!", e);
777 } catch (Throwable e) {
778 // ExecutionException / CancellationException / RuntimeException
779 // The task is done, run the listeners.
780 }
781 executionList.run();
782 }
783 });
784 }
785 }
786 }
787 }