001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024import static java.lang.Thread.currentThread; 025import static java.util.Arrays.asList; 026 027import com.google.common.annotations.Beta; 028import com.google.common.base.Function; 029import com.google.common.base.Optional; 030import com.google.common.base.Preconditions; 031import com.google.common.collect.ImmutableCollection; 032import com.google.common.collect.ImmutableList; 033import com.google.common.collect.Lists; 034import com.google.common.collect.Ordering; 035 036import java.lang.reflect.Constructor; 037import java.lang.reflect.InvocationTargetException; 038import java.lang.reflect.UndeclaredThrowableException; 039import java.util.Arrays; 040import java.util.List; 041import java.util.concurrent.CancellationException; 042import java.util.concurrent.CountDownLatch; 043import java.util.concurrent.ExecutionException; 044import java.util.concurrent.Executor; 045import java.util.concurrent.Future; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.TimeoutException; 048import java.util.concurrent.atomic.AtomicInteger; 049import java.util.logging.Level; 050import java.util.logging.Logger; 051 052import javax.annotation.Nullable; 053 054/** 055 * Static utility methods pertaining to the {@link Future} interface. 056 * 057 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 058 * Guava User Guide article on <a href= 059 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 060 * {@code ListenableFuture}</a>. 061 * 062 * @author Kevin Bourrillion 063 * @author Nishant Thakkar 064 * @author Sven Mawson 065 * @since 1.0 066 */ 067public final class Futures { 068 private Futures() {} 069 070 /** 071 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 072 * and a {@link Function} that maps from {@link Exception} instances into the 073 * appropriate checked type. 074 * 075 * <p>The given mapping function will be applied to an 076 * {@link InterruptedException}, a {@link CancellationException}, or an 077 * {@link ExecutionException}. 078 * See {@link Future#get()} for details on the exceptions thrown. 079 * 080 * @since 9.0 (source-compatible since 1.0) 081 */ 082 @Beta 083 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 084 ListenableFuture<V> future, Function<Exception, X> mapper) { 085 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 086 } 087 088 private abstract static class ImmediateFuture<V> 089 implements ListenableFuture<V> { 090 091 private static final Logger log = 092 Logger.getLogger(ImmediateFuture.class.getName()); 093 094 @Override 095 public void addListener(Runnable listener, Executor executor) { 096 checkNotNull(listener, "Runnable was null."); 097 checkNotNull(executor, "Executor was null."); 098 try { 099 executor.execute(listener); 100 } catch (RuntimeException e) { 101 // ListenableFuture's contract is that it will not throw unchecked 102 // exceptions, so log the bad runnable and/or executor and swallow it. 103 log.log(Level.SEVERE, "RuntimeException while executing runnable " 104 + listener + " with executor " + executor, e); 105 } 106 } 107 108 @Override 109 public boolean cancel(boolean mayInterruptIfRunning) { 110 return false; 111 } 112 113 @Override 114 public abstract V get() throws ExecutionException; 115 116 @Override 117 public V get(long timeout, TimeUnit unit) throws ExecutionException { 118 checkNotNull(unit); 119 return get(); 120 } 121 122 @Override 123 public boolean isCancelled() { 124 return false; 125 } 126 127 @Override 128 public boolean isDone() { 129 return true; 130 } 131 } 132 133 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 134 135 @Nullable private final V value; 136 137 ImmediateSuccessfulFuture(@Nullable V value) { 138 this.value = value; 139 } 140 141 @Override 142 public V get() { 143 return value; 144 } 145 } 146 147 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 148 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 149 150 @Nullable private final V value; 151 152 ImmediateSuccessfulCheckedFuture(@Nullable V value) { 153 this.value = value; 154 } 155 156 @Override 157 public V get() { 158 return value; 159 } 160 161 @Override 162 public V checkedGet() { 163 return value; 164 } 165 166 @Override 167 public V checkedGet(long timeout, TimeUnit unit) { 168 checkNotNull(unit); 169 return value; 170 } 171 } 172 173 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 174 175 private final Throwable thrown; 176 177 ImmediateFailedFuture(Throwable thrown) { 178 this.thrown = thrown; 179 } 180 181 @Override 182 public V get() throws ExecutionException { 183 throw new ExecutionException(thrown); 184 } 185 } 186 187 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 188 189 private final CancellationException thrown; 190 191 ImmediateCancelledFuture() { 192 this.thrown = new CancellationException("Immediate cancelled future."); 193 } 194 195 @Override 196 public boolean isCancelled() { 197 return true; 198 } 199 200 @Override 201 public V get() { 202 throw AbstractFuture.cancellationExceptionWithCause( 203 "Task was cancelled.", thrown); 204 } 205 } 206 207 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 208 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 209 210 private final X thrown; 211 212 ImmediateFailedCheckedFuture(X thrown) { 213 this.thrown = thrown; 214 } 215 216 @Override 217 public V get() throws ExecutionException { 218 throw new ExecutionException(thrown); 219 } 220 221 @Override 222 public V checkedGet() throws X { 223 throw thrown; 224 } 225 226 @Override 227 public V checkedGet(long timeout, TimeUnit unit) throws X { 228 checkNotNull(unit); 229 throw thrown; 230 } 231 } 232 233 /** 234 * Creates a {@code ListenableFuture} which has its value set immediately upon 235 * construction. The getters just return the value. This {@code Future} can't 236 * be canceled or timed out and its {@code isDone()} method always returns 237 * {@code true}. 238 */ 239 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 240 return new ImmediateSuccessfulFuture<V>(value); 241 } 242 243 /** 244 * Returns a {@code CheckedFuture} which has its value set immediately upon 245 * construction. 246 * 247 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 248 * method always returns {@code true}. Calling {@code get()} or {@code 249 * checkedGet()} will immediately return the provided value. 250 */ 251 @Beta 252 public static <V, X extends Exception> CheckedFuture<V, X> 253 immediateCheckedFuture(@Nullable V value) { 254 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 255 } 256 257 /** 258 * Returns a {@code ListenableFuture} which has an exception set immediately 259 * upon construction. 260 * 261 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 262 * method always returns {@code true}. Calling {@code get()} will immediately 263 * throw the provided {@code Throwable} wrapped in an {@code 264 * ExecutionException}. 265 */ 266 public static <V> ListenableFuture<V> immediateFailedFuture( 267 Throwable throwable) { 268 checkNotNull(throwable); 269 return new ImmediateFailedFuture<V>(throwable); 270 } 271 272 /** 273 * Creates a {@code ListenableFuture} which is cancelled immediately upon 274 * construction, so that {@code isCancelled()} always returns {@code true}. 275 * 276 * @since 14.0 277 */ 278 @Beta 279 public static <V> ListenableFuture<V> immediateCancelledFuture() { 280 return new ImmediateCancelledFuture<V>(); 281 } 282 283 /** 284 * Returns a {@code CheckedFuture} which has an exception set immediately upon 285 * construction. 286 * 287 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 288 * method always returns {@code true}. Calling {@code get()} will immediately 289 * throw the provided {@code Exception} wrapped in an {@code 290 * ExecutionException}, and calling {@code checkedGet()} will throw the 291 * provided exception itself. 292 */ 293 @Beta 294 public static <V, X extends Exception> CheckedFuture<V, X> 295 immediateFailedCheckedFuture(X exception) { 296 checkNotNull(exception); 297 return new ImmediateFailedCheckedFuture<V, X>(exception); 298 } 299 300 /** 301 * Returns a {@code Future} whose result is taken from the given primary 302 * {@code input} or, if the primary input fails, from the {@code Future} 303 * provided by the {@code fallback}. {@link FutureFallback#create} is not 304 * invoked until the primary input has failed, so if the primary input 305 * succeeds, it is never invoked. If, during the invocation of {@code 306 * fallback}, an exception is thrown, this exception is used as the result of 307 * the output {@code Future}. 308 * 309 * <p>Below is an example of a fallback that returns a default value if an 310 * exception occurs: 311 * 312 * <pre> {@code 313 * ListenableFuture<Integer> fetchCounterFuture = ...; 314 * 315 * // Falling back to a zero counter in case an exception happens when 316 * // processing the RPC to fetch counters. 317 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 318 * fetchCounterFuture, new FutureFallback<Integer>() { 319 * public ListenableFuture<Integer> create(Throwable t) { 320 * // Returning "0" as the default for the counter when the 321 * // exception happens. 322 * return immediateFuture(0); 323 * } 324 * }); 325 * }</pre> 326 * 327 * The fallback can also choose to propagate the original exception when 328 * desired: 329 * 330 * <pre> {@code 331 * ListenableFuture<Integer> fetchCounterFuture = ...; 332 * 333 * // Falling back to a zero counter only in case the exception was a 334 * // TimeoutException. 335 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 336 * fetchCounterFuture, new FutureFallback<Integer>() { 337 * public ListenableFuture<Integer> create(Throwable t) { 338 * if (t instanceof TimeoutException) { 339 * return immediateFuture(0); 340 * } 341 * return immediateFailedFuture(t); 342 * } 343 * }); 344 * }</pre> 345 * 346 * Note: If the derived {@code Future} is slow or heavyweight to create 347 * (whether the {@code Future} itself is slow or heavyweight to complete is 348 * irrelevant), consider {@linkplain #withFallback(ListenableFuture, 349 * FutureFallback, Executor) supplying an executor}. If you do not supply an 350 * executor, {@code withFallback} will use {@link 351 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 352 * caveats for heavier operations. For example, the call to {@code 353 * fallback.create} may run on an unpredictable or undesirable thread: 354 * 355 * <ul> 356 * <li>If the input {@code Future} is done at the time {@code withFallback} 357 * is called, {@code withFallback} will call {@code fallback.create} inline. 358 * <li>If the input {@code Future} is not yet done, {@code withFallback} will 359 * schedule {@code fallback.create} to be run by the thread that completes 360 * the input {@code Future}, which may be an internal system thread such as 361 * an RPC network thread. 362 * </ul> 363 * 364 * Also note that, regardless of which thread executes {@code 365 * fallback.create}, all other registered but unexecuted listeners are 366 * prevented from running during its execution, even if those listeners are 367 * to run in other executors. 368 * 369 * @param input the primary input {@code Future} 370 * @param fallback the {@link FutureFallback} implementation to be called if 371 * {@code input} fails 372 * @since 14.0 373 */ 374 @Beta 375 public static <V> ListenableFuture<V> withFallback( 376 ListenableFuture<? extends V> input, 377 FutureFallback<? extends V> fallback) { 378 return withFallback(input, fallback, sameThreadExecutor()); 379 } 380 381 /** 382 * Returns a {@code Future} whose result is taken from the given primary 383 * {@code input} or, if the primary input fails, from the {@code Future} 384 * provided by the {@code fallback}. {@link FutureFallback#create} is not 385 * invoked until the primary input has failed, so if the primary input 386 * succeeds, it is never invoked. If, during the invocation of {@code 387 * fallback}, an exception is thrown, this exception is used as the result of 388 * the output {@code Future}. 389 * 390 * <p>Below is an example of a fallback that returns a default value if an 391 * exception occurs: 392 * 393 * <pre> {@code 394 * ListenableFuture<Integer> fetchCounterFuture = ...; 395 * 396 * // Falling back to a zero counter in case an exception happens when 397 * // processing the RPC to fetch counters. 398 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 399 * fetchCounterFuture, new FutureFallback<Integer>() { 400 * public ListenableFuture<Integer> create(Throwable t) { 401 * // Returning "0" as the default for the counter when the 402 * // exception happens. 403 * return immediateFuture(0); 404 * } 405 * }, sameThreadExecutor()); 406 * }</pre> 407 * 408 * The fallback can also choose to propagate the original exception when 409 * desired: 410 * 411 * <pre> {@code 412 * ListenableFuture<Integer> fetchCounterFuture = ...; 413 * 414 * // Falling back to a zero counter only in case the exception was a 415 * // TimeoutException. 416 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 417 * fetchCounterFuture, new FutureFallback<Integer>() { 418 * public ListenableFuture<Integer> create(Throwable t) { 419 * if (t instanceof TimeoutException) { 420 * return immediateFuture(0); 421 * } 422 * return immediateFailedFuture(t); 423 * } 424 * }, sameThreadExecutor()); 425 * }</pre> 426 * 427 * When the execution of {@code fallback.create} is fast and lightweight 428 * (though the {@code Future} it returns need not meet these criteria), 429 * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) 430 * omitting the executor} or explicitly specifying {@code 431 * sameThreadExecutor}. However, be aware of the caveats documented in the 432 * link above. 433 * 434 * @param input the primary input {@code Future} 435 * @param fallback the {@link FutureFallback} implementation to be called if 436 * {@code input} fails 437 * @param executor the executor that runs {@code fallback} if {@code input} 438 * fails 439 * @since 14.0 440 */ 441 @Beta 442 public static <V> ListenableFuture<V> withFallback( 443 ListenableFuture<? extends V> input, 444 FutureFallback<? extends V> fallback, Executor executor) { 445 checkNotNull(fallback); 446 return new FallbackFuture<V>(input, fallback, executor); 447 } 448 449 /** 450 * A future that falls back on a second, generated future, in case its 451 * original future fails. 452 */ 453 private static class FallbackFuture<V> extends AbstractFuture<V> { 454 455 private volatile ListenableFuture<? extends V> running; 456 457 FallbackFuture(ListenableFuture<? extends V> input, 458 final FutureFallback<? extends V> fallback, 459 final Executor executor) { 460 running = input; 461 addCallback(running, new FutureCallback<V>() { 462 @Override 463 public void onSuccess(V value) { 464 set(value); 465 } 466 467 @Override 468 public void onFailure(Throwable t) { 469 if (isCancelled()) { 470 return; 471 } 472 try { 473 running = fallback.create(t); 474 if (isCancelled()) { // in case cancel called in the meantime 475 running.cancel(wasInterrupted()); 476 return; 477 } 478 addCallback(running, new FutureCallback<V>() { 479 @Override 480 public void onSuccess(V value) { 481 set(value); 482 } 483 484 @Override 485 public void onFailure(Throwable t) { 486 if (running.isCancelled()) { 487 cancel(false); 488 } else { 489 setException(t); 490 } 491 } 492 }, sameThreadExecutor()); 493 } catch (Exception e) { 494 setException(e); 495 } catch (Error e) { 496 setException(e); // note: rethrows 497 } 498 } 499 }, executor); 500 } 501 502 @Override 503 public boolean cancel(boolean mayInterruptIfRunning) { 504 if (super.cancel(mayInterruptIfRunning)) { 505 running.cancel(mayInterruptIfRunning); 506 return true; 507 } 508 return false; 509 } 510 } 511 512 /** 513 * Returns a new {@code ListenableFuture} whose result is asynchronously 514 * derived from the result of the given {@code Future}. More precisely, the 515 * returned {@code Future} takes its result from a {@code Future} produced by 516 * applying the given {@code AsyncFunction} to the result of the original 517 * {@code Future}. Example: 518 * 519 * <pre> {@code 520 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 521 * AsyncFunction<RowKey, QueryResult> queryFunction = 522 * new AsyncFunction<RowKey, QueryResult>() { 523 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 524 * return dataService.read(rowKey); 525 * } 526 * }; 527 * ListenableFuture<QueryResult> queryFuture = 528 * transform(rowKeyFuture, queryFunction); 529 * }</pre> 530 * 531 * Note: If the derived {@code Future} is slow or heavyweight to create 532 * (whether the {@code Future} itself is slow or heavyweight to complete is 533 * irrelevant), consider {@linkplain #transform(ListenableFuture, 534 * AsyncFunction, Executor) supplying an executor}. If you do not supply an 535 * executor, {@code transform} will use {@link 536 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 537 * caveats for heavier operations. For example, the call to {@code 538 * function.apply} may run on an unpredictable or undesirable thread: 539 * 540 * <ul> 541 * <li>If the input {@code Future} is done at the time {@code transform} is 542 * called, {@code transform} will call {@code function.apply} inline. 543 * <li>If the input {@code Future} is not yet done, {@code transform} will 544 * schedule {@code function.apply} to be run by the thread that completes the 545 * input {@code Future}, which may be an internal system thread such as an 546 * RPC network thread. 547 * </ul> 548 * 549 * Also note that, regardless of which thread executes {@code 550 * function.apply}, all other registered but unexecuted listeners are 551 * prevented from running during its execution, even if those listeners are 552 * to run in other executors. 553 * 554 * <p>The returned {@code Future} attempts to keep its cancellation state in 555 * sync with that of the input future and that of the future returned by the 556 * function. That is, if the returned {@code Future} is cancelled, it will 557 * attempt to cancel the other two, and if either of the other two is 558 * cancelled, the returned {@code Future} will receive a callback in which it 559 * will attempt to cancel itself. 560 * 561 * @param input The future to transform 562 * @param function A function to transform the result of the input future 563 * to the result of the output future 564 * @return A future that holds result of the function (if the input succeeded) 565 * or the original input's failure (if not) 566 * @since 11.0 567 */ 568 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 569 AsyncFunction<? super I, ? extends O> function) { 570 return transform(input, function, MoreExecutors.sameThreadExecutor()); 571 } 572 573 /** 574 * Returns a new {@code ListenableFuture} whose result is asynchronously 575 * derived from the result of the given {@code Future}. More precisely, the 576 * returned {@code Future} takes its result from a {@code Future} produced by 577 * applying the given {@code AsyncFunction} to the result of the original 578 * {@code Future}. Example: 579 * 580 * <pre> {@code 581 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 582 * AsyncFunction<RowKey, QueryResult> queryFunction = 583 * new AsyncFunction<RowKey, QueryResult>() { 584 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 585 * return dataService.read(rowKey); 586 * } 587 * }; 588 * ListenableFuture<QueryResult> queryFuture = 589 * transform(rowKeyFuture, queryFunction, executor); 590 * }</pre> 591 * 592 * <p>The returned {@code Future} attempts to keep its cancellation state in 593 * sync with that of the input future and that of the future returned by the 594 * chain function. That is, if the returned {@code Future} is cancelled, it 595 * will attempt to cancel the other two, and if either of the other two is 596 * cancelled, the returned {@code Future} will receive a callback in which it 597 * will attempt to cancel itself. 598 * 599 * <p>When the execution of {@code function.apply} is fast and lightweight 600 * (though the {@code Future} it returns need not meet these criteria), 601 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting 602 * the executor} or explicitly specifying {@code sameThreadExecutor}. 603 * However, be aware of the caveats documented in the link above. 604 * 605 * @param input The future to transform 606 * @param function A function to transform the result of the input future 607 * to the result of the output future 608 * @param executor Executor to run the function in. 609 * @return A future that holds result of the function (if the input succeeded) 610 * or the original input's failure (if not) 611 * @since 11.0 612 */ 613 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 614 AsyncFunction<? super I, ? extends O> function, 615 Executor executor) { 616 ChainingListenableFuture<I, O> output = 617 new ChainingListenableFuture<I, O>(function, input); 618 input.addListener(output, executor); 619 return output; 620 } 621 622 /** 623 * Returns a new {@code ListenableFuture} whose result is the product of 624 * applying the given {@code Function} to the result of the given {@code 625 * Future}. Example: 626 * 627 * <pre> {@code 628 * ListenableFuture<QueryResult> queryFuture = ...; 629 * Function<QueryResult, List<Row>> rowsFunction = 630 * new Function<QueryResult, List<Row>>() { 631 * public List<Row> apply(QueryResult queryResult) { 632 * return queryResult.getRows(); 633 * } 634 * }; 635 * ListenableFuture<List<Row>> rowsFuture = 636 * transform(queryFuture, rowsFunction); 637 * }</pre> 638 * 639 * Note: If the transformation is slow or heavyweight, consider {@linkplain 640 * #transform(ListenableFuture, Function, Executor) supplying an executor}. 641 * If you do not supply an executor, {@code transform} will use {@link 642 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 643 * caveats for heavier operations. For example, the call to {@code 644 * function.apply} may run on an unpredictable or undesirable thread: 645 * 646 * <ul> 647 * <li>If the input {@code Future} is done at the time {@code transform} is 648 * called, {@code transform} will call {@code function.apply} inline. 649 * <li>If the input {@code Future} is not yet done, {@code transform} will 650 * schedule {@code function.apply} to be run by the thread that completes the 651 * input {@code Future}, which may be an internal system thread such as an 652 * RPC network thread. 653 * </ul> 654 * 655 * Also note that, regardless of which thread executes {@code 656 * function.apply}, all other registered but unexecuted listeners are 657 * prevented from running during its execution, even if those listeners are 658 * to run in other executors. 659 * 660 * <p>The returned {@code Future} attempts to keep its cancellation state in 661 * sync with that of the input future. That is, if the returned {@code Future} 662 * is cancelled, it will attempt to cancel the input, and if the input is 663 * cancelled, the returned {@code Future} will receive a callback in which it 664 * will attempt to cancel itself. 665 * 666 * <p>An example use of this method is to convert a serializable object 667 * returned from an RPC into a POJO. 668 * 669 * @param input The future to transform 670 * @param function A Function to transform the results of the provided future 671 * to the results of the returned future. This will be run in the thread 672 * that notifies input it is complete. 673 * @return A future that holds result of the transformation. 674 * @since 9.0 (in 1.0 as {@code compose}) 675 */ 676 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 677 final Function<? super I, ? extends O> function) { 678 return transform(input, function, MoreExecutors.sameThreadExecutor()); 679 } 680 681 /** 682 * Returns a new {@code ListenableFuture} whose result is the product of 683 * applying the given {@code Function} to the result of the given {@code 684 * Future}. Example: 685 * 686 * <pre> {@code 687 * ListenableFuture<QueryResult> queryFuture = ...; 688 * Function<QueryResult, List<Row>> rowsFunction = 689 * new Function<QueryResult, List<Row>>() { 690 * public List<Row> apply(QueryResult queryResult) { 691 * return queryResult.getRows(); 692 * } 693 * }; 694 * ListenableFuture<List<Row>> rowsFuture = 695 * transform(queryFuture, rowsFunction, executor); 696 * }</pre> 697 * 698 * <p>The returned {@code Future} attempts to keep its cancellation state in 699 * sync with that of the input future. That is, if the returned {@code Future} 700 * is cancelled, it will attempt to cancel the input, and if the input is 701 * cancelled, the returned {@code Future} will receive a callback in which it 702 * will attempt to cancel itself. 703 * 704 * <p>An example use of this method is to convert a serializable object 705 * returned from an RPC into a POJO. 706 * 707 * <p>When the transformation is fast and lightweight, consider {@linkplain 708 * #transform(ListenableFuture, Function) omitting the executor} or 709 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 710 * caveats documented in the link above. 711 * 712 * @param input The future to transform 713 * @param function A Function to transform the results of the provided future 714 * to the results of the returned future. 715 * @param executor Executor to run the function in. 716 * @return A future that holds result of the transformation. 717 * @since 9.0 (in 2.0 as {@code compose}) 718 */ 719 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 720 final Function<? super I, ? extends O> function, Executor executor) { 721 checkNotNull(function); 722 AsyncFunction<I, O> wrapperFunction 723 = new AsyncFunction<I, O>() { 724 @Override public ListenableFuture<O> apply(I input) { 725 O output = function.apply(input); 726 return immediateFuture(output); 727 } 728 }; 729 return transform(input, wrapperFunction, executor); 730 } 731 732 /** 733 * Like {@link #transform(ListenableFuture, Function)} except that the 734 * transformation {@code function} is invoked on each call to 735 * {@link Future#get() get()} on the returned future. 736 * 737 * <p>The returned {@code Future} reflects the input's cancellation 738 * state directly, and any attempt to cancel the returned Future is likewise 739 * passed through to the input Future. 740 * 741 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 742 * only apply the timeout to the execution of the underlying {@code Future}, 743 * <em>not</em> to the execution of the transformation function. 744 * 745 * <p>The primary audience of this method is callers of {@code transform} 746 * who don't have a {@code ListenableFuture} available and 747 * do not mind repeated, lazy function evaluation. 748 * 749 * @param input The future to transform 750 * @param function A Function to transform the results of the provided future 751 * to the results of the returned future. 752 * @return A future that returns the result of the transformation. 753 * @since 10.0 754 */ 755 @Beta 756 public static <I, O> Future<O> lazyTransform(final Future<I> input, 757 final Function<? super I, ? extends O> function) { 758 checkNotNull(input); 759 checkNotNull(function); 760 return new Future<O>() { 761 762 @Override 763 public boolean cancel(boolean mayInterruptIfRunning) { 764 return input.cancel(mayInterruptIfRunning); 765 } 766 767 @Override 768 public boolean isCancelled() { 769 return input.isCancelled(); 770 } 771 772 @Override 773 public boolean isDone() { 774 return input.isDone(); 775 } 776 777 @Override 778 public O get() throws InterruptedException, ExecutionException { 779 return applyTransformation(input.get()); 780 } 781 782 @Override 783 public O get(long timeout, TimeUnit unit) 784 throws InterruptedException, ExecutionException, TimeoutException { 785 return applyTransformation(input.get(timeout, unit)); 786 } 787 788 private O applyTransformation(I input) throws ExecutionException { 789 try { 790 return function.apply(input); 791 } catch (Throwable t) { 792 throw new ExecutionException(t); 793 } 794 } 795 }; 796 } 797 798 /** 799 * An implementation of {@code ListenableFuture} that also implements 800 * {@code Runnable} so that it can be used to nest ListenableFutures. 801 * Once the passed-in {@code ListenableFuture} is complete, it calls the 802 * passed-in {@code Function} to generate the result. 803 * 804 * <p>If the function throws any checked exceptions, they should be wrapped 805 * in a {@code UndeclaredThrowableException} so that this class can get 806 * access to the cause. 807 */ 808 private static class ChainingListenableFuture<I, O> 809 extends AbstractFuture<O> implements Runnable { 810 811 private AsyncFunction<? super I, ? extends O> function; 812 private ListenableFuture<? extends I> inputFuture; 813 private volatile ListenableFuture<? extends O> outputFuture; 814 private final CountDownLatch outputCreated = new CountDownLatch(1); 815 816 private ChainingListenableFuture( 817 AsyncFunction<? super I, ? extends O> function, 818 ListenableFuture<? extends I> inputFuture) { 819 this.function = checkNotNull(function); 820 this.inputFuture = checkNotNull(inputFuture); 821 } 822 823 @Override 824 public boolean cancel(boolean mayInterruptIfRunning) { 825 /* 826 * Our additional cancellation work needs to occur even if 827 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 828 */ 829 if (super.cancel(mayInterruptIfRunning)) { 830 // This should never block since only one thread is allowed to cancel 831 // this Future. 832 cancel(inputFuture, mayInterruptIfRunning); 833 cancel(outputFuture, mayInterruptIfRunning); 834 return true; 835 } 836 return false; 837 } 838 839 private void cancel(@Nullable Future<?> future, 840 boolean mayInterruptIfRunning) { 841 if (future != null) { 842 future.cancel(mayInterruptIfRunning); 843 } 844 } 845 846 @Override 847 public void run() { 848 try { 849 I sourceResult; 850 try { 851 sourceResult = getUninterruptibly(inputFuture); 852 } catch (CancellationException e) { 853 // Cancel this future and return. 854 // At this point, inputFuture is cancelled and outputFuture doesn't 855 // exist, so the value of mayInterruptIfRunning is irrelevant. 856 cancel(false); 857 return; 858 } catch (ExecutionException e) { 859 // Set the cause of the exception as this future's exception 860 setException(e.getCause()); 861 return; 862 } 863 864 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 865 function.apply(sourceResult); 866 if (isCancelled()) { 867 outputFuture.cancel(wasInterrupted()); 868 this.outputFuture = null; 869 return; 870 } 871 outputFuture.addListener(new Runnable() { 872 @Override 873 public void run() { 874 try { 875 // Here it would have been nice to have had an 876 // UninterruptibleListenableFuture, but we don't want to start a 877 // combinatorial explosion of interfaces, so we have to make do. 878 set(getUninterruptibly(outputFuture)); 879 } catch (CancellationException e) { 880 // Cancel this future and return. 881 // At this point, inputFuture and outputFuture are done, so the 882 // value of mayInterruptIfRunning is irrelevant. 883 cancel(false); 884 return; 885 } catch (ExecutionException e) { 886 // Set the cause of the exception as this future's exception 887 setException(e.getCause()); 888 } finally { 889 // Don't pin inputs beyond completion 890 ChainingListenableFuture.this.outputFuture = null; 891 } 892 } 893 }, MoreExecutors.sameThreadExecutor()); 894 } catch (UndeclaredThrowableException e) { 895 // Set the cause of the exception as this future's exception 896 setException(e.getCause()); 897 } catch (Exception e) { 898 // This exception is irrelevant in this thread, but useful for the 899 // client 900 setException(e); 901 } catch (Error e) { 902 // Propagate errors up ASAP - our superclass will rethrow the error 903 setException(e); 904 } finally { 905 // Don't pin inputs beyond completion 906 function = null; 907 inputFuture = null; 908 // Allow our get routines to examine outputFuture now. 909 outputCreated.countDown(); 910 } 911 } 912 } 913 914 /** 915 * Returns a new {@code ListenableFuture} whose result is the product of 916 * calling {@code get()} on the {@code Future} nested within the given {@code 917 * Future}, effectively chaining the futures one after the other. Example: 918 * 919 * <pre> {@code 920 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 921 * ListenableFuture<String> dereferenced = dereference(nested); 922 * }</pre> 923 * 924 * <p>This call has the same cancellation and execution semantics as {@link 925 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 926 * Future} attempts to keep its cancellation state in sync with both the 927 * input {@code Future} and the nested {@code Future}. The transformation 928 * is very lightweight and therefore takes place in the thread that called 929 * {@code dereference}. 930 * 931 * @param nested The nested future to transform. 932 * @return A future that holds result of the inner future. 933 * @since 13.0 934 */ 935 @Beta 936 @SuppressWarnings({"rawtypes", "unchecked"}) 937 public static <V> ListenableFuture<V> dereference( 938 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 939 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 940 } 941 942 /** 943 * Helper {@code Function} for {@link #dereference}. 944 */ 945 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 946 new AsyncFunction<ListenableFuture<Object>, Object>() { 947 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 948 return input; 949 } 950 }; 951 952 /** 953 * Creates a new {@code ListenableFuture} whose value is a list containing the 954 * values of all its input futures, if all succeed. If any input fails, the 955 * returned future fails. 956 * 957 * <p>The list of results is in the same order as the input list. 958 * 959 * <p>Canceling this future will attempt to cancel all the component futures, 960 * and if any of the provided futures fails or is canceled, this one is, 961 * too. 962 * 963 * @param futures futures to combine 964 * @return a future that provides a list of the results of the component 965 * futures 966 * @since 10.0 967 */ 968 @Beta 969 public static <V> ListenableFuture<List<V>> allAsList( 970 ListenableFuture<? extends V>... futures) { 971 return listFuture(ImmutableList.copyOf(futures), true, 972 MoreExecutors.sameThreadExecutor()); 973 } 974 975 /** 976 * Creates a new {@code ListenableFuture} whose value is a list containing the 977 * values of all its input futures, if all succeed. If any input fails, the 978 * returned future fails. 979 * 980 * <p>The list of results is in the same order as the input list. 981 * 982 * <p>Canceling this future will attempt to cancel all the component futures, 983 * and if any of the provided futures fails or is canceled, this one is, 984 * too. 985 * 986 * @param futures futures to combine 987 * @return a future that provides a list of the results of the component 988 * futures 989 * @since 10.0 990 */ 991 @Beta 992 public static <V> ListenableFuture<List<V>> allAsList( 993 Iterable<? extends ListenableFuture<? extends V>> futures) { 994 return listFuture(ImmutableList.copyOf(futures), true, 995 MoreExecutors.sameThreadExecutor()); 996 } 997 998 /** 999 * Creates a new {@code ListenableFuture} whose value is a list containing the 1000 * values of all its successful input futures. The list of results is in the 1001 * same order as the input list, and if any of the provided futures fails or 1002 * is canceled, its corresponding position will contain {@code null} (which is 1003 * indistinguishable from the future having a successful value of 1004 * {@code null}). 1005 * 1006 * <p>Canceling this future will attempt to cancel all the component futures. 1007 * 1008 * @param futures futures to combine 1009 * @return a future that provides a list of the results of the component 1010 * futures 1011 * @since 10.0 1012 */ 1013 @Beta 1014 public static <V> ListenableFuture<List<V>> successfulAsList( 1015 ListenableFuture<? extends V>... futures) { 1016 return listFuture(ImmutableList.copyOf(futures), false, 1017 MoreExecutors.sameThreadExecutor()); 1018 } 1019 1020 /** 1021 * Creates a new {@code ListenableFuture} whose value is a list containing the 1022 * values of all its successful input futures. The list of results is in the 1023 * same order as the input list, and if any of the provided futures fails or 1024 * is canceled, its corresponding position will contain {@code null} (which is 1025 * indistinguishable from the future having a successful value of 1026 * {@code null}). 1027 * 1028 * <p>Canceling this future will attempt to cancel all the component futures. 1029 * 1030 * @param futures futures to combine 1031 * @return a future that provides a list of the results of the component 1032 * futures 1033 * @since 10.0 1034 */ 1035 @Beta 1036 public static <V> ListenableFuture<List<V>> successfulAsList( 1037 Iterable<? extends ListenableFuture<? extends V>> futures) { 1038 return listFuture(ImmutableList.copyOf(futures), false, 1039 MoreExecutors.sameThreadExecutor()); 1040 } 1041 1042 /** 1043 * Registers separate success and failure callbacks to be run when the {@code 1044 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1045 * complete} or, if the computation is already complete, immediately. 1046 * 1047 * <p>There is no guaranteed ordering of execution of callbacks, but any 1048 * callback added through this method is guaranteed to be called once the 1049 * computation is complete. 1050 * 1051 * Example: <pre> {@code 1052 * ListenableFuture<QueryResult> future = ...; 1053 * addCallback(future, 1054 * new FutureCallback<QueryResult> { 1055 * public void onSuccess(QueryResult result) { 1056 * storeInCache(result); 1057 * } 1058 * public void onFailure(Throwable t) { 1059 * reportError(t); 1060 * } 1061 * });}</pre> 1062 * 1063 * Note: If the callback is slow or heavyweight, consider {@linkplain 1064 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an 1065 * executor}. If you do not supply an executor, {@code addCallback} will use 1066 * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries 1067 * some caveats for heavier operations. For example, the callback may run on 1068 * an unpredictable or undesirable thread: 1069 * 1070 * <ul> 1071 * <li>If the input {@code Future} is done at the time {@code addCallback} is 1072 * called, {@code addCallback} will execute the callback inline. 1073 * <li>If the input {@code Future} is not yet done, {@code addCallback} will 1074 * schedule the callback to be run by the thread that completes the input 1075 * {@code Future}, which may be an internal system thread such as an RPC 1076 * network thread. 1077 * </ul> 1078 * 1079 * Also note that, regardless of which thread executes the callback, all 1080 * other registered but unexecuted listeners are prevented from running 1081 * during its execution, even if those listeners are to run in other 1082 * executors. 1083 * 1084 * <p>For a more general interface to attach a completion listener to a 1085 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1086 * 1087 * @param future The future attach the callback to. 1088 * @param callback The callback to invoke when {@code future} is completed. 1089 * @since 10.0 1090 */ 1091 public static <V> void addCallback(ListenableFuture<V> future, 1092 FutureCallback<? super V> callback) { 1093 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 1094 } 1095 1096 /** 1097 * Registers separate success and failure callbacks to be run when the {@code 1098 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1099 * complete} or, if the computation is already complete, immediately. 1100 * 1101 * <p>The callback is run in {@code executor}. 1102 * There is no guaranteed ordering of execution of callbacks, but any 1103 * callback added through this method is guaranteed to be called once the 1104 * computation is complete. 1105 * 1106 * Example: <pre> {@code 1107 * ListenableFuture<QueryResult> future = ...; 1108 * Executor e = ... 1109 * addCallback(future, e, 1110 * new FutureCallback<QueryResult> { 1111 * public void onSuccess(QueryResult result) { 1112 * storeInCache(result); 1113 * } 1114 * public void onFailure(Throwable t) { 1115 * reportError(t); 1116 * } 1117 * });}</pre> 1118 * 1119 * When the callback is fast and lightweight, consider {@linkplain 1120 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or 1121 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 1122 * caveats documented in the link above. 1123 * 1124 * <p>For a more general interface to attach a completion listener to a 1125 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1126 * 1127 * @param future The future attach the callback to. 1128 * @param callback The callback to invoke when {@code future} is completed. 1129 * @param executor The executor to run {@code callback} when the future 1130 * completes. 1131 * @since 10.0 1132 */ 1133 public static <V> void addCallback(final ListenableFuture<V> future, 1134 final FutureCallback<? super V> callback, Executor executor) { 1135 Preconditions.checkNotNull(callback); 1136 Runnable callbackListener = new Runnable() { 1137 @Override 1138 public void run() { 1139 final V value; 1140 try { 1141 // TODO(user): (Before Guava release), validate that this 1142 // is the thing for IE. 1143 value = getUninterruptibly(future); 1144 } catch (ExecutionException e) { 1145 callback.onFailure(e.getCause()); 1146 return; 1147 } catch (RuntimeException e) { 1148 callback.onFailure(e); 1149 return; 1150 } catch (Error e) { 1151 callback.onFailure(e); 1152 return; 1153 } 1154 callback.onSuccess(value); 1155 } 1156 }; 1157 future.addListener(callbackListener, executor); 1158 } 1159 1160 /** 1161 * Returns the result of {@link Future#get()}, converting most exceptions to a 1162 * new instance of the given checked exception type. This reduces boilerplate 1163 * for a common use of {@code Future} in which it is unnecessary to 1164 * programmatically distinguish between exception types or to extract other 1165 * information from the exception instance. 1166 * 1167 * <p>Exceptions from {@code Future.get} are treated as follows: 1168 * <ul> 1169 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1170 * {@code X} if the cause is a checked exception, an {@link 1171 * UncheckedExecutionException} if the cause is a {@code 1172 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1173 * {@code Error}. 1174 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1175 * restoring the interrupt). 1176 * <li>Any {@link CancellationException} is propagated untouched, as is any 1177 * other {@link RuntimeException} (though {@code get} implementations are 1178 * discouraged from throwing such exceptions). 1179 * </ul> 1180 * 1181 * The overall principle is to continue to treat every checked exception as a 1182 * checked exception, every unchecked exception as an unchecked exception, and 1183 * every error as an error. In addition, the cause of any {@code 1184 * ExecutionException} is wrapped in order to ensure that the new stack trace 1185 * matches that of the current thread. 1186 * 1187 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1188 * public constructor that accepts zero or more arguments, all of type {@code 1189 * String} or {@code Throwable} (preferring constructors with at least one 1190 * {@code String}) and calling the constructor via reflection. If the 1191 * exception did not already have a cause, one is set by calling {@link 1192 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1193 * {@code IllegalArgumentException} is thrown. 1194 * 1195 * @throws X if {@code get} throws any checked exception except for an {@code 1196 * ExecutionException} whose cause is not itself a checked exception 1197 * @throws UncheckedExecutionException if {@code get} throws an {@code 1198 * ExecutionException} with a {@code RuntimeException} as its cause 1199 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1200 * with an {@code Error} as its cause 1201 * @throws CancellationException if {@code get} throws a {@code 1202 * CancellationException} 1203 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1204 * RuntimeException} or does not have a suitable constructor 1205 * @since 10.0 1206 */ 1207 @Beta 1208 public static <V, X extends Exception> V get( 1209 Future<V> future, Class<X> exceptionClass) throws X { 1210 checkNotNull(future); 1211 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1212 "Futures.get exception type (%s) must not be a RuntimeException", 1213 exceptionClass); 1214 try { 1215 return future.get(); 1216 } catch (InterruptedException e) { 1217 currentThread().interrupt(); 1218 throw newWithCause(exceptionClass, e); 1219 } catch (ExecutionException e) { 1220 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1221 throw new AssertionError(); 1222 } 1223 } 1224 1225 /** 1226 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1227 * exceptions to a new instance of the given checked exception type. This 1228 * reduces boilerplate for a common use of {@code Future} in which it is 1229 * unnecessary to programmatically distinguish between exception types or to 1230 * extract other information from the exception instance. 1231 * 1232 * <p>Exceptions from {@code Future.get} are treated as follows: 1233 * <ul> 1234 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1235 * {@code X} if the cause is a checked exception, an {@link 1236 * UncheckedExecutionException} if the cause is a {@code 1237 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1238 * {@code Error}. 1239 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1240 * restoring the interrupt). 1241 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1242 * <li>Any {@link CancellationException} is propagated untouched, as is any 1243 * other {@link RuntimeException} (though {@code get} implementations are 1244 * discouraged from throwing such exceptions). 1245 * </ul> 1246 * 1247 * The overall principle is to continue to treat every checked exception as a 1248 * checked exception, every unchecked exception as an unchecked exception, and 1249 * every error as an error. In addition, the cause of any {@code 1250 * ExecutionException} is wrapped in order to ensure that the new stack trace 1251 * matches that of the current thread. 1252 * 1253 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1254 * public constructor that accepts zero or more arguments, all of type {@code 1255 * String} or {@code Throwable} (preferring constructors with at least one 1256 * {@code String}) and calling the constructor via reflection. If the 1257 * exception did not already have a cause, one is set by calling {@link 1258 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1259 * {@code IllegalArgumentException} is thrown. 1260 * 1261 * @throws X if {@code get} throws any checked exception except for an {@code 1262 * ExecutionException} whose cause is not itself a checked exception 1263 * @throws UncheckedExecutionException if {@code get} throws an {@code 1264 * ExecutionException} with a {@code RuntimeException} as its cause 1265 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1266 * with an {@code Error} as its cause 1267 * @throws CancellationException if {@code get} throws a {@code 1268 * CancellationException} 1269 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1270 * RuntimeException} or does not have a suitable constructor 1271 * @since 10.0 1272 */ 1273 @Beta 1274 public static <V, X extends Exception> V get( 1275 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1276 throws X { 1277 checkNotNull(future); 1278 checkNotNull(unit); 1279 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1280 "Futures.get exception type (%s) must not be a RuntimeException", 1281 exceptionClass); 1282 try { 1283 return future.get(timeout, unit); 1284 } catch (InterruptedException e) { 1285 currentThread().interrupt(); 1286 throw newWithCause(exceptionClass, e); 1287 } catch (TimeoutException e) { 1288 throw newWithCause(exceptionClass, e); 1289 } catch (ExecutionException e) { 1290 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1291 throw new AssertionError(); 1292 } 1293 } 1294 1295 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1296 Throwable cause, Class<X> exceptionClass) throws X { 1297 if (cause instanceof Error) { 1298 throw new ExecutionError((Error) cause); 1299 } 1300 if (cause instanceof RuntimeException) { 1301 throw new UncheckedExecutionException(cause); 1302 } 1303 throw newWithCause(exceptionClass, cause); 1304 } 1305 1306 /** 1307 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1308 * task known not to throw a checked exception. This makes {@code Future} more 1309 * suitable for lightweight, fast-running tasks that, barring bugs in the 1310 * code, will not fail. This gives it exception-handling behavior similar to 1311 * that of {@code ForkJoinTask.join}. 1312 * 1313 * <p>Exceptions from {@code Future.get} are treated as follows: 1314 * <ul> 1315 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1316 * {@link UncheckedExecutionException} (if the cause is an {@code 1317 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1318 * Error}). 1319 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1320 * call. The interrupt is restored before {@code getUnchecked} returns. 1321 * <li>Any {@link CancellationException} is propagated untouched. So is any 1322 * other {@link RuntimeException} ({@code get} implementations are 1323 * discouraged from throwing such exceptions). 1324 * </ul> 1325 * 1326 * The overall principle is to eliminate all checked exceptions: to loop to 1327 * avoid {@code InterruptedException}, to pass through {@code 1328 * CancellationException}, and to wrap any exception from the underlying 1329 * computation in an {@code UncheckedExecutionException} or {@code 1330 * ExecutionError}. 1331 * 1332 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1333 * {@link Uninterruptibles#getUninterruptibly(Future)}. 1334 * 1335 * @throws UncheckedExecutionException if {@code get} throws an {@code 1336 * ExecutionException} with an {@code Exception} as its cause 1337 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1338 * with an {@code Error} as its cause 1339 * @throws CancellationException if {@code get} throws a {@code 1340 * CancellationException} 1341 * @since 10.0 1342 */ 1343 @Beta 1344 public static <V> V getUnchecked(Future<V> future) { 1345 checkNotNull(future); 1346 try { 1347 return getUninterruptibly(future); 1348 } catch (ExecutionException e) { 1349 wrapAndThrowUnchecked(e.getCause()); 1350 throw new AssertionError(); 1351 } 1352 } 1353 1354 private static void wrapAndThrowUnchecked(Throwable cause) { 1355 if (cause instanceof Error) { 1356 throw new ExecutionError((Error) cause); 1357 } 1358 /* 1359 * It's a non-Error, non-Exception Throwable. From my survey of such 1360 * classes, I believe that most users intended to extend Exception, so we'll 1361 * treat it like an Exception. 1362 */ 1363 throw new UncheckedExecutionException(cause); 1364 } 1365 1366 /* 1367 * TODO(user): FutureChecker interface for these to be static methods on? If 1368 * so, refer to it in the (static-method) Futures.get documentation 1369 */ 1370 1371 /* 1372 * Arguably we don't need a timed getUnchecked because any operation slow 1373 * enough to require a timeout is heavyweight enough to throw a checked 1374 * exception and therefore be inappropriate to use with getUnchecked. Further, 1375 * it's not clear that converting the checked TimeoutException to a 1376 * RuntimeException -- especially to an UncheckedExecutionException, since it 1377 * wasn't thrown by the computation -- makes sense, and if we don't convert 1378 * it, the user still has to write a try-catch block. 1379 * 1380 * If you think you would use this method, let us know. 1381 */ 1382 1383 private static <X extends Exception> X newWithCause( 1384 Class<X> exceptionClass, Throwable cause) { 1385 // getConstructors() guarantees this as long as we don't modify the array. 1386 @SuppressWarnings("unchecked") 1387 List<Constructor<X>> constructors = 1388 (List) Arrays.asList(exceptionClass.getConstructors()); 1389 for (Constructor<X> constructor : preferringStrings(constructors)) { 1390 @Nullable X instance = newFromConstructor(constructor, cause); 1391 if (instance != null) { 1392 if (instance.getCause() == null) { 1393 instance.initCause(cause); 1394 } 1395 return instance; 1396 } 1397 } 1398 throw new IllegalArgumentException( 1399 "No appropriate constructor for exception of type " + exceptionClass 1400 + " in response to chained exception", cause); 1401 } 1402 1403 private static <X extends Exception> List<Constructor<X>> 1404 preferringStrings(List<Constructor<X>> constructors) { 1405 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1406 } 1407 1408 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1409 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1410 @Override public Boolean apply(Constructor<?> input) { 1411 return asList(input.getParameterTypes()).contains(String.class); 1412 } 1413 }).reverse(); 1414 1415 @Nullable private static <X> X newFromConstructor( 1416 Constructor<X> constructor, Throwable cause) { 1417 Class<?>[] paramTypes = constructor.getParameterTypes(); 1418 Object[] params = new Object[paramTypes.length]; 1419 for (int i = 0; i < paramTypes.length; i++) { 1420 Class<?> paramType = paramTypes[i]; 1421 if (paramType.equals(String.class)) { 1422 params[i] = cause.toString(); 1423 } else if (paramType.equals(Throwable.class)) { 1424 params[i] = cause; 1425 } else { 1426 return null; 1427 } 1428 } 1429 try { 1430 return constructor.newInstance(params); 1431 } catch (IllegalArgumentException e) { 1432 return null; 1433 } catch (InstantiationException e) { 1434 return null; 1435 } catch (IllegalAccessException e) { 1436 return null; 1437 } catch (InvocationTargetException e) { 1438 return null; 1439 } 1440 } 1441 1442 private interface FutureCombiner<V, C> { 1443 C combine(List<Optional<V>> values); 1444 } 1445 1446 private static class CombinedFuture<V, C> extends AbstractFuture<C> { 1447 ImmutableCollection<? extends ListenableFuture<? extends V>> futures; 1448 final boolean allMustSucceed; 1449 final AtomicInteger remaining; 1450 FutureCombiner<V, C> combiner; 1451 List<Optional<V>> values; 1452 1453 CombinedFuture( 1454 ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 1455 boolean allMustSucceed, Executor listenerExecutor, 1456 FutureCombiner<V, C> combiner) { 1457 this.futures = futures; 1458 this.allMustSucceed = allMustSucceed; 1459 this.remaining = new AtomicInteger(futures.size()); 1460 this.combiner = combiner; 1461 this.values = Lists.newArrayListWithCapacity(futures.size()); 1462 init(listenerExecutor); 1463 } 1464 1465 /** 1466 * Must be called at the end of the constructor. 1467 */ 1468 protected void init(final Executor listenerExecutor) { 1469 // First, schedule cleanup to execute when the Future is done. 1470 addListener(new Runnable() { 1471 @Override 1472 public void run() { 1473 // Cancel all the component futures. 1474 if (CombinedFuture.this.isCancelled()) { 1475 for (ListenableFuture<?> future : CombinedFuture.this.futures) { 1476 future.cancel(CombinedFuture.this.wasInterrupted()); 1477 } 1478 } 1479 1480 // By now the values array has either been set as the Future's value, 1481 // or (in case of failure) is no longer useful. 1482 CombinedFuture.this.futures = null; 1483 1484 // Let go of the memory held by other futures 1485 CombinedFuture.this.values = null; 1486 1487 // The combiner may also hold state, so free that as well 1488 CombinedFuture.this.combiner = null; 1489 } 1490 }, MoreExecutors.sameThreadExecutor()); 1491 1492 // Now begin the "real" initialization. 1493 1494 // Corner case: List is empty. 1495 if (futures.isEmpty()) { 1496 set(combiner.combine(ImmutableList.<Optional<V>>of())); 1497 return; 1498 } 1499 1500 // Populate the results list with null initially. 1501 for (int i = 0; i < futures.size(); ++i) { 1502 values.add(null); 1503 } 1504 1505 // Register a listener on each Future in the list to update 1506 // the state of this future. 1507 // Note that if all the futures on the list are done prior to completing 1508 // this loop, the last call to addListener() will callback to 1509 // setOneValue(), transitively call our cleanup listener, and set 1510 // this.futures to null. 1511 // This is not actually a problem, since the foreach only needs 1512 // this.futures to be non-null at the beginning of the loop. 1513 int i = 0; 1514 for (final ListenableFuture<? extends V> listenable : futures) { 1515 final int index = i++; 1516 listenable.addListener(new Runnable() { 1517 @Override 1518 public void run() { 1519 setOneValue(index, listenable); 1520 } 1521 }, listenerExecutor); 1522 } 1523 } 1524 1525 /** 1526 * Sets the value at the given index to that of the given future. 1527 */ 1528 private void setOneValue(int index, Future<? extends V> future) { 1529 List<Optional<V>> localValues = values; 1530 if (isDone() || localValues == null) { 1531 // Some other future failed or has been cancelled, causing this one to 1532 // also be cancelled or have an exception set. This should only happen 1533 // if allMustSucceed is true or if the output itself has been cancelled. 1534 checkState(allMustSucceed || isCancelled(), 1535 "Future was done before all dependencies completed"); 1536 return; 1537 } 1538 1539 try { 1540 checkState(future.isDone(), 1541 "Tried to set value from future which is not done"); 1542 V returnValue = getUninterruptibly(future); 1543 localValues.set(index, Optional.fromNullable(returnValue)); 1544 } catch (CancellationException e) { 1545 if (allMustSucceed) { 1546 // Set ourselves as cancelled. Let the input futures keep running 1547 // as some of them may be used elsewhere. 1548 // (Currently we don't override interruptTask, so 1549 // mayInterruptIfRunning==false isn't technically necessary.) 1550 cancel(false); 1551 } 1552 } catch (ExecutionException e) { 1553 if (allMustSucceed) { 1554 // As soon as the first one fails, throw the exception up. 1555 // The result of all other inputs is then ignored. 1556 setException(e.getCause()); 1557 } 1558 } catch (RuntimeException e) { 1559 if (allMustSucceed) { 1560 setException(e); 1561 } 1562 } catch (Error e) { 1563 // Propagate errors up ASAP - our superclass will rethrow the error 1564 setException(e); 1565 } finally { 1566 int newRemaining = remaining.decrementAndGet(); 1567 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1568 if (newRemaining == 0) { 1569 FutureCombiner<V, C> localCombiner = combiner; 1570 if (localCombiner != null) { 1571 set(localCombiner.combine(localValues)); 1572 } else { 1573 checkState(isDone()); 1574 } 1575 } 1576 } 1577 } 1578 1579 } 1580 1581 /** Used for {@link #allAsList} and {@link #successfulAsList}. */ 1582 private static <V> ListenableFuture<List<V>> listFuture( 1583 ImmutableList<ListenableFuture<? extends V>> futures, 1584 boolean allMustSucceed, Executor listenerExecutor) { 1585 return new CombinedFuture<V, List<V>>( 1586 futures, allMustSucceed, listenerExecutor, 1587 new FutureCombiner<V, List<V>>() { 1588 @Override 1589 public List<V> combine(List<Optional<V>> values) { 1590 List<V> result = Lists.newArrayList(); 1591 for (Optional<V> element : values) { 1592 result.add(element != null ? element.orNull() : null); 1593 } 1594 // TODO(user): This should ultimately return an unmodifiableList 1595 return result; 1596 } 1597 }); 1598 } 1599 1600 /** 1601 * A checked future that uses a function to map from exceptions to the 1602 * appropriate checked type. 1603 */ 1604 private static class MappingCheckedFuture<V, X extends Exception> extends 1605 AbstractCheckedFuture<V, X> { 1606 1607 final Function<Exception, X> mapper; 1608 1609 MappingCheckedFuture(ListenableFuture<V> delegate, 1610 Function<Exception, X> mapper) { 1611 super(delegate); 1612 1613 this.mapper = checkNotNull(mapper); 1614 } 1615 1616 @Override 1617 protected X mapException(Exception e) { 1618 return mapper.apply(e); 1619 } 1620 } 1621}