001 /*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkArgument;
020 import static com.google.common.base.Preconditions.checkNotNull;
021 import static com.google.common.base.Preconditions.checkState;
022 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
023 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
024 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly;
025 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly;
026 import static java.lang.Thread.currentThread;
027 import static java.util.Arrays.asList;
028 import static java.util.concurrent.TimeUnit.NANOSECONDS;
029
030 import com.google.common.annotations.Beta;
031 import com.google.common.base.Function;
032 import com.google.common.base.Preconditions;
033 import com.google.common.collect.ImmutableList;
034 import com.google.common.collect.Lists;
035 import com.google.common.collect.Ordering;
036
037 import java.lang.reflect.Constructor;
038 import java.lang.reflect.InvocationTargetException;
039 import java.lang.reflect.UndeclaredThrowableException;
040 import java.util.Arrays;
041 import java.util.List;
042 import java.util.concurrent.BlockingQueue;
043 import java.util.concurrent.CancellationException;
044 import java.util.concurrent.CountDownLatch;
045 import java.util.concurrent.ExecutionException;
046 import java.util.concurrent.Executor;
047 import java.util.concurrent.Future;
048 import java.util.concurrent.LinkedBlockingQueue;
049 import java.util.concurrent.TimeUnit;
050 import java.util.concurrent.TimeoutException;
051 import java.util.concurrent.atomic.AtomicInteger;
052
053 import javax.annotation.Nullable;
054
055 /**
056 * Static utility methods pertaining to the {@link Future} interface.
057 *
058 * @author Kevin Bourrillion
059 * @author Nishant Thakkar
060 * @author Sven Mawson
061 * @since 1.0
062 */
063 @Beta
064 public final class Futures {
065 private Futures() {}
066
067 /**
068 * Returns an uninterruptible view of a {@code Future}. If a thread is
069 * interrupted during an attempt to {@code get()} from the returned future, it
070 * continues to wait on the result until it is available or the timeout
071 * elapses, and only then re-interrupts the thread.
072 * @deprecated Use
073 * {@link Uninterruptibles#getUninterruptibly(Future) getUninterruptibly}.
074 * <b>This method is scheduled for deletion in Guava Release 11.</b>
075 */
076 @Deprecated @SuppressWarnings("deprecation")
077 public
078 static <V> UninterruptibleFuture<V> makeUninterruptible(
079 final Future<V> future) {
080 checkNotNull(future);
081 if (future instanceof UninterruptibleFuture<?>) {
082 return (UninterruptibleFuture<V>) future;
083 }
084 return new UninterruptibleFuture<V>() {
085 @Override
086 public boolean cancel(boolean mayInterruptIfRunning) {
087 return future.cancel(mayInterruptIfRunning);
088 }
089 @Override
090 public boolean isCancelled() {
091 return future.isCancelled();
092 }
093 @Override
094 public boolean isDone() {
095 return future.isDone();
096 }
097
098 @Override
099 public V get(long timeout, TimeUnit unit)
100 throws TimeoutException, ExecutionException {
101 return Uninterruptibles.getUninterruptibly(future, timeout, unit);
102 }
103
104 @Override
105 public V get() throws ExecutionException {
106 return Uninterruptibles.getUninterruptibly(future);
107 }
108 };
109 }
110
111 /**
112 *
113 * <p>Creates a {@link ListenableFuture} out of a normal {@link Future}. The
114 * returned future will create a thread to wait for the source future to
115 * complete before executing the listeners.
116 *
117 * <p><b>Warning:</b> If the input future does not already implement {@link
118 * ListenableFuture}, the returned future will emulate {@link
119 * ListenableFuture#addListener} by taking a thread from an internal,
120 * unbounded pool at the first call to {@code addListener} and holding it
121 * until the future is {@linkplain Future#isDone() done}.
122 *
123 * @deprecated Prefer to create {@code ListenableFuture} instances with {@link
124 * SettableFuture}, {@link MoreExecutors#listeningDecorator(
125 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
126 * {@link AbstractFuture}, and other utilities over creating plain {@code
127 * Future} instances to be upgraded to {@code ListenableFuture} after the
128 * fact. If this is not possible, the functionality of {@code
129 * makeListenable} is now available as {@link
130 * JdkFutureAdapters#listenInPoolThread}. <b>This method is scheduled
131 * for deletion from Guava in Guava release 11.0.</b>
132 */
133 @Deprecated
134 public
135 static <V> ListenableFuture<V> makeListenable(Future<V> future) {
136 return JdkFutureAdapters.listenInPoolThread(future);
137 }
138
139 /**
140 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
141 * {@link Function} that maps from {@link Exception} instances into the
142 * appropriate checked type.
143 *
144 * <p><b>Warning:</b> If the input future does not implement {@link
145 * ListenableFuture}, the returned future will emulate {@link
146 * ListenableFuture#addListener} by taking a thread from an internal,
147 * unbounded pool at the first call to {@code addListener} and holding it
148 * until the future is {@linkplain Future#isDone() done}.
149 *
150 * <p>The given mapping function will be applied to an
151 * {@link InterruptedException}, a {@link CancellationException}, or an
152 * {@link ExecutionException} with the actual cause of the exception.
153 * See {@link Future#get()} for details on the exceptions thrown.
154 *
155 * @deprecated Obtain a {@link ListenableFuture}, following the advice in its
156 * documentation and use {@link #makeChecked(ListenableFuture, Function)}.
157 * <b>This method is scheduled for deletion from Guava in Guava release
158 * 11.0.</b>
159 */
160 @Deprecated
161 public
162 static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
163 Future<V> future, Function<Exception, X> mapper) {
164 return new MappingCheckedFuture<V, X>(makeListenable(future), mapper);
165 }
166
167 /**
168 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture}
169 * and a {@link Function} that maps from {@link Exception} instances into the
170 * appropriate checked type.
171 *
172 * <p>The given mapping function will be applied to an
173 * {@link InterruptedException}, a {@link CancellationException}, or an
174 * {@link ExecutionException} with the actual cause of the exception.
175 * See {@link Future#get()} for details on the exceptions thrown.
176 *
177 * @since 9.0 (source-compatible since 1.0)
178 */
179 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked(
180 ListenableFuture<V> future, Function<Exception, X> mapper) {
181 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper);
182 }
183
184 /**
185 * Creates a {@code ListenableFuture} which has its value set immediately upon
186 * construction. The getters just return the value. This {@code Future} can't
187 * be canceled or timed out and its {@code isDone()} method always returns
188 * {@code true}.
189 */
190 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) {
191 SettableFuture<V> future = SettableFuture.create();
192 future.set(value);
193 return future;
194 }
195
196 /**
197 * Returns a {@code CheckedFuture} which has its value set immediately upon
198 * construction.
199 *
200 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
201 * method always returns {@code true}. Calling {@code get()} or {@code
202 * checkedGet()} will immediately return the provided value.
203 */
204 public static <V, X extends Exception> CheckedFuture<V, X>
205 immediateCheckedFuture(@Nullable V value) {
206 SettableFuture<V> future = SettableFuture.create();
207 future.set(value);
208 return Futures.makeChecked(future, new Function<Exception, X>() {
209 @Override
210 public X apply(Exception e) {
211 throw new AssertionError("impossible");
212 }
213 });
214 }
215
216 /**
217 * Returns a {@code ListenableFuture} which has an exception set immediately
218 * upon construction.
219 *
220 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
221 * method always returns {@code true}. Calling {@code get()} will immediately
222 * throw the provided {@code Throwable} wrapped in an {@code
223 * ExecutionException}.
224 *
225 * @throws Error if the throwable is an {@link Error}.
226 */
227 public static <V> ListenableFuture<V> immediateFailedFuture(
228 Throwable throwable) {
229 checkNotNull(throwable);
230 SettableFuture<V> future = SettableFuture.create();
231 future.setException(throwable);
232 return future;
233 }
234
235 /**
236 * Returns a {@code CheckedFuture} which has an exception set immediately upon
237 * construction.
238 *
239 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()}
240 * method always returns {@code true}. Calling {@code get()} will immediately
241 * throw the provided {@code Throwable} wrapped in an {@code
242 * ExecutionException}, and calling {@code checkedGet()} will throw the
243 * provided exception itself.
244 *
245 * @throws Error if the throwable is an {@link Error}.
246 */
247 public static <V, X extends Exception> CheckedFuture<V, X>
248 immediateFailedCheckedFuture(final X exception) {
249 checkNotNull(exception);
250 return makeChecked(Futures.<V>immediateFailedFuture(exception),
251 new Function<Exception, X>() {
252 @Override
253 public X apply(Exception e) {
254 return exception;
255 }
256 });
257 }
258
259 /**
260 * Returns a new {@code ListenableFuture} whose result is asynchronously
261 * derived from the result of the given {@code Future}. More precisely, the
262 * returned {@code Future} takes its result from a {@code Future} produced by
263 * applying the given {@code Function} to the result of the original {@code
264 * Future}. Example:
265 *
266 * <pre> {@code
267 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
268 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
269 * new Function<RowKey, ListenableFuture<QueryResult>>() {
270 * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
271 * return dataService.read(rowKey);
272 * }
273 * };
274 * ListenableFuture<QueryResult> queryFuture =
275 * chain(queryFuture, queryFunction);
276 * }</pre>
277 *
278 * <p>Note: This overload of {@code chain} is designed for cases in which the
279 * work of creating the derived future is fast and lightweight, as the method
280 * does not accept an {@code Executor} to perform the the work in. For heavier
281 * derivations, this overload carries some caveats: First, the thread that the
282 * derivation runs in depends on whether the input {@code Future} is done at
283 * the time {@code chain} is called. In particular, if called late, {@code
284 * chain} will run the derivation in the thread that called {@code chain}.
285 * Second, derivations may run in an internal thread of the system responsible
286 * for the input {@code Future}, such as an RPC network thread. Finally,
287 * during the execution of a {@link MoreExecutors#sameThreadExecutor
288 * sameThreadExecutor} {@code chain} function, all other registered but
289 * unexecuted listeners are prevented from running, even if those listeners
290 * are to run in other executors.
291 *
292 * <p>The returned {@code Future} attempts to keep its cancellation state in
293 * sync with that of the input future and that of the future returned by the
294 * chain function. That is, if the returned {@code Future} is cancelled, it
295 * will attempt to cancel the other two, and if either of the other two is
296 * cancelled, the returned {@code Future} will receive a callback in which it
297 * will attempt to cancel itself.
298 *
299 * <p>The typical use for this method would be when a RPC call is dependent on
300 * the results of another RPC. One would call the first RPC (input), create a
301 * function that calls another RPC based on input's result, and then call
302 * chain on input and that function to get a {@code ListenableFuture} of
303 * the result.
304 *
305 * @param input The future to chain
306 * @param function A function to chain the results of the provided future
307 * to the results of the returned future. This will be run in the thread
308 * that notifies input it is complete.
309 * @return A future that holds result of the chain.
310 */
311 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
312 Function<? super I, ? extends ListenableFuture<? extends O>> function) {
313 return chain(input, function, MoreExecutors.sameThreadExecutor());
314 }
315
316 /**
317 * Returns a new {@code ListenableFuture} whose result is asynchronously
318 * derived from the result of the given {@code Future}. More precisely, the
319 * returned {@code Future} takes its result from a {@code Future} produced by
320 * applying the given {@code Function} to the result of the original {@code
321 * Future}. Example:
322 *
323 * <pre> {@code
324 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
325 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction =
326 * new Function<RowKey, ListenableFuture<QueryResult>>() {
327 * public ListenableFuture<QueryResult> apply(RowKey rowKey) {
328 * return dataService.read(rowKey);
329 * }
330 * };
331 * ListenableFuture<QueryResult> queryFuture =
332 * chain(queryFuture, queryFunction, executor);
333 * }</pre>
334 *
335 * <p>The returned {@code Future} attempts to keep its cancellation state in
336 * sync with that of the input future and that of the future returned by the
337 * chain function. That is, if the returned {@code Future} is cancelled, it
338 * will attempt to cancel the other two, and if either of the other two is
339 * cancelled, the returned {@code Future} will receive a callback in which it
340 * will attempt to cancel itself.
341 *
342 * <p>Note: For cases in which the work of creating the derived future is fast
343 * and lightweight, consider {@linkplain Futures#chain(ListenableFuture,
344 * Function) the other overload} or explicit use of {@link
345 * MoreExecutors#sameThreadExecutor}. For heavier derivations, this choice
346 * carries some caveats: First, the thread that the derivation runs in depends
347 * on whether the input {@code Future} is done at the time {@code chain} is
348 * called. In particular, if called late, {@code chain} will run the
349 * derivation in the thread that called {@code chain}. Second, derivations may
350 * run in an internal thread of the system responsible for the input {@code
351 * Future}, such as an RPC network thread. Finally, during the execution of a
352 * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor} {@code chain}
353 * function, all other registered but unexecuted listeners are prevented from
354 * running, even if those listeners are to run in other executors.
355 *
356 * @param input The future to chain
357 * @param function A function to chain the results of the provided future
358 * to the results of the returned future.
359 * @param exec Executor to run the function in.
360 * @return A future that holds result of the chain.
361 */
362 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
363 Function<? super I, ? extends ListenableFuture<? extends O>> function,
364 Executor exec) {
365 ChainingListenableFuture<I, O> chain =
366 new ChainingListenableFuture<I, O>(function, input);
367 input.addListener(chain, exec);
368 return chain;
369 }
370
371 /**
372 * Returns a new {@code ListenableFuture} whose result is the product of
373 * applying the given {@code Function} to the result of the given {@code
374 * Future}. Example:
375 *
376 * <pre> {@code
377 * ListenableFuture<QueryResult> queryFuture = ...;
378 * Function<QueryResult, List<Row>> rowsFunction =
379 * new Function<QueryResult, List<Row>>() {
380 * public List<Row> apply(QueryResult queryResult) {
381 * return queryResult.getRows();
382 * }
383 * };
384 * ListenableFuture<List<Row>> rowsFuture =
385 * transform(queryFuture, rowsFunction);
386 * }</pre>
387 *
388 * <p>Note: This overload of {@code transform} is designed for cases in which
389 * the transformation is fast and lightweight, as the method does not accept
390 * an {@code Executor} to perform the the work in. For heavier
391 * transformations, this overload carries some caveats: First, the thread that
392 * the transformation runs in depends on whether the input {@code Future} is
393 * done at the time {@code transform} is called. In particular, if called
394 * late, {@code transform} will perform the transformation in the thread that
395 * called {@code transform}. Second, transformations may run in an internal
396 * thread of the system responsible for the input {@code Future}, such as an
397 * RPC network thread. Finally, during the execution of a {@link
398 * MoreExecutors#sameThreadExecutor sameThreadExecutor} transformation, all
399 * other registered but unexecuted listeners are prevented from running, even
400 * if those listeners are to run in other executors.
401 *
402 * <p>The returned {@code Future} attempts to keep its cancellation state in
403 * sync with that of the input future. That is, if the returned {@code Future}
404 * is cancelled, it will attempt to cancel the input, and if the input is
405 * cancelled, the returned {@code Future} will receive a callback in which it
406 * will attempt to cancel itself.
407 *
408 * <p>An example use of this method is to convert a serializable object
409 * returned from an RPC into a POJO.
410 *
411 * @param future The future to transform
412 * @param function A Function to transform the results of the provided future
413 * to the results of the returned future. This will be run in the thread
414 * that notifies input it is complete.
415 * @return A future that holds result of the transformation.
416 * @since 9.0 (in 1.0 as {@code compose})
417 */
418 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
419 final Function<? super I, ? extends O> function) {
420 return transform(future, function, MoreExecutors.sameThreadExecutor());
421 }
422
423 /**
424 * Returns a new {@code ListenableFuture} whose result is the product of
425 * applying the given {@code Function} to the result of the given {@code
426 * Future}. Example:
427 *
428 * <pre> {@code
429 * ListenableFuture<QueryResult> queryFuture = ...;
430 * Function<QueryResult, List<Row>> rowsFunction =
431 * new Function<QueryResult, List<Row>>() {
432 * public List<Row> apply(QueryResult queryResult) {
433 * return queryResult.getRows();
434 * }
435 * };
436 * ListenableFuture<List<Row>> rowsFuture =
437 * transform(queryFuture, rowsFunction, executor);
438 * }</pre>
439 *
440 * <p>The returned {@code Future} attempts to keep its cancellation state in
441 * sync with that of the input future. That is, if the returned {@code Future}
442 * is cancelled, it will attempt to cancel the input, and if the input is
443 * cancelled, the returned {@code Future} will receive a callback in which it
444 * will attempt to cancel itself.
445 *
446 * <p>An example use of this method is to convert a serializable object
447 * returned from an RPC into a POJO.
448 *
449 * <p>Note: For cases in which the transformation is fast and lightweight,
450 * consider {@linkplain Futures#transform(ListenableFuture, Function) the
451 * other overload} or explicit use of {@link
452 * MoreExecutors#sameThreadExecutor}. For heavier transformations, this choice
453 * carries some caveats: First, the thread that the transformation runs in
454 * depends on whether the input {@code Future} is done at the time {@code
455 * transform} is called. In particular, if called late, {@code transform} will
456 * perform the transformation in the thread that called {@code transform}.
457 * Second, transformations may run in an internal thread of the system
458 * responsible for the input {@code Future}, such as an RPC network thread.
459 * Finally, during the execution of a {@link MoreExecutors#sameThreadExecutor
460 * sameThreadExecutor} transformation, all other registered but unexecuted
461 * listeners are prevented from running, even if those listeners are to run
462 * in other executors.
463 *
464 * @param future The future to transform
465 * @param function A Function to transform the results of the provided future
466 * to the results of the returned future.
467 * @param exec Executor to run the function in.
468 * @return A future that holds result of the transformation.
469 * @since 9.0 (in 2.0 as {@code compose})
470 */
471 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future,
472 final Function<? super I, ? extends O> function, Executor exec) {
473 checkNotNull(function);
474 Function<I, ListenableFuture<O>> wrapperFunction
475 = new Function<I, ListenableFuture<O>>() {
476 @Override public ListenableFuture<O> apply(I input) {
477 O output = function.apply(input);
478 return immediateFuture(output);
479 }
480 };
481 return chain(future, wrapperFunction, exec);
482 }
483
484 /**
485 * Like {@link #transform(ListenableFuture, Function)} except that the
486 * transformation {@code function} is invoked on each call to
487 * {@link Future#get() get()} on the returned future.
488 *
489 * <p>The returned {@code Future} reflects the input's cancellation
490 * state directly, and any attempt to cancel the returned Future is likewise
491 * passed through to the input Future.
492 *
493 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get}
494 * only apply the timeout to the execution of the underlying {@code Future},
495 * <em>not</em> to the execution of the transformation function.
496 *
497 * <p>The primary audience of this method is callers of {@code transform}
498 * who don't have a {@code ListenableFuture} available and
499 * do not mind repeated, lazy function evaluation.
500 *
501 * @param future The future to transform
502 * @param function A Function to transform the results of the provided future
503 * to the results of the returned future.
504 * @return A future that returns the result of the transformation.
505 * @since 10.0
506 */
507 @Beta
508 public static <I, O> Future<O> lazyTransform(final Future<I> future,
509 final Function<? super I, ? extends O> function) {
510 checkNotNull(future);
511 checkNotNull(function);
512 return new Future<O>() {
513
514 @Override
515 public boolean cancel(boolean mayInterruptIfRunning) {
516 return future.cancel(mayInterruptIfRunning);
517 }
518
519 @Override
520 public boolean isCancelled() {
521 return future.isCancelled();
522 }
523
524 @Override
525 public boolean isDone() {
526 return future.isDone();
527 }
528
529 @Override
530 public O get() throws InterruptedException, ExecutionException {
531 return applyTransformation(future.get());
532 }
533
534 @Override
535 public O get(long timeout, TimeUnit unit)
536 throws InterruptedException, ExecutionException, TimeoutException {
537 return applyTransformation(future.get(timeout, unit));
538 }
539
540 private O applyTransformation(I input) throws ExecutionException {
541 try {
542 return function.apply(input);
543 } catch (Throwable t) {
544 throw new ExecutionException(t);
545 }
546 }
547 };
548 }
549
550 /**
551 * Returns a new {@code Future} whose result is the product of applying the
552 * given {@code Function} to the result of the given {@code Future}. Example:
553 *
554 * <pre> {@code
555 * Future<QueryResult> queryFuture = ...;
556 * Function<QueryResult, List<Row>> rowsFunction =
557 * new Function<QueryResult, List<Row>>() {
558 * public List<Row> apply(QueryResult queryResult) {
559 * return queryResult.getRows();
560 * }
561 * };
562 * Future<List<Row>> rowsFuture = transform(queryFuture, rowsFunction);
563 * }</pre>
564 *
565 * <p>Each call to {@code Future<O>.get(*)} results in a call to
566 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
567 * is assumed that {@code Future<I>.get(*)} is idempotent.
568 *
569 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
570 * future, the timeout only applies to the future passed in to this method.
571 * Any additional time taken by applying {@code function} is not considered.
572 * (Exception: If the input future is a {@link ListenableFuture}, timeouts
573 * will be strictly enforced.)
574 *
575 * @param future The future to transform
576 * @param function A Function to transform the results of the provided future
577 * to the results of the returned future. This will be run in the thread
578 * that calls one of the varieties of {@code get()}.
579 * @return A future that computes result of the transformation
580 * @since 9.0 (in 1.0 as {@code compose})
581 * @deprecated Obtain a {@code ListenableFuture} (following the advice in its
582 * documentation) and use {@link #transform(ListenableFuture, Function)}
583 * or use {@link #lazyTransform(Future, Function)}, which will apply the
584 * transformation on each call to {@code get()}.
585 * <b>This method is scheduled for deletion from Guava in Guava release
586 * 11.0.</b>
587 */
588 @Deprecated
589 public static <I, O> Future<O> transform(final Future<I> future,
590 final Function<? super I, ? extends O> function) {
591 if (future instanceof ListenableFuture) {
592 return transform((ListenableFuture<I>) future, function);
593 }
594 checkNotNull(future);
595 checkNotNull(function);
596 return new Future<O>() {
597
598 /*
599 * Concurrency detail:
600 *
601 * <p>To preserve the idempotency of calls to this.get(*) calls to the
602 * function are only applied once. A lock is required to prevent multiple
603 * applications of the function. The calls to future.get(*) are performed
604 * outside the lock, as is required to prevent calls to
605 * get(long, TimeUnit) to persist beyond their timeout.
606 *
607 * <p>Calls to future.get(*) on every call to this.get(*) also provide
608 * the cancellation behavior for this.
609 *
610 * <p>(Consider: in thread A, call get(), in thread B call get(long,
611 * TimeUnit). Thread B may have to wait for Thread A to finish, which
612 * would be unacceptable.)
613 *
614 * <p>Note that each call to Future<O>.get(*) results in a call to
615 * Future<I>.get(*), but the function is only applied once, so
616 * Future<I>.get(*) is assumed to be idempotent.
617 */
618
619 private final Object lock = new Object();
620 private boolean set = false;
621 private O value = null;
622 private ExecutionException exception = null;
623
624 @Override
625 public O get() throws InterruptedException, ExecutionException {
626 return apply(future.get());
627 }
628
629 @Override
630 public O get(long timeout, TimeUnit unit) throws InterruptedException,
631 ExecutionException, TimeoutException {
632 return apply(future.get(timeout, unit));
633 }
634
635 private O apply(I raw) throws ExecutionException {
636 synchronized (lock) {
637 if (!set) {
638 try {
639 value = function.apply(raw);
640 } catch (RuntimeException e) {
641 exception = new ExecutionException(e);
642 } catch (Error e) {
643 exception = new ExecutionException(e);
644 }
645 set = true;
646 }
647
648 if (exception != null) {
649 throw exception;
650 }
651 return value;
652 }
653 }
654
655 @Override
656 public boolean cancel(boolean mayInterruptIfRunning) {
657 return future.cancel(mayInterruptIfRunning);
658 }
659
660 @Override
661 public boolean isCancelled() {
662 return future.isCancelled();
663 }
664
665 @Override
666 public boolean isDone() {
667 return future.isDone();
668 }
669 };
670 }
671
672 /**
673 * An implementation of {@code ListenableFuture} that also implements
674 * {@code Runnable} so that it can be used to nest ListenableFutures.
675 * Once the passed-in {@code ListenableFuture} is complete, it calls the
676 * passed-in {@code Function} to generate the result.
677 *
678 * <p>If the function throws any checked exceptions, they should be wrapped
679 * in a {@code UndeclaredThrowableException} so that this class can get
680 * access to the cause.
681 */
682 private static class ChainingListenableFuture<I, O>
683 extends AbstractFuture<O> implements Runnable {
684
685 private Function<? super I, ? extends ListenableFuture<? extends O>>
686 function;
687 private ListenableFuture<? extends I> inputFuture;
688 private volatile ListenableFuture<? extends O> outputFuture;
689 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel =
690 new LinkedBlockingQueue<Boolean>(1);
691 private final CountDownLatch outputCreated = new CountDownLatch(1);
692
693 private ChainingListenableFuture(
694 Function<? super I, ? extends ListenableFuture<? extends O>> function,
695 ListenableFuture<? extends I> inputFuture) {
696 this.function = checkNotNull(function);
697 this.inputFuture = checkNotNull(inputFuture);
698 }
699
700 /**
701 * Delegate the get() to the input and output futures, in case
702 * their implementations defer starting computation until their
703 * own get() is invoked.
704 */
705 @Override
706 public O get() throws InterruptedException, ExecutionException {
707 if (!isDone()) {
708 // Invoking get on the inputFuture will ensure our own run()
709 // method below is invoked as a listener when inputFuture sets
710 // its value. Therefore when get() returns we should then see
711 // the outputFuture be created.
712 ListenableFuture<? extends I> inputFuture = this.inputFuture;
713 if (inputFuture != null) {
714 inputFuture.get();
715 }
716
717 // If our listener was scheduled to run on an executor we may
718 // need to wait for our listener to finish running before the
719 // outputFuture has been constructed by the function.
720 outputCreated.await();
721
722 // Like above with the inputFuture, we have a listener on
723 // the outputFuture that will set our own value when its
724 // value is set. Invoking get will ensure the output can
725 // complete and invoke our listener, so that we can later
726 // get the result.
727 ListenableFuture<? extends O> outputFuture = this.outputFuture;
728 if (outputFuture != null) {
729 outputFuture.get();
730 }
731 }
732 return super.get();
733 }
734
735 /**
736 * Delegate the get() to the input and output futures, in case
737 * their implementations defer starting computation until their
738 * own get() is invoked.
739 */
740 @Override
741 public O get(long timeout, TimeUnit unit) throws TimeoutException,
742 ExecutionException, InterruptedException {
743 if (!isDone()) {
744 // Use a single time unit so we can decrease remaining timeout
745 // as we wait for various phases to complete.
746 if (unit != NANOSECONDS) {
747 timeout = NANOSECONDS.convert(timeout, unit);
748 unit = NANOSECONDS;
749 }
750
751 // Invoking get on the inputFuture will ensure our own run()
752 // method below is invoked as a listener when inputFuture sets
753 // its value. Therefore when get() returns we should then see
754 // the outputFuture be created.
755 ListenableFuture<? extends I> inputFuture = this.inputFuture;
756 if (inputFuture != null) {
757 long start = System.nanoTime();
758 inputFuture.get(timeout, unit);
759 timeout -= Math.max(0, System.nanoTime() - start);
760 }
761
762 // If our listener was scheduled to run on an executor we may
763 // need to wait for our listener to finish running before the
764 // outputFuture has been constructed by the function.
765 long start = System.nanoTime();
766 if (!outputCreated.await(timeout, unit)) {
767 throw new TimeoutException();
768 }
769 timeout -= Math.max(0, System.nanoTime() - start);
770
771 // Like above with the inputFuture, we have a listener on
772 // the outputFuture that will set our own value when its
773 // value is set. Invoking get will ensure the output can
774 // complete and invoke our listener, so that we can later
775 // get the result.
776 ListenableFuture<? extends O> outputFuture = this.outputFuture;
777 if (outputFuture != null) {
778 outputFuture.get(timeout, unit);
779 }
780 }
781 return super.get(timeout, unit);
782 }
783
784 @Override
785 public boolean cancel(boolean mayInterruptIfRunning) {
786 /*
787 * Our additional cancellation work needs to occur even if
788 * !mayInterruptIfRunning, so we can't move it into interruptTask().
789 */
790 if (super.cancel(mayInterruptIfRunning)) {
791 // This should never block since only one thread is allowed to cancel
792 // this Future.
793 putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning);
794 cancel(inputFuture, mayInterruptIfRunning);
795 cancel(outputFuture, mayInterruptIfRunning);
796 return true;
797 }
798 return false;
799 }
800
801 private void cancel(@Nullable Future<?> future,
802 boolean mayInterruptIfRunning) {
803 if (future != null) {
804 future.cancel(mayInterruptIfRunning);
805 }
806 }
807
808 @Override
809 public void run() {
810 try {
811 I sourceResult;
812 try {
813 sourceResult = getUninterruptibly(inputFuture);
814 } catch (CancellationException e) {
815 // Cancel this future and return.
816 // At this point, inputFuture is cancelled and outputFuture doesn't
817 // exist, so the value of mayInterruptIfRunning is irrelevant.
818 cancel(false);
819 return;
820 } catch (ExecutionException e) {
821 // Set the cause of the exception as this future's exception
822 setException(e.getCause());
823 return;
824 }
825
826 final ListenableFuture<? extends O> outputFuture = this.outputFuture =
827 function.apply(sourceResult);
828 if (isCancelled()) {
829 // Handles the case where cancel was called while the function was
830 // being applied.
831 // There is a gap in cancel(boolean) between calling sync.cancel()
832 // and storing the value of mayInterruptIfRunning, so this thread
833 // needs to block, waiting for that value.
834 outputFuture.cancel(
835 takeUninterruptibly(mayInterruptIfRunningChannel));
836 this.outputFuture = null;
837 return;
838 }
839 outputFuture.addListener(new Runnable() {
840 @Override
841 public void run() {
842 try {
843 // Here it would have been nice to have had an
844 // UninterruptibleListenableFuture, but we don't want to start a
845 // combinatorial explosion of interfaces, so we have to make do.
846 set(getUninterruptibly(outputFuture));
847 } catch (CancellationException e) {
848 // Cancel this future and return.
849 // At this point, inputFuture and outputFuture are done, so the
850 // value of mayInterruptIfRunning is irrelevant.
851 cancel(false);
852 return;
853 } catch (ExecutionException e) {
854 // Set the cause of the exception as this future's exception
855 setException(e.getCause());
856 } finally {
857 // Don't pin inputs beyond completion
858 ChainingListenableFuture.this.outputFuture = null;
859 }
860 }
861 }, MoreExecutors.sameThreadExecutor());
862 } catch (UndeclaredThrowableException e) {
863 // Set the cause of the exception as this future's exception
864 setException(e.getCause());
865 } catch (RuntimeException e) {
866 // This exception is irrelevant in this thread, but useful for the
867 // client
868 setException(e);
869 } catch (Error e) {
870 // Propagate errors up ASAP - our superclass will rethrow the error
871 setException(e);
872 } finally {
873 // Don't pin inputs beyond completion
874 function = null;
875 inputFuture = null;
876 // Allow our get routines to examine outputFuture now.
877 outputCreated.countDown();
878 }
879 }
880 }
881
882 /**
883 * Creates a new {@code ListenableFuture} whose value is a list containing the
884 * values of all its input futures, if all succeed. If any input fails, the
885 * returned future fails.
886 *
887 * <p>The list of results is in the same order as the input list.
888 *
889 * <p>Canceling this future does not cancel any of the component futures;
890 * however, if any of the provided futures fails or is canceled, this one is,
891 * too.
892 *
893 * @param futures futures to combine
894 * @return a future that provides a list of the results of the component
895 * futures
896 * @since 10.0
897 */
898 @Beta
899 public static <V> ListenableFuture<List<V>> allAsList(
900 ListenableFuture<? extends V>... futures) {
901 return new ListFuture<V>(ImmutableList.copyOf(futures), true,
902 MoreExecutors.sameThreadExecutor());
903 }
904
905 /**
906 * Creates a new {@code ListenableFuture} whose value is a list containing the
907 * values of all its input futures, if all succeed. If any input fails, the
908 * returned future fails.
909 *
910 * <p>The list of results is in the same order as the input list.
911 *
912 * <p>Canceling this future does not cancel any of the component futures;
913 * however, if any of the provided futures fails or is canceled, this one is,
914 * too.
915 *
916 * @param futures futures to combine
917 * @return a future that provides a list of the results of the component
918 * futures
919 * @since 10.0
920 */
921 @Beta
922 public static <V> ListenableFuture<List<V>> allAsList(
923 Iterable<? extends ListenableFuture<? extends V>> futures) {
924 return new ListFuture<V>(ImmutableList.copyOf(futures), true,
925 MoreExecutors.sameThreadExecutor());
926 }
927
928 /**
929 * Creates a new {@code ListenableFuture} whose value is a list containing the
930 * values of all its successful input futures. The list of results is in the
931 * same order as the input list, and if any of the provided futures fails or
932 * is canceled, its corresponding position will contain {@code null} (which is
933 * indistinguishable from the future having a successful value of
934 * {@code null}).
935 *
936 * @param futures futures to combine
937 * @return a future that provides a list of the results of the component
938 * futures
939 * @since 10.0
940 */
941 @Beta
942 public static <V> ListenableFuture<List<V>> successfulAsList(
943 ListenableFuture<? extends V>... futures) {
944 return new ListFuture<V>(ImmutableList.copyOf(futures), false,
945 MoreExecutors.sameThreadExecutor());
946 }
947
948 /**
949 * Creates a new {@code ListenableFuture} whose value is a list containing the
950 * values of all its successful input futures. The list of results is in the
951 * same order as the input list, and if any of the provided futures fails or
952 * is canceled, its corresponding position will contain {@code null} (which is
953 * indistinguishable from the future having a successful value of
954 * {@code null}).
955 *
956 * @param futures futures to combine
957 * @return a future that provides a list of the results of the component
958 * futures
959 * @since 10.0
960 */
961 @Beta
962 public static <V> ListenableFuture<List<V>> successfulAsList(
963 Iterable<? extends ListenableFuture<? extends V>> futures) {
964 return new ListFuture<V>(ImmutableList.copyOf(futures), false,
965 MoreExecutors.sameThreadExecutor());
966 }
967
968 /**
969 * Registers separate success and failure callbacks to be run when the {@code
970 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
971 * complete} or, if the computation is already complete, immediately.
972 *
973 * <p>There is no guaranteed ordering of execution of callbacks, but any
974 * callback added through this method is guaranteed to be called once the
975 * computation is complete.
976 *
977 * Example: <pre> {@code
978 * ListenableFuture<QueryResult> future = ...;
979 * addCallback(future,
980 * new FutureCallback<QueryResult> {
981 * public void onSuccess(QueryResult result) {
982 * storeInCache(result);
983 * }
984 * public void onFailure(Throwable t) {
985 * reportError(t);
986 * }
987 * });}</pre>
988 *
989 * <p>Note: This overload of {@code addCallback} is designed for cases in
990 * which the callack is fast and lightweight, as the method does not accept
991 * an {@code Executor} to perform the the work in. For heavier
992 * callbacks, this overload carries some caveats: First, the thread that
993 * the callback runs in depends on whether the input {@code Future} is
994 * done at the time {@code addCallback} is called. In particular, if called
995 * late, {@code addCallback} will execute the callback in the thread that
996 * called {@code addCallback}. Second, callbacks may run in an internal
997 * thread of the system responsible for the input {@code Future}, such as an
998 * RPC network thread. Finally, during the execution of a {@link
999 * MoreExecutors#sameThreadExecutor sameThreadExecutor} callback, all other
1000 * registered but unexecuted listeners are prevented from running, even if
1001 * those listeners are to run in other executors.
1002 *
1003 * <p>For a more general interface to attach a completion listener to a
1004 * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1005 *
1006 * @param future The future attach the callback to.
1007 * @param callback The callback to invoke when {@code future} is completed.
1008 * @since 10.0
1009 */
1010 public static <V> void addCallback(ListenableFuture<V> future,
1011 FutureCallback<? super V> callback) {
1012 addCallback(future, callback, MoreExecutors.sameThreadExecutor());
1013 }
1014
1015 /**
1016 * Registers separate success and failure callbacks to be run when the {@code
1017 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone()
1018 * complete} or, if the computation is already complete, immediately.
1019 *
1020 * <p>The callback is run in {@code executor}.
1021 * There is no guaranteed ordering of execution of callbacks, but any
1022 * callback added through this method is guaranteed to be called once the
1023 * computation is complete.
1024 *
1025 * Example: <pre> {@code
1026 * ListenableFuture<QueryResult> future = ...;
1027 * Executor e = ...
1028 * addCallback(future, e,
1029 * new FutureCallback<QueryResult> {
1030 * public void onSuccess(QueryResult result) {
1031 * storeInCache(result);
1032 * }
1033 * public void onFailure(Throwable t) {
1034 * reportError(t);
1035 * }
1036 * });}</pre>
1037 *
1038 * When the callback is fast and lightweight consider
1039 * {@linkplain Futures#addCallback(ListenableFuture, FutureCallback)
1040 * the other overload} or explicit use of
1041 * {@link MoreExecutors#sameThreadExecutor() sameThreadExecutor}. For heavier
1042 * callbacks, this choice carries some caveats: First, the thread that
1043 * the callback runs in depends on whether the input {@code Future} is
1044 * done at the time {@code addCallback} is called. In particular, if called
1045 * late, {@code addCallback} will execute the callback in the thread that
1046 * called {@code addCallback}. Second, callbacks may run in an internal
1047 * thread of the system responsible for the input {@code Future}, such as an
1048 * RPC network thread. Finally, during the execution of a {@link
1049 * MoreExecutors#sameThreadExecutor sameThreadExecutor} callback, all other
1050 * registered but unexecuted listeners are prevented from running, even if
1051 * those listeners are to run in other executors.
1052 *
1053 * <p>For a more general interface to attach a completion listener to a
1054 * {@code Future}, see {@link ListenableFuture#addListener addListener}.
1055 *
1056 * @param future The future attach the callback to.
1057 * @param callback The callback to invoke when {@code future} is completed.
1058 * @param executor The executor to run {@code callback} when the future
1059 * completes.
1060 * @since 10.0
1061 */
1062 public static <V> void addCallback(final ListenableFuture<V> future,
1063 final FutureCallback<? super V> callback, Executor executor) {
1064 Preconditions.checkNotNull(callback);
1065 Runnable callbackListener = new Runnable() {
1066 @Override
1067 public void run() {
1068 try {
1069 // TODO(user): (Before Guava release), validate that this
1070 // is the thing for IE.
1071 V value = getUninterruptibly(future);
1072 callback.onSuccess(value);
1073 } catch (ExecutionException e) {
1074 callback.onFailure(e.getCause());
1075 } catch (RuntimeException e) {
1076 callback.onFailure(e);
1077 } catch (Error e) {
1078 callback.onFailure(e);
1079 }
1080 }
1081 };
1082 future.addListener(callbackListener, executor);
1083 }
1084
1085 /**
1086 * Returns the result of {@link Future#get()}, converting most exceptions to a
1087 * new instance of the given checked exception type. This reduces boilerplate
1088 * for a common use of {@code Future} in which it is unnecessary to
1089 * programmatically distinguish between exception types or to extract other
1090 * information from the exception instance.
1091 *
1092 * <p>Exceptions from {@code Future.get} are treated as follows:
1093 * <ul>
1094 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1095 * {@code X} if the cause is a checked exception, an {@link
1096 * UncheckedExecutionException} if the cause is a {@code
1097 * RuntimeException}, or an {@link ExecutionError} if the cause is an
1098 * {@code Error}.
1099 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1100 * restoring the interrupt).
1101 * <li>Any {@link CancellationException} is propagated untouched, as is any
1102 * other {@link RuntimeException} (though {@code get} implementations are
1103 * discouraged from throwing such exceptions).
1104 * </ul>
1105 *
1106 * The overall principle is to continue to treat every checked exception as a
1107 * checked exception, every unchecked exception as an unchecked exception, and
1108 * every error as an error. In addition, the cause of any {@code
1109 * ExecutionException} is wrapped in order to ensure that the new stack trace
1110 * matches that of the current thread.
1111 *
1112 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1113 * public constructor that accepts zero or more arguments, all of type {@code
1114 * String} or {@code Throwable} (preferring constructors with at least one
1115 * {@code String}) and calling the constructor via reflection. If the
1116 * exception did not already have a cause, one is set by calling {@link
1117 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1118 * {@code IllegalArgumentException} is thrown.
1119 *
1120 * @throws X if {@code get} throws any checked exception except for an {@code
1121 * ExecutionException} whose cause is not itself a checked exception
1122 * @throws UncheckedExecutionException if {@code get} throws an {@code
1123 * ExecutionException} with a {@code RuntimeException} as its cause
1124 * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1125 * with an {@code Error} as its cause
1126 * @throws CancellationException if {@code get} throws a {@code
1127 * CancellationException}
1128 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1129 * RuntimeException} or does not have a suitable constructor
1130 * @since 10.0
1131 */
1132 @Beta
1133 public static <V, X extends Exception> V get(
1134 Future<V> future, Class<X> exceptionClass) throws X {
1135 checkNotNull(future);
1136 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1137 "Futures.get exception type (%s) must not be a RuntimeException",
1138 exceptionClass);
1139 try {
1140 return future.get();
1141 } catch (InterruptedException e) {
1142 currentThread().interrupt();
1143 throw newWithCause(exceptionClass, e);
1144 } catch (ExecutionException e) {
1145 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1146 throw new AssertionError();
1147 }
1148 }
1149
1150 /**
1151 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most
1152 * exceptions to a new instance of the given checked exception type. This
1153 * reduces boilerplate for a common use of {@code Future} in which it is
1154 * unnecessary to programmatically distinguish between exception types or to
1155 * extract other information from the exception instance.
1156 *
1157 * <p>Exceptions from {@code Future.get} are treated as follows:
1158 * <ul>
1159 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1160 * {@code X} if the cause is a checked exception, an {@link
1161 * UncheckedExecutionException} if the cause is a {@code
1162 * RuntimeException}, or an {@link ExecutionError} if the cause is an
1163 * {@code Error}.
1164 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after
1165 * restoring the interrupt).
1166 * <li>Any {@link TimeoutException} is wrapped in an {@code X}.
1167 * <li>Any {@link CancellationException} is propagated untouched, as is any
1168 * other {@link RuntimeException} (though {@code get} implementations are
1169 * discouraged from throwing such exceptions).
1170 * </ul>
1171 *
1172 * The overall principle is to continue to treat every checked exception as a
1173 * checked exception, every unchecked exception as an unchecked exception, and
1174 * every error as an error. In addition, the cause of any {@code
1175 * ExecutionException} is wrapped in order to ensure that the new stack trace
1176 * matches that of the current thread.
1177 *
1178 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary
1179 * public constructor that accepts zero or more arguments, all of type {@code
1180 * String} or {@code Throwable} (preferring constructors with at least one
1181 * {@code String}) and calling the constructor via reflection. If the
1182 * exception did not already have a cause, one is set by calling {@link
1183 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an
1184 * {@code IllegalArgumentException} is thrown.
1185 *
1186 * @throws X if {@code get} throws any checked exception except for an {@code
1187 * ExecutionException} whose cause is not itself a checked exception
1188 * @throws UncheckedExecutionException if {@code get} throws an {@code
1189 * ExecutionException} with a {@code RuntimeException} as its cause
1190 * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1191 * with an {@code Error} as its cause
1192 * @throws CancellationException if {@code get} throws a {@code
1193 * CancellationException}
1194 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code
1195 * RuntimeException} or does not have a suitable constructor
1196 * @since 10.0
1197 */
1198 @Beta
1199 public static <V, X extends Exception> V get(
1200 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass)
1201 throws X {
1202 checkNotNull(future);
1203 checkNotNull(unit);
1204 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass),
1205 "Futures.get exception type (%s) must not be a RuntimeException",
1206 exceptionClass);
1207 try {
1208 return future.get(timeout, unit);
1209 } catch (InterruptedException e) {
1210 currentThread().interrupt();
1211 throw newWithCause(exceptionClass, e);
1212 } catch (TimeoutException e) {
1213 throw newWithCause(exceptionClass, e);
1214 } catch (ExecutionException e) {
1215 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass);
1216 throw new AssertionError();
1217 }
1218 }
1219
1220 private static <X extends Exception> void wrapAndThrowExceptionOrError(
1221 Throwable cause, Class<X> exceptionClass) throws X {
1222 if (cause instanceof Error) {
1223 throw new ExecutionError((Error) cause);
1224 }
1225 if (cause instanceof RuntimeException) {
1226 throw new UncheckedExecutionException(cause);
1227 }
1228 throw newWithCause(exceptionClass, cause);
1229 }
1230
1231 /**
1232 * Returns the result of calling {@link Future#get()} uninterruptibly on a
1233 * task known not to throw a checked exception. This makes {@code Future} more
1234 * suitable for lightweight, fast-running tasks that, barring bugs in the
1235 * code, will not fail. This gives it exception-handling behavior similar to
1236 * that of {@code ForkJoinTask.join}.
1237 *
1238 * <p>Exceptions from {@code Future.get} are treated as follows:
1239 * <ul>
1240 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an
1241 * {@link UncheckedExecutionException} (if the cause is an {@code
1242 * Exception}) or {@link ExecutionError} (if the cause is an {@code
1243 * Error}).
1244 * <li>Any {@link InterruptedException} causes a retry of the {@code get}
1245 * call. The interrupt is restored before {@code getUnchecked} returns.
1246 * <li>Any {@link CancellationException} is propagated untouched. So is any
1247 * other {@link RuntimeException} ({@code get} implementations are
1248 * discouraged from throwing such exceptions).
1249 * </ul>
1250 *
1251 * The overall principle is to eliminate all checked exceptions: to loop to
1252 * avoid {@code InterruptedException}, to pass through {@code
1253 * CancellationException}, and to wrap any exception from the underlying
1254 * computation in an {@code UncheckedExecutionException} or {@code
1255 * ExecutionError}.
1256 *
1257 * <p>For an uninterruptible {@code get} that preserves other exceptions, see
1258 * {@link Uninterruptibles#getUninterruptibly(Future)}.
1259 *
1260 * @throws UncheckedExecutionException if {@code get} throws an {@code
1261 * ExecutionException} with an {@code Exception} as its cause
1262 * @throws ExecutionError if {@code get} throws an {@code ExecutionException}
1263 * with an {@code Error} as its cause
1264 * @throws CancellationException if {@code get} throws a {@code
1265 * CancellationException}
1266 * @since 10.0
1267 */
1268 @Beta
1269 public static <V> V getUnchecked(Future<V> future) {
1270 checkNotNull(future);
1271 try {
1272 return getUninterruptibly(future);
1273 } catch (ExecutionException e) {
1274 wrapAndThrowUnchecked(e.getCause());
1275 throw new AssertionError();
1276 }
1277 }
1278
1279 private static void wrapAndThrowUnchecked(Throwable cause) {
1280 if (cause instanceof Error) {
1281 throw new ExecutionError((Error) cause);
1282 }
1283 /*
1284 * It's a non-Error, non-Exception Throwable. From my survey of such
1285 * classes, I believe that most users intended to extend Exception, so we'll
1286 * treat it like an Exception.
1287 */
1288 throw new UncheckedExecutionException(cause);
1289 }
1290
1291 /*
1292 * TODO(user): FutureChecker interface for these to be static methods on? If
1293 * so, refer to it in the (static-method) Futures.get documentation
1294 */
1295
1296 /*
1297 * Arguably we don't need a timed getUnchecked because any operation slow
1298 * enough to require a timeout is heavyweight enough to throw a checked
1299 * exception and therefore be inappropriate to use with getUnchecked. Further,
1300 * it's not clear that converting the checked TimeoutException to a
1301 * RuntimeException -- especially to an UncheckedExecutionException, since it
1302 * wasn't thrown by the computation -- makes sense, and if we don't convert
1303 * it, the user still has to write a try-catch block.
1304 *
1305 * If you think you would use this method, let us know.
1306 */
1307
1308 private static <X extends Exception> X newWithCause(
1309 Class<X> exceptionClass, Throwable cause) {
1310 // getConstructors() guarantees this as long as we don't modify the array.
1311 @SuppressWarnings("unchecked")
1312 List<Constructor<X>> constructors =
1313 (List) Arrays.asList(exceptionClass.getConstructors());
1314 for (Constructor<X> constructor : preferringStrings(constructors)) {
1315 @Nullable X instance = newFromConstructor(constructor, cause);
1316 if (instance != null) {
1317 if (instance.getCause() == null) {
1318 instance.initCause(cause);
1319 }
1320 return instance;
1321 }
1322 }
1323 throw new IllegalArgumentException(
1324 "No appropriate constructor for exception of type " + exceptionClass
1325 + " in response to chained exception", cause);
1326 }
1327
1328 private static <X extends Exception> List<Constructor<X>>
1329 preferringStrings(List<Constructor<X>> constructors) {
1330 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
1331 }
1332
1333 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
1334 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
1335 @Override public Boolean apply(Constructor<?> input) {
1336 return asList(input.getParameterTypes()).contains(String.class);
1337 }
1338 }).reverse();
1339
1340 @Nullable private static <X> X newFromConstructor(
1341 Constructor<X> constructor, Throwable cause) {
1342 Class<?>[] paramTypes = constructor.getParameterTypes();
1343 Object[] params = new Object[paramTypes.length];
1344 for (int i = 0; i < paramTypes.length; i++) {
1345 Class<?> paramType = paramTypes[i];
1346 if (paramType.equals(String.class)) {
1347 params[i] = cause.toString();
1348 } else if (paramType.equals(Throwable.class)) {
1349 params[i] = cause;
1350 } else {
1351 return null;
1352 }
1353 }
1354 try {
1355 return constructor.newInstance(params);
1356 } catch (IllegalArgumentException e) {
1357 return null;
1358 } catch (InstantiationException e) {
1359 return null;
1360 } catch (IllegalAccessException e) {
1361 return null;
1362 } catch (InvocationTargetException e) {
1363 return null;
1364 }
1365 }
1366
1367 /**
1368 * Class that implements {@link #allAsList} and {@link #successfulAsList}.
1369 * The idea is to create a (null-filled) List and register a listener with
1370 * each component future to fill out the value in the List when that future
1371 * completes.
1372 */
1373 private static class ListFuture<V> extends AbstractFuture<List<V>> {
1374 ImmutableList<? extends ListenableFuture<? extends V>> futures;
1375 final boolean allMustSucceed;
1376 final AtomicInteger remaining;
1377 List<V> values;
1378
1379 /**
1380 * Constructor.
1381 *
1382 * @param futures all the futures to build the list from
1383 * @param allMustSucceed whether a single failure or cancellation should
1384 * propagate to this future
1385 * @param listenerExecutor used to run listeners on all the passed in
1386 * futures.
1387 */
1388 ListFuture(
1389 final ImmutableList<? extends ListenableFuture<? extends V>> futures,
1390 final boolean allMustSucceed, final Executor listenerExecutor) {
1391 this.futures = futures;
1392 this.values = Lists.newArrayListWithCapacity(futures.size());
1393 this.allMustSucceed = allMustSucceed;
1394 this.remaining = new AtomicInteger(futures.size());
1395
1396 init(listenerExecutor);
1397 }
1398
1399 private void init(final Executor listenerExecutor) {
1400 // First, schedule cleanup to execute when the Future is done.
1401 addListener(new Runnable() {
1402 @Override
1403 public void run() {
1404 // By now the values array has either been set as the Future's value,
1405 // or (in case of failure) is no longer useful.
1406 ListFuture.this.values = null;
1407
1408 // Let go of the memory held by other futures
1409 ListFuture.this.futures = null;
1410 }
1411 }, MoreExecutors.sameThreadExecutor());
1412
1413 // Now begin the "real" initialization.
1414
1415 // Corner case: List is empty.
1416 if (futures.isEmpty()) {
1417 set(Lists.newArrayList(values));
1418 return;
1419 }
1420
1421 // Populate the results list with null initially.
1422 for (int i = 0; i < futures.size(); ++i) {
1423 values.add(null);
1424 }
1425
1426 // Register a listener on each Future in the list to update
1427 // the state of this future.
1428 // Note that if all the futures on the list are done prior to completing
1429 // this loop, the last call to addListener() will callback to
1430 // setOneValue(), transitively call our cleanup listener, and set
1431 // this.futures to null.
1432 // We store a reference to futures to avoid the NPE.
1433 ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures;
1434 for (int i = 0; i < localFutures.size(); i++) {
1435 final ListenableFuture<? extends V> listenable = localFutures.get(i);
1436 final int index = i;
1437 listenable.addListener(new Runnable() {
1438 @Override
1439 public void run() {
1440 setOneValue(index, listenable);
1441 }
1442 }, listenerExecutor);
1443 }
1444 }
1445
1446 /**
1447 * Sets the value at the given index to that of the given future.
1448 */
1449 private void setOneValue(int index, Future<? extends V> future) {
1450 List<V> localValues = values;
1451 if (isDone() || localValues == null) {
1452 // Some other future failed or has been cancelled, causing this one to
1453 // also be cancelled or have an exception set. This should only happen
1454 // if allMustSucceed is true.
1455 checkState(allMustSucceed,
1456 "Future was done before all dependencies completed");
1457 return;
1458 }
1459
1460 try {
1461 checkState(future.isDone(),
1462 "Tried to set value from future which is not done");
1463 localValues.set(index, getUninterruptibly(future));
1464 } catch (CancellationException e) {
1465 if (allMustSucceed) {
1466 // Set ourselves as cancelled. Let the input futures keep running
1467 // as some of them may be used elsewhere.
1468 // (Currently we don't override interruptTask, so
1469 // mayInterruptIfRunning==false isn't technically necessary.)
1470 cancel(false);
1471 }
1472 } catch (ExecutionException e) {
1473 if (allMustSucceed) {
1474 // As soon as the first one fails, throw the exception up.
1475 // The result of all other inputs is then ignored.
1476 setException(e.getCause());
1477 }
1478 } catch (RuntimeException e) {
1479 if (allMustSucceed) {
1480 setException(e);
1481 }
1482 } catch (Error e) {
1483 // Propagate errors up ASAP - our superclass will rethrow the error
1484 setException(e);
1485 } finally {
1486 int newRemaining = remaining.decrementAndGet();
1487 checkState(newRemaining >= 0, "Less than 0 remaining futures");
1488 if (newRemaining == 0) {
1489 localValues = values;
1490 if (localValues != null) {
1491 set(Lists.newArrayList(localValues));
1492 } else {
1493 checkState(isDone());
1494 }
1495 }
1496 }
1497 }
1498
1499 @Override
1500 public List<V> get() throws InterruptedException, ExecutionException {
1501 callAllGets();
1502
1503 // This may still block in spite of the calls above, as the listeners may
1504 // be scheduled for execution in other threads.
1505 return super.get();
1506 }
1507
1508 /**
1509 * Calls the get method of all dependency futures to work around a bug in
1510 * some ListenableFutures where the listeners aren't called until get() is
1511 * called.
1512 */
1513 private void callAllGets() throws InterruptedException {
1514 List<? extends ListenableFuture<? extends V>> oldFutures = futures;
1515 if (oldFutures != null && !isDone()) {
1516 for (ListenableFuture<? extends V> future : oldFutures) {
1517 // We wait for a little while for the future, but if it's not done,
1518 // we check that no other futures caused a cancellation or failure.
1519 // This can introduce a delay of up to 10ms in reporting an exception.
1520 while (!future.isDone()) {
1521 try {
1522 future.get();
1523 } catch (Error e) {
1524 throw e;
1525 } catch (InterruptedException e) {
1526 throw e;
1527 } catch (Throwable e) {
1528 // ExecutionException / CancellationException / RuntimeException
1529 if (allMustSucceed) {
1530 return;
1531 } else {
1532 continue;
1533 }
1534 }
1535 }
1536 }
1537 }
1538 }
1539 }
1540
1541 /**
1542 * A checked future that uses a function to map from exceptions to the
1543 * appropriate checked type.
1544 */
1545 private static class MappingCheckedFuture<V, X extends Exception> extends
1546 AbstractCheckedFuture<V, X> {
1547
1548 final Function<Exception, X> mapper;
1549
1550 MappingCheckedFuture(ListenableFuture<V> delegate,
1551 Function<Exception, X> mapper) {
1552 super(delegate);
1553
1554 this.mapper = checkNotNull(mapper);
1555 }
1556
1557 @Override
1558 protected X mapException(Exception e) {
1559 return mapper.apply(e);
1560 }
1561 }
1562 }