001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024import static java.lang.Thread.currentThread; 025import static java.util.Arrays.asList; 026 027import com.google.common.annotations.Beta; 028import com.google.common.base.Function; 029import com.google.common.base.Optional; 030import com.google.common.base.Preconditions; 031import com.google.common.collect.ImmutableCollection; 032import com.google.common.collect.ImmutableList; 033import com.google.common.collect.Lists; 034import com.google.common.collect.Ordering; 035 036import java.lang.reflect.Constructor; 037import java.lang.reflect.InvocationTargetException; 038import java.lang.reflect.UndeclaredThrowableException; 039import java.util.Arrays; 040import java.util.Collections; 041import java.util.List; 042import java.util.concurrent.CancellationException; 043import java.util.concurrent.CountDownLatch; 044import java.util.concurrent.ExecutionException; 045import java.util.concurrent.Executor; 046import java.util.concurrent.Future; 047import java.util.concurrent.TimeUnit; 048import java.util.concurrent.TimeoutException; 049import java.util.concurrent.atomic.AtomicInteger; 050import java.util.logging.Level; 051import java.util.logging.Logger; 052 053import javax.annotation.Nullable; 054 055/** 056 * Static utility methods pertaining to the {@link Future} interface. 057 * 058 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 059 * Guava User Guide article on <a href= 060 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 061 * {@code ListenableFuture}</a>. 062 * 063 * @author Kevin Bourrillion 064 * @author Nishant Thakkar 065 * @author Sven Mawson 066 * @since 1.0 067 */ 068@Beta 069public final class Futures { 070 private Futures() {} 071 072 /** 073 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 074 * and a {@link Function} that maps from {@link Exception} instances into the 075 * appropriate checked type. 076 * 077 * <p>The given mapping function will be applied to an 078 * {@link InterruptedException}, a {@link CancellationException}, or an 079 * {@link ExecutionException}. 080 * See {@link Future#get()} for details on the exceptions thrown. 081 * 082 * @since 9.0 (source-compatible since 1.0) 083 */ 084 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 085 ListenableFuture<V> future, Function<Exception, X> mapper) { 086 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 087 } 088 089 private abstract static class ImmediateFuture<V> 090 implements ListenableFuture<V> { 091 092 private static final Logger log = 093 Logger.getLogger(ImmediateFuture.class.getName()); 094 095 @Override 096 public void addListener(Runnable listener, Executor executor) { 097 checkNotNull(listener, "Runnable was null."); 098 checkNotNull(executor, "Executor was null."); 099 try { 100 executor.execute(listener); 101 } catch (RuntimeException e) { 102 // ListenableFuture's contract is that it will not throw unchecked 103 // exceptions, so log the bad runnable and/or executor and swallow it. 104 log.log(Level.SEVERE, "RuntimeException while executing runnable " 105 + listener + " with executor " + executor, e); 106 } 107 } 108 109 @Override 110 public boolean cancel(boolean mayInterruptIfRunning) { 111 return false; 112 } 113 114 @Override 115 public abstract V get() throws ExecutionException; 116 117 @Override 118 public V get(long timeout, TimeUnit unit) throws ExecutionException { 119 checkNotNull(unit); 120 return get(); 121 } 122 123 @Override 124 public boolean isCancelled() { 125 return false; 126 } 127 128 @Override 129 public boolean isDone() { 130 return true; 131 } 132 } 133 134 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 135 136 @Nullable private final V value; 137 138 ImmediateSuccessfulFuture(@Nullable V value) { 139 this.value = value; 140 } 141 142 @Override 143 public V get() { 144 return value; 145 } 146 } 147 148 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 149 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 150 151 @Nullable private final V value; 152 153 ImmediateSuccessfulCheckedFuture(@Nullable V value) { 154 this.value = value; 155 } 156 157 @Override 158 public V get() { 159 return value; 160 } 161 162 @Override 163 public V checkedGet() { 164 return value; 165 } 166 167 @Override 168 public V checkedGet(long timeout, TimeUnit unit) { 169 checkNotNull(unit); 170 return value; 171 } 172 } 173 174 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 175 176 private final Throwable thrown; 177 178 ImmediateFailedFuture(Throwable thrown) { 179 this.thrown = thrown; 180 } 181 182 @Override 183 public V get() throws ExecutionException { 184 throw new ExecutionException(thrown); 185 } 186 } 187 188 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 189 190 private final CancellationException thrown; 191 192 ImmediateCancelledFuture() { 193 this.thrown = new CancellationException("Immediate cancelled future."); 194 } 195 196 @Override 197 public boolean isCancelled() { 198 return true; 199 } 200 201 @Override 202 public V get() { 203 throw AbstractFuture.cancellationExceptionWithCause( 204 "Task was cancelled.", thrown); 205 } 206 } 207 208 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 209 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 210 211 private final X thrown; 212 213 ImmediateFailedCheckedFuture(X thrown) { 214 this.thrown = thrown; 215 } 216 217 @Override 218 public V get() throws ExecutionException { 219 throw new ExecutionException(thrown); 220 } 221 222 @Override 223 public V checkedGet() throws X { 224 throw thrown; 225 } 226 227 @Override 228 public V checkedGet(long timeout, TimeUnit unit) throws X { 229 checkNotNull(unit); 230 throw thrown; 231 } 232 } 233 234 /** 235 * Creates a {@code ListenableFuture} which has its value set immediately upon 236 * construction. The getters just return the value. This {@code Future} can't 237 * be canceled or timed out and its {@code isDone()} method always returns 238 * {@code true}. 239 */ 240 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 241 return new ImmediateSuccessfulFuture<V>(value); 242 } 243 244 /** 245 * Returns a {@code CheckedFuture} which has its value set immediately upon 246 * construction. 247 * 248 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 249 * method always returns {@code true}. Calling {@code get()} or {@code 250 * checkedGet()} will immediately return the provided value. 251 */ 252 public static <V, X extends Exception> CheckedFuture<V, X> 253 immediateCheckedFuture(@Nullable V value) { 254 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 255 } 256 257 /** 258 * Returns a {@code ListenableFuture} which has an exception set immediately 259 * upon construction. 260 * 261 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 262 * method always returns {@code true}. Calling {@code get()} will immediately 263 * throw the provided {@code Throwable} wrapped in an {@code 264 * ExecutionException}. 265 */ 266 public static <V> ListenableFuture<V> immediateFailedFuture( 267 Throwable throwable) { 268 checkNotNull(throwable); 269 return new ImmediateFailedFuture<V>(throwable); 270 } 271 272 /** 273 * Creates a {@code ListenableFuture} which is cancelled immediately upon 274 * construction, so that {@code isCancelled()} always returns {@code true}. 275 * 276 * @since 14.0 277 */ 278 public static <V> ListenableFuture<V> immediateCancelledFuture() { 279 return new ImmediateCancelledFuture<V>(); 280 } 281 282 /** 283 * Returns a {@code CheckedFuture} which has an exception set immediately upon 284 * construction. 285 * 286 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 287 * method always returns {@code true}. Calling {@code get()} will immediately 288 * throw the provided {@code Exception} wrapped in an {@code 289 * ExecutionException}, and calling {@code checkedGet()} will throw the 290 * provided exception itself. 291 */ 292 public static <V, X extends Exception> CheckedFuture<V, X> 293 immediateFailedCheckedFuture(X exception) { 294 checkNotNull(exception); 295 return new ImmediateFailedCheckedFuture<V, X>(exception); 296 } 297 298 /** 299 * Returns a {@code Future} whose result is taken from the given primary 300 * {@code input} or, if the primary input fails, from the {@code Future} 301 * provided by the {@code fallback}. {@link FutureFallback#create} is not 302 * invoked until the primary input has failed, so if the primary input 303 * succeeds, it is never invoked. If, during the invocation of {@code 304 * fallback}, an exception is thrown, this exception is used as the result of 305 * the output {@code Future}. 306 * 307 * <p>Below is an example of a fallback that returns a default value if an 308 * exception occurs: 309 * 310 * <pre> {@code 311 * ListenableFuture<Integer> fetchCounterFuture = ...; 312 * 313 * // Falling back to a zero counter in case an exception happens when 314 * // processing the RPC to fetch counters. 315 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 316 * fetchCounterFuture, new FutureFallback<Integer>() { 317 * public ListenableFuture<Integer> create(Throwable t) { 318 * // Returning "0" as the default for the counter when the 319 * // exception happens. 320 * return immediateFuture(0); 321 * } 322 * });}</pre> 323 * 324 * <p>The fallback can also choose to propagate the original exception when 325 * desired: 326 * 327 * <pre> {@code 328 * ListenableFuture<Integer> fetchCounterFuture = ...; 329 * 330 * // Falling back to a zero counter only in case the exception was a 331 * // TimeoutException. 332 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 333 * fetchCounterFuture, new FutureFallback<Integer>() { 334 * public ListenableFuture<Integer> create(Throwable t) { 335 * if (t instanceof TimeoutException) { 336 * return immediateFuture(0); 337 * } 338 * return immediateFailedFuture(t); 339 * } 340 * });}</pre> 341 * 342 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 343 * (whether the {@code Future} itself is slow or heavyweight to complete is 344 * irrelevant), consider {@linkplain #withFallback(ListenableFuture, 345 * FutureFallback, Executor) supplying an executor}. If you do not supply an 346 * executor, {@code withFallback} will use {@link 347 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 348 * caveats for heavier operations. For example, the call to {@code 349 * fallback.create} may run on an unpredictable or undesirable thread: 350 * 351 * <ul> 352 * <li>If the input {@code Future} is done at the time {@code withFallback} 353 * is called, {@code withFallback} will call {@code fallback.create} inline. 354 * <li>If the input {@code Future} is not yet done, {@code withFallback} will 355 * schedule {@code fallback.create} to be run by the thread that completes 356 * the input {@code Future}, which may be an internal system thread such as 357 * an RPC network thread. 358 * </ul> 359 * 360 * <p>Also note that, regardless of which thread executes the {@code 361 * sameThreadExecutor} {@code fallback.create}, all other registered but 362 * unexecuted listeners are prevented from running during its execution, even 363 * if those listeners are to run in other executors. 364 * 365 * @param input the primary input {@code Future} 366 * @param fallback the {@link FutureFallback} implementation to be called if 367 * {@code input} fails 368 * @since 14.0 369 */ 370 public static <V> ListenableFuture<V> withFallback( 371 ListenableFuture<? extends V> input, 372 FutureFallback<? extends V> fallback) { 373 return withFallback(input, fallback, sameThreadExecutor()); 374 } 375 376 /** 377 * Returns a {@code Future} whose result is taken from the given primary 378 * {@code input} or, if the primary input fails, from the {@code Future} 379 * provided by the {@code fallback}. {@link FutureFallback#create} is not 380 * invoked until the primary input has failed, so if the primary input 381 * succeeds, it is never invoked. If, during the invocation of {@code 382 * fallback}, an exception is thrown, this exception is used as the result of 383 * the output {@code Future}. 384 * 385 * <p>Below is an example of a fallback that returns a default value if an 386 * exception occurs: 387 * 388 * <pre> {@code 389 * ListenableFuture<Integer> fetchCounterFuture = ...; 390 * 391 * // Falling back to a zero counter in case an exception happens when 392 * // processing the RPC to fetch counters. 393 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 394 * fetchCounterFuture, new FutureFallback<Integer>() { 395 * public ListenableFuture<Integer> create(Throwable t) { 396 * // Returning "0" as the default for the counter when the 397 * // exception happens. 398 * return immediateFuture(0); 399 * } 400 * }, sameThreadExecutor());}</pre> 401 * 402 * <p>The fallback can also choose to propagate the original exception when 403 * desired: 404 * 405 * <pre> {@code 406 * ListenableFuture<Integer> fetchCounterFuture = ...; 407 * 408 * // Falling back to a zero counter only in case the exception was a 409 * // TimeoutException. 410 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 411 * fetchCounterFuture, new FutureFallback<Integer>() { 412 * public ListenableFuture<Integer> create(Throwable t) { 413 * if (t instanceof TimeoutException) { 414 * return immediateFuture(0); 415 * } 416 * return immediateFailedFuture(t); 417 * } 418 * }, sameThreadExecutor());}</pre> 419 * 420 * <p>When the execution of {@code fallback.create} is fast and lightweight 421 * (though the {@code Future} it returns need not meet these criteria), 422 * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) 423 * omitting the executor} or explicitly specifying {@code 424 * sameThreadExecutor}. However, be aware of the caveats documented in the 425 * link above. 426 * 427 * @param input the primary input {@code Future} 428 * @param fallback the {@link FutureFallback} implementation to be called if 429 * {@code input} fails 430 * @param executor the executor that runs {@code fallback} if {@code input} 431 * fails 432 * @since 14.0 433 */ 434 public static <V> ListenableFuture<V> withFallback( 435 ListenableFuture<? extends V> input, 436 FutureFallback<? extends V> fallback, Executor executor) { 437 checkNotNull(fallback); 438 return new FallbackFuture<V>(input, fallback, executor); 439 } 440 441 /** 442 * A future that falls back on a second, generated future, in case its 443 * original future fails. 444 */ 445 private static class FallbackFuture<V> extends AbstractFuture<V> { 446 447 private volatile ListenableFuture<? extends V> running; 448 449 FallbackFuture(ListenableFuture<? extends V> input, 450 final FutureFallback<? extends V> fallback, 451 final Executor executor) { 452 running = input; 453 addCallback(running, new FutureCallback<V>() { 454 @Override 455 public void onSuccess(V value) { 456 set(value); 457 } 458 459 @Override 460 public void onFailure(Throwable t) { 461 if (isCancelled()) { 462 return; 463 } 464 try { 465 running = fallback.create(t); 466 if (isCancelled()) { // in case cancel called in the meantime 467 running.cancel(wasInterrupted()); 468 return; 469 } 470 addCallback(running, new FutureCallback<V>() { 471 @Override 472 public void onSuccess(V value) { 473 set(value); 474 } 475 476 @Override 477 public void onFailure(Throwable t) { 478 if (running.isCancelled()) { 479 cancel(false); 480 } else { 481 setException(t); 482 } 483 } 484 }, sameThreadExecutor()); 485 } catch (Throwable e) { 486 setException(e); 487 } 488 } 489 }, executor); 490 } 491 492 @Override 493 public boolean cancel(boolean mayInterruptIfRunning) { 494 if (super.cancel(mayInterruptIfRunning)) { 495 running.cancel(mayInterruptIfRunning); 496 return true; 497 } 498 return false; 499 } 500 } 501 502 /** 503 * Returns a new {@code ListenableFuture} whose result is asynchronously 504 * derived from the result of the given {@code Future}. More precisely, the 505 * returned {@code Future} takes its result from a {@code Future} produced by 506 * applying the given {@code AsyncFunction} to the result of the original 507 * {@code Future}. Example: 508 * 509 * <pre> {@code 510 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 511 * AsyncFunction<RowKey, QueryResult> queryFunction = 512 * new AsyncFunction<RowKey, QueryResult>() { 513 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 514 * return dataService.read(rowKey); 515 * } 516 * }; 517 * ListenableFuture<QueryResult> queryFuture = 518 * transform(rowKeyFuture, queryFunction);}</pre> 519 * 520 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 521 * (whether the {@code Future} itself is slow or heavyweight to complete is 522 * irrelevant), consider {@linkplain #transform(ListenableFuture, 523 * AsyncFunction, Executor) supplying an executor}. If you do not supply an 524 * executor, {@code transform} will use {@link 525 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 526 * caveats for heavier operations. For example, the call to {@code 527 * function.apply} may run on an unpredictable or undesirable thread: 528 * 529 * <ul> 530 * <li>If the input {@code Future} is done at the time {@code transform} is 531 * called, {@code transform} will call {@code function.apply} inline. 532 * <li>If the input {@code Future} is not yet done, {@code transform} will 533 * schedule {@code function.apply} to be run by the thread that completes the 534 * input {@code Future}, which may be an internal system thread such as an 535 * RPC network thread. 536 * </ul> 537 * 538 * <p>Also note that, regardless of which thread executes the {@code 539 * sameThreadExecutor} {@code function.apply}, all other registered but 540 * unexecuted listeners are prevented from running during its execution, even 541 * if those listeners are to run in other executors. 542 * 543 * <p>The returned {@code Future} attempts to keep its cancellation state in 544 * sync with that of the input future and that of the future returned by the 545 * function. That is, if the returned {@code Future} is cancelled, it will 546 * attempt to cancel the other two, and if either of the other two is 547 * cancelled, the returned {@code Future} will receive a callback in which it 548 * will attempt to cancel itself. 549 * 550 * @param input The future to transform 551 * @param function A function to transform the result of the input future 552 * to the result of the output future 553 * @return A future that holds result of the function (if the input succeeded) 554 * or the original input's failure (if not) 555 * @since 11.0 556 */ 557 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 558 AsyncFunction<? super I, ? extends O> function) { 559 return transform(input, function, MoreExecutors.sameThreadExecutor()); 560 } 561 562 /** 563 * Returns a new {@code ListenableFuture} whose result is asynchronously 564 * derived from the result of the given {@code Future}. More precisely, the 565 * returned {@code Future} takes its result from a {@code Future} produced by 566 * applying the given {@code AsyncFunction} to the result of the original 567 * {@code Future}. Example: 568 * 569 * <pre> {@code 570 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 571 * AsyncFunction<RowKey, QueryResult> queryFunction = 572 * new AsyncFunction<RowKey, QueryResult>() { 573 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 574 * return dataService.read(rowKey); 575 * } 576 * }; 577 * ListenableFuture<QueryResult> queryFuture = 578 * transform(rowKeyFuture, queryFunction, executor);}</pre> 579 * 580 * <p>The returned {@code Future} attempts to keep its cancellation state in 581 * sync with that of the input future and that of the future returned by the 582 * chain function. That is, if the returned {@code Future} is cancelled, it 583 * will attempt to cancel the other two, and if either of the other two is 584 * cancelled, the returned {@code Future} will receive a callback in which it 585 * will attempt to cancel itself. 586 * 587 * <p>When the execution of {@code function.apply} is fast and lightweight 588 * (though the {@code Future} it returns need not meet these criteria), 589 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting 590 * the executor} or explicitly specifying {@code sameThreadExecutor}. 591 * However, be aware of the caveats documented in the link above. 592 * 593 * @param input The future to transform 594 * @param function A function to transform the result of the input future 595 * to the result of the output future 596 * @param executor Executor to run the function in. 597 * @return A future that holds result of the function (if the input succeeded) 598 * or the original input's failure (if not) 599 * @since 11.0 600 */ 601 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 602 AsyncFunction<? super I, ? extends O> function, 603 Executor executor) { 604 ChainingListenableFuture<I, O> output = 605 new ChainingListenableFuture<I, O>(function, input); 606 input.addListener(output, executor); 607 return output; 608 } 609 610 /** 611 * Returns a new {@code ListenableFuture} whose result is the product of 612 * applying the given {@code Function} to the result of the given {@code 613 * Future}. Example: 614 * 615 * <pre> {@code 616 * ListenableFuture<QueryResult> queryFuture = ...; 617 * Function<QueryResult, List<Row>> rowsFunction = 618 * new Function<QueryResult, List<Row>>() { 619 * public List<Row> apply(QueryResult queryResult) { 620 * return queryResult.getRows(); 621 * } 622 * }; 623 * ListenableFuture<List<Row>> rowsFuture = 624 * transform(queryFuture, rowsFunction);}</pre> 625 * 626 * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain 627 * #transform(ListenableFuture, Function, Executor) supplying an executor}. 628 * If you do not supply an executor, {@code transform} will use {@link 629 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 630 * caveats for heavier operations. For example, the call to {@code 631 * function.apply} may run on an unpredictable or undesirable thread: 632 * 633 * <ul> 634 * <li>If the input {@code Future} is done at the time {@code transform} is 635 * called, {@code transform} will call {@code function.apply} inline. 636 * <li>If the input {@code Future} is not yet done, {@code transform} will 637 * schedule {@code function.apply} to be run by the thread that completes the 638 * input {@code Future}, which may be an internal system thread such as an 639 * RPC network thread. 640 * </ul> 641 * 642 * <p>Also note that, regardless of which thread executes the {@code 643 * sameThreadExecutor} {@code function.apply}, all other registered but 644 * unexecuted listeners are prevented from running during its execution, even 645 * if those listeners are to run in other executors. 646 * 647 * <p>The returned {@code Future} attempts to keep its cancellation state in 648 * sync with that of the input future. That is, if the returned {@code Future} 649 * is cancelled, it will attempt to cancel the input, and if the input is 650 * cancelled, the returned {@code Future} will receive a callback in which it 651 * will attempt to cancel itself. 652 * 653 * <p>An example use of this method is to convert a serializable object 654 * returned from an RPC into a POJO. 655 * 656 * @param input The future to transform 657 * @param function A Function to transform the results of the provided future 658 * to the results of the returned future. This will be run in the thread 659 * that notifies input it is complete. 660 * @return A future that holds result of the transformation. 661 * @since 9.0 (in 1.0 as {@code compose}) 662 */ 663 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 664 final Function<? super I, ? extends O> function) { 665 return transform(input, function, MoreExecutors.sameThreadExecutor()); 666 } 667 668 /** 669 * Returns a new {@code ListenableFuture} whose result is the product of 670 * applying the given {@code Function} to the result of the given {@code 671 * Future}. Example: 672 * 673 * <pre> {@code 674 * ListenableFuture<QueryResult> queryFuture = ...; 675 * Function<QueryResult, List<Row>> rowsFunction = 676 * new Function<QueryResult, List<Row>>() { 677 * public List<Row> apply(QueryResult queryResult) { 678 * return queryResult.getRows(); 679 * } 680 * }; 681 * ListenableFuture<List<Row>> rowsFuture = 682 * transform(queryFuture, rowsFunction, executor);}</pre> 683 * 684 * <p>The returned {@code Future} attempts to keep its cancellation state in 685 * sync with that of the input future. That is, if the returned {@code Future} 686 * is cancelled, it will attempt to cancel the input, and if the input is 687 * cancelled, the returned {@code Future} will receive a callback in which it 688 * will attempt to cancel itself. 689 * 690 * <p>An example use of this method is to convert a serializable object 691 * returned from an RPC into a POJO. 692 * 693 * <p>When the transformation is fast and lightweight, consider {@linkplain 694 * #transform(ListenableFuture, Function) omitting the executor} or 695 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 696 * caveats documented in the link above. 697 * 698 * @param input The future to transform 699 * @param function A Function to transform the results of the provided future 700 * to the results of the returned future. 701 * @param executor Executor to run the function in. 702 * @return A future that holds result of the transformation. 703 * @since 9.0 (in 2.0 as {@code compose}) 704 */ 705 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 706 final Function<? super I, ? extends O> function, Executor executor) { 707 checkNotNull(function); 708 AsyncFunction<I, O> wrapperFunction 709 = new AsyncFunction<I, O>() { 710 @Override public ListenableFuture<O> apply(I input) { 711 O output = function.apply(input); 712 return immediateFuture(output); 713 } 714 }; 715 return transform(input, wrapperFunction, executor); 716 } 717 718 /** 719 * Like {@link #transform(ListenableFuture, Function)} except that the 720 * transformation {@code function} is invoked on each call to 721 * {@link Future#get() get()} on the returned future. 722 * 723 * <p>The returned {@code Future} reflects the input's cancellation 724 * state directly, and any attempt to cancel the returned Future is likewise 725 * passed through to the input Future. 726 * 727 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 728 * only apply the timeout to the execution of the underlying {@code Future}, 729 * <em>not</em> to the execution of the transformation function. 730 * 731 * <p>The primary audience of this method is callers of {@code transform} 732 * who don't have a {@code ListenableFuture} available and 733 * do not mind repeated, lazy function evaluation. 734 * 735 * @param input The future to transform 736 * @param function A Function to transform the results of the provided future 737 * to the results of the returned future. 738 * @return A future that returns the result of the transformation. 739 * @since 10.0 740 */ 741 public static <I, O> Future<O> lazyTransform(final Future<I> input, 742 final Function<? super I, ? extends O> function) { 743 checkNotNull(input); 744 checkNotNull(function); 745 return new Future<O>() { 746 747 @Override 748 public boolean cancel(boolean mayInterruptIfRunning) { 749 return input.cancel(mayInterruptIfRunning); 750 } 751 752 @Override 753 public boolean isCancelled() { 754 return input.isCancelled(); 755 } 756 757 @Override 758 public boolean isDone() { 759 return input.isDone(); 760 } 761 762 @Override 763 public O get() throws InterruptedException, ExecutionException { 764 return applyTransformation(input.get()); 765 } 766 767 @Override 768 public O get(long timeout, TimeUnit unit) 769 throws InterruptedException, ExecutionException, TimeoutException { 770 return applyTransformation(input.get(timeout, unit)); 771 } 772 773 private O applyTransformation(I input) throws ExecutionException { 774 try { 775 return function.apply(input); 776 } catch (Throwable t) { 777 throw new ExecutionException(t); 778 } 779 } 780 }; 781 } 782 783 /** 784 * An implementation of {@code ListenableFuture} that also implements 785 * {@code Runnable} so that it can be used to nest ListenableFutures. 786 * Once the passed-in {@code ListenableFuture} is complete, it calls the 787 * passed-in {@code Function} to generate the result. 788 * 789 * <p>If the function throws any checked exceptions, they should be wrapped 790 * in a {@code UndeclaredThrowableException} so that this class can get 791 * access to the cause. 792 */ 793 private static class ChainingListenableFuture<I, O> 794 extends AbstractFuture<O> implements Runnable { 795 796 private AsyncFunction<? super I, ? extends O> function; 797 private ListenableFuture<? extends I> inputFuture; 798 private volatile ListenableFuture<? extends O> outputFuture; 799 private final CountDownLatch outputCreated = new CountDownLatch(1); 800 801 private ChainingListenableFuture( 802 AsyncFunction<? super I, ? extends O> function, 803 ListenableFuture<? extends I> inputFuture) { 804 this.function = checkNotNull(function); 805 this.inputFuture = checkNotNull(inputFuture); 806 } 807 808 @Override 809 public boolean cancel(boolean mayInterruptIfRunning) { 810 /* 811 * Our additional cancellation work needs to occur even if 812 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 813 */ 814 if (super.cancel(mayInterruptIfRunning)) { 815 // This should never block since only one thread is allowed to cancel 816 // this Future. 817 cancel(inputFuture, mayInterruptIfRunning); 818 cancel(outputFuture, mayInterruptIfRunning); 819 return true; 820 } 821 return false; 822 } 823 824 private void cancel(@Nullable Future<?> future, 825 boolean mayInterruptIfRunning) { 826 if (future != null) { 827 future.cancel(mayInterruptIfRunning); 828 } 829 } 830 831 @Override 832 public void run() { 833 try { 834 I sourceResult; 835 try { 836 sourceResult = getUninterruptibly(inputFuture); 837 } catch (CancellationException e) { 838 // Cancel this future and return. 839 // At this point, inputFuture is cancelled and outputFuture doesn't 840 // exist, so the value of mayInterruptIfRunning is irrelevant. 841 cancel(false); 842 return; 843 } catch (ExecutionException e) { 844 // Set the cause of the exception as this future's exception 845 setException(e.getCause()); 846 return; 847 } 848 849 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 850 function.apply(sourceResult); 851 if (isCancelled()) { 852 outputFuture.cancel(wasInterrupted()); 853 this.outputFuture = null; 854 return; 855 } 856 outputFuture.addListener(new Runnable() { 857 @Override 858 public void run() { 859 try { 860 set(getUninterruptibly(outputFuture)); 861 } catch (CancellationException e) { 862 // Cancel this future and return. 863 // At this point, inputFuture and outputFuture are done, so the 864 // value of mayInterruptIfRunning is irrelevant. 865 cancel(false); 866 return; 867 } catch (ExecutionException e) { 868 // Set the cause of the exception as this future's exception 869 setException(e.getCause()); 870 } finally { 871 // Don't pin inputs beyond completion 872 ChainingListenableFuture.this.outputFuture = null; 873 } 874 } 875 }, MoreExecutors.sameThreadExecutor()); 876 } catch (UndeclaredThrowableException e) { 877 // Set the cause of the exception as this future's exception 878 setException(e.getCause()); 879 } catch (Throwable t) { 880 // This exception is irrelevant in this thread, but useful for the 881 // client 882 setException(t); 883 } finally { 884 // Don't pin inputs beyond completion 885 function = null; 886 inputFuture = null; 887 // Allow our get routines to examine outputFuture now. 888 outputCreated.countDown(); 889 } 890 } 891 } 892 893 /** 894 * Returns a new {@code ListenableFuture} whose result is the product of 895 * calling {@code get()} on the {@code Future} nested within the given {@code 896 * Future}, effectively chaining the futures one after the other. Example: 897 * 898 * <pre> {@code 899 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 900 * ListenableFuture<String> dereferenced = dereference(nested);}</pre> 901 * 902 * <p>This call has the same cancellation and execution semantics as {@link 903 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 904 * Future} attempts to keep its cancellation state in sync with both the 905 * input {@code Future} and the nested {@code Future}. The transformation 906 * is very lightweight and therefore takes place in the thread that called 907 * {@code dereference}. 908 * 909 * @param nested The nested future to transform. 910 * @return A future that holds result of the inner future. 911 * @since 13.0 912 */ 913 @SuppressWarnings({"rawtypes", "unchecked"}) 914 public static <V> ListenableFuture<V> dereference( 915 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 916 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 917 } 918 919 /** 920 * Helper {@code Function} for {@link #dereference}. 921 */ 922 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 923 new AsyncFunction<ListenableFuture<Object>, Object>() { 924 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 925 return input; 926 } 927 }; 928 929 /** 930 * Creates a new {@code ListenableFuture} whose value is a list containing the 931 * values of all its input futures, if all succeed. If any input fails, the 932 * returned future fails. 933 * 934 * <p>The list of results is in the same order as the input list. 935 * 936 * <p>Canceling this future will attempt to cancel all the component futures, 937 * and if any of the provided futures fails or is canceled, this one is, 938 * too. 939 * 940 * @param futures futures to combine 941 * @return a future that provides a list of the results of the component 942 * futures 943 * @since 10.0 944 */ 945 @Beta 946 public static <V> ListenableFuture<List<V>> allAsList( 947 ListenableFuture<? extends V>... futures) { 948 return listFuture(ImmutableList.copyOf(futures), true, 949 MoreExecutors.sameThreadExecutor()); 950 } 951 952 /** 953 * Creates a new {@code ListenableFuture} whose value is a list containing the 954 * values of all its input futures, if all succeed. If any input fails, the 955 * returned future fails. 956 * 957 * <p>The list of results is in the same order as the input list. 958 * 959 * <p>Canceling this future will attempt to cancel all the component futures, 960 * and if any of the provided futures fails or is canceled, this one is, 961 * too. 962 * 963 * @param futures futures to combine 964 * @return a future that provides a list of the results of the component 965 * futures 966 * @since 10.0 967 */ 968 @Beta 969 public static <V> ListenableFuture<List<V>> allAsList( 970 Iterable<? extends ListenableFuture<? extends V>> futures) { 971 return listFuture(ImmutableList.copyOf(futures), true, 972 MoreExecutors.sameThreadExecutor()); 973 } 974 975 /** 976 * Creates a new {@code ListenableFuture} whose result is set from the 977 * supplied future when it completes. Cancelling the supplied future 978 * will also cancel the returned future, but cancelling the returned 979 * future will have no effect on the supplied future. 980 * 981 * @since 15.0 982 */ 983 public static <V> ListenableFuture<V> nonCancellationPropagating( 984 ListenableFuture<V> future) { 985 return new NonCancellationPropagatingFuture<V>(future); 986 } 987 988 /** 989 * A wrapped future that does not propagate cancellation to its delegate. 990 */ 991 private static class NonCancellationPropagatingFuture<V> 992 extends AbstractFuture<V> { 993 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 994 checkNotNull(delegate); 995 addCallback(delegate, new FutureCallback<V>() { 996 @Override 997 public void onSuccess(V result) { 998 set(result); 999 } 1000 1001 @Override 1002 public void onFailure(Throwable t) { 1003 if (delegate.isCancelled()) { 1004 cancel(false); 1005 } else { 1006 setException(t); 1007 } 1008 } 1009 }, sameThreadExecutor()); 1010 } 1011 } 1012 1013 /** 1014 * Creates a new {@code ListenableFuture} whose value is a list containing the 1015 * values of all its successful input futures. The list of results is in the 1016 * same order as the input list, and if any of the provided futures fails or 1017 * is canceled, its corresponding position will contain {@code null} (which is 1018 * indistinguishable from the future having a successful value of 1019 * {@code null}). 1020 * 1021 * <p>Canceling this future will attempt to cancel all the component futures. 1022 * 1023 * @param futures futures to combine 1024 * @return a future that provides a list of the results of the component 1025 * futures 1026 * @since 10.0 1027 */ 1028 @Beta 1029 public static <V> ListenableFuture<List<V>> successfulAsList( 1030 ListenableFuture<? extends V>... futures) { 1031 return listFuture(ImmutableList.copyOf(futures), false, 1032 MoreExecutors.sameThreadExecutor()); 1033 } 1034 1035 /** 1036 * Creates a new {@code ListenableFuture} whose value is a list containing the 1037 * values of all its successful input futures. The list of results is in the 1038 * same order as the input list, and if any of the provided futures fails or 1039 * is canceled, its corresponding position will contain {@code null} (which is 1040 * indistinguishable from the future having a successful value of 1041 * {@code null}). 1042 * 1043 * <p>Canceling this future will attempt to cancel all the component futures. 1044 * 1045 * @param futures futures to combine 1046 * @return a future that provides a list of the results of the component 1047 * futures 1048 * @since 10.0 1049 */ 1050 @Beta 1051 public static <V> ListenableFuture<List<V>> successfulAsList( 1052 Iterable<? extends ListenableFuture<? extends V>> futures) { 1053 return listFuture(ImmutableList.copyOf(futures), false, 1054 MoreExecutors.sameThreadExecutor()); 1055 } 1056 1057 /** 1058 * Registers separate success and failure callbacks to be run when the {@code 1059 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1060 * complete} or, if the computation is already complete, immediately. 1061 * 1062 * <p>There is no guaranteed ordering of execution of callbacks, but any 1063 * callback added through this method is guaranteed to be called once the 1064 * computation is complete. 1065 * 1066 * Example: <pre> {@code 1067 * ListenableFuture<QueryResult> future = ...; 1068 * addCallback(future, 1069 * new FutureCallback<QueryResult> { 1070 * public void onSuccess(QueryResult result) { 1071 * storeInCache(result); 1072 * } 1073 * public void onFailure(Throwable t) { 1074 * reportError(t); 1075 * } 1076 * });}</pre> 1077 * 1078 * <p>Note: If the callback is slow or heavyweight, consider {@linkplain 1079 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an 1080 * executor}. If you do not supply an executor, {@code addCallback} will use 1081 * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries 1082 * some caveats for heavier operations. For example, the callback may run on 1083 * an unpredictable or undesirable thread: 1084 * 1085 * <ul> 1086 * <li>If the input {@code Future} is done at the time {@code addCallback} is 1087 * called, {@code addCallback} will execute the callback inline. 1088 * <li>If the input {@code Future} is not yet done, {@code addCallback} will 1089 * schedule the callback to be run by the thread that completes the input 1090 * {@code Future}, which may be an internal system thread such as an RPC 1091 * network thread. 1092 * </ul> 1093 * 1094 * <p>Also note that, regardless of which thread executes the {@code 1095 * sameThreadExecutor} callback, all other registered but unexecuted listeners 1096 * are prevented from running during its execution, even if those listeners 1097 * are to run in other executors. 1098 * 1099 * <p>For a more general interface to attach a completion listener to a 1100 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1101 * 1102 * @param future The future attach the callback to. 1103 * @param callback The callback to invoke when {@code future} is completed. 1104 * @since 10.0 1105 */ 1106 public static <V> void addCallback(ListenableFuture<V> future, 1107 FutureCallback<? super V> callback) { 1108 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 1109 } 1110 1111 /** 1112 * Registers separate success and failure callbacks to be run when the {@code 1113 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1114 * complete} or, if the computation is already complete, immediately. 1115 * 1116 * <p>The callback is run in {@code executor}. 1117 * There is no guaranteed ordering of execution of callbacks, but any 1118 * callback added through this method is guaranteed to be called once the 1119 * computation is complete. 1120 * 1121 * Example: <pre> {@code 1122 * ListenableFuture<QueryResult> future = ...; 1123 * Executor e = ... 1124 * addCallback(future, e, 1125 * new FutureCallback<QueryResult> { 1126 * public void onSuccess(QueryResult result) { 1127 * storeInCache(result); 1128 * } 1129 * public void onFailure(Throwable t) { 1130 * reportError(t); 1131 * } 1132 * });}</pre> 1133 * 1134 * <p>When the callback is fast and lightweight, consider {@linkplain 1135 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or 1136 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 1137 * caveats documented in the link above. 1138 * 1139 * <p>For a more general interface to attach a completion listener to a 1140 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1141 * 1142 * @param future The future attach the callback to. 1143 * @param callback The callback to invoke when {@code future} is completed. 1144 * @param executor The executor to run {@code callback} when the future 1145 * completes. 1146 * @since 10.0 1147 */ 1148 public static <V> void addCallback(final ListenableFuture<V> future, 1149 final FutureCallback<? super V> callback, Executor executor) { 1150 Preconditions.checkNotNull(callback); 1151 Runnable callbackListener = new Runnable() { 1152 @Override 1153 public void run() { 1154 final V value; 1155 try { 1156 // TODO(user): (Before Guava release), validate that this 1157 // is the thing for IE. 1158 value = getUninterruptibly(future); 1159 } catch (ExecutionException e) { 1160 callback.onFailure(e.getCause()); 1161 return; 1162 } catch (RuntimeException e) { 1163 callback.onFailure(e); 1164 return; 1165 } catch (Error e) { 1166 callback.onFailure(e); 1167 return; 1168 } 1169 callback.onSuccess(value); 1170 } 1171 }; 1172 future.addListener(callbackListener, executor); 1173 } 1174 1175 /** 1176 * Returns the result of {@link Future#get()}, converting most exceptions to a 1177 * new instance of the given checked exception type. This reduces boilerplate 1178 * for a common use of {@code Future} in which it is unnecessary to 1179 * programmatically distinguish between exception types or to extract other 1180 * information from the exception instance. 1181 * 1182 * <p>Exceptions from {@code Future.get} are treated as follows: 1183 * <ul> 1184 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1185 * {@code X} if the cause is a checked exception, an {@link 1186 * UncheckedExecutionException} if the cause is a {@code 1187 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1188 * {@code Error}. 1189 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1190 * restoring the interrupt). 1191 * <li>Any {@link CancellationException} is propagated untouched, as is any 1192 * other {@link RuntimeException} (though {@code get} implementations are 1193 * discouraged from throwing such exceptions). 1194 * </ul> 1195 * 1196 * <p>The overall principle is to continue to treat every checked exception as a 1197 * checked exception, every unchecked exception as an unchecked exception, and 1198 * every error as an error. In addition, the cause of any {@code 1199 * ExecutionException} is wrapped in order to ensure that the new stack trace 1200 * matches that of the current thread. 1201 * 1202 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1203 * public constructor that accepts zero or more arguments, all of type {@code 1204 * String} or {@code Throwable} (preferring constructors with at least one 1205 * {@code String}) and calling the constructor via reflection. If the 1206 * exception did not already have a cause, one is set by calling {@link 1207 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1208 * {@code IllegalArgumentException} is thrown. 1209 * 1210 * @throws X if {@code get} throws any checked exception except for an {@code 1211 * ExecutionException} whose cause is not itself a checked exception 1212 * @throws UncheckedExecutionException if {@code get} throws an {@code 1213 * ExecutionException} with a {@code RuntimeException} as its cause 1214 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1215 * with an {@code Error} as its cause 1216 * @throws CancellationException if {@code get} throws a {@code 1217 * CancellationException} 1218 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1219 * RuntimeException} or does not have a suitable constructor 1220 * @since 10.0 1221 */ 1222 public static <V, X extends Exception> V get( 1223 Future<V> future, Class<X> exceptionClass) throws X { 1224 checkNotNull(future); 1225 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1226 "Futures.get exception type (%s) must not be a RuntimeException", 1227 exceptionClass); 1228 try { 1229 return future.get(); 1230 } catch (InterruptedException e) { 1231 currentThread().interrupt(); 1232 throw newWithCause(exceptionClass, e); 1233 } catch (ExecutionException e) { 1234 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1235 throw new AssertionError(); 1236 } 1237 } 1238 1239 /** 1240 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1241 * exceptions to a new instance of the given checked exception type. This 1242 * reduces boilerplate for a common use of {@code Future} in which it is 1243 * unnecessary to programmatically distinguish between exception types or to 1244 * extract other information from the exception instance. 1245 * 1246 * <p>Exceptions from {@code Future.get} are treated as follows: 1247 * <ul> 1248 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1249 * {@code X} if the cause is a checked exception, an {@link 1250 * UncheckedExecutionException} if the cause is a {@code 1251 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1252 * {@code Error}. 1253 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1254 * restoring the interrupt). 1255 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1256 * <li>Any {@link CancellationException} is propagated untouched, as is any 1257 * other {@link RuntimeException} (though {@code get} implementations are 1258 * discouraged from throwing such exceptions). 1259 * </ul> 1260 * 1261 * <p>The overall principle is to continue to treat every checked exception as a 1262 * checked exception, every unchecked exception as an unchecked exception, and 1263 * every error as an error. In addition, the cause of any {@code 1264 * ExecutionException} is wrapped in order to ensure that the new stack trace 1265 * matches that of the current thread. 1266 * 1267 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1268 * public constructor that accepts zero or more arguments, all of type {@code 1269 * String} or {@code Throwable} (preferring constructors with at least one 1270 * {@code String}) and calling the constructor via reflection. If the 1271 * exception did not already have a cause, one is set by calling {@link 1272 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1273 * {@code IllegalArgumentException} is thrown. 1274 * 1275 * @throws X if {@code get} throws any checked exception except for an {@code 1276 * ExecutionException} whose cause is not itself a checked exception 1277 * @throws UncheckedExecutionException if {@code get} throws an {@code 1278 * ExecutionException} with a {@code RuntimeException} as its cause 1279 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1280 * with an {@code Error} as its cause 1281 * @throws CancellationException if {@code get} throws a {@code 1282 * CancellationException} 1283 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1284 * RuntimeException} or does not have a suitable constructor 1285 * @since 10.0 1286 */ 1287 public static <V, X extends Exception> V get( 1288 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1289 throws X { 1290 checkNotNull(future); 1291 checkNotNull(unit); 1292 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1293 "Futures.get exception type (%s) must not be a RuntimeException", 1294 exceptionClass); 1295 try { 1296 return future.get(timeout, unit); 1297 } catch (InterruptedException e) { 1298 currentThread().interrupt(); 1299 throw newWithCause(exceptionClass, e); 1300 } catch (TimeoutException e) { 1301 throw newWithCause(exceptionClass, e); 1302 } catch (ExecutionException e) { 1303 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1304 throw new AssertionError(); 1305 } 1306 } 1307 1308 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1309 Throwable cause, Class<X> exceptionClass) throws X { 1310 if (cause instanceof Error) { 1311 throw new ExecutionError((Error) cause); 1312 } 1313 if (cause instanceof RuntimeException) { 1314 throw new UncheckedExecutionException(cause); 1315 } 1316 throw newWithCause(exceptionClass, cause); 1317 } 1318 1319 /** 1320 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1321 * task known not to throw a checked exception. This makes {@code Future} more 1322 * suitable for lightweight, fast-running tasks that, barring bugs in the 1323 * code, will not fail. This gives it exception-handling behavior similar to 1324 * that of {@code ForkJoinTask.join}. 1325 * 1326 * <p>Exceptions from {@code Future.get} are treated as follows: 1327 * <ul> 1328 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1329 * {@link UncheckedExecutionException} (if the cause is an {@code 1330 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1331 * Error}). 1332 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1333 * call. The interrupt is restored before {@code getUnchecked} returns. 1334 * <li>Any {@link CancellationException} is propagated untouched. So is any 1335 * other {@link RuntimeException} ({@code get} implementations are 1336 * discouraged from throwing such exceptions). 1337 * </ul> 1338 * 1339 * <p>The overall principle is to eliminate all checked exceptions: to loop to 1340 * avoid {@code InterruptedException}, to pass through {@code 1341 * CancellationException}, and to wrap any exception from the underlying 1342 * computation in an {@code UncheckedExecutionException} or {@code 1343 * ExecutionError}. 1344 * 1345 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1346 * {@link Uninterruptibles#getUninterruptibly(Future)}. 1347 * 1348 * @throws UncheckedExecutionException if {@code get} throws an {@code 1349 * ExecutionException} with an {@code Exception} as its cause 1350 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1351 * with an {@code Error} as its cause 1352 * @throws CancellationException if {@code get} throws a {@code 1353 * CancellationException} 1354 * @since 10.0 1355 */ 1356 public static <V> V getUnchecked(Future<V> future) { 1357 checkNotNull(future); 1358 try { 1359 return getUninterruptibly(future); 1360 } catch (ExecutionException e) { 1361 wrapAndThrowUnchecked(e.getCause()); 1362 throw new AssertionError(); 1363 } 1364 } 1365 1366 private static void wrapAndThrowUnchecked(Throwable cause) { 1367 if (cause instanceof Error) { 1368 throw new ExecutionError((Error) cause); 1369 } 1370 /* 1371 * It's a non-Error, non-Exception Throwable. From my survey of such 1372 * classes, I believe that most users intended to extend Exception, so we'll 1373 * treat it like an Exception. 1374 */ 1375 throw new UncheckedExecutionException(cause); 1376 } 1377 1378 /* 1379 * TODO(user): FutureChecker interface for these to be static methods on? If 1380 * so, refer to it in the (static-method) Futures.get documentation 1381 */ 1382 1383 /* 1384 * Arguably we don't need a timed getUnchecked because any operation slow 1385 * enough to require a timeout is heavyweight enough to throw a checked 1386 * exception and therefore be inappropriate to use with getUnchecked. Further, 1387 * it's not clear that converting the checked TimeoutException to a 1388 * RuntimeException -- especially to an UncheckedExecutionException, since it 1389 * wasn't thrown by the computation -- makes sense, and if we don't convert 1390 * it, the user still has to write a try-catch block. 1391 * 1392 * If you think you would use this method, let us know. 1393 */ 1394 1395 private static <X extends Exception> X newWithCause( 1396 Class<X> exceptionClass, Throwable cause) { 1397 // getConstructors() guarantees this as long as we don't modify the array. 1398 @SuppressWarnings("unchecked") 1399 List<Constructor<X>> constructors = 1400 (List) Arrays.asList(exceptionClass.getConstructors()); 1401 for (Constructor<X> constructor : preferringStrings(constructors)) { 1402 @Nullable X instance = newFromConstructor(constructor, cause); 1403 if (instance != null) { 1404 if (instance.getCause() == null) { 1405 instance.initCause(cause); 1406 } 1407 return instance; 1408 } 1409 } 1410 throw new IllegalArgumentException( 1411 "No appropriate constructor for exception of type " + exceptionClass 1412 + " in response to chained exception", cause); 1413 } 1414 1415 private static <X extends Exception> List<Constructor<X>> 1416 preferringStrings(List<Constructor<X>> constructors) { 1417 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1418 } 1419 1420 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1421 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1422 @Override public Boolean apply(Constructor<?> input) { 1423 return asList(input.getParameterTypes()).contains(String.class); 1424 } 1425 }).reverse(); 1426 1427 @Nullable private static <X> X newFromConstructor( 1428 Constructor<X> constructor, Throwable cause) { 1429 Class<?>[] paramTypes = constructor.getParameterTypes(); 1430 Object[] params = new Object[paramTypes.length]; 1431 for (int i = 0; i < paramTypes.length; i++) { 1432 Class<?> paramType = paramTypes[i]; 1433 if (paramType.equals(String.class)) { 1434 params[i] = cause.toString(); 1435 } else if (paramType.equals(Throwable.class)) { 1436 params[i] = cause; 1437 } else { 1438 return null; 1439 } 1440 } 1441 try { 1442 return constructor.newInstance(params); 1443 } catch (IllegalArgumentException e) { 1444 return null; 1445 } catch (InstantiationException e) { 1446 return null; 1447 } catch (IllegalAccessException e) { 1448 return null; 1449 } catch (InvocationTargetException e) { 1450 return null; 1451 } 1452 } 1453 1454 private interface FutureCombiner<V, C> { 1455 C combine(List<Optional<V>> values); 1456 } 1457 1458 private static class CombinedFuture<V, C> extends AbstractFuture<C> { 1459 private static final Logger logger = 1460 Logger.getLogger(CombinedFuture.class.getName()); 1461 1462 ImmutableCollection<? extends ListenableFuture<? extends V>> futures; 1463 final boolean allMustSucceed; 1464 final AtomicInteger remaining; 1465 FutureCombiner<V, C> combiner; 1466 List<Optional<V>> values; 1467 1468 CombinedFuture( 1469 ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 1470 boolean allMustSucceed, Executor listenerExecutor, 1471 FutureCombiner<V, C> combiner) { 1472 this.futures = futures; 1473 this.allMustSucceed = allMustSucceed; 1474 this.remaining = new AtomicInteger(futures.size()); 1475 this.combiner = combiner; 1476 this.values = Lists.newArrayListWithCapacity(futures.size()); 1477 init(listenerExecutor); 1478 } 1479 1480 /** 1481 * Must be called at the end of the constructor. 1482 */ 1483 protected void init(final Executor listenerExecutor) { 1484 // First, schedule cleanup to execute when the Future is done. 1485 addListener(new Runnable() { 1486 @Override 1487 public void run() { 1488 // Cancel all the component futures. 1489 if (CombinedFuture.this.isCancelled()) { 1490 for (ListenableFuture<?> future : CombinedFuture.this.futures) { 1491 future.cancel(CombinedFuture.this.wasInterrupted()); 1492 } 1493 } 1494 1495 // Let go of the memory held by other futures 1496 CombinedFuture.this.futures = null; 1497 1498 // By now the values array has either been set as the Future's value, 1499 // or (in case of failure) is no longer useful. 1500 CombinedFuture.this.values = null; 1501 1502 // The combiner may also hold state, so free that as well 1503 CombinedFuture.this.combiner = null; 1504 } 1505 }, MoreExecutors.sameThreadExecutor()); 1506 1507 // Now begin the "real" initialization. 1508 1509 // Corner case: List is empty. 1510 if (futures.isEmpty()) { 1511 set(combiner.combine(ImmutableList.<Optional<V>>of())); 1512 return; 1513 } 1514 1515 // Populate the results list with null initially. 1516 for (int i = 0; i < futures.size(); ++i) { 1517 values.add(null); 1518 } 1519 1520 // Register a listener on each Future in the list to update 1521 // the state of this future. 1522 // Note that if all the futures on the list are done prior to completing 1523 // this loop, the last call to addListener() will callback to 1524 // setOneValue(), transitively call our cleanup listener, and set 1525 // this.futures to null. 1526 // This is not actually a problem, since the foreach only needs 1527 // this.futures to be non-null at the beginning of the loop. 1528 int i = 0; 1529 for (final ListenableFuture<? extends V> listenable : futures) { 1530 final int index = i++; 1531 listenable.addListener(new Runnable() { 1532 @Override 1533 public void run() { 1534 setOneValue(index, listenable); 1535 } 1536 }, listenerExecutor); 1537 } 1538 } 1539 1540 /** 1541 * Fails this future with the given Throwable if {@link #allMustSucceed} is 1542 * true. Also, logs the throwable if it is an {@link Error} or if 1543 * {@link #allMustSucceed} is {@code true} and the throwable did not cause 1544 * this future to fail. 1545 */ 1546 private void setExceptionAndMaybeLog(Throwable throwable) { 1547 boolean result = false; 1548 if (allMustSucceed) { 1549 // As soon as the first one fails, throw the exception up. 1550 // The result of all other inputs is then ignored. 1551 result = super.setException(throwable); 1552 } 1553 if (throwable instanceof Error || (allMustSucceed && !result)) { 1554 logger.log(Level.SEVERE, "input future failed.", throwable); 1555 } 1556 } 1557 1558 /** 1559 * Sets the value at the given index to that of the given future. 1560 */ 1561 private void setOneValue(int index, Future<? extends V> future) { 1562 List<Optional<V>> localValues = values; 1563 // TODO(user): This check appears to be redundant since values is 1564 // assigned null only after the future completes. However, values 1565 // is not volatile so it may be possible for us to observe the changes 1566 // to these two values in a different order... which I think is why 1567 // we need to check both. Clear up this craziness either by making 1568 // values volatile or proving that it doesn't need to be for some other 1569 // reason. 1570 if (isDone() || localValues == null) { 1571 // Some other future failed or has been cancelled, causing this one to 1572 // also be cancelled or have an exception set. This should only happen 1573 // if allMustSucceed is true or if the output itself has been 1574 // cancelled. 1575 checkState(allMustSucceed || isCancelled(), 1576 "Future was done before all dependencies completed"); 1577 } 1578 1579 try { 1580 checkState(future.isDone(), 1581 "Tried to set value from future which is not done"); 1582 V returnValue = getUninterruptibly(future); 1583 if (localValues != null) { 1584 localValues.set(index, Optional.fromNullable(returnValue)); 1585 } 1586 } catch (CancellationException e) { 1587 if (allMustSucceed) { 1588 // Set ourselves as cancelled. Let the input futures keep running 1589 // as some of them may be used elsewhere. 1590 cancel(false); 1591 } 1592 } catch (ExecutionException e) { 1593 setExceptionAndMaybeLog(e.getCause()); 1594 } catch (Throwable t) { 1595 setExceptionAndMaybeLog(t); 1596 } finally { 1597 int newRemaining = remaining.decrementAndGet(); 1598 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1599 if (newRemaining == 0) { 1600 FutureCombiner<V, C> localCombiner = combiner; 1601 if (localCombiner != null && localValues != null) { 1602 set(localCombiner.combine(localValues)); 1603 } else { 1604 checkState(isDone()); 1605 } 1606 } 1607 } 1608 } 1609 1610 } 1611 1612 /** Used for {@link #allAsList} and {@link #successfulAsList}. */ 1613 private static <V> ListenableFuture<List<V>> listFuture( 1614 ImmutableList<ListenableFuture<? extends V>> futures, 1615 boolean allMustSucceed, Executor listenerExecutor) { 1616 return new CombinedFuture<V, List<V>>( 1617 futures, allMustSucceed, listenerExecutor, 1618 new FutureCombiner<V, List<V>>() { 1619 @Override 1620 public List<V> combine(List<Optional<V>> values) { 1621 List<V> result = Lists.newArrayList(); 1622 for (Optional<V> element : values) { 1623 result.add(element != null ? element.orNull() : null); 1624 } 1625 return Collections.unmodifiableList(result); 1626 } 1627 }); 1628 } 1629 1630 /** 1631 * A checked future that uses a function to map from exceptions to the 1632 * appropriate checked type. 1633 */ 1634 private static class MappingCheckedFuture<V, X extends Exception> extends 1635 AbstractCheckedFuture<V, X> { 1636 1637 final Function<Exception, X> mapper; 1638 1639 MappingCheckedFuture(ListenableFuture<V> delegate, 1640 Function<Exception, X> mapper) { 1641 super(delegate); 1642 1643 this.mapper = checkNotNull(mapper); 1644 } 1645 1646 @Override 1647 protected X mapException(Exception e) { 1648 return mapper.apply(e); 1649 } 1650 } 1651}