001 /*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkNotNull;
020 import static java.util.concurrent.TimeUnit.NANOSECONDS;
021
022 import com.google.common.annotations.Beta;
023 import com.google.common.base.Function;
024
025 import java.lang.reflect.UndeclaredThrowableException;
026 import java.util.List;
027 import java.util.concurrent.BlockingQueue;
028 import java.util.concurrent.CancellationException;
029 import java.util.concurrent.CountDownLatch;
030 import java.util.concurrent.ExecutionException;
031 import java.util.concurrent.Executor;
032 import java.util.concurrent.Executors;
033 import java.util.concurrent.Future;
034 import java.util.concurrent.LinkedBlockingQueue;
035 import java.util.concurrent.ThreadFactory;
036 import java.util.concurrent.TimeUnit;
037 import java.util.concurrent.TimeoutException;
038 import java.util.concurrent.atomic.AtomicBoolean;
039
040 import javax.annotation.Nullable;
041
042 /**
043 * Static utility methods pertaining to the {@link Future} interface.
044 *
045 * @author Kevin Bourrillion
046 * @author Nishant Thakkar
047 * @author Sven Mawson
048 * @since 1
049 */
050 @Beta
051 public final class Futures {
052 private Futures() {}
053
054 /**
055 * Returns an uninterruptible view of a {@code Future}. If a thread is
056 * interrupted during an attempt to {@code get()} from the returned future, it
057 * continues to wait on the result until it is available or the timeout
058 * elapses, and only then re-interrupts the thread.
059 */
060 public static <V> UninterruptibleFuture<V> makeUninterruptible(
061 final Future<V> future) {
062 checkNotNull(future);
063 if (future instanceof UninterruptibleFuture<?>) {
064 return (UninterruptibleFuture<V>) future;
065 }
066 return new UninterruptibleFuture<V>() {
067 @Override
068 public boolean cancel(boolean mayInterruptIfRunning) {
069 return future.cancel(mayInterruptIfRunning);
070 }
071 @Override
072 public boolean isCancelled() {
073 return future.isCancelled();
074 }
075 @Override
076 public boolean isDone() {
077 return future.isDone();
078 }
079
080 @Override
081 public V get(long originalTimeout, TimeUnit originalUnit)
082 throws TimeoutException, ExecutionException {
083 boolean interrupted = false;
084 try {
085 long end = System.nanoTime() + originalUnit.toNanos(originalTimeout);
086 while (true) {
087 try {
088 // Future treats negative timeouts just like zero.
089 return future.get(end - System.nanoTime(), NANOSECONDS);
090 } catch (InterruptedException e) {
091 interrupted = true;
092 }
093 }
094 } finally {
095 if (interrupted) {
096 Thread.currentThread().interrupt();
097 }
098 }
099 }
100
101 @Override
102 public V get() throws ExecutionException {
103 boolean interrupted = false;
104 try {
105 while (true) {
106 try {
107 return future.get();
108 } catch (InterruptedException ignored) {
109 interrupted = true;
110 }
111 }
112 } finally {
113 if (interrupted) {
114 Thread.currentThread().interrupt();
115 }
116 }
117 }
118 };
119 }
120
121 /**
122 *
123 * <p>Creates a {@link ListenableFuture} out of a normal {@link Future}. The
124 * returned future will create a thread to wait for the source future to
125 * complete before executing the listeners.
126 *
127 * <p><b>Warning:</b> If the input future does not already implement {@link
128 * ListenableFuture}, the returned future will emulate {@link
129 * ListenableFuture#addListener} by taking a thread from an internal,
130 * unbounded pool at the first call to {@code addListener} and holding it
131 * until the future is {@linkplain Future#isDone() done}.
132 *
133 * <p>Callers who have a future that subclasses
134 * {@link java.util.concurrent.FutureTask} may want to instead subclass
135 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
136 * functionality to the standard {@code FutureTask} implementation.
137 */
138 public static <V> ListenableFuture<V> makeListenable(
139 Future<V> future) {
140 if (future instanceof ListenableFuture<?>) {
141 return (ListenableFuture<V>) future;
142 }
143 return new ListenableFutureAdapter<V>(future);
144 }
145
146 static <V> ListenableFuture<V> makeListenable(
147 Future<V> future, Executor executor) {
148 checkNotNull(executor);
149 if (future instanceof ListenableFuture<?>) {
150 return (ListenableFuture<V>) future;
151 }
152 return new ListenableFutureAdapter<V>(future, executor);
153 }
154
155 /**
156 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
157 * {@link Function} that maps from {@link Exception} instances into the
158 * appropriate checked type.
159 *
160 * <p><b>Warning:</b> If the input future does not implement {@link
161 * ListenableFuture}, the returned future will emulate {@link
162 * ListenableFuture#addListener} by taking a thread from an internal,
163 * unbounded pool at the first call to {@code addListener} and holding it
164 * until the future is {@linkplain Future#isDone() done}.
165 *
166 * <p>The given mapping function will be applied to an
167 * {@link InterruptedException}, a {@link CancellationException}, or an
168 * {@link ExecutionException} with the actual cause of the exception.
169 * See {@link Future#get()} for details on the exceptions thrown.
170 */
171 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
172 Future<V> future, Function<Exception, X> mapper) {
173 return new MappingCheckedFuture<V, X>(makeListenable(future), mapper);
174 }
175
176 /**
177 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
178 * and a {@link Function} that maps from {@link Exception} instances into the
179 * appropriate checked type.
180 *
181 * <p>The given mapping function will be applied to an
182 * {@link InterruptedException}, a {@link CancellationException}, or an
183 * {@link ExecutionException} with the actual cause of the exception.
184 * See {@link Future#get()} for details on the exceptions thrown.
185 *
186 * @since 9 (source-compatible since release 1)
187 */
188 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
189 ListenableFuture<V> future, Function<Exception, X> mapper) {
190 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
191 }
192
193 /**
194 * Creates a {@code ListenableFuture} which has its value set immediately upon
195 * construction. The getters just return the value. This {@code Future} can't
196 * be canceled or timed out and its {@code isDone()} method always returns
197 * {@code true}.
198 */
199 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
200 SettableFuture<V> future = SettableFuture.create();
201 future.set(value);
202 return future;
203 }
204
205 /**
206 * Returns a {@code CheckedFuture} which has its value set immediately upon
207 * construction.
208 *
209 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
210 * method always returns {@code true}. Calling {@code get()} or {@code
211 * checkedGet()} will immediately return the provided value.
212 */
213 public static <V, X extends Exception> CheckedFuture<V, X>
214 immediateCheckedFuture(@Nullable V value) {
215 SettableFuture<V> future = SettableFuture.create();
216 future.set(value);
217 return Futures.makeChecked(future, new Function<Exception, X>() {
218 @Override
219 public X apply(Exception e) {
220 throw new AssertionError("impossible");
221 }
222 });
223 }
224
225 /**
226 * Returns a {@code ListenableFuture} which has an exception set immediately
227 * upon construction.
228 *
229 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
230 * method always returns {@code true}. Calling {@code get()} will immediately
231 * throw the provided {@code Throwable} wrapped in an {@code
232 * ExecutionException}.
233 *
234 * @throws Error if the throwable is an {@link Error}.
235 */
236 public static <V> ListenableFuture<V> immediateFailedFuture(
237 Throwable throwable) {
238 checkNotNull(throwable);
239 SettableFuture<V> future = SettableFuture.create();
240 future.setException(throwable);
241 return future;
242 }
243
244 /**
245 * Returns a {@code CheckedFuture} which has an exception set immediately upon
246 * construction.
247 *
248 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
249 * method always returns {@code true}. Calling {@code get()} will immediately
250 * throw the provided {@code Throwable} wrapped in an {@code
251 * ExecutionException}, and calling {@code checkedGet()} will throw the
252 * provided exception itself.
253 *
254 * @throws Error if the throwable is an {@link Error}.
255 */
256 public static <V, X extends Exception> CheckedFuture<V, X>
257 immediateFailedCheckedFuture(final X exception) {
258 checkNotNull(exception);
259 return makeChecked(Futures.<V>immediateFailedFuture(exception),
260 new Function<Exception, X>() {
261 @Override
262 public X apply(Exception e) {
263 return exception;
264 }
265 });
266 }
267
268 /**
269 * Returns a new {@code ListenableFuture} whose result is asynchronously
270 * derived from the result of the given {@code Future}. More precisely, the
271 * returned {@code Future} takes its result from a {@code Future} produced by
272 * applying the given {@code Function} to the result of the original {@code
273 * Future}. Example:
274 *
275 * <pre> {@code
276 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
277 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
278 * new Function<RowKey, ListenableFuture<QueryResult>>() {
279 * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
280 * return dataService.read(rowKey);
281 * }
282 * };
283 * ListenableFuture<QueryResult> queryFuture =
284 * chain(queryFuture, queryFunction);
285 * }</pre>
286 *
287 * <p>Successful cancellation of either the input future or the result of
288 * function application will cause the returned future to be cancelled.
289 * Cancelling the returned future will succeed if it is currently running.
290 * In this case, attempts will be made to cancel the input future and the
291 * result of the function, however there is no guarantee of success.
292 *
293 * <p>TODO: Add a version that accepts a normal {@code Future}
294 *
295 * <p>The typical use for this method would be when a RPC call is dependent on
296 * the results of another RPC. One would call the first RPC (input), create a
297 * function that calls another RPC based on input's result, and then call
298 * chain on input and that function to get a {@code ListenableFuture} of
299 * the result.
300 *
301 * @param input The future to chain
302 * @param function A function to chain the results of the provided future
303 * to the results of the returned future. This will be run in the thread
304 * that notifies input it is complete.
305 * @return A future that holds result of the chain.
306 */
307 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
308 Function<? super I, ? extends ListenableFuture<? extends O>> function) {
309 return chain(input, function, MoreExecutors.sameThreadExecutor());
310 }
311
312 /**
313 * Returns a new {@code ListenableFuture} whose result is asynchronously
314 * derived from the result of the given {@code Future}. More precisely, the
315 * returned {@code Future} takes its result from a {@code Future} produced by
316 * applying the given {@code Function} to the result of the original {@code
317 * Future}. Example:
318 *
319 * <pre> {@code
320 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
321 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
322 * new Function<RowKey, ListenableFuture<QueryResult>>() {
323 * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
324 * return dataService.read(rowKey);
325 * }
326 * };
327 * ListenableFuture<QueryResult> queryFuture =
328 * chain(queryFuture, queryFunction, executor);
329 * }</pre>
330 *
331 * <p>Successful cancellation of either the input future or the result of
332 * function application will cause the returned future to be cancelled.
333 * Cancelling the returned future will succeed if it is currently running.
334 * In this case, attempts will be made to cancel the input future and the
335 * result of the function, however there is no guarantee of success.
336 *
337 * <p>This version allows an arbitrary executor to be passed in for running
338 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
339 * the thread chained Function executes in will be whichever thread set the
340 * result of the input Future, which may be the network thread in the case of
341 * RPC-based Futures.
342 *
343 * @param input The future to chain
344 * @param function A function to chain the results of the provided future
345 * to the results of the returned future.
346 * @param exec Executor to run the function in.
347 * @return A future that holds result of the chain.
348 */
349 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
350 Function<? super I, ? extends ListenableFuture<? extends O>> function,
351 Executor exec) {
352 ChainingListenableFuture<I, O> chain =
353 new ChainingListenableFuture<I, O>(function, input);
354 input.addListener(chain, exec);
355 return chain;
356 }
357
358 /**
359 * Returns a new {@code ListenableFuture} whose result is the product of
360 * applying the given {@code Function} to the result of the given {@code
361 * Future}. Example:
362 *
363 * <pre> {@code
364 * ListenableFuture<QueryResult> queryFuture = ...;
365 * Function<QueryResult, List<Row>> rowsFunction =
366 * new Function<QueryResult, List<Row>>() {
367 * public List<Row> apply(QueryResult queryResult) {
368 * return queryResult.getRows();
369 * }
370 * };
371 * ListenableFuture<List<Row>> rowsFuture =
372 * transform(queryFuture, rowsFunction);
373 * }</pre>
374 *
375 * <p>Successful cancellation of the input future will cause the returned
376 * future to be cancelled. Cancelling the returned future will succeed if it
377 * is currently running. In this case, an attempt will be made to cancel the
378 * input future, however there is no guarantee of success.
379 *
380 * <p>An example use of this method is to convert a serializable object
381 * returned from an RPC into a POJO.
382 *
383 * @param future The future to compose
384 * @param function A Function to compose the results of the provided future
385 * to the results of the returned future. This will be run in the thread
386 * that notifies input it is complete.
387 * @return A future that holds result of the composition.
388 * @since 9 (in version 1 as {@code compose})
389 */
390 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
391 final Function<? super I, ? extends O> function) {
392 return transform(future, function, MoreExecutors.sameThreadExecutor());
393 }
394
395 /**
396 * Returns a new {@code ListenableFuture} whose result is the product of
397 * applying the given {@code Function} to the result of the given {@code
398 * Future}. Example:
399 *
400 * <pre> {@code
401 * ListenableFuture<QueryResult> queryFuture = ...;
402 * Function<QueryResult, List<Row>> rowsFunction =
403 * new Function<QueryResult, List<Row>>() {
404 * public List<Row> apply(QueryResult queryResult) {
405 * return queryResult.getRows();
406 * }
407 * };
408 * ListenableFuture<List<Row>> rowsFuture =
409 * transform(queryFuture, rowsFunction, executor);
410 * }</pre>
411 *
412 * <p>Successful cancellation of the input future will cause the returned
413 * future to be cancelled. Cancelling the returned future will succeed if it
414 * is currently running. In this case, an attempt will be made to cancel the
415 * input future, however there is no guarantee of success.
416 *
417 * <p>An example use of this method is to convert a serializable object
418 * returned from an RPC into a POJO.
419 *
420 * <p>This version allows an arbitrary executor to be passed in for running
421 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
422 * the thread chained Function executes in will be whichever thread set the
423 * result of the input Future, which may be the network thread in the case of
424 * RPC-based Futures.
425 *
426 * @param future The future to compose
427 * @param function A Function to compose the results of the provided future
428 * to the results of the returned future.
429 * @param exec Executor to run the function in.
430 * @return A future that holds result of the composition.
431 * @since 9 (in version 2 as {@code compose})
432 */
433 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
434 final Function<? super I, ? extends O> function, Executor exec) {
435 checkNotNull(function);
436 Function<I, ListenableFuture<O>> wrapperFunction
437 = new Function<I, ListenableFuture<O>>() {
438 @Override public ListenableFuture<O> apply(I input) {
439 O output = function.apply(input);
440 return immediateFuture(output);
441 }
442 };
443 return chain(future, wrapperFunction, exec);
444 }
445
446 /**
447 * Returns a new {@code Future} whose result is the product of applying the
448 * given {@code Function} to the result of the given {@code Future}. Example:
449 *
450 * <pre> {@code
451 * Future<QueryResult> queryFuture = ...;
452 * Function<QueryResult, List<Row>> rowsFunction =
453 * new Function<QueryResult, List<Row>>() {
454 * public List<Row> apply(QueryResult queryResult) {
455 * return queryResult.getRows();
456 * }
457 * };
458 * Future<List<Row>> rowsFuture = transform(queryFuture, rowsFunction);
459 * }</pre>
460 *
461 * <p>Each call to {@code Future<O>.get(*)} results in a call to
462 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
463 * is assumed that {@code Future<I>.get(*)} is idempotent.
464 *
465 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
466 * future, the timeout only applies to the future passed in to this method.
467 * Any additional time taken by applying {@code function} is not considered.
468 * (Exception: If the input future is a {@link ListenableFuture}, timeouts
469 * will be strictly enforced.)
470 *
471 * @param future The future to compose
472 * @param function A Function to compose the results of the provided future
473 * to the results of the returned future. This will be run in the thread
474 * that calls one of the varieties of {@code get()}.
475 * @return A future that computes result of the composition.
476 * @since 9 (in version 1 as {@code compose})
477 */
478 public static <I, O> Future<O> transform(final Future<I> future,
479 final Function<? super I, ? extends O> function) {
480 if (future instanceof ListenableFuture) {
481 return transform((ListenableFuture<I>) future, function);
482 }
483 checkNotNull(future);
484 checkNotNull(function);
485 return new Future<O>() {
486
487 /*
488 * Concurrency detail:
489 *
490 * <p>To preserve the idempotency of calls to this.get(*) calls to the
491 * function are only applied once. A lock is required to prevent multiple
492 * applications of the function. The calls to future.get(*) are performed
493 * outside the lock, as is required to prevent calls to
494 * get(long, TimeUnit) to persist beyond their timeout.
495 *
496 * <p>Calls to future.get(*) on every call to this.get(*) also provide
497 * the cancellation behavior for this.
498 *
499 * <p>(Consider: in thread A, call get(), in thread B call get(long,
500 * TimeUnit). Thread B may have to wait for Thread A to finish, which
501 * would be unacceptable.)
502 *
503 * <p>Note that each call to Future<O>.get(*) results in a call to
504 * Future<I>.get(*), but the function is only applied once, so
505 * Future<I>.get(*) is assumed to be idempotent.
506 */
507
508 private final Object lock = new Object();
509 private boolean set = false;
510 private O value = null;
511 private ExecutionException exception = null;
512
513 @Override
514 public O get() throws InterruptedException, ExecutionException {
515 return apply(future.get());
516 }
517
518 @Override
519 public O get(long timeout, TimeUnit unit) throws InterruptedException,
520 ExecutionException, TimeoutException {
521 return apply(future.get(timeout, unit));
522 }
523
524 private O apply(I raw) throws ExecutionException {
525 synchronized (lock) {
526 if (!set) {
527 try {
528 value = function.apply(raw);
529 } catch (RuntimeException e) {
530 exception = new ExecutionException(e);
531 } catch (Error e) {
532 exception = new ExecutionException(e);
533 }
534 set = true;
535 }
536
537 if (exception != null) {
538 throw exception;
539 }
540 return value;
541 }
542 }
543
544 @Override
545 public boolean cancel(boolean mayInterruptIfRunning) {
546 return future.cancel(mayInterruptIfRunning);
547 }
548
549 @Override
550 public boolean isCancelled() {
551 return future.isCancelled();
552 }
553
554 @Override
555 public boolean isDone() {
556 return future.isDone();
557 }
558 };
559 }
560
561 /**
562 * An implementation of {@code ListenableFuture} that also implements
563 * {@code Runnable} so that it can be used to nest ListenableFutures.
564 * Once the passed-in {@code ListenableFuture} is complete, it calls the
565 * passed-in {@code Function} to generate the result.
566 *
567 * <p>If the function throws any checked exceptions, they should be wrapped
568 * in a {@code UndeclaredThrowableException} so that this class can get
569 * access to the cause.
570 */
571 private static class ChainingListenableFuture<I, O>
572 extends AbstractListenableFuture<O> implements Runnable {
573
574 private Function<? super I, ? extends ListenableFuture<? extends O>>
575 function;
576 private ListenableFuture<? extends I> inputFuture;
577 private volatile ListenableFuture<? extends O> outputFuture;
578 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
579 new LinkedBlockingQueue<Boolean>(1);
580 private final CountDownLatch outputCreated = new CountDownLatch(1);
581
582 private ChainingListenableFuture(
583 Function<? super I, ? extends ListenableFuture<? extends O>> function,
584 ListenableFuture<? extends I> inputFuture) {
585 this.function = checkNotNull(function);
586 this.inputFuture = checkNotNull(inputFuture);
587 }
588
589 /**
590 * Delegate the get() to the input and output futures, in case
591 * their implementations defer starting computation until their
592 * own get() is invoked.
593 */
594 @Override
595 public O get() throws InterruptedException, ExecutionException {
596 if (!isDone()) {
597 // Invoking get on the inputFuture will ensure our own run()
598 // method below is invoked as a listener when inputFuture sets
599 // its value. Therefore when get() returns we should then see
600 // the outputFuture be created.
601 ListenableFuture<? extends I> inputFuture = this.inputFuture;
602 if (inputFuture != null) {
603 inputFuture.get();
604 }
605
606 // If our listener was scheduled to run on an executor we may
607 // need to wait for our listener to finish running before the
608 // outputFuture has been constructed by the function.
609 outputCreated.await();
610
611 // Like above with the inputFuture, we have a listener on
612 // the outputFuture that will set our own value when its
613 // value is set. Invoking get will ensure the output can
614 // complete and invoke our listener, so that we can later
615 // get the result.
616 ListenableFuture<? extends O> outputFuture = this.outputFuture;
617 if (outputFuture != null) {
618 outputFuture.get();
619 }
620 }
621 return super.get();
622 }
623
624 /**
625 * Delegate the get() to the input and output futures, in case
626 * their implementations defer starting computation until their
627 * own get() is invoked.
628 */
629 @Override
630 public O get(long timeout, TimeUnit unit) throws TimeoutException,
631 ExecutionException, InterruptedException {
632 if (!isDone()) {
633 // Use a single time unit so we can decrease remaining timeout
634 // as we wait for various phases to complete.
635 if (unit != NANOSECONDS) {
636 timeout = NANOSECONDS.convert(timeout, unit);
637 unit = NANOSECONDS;
638 }
639
640 // Invoking get on the inputFuture will ensure our own run()
641 // method below is invoked as a listener when inputFuture sets
642 // its value. Therefore when get() returns we should then see
643 // the outputFuture be created.
644 ListenableFuture<? extends I> inputFuture = this.inputFuture;
645 if (inputFuture != null) {
646 long start = System.nanoTime();
647 inputFuture.get(timeout, unit);
648 timeout -= Math.max(0, System.nanoTime() - start);
649 }
650
651 // If our listener was scheduled to run on an executor we may
652 // need to wait for our listener to finish running before the
653 // outputFuture has been constructed by the function.
654 long start = System.nanoTime();
655 if (!outputCreated.await(timeout, unit)) {
656 throw new TimeoutException();
657 }
658 timeout -= Math.max(0, System.nanoTime() - start);
659
660 // Like above with the inputFuture, we have a listener on
661 // the outputFuture that will set our own value when its
662 // value is set. Invoking get will ensure the output can
663 // complete and invoke our listener, so that we can later
664 // get the result.
665 ListenableFuture<? extends O> outputFuture = this.outputFuture;
666 if (outputFuture != null) {
667 outputFuture.get(timeout, unit);
668 }
669 }
670 return super.get(timeout, unit);
671 }
672
673 @Override
674 public boolean cancel(boolean mayInterruptIfRunning) {
675 if (cancel()) {
676 try {
677 // This should never block since only one thread is allowed to cancel
678 // this Future.
679 mayInterruptIfRunningChannel.put(mayInterruptIfRunning);
680 } catch (InterruptedException ignored) {
681 Thread.currentThread().interrupt();
682 }
683 cancel(inputFuture, mayInterruptIfRunning);
684 cancel(outputFuture, mayInterruptIfRunning);
685 return true;
686 }
687 return false;
688 }
689
690 private void cancel(@Nullable Future<?> future,
691 boolean mayInterruptIfRunning) {
692 if (future != null) {
693 future.cancel(mayInterruptIfRunning);
694 }
695 }
696
697 @Override
698 public void run() {
699 try {
700 I sourceResult;
701 try {
702 sourceResult = makeUninterruptible(inputFuture).get();
703 } catch (CancellationException e) {
704 // Cancel this future and return.
705 cancel();
706 return;
707 } catch (ExecutionException e) {
708 // Set the cause of the exception as this future's exception
709 setException(e.getCause());
710 return;
711 }
712
713 final ListenableFuture<? extends O> outputFuture = this.outputFuture =
714 function.apply(sourceResult);
715 if (isCancelled()) {
716 // Handles the case where cancel was called while the function was
717 // being applied.
718 try {
719 // There is a gap in cancel(boolean) between calling cancel() and
720 // storing the value of mayInterruptIfRunning, so this thread needs
721 // to block, waiting for that value.
722 outputFuture.cancel(mayInterruptIfRunningChannel.take());
723 } catch (InterruptedException ignored) {
724 Thread.currentThread().interrupt();
725 }
726 this.outputFuture = null;
727 return;
728 }
729 outputFuture.addListener(new Runnable() {
730 @Override
731 public void run() {
732 try {
733 // Here it would have been nice to have had an
734 // UninterruptibleListenableFuture, but we don't want to start a
735 // combinatorial explosion of interfaces, so we have to make do.
736 set(makeUninterruptible(outputFuture).get());
737 } catch (CancellationException e) {
738 // Cancel this future and return.
739 cancel();
740 return;
741 } catch (ExecutionException e) {
742 // Set the cause of the exception as this future's exception
743 setException(e.getCause());
744 } finally {
745 // Don't pin inputs beyond completion
746 ChainingListenableFuture.this.outputFuture = null;
747 }
748 }
749 }, MoreExecutors.sameThreadExecutor());
750 } catch (UndeclaredThrowableException e) {
751 // Set the cause of the exception as this future's exception
752 setException(e.getCause());
753 } catch (RuntimeException e) {
754 // This exception is irrelevant in this thread, but useful for the
755 // client
756 setException(e);
757 } catch (Error e) {
758 // Propagate errors up ASAP - our superclass will rethrow the error
759 setException(e);
760 } finally {
761 // Don't pin inputs beyond completion
762 function = null;
763 inputFuture = null;
764 // Allow our get routines to examine outputFuture now.
765 outputCreated.countDown();
766 }
767 }
768 }
769
770 /**
771 * A checked future that uses a function to map from exceptions to the
772 * appropriate checked type.
773 */
774 private static class MappingCheckedFuture<V, X extends Exception> extends
775 AbstractCheckedFuture<V, X> {
776
777 final Function<Exception, X> mapper;
778
779 MappingCheckedFuture(ListenableFuture<V> delegate,
780 Function<Exception, X> mapper) {
781 super(delegate);
782
783 this.mapper = checkNotNull(mapper);
784 }
785
786 @Override
787 protected X mapException(Exception e) {
788 return mapper.apply(e);
789 }
790 }
791
792 /**
793 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This
794 * will wait on the future to finish, and when it completes, run the
795 * listeners. This implementation will wait on the source future
796 * indefinitely, so if the source future never completes, the adapter will
797 * never complete either.
798 *
799 * <p>If the delegate future is interrupted or throws an unexpected unchecked
800 * exception, the listeners will not be invoked.
801 */
802 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
803 implements ListenableFuture<V> {
804
805 private static final ThreadFactory threadFactory =
806 new ThreadFactoryBuilder()
807 .setNameFormat("ListenableFutureAdapter-thread-%d")
808 .build();
809 private static final Executor defaultAdapterExecutor =
810 Executors.newCachedThreadPool(threadFactory);
811
812 private final Executor adapterExecutor;
813
814 // The execution list to hold our listeners.
815 private final ExecutionList executionList = new ExecutionList();
816
817 // This allows us to only start up a thread waiting on the delegate future
818 // when the first listener is added.
819 private final AtomicBoolean hasListeners = new AtomicBoolean(false);
820
821 // The delegate future.
822 private final Future<V> delegate;
823
824 ListenableFutureAdapter(Future<V> delegate) {
825 this(delegate, defaultAdapterExecutor);
826 }
827
828 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
829 this.delegate = checkNotNull(delegate);
830 this.adapterExecutor = checkNotNull(adapterExecutor);
831 }
832
833 @Override
834 protected Future<V> delegate() {
835 return delegate;
836 }
837
838 @Override
839 public void addListener(Runnable listener, Executor exec) {
840 executionList.add(listener, exec);
841
842 // When a listener is first added, we run a task that will wait for
843 // the delegate to finish, and when it is done will run the listeners.
844 if (hasListeners.compareAndSet(false, true)) {
845 if (delegate.isDone()) {
846 // If the delegate is already done, run the execution list
847 // immediately on the current thread.
848 executionList.run();
849 return;
850 }
851
852 adapterExecutor.execute(new Runnable() {
853 @Override
854 public void run() {
855 try {
856 delegate.get();
857 } catch (Error e) {
858 throw e;
859 } catch (InterruptedException e) {
860 // This thread was interrupted. This should never happen, so we
861 // throw an IllegalStateException.
862 Thread.currentThread().interrupt();
863 throw new IllegalStateException("Adapter thread interrupted!", e);
864 } catch (Throwable e) {
865 // ExecutionException / CancellationException / RuntimeException
866 // The task is done, run the listeners.
867 }
868 executionList.run();
869 }
870 });
871 }
872 }
873 }
874 }