001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024import static java.lang.Thread.currentThread; 025import static java.util.Arrays.asList; 026 027import com.google.common.annotations.Beta; 028import com.google.common.base.Function; 029import com.google.common.base.Optional; 030import com.google.common.base.Preconditions; 031import com.google.common.collect.ImmutableCollection; 032import com.google.common.collect.ImmutableList; 033import com.google.common.collect.Lists; 034import com.google.common.collect.Ordering; 035import com.google.common.collect.Queues; 036import com.google.common.collect.Sets; 037 038import java.lang.reflect.Constructor; 039import java.lang.reflect.InvocationTargetException; 040import java.lang.reflect.UndeclaredThrowableException; 041import java.util.Arrays; 042import java.util.Collections; 043import java.util.List; 044import java.util.Set; 045import java.util.concurrent.CancellationException; 046import java.util.concurrent.ConcurrentLinkedQueue; 047import java.util.concurrent.CountDownLatch; 048import java.util.concurrent.ExecutionException; 049import java.util.concurrent.Executor; 050import java.util.concurrent.Future; 051import java.util.concurrent.TimeUnit; 052import java.util.concurrent.TimeoutException; 053import java.util.concurrent.atomic.AtomicInteger; 054import java.util.logging.Level; 055import java.util.logging.Logger; 056 057import javax.annotation.Nullable; 058 059/** 060 * Static utility methods pertaining to the {@link Future} interface. 061 * 062 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 063 * Guava User Guide article on <a href= 064 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 065 * {@code ListenableFuture}</a>. 066 * 067 * @author Kevin Bourrillion 068 * @author Nishant Thakkar 069 * @author Sven Mawson 070 * @since 1.0 071 */ 072@Beta 073public final class Futures { 074 private Futures() {} 075 076 /** 077 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 078 * and a {@link Function} that maps from {@link Exception} instances into the 079 * appropriate checked type. 080 * 081 * <p>The given mapping function will be applied to an 082 * {@link InterruptedException}, a {@link CancellationException}, or an 083 * {@link ExecutionException}. 084 * See {@link Future#get()} for details on the exceptions thrown. 085 * 086 * @since 9.0 (source-compatible since 1.0) 087 */ 088 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 089 ListenableFuture<V> future, Function<Exception, X> mapper) { 090 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 091 } 092 093 private abstract static class ImmediateFuture<V> 094 implements ListenableFuture<V> { 095 096 private static final Logger log = 097 Logger.getLogger(ImmediateFuture.class.getName()); 098 099 @Override 100 public void addListener(Runnable listener, Executor executor) { 101 checkNotNull(listener, "Runnable was null."); 102 checkNotNull(executor, "Executor was null."); 103 try { 104 executor.execute(listener); 105 } catch (RuntimeException e) { 106 // ListenableFuture's contract is that it will not throw unchecked 107 // exceptions, so log the bad runnable and/or executor and swallow it. 108 log.log(Level.SEVERE, "RuntimeException while executing runnable " 109 + listener + " with executor " + executor, e); 110 } 111 } 112 113 @Override 114 public boolean cancel(boolean mayInterruptIfRunning) { 115 return false; 116 } 117 118 @Override 119 public abstract V get() throws ExecutionException; 120 121 @Override 122 public V get(long timeout, TimeUnit unit) throws ExecutionException { 123 checkNotNull(unit); 124 return get(); 125 } 126 127 @Override 128 public boolean isCancelled() { 129 return false; 130 } 131 132 @Override 133 public boolean isDone() { 134 return true; 135 } 136 } 137 138 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 139 140 @Nullable private final V value; 141 142 ImmediateSuccessfulFuture(@Nullable V value) { 143 this.value = value; 144 } 145 146 @Override 147 public V get() { 148 return value; 149 } 150 } 151 152 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 153 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 154 155 @Nullable private final V value; 156 157 ImmediateSuccessfulCheckedFuture(@Nullable V value) { 158 this.value = value; 159 } 160 161 @Override 162 public V get() { 163 return value; 164 } 165 166 @Override 167 public V checkedGet() { 168 return value; 169 } 170 171 @Override 172 public V checkedGet(long timeout, TimeUnit unit) { 173 checkNotNull(unit); 174 return value; 175 } 176 } 177 178 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 179 180 private final Throwable thrown; 181 182 ImmediateFailedFuture(Throwable thrown) { 183 this.thrown = thrown; 184 } 185 186 @Override 187 public V get() throws ExecutionException { 188 throw new ExecutionException(thrown); 189 } 190 } 191 192 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 193 194 private final CancellationException thrown; 195 196 ImmediateCancelledFuture() { 197 this.thrown = new CancellationException("Immediate cancelled future."); 198 } 199 200 @Override 201 public boolean isCancelled() { 202 return true; 203 } 204 205 @Override 206 public V get() { 207 throw AbstractFuture.cancellationExceptionWithCause( 208 "Task was cancelled.", thrown); 209 } 210 } 211 212 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 213 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 214 215 private final X thrown; 216 217 ImmediateFailedCheckedFuture(X thrown) { 218 this.thrown = thrown; 219 } 220 221 @Override 222 public V get() throws ExecutionException { 223 throw new ExecutionException(thrown); 224 } 225 226 @Override 227 public V checkedGet() throws X { 228 throw thrown; 229 } 230 231 @Override 232 public V checkedGet(long timeout, TimeUnit unit) throws X { 233 checkNotNull(unit); 234 throw thrown; 235 } 236 } 237 238 /** 239 * Creates a {@code ListenableFuture} which has its value set immediately upon 240 * construction. The getters just return the value. This {@code Future} can't 241 * be canceled or timed out and its {@code isDone()} method always returns 242 * {@code true}. 243 */ 244 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 245 return new ImmediateSuccessfulFuture<V>(value); 246 } 247 248 /** 249 * Returns a {@code CheckedFuture} which has its value set immediately upon 250 * construction. 251 * 252 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 253 * method always returns {@code true}. Calling {@code get()} or {@code 254 * checkedGet()} will immediately return the provided value. 255 */ 256 public static <V, X extends Exception> CheckedFuture<V, X> 257 immediateCheckedFuture(@Nullable V value) { 258 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 259 } 260 261 /** 262 * Returns a {@code ListenableFuture} which has an exception set immediately 263 * upon construction. 264 * 265 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 266 * method always returns {@code true}. Calling {@code get()} will immediately 267 * throw the provided {@code Throwable} wrapped in an {@code 268 * ExecutionException}. 269 */ 270 public static <V> ListenableFuture<V> immediateFailedFuture( 271 Throwable throwable) { 272 checkNotNull(throwable); 273 return new ImmediateFailedFuture<V>(throwable); 274 } 275 276 /** 277 * Creates a {@code ListenableFuture} which is cancelled immediately upon 278 * construction, so that {@code isCancelled()} always returns {@code true}. 279 * 280 * @since 14.0 281 */ 282 public static <V> ListenableFuture<V> immediateCancelledFuture() { 283 return new ImmediateCancelledFuture<V>(); 284 } 285 286 /** 287 * Returns a {@code CheckedFuture} which has an exception set immediately upon 288 * construction. 289 * 290 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 291 * method always returns {@code true}. Calling {@code get()} will immediately 292 * throw the provided {@code Exception} wrapped in an {@code 293 * ExecutionException}, and calling {@code checkedGet()} will throw the 294 * provided exception itself. 295 */ 296 public static <V, X extends Exception> CheckedFuture<V, X> 297 immediateFailedCheckedFuture(X exception) { 298 checkNotNull(exception); 299 return new ImmediateFailedCheckedFuture<V, X>(exception); 300 } 301 302 /** 303 * Returns a {@code Future} whose result is taken from the given primary 304 * {@code input} or, if the primary input fails, from the {@code Future} 305 * provided by the {@code fallback}. {@link FutureFallback#create} is not 306 * invoked until the primary input has failed, so if the primary input 307 * succeeds, it is never invoked. If, during the invocation of {@code 308 * fallback}, an exception is thrown, this exception is used as the result of 309 * the output {@code Future}. 310 * 311 * <p>Below is an example of a fallback that returns a default value if an 312 * exception occurs: 313 * 314 * <pre> {@code 315 * ListenableFuture<Integer> fetchCounterFuture = ...; 316 * 317 * // Falling back to a zero counter in case an exception happens when 318 * // processing the RPC to fetch counters. 319 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 320 * fetchCounterFuture, new FutureFallback<Integer>() { 321 * public ListenableFuture<Integer> create(Throwable t) { 322 * // Returning "0" as the default for the counter when the 323 * // exception happens. 324 * return immediateFuture(0); 325 * } 326 * });}</pre> 327 * 328 * <p>The fallback can also choose to propagate the original exception when 329 * desired: 330 * 331 * <pre> {@code 332 * ListenableFuture<Integer> fetchCounterFuture = ...; 333 * 334 * // Falling back to a zero counter only in case the exception was a 335 * // TimeoutException. 336 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 337 * fetchCounterFuture, new FutureFallback<Integer>() { 338 * public ListenableFuture<Integer> create(Throwable t) { 339 * if (t instanceof TimeoutException) { 340 * return immediateFuture(0); 341 * } 342 * return immediateFailedFuture(t); 343 * } 344 * });}</pre> 345 * 346 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 347 * (whether the {@code Future} itself is slow or heavyweight to complete is 348 * irrelevant), consider {@linkplain #withFallback(ListenableFuture, 349 * FutureFallback, Executor) supplying an executor}. If you do not supply an 350 * executor, {@code withFallback} will use {@link 351 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 352 * caveats for heavier operations. For example, the call to {@code 353 * fallback.create} may run on an unpredictable or undesirable thread: 354 * 355 * <ul> 356 * <li>If the input {@code Future} is done at the time {@code withFallback} 357 * is called, {@code withFallback} will call {@code fallback.create} inline. 358 * <li>If the input {@code Future} is not yet done, {@code withFallback} will 359 * schedule {@code fallback.create} to be run by the thread that completes 360 * the input {@code Future}, which may be an internal system thread such as 361 * an RPC network thread. 362 * </ul> 363 * 364 * <p>Also note that, regardless of which thread executes the {@code 365 * sameThreadExecutor} {@code fallback.create}, all other registered but 366 * unexecuted listeners are prevented from running during its execution, even 367 * if those listeners are to run in other executors. 368 * 369 * @param input the primary input {@code Future} 370 * @param fallback the {@link FutureFallback} implementation to be called if 371 * {@code input} fails 372 * @since 14.0 373 */ 374 public static <V> ListenableFuture<V> withFallback( 375 ListenableFuture<? extends V> input, 376 FutureFallback<? extends V> fallback) { 377 return withFallback(input, fallback, sameThreadExecutor()); 378 } 379 380 /** 381 * Returns a {@code Future} whose result is taken from the given primary 382 * {@code input} or, if the primary input fails, from the {@code Future} 383 * provided by the {@code fallback}. {@link FutureFallback#create} is not 384 * invoked until the primary input has failed, so if the primary input 385 * succeeds, it is never invoked. If, during the invocation of {@code 386 * fallback}, an exception is thrown, this exception is used as the result of 387 * the output {@code Future}. 388 * 389 * <p>Below is an example of a fallback that returns a default value if an 390 * exception occurs: 391 * 392 * <pre> {@code 393 * ListenableFuture<Integer> fetchCounterFuture = ...; 394 * 395 * // Falling back to a zero counter in case an exception happens when 396 * // processing the RPC to fetch counters. 397 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 398 * fetchCounterFuture, new FutureFallback<Integer>() { 399 * public ListenableFuture<Integer> create(Throwable t) { 400 * // Returning "0" as the default for the counter when the 401 * // exception happens. 402 * return immediateFuture(0); 403 * } 404 * }, sameThreadExecutor());}</pre> 405 * 406 * <p>The fallback can also choose to propagate the original exception when 407 * desired: 408 * 409 * <pre> {@code 410 * ListenableFuture<Integer> fetchCounterFuture = ...; 411 * 412 * // Falling back to a zero counter only in case the exception was a 413 * // TimeoutException. 414 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 415 * fetchCounterFuture, new FutureFallback<Integer>() { 416 * public ListenableFuture<Integer> create(Throwable t) { 417 * if (t instanceof TimeoutException) { 418 * return immediateFuture(0); 419 * } 420 * return immediateFailedFuture(t); 421 * } 422 * }, sameThreadExecutor());}</pre> 423 * 424 * <p>When the execution of {@code fallback.create} is fast and lightweight 425 * (though the {@code Future} it returns need not meet these criteria), 426 * consider {@linkplain #withFallback(ListenableFuture, FutureFallback) 427 * omitting the executor} or explicitly specifying {@code 428 * sameThreadExecutor}. However, be aware of the caveats documented in the 429 * link above. 430 * 431 * @param input the primary input {@code Future} 432 * @param fallback the {@link FutureFallback} implementation to be called if 433 * {@code input} fails 434 * @param executor the executor that runs {@code fallback} if {@code input} 435 * fails 436 * @since 14.0 437 */ 438 public static <V> ListenableFuture<V> withFallback( 439 ListenableFuture<? extends V> input, 440 FutureFallback<? extends V> fallback, Executor executor) { 441 checkNotNull(fallback); 442 return new FallbackFuture<V>(input, fallback, executor); 443 } 444 445 /** 446 * A future that falls back on a second, generated future, in case its 447 * original future fails. 448 */ 449 private static class FallbackFuture<V> extends AbstractFuture<V> { 450 451 private volatile ListenableFuture<? extends V> running; 452 453 FallbackFuture(ListenableFuture<? extends V> input, 454 final FutureFallback<? extends V> fallback, 455 final Executor executor) { 456 running = input; 457 addCallback(running, new FutureCallback<V>() { 458 @Override 459 public void onSuccess(V value) { 460 set(value); 461 } 462 463 @Override 464 public void onFailure(Throwable t) { 465 if (isCancelled()) { 466 return; 467 } 468 try { 469 running = fallback.create(t); 470 if (isCancelled()) { // in case cancel called in the meantime 471 running.cancel(wasInterrupted()); 472 return; 473 } 474 addCallback(running, new FutureCallback<V>() { 475 @Override 476 public void onSuccess(V value) { 477 set(value); 478 } 479 480 @Override 481 public void onFailure(Throwable t) { 482 if (running.isCancelled()) { 483 cancel(false); 484 } else { 485 setException(t); 486 } 487 } 488 }, sameThreadExecutor()); 489 } catch (Throwable e) { 490 setException(e); 491 } 492 } 493 }, executor); 494 } 495 496 @Override 497 public boolean cancel(boolean mayInterruptIfRunning) { 498 if (super.cancel(mayInterruptIfRunning)) { 499 running.cancel(mayInterruptIfRunning); 500 return true; 501 } 502 return false; 503 } 504 } 505 506 /** 507 * Returns a new {@code ListenableFuture} whose result is asynchronously 508 * derived from the result of the given {@code Future}. More precisely, the 509 * returned {@code Future} takes its result from a {@code Future} produced by 510 * applying the given {@code AsyncFunction} to the result of the original 511 * {@code Future}. Example: 512 * 513 * <pre> {@code 514 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 515 * AsyncFunction<RowKey, QueryResult> queryFunction = 516 * new AsyncFunction<RowKey, QueryResult>() { 517 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 518 * return dataService.read(rowKey); 519 * } 520 * }; 521 * ListenableFuture<QueryResult> queryFuture = 522 * transform(rowKeyFuture, queryFunction);}</pre> 523 * 524 * <p>Note: If the derived {@code Future} is slow or heavyweight to create 525 * (whether the {@code Future} itself is slow or heavyweight to complete is 526 * irrelevant), consider {@linkplain #transform(ListenableFuture, 527 * AsyncFunction, Executor) supplying an executor}. If you do not supply an 528 * executor, {@code transform} will use {@link 529 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 530 * caveats for heavier operations. For example, the call to {@code 531 * function.apply} may run on an unpredictable or undesirable thread: 532 * 533 * <ul> 534 * <li>If the input {@code Future} is done at the time {@code transform} is 535 * called, {@code transform} will call {@code function.apply} inline. 536 * <li>If the input {@code Future} is not yet done, {@code transform} will 537 * schedule {@code function.apply} to be run by the thread that completes the 538 * input {@code Future}, which may be an internal system thread such as an 539 * RPC network thread. 540 * </ul> 541 * 542 * <p>Also note that, regardless of which thread executes the {@code 543 * sameThreadExecutor} {@code function.apply}, all other registered but 544 * unexecuted listeners are prevented from running during its execution, even 545 * if those listeners are to run in other executors. 546 * 547 * <p>The returned {@code Future} attempts to keep its cancellation state in 548 * sync with that of the input future and that of the future returned by the 549 * function. That is, if the returned {@code Future} is cancelled, it will 550 * attempt to cancel the other two, and if either of the other two is 551 * cancelled, the returned {@code Future} will receive a callback in which it 552 * will attempt to cancel itself. 553 * 554 * @param input The future to transform 555 * @param function A function to transform the result of the input future 556 * to the result of the output future 557 * @return A future that holds result of the function (if the input succeeded) 558 * or the original input's failure (if not) 559 * @since 11.0 560 */ 561 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 562 AsyncFunction<? super I, ? extends O> function) { 563 return transform(input, function, MoreExecutors.sameThreadExecutor()); 564 } 565 566 /** 567 * Returns a new {@code ListenableFuture} whose result is asynchronously 568 * derived from the result of the given {@code Future}. More precisely, the 569 * returned {@code Future} takes its result from a {@code Future} produced by 570 * applying the given {@code AsyncFunction} to the result of the original 571 * {@code Future}. Example: 572 * 573 * <pre> {@code 574 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 575 * AsyncFunction<RowKey, QueryResult> queryFunction = 576 * new AsyncFunction<RowKey, QueryResult>() { 577 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 578 * return dataService.read(rowKey); 579 * } 580 * }; 581 * ListenableFuture<QueryResult> queryFuture = 582 * transform(rowKeyFuture, queryFunction, executor);}</pre> 583 * 584 * <p>The returned {@code Future} attempts to keep its cancellation state in 585 * sync with that of the input future and that of the future returned by the 586 * chain function. That is, if the returned {@code Future} is cancelled, it 587 * will attempt to cancel the other two, and if either of the other two is 588 * cancelled, the returned {@code Future} will receive a callback in which it 589 * will attempt to cancel itself. 590 * 591 * <p>When the execution of {@code function.apply} is fast and lightweight 592 * (though the {@code Future} it returns need not meet these criteria), 593 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting 594 * the executor} or explicitly specifying {@code sameThreadExecutor}. 595 * However, be aware of the caveats documented in the link above. 596 * 597 * @param input The future to transform 598 * @param function A function to transform the result of the input future 599 * to the result of the output future 600 * @param executor Executor to run the function in. 601 * @return A future that holds result of the function (if the input succeeded) 602 * or the original input's failure (if not) 603 * @since 11.0 604 */ 605 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 606 AsyncFunction<? super I, ? extends O> function, 607 Executor executor) { 608 ChainingListenableFuture<I, O> output = 609 new ChainingListenableFuture<I, O>(function, input); 610 input.addListener(output, executor); 611 return output; 612 } 613 614 /** 615 * Returns a new {@code ListenableFuture} whose result is the product of 616 * applying the given {@code Function} to the result of the given {@code 617 * Future}. Example: 618 * 619 * <pre> {@code 620 * ListenableFuture<QueryResult> queryFuture = ...; 621 * Function<QueryResult, List<Row>> rowsFunction = 622 * new Function<QueryResult, List<Row>>() { 623 * public List<Row> apply(QueryResult queryResult) { 624 * return queryResult.getRows(); 625 * } 626 * }; 627 * ListenableFuture<List<Row>> rowsFuture = 628 * transform(queryFuture, rowsFunction);}</pre> 629 * 630 * <p>Note: If the transformation is slow or heavyweight, consider {@linkplain 631 * #transform(ListenableFuture, Function, Executor) supplying an executor}. 632 * If you do not supply an executor, {@code transform} will use {@link 633 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 634 * caveats for heavier operations. For example, the call to {@code 635 * function.apply} may run on an unpredictable or undesirable thread: 636 * 637 * <ul> 638 * <li>If the input {@code Future} is done at the time {@code transform} is 639 * called, {@code transform} will call {@code function.apply} inline. 640 * <li>If the input {@code Future} is not yet done, {@code transform} will 641 * schedule {@code function.apply} to be run by the thread that completes the 642 * input {@code Future}, which may be an internal system thread such as an 643 * RPC network thread. 644 * </ul> 645 * 646 * <p>Also note that, regardless of which thread executes the {@code 647 * sameThreadExecutor} {@code function.apply}, all other registered but 648 * unexecuted listeners are prevented from running during its execution, even 649 * if those listeners are to run in other executors. 650 * 651 * <p>The returned {@code Future} attempts to keep its cancellation state in 652 * sync with that of the input future. That is, if the returned {@code Future} 653 * is cancelled, it will attempt to cancel the input, and if the input is 654 * cancelled, the returned {@code Future} will receive a callback in which it 655 * will attempt to cancel itself. 656 * 657 * <p>An example use of this method is to convert a serializable object 658 * returned from an RPC into a POJO. 659 * 660 * @param input The future to transform 661 * @param function A Function to transform the results of the provided future 662 * to the results of the returned future. This will be run in the thread 663 * that notifies input it is complete. 664 * @return A future that holds result of the transformation. 665 * @since 9.0 (in 1.0 as {@code compose}) 666 */ 667 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 668 final Function<? super I, ? extends O> function) { 669 return transform(input, function, MoreExecutors.sameThreadExecutor()); 670 } 671 672 /** 673 * Returns a new {@code ListenableFuture} whose result is the product of 674 * applying the given {@code Function} to the result of the given {@code 675 * Future}. Example: 676 * 677 * <pre> {@code 678 * ListenableFuture<QueryResult> queryFuture = ...; 679 * Function<QueryResult, List<Row>> rowsFunction = 680 * new Function<QueryResult, List<Row>>() { 681 * public List<Row> apply(QueryResult queryResult) { 682 * return queryResult.getRows(); 683 * } 684 * }; 685 * ListenableFuture<List<Row>> rowsFuture = 686 * transform(queryFuture, rowsFunction, executor);}</pre> 687 * 688 * <p>The returned {@code Future} attempts to keep its cancellation state in 689 * sync with that of the input future. That is, if the returned {@code Future} 690 * is cancelled, it will attempt to cancel the input, and if the input is 691 * cancelled, the returned {@code Future} will receive a callback in which it 692 * will attempt to cancel itself. 693 * 694 * <p>An example use of this method is to convert a serializable object 695 * returned from an RPC into a POJO. 696 * 697 * <p>When the transformation is fast and lightweight, consider {@linkplain 698 * #transform(ListenableFuture, Function) omitting the executor} or 699 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 700 * caveats documented in the link above. 701 * 702 * @param input The future to transform 703 * @param function A Function to transform the results of the provided future 704 * to the results of the returned future. 705 * @param executor Executor to run the function in. 706 * @return A future that holds result of the transformation. 707 * @since 9.0 (in 2.0 as {@code compose}) 708 */ 709 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 710 final Function<? super I, ? extends O> function, Executor executor) { 711 checkNotNull(function); 712 AsyncFunction<I, O> wrapperFunction 713 = new AsyncFunction<I, O>() { 714 @Override public ListenableFuture<O> apply(I input) { 715 O output = function.apply(input); 716 return immediateFuture(output); 717 } 718 }; 719 return transform(input, wrapperFunction, executor); 720 } 721 722 /** 723 * Like {@link #transform(ListenableFuture, Function)} except that the 724 * transformation {@code function} is invoked on each call to 725 * {@link Future#get() get()} on the returned future. 726 * 727 * <p>The returned {@code Future} reflects the input's cancellation 728 * state directly, and any attempt to cancel the returned Future is likewise 729 * passed through to the input Future. 730 * 731 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 732 * only apply the timeout to the execution of the underlying {@code Future}, 733 * <em>not</em> to the execution of the transformation function. 734 * 735 * <p>The primary audience of this method is callers of {@code transform} 736 * who don't have a {@code ListenableFuture} available and 737 * do not mind repeated, lazy function evaluation. 738 * 739 * @param input The future to transform 740 * @param function A Function to transform the results of the provided future 741 * to the results of the returned future. 742 * @return A future that returns the result of the transformation. 743 * @since 10.0 744 */ 745 public static <I, O> Future<O> lazyTransform(final Future<I> input, 746 final Function<? super I, ? extends O> function) { 747 checkNotNull(input); 748 checkNotNull(function); 749 return new Future<O>() { 750 751 @Override 752 public boolean cancel(boolean mayInterruptIfRunning) { 753 return input.cancel(mayInterruptIfRunning); 754 } 755 756 @Override 757 public boolean isCancelled() { 758 return input.isCancelled(); 759 } 760 761 @Override 762 public boolean isDone() { 763 return input.isDone(); 764 } 765 766 @Override 767 public O get() throws InterruptedException, ExecutionException { 768 return applyTransformation(input.get()); 769 } 770 771 @Override 772 public O get(long timeout, TimeUnit unit) 773 throws InterruptedException, ExecutionException, TimeoutException { 774 return applyTransformation(input.get(timeout, unit)); 775 } 776 777 private O applyTransformation(I input) throws ExecutionException { 778 try { 779 return function.apply(input); 780 } catch (Throwable t) { 781 throw new ExecutionException(t); 782 } 783 } 784 }; 785 } 786 787 /** 788 * An implementation of {@code ListenableFuture} that also implements 789 * {@code Runnable} so that it can be used to nest ListenableFutures. 790 * Once the passed-in {@code ListenableFuture} is complete, it calls the 791 * passed-in {@code Function} to generate the result. 792 * 793 * <p>For historical reasons, this class has a special case in its exception 794 * handling: If the given {@code AsyncFunction} throws an {@code 795 * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it 796 * and uses its <i>cause</i> as the output future's exception, rather than 797 * using the {@code UndeclaredThrowableException} itself as it would for other 798 * exception types. The reason for this is that {@code Futures.transform} used 799 * to require a {@code Function}, whose {@code apply} method is not allowed to 800 * throw checked exceptions. Nowadays, {@code Futures.transform} has an 801 * overload that accepts an {@code AsyncFunction}, whose {@code apply} method 802 * <i>is</i> allowed to throw checked exception. Users who wish to throw 803 * checked exceptions should use that overload instead, and <a 804 * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we 805 * should remove the {@code UndeclaredThrowableException} special case</a>. 806 */ 807 private static class ChainingListenableFuture<I, O> 808 extends AbstractFuture<O> implements Runnable { 809 810 private AsyncFunction<? super I, ? extends O> function; 811 private ListenableFuture<? extends I> inputFuture; 812 private volatile ListenableFuture<? extends O> outputFuture; 813 private final CountDownLatch outputCreated = new CountDownLatch(1); 814 815 private ChainingListenableFuture( 816 AsyncFunction<? super I, ? extends O> function, 817 ListenableFuture<? extends I> inputFuture) { 818 this.function = checkNotNull(function); 819 this.inputFuture = checkNotNull(inputFuture); 820 } 821 822 @Override 823 public boolean cancel(boolean mayInterruptIfRunning) { 824 /* 825 * Our additional cancellation work needs to occur even if 826 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 827 */ 828 if (super.cancel(mayInterruptIfRunning)) { 829 // This should never block since only one thread is allowed to cancel 830 // this Future. 831 cancel(inputFuture, mayInterruptIfRunning); 832 cancel(outputFuture, mayInterruptIfRunning); 833 return true; 834 } 835 return false; 836 } 837 838 private void cancel(@Nullable Future<?> future, 839 boolean mayInterruptIfRunning) { 840 if (future != null) { 841 future.cancel(mayInterruptIfRunning); 842 } 843 } 844 845 @Override 846 public void run() { 847 try { 848 I sourceResult; 849 try { 850 sourceResult = getUninterruptibly(inputFuture); 851 } catch (CancellationException e) { 852 // Cancel this future and return. 853 // At this point, inputFuture is cancelled and outputFuture doesn't 854 // exist, so the value of mayInterruptIfRunning is irrelevant. 855 cancel(false); 856 return; 857 } catch (ExecutionException e) { 858 // Set the cause of the exception as this future's exception 859 setException(e.getCause()); 860 return; 861 } 862 863 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 864 Preconditions.checkNotNull(function.apply(sourceResult), 865 "AsyncFunction may not return null."); 866 if (isCancelled()) { 867 outputFuture.cancel(wasInterrupted()); 868 this.outputFuture = null; 869 return; 870 } 871 outputFuture.addListener(new Runnable() { 872 @Override 873 public void run() { 874 try { 875 set(getUninterruptibly(outputFuture)); 876 } catch (CancellationException e) { 877 // Cancel this future and return. 878 // At this point, inputFuture and outputFuture are done, so the 879 // value of mayInterruptIfRunning is irrelevant. 880 cancel(false); 881 return; 882 } catch (ExecutionException e) { 883 // Set the cause of the exception as this future's exception 884 setException(e.getCause()); 885 } finally { 886 // Don't pin inputs beyond completion 887 ChainingListenableFuture.this.outputFuture = null; 888 } 889 } 890 }, MoreExecutors.sameThreadExecutor()); 891 } catch (UndeclaredThrowableException e) { 892 // Set the cause of the exception as this future's exception 893 setException(e.getCause()); 894 } catch (Throwable t) { 895 // This exception is irrelevant in this thread, but useful for the 896 // client 897 setException(t); 898 } finally { 899 // Don't pin inputs beyond completion 900 function = null; 901 inputFuture = null; 902 // Allow our get routines to examine outputFuture now. 903 outputCreated.countDown(); 904 } 905 } 906 } 907 908 /** 909 * Returns a new {@code ListenableFuture} whose result is the product of 910 * calling {@code get()} on the {@code Future} nested within the given {@code 911 * Future}, effectively chaining the futures one after the other. Example: 912 * 913 * <pre> {@code 914 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 915 * ListenableFuture<String> dereferenced = dereference(nested);}</pre> 916 * 917 * <p>This call has the same cancellation and execution semantics as {@link 918 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 919 * Future} attempts to keep its cancellation state in sync with both the 920 * input {@code Future} and the nested {@code Future}. The transformation 921 * is very lightweight and therefore takes place in the same thread (either 922 * the thread that called {@code dereference}, or the thread in which the 923 * dereferenced future completes). 924 * 925 * @param nested The nested future to transform. 926 * @return A future that holds result of the inner future. 927 * @since 13.0 928 */ 929 @SuppressWarnings({"rawtypes", "unchecked"}) 930 public static <V> ListenableFuture<V> dereference( 931 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 932 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 933 } 934 935 /** 936 * Helper {@code Function} for {@link #dereference}. 937 */ 938 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 939 new AsyncFunction<ListenableFuture<Object>, Object>() { 940 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 941 return input; 942 } 943 }; 944 945 /** 946 * Creates a new {@code ListenableFuture} whose value is a list containing the 947 * values of all its input futures, if all succeed. If any input fails, the 948 * returned future fails. 949 * 950 * <p>The list of results is in the same order as the input list. 951 * 952 * <p>Canceling this future will attempt to cancel all the component futures, 953 * and if any of the provided futures fails or is canceled, this one is, 954 * too. 955 * 956 * @param futures futures to combine 957 * @return a future that provides a list of the results of the component 958 * futures 959 * @since 10.0 960 */ 961 @Beta 962 public static <V> ListenableFuture<List<V>> allAsList( 963 ListenableFuture<? extends V>... futures) { 964 return listFuture(ImmutableList.copyOf(futures), true, 965 MoreExecutors.sameThreadExecutor()); 966 } 967 968 /** 969 * Creates a new {@code ListenableFuture} whose value is a list containing the 970 * values of all its input futures, if all succeed. If any input fails, the 971 * returned future fails. 972 * 973 * <p>The list of results is in the same order as the input list. 974 * 975 * <p>Canceling this future will attempt to cancel all the component futures, 976 * and if any of the provided futures fails or is canceled, this one is, 977 * too. 978 * 979 * @param futures futures to combine 980 * @return a future that provides a list of the results of the component 981 * futures 982 * @since 10.0 983 */ 984 @Beta 985 public static <V> ListenableFuture<List<V>> allAsList( 986 Iterable<? extends ListenableFuture<? extends V>> futures) { 987 return listFuture(ImmutableList.copyOf(futures), true, 988 MoreExecutors.sameThreadExecutor()); 989 } 990 991 /** 992 * Creates a new {@code ListenableFuture} whose result is set from the 993 * supplied future when it completes. Cancelling the supplied future 994 * will also cancel the returned future, but cancelling the returned 995 * future will have no effect on the supplied future. 996 * 997 * @since 15.0 998 */ 999 public static <V> ListenableFuture<V> nonCancellationPropagating( 1000 ListenableFuture<V> future) { 1001 return new NonCancellationPropagatingFuture<V>(future); 1002 } 1003 1004 /** 1005 * A wrapped future that does not propagate cancellation to its delegate. 1006 */ 1007 private static class NonCancellationPropagatingFuture<V> 1008 extends AbstractFuture<V> { 1009 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1010 checkNotNull(delegate); 1011 addCallback(delegate, new FutureCallback<V>() { 1012 @Override 1013 public void onSuccess(V result) { 1014 set(result); 1015 } 1016 1017 @Override 1018 public void onFailure(Throwable t) { 1019 if (delegate.isCancelled()) { 1020 cancel(false); 1021 } else { 1022 setException(t); 1023 } 1024 } 1025 }, sameThreadExecutor()); 1026 } 1027 } 1028 1029 /** 1030 * Creates a new {@code ListenableFuture} whose value is a list containing the 1031 * values of all its successful input futures. The list of results is in the 1032 * same order as the input list, and if any of the provided futures fails or 1033 * is canceled, its corresponding position will contain {@code null} (which is 1034 * indistinguishable from the future having a successful value of 1035 * {@code null}). 1036 * 1037 * <p>Canceling this future will attempt to cancel all the component futures. 1038 * 1039 * @param futures futures to combine 1040 * @return a future that provides a list of the results of the component 1041 * futures 1042 * @since 10.0 1043 */ 1044 @Beta 1045 public static <V> ListenableFuture<List<V>> successfulAsList( 1046 ListenableFuture<? extends V>... futures) { 1047 return listFuture(ImmutableList.copyOf(futures), false, 1048 MoreExecutors.sameThreadExecutor()); 1049 } 1050 1051 /** 1052 * Creates a new {@code ListenableFuture} whose value is a list containing the 1053 * values of all its successful input futures. The list of results is in the 1054 * same order as the input list, and if any of the provided futures fails or 1055 * is canceled, its corresponding position will contain {@code null} (which is 1056 * indistinguishable from the future having a successful value of 1057 * {@code null}). 1058 * 1059 * <p>Canceling this future will attempt to cancel all the component futures. 1060 * 1061 * @param futures futures to combine 1062 * @return a future that provides a list of the results of the component 1063 * futures 1064 * @since 10.0 1065 */ 1066 @Beta 1067 public static <V> ListenableFuture<List<V>> successfulAsList( 1068 Iterable<? extends ListenableFuture<? extends V>> futures) { 1069 return listFuture(ImmutableList.copyOf(futures), false, 1070 MoreExecutors.sameThreadExecutor()); 1071 } 1072 1073 /** 1074 * Returns a list of delegate futures that correspond to the futures received in the order 1075 * that they complete. Delegate futures return the same value or throw the same exception 1076 * as the corresponding input future returns/throws. 1077 * 1078 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future 1079 * does not correspond to a specific input future until the appropriate number of input 1080 * futures have completed. At that point, it is too late to cancel the input future. 1081 * The input future's result, which cannot be stored into the cancelled delegate future, 1082 * is ignored. 1083 * 1084 * @since 17.0 1085 */ 1086 @Beta 1087 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 1088 Iterable<? extends ListenableFuture<? extends T>> futures) { 1089 // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an 1090 // ArrayDeque 1091 final ConcurrentLinkedQueue<AsyncSettableFuture<T>> delegates = 1092 Queues.newConcurrentLinkedQueue(); 1093 ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder(); 1094 // Using SerializingExecutor here will ensure that each CompletionOrderListener executes 1095 // atomically and therefore that each returned future is guaranteed to be in completion order. 1096 // N.B. there are some cases where the use of this executor could have possibly surprising 1097 // effects when input futures finish at approximately the same time _and_ the output futures 1098 // have sameThreadExecutor listeners. In this situation, the listeners may end up running on a 1099 // different thread than if they were attached to the corresponding input future. We believe 1100 // this to be a negligible cost since: 1101 // 1. Using the sameThreadExecutor implies that your callback is safe to run on any thread. 1102 // 2. This would likely only be noticeable if you were doing something expensive or blocking on 1103 // a sameThreadExecutor listener on one of the output futures which is an antipattern anyway. 1104 SerializingExecutor executor = new SerializingExecutor(MoreExecutors.sameThreadExecutor()); 1105 for (final ListenableFuture<? extends T> future : futures) { 1106 AsyncSettableFuture<T> delegate = AsyncSettableFuture.create(); 1107 // Must make sure to add the delegate to the queue first in case the future is already done 1108 delegates.add(delegate); 1109 future.addListener(new Runnable() { 1110 @Override public void run() { 1111 delegates.remove().setFuture(future); 1112 } 1113 }, executor); 1114 listBuilder.add(delegate); 1115 } 1116 return listBuilder.build(); 1117 } 1118 1119 /** 1120 * Registers separate success and failure callbacks to be run when the {@code 1121 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1122 * complete} or, if the computation is already complete, immediately. 1123 * 1124 * <p>There is no guaranteed ordering of execution of callbacks, but any 1125 * callback added through this method is guaranteed to be called once the 1126 * computation is complete. 1127 * 1128 * Example: <pre> {@code 1129 * ListenableFuture<QueryResult> future = ...; 1130 * addCallback(future, 1131 * new FutureCallback<QueryResult> { 1132 * public void onSuccess(QueryResult result) { 1133 * storeInCache(result); 1134 * } 1135 * public void onFailure(Throwable t) { 1136 * reportError(t); 1137 * } 1138 * });}</pre> 1139 * 1140 * <p>Note: If the callback is slow or heavyweight, consider {@linkplain 1141 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an 1142 * executor}. If you do not supply an executor, {@code addCallback} will use 1143 * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries 1144 * some caveats for heavier operations. For example, the callback may run on 1145 * an unpredictable or undesirable thread: 1146 * 1147 * <ul> 1148 * <li>If the input {@code Future} is done at the time {@code addCallback} is 1149 * called, {@code addCallback} will execute the callback inline. 1150 * <li>If the input {@code Future} is not yet done, {@code addCallback} will 1151 * schedule the callback to be run by the thread that completes the input 1152 * {@code Future}, which may be an internal system thread such as an RPC 1153 * network thread. 1154 * </ul> 1155 * 1156 * <p>Also note that, regardless of which thread executes the {@code 1157 * sameThreadExecutor} callback, all other registered but unexecuted listeners 1158 * are prevented from running during its execution, even if those listeners 1159 * are to run in other executors. 1160 * 1161 * <p>For a more general interface to attach a completion listener to a 1162 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1163 * 1164 * @param future The future attach the callback to. 1165 * @param callback The callback to invoke when {@code future} is completed. 1166 * @since 10.0 1167 */ 1168 public static <V> void addCallback(ListenableFuture<V> future, 1169 FutureCallback<? super V> callback) { 1170 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 1171 } 1172 1173 /** 1174 * Registers separate success and failure callbacks to be run when the {@code 1175 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1176 * complete} or, if the computation is already complete, immediately. 1177 * 1178 * <p>The callback is run in {@code executor}. 1179 * There is no guaranteed ordering of execution of callbacks, but any 1180 * callback added through this method is guaranteed to be called once the 1181 * computation is complete. 1182 * 1183 * Example: <pre> {@code 1184 * ListenableFuture<QueryResult> future = ...; 1185 * Executor e = ... 1186 * addCallback(future, 1187 * new FutureCallback<QueryResult> { 1188 * public void onSuccess(QueryResult result) { 1189 * storeInCache(result); 1190 * } 1191 * public void onFailure(Throwable t) { 1192 * reportError(t); 1193 * } 1194 * }, e);}</pre> 1195 * 1196 * <p>When the callback is fast and lightweight, consider {@linkplain 1197 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or 1198 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 1199 * caveats documented in the link above. 1200 * 1201 * <p>For a more general interface to attach a completion listener to a 1202 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1203 * 1204 * @param future The future attach the callback to. 1205 * @param callback The callback to invoke when {@code future} is completed. 1206 * @param executor The executor to run {@code callback} when the future 1207 * completes. 1208 * @since 10.0 1209 */ 1210 public static <V> void addCallback(final ListenableFuture<V> future, 1211 final FutureCallback<? super V> callback, Executor executor) { 1212 Preconditions.checkNotNull(callback); 1213 Runnable callbackListener = new Runnable() { 1214 @Override 1215 public void run() { 1216 final V value; 1217 try { 1218 // TODO(user): (Before Guava release), validate that this 1219 // is the thing for IE. 1220 value = getUninterruptibly(future); 1221 } catch (ExecutionException e) { 1222 callback.onFailure(e.getCause()); 1223 return; 1224 } catch (RuntimeException e) { 1225 callback.onFailure(e); 1226 return; 1227 } catch (Error e) { 1228 callback.onFailure(e); 1229 return; 1230 } 1231 callback.onSuccess(value); 1232 } 1233 }; 1234 future.addListener(callbackListener, executor); 1235 } 1236 1237 /** 1238 * Returns the result of {@link Future#get()}, converting most exceptions to a 1239 * new instance of the given checked exception type. This reduces boilerplate 1240 * for a common use of {@code Future} in which it is unnecessary to 1241 * programmatically distinguish between exception types or to extract other 1242 * information from the exception instance. 1243 * 1244 * <p>Exceptions from {@code Future.get} are treated as follows: 1245 * <ul> 1246 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1247 * {@code X} if the cause is a checked exception, an {@link 1248 * UncheckedExecutionException} if the cause is a {@code 1249 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1250 * {@code Error}. 1251 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1252 * restoring the interrupt). 1253 * <li>Any {@link CancellationException} is propagated untouched, as is any 1254 * other {@link RuntimeException} (though {@code get} implementations are 1255 * discouraged from throwing such exceptions). 1256 * </ul> 1257 * 1258 * <p>The overall principle is to continue to treat every checked exception as a 1259 * checked exception, every unchecked exception as an unchecked exception, and 1260 * every error as an error. In addition, the cause of any {@code 1261 * ExecutionException} is wrapped in order to ensure that the new stack trace 1262 * matches that of the current thread. 1263 * 1264 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1265 * public constructor that accepts zero or more arguments, all of type {@code 1266 * String} or {@code Throwable} (preferring constructors with at least one 1267 * {@code String}) and calling the constructor via reflection. If the 1268 * exception did not already have a cause, one is set by calling {@link 1269 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1270 * {@code IllegalArgumentException} is thrown. 1271 * 1272 * @throws X if {@code get} throws any checked exception except for an {@code 1273 * ExecutionException} whose cause is not itself a checked exception 1274 * @throws UncheckedExecutionException if {@code get} throws an {@code 1275 * ExecutionException} with a {@code RuntimeException} as its cause 1276 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1277 * with an {@code Error} as its cause 1278 * @throws CancellationException if {@code get} throws a {@code 1279 * CancellationException} 1280 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1281 * RuntimeException} or does not have a suitable constructor 1282 * @since 10.0 1283 */ 1284 public static <V, X extends Exception> V get( 1285 Future<V> future, Class<X> exceptionClass) throws X { 1286 checkNotNull(future); 1287 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1288 "Futures.get exception type (%s) must not be a RuntimeException", 1289 exceptionClass); 1290 try { 1291 return future.get(); 1292 } catch (InterruptedException e) { 1293 currentThread().interrupt(); 1294 throw newWithCause(exceptionClass, e); 1295 } catch (ExecutionException e) { 1296 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1297 throw new AssertionError(); 1298 } 1299 } 1300 1301 /** 1302 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1303 * exceptions to a new instance of the given checked exception type. This 1304 * reduces boilerplate for a common use of {@code Future} in which it is 1305 * unnecessary to programmatically distinguish between exception types or to 1306 * extract other information from the exception instance. 1307 * 1308 * <p>Exceptions from {@code Future.get} are treated as follows: 1309 * <ul> 1310 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1311 * {@code X} if the cause is a checked exception, an {@link 1312 * UncheckedExecutionException} if the cause is a {@code 1313 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1314 * {@code Error}. 1315 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1316 * restoring the interrupt). 1317 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1318 * <li>Any {@link CancellationException} is propagated untouched, as is any 1319 * other {@link RuntimeException} (though {@code get} implementations are 1320 * discouraged from throwing such exceptions). 1321 * </ul> 1322 * 1323 * <p>The overall principle is to continue to treat every checked exception as a 1324 * checked exception, every unchecked exception as an unchecked exception, and 1325 * every error as an error. In addition, the cause of any {@code 1326 * ExecutionException} is wrapped in order to ensure that the new stack trace 1327 * matches that of the current thread. 1328 * 1329 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1330 * public constructor that accepts zero or more arguments, all of type {@code 1331 * String} or {@code Throwable} (preferring constructors with at least one 1332 * {@code String}) and calling the constructor via reflection. If the 1333 * exception did not already have a cause, one is set by calling {@link 1334 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1335 * {@code IllegalArgumentException} is thrown. 1336 * 1337 * @throws X if {@code get} throws any checked exception except for an {@code 1338 * ExecutionException} whose cause is not itself a checked exception 1339 * @throws UncheckedExecutionException if {@code get} throws an {@code 1340 * ExecutionException} with a {@code RuntimeException} as its cause 1341 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1342 * with an {@code Error} as its cause 1343 * @throws CancellationException if {@code get} throws a {@code 1344 * CancellationException} 1345 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1346 * RuntimeException} or does not have a suitable constructor 1347 * @since 10.0 1348 */ 1349 public static <V, X extends Exception> V get( 1350 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1351 throws X { 1352 checkNotNull(future); 1353 checkNotNull(unit); 1354 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 1355 "Futures.get exception type (%s) must not be a RuntimeException", 1356 exceptionClass); 1357 try { 1358 return future.get(timeout, unit); 1359 } catch (InterruptedException e) { 1360 currentThread().interrupt(); 1361 throw newWithCause(exceptionClass, e); 1362 } catch (TimeoutException e) { 1363 throw newWithCause(exceptionClass, e); 1364 } catch (ExecutionException e) { 1365 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 1366 throw new AssertionError(); 1367 } 1368 } 1369 1370 private static <X extends Exception> void wrapAndThrowExceptionOrError( 1371 Throwable cause, Class<X> exceptionClass) throws X { 1372 if (cause instanceof Error) { 1373 throw new ExecutionError((Error) cause); 1374 } 1375 if (cause instanceof RuntimeException) { 1376 throw new UncheckedExecutionException(cause); 1377 } 1378 throw newWithCause(exceptionClass, cause); 1379 } 1380 1381 /** 1382 * Returns the result of calling {@link Future#get()} uninterruptibly on a 1383 * task known not to throw a checked exception. This makes {@code Future} more 1384 * suitable for lightweight, fast-running tasks that, barring bugs in the 1385 * code, will not fail. This gives it exception-handling behavior similar to 1386 * that of {@code ForkJoinTask.join}. 1387 * 1388 * <p>Exceptions from {@code Future.get} are treated as follows: 1389 * <ul> 1390 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1391 * {@link UncheckedExecutionException} (if the cause is an {@code 1392 * Exception}) or {@link ExecutionError} (if the cause is an {@code 1393 * Error}). 1394 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 1395 * call. The interrupt is restored before {@code getUnchecked} returns. 1396 * <li>Any {@link CancellationException} is propagated untouched. So is any 1397 * other {@link RuntimeException} ({@code get} implementations are 1398 * discouraged from throwing such exceptions). 1399 * </ul> 1400 * 1401 * <p>The overall principle is to eliminate all checked exceptions: to loop to 1402 * avoid {@code InterruptedException}, to pass through {@code 1403 * CancellationException}, and to wrap any exception from the underlying 1404 * computation in an {@code UncheckedExecutionException} or {@code 1405 * ExecutionError}. 1406 * 1407 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 1408 * {@link Uninterruptibles#getUninterruptibly(Future)}. 1409 * 1410 * @throws UncheckedExecutionException if {@code get} throws an {@code 1411 * ExecutionException} with an {@code Exception} as its cause 1412 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1413 * with an {@code Error} as its cause 1414 * @throws CancellationException if {@code get} throws a {@code 1415 * CancellationException} 1416 * @since 10.0 1417 */ 1418 public static <V> V getUnchecked(Future<V> future) { 1419 checkNotNull(future); 1420 try { 1421 return getUninterruptibly(future); 1422 } catch (ExecutionException e) { 1423 wrapAndThrowUnchecked(e.getCause()); 1424 throw new AssertionError(); 1425 } 1426 } 1427 1428 private static void wrapAndThrowUnchecked(Throwable cause) { 1429 if (cause instanceof Error) { 1430 throw new ExecutionError((Error) cause); 1431 } 1432 /* 1433 * It's a non-Error, non-Exception Throwable. From my survey of such 1434 * classes, I believe that most users intended to extend Exception, so we'll 1435 * treat it like an Exception. 1436 */ 1437 throw new UncheckedExecutionException(cause); 1438 } 1439 1440 /* 1441 * TODO(user): FutureChecker interface for these to be static methods on? If 1442 * so, refer to it in the (static-method) Futures.get documentation 1443 */ 1444 1445 /* 1446 * Arguably we don't need a timed getUnchecked because any operation slow 1447 * enough to require a timeout is heavyweight enough to throw a checked 1448 * exception and therefore be inappropriate to use with getUnchecked. Further, 1449 * it's not clear that converting the checked TimeoutException to a 1450 * RuntimeException -- especially to an UncheckedExecutionException, since it 1451 * wasn't thrown by the computation -- makes sense, and if we don't convert 1452 * it, the user still has to write a try-catch block. 1453 * 1454 * If you think you would use this method, let us know. 1455 */ 1456 1457 private static <X extends Exception> X newWithCause( 1458 Class<X> exceptionClass, Throwable cause) { 1459 // getConstructors() guarantees this as long as we don't modify the array. 1460 @SuppressWarnings("unchecked") 1461 List<Constructor<X>> constructors = 1462 (List) Arrays.asList(exceptionClass.getConstructors()); 1463 for (Constructor<X> constructor : preferringStrings(constructors)) { 1464 @Nullable X instance = newFromConstructor(constructor, cause); 1465 if (instance != null) { 1466 if (instance.getCause() == null) { 1467 instance.initCause(cause); 1468 } 1469 return instance; 1470 } 1471 } 1472 throw new IllegalArgumentException( 1473 "No appropriate constructor for exception of type " + exceptionClass 1474 + " in response to chained exception", cause); 1475 } 1476 1477 private static <X extends Exception> List<Constructor<X>> 1478 preferringStrings(List<Constructor<X>> constructors) { 1479 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1480 } 1481 1482 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1483 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1484 @Override public Boolean apply(Constructor<?> input) { 1485 return asList(input.getParameterTypes()).contains(String.class); 1486 } 1487 }).reverse(); 1488 1489 @Nullable private static <X> X newFromConstructor( 1490 Constructor<X> constructor, Throwable cause) { 1491 Class<?>[] paramTypes = constructor.getParameterTypes(); 1492 Object[] params = new Object[paramTypes.length]; 1493 for (int i = 0; i < paramTypes.length; i++) { 1494 Class<?> paramType = paramTypes[i]; 1495 if (paramType.equals(String.class)) { 1496 params[i] = cause.toString(); 1497 } else if (paramType.equals(Throwable.class)) { 1498 params[i] = cause; 1499 } else { 1500 return null; 1501 } 1502 } 1503 try { 1504 return constructor.newInstance(params); 1505 } catch (IllegalArgumentException e) { 1506 return null; 1507 } catch (InstantiationException e) { 1508 return null; 1509 } catch (IllegalAccessException e) { 1510 return null; 1511 } catch (InvocationTargetException e) { 1512 return null; 1513 } 1514 } 1515 1516 private interface FutureCombiner<V, C> { 1517 C combine(List<Optional<V>> values); 1518 } 1519 1520 private static class CombinedFuture<V, C> extends AbstractFuture<C> { 1521 private static final Logger logger = 1522 Logger.getLogger(CombinedFuture.class.getName()); 1523 1524 ImmutableCollection<? extends ListenableFuture<? extends V>> futures; 1525 final boolean allMustSucceed; 1526 final AtomicInteger remaining; 1527 FutureCombiner<V, C> combiner; 1528 List<Optional<V>> values; 1529 final Object seenExceptionsLock = new Object(); 1530 Set<Throwable> seenExceptions; 1531 1532 CombinedFuture( 1533 ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 1534 boolean allMustSucceed, Executor listenerExecutor, 1535 FutureCombiner<V, C> combiner) { 1536 this.futures = futures; 1537 this.allMustSucceed = allMustSucceed; 1538 this.remaining = new AtomicInteger(futures.size()); 1539 this.combiner = combiner; 1540 this.values = Lists.newArrayListWithCapacity(futures.size()); 1541 init(listenerExecutor); 1542 } 1543 1544 /** 1545 * Must be called at the end of the constructor. 1546 */ 1547 protected void init(final Executor listenerExecutor) { 1548 // First, schedule cleanup to execute when the Future is done. 1549 addListener(new Runnable() { 1550 @Override 1551 public void run() { 1552 // Cancel all the component futures. 1553 if (CombinedFuture.this.isCancelled()) { 1554 for (ListenableFuture<?> future : CombinedFuture.this.futures) { 1555 future.cancel(CombinedFuture.this.wasInterrupted()); 1556 } 1557 } 1558 1559 // Let go of the memory held by other futures 1560 CombinedFuture.this.futures = null; 1561 1562 // By now the values array has either been set as the Future's value, 1563 // or (in case of failure) is no longer useful. 1564 CombinedFuture.this.values = null; 1565 1566 // The combiner may also hold state, so free that as well 1567 CombinedFuture.this.combiner = null; 1568 } 1569 }, MoreExecutors.sameThreadExecutor()); 1570 1571 // Now begin the "real" initialization. 1572 1573 // Corner case: List is empty. 1574 if (futures.isEmpty()) { 1575 set(combiner.combine(ImmutableList.<Optional<V>>of())); 1576 return; 1577 } 1578 1579 // Populate the results list with null initially. 1580 for (int i = 0; i < futures.size(); ++i) { 1581 values.add(null); 1582 } 1583 1584 // Register a listener on each Future in the list to update 1585 // the state of this future. 1586 // Note that if all the futures on the list are done prior to completing 1587 // this loop, the last call to addListener() will callback to 1588 // setOneValue(), transitively call our cleanup listener, and set 1589 // this.futures to null. 1590 // This is not actually a problem, since the foreach only needs 1591 // this.futures to be non-null at the beginning of the loop. 1592 int i = 0; 1593 for (final ListenableFuture<? extends V> listenable : futures) { 1594 final int index = i++; 1595 listenable.addListener(new Runnable() { 1596 @Override 1597 public void run() { 1598 setOneValue(index, listenable); 1599 } 1600 }, listenerExecutor); 1601 } 1602 } 1603 1604 /** 1605 * Fails this future with the given Throwable if {@link #allMustSucceed} is 1606 * true. Also, logs the throwable if it is an {@link Error} or if 1607 * {@link #allMustSucceed} is {@code true}, the throwable did not cause 1608 * this future to fail, and it is the first time we've seen that particular Throwable. 1609 */ 1610 private void setExceptionAndMaybeLog(Throwable throwable) { 1611 boolean visibleFromOutputFuture = false; 1612 boolean firstTimeSeeingThisException = true; 1613 if (allMustSucceed) { 1614 // As soon as the first one fails, throw the exception up. 1615 // The result of all other inputs is then ignored. 1616 visibleFromOutputFuture = super.setException(throwable); 1617 1618 synchronized (seenExceptionsLock) { 1619 if (seenExceptions == null) { 1620 seenExceptions = Sets.newHashSet(); 1621 } 1622 firstTimeSeeingThisException = seenExceptions.add(throwable); 1623 } 1624 } 1625 1626 if (throwable instanceof Error 1627 || (allMustSucceed && !visibleFromOutputFuture && firstTimeSeeingThisException)) { 1628 logger.log(Level.SEVERE, "input future failed.", throwable); 1629 } 1630 } 1631 1632 /** 1633 * Sets the value at the given index to that of the given future. 1634 */ 1635 private void setOneValue(int index, Future<? extends V> future) { 1636 List<Optional<V>> localValues = values; 1637 // TODO(user): This check appears to be redundant since values is 1638 // assigned null only after the future completes. However, values 1639 // is not volatile so it may be possible for us to observe the changes 1640 // to these two values in a different order... which I think is why 1641 // we need to check both. Clear up this craziness either by making 1642 // values volatile or proving that it doesn't need to be for some other 1643 // reason. 1644 if (isDone() || localValues == null) { 1645 // Some other future failed or has been cancelled, causing this one to 1646 // also be cancelled or have an exception set. This should only happen 1647 // if allMustSucceed is true or if the output itself has been 1648 // cancelled. 1649 checkState(allMustSucceed || isCancelled(), 1650 "Future was done before all dependencies completed"); 1651 } 1652 1653 try { 1654 checkState(future.isDone(), 1655 "Tried to set value from future which is not done"); 1656 V returnValue = getUninterruptibly(future); 1657 if (localValues != null) { 1658 localValues.set(index, Optional.fromNullable(returnValue)); 1659 } 1660 } catch (CancellationException e) { 1661 if (allMustSucceed) { 1662 // Set ourselves as cancelled. Let the input futures keep running 1663 // as some of them may be used elsewhere. 1664 cancel(false); 1665 } 1666 } catch (ExecutionException e) { 1667 setExceptionAndMaybeLog(e.getCause()); 1668 } catch (Throwable t) { 1669 setExceptionAndMaybeLog(t); 1670 } finally { 1671 int newRemaining = remaining.decrementAndGet(); 1672 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1673 if (newRemaining == 0) { 1674 FutureCombiner<V, C> localCombiner = combiner; 1675 if (localCombiner != null && localValues != null) { 1676 set(localCombiner.combine(localValues)); 1677 } else { 1678 checkState(isDone()); 1679 } 1680 } 1681 } 1682 } 1683 1684 } 1685 1686 /** Used for {@link #allAsList} and {@link #successfulAsList}. */ 1687 private static <V> ListenableFuture<List<V>> listFuture( 1688 ImmutableList<ListenableFuture<? extends V>> futures, 1689 boolean allMustSucceed, Executor listenerExecutor) { 1690 return new CombinedFuture<V, List<V>>( 1691 futures, allMustSucceed, listenerExecutor, 1692 new FutureCombiner<V, List<V>>() { 1693 @Override 1694 public List<V> combine(List<Optional<V>> values) { 1695 List<V> result = Lists.newArrayList(); 1696 for (Optional<V> element : values) { 1697 result.add(element != null ? element.orNull() : null); 1698 } 1699 return Collections.unmodifiableList(result); 1700 } 1701 }); 1702 } 1703 1704 /** 1705 * A checked future that uses a function to map from exceptions to the 1706 * appropriate checked type. 1707 */ 1708 private static class MappingCheckedFuture<V, X extends Exception> extends 1709 AbstractCheckedFuture<V, X> { 1710 1711 final Function<Exception, X> mapper; 1712 1713 MappingCheckedFuture(ListenableFuture<V> delegate, 1714 Function<Exception, X> mapper) { 1715 super(delegate); 1716 1717 this.mapper = checkNotNull(mapper); 1718 } 1719 1720 @Override 1721 protected X mapException(Exception e) { 1722 return mapper.apply(e); 1723 } 1724 } 1725}