001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024import static java.lang.Thread.currentThread; 025import static java.util.Arrays.asList; 026 027import com.google.common.annotations.Beta; 028import com.google.common.base.Function; 029import com.google.common.base.Optional; 030import com.google.common.base.Preconditions; 031import com.google.common.collect.ImmutableCollection; 032import com.google.common.collect.ImmutableList; 033import com.google.common.collect.Lists; 034import com.google.common.collect.Ordering; 035 036import java.lang.reflect.Constructor; 037import java.lang.reflect.InvocationTargetException; 038import java.lang.reflect.UndeclaredThrowableException; 039import java.util.Arrays; 040import java.util.List; 041import java.util.concurrent.CancellationException; 042import java.util.concurrent.CountDownLatch; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.Executor; 045import java.util.concurrent.Future; 
046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.TimeoutException; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.logging.Level; 050import java.util.logging.Logger; 051 052import javax.annotation.Nullable; 053 054/** 055 * Static utility methods pertaining to the {@link Future} interface. 056 * 057 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 058 * Guava User Guide article on <a href= 059 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 060 * {@code ListenableFuture}</a>. 061 * 062 * @author Kevin Bourrillion 063 * @author Nishant Thakkar 064 * @author Sven Mawson 065 * @since 1.0 066 */ 067@Beta 068public final class Futures { 069 private Futures() {} 070 071 /** 072 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 073 * and a {@link Function} that maps from {@link Exception} instances into the 074 * appropriate checked type. 075 * 076 * <p>The given mapping function will be applied to an 077 * {@link InterruptedException}, a {@link CancellationException}, or an 078 * {@link ExecutionException}. 079 * See {@link Future#get()} for details on the exceptions thrown. 
080 * 081 * @since 9.0 (source-compatible since 1.0) 082 */ 083 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 084 ListenableFuture<V> future, Function<Exception, X> mapper) { 085 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 086 } 087 088 private abstract static class ImmediateFuture<V> 089 implements ListenableFuture<V> { 090 091 private static final Logger log = 092 Logger.getLogger(ImmediateFuture.class.getName()); 093 094 @Override 095 public void addListener(Runnable listener, Executor executor) { 096 checkNotNull(listener, "Runnable was null."); 097 checkNotNull(executor, "Executor was null."); 098 try { 099 executor.execute(listener); 100 } catch (RuntimeException e) { 101 // ListenableFuture's contract is that it will not throw unchecked 102 // exceptions, so log the bad runnable and/or executor and swallow it. 103 log.log(Level.SEVERE, "RuntimeException while executing runnable " 104 + listener + " with executor " + executor, e); 105 } 106 } 107 108 @Override 109 public boolean cancel(boolean mayInterruptIfRunning) { 110 return false; 111 } 112 113 @Override 114 public abstract V get() throws ExecutionException; 115 116 @Override 117 public V get(long timeout, TimeUnit unit) throws ExecutionException { 118 checkNotNull(unit); 119 return get(); 120 } 121 122 @Override 123 public boolean isCancelled() { 124 return false; 125 } 126 127 @Override 128 public boolean isDone() { 129 return true; 130 } 131 } 132 133 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 134 135 @Nullable private final V value; 136 137 ImmediateSuccessfulFuture(@Nullable V value) { 138 this.value = value; 139 } 140 141 @Override 142 public V get() { 143 return value; 144 } 145 } 146 147 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 148 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 149 150 @Nullable private final V value; 151 152 
ImmediateSuccessfulCheckedFuture(@Nullable V value) { 153 this.value = value; 154 } 155 156 @Override 157 public V get() { 158 return value; 159 } 160 161 @Override 162 public V checkedGet() { 163 return value; 164 } 165 166 @Override 167 public V checkedGet(long timeout, TimeUnit unit) { 168 checkNotNull(unit); 169 return value; 170 } 171 } 172 173 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 174 175 private final Throwable thrown; 176 177 ImmediateFailedFuture(Throwable thrown) { 178 this.thrown = thrown; 179 } 180 181 @Override 182 public V get() throws ExecutionException { 183 throw new ExecutionException(thrown); 184 } 185 } 186 187 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 188 189 private final CancellationException thrown; 190 191 ImmediateCancelledFuture() { 192 this.thrown = new CancellationException("Immediate cancelled future."); 193 } 194 195 @Override 196 public boolean isCancelled() { 197 return true; 198 } 199 200 @Override 201 public V get() { 202 throw AbstractFuture.cancellationExceptionWithCause( 203 "Task was cancelled.", thrown); 204 } 205 } 206 207 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 208 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 209 210 private final X thrown; 211 212 ImmediateFailedCheckedFuture(X thrown) { 213 this.thrown = thrown; 214 } 215 216 @Override 217 public V get() throws ExecutionException { 218 throw new ExecutionException(thrown); 219 } 220 221 @Override 222 public V checkedGet() throws X { 223 throw thrown; 224 } 225 226 @Override 227 public V checkedGet(long timeout, TimeUnit unit) throws X { 228 checkNotNull(unit); 229 throw thrown; 230 } 231 } 232 233 /** 234 * Creates a {@code ListenableFuture} which has its value set immediately upon 235 * construction. The getters just return the value. 
This {@code Future} can't 236 * be canceled or timed out and its {@code isDone()} method always returns 237 * {@code true}. 238 */ 239 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 240 return new ImmediateSuccessfulFuture<V>(value); 241 } 242 243 /** 244 * Returns a {@code CheckedFuture} which has its value set immediately upon 245 * construction. 246 * 247 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 248 * method always returns {@code true}. Calling {@code get()} or {@code 249 * checkedGet()} will immediately return the provided value. 250 */ 251 public static <V, X extends Exception> CheckedFuture<V, X> 252 immediateCheckedFuture(@Nullable V value) { 253 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 254 } 255 256 /** 257 * Returns a {@code ListenableFuture} which has an exception set immediately 258 * upon construction. 259 * 260 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 261 * method always returns {@code true}. Calling {@code get()} will immediately 262 * throw the provided {@code Throwable} wrapped in an {@code 263 * ExecutionException}. 264 */ 265 public static <V> ListenableFuture<V> immediateFailedFuture( 266 Throwable throwable) { 267 checkNotNull(throwable); 268 return new ImmediateFailedFuture<V>(throwable); 269 } 270 271 /** 272 * Creates a {@code ListenableFuture} which is cancelled immediately upon 273 * construction, so that {@code isCancelled()} always returns {@code true}. 274 * 275 * @since 14.0 276 */ 277 public static <V> ListenableFuture<V> immediateCancelledFuture() { 278 return new ImmediateCancelledFuture<V>(); 279 } 280 281 /** 282 * Returns a {@code CheckedFuture} which has an exception set immediately upon 283 * construction. 284 * 285 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 286 * method always returns {@code true}. 
Calling {@code get()} will immediately 287 * throw the provided {@code Exception} wrapped in an {@code 288 * ExecutionException}, and calling {@code checkedGet()} will throw the 289 * provided exception itself. 290 */ 291 public static <V, X extends Exception> CheckedFuture<V, X> 292 immediateFailedCheckedFuture(X exception) { 293 checkNotNull(exception); 294 return new ImmediateFailedCheckedFuture<V, X>(exception); 295 } 296 297 /** 298 * Returns a {@code Future} whose result is taken from the given primary 299 * {@code input} or, if the primary input fails, from the {@code Future} 300 * provided by the {@code fallback}. {@link FutureFallback#create} is not 301 * invoked until the primary input has failed, so if the primary input 302 * succeeds, it is never invoked. If, during the invocation of {@code 303 * fallback}, an exception is thrown, this exception is used as the result of 304 * the output {@code Future}. 305 * 306 * <p>Below is an example of a fallback that returns a default value if an 307 * exception occurs: 308 * 309 * <pre> {@code 310 * ListenableFuture<Integer> fetchCounterFuture = ...; 311 * 312 * // Falling back to a zero counter in case an exception happens when 313 * // processing the RPC to fetch counters. 314 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 315 * fetchCounterFuture, new FutureFallback<Integer>() { 316 * public ListenableFuture<Integer> create(Throwable t) { 317 * // Returning "0" as the default for the counter when the 318 * // exception happens. 319 * return immediateFuture(0); 320 * } 321 * }); 322 * }</pre> 323 * 324 * The fallback can also choose to propagate the original exception when 325 * desired: 326 * 327 * <pre> {@code 328 * ListenableFuture<Integer> fetchCounterFuture = ...; 329 * 330 * // Falling back to a zero counter only in case the exception was a 331 * // TimeoutException. 
332 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 333 * fetchCounterFuture, new FutureFallback<Integer>() { 334 * public ListenableFuture<Integer> create(Throwable t) { 335 * if (t instanceof TimeoutException) { 336 * return immediateFuture(0); 337 * } 338 * return immediateFailedFuture(t); 339 * } 340 * }); 341 * }</pre> 342 * 343 * Note: If the derived {@code Future} is slow or heavyweight to create 344 * (whether the {@code Future} itself is slow or heavyweight to complete is 345 * irrelevant), consider {@linkplain #withFallback(ListenableFuture, 346 * FutureFallback, Executor) supplying an executor}. If you do not supply an 347 * executor, {@code withFallback} will use {@link 348 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 349 * caveats for heavier operations. For example, the call to {@code 350 * fallback.create} may run on an unpredictable or undesirable thread: 351 * 352 * <ul> 353 * <li>If the input {@code Future} is done at the time {@code withFallback} 354 * is called, {@code withFallback} will call {@code fallback.create} inline. 355 * <li>If the input {@code Future} is not yet done, {@code withFallback} will 356 * schedule {@code fallback.create} to be run by the thread that completes 357 * the input {@code Future}, which may be an internal system thread such as 358 * an RPC network thread. 359 * </ul> 360 * 361 * Also note that, regardless of which thread executes {@code 362 * fallback.create}, all other registered but unexecuted listeners are 363 * prevented from running during its execution, even if those listeners are 364 * to run in other executors. 365 * 366 * @param input the primary input {@code Future} 367 * @param fallback the {@link FutureFallback} implementation to be called if 368 * {@code input} fails 369 * @since 14.0 370 */ 371 public static <V> ListenableFuture<V> withFallback( 372 ListenableFuture<? extends V> input, 373 FutureFallback<? 
extends V> fallback) { 374 return withFallback(input, fallback, sameThreadExecutor()); 375 } 376 377 /** 378 * Returns a {@code Future} whose result is taken from the given primary 379 * {@code input} or, if the primary input fails, from the {@code Future} 380 * provided by the {@code fallback}. {@link FutureFallback#create} is not 381 * invoked until the primary input has failed, so if the primary input 382 * succeeds, it is never invoked. If, during the invocation of {@code 383 * fallback}, an exception is thrown, this exception is used as the result of 384 * the output {@code Future}. 385 * 386 * <p>Below is an example of a fallback that returns a default value if an 387 * exception occurs: 388 * 389 * <pre> {@code 390 * ListenableFuture<Integer> fetchCounterFuture = ...; 391 * 392 * // Falling back to a zero counter in case an exception happens when 393 * // processing the RPC to fetch counters. 394 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 395 * fetchCounterFuture, new FutureFallback<Integer>() { 396 * public ListenableFuture<Integer> create(Throwable t) { 397 * // Returning "0" as the default for the counter when the 398 * // exception happens. 399 * return immediateFuture(0); 400 * } 401 * }, sameThreadExecutor()); 402 * }</pre> 403 * 404 * The fallback can also choose to propagate the original exception when 405 * desired: 406 * 407 * <pre> {@code 408 * ListenableFuture<Integer> fetchCounterFuture = ...; 409 * 410 * // Falling back to a zero counter only in case the exception was a 411 * // TimeoutException. 
412 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 413 * fetchCounterFuture, new FutureFallback<Integer>() { 414 * public ListenableFuture<Integer> create(Throwable t) { 415 * if (t instanceof TimeoutException) { 416 * return immediateFuture(0); 417 * } 418 * return immediateFailedFuture(t); 419 * } 420 * }, sameThreadExecutor()); 421 * }</pre> 422 * 423 * When the execution of {@code fallback.create} is fast and lightweight 424 * (though the {@code Future} it returns need not meet these criteria), 425 * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) 426 * omitting the executor} or explicitly specifying {@code 427 * sameThreadExecutor}. However, be aware of the caveats documented in the 428 * link above. 429 * 430 * @param input the primary input {@code Future} 431 * @param fallback the {@link FutureFallback} implementation to be called if 432 * {@code input} fails 433 * @param executor the executor that runs {@code fallback} if {@code input} 434 * fails 435 * @since 14.0 436 */ 437 public static <V> ListenableFuture<V> withFallback( 438 ListenableFuture<? extends V> input, 439 FutureFallback<? extends V> fallback, Executor executor) { 440 checkNotNull(fallback); 441 return new FallbackFuture<V>(input, fallback, executor); 442 } 443 444 /** 445 * A future that falls back on a second, generated future, in case its 446 * original future fails. 447 */ 448 private static class FallbackFuture<V> extends AbstractFuture<V> { 449 450 private volatile ListenableFuture<? extends V> running; 451 452 FallbackFuture(ListenableFuture<? extends V> input, 453 final FutureFallback<? 
extends V> fallback, 454 final Executor executor) { 455 running = input; 456 addCallback(running, new FutureCallback<V>() { 457 @Override 458 public void onSuccess(V value) { 459 set(value); 460 } 461 462 @Override 463 public void onFailure(Throwable t) { 464 if (isCancelled()) { 465 return; 466 } 467 try { 468 running = fallback.create(t); 469 if (isCancelled()) { // in case cancel called in the meantime 470 running.cancel(wasInterrupted()); 471 return; 472 } 473 addCallback(running, new FutureCallback<V>() { 474 @Override 475 public void onSuccess(V value) { 476 set(value); 477 } 478 479 @Override 480 public void onFailure(Throwable t) { 481 if (running.isCancelled()) { 482 cancel(false); 483 } else { 484 setException(t); 485 } 486 } 487 }, sameThreadExecutor()); 488 } catch (Exception e) { 489 setException(e); 490 } catch (Error e) { 491 setException(e); // note: rethrows 492 } 493 } 494 }, executor); 495 } 496 497 @Override 498 public boolean cancel(boolean mayInterruptIfRunning) { 499 if (super.cancel(mayInterruptIfRunning)) { 500 running.cancel(mayInterruptIfRunning); 501 return true; 502 } 503 return false; 504 } 505 } 506 507 /** 508 * Returns a new {@code ListenableFuture} whose result is asynchronously 509 * derived from the result of the given {@code Future}. More precisely, the 510 * returned {@code Future} takes its result from a {@code Future} produced by 511 * applying the given {@code AsyncFunction} to the result of the original 512 * {@code Future}. 
* Example:
   *
   * <pre>   {@code
   *   ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
   *   AsyncFunction<RowKey, QueryResult> queryFunction =
   *       new AsyncFunction<RowKey, QueryResult>() {
   *         public ListenableFuture<QueryResult> apply(RowKey rowKey) {
   *           return dataService.read(rowKey);
   *         }
   *       };
   *   ListenableFuture<QueryResult> queryFuture =
   *       transform(rowKeyFuture, queryFunction);
   * }</pre>
   *
   * <p>Note: If the derived {@code Future} is slow or heavyweight to create
   * (whether the {@code Future} itself is slow or heavyweight to complete is
   * irrelevant), consider {@linkplain #transform(ListenableFuture,
   * AsyncFunction, Executor) supplying an executor}. If you do not supply an
   * executor, {@code transform} will use {@link
   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
   * caveats for heavier operations. For example, the call to {@code
   * function.apply} may run on an unpredictable or undesirable thread:
   *
   * <ul>
   * <li>If the input {@code Future} is done at the time {@code transform} is
   * called, {@code transform} will call {@code function.apply} inline.
   * <li>If the input {@code Future} is not yet done, {@code transform} will
   * schedule {@code function.apply} to be run by the thread that completes the
   * input {@code Future}, which may be an internal system thread such as an
   * RPC network thread.
   * </ul>
   *
   * <p>Also note that, regardless of which thread executes {@code
   * function.apply}, all other registered but unexecuted listeners are
   * prevented from running during its execution, even if those listeners are
   * to run in other executors.
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in
   * sync with that of the input future and that of the future returned by the
   * function.
That is, if the returned {@code Future} is cancelled, it will 552 * attempt to cancel the other two, and if either of the other two is 553 * cancelled, the returned {@code Future} will receive a callback in which it 554 * will attempt to cancel itself. 555 * 556 * @param input The future to transform 557 * @param function A function to transform the result of the input future 558 * to the result of the output future 559 * @return A future that holds result of the function (if the input succeeded) 560 * or the original input's failure (if not) 561 * @since 11.0 562 */ 563 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 564 AsyncFunction<? super I, ? extends O> function) { 565 return transform(input, function, MoreExecutors.sameThreadExecutor()); 566 } 567 568 /** 569 * Returns a new {@code ListenableFuture} whose result is asynchronously 570 * derived from the result of the given {@code Future}. More precisely, the 571 * returned {@code Future} takes its result from a {@code Future} produced by 572 * applying the given {@code AsyncFunction} to the result of the original 573 * {@code Future}. Example: 574 * 575 * <pre> {@code 576 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 577 * AsyncFunction<RowKey, QueryResult> queryFunction = 578 * new AsyncFunction<RowKey, QueryResult>() { 579 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 580 * return dataService.read(rowKey); 581 * } 582 * }; 583 * ListenableFuture<QueryResult> queryFuture = 584 * transform(rowKeyFuture, queryFunction, executor); 585 * }</pre> 586 * 587 * <p>The returned {@code Future} attempts to keep its cancellation state in 588 * sync with that of the input future and that of the future returned by the 589 * chain function. 
That is, if the returned {@code Future} is cancelled, it 590 * will attempt to cancel the other two, and if either of the other two is 591 * cancelled, the returned {@code Future} will receive a callback in which it 592 * will attempt to cancel itself. 593 * 594 * <p>When the execution of {@code function.apply} is fast and lightweight 595 * (though the {@code Future} it returns need not meet these criteria), 596 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting 597 * the executor} or explicitly specifying {@code sameThreadExecutor}. 598 * However, be aware of the caveats documented in the link above. 599 * 600 * @param input The future to transform 601 * @param function A function to transform the result of the input future 602 * to the result of the output future 603 * @param executor Executor to run the function in. 604 * @return A future that holds result of the function (if the input succeeded) 605 * or the original input's failure (if not) 606 * @since 11.0 607 */ 608 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 609 AsyncFunction<? super I, ? extends O> function, 610 Executor executor) { 611 ChainingListenableFuture<I, O> output = 612 new ChainingListenableFuture<I, O>(function, input); 613 input.addListener(output, executor); 614 return output; 615 } 616 617 /** 618 * Returns a new {@code ListenableFuture} whose result is the product of 619 * applying the given {@code Function} to the result of the given {@code 620 * Future}. 
* Example:
   *
   * <pre>   {@code
   *   ListenableFuture<QueryResult> queryFuture = ...;
   *   Function<QueryResult, List<Row>> rowsFunction =
   *       new Function<QueryResult, List<Row>>() {
   *         public List<Row> apply(QueryResult queryResult) {
   *           return queryResult.getRows();
   *         }
   *       };
   *   ListenableFuture<List<Row>> rowsFuture =
   *       transform(queryFuture, rowsFunction);
   * }</pre>
   *
   * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain
   * #transform(ListenableFuture, Function, Executor) supplying an executor}.
   * If you do not supply an executor, {@code transform} will use {@link
   * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some
   * caveats for heavier operations. For example, the call to {@code
   * function.apply} may run on an unpredictable or undesirable thread:
   *
   * <ul>
   * <li>If the input {@code Future} is done at the time {@code transform} is
   * called, {@code transform} will call {@code function.apply} inline.
   * <li>If the input {@code Future} is not yet done, {@code transform} will
   * schedule {@code function.apply} to be run by the thread that completes the
   * input {@code Future}, which may be an internal system thread such as an
   * RPC network thread.
   * </ul>
   *
   * <p>Also note that, regardless of which thread executes {@code
   * function.apply}, all other registered but unexecuted listeners are
   * prevented from running during its execution, even if those listeners are
   * to run in other executors.
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in
   * sync with that of the input future. That is, if the returned {@code Future}
   * is cancelled, it will attempt to cancel the input, and if the input is
   * cancelled, the returned {@code Future} will receive a callback in which it
   * will attempt to cancel itself.
660 * 661 * <p>An example use of this method is to convert a serializable object 662 * returned from an RPC into a POJO. 663 * 664 * @param input The future to transform 665 * @param function A Function to transform the results of the provided future 666 * to the results of the returned future. This will be run in the thread 667 * that notifies input it is complete. 668 * @return A future that holds result of the transformation. 669 * @since 9.0 (in 1.0 as {@code compose}) 670 */ 671 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 672 final Function<? super I, ? extends O> function) { 673 return transform(input, function, MoreExecutors.sameThreadExecutor()); 674 } 675 676 /** 677 * Returns a new {@code ListenableFuture} whose result is the product of 678 * applying the given {@code Function} to the result of the given {@code 679 * Future}. Example: 680 * 681 * <pre> {@code 682 * ListenableFuture<QueryResult> queryFuture = ...; 683 * Function<QueryResult, List<Row>> rowsFunction = 684 * new Function<QueryResult, List<Row>>() { 685 * public List<Row> apply(QueryResult queryResult) { 686 * return queryResult.getRows(); 687 * } 688 * }; 689 * ListenableFuture<List<Row>> rowsFuture = 690 * transform(queryFuture, rowsFunction, executor); 691 * }</pre> 692 * 693 * <p>The returned {@code Future} attempts to keep its cancellation state in 694 * sync with that of the input future. That is, if the returned {@code Future} 695 * is cancelled, it will attempt to cancel the input, and if the input is 696 * cancelled, the returned {@code Future} will receive a callback in which it 697 * will attempt to cancel itself. 698 * 699 * <p>An example use of this method is to convert a serializable object 700 * returned from an RPC into a POJO. 701 * 702 * <p>When the transformation is fast and lightweight, consider {@linkplain 703 * #transform(ListenableFuture, Function) omitting the executor} or 704 * explicitly specifying {@code sameThreadExecutor}. 
However, be aware of the 705 * caveats documented in the link above. 706 * 707 * @param input The future to transform 708 * @param function A Function to transform the results of the provided future 709 * to the results of the returned future. 710 * @param executor Executor to run the function in. 711 * @return A future that holds result of the transformation. 712 * @since 9.0 (in 2.0 as {@code compose}) 713 */ 714 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 715 final Function<? super I, ? extends O> function, Executor executor) { 716 checkNotNull(function); 717 AsyncFunction<I, O> wrapperFunction 718 = new AsyncFunction<I, O>() { 719 @Override public ListenableFuture<O> apply(I input) { 720 O output = function.apply(input); 721 return immediateFuture(output); 722 } 723 }; 724 return transform(input, wrapperFunction, executor); 725 } 726 727 /** 728 * Like {@link #transform(ListenableFuture, Function)} except that the 729 * transformation {@code function} is invoked on each call to 730 * {@link Future#get() get()} on the returned future. 731 * 732 * <p>The returned {@code Future} reflects the input's cancellation 733 * state directly, and any attempt to cancel the returned Future is likewise 734 * passed through to the input Future. 735 * 736 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 737 * only apply the timeout to the execution of the underlying {@code Future}, 738 * <em>not</em> to the execution of the transformation function. 739 * 740 * <p>The primary audience of this method is callers of {@code transform} 741 * who don't have a {@code ListenableFuture} available and 742 * do not mind repeated, lazy function evaluation. 743 * 744 * @param input The future to transform 745 * @param function A Function to transform the results of the provided future 746 * to the results of the returned future. 747 * @return A future that returns the result of the transformation. 
748 * @since 10.0 749 */ 750 @Beta 751 public static <I, O> Future<O> lazyTransform(final Future<I> input, 752 final Function<? super I, ? extends O> function) { 753 checkNotNull(input); 754 checkNotNull(function); 755 return new Future<O>() { 756 757 @Override 758 public boolean cancel(boolean mayInterruptIfRunning) { 759 return input.cancel(mayInterruptIfRunning); 760 } 761 762 @Override 763 public boolean isCancelled() { 764 return input.isCancelled(); 765 } 766 767 @Override 768 public boolean isDone() { 769 return input.isDone(); 770 } 771 772 @Override 773 public O get() throws InterruptedException, ExecutionException { 774 return applyTransformation(input.get()); 775 } 776 777 @Override 778 public O get(long timeout, TimeUnit unit) 779 throws InterruptedException, ExecutionException, TimeoutException { 780 return applyTransformation(input.get(timeout, unit)); 781 } 782 783 private O applyTransformation(I input) throws ExecutionException { 784 try { 785 return function.apply(input); 786 } catch (Throwable t) { 787 throw new ExecutionException(t); 788 } 789 } 790 }; 791 } 792 793 /** 794 * An implementation of {@code ListenableFuture} that also implements 795 * {@code Runnable} so that it can be used to nest ListenableFutures. 796 * Once the passed-in {@code ListenableFuture} is complete, it calls the 797 * passed-in {@code Function} to generate the result. 798 * 799 * <p>If the function throws any checked exceptions, they should be wrapped 800 * in a {@code UndeclaredThrowableException} so that this class can get 801 * access to the cause. 802 */ 803 private static class ChainingListenableFuture<I, O> 804 extends AbstractFuture<O> implements Runnable { 805 806 private AsyncFunction<? super I, ? extends O> function; 807 private ListenableFuture<? extends I> inputFuture; 808 private volatile ListenableFuture<? 
extends O> outputFuture; 809 private final CountDownLatch outputCreated = new CountDownLatch(1); 810 811 private ChainingListenableFuture( 812 AsyncFunction<? super I, ? extends O> function, 813 ListenableFuture<? extends I> inputFuture) { 814 this.function = checkNotNull(function); 815 this.inputFuture = checkNotNull(inputFuture); 816 } 817 818 @Override 819 public boolean cancel(boolean mayInterruptIfRunning) { 820 /* 821 * Our additional cancellation work needs to occur even if 822 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 823 */ 824 if (super.cancel(mayInterruptIfRunning)) { 825 // This should never block since only one thread is allowed to cancel 826 // this Future. 827 cancel(inputFuture, mayInterruptIfRunning); 828 cancel(outputFuture, mayInterruptIfRunning); 829 return true; 830 } 831 return false; 832 } 833 834 private void cancel(@Nullable Future<?> future, 835 boolean mayInterruptIfRunning) { 836 if (future != null) { 837 future.cancel(mayInterruptIfRunning); 838 } 839 } 840 841 @Override 842 public void run() { 843 try { 844 I sourceResult; 845 try { 846 sourceResult = getUninterruptibly(inputFuture); 847 } catch (CancellationException e) { 848 // Cancel this future and return. 849 // At this point, inputFuture is cancelled and outputFuture doesn't 850 // exist, so the value of mayInterruptIfRunning is irrelevant. 851 cancel(false); 852 return; 853 } catch (ExecutionException e) { 854 // Set the cause of the exception as this future's exception 855 setException(e.getCause()); 856 return; 857 } 858 859 final ListenableFuture<? 
extends O> outputFuture = this.outputFuture = 860 function.apply(sourceResult); 861 if (isCancelled()) { 862 outputFuture.cancel(wasInterrupted()); 863 this.outputFuture = null; 864 return; 865 } 866 outputFuture.addListener(new Runnable() { 867 @Override 868 public void run() { 869 try { 870 // Here it would have been nice to have had an 871 // UninterruptibleListenableFuture, but we don't want to start a 872 // combinatorial explosion of interfaces, so we have to make do. 873 set(getUninterruptibly(outputFuture)); 874 } catch (CancellationException e) { 875 // Cancel this future and return. 876 // At this point, inputFuture and outputFuture are done, so the 877 // value of mayInterruptIfRunning is irrelevant. 878 cancel(false); 879 return; 880 } catch (ExecutionException e) { 881 // Set the cause of the exception as this future's exception 882 setException(e.getCause()); 883 } finally { 884 // Don't pin inputs beyond completion 885 ChainingListenableFuture.this.outputFuture = null; 886 } 887 } 888 }, MoreExecutors.sameThreadExecutor()); 889 } catch (UndeclaredThrowableException e) { 890 // Set the cause of the exception as this future's exception 891 setException(e.getCause()); 892 } catch (Exception e) { 893 // This exception is irrelevant in this thread, but useful for the 894 // client 895 setException(e); 896 } catch (Error e) { 897 // Propagate errors up ASAP - our superclass will rethrow the error 898 setException(e); 899 } finally { 900 // Don't pin inputs beyond completion 901 function = null; 902 inputFuture = null; 903 // Allow our get routines to examine outputFuture now. 904 outputCreated.countDown(); 905 } 906 } 907 } 908 909 /** 910 * Returns a new {@code ListenableFuture} whose result is the product of 911 * calling {@code get()} on the {@code Future} nested within the given {@code 912 * Future}, effectively chaining the futures one after the other. 
Example: 913 * 914 * <pre> {@code 915 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 916 * ListenableFuture<String> dereferenced = dereference(nested); 917 * }</pre> 918 * 919 * <p>This call has the same cancellation and execution semantics as {@link 920 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 921 * Future} attempts to keep its cancellation state in sync with both the 922 * input {@code Future} and the nested {@code Future}. The transformation 923 * is very lightweight and therefore takes place in the thread that called 924 * {@code dereference}. 925 * 926 * @param nested The nested future to transform. 927 * @return A future that holds result of the inner future. 928 * @since 13.0 929 */ 930 @Beta 931 @SuppressWarnings({"rawtypes", "unchecked"}) 932 public static <V> ListenableFuture<V> dereference( 933 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 934 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 935 } 936 937 /** 938 * Helper {@code Function} for {@link #dereference}. 939 */ 940 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 941 new AsyncFunction<ListenableFuture<Object>, Object>() { 942 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 943 return input; 944 } 945 }; 946 947 /** 948 * Creates a new {@code ListenableFuture} whose value is a list containing the 949 * values of all its input futures, if all succeed. If any input fails, the 950 * returned future fails. 951 * 952 * <p>The list of results is in the same order as the input list. 953 * 954 * <p>Canceling this future will attempt to cancel all the component futures, 955 * and if any of the provided futures fails or is canceled, this one is, 956 * too. 
957 * 958 * @param futures futures to combine 959 * @return a future that provides a list of the results of the component 960 * futures 961 * @since 10.0 962 */ 963 @Beta 964 public static <V> ListenableFuture<List<V>> allAsList( 965 ListenableFuture<? extends V>... futures) { 966 return listFuture(ImmutableList.copyOf(futures), true, 967 MoreExecutors.sameThreadExecutor()); 968 } 969 970 /** 971 * Creates a new {@code ListenableFuture} whose value is a list containing the 972 * values of all its input futures, if all succeed. If any input fails, the 973 * returned future fails. 974 * 975 * <p>The list of results is in the same order as the input list. 976 * 977 * <p>Canceling this future will attempt to cancel all the component futures, 978 * and if any of the provided futures fails or is canceled, this one is, 979 * too. 980 * 981 * @param futures futures to combine 982 * @return a future that provides a list of the results of the component 983 * futures 984 * @since 10.0 985 */ 986 @Beta 987 public static <V> ListenableFuture<List<V>> allAsList( 988 Iterable<? extends ListenableFuture<? extends V>> futures) { 989 return listFuture(ImmutableList.copyOf(futures), true, 990 MoreExecutors.sameThreadExecutor()); 991 } 992 993 /** 994 * Creates a new {@code ListenableFuture} whose value is a list containing the 995 * values of all its successful input futures. The list of results is in the 996 * same order as the input list, and if any of the provided futures fails or 997 * is canceled, its corresponding position will contain {@code null} (which is 998 * indistinguishable from the future having a successful value of 999 * {@code null}). 1000 * 1001 * <p>Canceling this future will attempt to cancel all the component futures. 
1002 * 1003 * @param futures futures to combine 1004 * @return a future that provides a list of the results of the component 1005 * futures 1006 * @since 10.0 1007 */ 1008 @Beta 1009 public static <V> ListenableFuture<List<V>> successfulAsList( 1010 ListenableFuture<? extends V>... futures) { 1011 return listFuture(ImmutableList.copyOf(futures), false, 1012 MoreExecutors.sameThreadExecutor()); 1013 } 1014 1015 /** 1016 * Creates a new {@code ListenableFuture} whose value is a list containing the 1017 * values of all its successful input futures. The list of results is in the 1018 * same order as the input list, and if any of the provided futures fails or 1019 * is canceled, its corresponding position will contain {@code null} (which is 1020 * indistinguishable from the future having a successful value of 1021 * {@code null}). 1022 * 1023 * <p>Canceling this future will attempt to cancel all the component futures. 1024 * 1025 * @param futures futures to combine 1026 * @return a future that provides a list of the results of the component 1027 * futures 1028 * @since 10.0 1029 */ 1030 @Beta 1031 public static <V> ListenableFuture<List<V>> successfulAsList( 1032 Iterable<? extends ListenableFuture<? extends V>> futures) { 1033 return listFuture(ImmutableList.copyOf(futures), false, 1034 MoreExecutors.sameThreadExecutor()); 1035 } 1036 1037 /** 1038 * Registers separate success and failure callbacks to be run when the {@code 1039 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1040 * complete} or, if the computation is already complete, immediately. 1041 * 1042 * <p>There is no guaranteed ordering of execution of callbacks, but any 1043 * callback added through this method is guaranteed to be called once the 1044 * computation is complete. 
1045 * 1046 * Example: <pre> {@code 1047 * ListenableFuture<QueryResult> future = ...; 1048 * addCallback(future, 1049 * new FutureCallback<QueryResult> { 1050 * public void onSuccess(QueryResult result) { 1051 * storeInCache(result); 1052 * } 1053 * public void onFailure(Throwable t) { 1054 * reportError(t); 1055 * } 1056 * });}</pre> 1057 * 1058 * Note: If the callback is slow or heavyweight, consider {@linkplain 1059 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an 1060 * executor}. If you do not supply an executor, {@code addCallback} will use 1061 * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries 1062 * some caveats for heavier operations. For example, the callback may run on 1063 * an unpredictable or undesirable thread: 1064 * 1065 * <ul> 1066 * <li>If the input {@code Future} is done at the time {@code addCallback} is 1067 * called, {@code addCallback} will execute the callback inline. 1068 * <li>If the input {@code Future} is not yet done, {@code addCallback} will 1069 * schedule the callback to be run by the thread that completes the input 1070 * {@code Future}, which may be an internal system thread such as an RPC 1071 * network thread. 1072 * </ul> 1073 * 1074 * Also note that, regardless of which thread executes the callback, all 1075 * other registered but unexecuted listeners are prevented from running 1076 * during its execution, even if those listeners are to run in other 1077 * executors. 1078 * 1079 * <p>For a more general interface to attach a completion listener to a 1080 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1081 * 1082 * @param future The future attach the callback to. 1083 * @param callback The callback to invoke when {@code future} is completed. 1084 * @since 10.0 1085 */ 1086 public static <V> void addCallback(ListenableFuture<V> future, 1087 FutureCallback<? 
super V> callback) { 1088 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 1089 } 1090 1091 /** 1092 * Registers separate success and failure callbacks to be run when the {@code 1093 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1094 * complete} or, if the computation is already complete, immediately. 1095 * 1096 * <p>The callback is run in {@code executor}. 1097 * There is no guaranteed ordering of execution of callbacks, but any 1098 * callback added through this method is guaranteed to be called once the 1099 * computation is complete. 1100 * 1101 * Example: <pre> {@code 1102 * ListenableFuture<QueryResult> future = ...; 1103 * Executor e = ... 1104 * addCallback(future, e, 1105 * new FutureCallback<QueryResult> { 1106 * public void onSuccess(QueryResult result) { 1107 * storeInCache(result); 1108 * } 1109 * public void onFailure(Throwable t) { 1110 * reportError(t); 1111 * } 1112 * });}</pre> 1113 * 1114 * When the callback is fast and lightweight, consider {@linkplain 1115 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or 1116 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 1117 * caveats documented in the link above. 1118 * 1119 * <p>For a more general interface to attach a completion listener to a 1120 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1121 * 1122 * @param future The future attach the callback to. 1123 * @param callback The callback to invoke when {@code future} is completed. 1124 * @param executor The executor to run {@code callback} when the future 1125 * completes. 1126 * @since 10.0 1127 */ 1128 public static <V> void addCallback(final ListenableFuture<V> future, 1129 final FutureCallback<? 
super V> callback, Executor executor) { 1130 Preconditions.checkNotNull(callback); 1131 Runnable callbackListener = new Runnable() { 1132 @Override 1133 public void run() { 1134 final V value; 1135 try { 1136 // TODO(user): (Before Guava release), validate that this 1137 // is the thing for IE. 1138 value = getUninterruptibly(future); 1139 } catch (ExecutionException e) { 1140 callback.onFailure(e.getCause()); 1141 return; 1142 } catch (RuntimeException e) { 1143 callback.onFailure(e); 1144 return; 1145 } catch (Error e) { 1146 callback.onFailure(e); 1147 return; 1148 } 1149 callback.onSuccess(value); 1150 } 1151 }; 1152 future.addListener(callbackListener, executor); 1153 } 1154 1155 /** 1156 * Returns the result of {@link Future#get()}, converting most exceptions to a 1157 * new instance of the given checked exception type. This reduces boilerplate 1158 * for a common use of {@code Future} in which it is unnecessary to 1159 * programmatically distinguish between exception types or to extract other 1160 * information from the exception instance. 1161 * 1162 * <p>Exceptions from {@code Future.get} are treated as follows: 1163 * <ul> 1164 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1165 * {@code X} if the cause is a checked exception, an {@link 1166 * UncheckedExecutionException} if the cause is a {@code 1167 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1168 * {@code Error}. 1169 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1170 * restoring the interrupt). 1171 * <li>Any {@link CancellationException} is propagated untouched, as is any 1172 * other {@link RuntimeException} (though {@code get} implementations are 1173 * discouraged from throwing such exceptions). 1174 * </ul> 1175 * 1176 * The overall principle is to continue to treat every checked exception as a 1177 * checked exception, every unchecked exception as an unchecked exception, and 1178 * every error as an error. 
In addition, the cause of any {@code 1179 * ExecutionException} is wrapped in order to ensure that the new stack trace 1180 * matches that of the current thread. 1181 * 1182 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1183 * public constructor that accepts zero or more arguments, all of type {@code 1184 * String} or {@code Throwable} (preferring constructors with at least one 1185 * {@code String}) and calling the constructor via reflection. If the 1186 * exception did not already have a cause, one is set by calling {@link 1187 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1188 * {@code IllegalArgumentException} is thrown. 1189 * 1190 * @throws X if {@code get} throws any checked exception except for an {@code 1191 * ExecutionException} whose cause is not itself a checked exception 1192 * @throws UncheckedExecutionException if {@code get} throws an {@code 1193 * ExecutionException} with a {@code RuntimeException} as its cause 1194 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1195 * with an {@code Error} as its cause 1196 * @throws CancellationException if {@code get} throws a {@code 1197 * CancellationException} 1198 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1199 * RuntimeException} or does not have a suitable constructor 1200 * @since 10.0 1201 */ 1202 @Beta 1203 public static <V, X extends Exception> V get( 1204 Future<V> future, Class<X> exceptionClass) throws X { 1205 checkNotNull(future); 1206 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1207 "Futures.get exception type (%s) must not be a RuntimeException", 1208 exceptionClass); 1209 try { 1210 return future.get(); 1211 } catch (InterruptedException e) { 1212 currentThread().interrupt(); 1213 throw newWithCause(exceptionClass, e); 1214 } catch (ExecutionException e) { 1215 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1216 throw new 
AssertionError(); 1217 } 1218 } 1219 1220 /** 1221 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1222 * exceptions to a new instance of the given checked exception type. This 1223 * reduces boilerplate for a common use of {@code Future} in which it is 1224 * unnecessary to programmatically distinguish between exception types or to 1225 * extract other information from the exception instance. 1226 * 1227 * <p>Exceptions from {@code Future.get} are treated as follows: 1228 * <ul> 1229 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1230 * {@code X} if the cause is a checked exception, an {@link 1231 * UncheckedExecutionException} if the cause is a {@code 1232 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1233 * {@code Error}. 1234 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1235 * restoring the interrupt). 1236 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1237 * <li>Any {@link CancellationException} is propagated untouched, as is any 1238 * other {@link RuntimeException} (though {@code get} implementations are 1239 * discouraged from throwing such exceptions). 1240 * </ul> 1241 * 1242 * The overall principle is to continue to treat every checked exception as a 1243 * checked exception, every unchecked exception as an unchecked exception, and 1244 * every error as an error. In addition, the cause of any {@code 1245 * ExecutionException} is wrapped in order to ensure that the new stack trace 1246 * matches that of the current thread. 1247 * 1248 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1249 * public constructor that accepts zero or more arguments, all of type {@code 1250 * String} or {@code Throwable} (preferring constructors with at least one 1251 * {@code String}) and calling the constructor via reflection. 
If the 1252 * exception did not already have a cause, one is set by calling {@link 1253 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1254 * {@code IllegalArgumentException} is thrown. 1255 * 1256 * @throws X if {@code get} throws any checked exception except for an {@code 1257 * ExecutionException} whose cause is not itself a checked exception 1258 * @throws UncheckedExecutionException if {@code get} throws an {@code 1259 * ExecutionException} with a {@code RuntimeException} as its cause 1260 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1261 * with an {@code Error} as its cause 1262 * @throws CancellationException if {@code get} throws a {@code 1263 * CancellationException} 1264 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1265 * RuntimeException} or does not have a suitable constructor 1266 * @since 10.0 1267 */ 1268 @Beta 1269 public static <V, X extends Exception> V get( 1270 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1271 throws X { 1272 checkNotNull(future); 1273 checkNotNull(unit); 1274 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1275 "Futures.get exception type (%s) must not be a RuntimeException", 1276 exceptionClass); 1277 try { 1278 return future.get(timeout, unit); 1279 } catch (InterruptedException e) { 1280 currentThread().interrupt(); 1281 throw newWithCause(exceptionClass, e); 1282 } catch (TimeoutException e) { 1283 throw newWithCause(exceptionClass, e); 1284 } catch (ExecutionException e) { 1285 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1286 throw new AssertionError(); 1287 } 1288 } 1289 1290 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1291 Throwable cause, Class<X> exceptionClass) throws X { 1292 if (cause instanceof Error) { 1293 throw new ExecutionError((Error) cause); 1294 } 1295 if (cause instanceof RuntimeException) { 1296 throw new 
UncheckedExecutionException(cause); 1297 } 1298 throw newWithCause(exceptionClass, cause); 1299 } 1300 1301 /** 1302 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1303 * task known not to throw a checked exception. This makes {@code Future} more 1304 * suitable for lightweight, fast-running tasks that, barring bugs in the 1305 * code, will not fail. This gives it exception-handling behavior similar to 1306 * that of {@code ForkJoinTask.join}. 1307 * 1308 * <p>Exceptions from {@code Future.get} are treated as follows: 1309 * <ul> 1310 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1311 * {@link UncheckedExecutionException} (if the cause is an {@code 1312 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1313 * Error}). 1314 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1315 * call. The interrupt is restored before {@code getUnchecked} returns. 1316 * <li>Any {@link CancellationException} is propagated untouched. So is any 1317 * other {@link RuntimeException} ({@code get} implementations are 1318 * discouraged from throwing such exceptions). 1319 * </ul> 1320 * 1321 * The overall principle is to eliminate all checked exceptions: to loop to 1322 * avoid {@code InterruptedException}, to pass through {@code 1323 * CancellationException}, and to wrap any exception from the underlying 1324 * computation in an {@code UncheckedExecutionException} or {@code 1325 * ExecutionError}. 1326 * 1327 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1328 * {@link Uninterruptibles#getUninterruptibly(Future)}. 
1329 * 1330 * @throws UncheckedExecutionException if {@code get} throws an {@code 1331 * ExecutionException} with an {@code Exception} as its cause 1332 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1333 * with an {@code Error} as its cause 1334 * @throws CancellationException if {@code get} throws a {@code 1335 * CancellationException} 1336 * @since 10.0 1337 */ 1338 @Beta 1339 public static <V> V getUnchecked(Future<V> future) { 1340 checkNotNull(future); 1341 try { 1342 return getUninterruptibly(future); 1343 } catch (ExecutionException e) { 1344 wrapAndThrowUnchecked(e.getCause()); 1345 throw new AssertionError(); 1346 } 1347 } 1348 1349 private static void wrapAndThrowUnchecked(Throwable cause) { 1350 if (cause instanceof Error) { 1351 throw new ExecutionError((Error) cause); 1352 } 1353 /* 1354 * It's a non-Error, non-Exception Throwable. From my survey of such 1355 * classes, I believe that most users intended to extend Exception, so we'll 1356 * treat it like an Exception. 1357 */ 1358 throw new UncheckedExecutionException(cause); 1359 } 1360 1361 /* 1362 * TODO(user): FutureChecker interface for these to be static methods on? If 1363 * so, refer to it in the (static-method) Futures.get documentation 1364 */ 1365 1366 /* 1367 * Arguably we don't need a timed getUnchecked because any operation slow 1368 * enough to require a timeout is heavyweight enough to throw a checked 1369 * exception and therefore be inappropriate to use with getUnchecked. Further, 1370 * it's not clear that converting the checked TimeoutException to a 1371 * RuntimeException -- especially to an UncheckedExecutionException, since it 1372 * wasn't thrown by the computation -- makes sense, and if we don't convert 1373 * it, the user still has to write a try-catch block. 1374 * 1375 * If you think you would use this method, let us know. 
1376 */ 1377 1378 private static <X extends Exception> X newWithCause( 1379 Class<X> exceptionClass, Throwable cause) { 1380 // getConstructors() guarantees this as long as we don't modify the array. 1381 @SuppressWarnings("unchecked") 1382 List<Constructor<X>> constructors = 1383 (List) Arrays.asList(exceptionClass.getConstructors()); 1384 for (Constructor<X> constructor : preferringStrings(constructors)) { 1385 @Nullable X instance = newFromConstructor(constructor, cause); 1386 if (instance != null) { 1387 if (instance.getCause() == null) { 1388 instance.initCause(cause); 1389 } 1390 return instance; 1391 } 1392 } 1393 throw new IllegalArgumentException( 1394 "No appropriate constructor for exception of type " + exceptionClass 1395 + " in response to chained exception", cause); 1396 } 1397 1398 private static <X extends Exception> List<Constructor<X>> 1399 preferringStrings(List<Constructor<X>> constructors) { 1400 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1401 } 1402 1403 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1404 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1405 @Override public Boolean apply(Constructor<?> input) { 1406 return asList(input.getParameterTypes()).contains(String.class); 1407 } 1408 }).reverse(); 1409 1410 @Nullable private static <X> X newFromConstructor( 1411 Constructor<X> constructor, Throwable cause) { 1412 Class<?>[] paramTypes = constructor.getParameterTypes(); 1413 Object[] params = new Object[paramTypes.length]; 1414 for (int i = 0; i < paramTypes.length; i++) { 1415 Class<?> paramType = paramTypes[i]; 1416 if (paramType.equals(String.class)) { 1417 params[i] = cause.toString(); 1418 } else if (paramType.equals(Throwable.class)) { 1419 params[i] = cause; 1420 } else { 1421 return null; 1422 } 1423 } 1424 try { 1425 return constructor.newInstance(params); 1426 } catch (IllegalArgumentException e) { 1427 return null; 1428 } catch (InstantiationException e) { 1429 
return null; 1430 } catch (IllegalAccessException e) { 1431 return null; 1432 } catch (InvocationTargetException e) { 1433 return null; 1434 } 1435 } 1436 1437 private interface FutureCombiner<V, C> { 1438 C combine(List<Optional<V>> values); 1439 } 1440 1441 private static class CombinedFuture<V, C> extends AbstractFuture<C> { 1442 ImmutableCollection<? extends ListenableFuture<? extends V>> futures; 1443 final boolean allMustSucceed; 1444 final AtomicInteger remaining; 1445 FutureCombiner<V, C> combiner; 1446 List<Optional<V>> values; 1447 1448 CombinedFuture( 1449 ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 1450 boolean allMustSucceed, Executor listenerExecutor, 1451 FutureCombiner<V, C> combiner) { 1452 this.futures = futures; 1453 this.allMustSucceed = allMustSucceed; 1454 this.remaining = new AtomicInteger(futures.size()); 1455 this.combiner = combiner; 1456 this.values = Lists.newArrayListWithCapacity(futures.size()); 1457 init(listenerExecutor); 1458 } 1459 1460 /** 1461 * Must be called at the end of the constructor. 1462 */ 1463 protected void init(final Executor listenerExecutor) { 1464 // First, schedule cleanup to execute when the Future is done. 1465 addListener(new Runnable() { 1466 @Override 1467 public void run() { 1468 // Cancel all the component futures. 1469 if (CombinedFuture.this.isCancelled()) { 1470 for (ListenableFuture<?> future : CombinedFuture.this.futures) { 1471 future.cancel(CombinedFuture.this.wasInterrupted()); 1472 } 1473 } 1474 1475 // By now the values array has either been set as the Future's value, 1476 // or (in case of failure) is no longer useful. 1477 CombinedFuture.this.futures = null; 1478 1479 // Let go of the memory held by other futures 1480 CombinedFuture.this.values = null; 1481 1482 // The combiner may also hold state, so free that as well 1483 CombinedFuture.this.combiner = null; 1484 } 1485 }, MoreExecutors.sameThreadExecutor()); 1486 1487 // Now begin the "real" initialization. 
1488 1489 // Corner case: List is empty. 1490 if (futures.isEmpty()) { 1491 set(combiner.combine(ImmutableList.<Optional<V>>of())); 1492 return; 1493 } 1494 1495 // Populate the results list with null initially. 1496 for (int i = 0; i < futures.size(); ++i) { 1497 values.add(null); 1498 } 1499 1500 // Register a listener on each Future in the list to update 1501 // the state of this future. 1502 // Note that if all the futures on the list are done prior to completing 1503 // this loop, the last call to addListener() will callback to 1504 // setOneValue(), transitively call our cleanup listener, and set 1505 // this.futures to null. 1506 // This is not actually a problem, since the foreach only needs 1507 // this.futures to be non-null at the beginning of the loop. 1508 int i = 0; 1509 for (final ListenableFuture<? extends V> listenable : futures) { 1510 final int index = i++; 1511 listenable.addListener(new Runnable() { 1512 @Override 1513 public void run() { 1514 setOneValue(index, listenable); 1515 } 1516 }, listenerExecutor); 1517 } 1518 } 1519 1520 /** 1521 * Sets the value at the given index to that of the given future. 1522 */ 1523 private void setOneValue(int index, Future<? extends V> future) { 1524 List<Optional<V>> localValues = values; 1525 if (isDone() || localValues == null) { 1526 // Some other future failed or has been cancelled, causing this one to 1527 // also be cancelled or have an exception set. This should only happen 1528 // if allMustSucceed is true or if the output itself has been cancelled. 1529 checkState(allMustSucceed || isCancelled(), 1530 "Future was done before all dependencies completed"); 1531 return; 1532 } 1533 1534 try { 1535 checkState(future.isDone(), 1536 "Tried to set value from future which is not done"); 1537 V returnValue = getUninterruptibly(future); 1538 localValues.set(index, Optional.fromNullable(returnValue)); 1539 } catch (CancellationException e) { 1540 if (allMustSucceed) { 1541 // Set ourselves as cancelled. 
Let the input futures keep running 1542 // as some of them may be used elsewhere. 1543 // (Currently we don't override interruptTask, so 1544 // mayInterruptIfRunning==false isn't technically necessary.) 1545 cancel(false); 1546 } 1547 } catch (ExecutionException e) { 1548 if (allMustSucceed) { 1549 // As soon as the first one fails, throw the exception up. 1550 // The result of all other inputs is then ignored. 1551 setException(e.getCause()); 1552 } 1553 } catch (RuntimeException e) { 1554 if (allMustSucceed) { 1555 setException(e); 1556 } 1557 } catch (Error e) { 1558 // Propagate errors up ASAP - our superclass will rethrow the error 1559 setException(e); 1560 } finally { 1561 int newRemaining = remaining.decrementAndGet(); 1562 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1563 if (newRemaining == 0) { 1564 FutureCombiner<V, C> localCombiner = combiner; 1565 if (localCombiner != null) { 1566 set(localCombiner.combine(localValues)); 1567 } else { 1568 checkState(isDone()); 1569 } 1570 } 1571 } 1572 } 1573 1574 } 1575 1576 /** Used for {@link #allAsList} and {@link #successfulAsList}. */ 1577 private static <V> ListenableFuture<List<V>> listFuture( 1578 ImmutableList<ListenableFuture<? extends V>> futures, 1579 boolean allMustSucceed, Executor listenerExecutor) { 1580 return new CombinedFuture<V, List<V>>( 1581 futures, allMustSucceed, listenerExecutor, 1582 new FutureCombiner<V, List<V>>() { 1583 @Override 1584 public List<V> combine(List<Optional<V>> values) { 1585 List<V> result = Lists.newArrayList(); 1586 for (Optional<V> element : values) { 1587 result.add(element != null ? element.orNull() : null); 1588 } 1589 // TODO(user): This should ultimately return an unmodifiableList 1590 return result; 1591 } 1592 }); 1593 } 1594 1595 /** 1596 * A checked future that uses a function to map from exceptions to the 1597 * appropriate checked type. 
*/
  private static class MappingCheckedFuture<V, X extends Exception> extends
      AbstractCheckedFuture<V, X> {

    // Translates an exception into the checked type X.
    final Function<Exception, X> mapper;

    MappingCheckedFuture(ListenableFuture<V> delegate,
        Function<Exception, X> mapper) {
      super(delegate);
      checkNotNull(mapper);
      this.mapper = mapper;
    }

    @Override
    protected X mapException(Exception e) {
      X mapped = mapper.apply(e);
      return mapped;
    }
  }
}