001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024import static java.lang.Thread.currentThread; 025import static java.util.Arrays.asList; 026 027import com.google.common.annotations.Beta; 028import com.google.common.base.Function; 029import com.google.common.base.Optional; 030import com.google.common.base.Preconditions; 031import com.google.common.collect.ImmutableCollection; 032import com.google.common.collect.ImmutableList; 033import com.google.common.collect.Lists; 034import com.google.common.collect.Ordering; 035import com.google.common.collect.Sets; 036 037import java.lang.reflect.Constructor; 038import java.lang.reflect.InvocationTargetException; 039import java.lang.reflect.UndeclaredThrowableException; 040import java.util.Arrays; 041import java.util.Collections; 042import java.util.List; 043import java.util.Set; 044import java.util.concurrent.CancellationException; 045import java.util.concurrent.CountDownLatch; 046import 
java.util.concurrent.ExecutionException; 047import java.util.concurrent.Executor; 048import java.util.concurrent.Future; 049import java.util.concurrent.TimeUnit; 050import java.util.concurrent.TimeoutException; 051import java.util.concurrent.atomic.AtomicInteger; 052import java.util.logging.Level; 053import java.util.logging.Logger; 054 055import javax.annotation.Nullable; 056 057/** 058 * Static utility methods pertaining to the {@link Future} interface. 059 * 060 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 061 * Guava User Guide article on <a href= 062 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 063 * {@code ListenableFuture}</a>. 064 * 065 * @author Kevin Bourrillion 066 * @author Nishant Thakkar 067 * @author Sven Mawson 068 * @since 1.0 069 */ 070@Beta 071public final class Futures { 072 private Futures() {} 073 074 /** 075 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 076 * and a {@link Function} that maps from {@link Exception} instances into the 077 * appropriate checked type. 078 * 079 * <p>The given mapping function will be applied to an 080 * {@link InterruptedException}, a {@link CancellationException}, or an 081 * {@link ExecutionException}. 082 * See {@link Future#get()} for details on the exceptions thrown. 
083 * 084 * @since 9.0 (source-compatible since 1.0) 085 */ 086 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 087 ListenableFuture<V> future, Function<Exception, X> mapper) { 088 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 089 } 090 091 private abstract static class ImmediateFuture<V> 092 implements ListenableFuture<V> { 093 094 private static final Logger log = 095 Logger.getLogger(ImmediateFuture.class.getName()); 096 097 @Override 098 public void addListener(Runnable listener, Executor executor) { 099 checkNotNull(listener, "Runnable was null."); 100 checkNotNull(executor, "Executor was null."); 101 try { 102 executor.execute(listener); 103 } catch (RuntimeException e) { 104 // ListenableFuture's contract is that it will not throw unchecked 105 // exceptions, so log the bad runnable and/or executor and swallow it. 106 log.log(Level.SEVERE, "RuntimeException while executing runnable " 107 + listener + " with executor " + executor, e); 108 } 109 } 110 111 @Override 112 public boolean cancel(boolean mayInterruptIfRunning) { 113 return false; 114 } 115 116 @Override 117 public abstract V get() throws ExecutionException; 118 119 @Override 120 public V get(long timeout, TimeUnit unit) throws ExecutionException { 121 checkNotNull(unit); 122 return get(); 123 } 124 125 @Override 126 public boolean isCancelled() { 127 return false; 128 } 129 130 @Override 131 public boolean isDone() { 132 return true; 133 } 134 } 135 136 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 137 138 @Nullable private final V value; 139 140 ImmediateSuccessfulFuture(@Nullable V value) { 141 this.value = value; 142 } 143 144 @Override 145 public V get() { 146 return value; 147 } 148 } 149 150 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 151 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 152 153 @Nullable private final V value; 154 155 
ImmediateSuccessfulCheckedFuture(@Nullable V value) { 156 this.value = value; 157 } 158 159 @Override 160 public V get() { 161 return value; 162 } 163 164 @Override 165 public V checkedGet() { 166 return value; 167 } 168 169 @Override 170 public V checkedGet(long timeout, TimeUnit unit) { 171 checkNotNull(unit); 172 return value; 173 } 174 } 175 176 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 177 178 private final Throwable thrown; 179 180 ImmediateFailedFuture(Throwable thrown) { 181 this.thrown = thrown; 182 } 183 184 @Override 185 public V get() throws ExecutionException { 186 throw new ExecutionException(thrown); 187 } 188 } 189 190 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 191 192 private final CancellationException thrown; 193 194 ImmediateCancelledFuture() { 195 this.thrown = new CancellationException("Immediate cancelled future."); 196 } 197 198 @Override 199 public boolean isCancelled() { 200 return true; 201 } 202 203 @Override 204 public V get() { 205 throw AbstractFuture.cancellationExceptionWithCause( 206 "Task was cancelled.", thrown); 207 } 208 } 209 210 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 211 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 212 213 private final X thrown; 214 215 ImmediateFailedCheckedFuture(X thrown) { 216 this.thrown = thrown; 217 } 218 219 @Override 220 public V get() throws ExecutionException { 221 throw new ExecutionException(thrown); 222 } 223 224 @Override 225 public V checkedGet() throws X { 226 throw thrown; 227 } 228 229 @Override 230 public V checkedGet(long timeout, TimeUnit unit) throws X { 231 checkNotNull(unit); 232 throw thrown; 233 } 234 } 235 236 /** 237 * Creates a {@code ListenableFuture} which has its value set immediately upon 238 * construction. The getters just return the value. 
This {@code Future} can't 239 * be canceled or timed out and its {@code isDone()} method always returns 240 * {@code true}. 241 */ 242 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 243 return new ImmediateSuccessfulFuture<V>(value); 244 } 245 246 /** 247 * Returns a {@code CheckedFuture} which has its value set immediately upon 248 * construction. 249 * 250 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 251 * method always returns {@code true}. Calling {@code get()} or {@code 252 * checkedGet()} will immediately return the provided value. 253 */ 254 public static <V, X extends Exception> CheckedFuture<V, X> 255 immediateCheckedFuture(@Nullable V value) { 256 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 257 } 258 259 /** 260 * Returns a {@code ListenableFuture} which has an exception set immediately 261 * upon construction. 262 * 263 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 264 * method always returns {@code true}. Calling {@code get()} will immediately 265 * throw the provided {@code Throwable} wrapped in an {@code 266 * ExecutionException}. 267 */ 268 public static <V> ListenableFuture<V> immediateFailedFuture( 269 Throwable throwable) { 270 checkNotNull(throwable); 271 return new ImmediateFailedFuture<V>(throwable); 272 } 273 274 /** 275 * Creates a {@code ListenableFuture} which is cancelled immediately upon 276 * construction, so that {@code isCancelled()} always returns {@code true}. 277 * 278 * @since 14.0 279 */ 280 public static <V> ListenableFuture<V> immediateCancelledFuture() { 281 return new ImmediateCancelledFuture<V>(); 282 } 283 284 /** 285 * Returns a {@code CheckedFuture} which has an exception set immediately upon 286 * construction. 287 * 288 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 289 * method always returns {@code true}. 
Calling {@code get()} will immediately 290 * throw the provided {@code Exception} wrapped in an {@code 291 * ExecutionException}, and calling {@code checkedGet()} will throw the 292 * provided exception itself. 293 */ 294 public static <V, X extends Exception> CheckedFuture<V, X> 295 immediateFailedCheckedFuture(X exception) { 296 checkNotNull(exception); 297 return new ImmediateFailedCheckedFuture<V, X>(exception); 298 } 299 300 /** 301 * Returns a {@code Future} whose result is taken from the given primary 302 * {@code input} or, if the primary input fails, from the {@code Future} 303 * provided by the {@code fallback}. {@link FutureFallback#create} is not 304 * invoked until the primary input has failed, so if the primary input 305 * succeeds, it is never invoked. If, during the invocation of {@code 306 * fallback}, an exception is thrown, this exception is used as the result of 307 * the output {@code Future}. 308 * 309 * <p>Below is an example of a fallback that returns a default value if an 310 * exception occurs: 311 * 312 * <pre> {@code 313 * ListenableFuture<Integer> fetchCounterFuture = ...; 314 * 315 * // Falling back to a zero counter in case an exception happens when 316 * // processing the RPC to fetch counters. 317 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 318 * fetchCounterFuture, new FutureFallback<Integer>() { 319 * public ListenableFuture<Integer> create(Throwable t) { 320 * // Returning "0" as the default for the counter when the 321 * // exception happens. 322 * return immediateFuture(0); 323 * } 324 * });}</pre> 325 * 326 * <p>The fallback can also choose to propagate the original exception when 327 * desired: 328 * 329 * <pre> {@code 330 * ListenableFuture<Integer> fetchCounterFuture = ...; 331 * 332 * // Falling back to a zero counter only in case the exception was a 333 * // TimeoutException. 
334 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 335 * fetchCounterFuture, new FutureFallback<Integer>() { 336 * public ListenableFuture<Integer> create(Throwable t) { 337 * if (t instanceof TimeoutException) { 338 * return immediateFuture(0); 339 * } 340 * return immediateFailedFuture(t); 341 * } 342 * });}</pre> 343 * 344 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 345 * (whether the {@code Future} itself is slow or heavyweight to complete is 346 * irrelevant), consider {@linkplain #withFallback(ListenableFuture, 347 * FutureFallback, Executor) supplying an executor}. If you do not supply an 348 * executor, {@code withFallback} will use {@link 349 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 350 * caveats for heavier operations. For example, the call to {@code 351 * fallback.create} may run on an unpredictable or undesirable thread: 352 * 353 * <ul> 354 * <li>If the input {@code Future} is done at the time {@code withFallback} 355 * is called, {@code withFallback} will call {@code fallback.create} inline. 356 * <li>If the input {@code Future} is not yet done, {@code withFallback} will 357 * schedule {@code fallback.create} to be run by the thread that completes 358 * the input {@code Future}, which may be an internal system thread such as 359 * an RPC network thread. 360 * </ul> 361 * 362 * <p>Also note that, regardless of which thread executes the {@code 363 * sameThreadExecutor} {@code fallback.create}, all other registered but 364 * unexecuted listeners are prevented from running during its execution, even 365 * if those listeners are to run in other executors. 366 * 367 * @param input the primary input {@code Future} 368 * @param fallback the {@link FutureFallback} implementation to be called if 369 * {@code input} fails 370 * @since 14.0 371 */ 372 public static <V> ListenableFuture<V> withFallback( 373 ListenableFuture<? extends V> input, 374 FutureFallback<? 
extends V> fallback) { 375 return withFallback(input, fallback, sameThreadExecutor()); 376 } 377 378 /** 379 * Returns a {@code Future} whose result is taken from the given primary 380 * {@code input} or, if the primary input fails, from the {@code Future} 381 * provided by the {@code fallback}. {@link FutureFallback#create} is not 382 * invoked until the primary input has failed, so if the primary input 383 * succeeds, it is never invoked. If, during the invocation of {@code 384 * fallback}, an exception is thrown, this exception is used as the result of 385 * the output {@code Future}. 386 * 387 * <p>Below is an example of a fallback that returns a default value if an 388 * exception occurs: 389 * 390 * <pre> {@code 391 * ListenableFuture<Integer> fetchCounterFuture = ...; 392 * 393 * // Falling back to a zero counter in case an exception happens when 394 * // processing the RPC to fetch counters. 395 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 396 * fetchCounterFuture, new FutureFallback<Integer>() { 397 * public ListenableFuture<Integer> create(Throwable t) { 398 * // Returning "0" as the default for the counter when the 399 * // exception happens. 400 * return immediateFuture(0); 401 * } 402 * }, sameThreadExecutor());}</pre> 403 * 404 * <p>The fallback can also choose to propagate the original exception when 405 * desired: 406 * 407 * <pre> {@code 408 * ListenableFuture<Integer> fetchCounterFuture = ...; 409 * 410 * // Falling back to a zero counter only in case the exception was a 411 * // TimeoutException. 
412 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 413 * fetchCounterFuture, new FutureFallback<Integer>() { 414 * public ListenableFuture<Integer> create(Throwable t) { 415 * if (t instanceof TimeoutException) { 416 * return immediateFuture(0); 417 * } 418 * return immediateFailedFuture(t); 419 * } 420 * }, sameThreadExecutor());}</pre> 421 * 422 * <p>When the execution of {@code fallback.create} is fast and lightweight 423 * (though the {@code Future} it returns need not meet these criteria), 424 * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) 425 * omitting the executor} or explicitly specifying {@code 426 * sameThreadExecutor}. However, be aware of the caveats documented in the 427 * link above. 428 * 429 * @param input the primary input {@code Future} 430 * @param fallback the {@link FutureFallback} implementation to be called if 431 * {@code input} fails 432 * @param executor the executor that runs {@code fallback} if {@code input} 433 * fails 434 * @since 14.0 435 */ 436 public static <V> ListenableFuture<V> withFallback( 437 ListenableFuture<? extends V> input, 438 FutureFallback<? extends V> fallback, Executor executor) { 439 checkNotNull(fallback); 440 return new FallbackFuture<V>(input, fallback, executor); 441 } 442 443 /** 444 * A future that falls back on a second, generated future, in case its 445 * original future fails. 446 */ 447 private static class FallbackFuture<V> extends AbstractFuture<V> { 448 449 private volatile ListenableFuture<? extends V> running; 450 451 FallbackFuture(ListenableFuture<? extends V> input, 452 final FutureFallback<? 
extends V> fallback, 453 final Executor executor) { 454 running = input; 455 addCallback(running, new FutureCallback<V>() { 456 @Override 457 public void onSuccess(V value) { 458 set(value); 459 } 460 461 @Override 462 public void onFailure(Throwable t) { 463 if (isCancelled()) { 464 return; 465 } 466 try { 467 running = fallback.create(t); 468 if (isCancelled()) { // in case cancel called in the meantime 469 running.cancel(wasInterrupted()); 470 return; 471 } 472 addCallback(running, new FutureCallback<V>() { 473 @Override 474 public void onSuccess(V value) { 475 set(value); 476 } 477 478 @Override 479 public void onFailure(Throwable t) { 480 if (running.isCancelled()) { 481 cancel(false); 482 } else { 483 setException(t); 484 } 485 } 486 }, sameThreadExecutor()); 487 } catch (Throwable e) { 488 setException(e); 489 } 490 } 491 }, executor); 492 } 493 494 @Override 495 public boolean cancel(boolean mayInterruptIfRunning) { 496 if (super.cancel(mayInterruptIfRunning)) { 497 running.cancel(mayInterruptIfRunning); 498 return true; 499 } 500 return false; 501 } 502 } 503 504 /** 505 * Returns a new {@code ListenableFuture} whose result is asynchronously 506 * derived from the result of the given {@code Future}. More precisely, the 507 * returned {@code Future} takes its result from a {@code Future} produced by 508 * applying the given {@code AsyncFunction} to the result of the original 509 * {@code Future}. 
Example: 510 * 511 * <pre> {@code 512 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 513 * AsyncFunction<RowKey, QueryResult> queryFunction = 514 * new AsyncFunction<RowKey, QueryResult>() { 515 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 516 * return dataService.read(rowKey); 517 * } 518 * }; 519 * ListenableFuture<QueryResult> queryFuture = 520 * transform(rowKeyFuture, queryFunction);}</pre> 521 * 522 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 523 * (whether the {@code Future} itself is slow or heavyweight to complete is 524 * irrelevant), consider {@linkplain #transform(ListenableFuture, 525 * AsyncFunction, Executor) supplying an executor}. If you do not supply an 526 * executor, {@code transform} will use {@link 527 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 528 * caveats for heavier operations. For example, the call to {@code 529 * function.apply} may run on an unpredictable or undesirable thread: 530 * 531 * <ul> 532 * <li>If the input {@code Future} is done at the time {@code transform} is 533 * called, {@code transform} will call {@code function.apply} inline. 534 * <li>If the input {@code Future} is not yet done, {@code transform} will 535 * schedule {@code function.apply} to be run by the thread that completes the 536 * input {@code Future}, which may be an internal system thread such as an 537 * RPC network thread. 538 * </ul> 539 * 540 * <p>Also note that, regardless of which thread executes the {@code 541 * sameThreadExecutor} {@code function.apply}, all other registered but 542 * unexecuted listeners are prevented from running during its execution, even 543 * if those listeners are to run in other executors. 544 * 545 * <p>The returned {@code Future} attempts to keep its cancellation state in 546 * sync with that of the input future and that of the future returned by the 547 * function. 
That is, if the returned {@code Future} is cancelled, it will 548 * attempt to cancel the other two, and if either of the other two is 549 * cancelled, the returned {@code Future} will receive a callback in which it 550 * will attempt to cancel itself. 551 * 552 * @param input The future to transform 553 * @param function A function to transform the result of the input future 554 * to the result of the output future 555 * @return A future that holds result of the function (if the input succeeded) 556 * or the original input's failure (if not) 557 * @since 11.0 558 */ 559 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 560 AsyncFunction<? super I, ? extends O> function) { 561 return transform(input, function, MoreExecutors.sameThreadExecutor()); 562 } 563 564 /** 565 * Returns a new {@code ListenableFuture} whose result is asynchronously 566 * derived from the result of the given {@code Future}. More precisely, the 567 * returned {@code Future} takes its result from a {@code Future} produced by 568 * applying the given {@code AsyncFunction} to the result of the original 569 * {@code Future}. Example: 570 * 571 * <pre> {@code 572 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 573 * AsyncFunction<RowKey, QueryResult> queryFunction = 574 * new AsyncFunction<RowKey, QueryResult>() { 575 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 576 * return dataService.read(rowKey); 577 * } 578 * }; 579 * ListenableFuture<QueryResult> queryFuture = 580 * transform(rowKeyFuture, queryFunction, executor);}</pre> 581 * 582 * <p>The returned {@code Future} attempts to keep its cancellation state in 583 * sync with that of the input future and that of the future returned by the 584 * chain function. 
That is, if the returned {@code Future} is cancelled, it 585 * will attempt to cancel the other two, and if either of the other two is 586 * cancelled, the returned {@code Future} will receive a callback in which it 587 * will attempt to cancel itself. 588 * 589 * <p>When the execution of {@code function.apply} is fast and lightweight 590 * (though the {@code Future} it returns need not meet these criteria), 591 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting 592 * the executor} or explicitly specifying {@code sameThreadExecutor}. 593 * However, be aware of the caveats documented in the link above. 594 * 595 * @param input The future to transform 596 * @param function A function to transform the result of the input future 597 * to the result of the output future 598 * @param executor Executor to run the function in. 599 * @return A future that holds result of the function (if the input succeeded) 600 * or the original input's failure (if not) 601 * @since 11.0 602 */ 603 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 604 AsyncFunction<? super I, ? extends O> function, 605 Executor executor) { 606 ChainingListenableFuture<I, O> output = 607 new ChainingListenableFuture<I, O>(function, input); 608 input.addListener(output, executor); 609 return output; 610 } 611 612 /** 613 * Returns a new {@code ListenableFuture} whose result is the product of 614 * applying the given {@code Function} to the result of the given {@code 615 * Future}. 
Example: 616 * 617 * <pre> {@code 618 * ListenableFuture<QueryResult> queryFuture = ...; 619 * Function<QueryResult, List<Row>> rowsFunction = 620 * new Function<QueryResult, List<Row>>() { 621 * public List<Row> apply(QueryResult queryResult) { 622 * return queryResult.getRows(); 623 * } 624 * }; 625 * ListenableFuture<List<Row>> rowsFuture = 626 * transform(queryFuture, rowsFunction);}</pre> 627 * 628 * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain 629 * #transform(ListenableFuture, Function, Executor) supplying an executor}. 630 * If you do not supply an executor, {@code transform} will use {@link 631 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 632 * caveats for heavier operations. For example, the call to {@code 633 * function.apply} may run on an unpredictable or undesirable thread: 634 * 635 * <ul> 636 * <li>If the input {@code Future} is done at the time {@code transform} is 637 * called, {@code transform} will call {@code function.apply} inline. 638 * <li>If the input {@code Future} is not yet done, {@code transform} will 639 * schedule {@code function.apply} to be run by the thread that completes the 640 * input {@code Future}, which may be an internal system thread such as an 641 * RPC network thread. 642 * </ul> 643 * 644 * <p>Also note that, regardless of which thread executes the {@code 645 * sameThreadExecutor} {@code function.apply}, all other registered but 646 * unexecuted listeners are prevented from running during its execution, even 647 * if those listeners are to run in other executors. 648 * 649 * <p>The returned {@code Future} attempts to keep its cancellation state in 650 * sync with that of the input future. That is, if the returned {@code Future} 651 * is cancelled, it will attempt to cancel the input, and if the input is 652 * cancelled, the returned {@code Future} will receive a callback in which it 653 * will attempt to cancel itself. 
654 * 655 * <p>An example use of this method is to convert a serializable object 656 * returned from an RPC into a POJO. 657 * 658 * @param input The future to transform 659 * @param function A Function to transform the results of the provided future 660 * to the results of the returned future. This will be run in the thread 661 * that notifies input it is complete. 662 * @return A future that holds result of the transformation. 663 * @since 9.0 (in 1.0 as {@code compose}) 664 */ 665 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 666 final Function<? super I, ? extends O> function) { 667 return transform(input, function, MoreExecutors.sameThreadExecutor()); 668 } 669 670 /** 671 * Returns a new {@code ListenableFuture} whose result is the product of 672 * applying the given {@code Function} to the result of the given {@code 673 * Future}. Example: 674 * 675 * <pre> {@code 676 * ListenableFuture<QueryResult> queryFuture = ...; 677 * Function<QueryResult, List<Row>> rowsFunction = 678 * new Function<QueryResult, List<Row>>() { 679 * public List<Row> apply(QueryResult queryResult) { 680 * return queryResult.getRows(); 681 * } 682 * }; 683 * ListenableFuture<List<Row>> rowsFuture = 684 * transform(queryFuture, rowsFunction, executor);}</pre> 685 * 686 * <p>The returned {@code Future} attempts to keep its cancellation state in 687 * sync with that of the input future. That is, if the returned {@code Future} 688 * is cancelled, it will attempt to cancel the input, and if the input is 689 * cancelled, the returned {@code Future} will receive a callback in which it 690 * will attempt to cancel itself. 691 * 692 * <p>An example use of this method is to convert a serializable object 693 * returned from an RPC into a POJO. 694 * 695 * <p>When the transformation is fast and lightweight, consider {@linkplain 696 * #transform(ListenableFuture, Function) omitting the executor} or 697 * explicitly specifying {@code sameThreadExecutor}. 
However, be aware of the 698 * caveats documented in the link above. 699 * 700 * @param input The future to transform 701 * @param function A Function to transform the results of the provided future 702 * to the results of the returned future. 703 * @param executor Executor to run the function in. 704 * @return A future that holds result of the transformation. 705 * @since 9.0 (in 2.0 as {@code compose}) 706 */ 707 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 708 final Function<? super I, ? extends O> function, Executor executor) { 709 checkNotNull(function); 710 AsyncFunction<I, O> wrapperFunction 711 = new AsyncFunction<I, O>() { 712 @Override public ListenableFuture<O> apply(I input) { 713 O output = function.apply(input); 714 return immediateFuture(output); 715 } 716 }; 717 return transform(input, wrapperFunction, executor); 718 } 719 720 /** 721 * Like {@link #transform(ListenableFuture, Function)} except that the 722 * transformation {@code function} is invoked on each call to 723 * {@link Future#get() get()} on the returned future. 724 * 725 * <p>The returned {@code Future} reflects the input's cancellation 726 * state directly, and any attempt to cancel the returned Future is likewise 727 * passed through to the input Future. 728 * 729 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 730 * only apply the timeout to the execution of the underlying {@code Future}, 731 * <em>not</em> to the execution of the transformation function. 732 * 733 * <p>The primary audience of this method is callers of {@code transform} 734 * who don't have a {@code ListenableFuture} available and 735 * do not mind repeated, lazy function evaluation. 736 * 737 * @param input The future to transform 738 * @param function A Function to transform the results of the provided future 739 * to the results of the returned future. 740 * @return A future that returns the result of the transformation. 
741 * @since 10.0 742 */ 743 public static <I, O> Future<O> lazyTransform(final Future<I> input, 744 final Function<? super I, ? extends O> function) { 745 checkNotNull(input); 746 checkNotNull(function); 747 return new Future<O>() { 748 749 @Override 750 public boolean cancel(boolean mayInterruptIfRunning) { 751 return input.cancel(mayInterruptIfRunning); 752 } 753 754 @Override 755 public boolean isCancelled() { 756 return input.isCancelled(); 757 } 758 759 @Override 760 public boolean isDone() { 761 return input.isDone(); 762 } 763 764 @Override 765 public O get() throws InterruptedException, ExecutionException { 766 return applyTransformation(input.get()); 767 } 768 769 @Override 770 public O get(long timeout, TimeUnit unit) 771 throws InterruptedException, ExecutionException, TimeoutException { 772 return applyTransformation(input.get(timeout, unit)); 773 } 774 775 private O applyTransformation(I input) throws ExecutionException { 776 try { 777 return function.apply(input); 778 } catch (Throwable t) { 779 throw new ExecutionException(t); 780 } 781 } 782 }; 783 } 784 785 /** 786 * An implementation of {@code ListenableFuture} that also implements 787 * {@code Runnable} so that it can be used to nest ListenableFutures. 788 * Once the passed-in {@code ListenableFuture} is complete, it calls the 789 * passed-in {@code Function} to generate the result. 790 * 791 * <p>For historical reasons, this class has a special case in its exception 792 * handling: If the given {@code AsyncFunction} throws an {@code 793 * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it 794 * and uses its <i>cause</i> as the output future's exception, rather than 795 * using the {@code UndeclaredThrowableException} itself as it would for other 796 * exception types. The reason for this is that {@code Futures.transform} used 797 * to require a {@code Function}, whose {@code apply} method is not allowed to 798 * throw checked exceptions. 
Nowadays, {@code Futures.transform} has an 799 * overload that accepts an {@code AsyncFunction}, whose {@code apply} method 800 * <i>is</i> allowed to throw checked exception. Users who wish to throw 801 * checked exceptions should use that overload instead, and <a 802 * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we 803 * should remove the {@code UndeclaredThrowableException} special case</a>. 804 */ 805 private static class ChainingListenableFuture<I, O> 806 extends AbstractFuture<O> implements Runnable { 807 808 private AsyncFunction<? super I, ? extends O> function; 809 private ListenableFuture<? extends I> inputFuture; 810 private volatile ListenableFuture<? extends O> outputFuture; 811 private final CountDownLatch outputCreated = new CountDownLatch(1); 812 813 private ChainingListenableFuture( 814 AsyncFunction<? super I, ? extends O> function, 815 ListenableFuture<? extends I> inputFuture) { 816 this.function = checkNotNull(function); 817 this.inputFuture = checkNotNull(inputFuture); 818 } 819 820 @Override 821 public boolean cancel(boolean mayInterruptIfRunning) { 822 /* 823 * Our additional cancellation work needs to occur even if 824 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 825 */ 826 if (super.cancel(mayInterruptIfRunning)) { 827 // This should never block since only one thread is allowed to cancel 828 // this Future. 829 cancel(inputFuture, mayInterruptIfRunning); 830 cancel(outputFuture, mayInterruptIfRunning); 831 return true; 832 } 833 return false; 834 } 835 836 private void cancel(@Nullable Future<?> future, 837 boolean mayInterruptIfRunning) { 838 if (future != null) { 839 future.cancel(mayInterruptIfRunning); 840 } 841 } 842 843 @Override 844 public void run() { 845 try { 846 I sourceResult; 847 try { 848 sourceResult = getUninterruptibly(inputFuture); 849 } catch (CancellationException e) { 850 // Cancel this future and return. 
851 // At this point, inputFuture is cancelled and outputFuture doesn't 852 // exist, so the value of mayInterruptIfRunning is irrelevant. 853 cancel(false); 854 return; 855 } catch (ExecutionException e) { 856 // Set the cause of the exception as this future's exception 857 setException(e.getCause()); 858 return; 859 } 860 861 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 862 function.apply(sourceResult); 863 if (isCancelled()) { 864 outputFuture.cancel(wasInterrupted()); 865 this.outputFuture = null; 866 return; 867 } 868 outputFuture.addListener(new Runnable() { 869 @Override 870 public void run() { 871 try { 872 set(getUninterruptibly(outputFuture)); 873 } catch (CancellationException e) { 874 // Cancel this future and return. 875 // At this point, inputFuture and outputFuture are done, so the 876 // value of mayInterruptIfRunning is irrelevant. 877 cancel(false); 878 return; 879 } catch (ExecutionException e) { 880 // Set the cause of the exception as this future's exception 881 setException(e.getCause()); 882 } finally { 883 // Don't pin inputs beyond completion 884 ChainingListenableFuture.this.outputFuture = null; 885 } 886 } 887 }, MoreExecutors.sameThreadExecutor()); 888 } catch (UndeclaredThrowableException e) { 889 // Set the cause of the exception as this future's exception 890 setException(e.getCause()); 891 } catch (Throwable t) { 892 // This exception is irrelevant in this thread, but useful for the 893 // client 894 setException(t); 895 } finally { 896 // Don't pin inputs beyond completion 897 function = null; 898 inputFuture = null; 899 // Allow our get routines to examine outputFuture now. 900 outputCreated.countDown(); 901 } 902 } 903 } 904 905 /** 906 * Returns a new {@code ListenableFuture} whose result is the product of 907 * calling {@code get()} on the {@code Future} nested within the given {@code 908 * Future}, effectively chaining the futures one after the other. 
Example: 909 * 910 * <pre> {@code 911 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 912 * ListenableFuture<String> dereferenced = dereference(nested);}</pre> 913 * 914 * <p>This call has the same cancellation and execution semantics as {@link 915 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 916 * Future} attempts to keep its cancellation state in sync with both the 917 * input {@code Future} and the nested {@code Future}. The transformation 918 * is very lightweight and therefore takes place in the thread that called 919 * {@code dereference}. 920 * 921 * @param nested The nested future to transform. 922 * @return A future that holds result of the inner future. 923 * @since 13.0 924 */ 925 @SuppressWarnings({"rawtypes", "unchecked"}) 926 public static <V> ListenableFuture<V> dereference( 927 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 928 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 929 } 930 931 /** 932 * Helper {@code Function} for {@link #dereference}. 933 */ 934 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 935 new AsyncFunction<ListenableFuture<Object>, Object>() { 936 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 937 return input; 938 } 939 }; 940 941 /** 942 * Creates a new {@code ListenableFuture} whose value is a list containing the 943 * values of all its input futures, if all succeed. If any input fails, the 944 * returned future fails. 945 * 946 * <p>The list of results is in the same order as the input list. 947 * 948 * <p>Canceling this future will attempt to cancel all the component futures, 949 * and if any of the provided futures fails or is canceled, this one is, 950 * too. 
951 * 952 * @param futures futures to combine 953 * @return a future that provides a list of the results of the component 954 * futures 955 * @since 10.0 956 */ 957 @Beta 958 public static <V> ListenableFuture<List<V>> allAsList( 959 ListenableFuture<? extends V>... futures) { 960 return listFuture(ImmutableList.copyOf(futures), true, 961 MoreExecutors.sameThreadExecutor()); 962 } 963 964 /** 965 * Creates a new {@code ListenableFuture} whose value is a list containing the 966 * values of all its input futures, if all succeed. If any input fails, the 967 * returned future fails. 968 * 969 * <p>The list of results is in the same order as the input list. 970 * 971 * <p>Canceling this future will attempt to cancel all the component futures, 972 * and if any of the provided futures fails or is canceled, this one is, 973 * too. 974 * 975 * @param futures futures to combine 976 * @return a future that provides a list of the results of the component 977 * futures 978 * @since 10.0 979 */ 980 @Beta 981 public static <V> ListenableFuture<List<V>> allAsList( 982 Iterable<? extends ListenableFuture<? extends V>> futures) { 983 return listFuture(ImmutableList.copyOf(futures), true, 984 MoreExecutors.sameThreadExecutor()); 985 } 986 987 /** 988 * Creates a new {@code ListenableFuture} whose result is set from the 989 * supplied future when it completes. Cancelling the supplied future 990 * will also cancel the returned future, but cancelling the returned 991 * future will have no effect on the supplied future. 992 * 993 * @since 15.0 994 */ 995 public static <V> ListenableFuture<V> nonCancellationPropagating( 996 ListenableFuture<V> future) { 997 return new NonCancellationPropagatingFuture<V>(future); 998 } 999 1000 /** 1001 * A wrapped future that does not propagate cancellation to its delegate. 
1002 */ 1003 private static class NonCancellationPropagatingFuture<V> 1004 extends AbstractFuture<V> { 1005 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1006 checkNotNull(delegate); 1007 addCallback(delegate, new FutureCallback<V>() { 1008 @Override 1009 public void onSuccess(V result) { 1010 set(result); 1011 } 1012 1013 @Override 1014 public void onFailure(Throwable t) { 1015 if (delegate.isCancelled()) { 1016 cancel(false); 1017 } else { 1018 setException(t); 1019 } 1020 } 1021 }, sameThreadExecutor()); 1022 } 1023 } 1024 1025 /** 1026 * Creates a new {@code ListenableFuture} whose value is a list containing the 1027 * values of all its successful input futures. The list of results is in the 1028 * same order as the input list, and if any of the provided futures fails or 1029 * is canceled, its corresponding position will contain {@code null} (which is 1030 * indistinguishable from the future having a successful value of 1031 * {@code null}). 1032 * 1033 * <p>Canceling this future will attempt to cancel all the component futures. 1034 * 1035 * @param futures futures to combine 1036 * @return a future that provides a list of the results of the component 1037 * futures 1038 * @since 10.0 1039 */ 1040 @Beta 1041 public static <V> ListenableFuture<List<V>> successfulAsList( 1042 ListenableFuture<? extends V>... futures) { 1043 return listFuture(ImmutableList.copyOf(futures), false, 1044 MoreExecutors.sameThreadExecutor()); 1045 } 1046 1047 /** 1048 * Creates a new {@code ListenableFuture} whose value is a list containing the 1049 * values of all its successful input futures. The list of results is in the 1050 * same order as the input list, and if any of the provided futures fails or 1051 * is canceled, its corresponding position will contain {@code null} (which is 1052 * indistinguishable from the future having a successful value of 1053 * {@code null}). 
1054 * 1055 * <p>Canceling this future will attempt to cancel all the component futures. 1056 * 1057 * @param futures futures to combine 1058 * @return a future that provides a list of the results of the component 1059 * futures 1060 * @since 10.0 1061 */ 1062 @Beta 1063 public static <V> ListenableFuture<List<V>> successfulAsList( 1064 Iterable<? extends ListenableFuture<? extends V>> futures) { 1065 return listFuture(ImmutableList.copyOf(futures), false, 1066 MoreExecutors.sameThreadExecutor()); 1067 } 1068 1069 /** 1070 * Registers separate success and failure callbacks to be run when the {@code 1071 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1072 * complete} or, if the computation is already complete, immediately. 1073 * 1074 * <p>There is no guaranteed ordering of execution of callbacks, but any 1075 * callback added through this method is guaranteed to be called once the 1076 * computation is complete. 1077 * 1078 * Example: <pre> {@code 1079 * ListenableFuture<QueryResult> future = ...; 1080 * addCallback(future, 1081 * new FutureCallback<QueryResult> { 1082 * public void onSuccess(QueryResult result) { 1083 * storeInCache(result); 1084 * } 1085 * public void onFailure(Throwable t) { 1086 * reportError(t); 1087 * } 1088 * });}</pre> 1089 * 1090 * <p>Note: If the callback is slow or heavyweight, consider {@linkplain 1091 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an 1092 * executor}. If you do not supply an executor, {@code addCallback} will use 1093 * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries 1094 * some caveats for heavier operations. For example, the callback may run on 1095 * an unpredictable or undesirable thread: 1096 * 1097 * <ul> 1098 * <li>If the input {@code Future} is done at the time {@code addCallback} is 1099 * called, {@code addCallback} will execute the callback inline. 
1100 * <li>If the input {@code Future} is not yet done, {@code addCallback} will 1101 * schedule the callback to be run by the thread that completes the input 1102 * {@code Future}, which may be an internal system thread such as an RPC 1103 * network thread. 1104 * </ul> 1105 * 1106 * <p>Also note that, regardless of which thread executes the {@code 1107 * sameThreadExecutor} callback, all other registered but unexecuted listeners 1108 * are prevented from running during its execution, even if those listeners 1109 * are to run in other executors. 1110 * 1111 * <p>For a more general interface to attach a completion listener to a 1112 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1113 * 1114 * @param future The future attach the callback to. 1115 * @param callback The callback to invoke when {@code future} is completed. 1116 * @since 10.0 1117 */ 1118 public static <V> void addCallback(ListenableFuture<V> future, 1119 FutureCallback<? super V> callback) { 1120 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 1121 } 1122 1123 /** 1124 * Registers separate success and failure callbacks to be run when the {@code 1125 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1126 * complete} or, if the computation is already complete, immediately. 1127 * 1128 * <p>The callback is run in {@code executor}. 1129 * There is no guaranteed ordering of execution of callbacks, but any 1130 * callback added through this method is guaranteed to be called once the 1131 * computation is complete. 1132 * 1133 * Example: <pre> {@code 1134 * ListenableFuture<QueryResult> future = ...; 1135 * Executor e = ... 
1136 * addCallback(future, 1137 * new FutureCallback<QueryResult> { 1138 * public void onSuccess(QueryResult result) { 1139 * storeInCache(result); 1140 * } 1141 * public void onFailure(Throwable t) { 1142 * reportError(t); 1143 * } 1144 * }, e);}</pre> 1145 * 1146 * <p>When the callback is fast and lightweight, consider {@linkplain 1147 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or 1148 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 1149 * caveats documented in the link above. 1150 * 1151 * <p>For a more general interface to attach a completion listener to a 1152 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1153 * 1154 * @param future The future attach the callback to. 1155 * @param callback The callback to invoke when {@code future} is completed. 1156 * @param executor The executor to run {@code callback} when the future 1157 * completes. 1158 * @since 10.0 1159 */ 1160 public static <V> void addCallback(final ListenableFuture<V> future, 1161 final FutureCallback<? super V> callback, Executor executor) { 1162 Preconditions.checkNotNull(callback); 1163 Runnable callbackListener = new Runnable() { 1164 @Override 1165 public void run() { 1166 final V value; 1167 try { 1168 // TODO(user): (Before Guava release), validate that this 1169 // is the thing for IE. 1170 value = getUninterruptibly(future); 1171 } catch (ExecutionException e) { 1172 callback.onFailure(e.getCause()); 1173 return; 1174 } catch (RuntimeException e) { 1175 callback.onFailure(e); 1176 return; 1177 } catch (Error e) { 1178 callback.onFailure(e); 1179 return; 1180 } 1181 callback.onSuccess(value); 1182 } 1183 }; 1184 future.addListener(callbackListener, executor); 1185 } 1186 1187 /** 1188 * Returns the result of {@link Future#get()}, converting most exceptions to a 1189 * new instance of the given checked exception type. 
This reduces boilerplate 1190 * for a common use of {@code Future} in which it is unnecessary to 1191 * programmatically distinguish between exception types or to extract other 1192 * information from the exception instance. 1193 * 1194 * <p>Exceptions from {@code Future.get} are treated as follows: 1195 * <ul> 1196 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1197 * {@code X} if the cause is a checked exception, an {@link 1198 * UncheckedExecutionException} if the cause is a {@code 1199 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1200 * {@code Error}. 1201 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1202 * restoring the interrupt). 1203 * <li>Any {@link CancellationException} is propagated untouched, as is any 1204 * other {@link RuntimeException} (though {@code get} implementations are 1205 * discouraged from throwing such exceptions). 1206 * </ul> 1207 * 1208 * <p>The overall principle is to continue to treat every checked exception as a 1209 * checked exception, every unchecked exception as an unchecked exception, and 1210 * every error as an error. In addition, the cause of any {@code 1211 * ExecutionException} is wrapped in order to ensure that the new stack trace 1212 * matches that of the current thread. 1213 * 1214 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1215 * public constructor that accepts zero or more arguments, all of type {@code 1216 * String} or {@code Throwable} (preferring constructors with at least one 1217 * {@code String}) and calling the constructor via reflection. If the 1218 * exception did not already have a cause, one is set by calling {@link 1219 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1220 * {@code IllegalArgumentException} is thrown. 
1221 * 1222 * @throws X if {@code get} throws any checked exception except for an {@code 1223 * ExecutionException} whose cause is not itself a checked exception 1224 * @throws UncheckedExecutionException if {@code get} throws an {@code 1225 * ExecutionException} with a {@code RuntimeException} as its cause 1226 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1227 * with an {@code Error} as its cause 1228 * @throws CancellationException if {@code get} throws a {@code 1229 * CancellationException} 1230 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1231 * RuntimeException} or does not have a suitable constructor 1232 * @since 10.0 1233 */ 1234 public static <V, X extends Exception> V get( 1235 Future<V> future, Class<X> exceptionClass) throws X { 1236 checkNotNull(future); 1237 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1238 "Futures.get exception type (%s) must not be a RuntimeException", 1239 exceptionClass); 1240 try { 1241 return future.get(); 1242 } catch (InterruptedException e) { 1243 currentThread().interrupt(); 1244 throw newWithCause(exceptionClass, e); 1245 } catch (ExecutionException e) { 1246 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1247 throw new AssertionError(); 1248 } 1249 } 1250 1251 /** 1252 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1253 * exceptions to a new instance of the given checked exception type. This 1254 * reduces boilerplate for a common use of {@code Future} in which it is 1255 * unnecessary to programmatically distinguish between exception types or to 1256 * extract other information from the exception instance. 
1257 * 1258 * <p>Exceptions from {@code Future.get} are treated as follows: 1259 * <ul> 1260 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1261 * {@code X} if the cause is a checked exception, an {@link 1262 * UncheckedExecutionException} if the cause is a {@code 1263 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1264 * {@code Error}. 1265 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1266 * restoring the interrupt). 1267 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1268 * <li>Any {@link CancellationException} is propagated untouched, as is any 1269 * other {@link RuntimeException} (though {@code get} implementations are 1270 * discouraged from throwing such exceptions). 1271 * </ul> 1272 * 1273 * <p>The overall principle is to continue to treat every checked exception as a 1274 * checked exception, every unchecked exception as an unchecked exception, and 1275 * every error as an error. In addition, the cause of any {@code 1276 * ExecutionException} is wrapped in order to ensure that the new stack trace 1277 * matches that of the current thread. 1278 * 1279 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1280 * public constructor that accepts zero or more arguments, all of type {@code 1281 * String} or {@code Throwable} (preferring constructors with at least one 1282 * {@code String}) and calling the constructor via reflection. If the 1283 * exception did not already have a cause, one is set by calling {@link 1284 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1285 * {@code IllegalArgumentException} is thrown. 
1286 * 1287 * @throws X if {@code get} throws any checked exception except for an {@code 1288 * ExecutionException} whose cause is not itself a checked exception 1289 * @throws UncheckedExecutionException if {@code get} throws an {@code 1290 * ExecutionException} with a {@code RuntimeException} as its cause 1291 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1292 * with an {@code Error} as its cause 1293 * @throws CancellationException if {@code get} throws a {@code 1294 * CancellationException} 1295 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1296 * RuntimeException} or does not have a suitable constructor 1297 * @since 10.0 1298 */ 1299 public static <V, X extends Exception> V get( 1300 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1301 throws X { 1302 checkNotNull(future); 1303 checkNotNull(unit); 1304 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1305 "Futures.get exception type (%s) must not be a RuntimeException", 1306 exceptionClass); 1307 try { 1308 return future.get(timeout, unit); 1309 } catch (InterruptedException e) { 1310 currentThread().interrupt(); 1311 throw newWithCause(exceptionClass, e); 1312 } catch (TimeoutException e) { 1313 throw newWithCause(exceptionClass, e); 1314 } catch (ExecutionException e) { 1315 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1316 throw new AssertionError(); 1317 } 1318 } 1319 1320 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1321 Throwable cause, Class<X> exceptionClass) throws X { 1322 if (cause instanceof Error) { 1323 throw new ExecutionError((Error) cause); 1324 } 1325 if (cause instanceof RuntimeException) { 1326 throw new UncheckedExecutionException(cause); 1327 } 1328 throw newWithCause(exceptionClass, cause); 1329 } 1330 1331 /** 1332 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1333 * task known not to throw a checked exception. 
This makes {@code Future} more 1334 * suitable for lightweight, fast-running tasks that, barring bugs in the 1335 * code, will not fail. This gives it exception-handling behavior similar to 1336 * that of {@code ForkJoinTask.join}. 1337 * 1338 * <p>Exceptions from {@code Future.get} are treated as follows: 1339 * <ul> 1340 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1341 * {@link UncheckedExecutionException} (if the cause is an {@code 1342 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1343 * Error}). 1344 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1345 * call. The interrupt is restored before {@code getUnchecked} returns. 1346 * <li>Any {@link CancellationException} is propagated untouched. So is any 1347 * other {@link RuntimeException} ({@code get} implementations are 1348 * discouraged from throwing such exceptions). 1349 * </ul> 1350 * 1351 * <p>The overall principle is to eliminate all checked exceptions: to loop to 1352 * avoid {@code InterruptedException}, to pass through {@code 1353 * CancellationException}, and to wrap any exception from the underlying 1354 * computation in an {@code UncheckedExecutionException} or {@code 1355 * ExecutionError}. 1356 * 1357 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1358 * {@link Uninterruptibles#getUninterruptibly(Future)}. 
1359 * 1360 * @throws UncheckedExecutionException if {@code get} throws an {@code 1361 * ExecutionException} with an {@code Exception} as its cause 1362 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1363 * with an {@code Error} as its cause 1364 * @throws CancellationException if {@code get} throws a {@code 1365 * CancellationException} 1366 * @since 10.0 1367 */ 1368 public static <V> V getUnchecked(Future<V> future) { 1369 checkNotNull(future); 1370 try { 1371 return getUninterruptibly(future); 1372 } catch (ExecutionException e) { 1373 wrapAndThrowUnchecked(e.getCause()); 1374 throw new AssertionError(); 1375 } 1376 } 1377 1378 private static void wrapAndThrowUnchecked(Throwable cause) { 1379 if (cause instanceof Error) { 1380 throw new ExecutionError((Error) cause); 1381 } 1382 /* 1383 * It's a non-Error, non-Exception Throwable. From my survey of such 1384 * classes, I believe that most users intended to extend Exception, so we'll 1385 * treat it like an Exception. 1386 */ 1387 throw new UncheckedExecutionException(cause); 1388 } 1389 1390 /* 1391 * TODO(user): FutureChecker interface for these to be static methods on? If 1392 * so, refer to it in the (static-method) Futures.get documentation 1393 */ 1394 1395 /* 1396 * Arguably we don't need a timed getUnchecked because any operation slow 1397 * enough to require a timeout is heavyweight enough to throw a checked 1398 * exception and therefore be inappropriate to use with getUnchecked. Further, 1399 * it's not clear that converting the checked TimeoutException to a 1400 * RuntimeException -- especially to an UncheckedExecutionException, since it 1401 * wasn't thrown by the computation -- makes sense, and if we don't convert 1402 * it, the user still has to write a try-catch block. 1403 * 1404 * If you think you would use this method, let us know. 
*/

  /**
   * Reflectively creates an instance of {@code exceptionClass} that has
   * {@code cause} as its cause.
   *
   * <p>The public constructors are tried in the order produced by
   * {@link #preferringStrings}; the first one that {@link #newFromConstructor}
   * can invoke wins. If the new instance reports no cause, {@code cause} is
   * attached with {@code initCause}.
   *
   * @param exceptionClass the exception type to instantiate
   * @param cause the throwable to pass to the constructor and/or attach
   * @return the newly created exception
   * @throws IllegalArgumentException if no public constructor could be used
   */
  private static <X extends Exception> X newWithCause(
      Class<X> exceptionClass, Throwable cause) {
    // getConstructors() guarantees this as long as we don't modify the array.
    @SuppressWarnings("unchecked")
    List<Constructor<X>> constructors =
        (List) Arrays.asList(exceptionClass.getConstructors());
    for (Constructor<X> constructor : preferringStrings(constructors)) {
      @Nullable X instance = newFromConstructor(constructor, cause);
      if (instance != null) {
        if (instance.getCause() == null) {
          instance.initCause(cause);
        }
        return instance;
      }
    }
    throw new IllegalArgumentException(
        "No appropriate constructor for exception of type " + exceptionClass
        + " in response to chained exception", cause);
  }

  /**
   * Returns a sorted copy of {@code constructors} in which those that declare
   * a {@code String} parameter come before those that do not.
   */
  private static <X extends Exception> List<Constructor<X>>
      preferringStrings(List<Constructor<X>> constructors) {
    return WITH_STRING_PARAM_FIRST.sortedCopy(constructors);
  }

  /**
   * Orders constructors by whether they declare a {@code String} parameter.
   * The natural {@code Boolean} order puts {@code false} first, so the order
   * is reversed to put the {@code String}-accepting constructors first.
   */
  private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST =
      Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() {
        @Override public Boolean apply(Constructor<?> input) {
          return asList(input.getParameterTypes()).contains(String.class);
        }
      }).reverse();

  /**
   * Tries to invoke {@code constructor}, supplying {@code cause.toString()}
   * for every {@code String} parameter and {@code cause} itself for every
   * {@code Throwable} parameter.
   *
   * @return the new instance, or {@code null} if the constructor declares a
   *     parameter of any other type or if the reflective call fails
   */
  @Nullable private static <X> X newFromConstructor(
      Constructor<X> constructor, Throwable cause) {
    Class<?>[] paramTypes = constructor.getParameterTypes();
    Object[] params = new Object[paramTypes.length];
    for (int i = 0; i < paramTypes.length; i++) {
      Class<?> paramType = paramTypes[i];
      if (paramType.equals(String.class)) {
        params[i] = cause.toString();
      } else if (paramType.equals(Throwable.class)) {
        params[i] = cause;
      } else {
        // A parameter we don't know how to fill; this constructor is unusable.
        return null;
      }
    }
    // Any reflective failure just means "try the next constructor".
    try {
      return constructor.newInstance(params);
    } catch (IllegalArgumentException e) {
      return null;
    } catch (InstantiationException e) {
      return null;
    } catch (IllegalAccessException e) {
      return null;
    } catch (InvocationTargetException e) {
      return null;
    }
  }

  /**
   * Strategy used by {@link CombinedFuture} to turn the collected input
   * results into the combined future's value.
   */
  private interface FutureCombiner<V, C> {
    /**
     * @param values one entry per input future, in input order; an entry is
     *     {@code null} when no value was recorded for that input
     */
    C combine(List<Optional<V>> values);
  }

  /**
   * A future that listens to a collection of input futures, records each
   * result by position, and completes with {@code combiner.combine(values)}
   * once every input has reported. When {@code allMustSucceed} is true, the
   * first input failure (or cancellation) fails (or cancels) this future.
   */
  private static class CombinedFuture<V, C> extends AbstractFuture<C> {
    private static final Logger logger =
        Logger.getLogger(CombinedFuture.class.getName());

    // futures, combiner and values are nulled by the cleanup listener
    // registered in init() once this future is done.
    ImmutableCollection<? extends ListenableFuture<? extends V>> futures;
    final boolean allMustSucceed;
    // Number of input futures that have not yet reported to setOneValue().
    final AtomicInteger remaining;
    FutureCombiner<V, C> combiner;
    List<Optional<V>> values;
    // Guards seenExceptions, which is created lazily on the first failure.
    final Object seenExceptionsLock = new Object();
    Set<Throwable> seenExceptions;

    CombinedFuture(
        ImmutableCollection<? extends ListenableFuture<? extends V>> futures,
        boolean allMustSucceed, Executor listenerExecutor,
        FutureCombiner<V, C> combiner) {
      this.futures = futures;
      this.allMustSucceed = allMustSucceed;
      this.remaining = new AtomicInteger(futures.size());
      this.combiner = combiner;
      this.values = Lists.newArrayListWithCapacity(futures.size());
      init(listenerExecutor);
    }

    /**
     * Must be called at the end of the constructor.
     *
     * <p>Registers the cleanup listener on this future, then attaches one
     * listener per input future (run in {@code listenerExecutor}) that feeds
     * {@link #setOneValue}.
     */
    protected void init(final Executor listenerExecutor) {
      // First, schedule cleanup to execute when the Future is done.
      addListener(new Runnable() {
        @Override
        public void run() {
          // Cancel all the component futures.
          if (CombinedFuture.this.isCancelled()) {
            for (ListenableFuture<?> future : CombinedFuture.this.futures) {
              future.cancel(CombinedFuture.this.wasInterrupted());
            }
          }

          // Let go of the memory held by other futures
          CombinedFuture.this.futures = null;

          // By now the values array has either been set as the Future's value,
          // or (in case of failure) is no longer useful.
          CombinedFuture.this.values = null;

          // The combiner may also hold state, so free that as well
          CombinedFuture.this.combiner = null;
        }
      }, MoreExecutors.sameThreadExecutor());

      // Now begin the "real" initialization.

      // Corner case: List is empty.
      if (futures.isEmpty()) {
        set(combiner.combine(ImmutableList.<Optional<V>>of()));
        return;
      }

      // Populate the results list with null initially.
      for (int i = 0; i < futures.size(); ++i) {
        values.add(null);
      }

      // Register a listener on each Future in the list to update
      // the state of this future.
      // Note that if all the futures on the list are done prior to completing
      // this loop, the last call to addListener() will callback to
      // setOneValue(), transitively call our cleanup listener, and set
      // this.futures to null.
      // This is not actually a problem, since the foreach only needs
      // this.futures to be non-null at the beginning of the loop.
      int i = 0;
      for (final ListenableFuture<? extends V> listenable : futures) {
        final int index = i++;
        listenable.addListener(new Runnable() {
          @Override
          public void run() {
            setOneValue(index, listenable);
          }
        }, listenerExecutor);
      }
    }

    /**
     * Fails this future with the given Throwable if {@link #allMustSucceed} is
     * true. Also, logs the throwable if it is an {@link Error} or if
     * {@link #allMustSucceed} is {@code true}, the throwable did not cause
     * this future to fail, and it is the first time we've seen that particular Throwable.
     */
    private void setExceptionAndMaybeLog(Throwable throwable) {
      boolean visibleFromOutputFuture = false;
      boolean firstTimeSeeingThisException = true;
      if (allMustSucceed) {
        // As soon as the first one fails, throw the exception up.
        // The result of all other inputs is then ignored.
        visibleFromOutputFuture = super.setException(throwable);

        // Remember every throwable seen so a repeated one is logged only once.
        synchronized (seenExceptionsLock) {
          if (seenExceptions == null) {
            seenExceptions = Sets.newHashSet();
          }
          firstTimeSeeingThisException = seenExceptions.add(throwable);
        }
      }

      if (throwable instanceof Error
          || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) {
        logger.log(Level.SEVERE, "input future failed.", throwable);
      }
    }

    /**
     * Sets the value at the given index to that of the given future.
     *
     * <p>Also decrements {@link #remaining}; the call that brings it to zero
     * completes this future with the combined value, provided the combiner
     * and the values list have not been cleared yet.
     */
    private void setOneValue(int index, Future<? extends V> future) {
      // Read the field once: the cleanup listener may null it concurrently.
      List<Optional<V>> localValues = values;
      // TODO(user): This check appears to be redundant since values is
      // assigned null only after the future completes.  However, values
      // is not volatile so it may be possible for us to observe the changes
      // to these two values in a different order... which I think is why
      // we need to check both.  Clear up this craziness either by making
      // values volatile or proving that it doesn't need to be for some other
      // reason.
      if (isDone() || localValues == null) {
        // Some other future failed or has been cancelled, causing this one to
        // also be cancelled or have an exception set. This should only happen
        // if allMustSucceed is true or if the output itself has been
        // cancelled.
        checkState(allMustSucceed || isCancelled(),
            "Future was done before all dependencies completed");
      }

      try {
        checkState(future.isDone(),
            "Tried to set value from future which is not done");
        V returnValue = getUninterruptibly(future);
        if (localValues != null) {
          localValues.set(index, Optional.fromNullable(returnValue));
        }
      } catch (CancellationException e) {
        if (allMustSucceed) {
          // Set ourselves as cancelled. Let the input futures keep running
          // as some of them may be used elsewhere.
          cancel(false);
        }
      } catch (ExecutionException e) {
        setExceptionAndMaybeLog(e.getCause());
      } catch (Throwable t) {
        setExceptionAndMaybeLog(t);
      } finally {
        // Runs on every path, so each input is counted exactly once per call.
        int newRemaining = remaining.decrementAndGet();
        checkState(newRemaining >= 0, "Less than 0 remaining futures");
        if (newRemaining == 0) {
          FutureCombiner<V, C> localCombiner = combiner;
          if (localCombiner != null && localValues != null) {
            set(localCombiner.combine(localValues));
          } else {
            // Combiner/values already cleared, which the cleanup listener
            // only does once this future is done.
            checkState(isDone());
          }
        }
      }
    }

  }

  /** Used for {@link #allAsList} and {@link #successfulAsList}. */
  private static <V> ListenableFuture<List<V>> listFuture(
      ImmutableList<ListenableFuture<? extends V>> futures,
      boolean allMustSucceed, Executor listenerExecutor) {
    return new CombinedFuture<V, List<V>>(
        futures, allMustSucceed, listenerExecutor,
        new FutureCombiner<V, List<V>>() {
          @Override
          public List<V> combine(List<Optional<V>> values) {
            List<V> result = Lists.newArrayList();
            for (Optional<V> element : values) {
              // A null slot (no value recorded) and an absent Optional (the
              // input produced null) both become null in the result.
              result.add(element != null ? element.orNull() : null);
            }
            return Collections.unmodifiableList(result);
          }
        });
  }

  /**
   * A checked future that uses a function to map from exceptions to the
   * appropriate checked type.
   */
  private static class MappingCheckedFuture<V, X extends Exception> extends
      AbstractCheckedFuture<V, X> {

    final Function<Exception, X> mapper;

    MappingCheckedFuture(ListenableFuture<V> delegate,
        Function<Exception, X> mapper) {
      super(delegate);

      this.mapper = checkNotNull(mapper);
    }

    @Override
    protected X mapException(Exception e) {
      return mapper.apply(e);
    }
  }
}