001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkNotNull; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static com.google.common.util.concurrent.Platform.isInstanceOfThrowableClass; 022import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 023 024import com.google.common.annotations.Beta; 025import com.google.common.annotations.GwtCompatible; 026import com.google.common.annotations.GwtIncompatible; 027import com.google.common.base.Function; 028import com.google.common.base.Optional; 029import com.google.common.base.Preconditions; 030import com.google.common.collect.ImmutableCollection; 031import com.google.common.collect.ImmutableList; 032import com.google.common.collect.Lists; 033import com.google.common.collect.Queues; 034 035import java.lang.reflect.UndeclaredThrowableException; 036import java.util.Collections; 037import java.util.List; 038import java.util.concurrent.CancellationException; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.Executor; 042import java.util.concurrent.Future; 043import java.util.concurrent.RejectedExecutionException; 044import java.util.concurrent.ScheduledExecutorService; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.TimeoutException; 047import java.util.logging.Level; 048import java.util.logging.Logger; 049 050import javax.annotation.CheckReturnValue; 051import javax.annotation.Nullable; 052 053/** 054 * Static utility methods pertaining to the {@link Future} interface. 055 * 056 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 057 * Guava User Guide article on <a href= 058 * "https://github.com/google/guava/wiki/ListenableFutureExplained"> 059 * {@code ListenableFuture}</a>. 060 * 061 * @author Kevin Bourrillion 062 * @author Nishant Thakkar 063 * @author Sven Mawson 064 * @since 1.0 065 */ 066@Beta 067@GwtCompatible(emulated = true) 068public final class Futures extends GwtFuturesCatchingSpecialization { 069 070 // A note on memory visibility. 071 // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine) 072 // have two requirements that significantly complicate their design. 073 // 1. Cancellation should propagate from the returned future to the input future(s). 074 // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion. 075 // 076 // A consequence of these requirements is that the delegate futures cannot be stored in 077 // final fields. 078 // 079 // For simplicity the rest of this description will discuss Futures.catching since it is the 080 // simplest instance, though very similar descriptions apply to many other classes in this file. 081 // 082 // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field 083 // 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the 084 // 'inputFuture' field is read and where we will have to consider visibility of the write 085 // operation in the constructor. 086 // 087 // 1. In the listener that performs the callback. In this case it is fine since inputFuture is 088 // assigned prior to calling addListener, and addListener happens-before any invocation of the 089 // listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible 090 // to the listener. 091 // 092 // 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine. 093 // There is currently nothing that enforces that the write to inputFuture in the constructor is 094 // visible to done(). This is because there is no happens before edge between the write and a 095 // (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue, 096 // it would just add an edge such that if done() observed non-null, then it would also 097 // definitely observe all earlier writes, but we still have no guarantee that done() would see 098 // the inital write (just stronger guarantees if it does). 099 // 100 // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html 101 // For a (long) discussion about this specific issue and the general futility of life. 102 // 103 // For the time being we are OK with the problem discussed above since it requires a caller to 104 // introduce a very specific kind of data-race. And given the other operations performed by these 105 // methods that involve volatile read/write operations, in practice there is no issue. Also, the 106 // way in such a visibility issue would surface is most likely as a failure of cancel() to 107 // propagate to the input. Cancellation propagation is fundamentally racy so this is fine. 108 // 109 // Future versions of the JMM may revise safe construction semantics in such a way that we can 110 // safely publish these objects and we won't need this whole discussion. 111 // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs 112 // that should resolve the issue. This comes at the cost of adding more write barriers to the 113 // implementations. 114 115 private Futures() {} 116 117 /** 118 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} and a {@link Function} 119 * that maps from {@link Exception} instances into the appropriate checked type. 120 * 121 * <p><b>Warning:</b> We recommend against using {@code CheckedFuture} in new projects. {@code 122 * CheckedFuture} is difficult to build libraries atop. {@code CheckedFuture} ports of methods 123 * like {@link Futures#transformAsync} have historically had bugs, and some of these bugs are 124 * necessary, unavoidable consequences of the {@code CheckedFuture} API. Additionally, {@code 125 * CheckedFuture} encourages users to take exceptions from one thread and rethrow them in another, 126 * producing confusing stack traces. 127 * 128 * <p>The given mapping function will be applied to an 129 * {@link InterruptedException}, a {@link CancellationException}, or an 130 * {@link ExecutionException}. 131 * See {@link Future#get()} for details on the exceptions thrown. 132 * 133 * @since 9.0 (source-compatible since 1.0) 134 */ 135 @GwtIncompatible("TODO") 136 @CheckReturnValue 137 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 138 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 139 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 140 } 141 142 private abstract static class ImmediateFuture<V> 143 implements ListenableFuture<V> { 144 145 private static final Logger log = 146 Logger.getLogger(ImmediateFuture.class.getName()); 147 148 @Override 149 public void addListener(Runnable listener, Executor executor) { 150 checkNotNull(listener, "Runnable was null."); 151 checkNotNull(executor, "Executor was null."); 152 try { 153 executor.execute(listener); 154 } catch (RuntimeException e) { 155 // ListenableFuture's contract is that it will not throw unchecked 156 // exceptions, so log the bad runnable and/or executor and swallow it. 157 log.log(Level.SEVERE, "RuntimeException while executing runnable " 158 + listener + " with executor " + executor, e); 159 } 160 } 161 162 @Override 163 public boolean cancel(boolean mayInterruptIfRunning) { 164 return false; 165 } 166 167 @Override 168 public abstract V get() throws ExecutionException; 169 170 @Override 171 public V get(long timeout, TimeUnit unit) throws ExecutionException { 172 checkNotNull(unit); 173 return get(); 174 } 175 176 @Override 177 public boolean isCancelled() { 178 return false; 179 } 180 181 @Override 182 public boolean isDone() { 183 return true; 184 } 185 } 186 187 private static class ImmediateSuccessfulFuture<V> extends ImmediateFuture<V> { 188 static final ImmediateSuccessfulFuture<Object> NULL = 189 new ImmediateSuccessfulFuture<Object>(null); 190 191 @Nullable private final V value; 192 193 ImmediateSuccessfulFuture(@Nullable V value) { 194 this.value = value; 195 } 196 197 @Override 198 public V get() { 199 return value; 200 } 201 } 202 203 @GwtIncompatible("TODO") 204 private static class ImmediateSuccessfulCheckedFuture<V, X extends Exception> 205 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 206 207 @Nullable private final V value; 208 209 ImmediateSuccessfulCheckedFuture(@Nullable V value) { 210 this.value = value; 211 } 212 213 @Override 214 public V get() { 215 return value; 216 } 217 218 @Override 219 public V checkedGet() { 220 return value; 221 } 222 223 @Override 224 public V checkedGet(long timeout, TimeUnit unit) { 225 checkNotNull(unit); 226 return value; 227 } 228 } 229 230 private static class ImmediateFailedFuture<V> extends ImmediateFuture<V> { 231 232 private final Throwable thrown; 233 234 ImmediateFailedFuture(Throwable thrown) { 235 this.thrown = thrown; 236 } 237 238 @Override 239 public V get() throws ExecutionException { 240 throw new ExecutionException(thrown); 241 } 242 } 243 244 @GwtIncompatible("TODO") 245 private static class ImmediateCancelledFuture<V> extends ImmediateFuture<V> { 246 247 private final CancellationException thrown; 248 249 ImmediateCancelledFuture() { 250 this.thrown = new CancellationException("Immediate cancelled future."); 251 } 252 253 @Override 254 public boolean isCancelled() { 255 return true; 256 } 257 258 @Override 259 public V get() { 260 throw AbstractFuture.cancellationExceptionWithCause( 261 "Task was cancelled.", thrown); 262 } 263 } 264 265 @GwtIncompatible("TODO") 266 private static class ImmediateFailedCheckedFuture<V, X extends Exception> 267 extends ImmediateFuture<V> implements CheckedFuture<V, X> { 268 269 private final X thrown; 270 271 ImmediateFailedCheckedFuture(X thrown) { 272 this.thrown = thrown; 273 } 274 275 @Override 276 public V get() throws ExecutionException { 277 throw new ExecutionException(thrown); 278 } 279 280 @Override 281 public V checkedGet() throws X { 282 throw thrown; 283 } 284 285 @Override 286 public V checkedGet(long timeout, TimeUnit unit) throws X { 287 checkNotNull(unit); 288 throw thrown; 289 } 290 } 291 292 /** 293 * Creates a {@code ListenableFuture} which has its value set immediately upon 294 * construction. The getters just return the value. This {@code Future} can't 295 * be canceled or timed out and its {@code isDone()} method always returns 296 * {@code true}. 297 */ 298 @CheckReturnValue 299 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 300 if (value == null) { 301 // This cast is safe because null is assignable to V for all V (i.e. it is covariant) 302 @SuppressWarnings({"unchecked", "rawtypes"}) 303 ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL; 304 return typedNull; 305 } 306 return new ImmediateSuccessfulFuture<V>(value); 307 } 308 309 /** 310 * Returns a {@code CheckedFuture} which has its value set immediately upon 311 * construction. 312 * 313 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 314 * method always returns {@code true}. Calling {@code get()} or {@code 315 * checkedGet()} will immediately return the provided value. 316 */ 317 @GwtIncompatible("TODO") 318 @CheckReturnValue 319 public static <V, X extends Exception> CheckedFuture<V, X> 320 immediateCheckedFuture(@Nullable V value) { 321 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 322 } 323 324 /** 325 * Returns a {@code ListenableFuture} which has an exception set immediately 326 * upon construction. 327 * 328 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 329 * method always returns {@code true}. Calling {@code get()} will immediately 330 * throw the provided {@code Throwable} wrapped in an {@code 331 * ExecutionException}. 332 */ 333 @CheckReturnValue 334 public static <V> ListenableFuture<V> immediateFailedFuture( 335 Throwable throwable) { 336 checkNotNull(throwable); 337 return new ImmediateFailedFuture<V>(throwable); 338 } 339 340 /** 341 * Creates a {@code ListenableFuture} which is cancelled immediately upon 342 * construction, so that {@code isCancelled()} always returns {@code true}. 343 * 344 * @since 14.0 345 */ 346 @GwtIncompatible("TODO") 347 @CheckReturnValue 348 public static <V> ListenableFuture<V> immediateCancelledFuture() { 349 return new ImmediateCancelledFuture<V>(); 350 } 351 352 /** 353 * Returns a {@code CheckedFuture} which has an exception set immediately upon 354 * construction. 355 * 356 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 357 * method always returns {@code true}. Calling {@code get()} will immediately 358 * throw the provided {@code Exception} wrapped in an {@code 359 * ExecutionException}, and calling {@code checkedGet()} will throw the 360 * provided exception itself. 361 */ 362 @GwtIncompatible("TODO") 363 @CheckReturnValue 364 public static <V, X extends Exception> CheckedFuture<V, X> 365 immediateFailedCheckedFuture(X exception) { 366 checkNotNull(exception); 367 return new ImmediateFailedCheckedFuture<V, X>(exception); 368 } 369 370 /** 371 * Returns a {@code Future} whose result is taken from the given primary 372 * {@code input} or, if the primary input fails, from the {@code Future} 373 * provided by the {@code fallback}. {@link FutureFallback#create} is not 374 * invoked until the primary input has failed, so if the primary input 375 * succeeds, it is never invoked. If, during the invocation of {@code 376 * fallback}, an exception is thrown, this exception is used as the result of 377 * the output {@code Future}. 378 * 379 * <p>Below is an example of a fallback that returns a default value if an 380 * exception occurs: 381 * 382 * <pre> {@code 383 * ListenableFuture<Integer> fetchCounterFuture = ...; 384 * 385 * // Falling back to a zero counter in case an exception happens when 386 * // processing the RPC to fetch counters. 387 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 388 * fetchCounterFuture, new FutureFallback<Integer>() { 389 * public ListenableFuture<Integer> create(Throwable t) { 390 * // Returning "0" as the default for the counter when the 391 * // exception happens. 392 * return immediateFuture(0); 393 * } 394 * });}</pre> 395 * 396 * <p>The fallback can also choose to propagate the original exception when 397 * desired: 398 * 399 * <pre> {@code 400 * ListenableFuture<Integer> fetchCounterFuture = ...; 401 * 402 * // Falling back to a zero counter only in case the exception was a 403 * // TimeoutException. 404 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 405 * fetchCounterFuture, new FutureFallback<Integer>() { 406 * public ListenableFuture<Integer> create(Throwable t) { 407 * if (t instanceof TimeoutException) { 408 * return immediateFuture(0); 409 * } 410 * return immediateFailedFuture(t); 411 * } 412 * });}</pre> 413 * 414 * <p>This overload, which does not accept an executor, uses {@code 415 * directExecutor}, a dangerous choice in some cases. See the discussion in 416 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 417 * documentation. The documentation's warnings about "lightweight listeners" 418 * refer here to the work done during {@code FutureFallback.create}, not to 419 * any work done to complete the returned {@code Future}. 420 * 421 * @param input the primary input {@code Future} 422 * @param fallback the {@link FutureFallback} implementation to be called if 423 * {@code input} fails 424 * @since 14.0 425 * @deprecated Use {@link #catchingAsync(ListenableFuture, Class, 426 * AsyncFunction) catchingAsync(input, Throwable.class, 427 * fallbackImplementedAsAnAsyncFunction)}, usually replacing {@code 428 * Throwable.class} with the specific type you want to handle. This method 429 * will be removed in Guava release 20.0. 430 */ 431 @Deprecated 432 @CheckReturnValue 433 public static <V> ListenableFuture<V> withFallback( 434 ListenableFuture<? extends V> input, 435 FutureFallback<? extends V> fallback) { 436 return withFallback(input, fallback, directExecutor()); 437 } 438 439 /** 440 * Returns a {@code Future} whose result is taken from the given primary 441 * {@code input} or, if the primary input fails, from the {@code Future} 442 * provided by the {@code fallback}. {@link FutureFallback#create} is not 443 * invoked until the primary input has failed, so if the primary input 444 * succeeds, it is never invoked. If, during the invocation of {@code 445 * fallback}, an exception is thrown, this exception is used as the result of 446 * the output {@code Future}. 447 * 448 * <p>Below is an example of a fallback that returns a default value if an 449 * exception occurs: 450 * 451 * <pre> {@code 452 * ListenableFuture<Integer> fetchCounterFuture = ...; 453 * 454 * // Falling back to a zero counter in case an exception happens when 455 * // processing the RPC to fetch counters. 456 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 457 * fetchCounterFuture, new FutureFallback<Integer>() { 458 * public ListenableFuture<Integer> create(Throwable t) { 459 * // Returning "0" as the default for the counter when the 460 * // exception happens. 461 * return immediateFuture(0); 462 * } 463 * }, directExecutor());}</pre> 464 * 465 * <p>The fallback can also choose to propagate the original exception when 466 * desired: 467 * 468 * <pre> {@code 469 * ListenableFuture<Integer> fetchCounterFuture = ...; 470 * 471 * // Falling back to a zero counter only in case the exception was a 472 * // TimeoutException. 473 * ListenableFuture<Integer> faultTolerantFuture = Futures.withFallback( 474 * fetchCounterFuture, new FutureFallback<Integer>() { 475 * public ListenableFuture<Integer> create(Throwable t) { 476 * if (t instanceof TimeoutException) { 477 * return immediateFuture(0); 478 * } 479 * return immediateFailedFuture(t); 480 * } 481 * }, directExecutor());}</pre> 482 * 483 * <p>When selecting an executor, note that {@code directExecutor} is 484 * dangerous in some cases. See the discussion in the {@link 485 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 486 * The documentation's warnings about "lightweight listeners" refer here to 487 * the work done during {@code FutureFallback.create}, not to any work done to 488 * complete the returned {@code Future}. 489 * 490 * @param input the primary input {@code Future} 491 * @param fallback the {@link FutureFallback} implementation to be called if 492 * {@code input} fails 493 * @param executor the executor that runs {@code fallback} if {@code input} 494 * fails 495 * @since 14.0 496 * @deprecated Use {@link #catchingAsync(ListenableFuture, Class, 497 * AsyncFunction, Executor) catchingAsync(input, Throwable.class, 498 * fallbackImplementedAsAnAsyncFunction, executor)}, usually replacing 499 * {@code Throwable.class} with the specific type you want to handle. This method 500 * will be removed in Guava release 20.0. 501 */ 502 @Deprecated 503 @CheckReturnValue 504 public static <V> ListenableFuture<V> withFallback( 505 ListenableFuture<? extends V> input, 506 FutureFallback<? extends V> fallback, Executor executor) { 507 return catchingAsync( 508 input, Throwable.class, asAsyncFunction(fallback), executor); 509 } 510 511 /** 512 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 513 * primary input fails with the given {@code exceptionType}, from the result provided by the 514 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 515 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 516 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 517 * Future}. 518 * 519 * <p>Usage example: 520 * 521 * <pre> {@code 522 * ListenableFuture<Integer> fetchCounterFuture = ...; 523 * 524 * // Falling back to a zero counter in case an exception happens when 525 * // processing the RPC to fetch counters. 526 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 527 * fetchCounterFuture, FetchException.class, 528 * new Function<FetchException, Integer>() { 529 * public Integer apply(FetchException e) { 530 * return 0; 531 * } 532 * });}</pre> 533 * 534 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 535 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 536 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 537 * listeners" refer here to the work done during {@code Function.apply}. 538 * 539 * @param input the primary input {@code Future} 540 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 541 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 542 * {@code Throwable.class} in particular. 543 * @param fallback the {@link Function} implementation to be called if {@code input} fails with 544 * the expected exception type 545 * @since 19.0 546 */ 547 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 548 @CheckReturnValue 549 public static <V, X extends Throwable> ListenableFuture<V> catching( 550 ListenableFuture<? extends V> input, Class<X> exceptionType, 551 Function<? super X, ? extends V> fallback) { 552 CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback); 553 input.addListener(future, directExecutor()); 554 return future; 555 } 556 557 /** 558 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 559 * primary input fails with the given {@code exceptionType}, from the result provided by the 560 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 561 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 562 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 563 * Future}. 564 * 565 * <p>Usage example: 566 * 567 * <pre> {@code 568 * ListenableFuture<Integer> fetchCounterFuture = ...; 569 * 570 * // Falling back to a zero counter in case an exception happens when 571 * // processing the RPC to fetch counters. 572 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 573 * fetchCounterFuture, FetchException.class, 574 * new Function<FetchException, Integer>() { 575 * public Integer apply(FetchException e) { 576 * return 0; 577 * } 578 * }, directExecutor());}</pre> 579 * 580 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 581 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 582 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 583 * work done during {@code Function.apply}. 584 * 585 * @param input the primary input {@code Future} 586 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 587 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 588 * {@code Throwable.class} in particular. 589 * @param fallback the {@link Function} implementation to be called if {@code input} fails with 590 * the expected exception type 591 * @param executor the executor that runs {@code fallback} if {@code input} fails 592 * @since 19.0 593 */ 594 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 595 @CheckReturnValue 596 public static <V, X extends Throwable> ListenableFuture<V> catching( 597 ListenableFuture<? extends V> input, Class<X> exceptionType, 598 Function<? super X, ? extends V> fallback, Executor executor) { 599 CatchingFuture<V, X> future = new CatchingFuture<V, X>(input, exceptionType, fallback); 600 input.addListener(future, rejectionPropagatingExecutor(executor, future)); 601 return future; 602 } 603 604 /** 605 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 606 * primary input fails with the given {@code exceptionType}, from the result provided by the 607 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 608 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 609 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 610 * {@code Future}. 611 * 612 * <p>Usage examples: 613 * 614 * <pre> {@code 615 * ListenableFuture<Integer> fetchCounterFuture = ...; 616 * 617 * // Falling back to a zero counter in case an exception happens when 618 * // processing the RPC to fetch counters. 619 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 620 * fetchCounterFuture, FetchException.class, 621 * new AsyncFunction<FetchException, Integer>() { 622 * public ListenableFuture<Integer> apply(FetchException e) { 623 * return immediateFuture(0); 624 * } 625 * });}</pre> 626 * 627 * <p>The fallback can also choose to propagate the original exception when desired: 628 * 629 * <pre> {@code 630 * ListenableFuture<Integer> fetchCounterFuture = ...; 631 * 632 * // Falling back to a zero counter only in case the exception was a 633 * // TimeoutException. 634 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 635 * fetchCounterFuture, FetchException.class, 636 * new AsyncFunction<FetchException, Integer>() { 637 * public ListenableFuture<Integer> apply(FetchException e) 638 * throws FetchException { 639 * if (omitDataOnFetchFailure) { 640 * return immediateFuture(0); 641 * } 642 * throw e; 643 * } 644 * });}</pre> 645 * 646 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 647 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 648 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 649 * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done 650 * to complete the returned {@code Future}. 651 * 652 * @param input the primary input {@code Future} 653 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 654 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 655 * {@code Throwable.class} in particular. 656 * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails 657 * with the expected exception type 658 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 659 */ 660 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 661 // TODO(kak): @CheckReturnValue 662 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 663 ListenableFuture<? extends V> input, Class<X> exceptionType, 664 AsyncFunction<? super X, ? extends V> fallback) { 665 AsyncCatchingFuture<V, X> future = 666 new AsyncCatchingFuture<V, X>(input, exceptionType, fallback); 667 input.addListener(future, directExecutor()); 668 return future; 669 } 670 671 /** 672 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 673 * primary input fails with the given {@code exceptionType}, from the result provided by the 674 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 675 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 676 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 677 * {@code Future}. 678 * 679 * <p>Usage examples: 680 * 681 * <pre> {@code 682 * ListenableFuture<Integer> fetchCounterFuture = ...; 683 * 684 * // Falling back to a zero counter in case an exception happens when 685 * // processing the RPC to fetch counters. 686 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 687 * fetchCounterFuture, FetchException.class, 688 * new AsyncFunction<FetchException, Integer>() { 689 * public ListenableFuture<Integer> apply(FetchException e) { 690 * return immediateFuture(0); 691 * } 692 * }, directExecutor());}</pre> 693 * 694 * <p>The fallback can also choose to propagate the original exception when desired: 695 * 696 * <pre> {@code 697 * ListenableFuture<Integer> fetchCounterFuture = ...; 698 * 699 * // Falling back to a zero counter only in case the exception was a 700 * // TimeoutException. 701 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 702 * fetchCounterFuture, FetchException.class, 703 * new AsyncFunction<FetchException, Integer>() { 704 * public ListenableFuture<Integer> apply(FetchException e) 705 * throws FetchException { 706 * if (omitDataOnFetchFailure) { 707 * return immediateFuture(0); 708 * } 709 * throw e; 710 * } 711 * }, directExecutor());}</pre> 712 * 713 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 714 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 715 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 716 * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned 717 * {@code Future}. 718 * 719 * @param input the primary input {@code Future} 720 * @param exceptionType the exception type that triggers use of {@code fallback}. To avoid hiding 721 * bugs and other unrecoverable errors, callers should prefer more specific types, avoiding 722 * {@code Throwable.class} in particular. 723 * @param fallback the {@link AsyncFunction} implementation to be called if {@code input} fails 724 * with the expected exception type 725 * @param executor the executor that runs {@code fallback} if {@code input} fails 726 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 727 */ 728 @GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 729 // TODO(kak): @CheckReturnValue 730 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 731 ListenableFuture<? extends V> input, Class<X> exceptionType, 732 AsyncFunction<? super X, ? extends V> fallback, Executor executor) { 733 AsyncCatchingFuture<V, X> future = 734 new AsyncCatchingFuture<V, X>(input, exceptionType, fallback); 735 input.addListener(future, rejectionPropagatingExecutor(executor, future)); 736 return future; 737 } 738 739 @Deprecated 740 static <V> AsyncFunction<Throwable, V> asAsyncFunction(final FutureFallback<V> fallback) { 741 checkNotNull(fallback); 742 return new AsyncFunction<Throwable, V>() { 743 @Override 744 public ListenableFuture<V> apply(Throwable t) throws Exception { 745 return checkNotNull(fallback.create(t), "FutureFallback.create returned null instead of a " 746 + "Future. Did you mean to return immediateFuture(null)?"); 747 } 748 }; 749 } 750 751 private abstract static class AbstractCatchingFuture<V, X extends Throwable, F> 752 extends AbstractFuture.TrustedFuture<V> implements Runnable { 753 @Nullable ListenableFuture<? extends V> inputFuture; 754 @Nullable Class<X> exceptionType; 755 @Nullable F fallback; 756 757 AbstractCatchingFuture( 758 ListenableFuture<? extends V> inputFuture, Class<X> exceptionType, F fallback) { 759 this.inputFuture = checkNotNull(inputFuture); 760 this.exceptionType = checkNotNull(exceptionType); 761 this.fallback = checkNotNull(fallback); 762 } 763 764 @Override public final void run() { 765 ListenableFuture<? extends V> localInputFuture = inputFuture; 766 Class<X> localExceptionType = exceptionType; 767 F localFallback = fallback; 768 if (localInputFuture == null | localExceptionType == null | localFallback == null 769 | isCancelled()) { 770 return; 771 } 772 inputFuture = null; 773 exceptionType = null; 774 fallback = null; 775 776 Throwable throwable; 777 try { 778 set(getUninterruptibly(localInputFuture)); 779 return; 780 } catch (ExecutionException e) { 781 throwable = e.getCause(); 782 } catch (Throwable e) { // this includes cancellation exception 783 throwable = e; 784 } 785 try { 786 if (isInstanceOfThrowableClass(throwable, localExceptionType)) { 787 @SuppressWarnings("unchecked") // verified safe by isInstance 788 X castThrowable = (X) throwable; 789 doFallback(localFallback, castThrowable); 790 } else { 791 setException(throwable); 792 } 793 } catch (Throwable e) { 794 setException(e); 795 } 796 } 797 798 /** Template method for subtypes to actually run the fallback. */ 799 abstract void doFallback(F fallback, X throwable) throws Exception; 800 801 @Override final void done() { 802 maybePropagateCancellation(inputFuture); 803 this.inputFuture = null; 804 this.exceptionType = null; 805 this.fallback = null; 806 } 807 } 808 809 /** 810 * A {@link AbstractCatchingFuture} that delegates to an {@link AsyncFunction} 811 * and {@link #setFuture(ListenableFuture)} to implement {@link #doFallback} 812 */ 813 static final class AsyncCatchingFuture<V, X extends Throwable> 814 extends AbstractCatchingFuture<V, X, AsyncFunction<? super X, ? extends V>> { 815 816 AsyncCatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType, 817 AsyncFunction<? super X, ? extends V> fallback) { 818 super(input, exceptionType, fallback); 819 } 820 821 @Override void doFallback( 822 AsyncFunction<? super X, ? extends V> fallback, X cause) throws Exception { 823 ListenableFuture<? extends V> replacement = fallback.apply(cause); 824 checkNotNull(replacement, "AsyncFunction.apply returned null instead of a Future. " 825 + "Did you mean to return immediateFuture(null)?"); 826 setFuture(replacement); 827 } 828 } 829 830 /** 831 * A {@link AbstractCatchingFuture} that delegates to a {@link Function} 832 * and {@link #set(Object)} to implement {@link #doFallback} 833 */ 834 static final class CatchingFuture<V, X extends Throwable> 835 extends AbstractCatchingFuture<V, X, Function<? super X, ? extends V>> { 836 CatchingFuture(ListenableFuture<? extends V> input, Class<X> exceptionType, 837 Function<? super X, ? extends V> fallback) { 838 super(input, exceptionType, fallback); 839 } 840 841 @Override void doFallback(Function<? super X, ? extends V> fallback, X cause) throws Exception { 842 V replacement = fallback.apply(cause); 843 set(replacement); 844 } 845 } 846 847 /** 848 * Returns a future that delegates to another but will finish early (via a 849 * {@link TimeoutException} wrapped in an {@link ExecutionException}) if the 850 * specified duration expires. 851 * 852 * <p>The delegate future is interrupted and cancelled if it times out. 853 * 854 * @param delegate The future to delegate to. 855 * @param time when to timeout the future 856 * @param unit the time unit of the time parameter 857 * @param scheduledExecutor The executor service to enforce the timeout. 858 * 859 * @since 19.0 860 */ 861 @GwtIncompatible("java.util.concurrent.ScheduledExecutorService") 862 @CheckReturnValue 863 public static <V> ListenableFuture<V> withTimeout(ListenableFuture<V> delegate, 864 long time, TimeUnit unit, ScheduledExecutorService scheduledExecutor) { 865 TimeoutFuture<V> result = new TimeoutFuture<V>(delegate); 866 TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result); 867 result.timer = scheduledExecutor.schedule(fire, time, unit); 868 delegate.addListener(fire, directExecutor()); 869 return result; 870 } 871 872 /** 873 * Future that delegates to another but will finish early (via a {@link 874 * TimeoutException} wrapped in an {@link ExecutionException}) if the 875 * specified duration expires. 876 * The delegate future is interrupted and cancelled if it times out. 877 */ 878 private static final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> { 879 // Memory visibility of these fields. 880 // There are two cases to consider. 881 // 1. visibility of the writes to these fields to Fire.run 882 // The initial write to delegateRef is made definitely visible via the semantics of 883 // addListener/SES.schedule. The later racy write in cancel() is not guaranteed to be 884 // observed, however that is fine since the correctness is based on the atomic state in 885 // our base class. 886 // The initial write to timer is never definitely visible to Fire.run since it is assigned 887 // after SES.schedule is called. Therefore Fire.run has to check for null. However, it 888 // should be visible if Fire.run is called by delegate.addListener since addListener is 889 // called after the assignment to timer, and importantly this is the main situation in which 890 // we need to be able to see the write. 891 // 2. visibility of the writes to cancel 892 // Since these fields are non-final that means that TimeoutFuture is not being 'safely 893 // published', thus a motivated caller may be able to expose the reference to another thread 894 // that would then call cancel() and be unable to cancel the delegate. 895 // There are a number of ways to solve this, none of which are very pretty, and it is 896 // currently believed to be a purely theoretical problem (since the other actions should 897 // supply sufficient write-barriers). 898 899 @Nullable ListenableFuture<V> delegateRef; 900 @Nullable Future<?> timer; 901 902 TimeoutFuture(ListenableFuture<V> delegate) { 903 this.delegateRef = Preconditions.checkNotNull(delegate); 904 } 905 906 /** A runnable that is called when the delegate or the timer completes. */ 907 private static final class Fire<V> implements Runnable { 908 @Nullable TimeoutFuture<V> timeoutFutureRef; 909 910 Fire(TimeoutFuture<V> timeoutFuture) { 911 this.timeoutFutureRef = timeoutFuture; 912 } 913 914 @Override public void run() { 915 // If either of these reads return null then we must be after a successful cancel 916 // or another call to this method. 917 TimeoutFuture<V> timeoutFuture = timeoutFutureRef; 918 if (timeoutFuture == null) { 919 return; 920 } 921 ListenableFuture<V> delegate = timeoutFuture.delegateRef; 922 if (delegate == null) { 923 return; 924 } 925 926 /* 927 * If we're about to complete the TimeoutFuture, we want to release our reference to it. 928 * Otherwise, we'll pin it (and its result) in memory until the timeout task is GCed. (The 929 * need to clear our reference to the TimeoutFuture is the reason we use a *static* nested 930 * class with a manual reference back to the "containing" class.) 931 * 932 * This has the nice-ish side effect of limiting reentrancy: run() calls 933 * timeoutFuture.setException() calls run(). That reentrancy would already be harmless, 934 * since timeoutFuture can be set (and delegate cancelled) only once. (And "set only once" 935 * is important for other reasons: run() can still be invoked concurrently in different 936 * threads, even with the above null checks.) 937 */ 938 timeoutFutureRef = null; 939 if (delegate.isDone()) { 940 timeoutFuture.setFuture(delegate); 941 } else { 942 try { 943 // TODO(lukes): this stack trace is particularly useless (all it does is point at the 944 // scheduledexecutorservice thread), consider eliminating it altogether? 945 timeoutFuture.setException(new TimeoutException("Future timed out: " + delegate)); 946 } finally { 947 delegate.cancel(true); 948 } 949 } 950 } 951 } 952 953 @Override void done() { 954 maybePropagateCancellation(delegateRef); 955 956 Future<?> localTimer = timer; 957 // Try to cancel the timer as an optimization 958 // timer may be null if this call to run was by the timer task since there is no 959 // happens-before edge between the assignment to timer and an execution of the timer task. 960 if (localTimer != null) { 961 localTimer.cancel(false); 962 } 963 964 delegateRef = null; 965 timer = null; 966 } 967 } 968 969 /** 970 * Returns a new {@code ListenableFuture} whose result is asynchronously 971 * derived from the result of the given {@code Future}. More precisely, the 972 * returned {@code Future} takes its result from a {@code Future} produced by 973 * applying the given {@code AsyncFunction} to the result of the original 974 * {@code Future}. Example: 975 * 976 * <pre> {@code 977 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 978 * AsyncFunction<RowKey, QueryResult> queryFunction = 979 * new AsyncFunction<RowKey, QueryResult>() { 980 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 981 * return dataService.read(rowKey); 982 * } 983 * }; 984 * ListenableFuture<QueryResult> queryFuture = 985 * transform(rowKeyFuture, queryFunction);}</pre> 986 * 987 * <p>This overload, which does not accept an executor, uses {@code 988 * directExecutor}, a dangerous choice in some cases. See the discussion in 989 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 990 * documentation. The documentation's warnings about "lightweight listeners" 991 * refer here to the work done during {@code AsyncFunction.apply}, not to any 992 * work done to complete the returned {@code Future}. 993 * 994 * <p>The returned {@code Future} attempts to keep its cancellation state in 995 * sync with that of the input future and that of the future returned by the 996 * function. That is, if the returned {@code Future} is cancelled, it will 997 * attempt to cancel the other two, and if either of the other two is 998 * cancelled, the returned {@code Future} will receive a callback in which it 999 * will attempt to cancel itself. 1000 * 1001 * @param input The future to transform 1002 * @param function A function to transform the result of the input future 1003 * to the result of the output future 1004 * @return A future that holds result of the function (if the input succeeded) 1005 * or the original input's failure (if not) 1006 * @since 11.0 1007 * @deprecated These {@code AsyncFunction} overloads of {@code transform} are 1008 * being renamed to {@code transformAsync}. (The {@code Function} 1009 * overloads are keeping the "transform" name.) This method will be removed in Guava release 1010 * 20.0. 1011 */ 1012 @Deprecated 1013 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1014 AsyncFunction<? super I, ? extends O> function) { 1015 return transformAsync(input, function); 1016 } 1017 1018 /** 1019 * Returns a new {@code ListenableFuture} whose result is asynchronously 1020 * derived from the result of the given {@code Future}. More precisely, the 1021 * returned {@code Future} takes its result from a {@code Future} produced by 1022 * applying the given {@code AsyncFunction} to the result of the original 1023 * {@code Future}. Example: 1024 * 1025 * <pre> {@code 1026 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 1027 * AsyncFunction<RowKey, QueryResult> queryFunction = 1028 * new AsyncFunction<RowKey, QueryResult>() { 1029 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 1030 * return dataService.read(rowKey); 1031 * } 1032 * }; 1033 * ListenableFuture<QueryResult> queryFuture = 1034 * transform(rowKeyFuture, queryFunction, executor);}</pre> 1035 * 1036 * <p>When selecting an executor, note that {@code directExecutor} is 1037 * dangerous in some cases. See the discussion in the {@link 1038 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 1039 * The documentation's warnings about "lightweight listeners" refer here to 1040 * the work done during {@code AsyncFunction.apply}, not to any work done to 1041 * complete the returned {@code Future}. 1042 * 1043 * <p>The returned {@code Future} attempts to keep its cancellation state in 1044 * sync with that of the input future and that of the future returned by the 1045 * chain function. That is, if the returned {@code Future} is cancelled, it 1046 * will attempt to cancel the other two, and if either of the other two is 1047 * cancelled, the returned {@code Future} will receive a callback in which it 1048 * will attempt to cancel itself. 1049 * 1050 * @param input The future to transform 1051 * @param function A function to transform the result of the input future 1052 * to the result of the output future 1053 * @param executor Executor to run the function in. 1054 * @return A future that holds result of the function (if the input succeeded) 1055 * or the original input's failure (if not) 1056 * @since 11.0 1057 * @deprecated These {@code AsyncFunction} overloads of {@code transform} are 1058 * being renamed to {@code transformAsync}. (The {@code Function} 1059 * overloads are keeping the "transform" name.) This method will be removed in Guava release 1060 * 20.0. 1061 */ 1062 @Deprecated 1063 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1064 AsyncFunction<? super I, ? extends O> function, 1065 Executor executor) { 1066 return transformAsync(input, function, executor); 1067 } 1068 1069 /** 1070 * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result 1071 * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from 1072 * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the 1073 * original {@code Future}. Example: 1074 * 1075 * <pre> {@code 1076 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 1077 * AsyncFunction<RowKey, QueryResult> queryFunction = 1078 * new AsyncFunction<RowKey, QueryResult>() { 1079 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 1080 * return dataService.read(rowKey); 1081 * } 1082 * }; 1083 * ListenableFuture<QueryResult> queryFuture = 1084 * transformAsync(rowKeyFuture, queryFunction);}</pre> 1085 * 1086 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 1087 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 1088 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 1089 * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done 1090 * to complete the returned {@code Future}. 1091 * 1092 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 1093 * input future and that of the future returned by the function. That is, if the returned {@code 1094 * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two 1095 * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to 1096 * cancel itself. 1097 * 1098 * @param input The future to transform 1099 * @param function A function to transform the result of the input future to the result of the 1100 * output future 1101 * @return A future that holds result of the function (if the input succeeded) or the original 1102 * input's failure (if not) 1103 * @since 19.0 (in 11.0 as {@code transform}) 1104 */ 1105 public static <I, O> ListenableFuture<O> transformAsync( 1106 ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) { 1107 AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function); 1108 input.addListener(output, directExecutor()); 1109 return output; 1110 } 1111 1112 /** 1113 * Returns a new {@code ListenableFuture} whose result is asynchronously derived from the result 1114 * of the given {@code Future}. More precisely, the returned {@code Future} takes its result from 1115 * a {@code Future} produced by applying the given {@code AsyncFunction} to the result of the 1116 * original {@code Future}. Example: 1117 * 1118 * <pre> {@code 1119 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 1120 * AsyncFunction<RowKey, QueryResult> queryFunction = 1121 * new AsyncFunction<RowKey, QueryResult>() { 1122 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 1123 * return dataService.read(rowKey); 1124 * } 1125 * }; 1126 * ListenableFuture<QueryResult> queryFuture = 1127 * transformAsync(rowKeyFuture, queryFunction, executor);}</pre> 1128 * 1129 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 1130 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1131 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 1132 * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned 1133 * {@code Future}. 1134 * 1135 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 1136 * input future and that of the future returned by the chain function. That is, if the returned 1137 * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the 1138 * other two is cancelled, the returned {@code Future} will receive a callback in which it will 1139 * attempt to cancel itself. 1140 * 1141 * @param input The future to transform 1142 * @param function A function to transform the result of the input future to the result of the 1143 * output future 1144 * @param executor Executor to run the function in. 1145 * @return A future that holds result of the function (if the input succeeded) or the original 1146 * input's failure (if not) 1147 * @since 19.0 (in 11.0 as {@code transform}) 1148 */ 1149 public static <I, O> ListenableFuture<O> transformAsync(ListenableFuture<I> input, 1150 AsyncFunction<? super I, ? extends O> function, Executor executor) { 1151 checkNotNull(executor); 1152 AsyncChainingFuture<I, O> output = new AsyncChainingFuture<I, O>(input, function); 1153 input.addListener(output, rejectionPropagatingExecutor(executor, output)); 1154 return output; 1155 } 1156 1157 /** 1158 * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate 1159 * executor to the given {@code future}. 1160 * 1161 * <p>Note, the returned executor can only be used once. 1162 */ 1163 private static Executor rejectionPropagatingExecutor( 1164 final Executor delegate, final AbstractFuture<?> future) { 1165 checkNotNull(delegate); 1166 if (delegate == directExecutor()) { 1167 // directExecutor() cannot throw RejectedExecutionException 1168 return delegate; 1169 } 1170 return new Executor() { 1171 volatile boolean thrownFromDelegate = true; 1172 @Override public void execute(final Runnable command) { 1173 try { 1174 delegate.execute(new Runnable() { 1175 @Override public void run() { 1176 thrownFromDelegate = false; 1177 command.run(); 1178 } 1179 }); 1180 } catch (RejectedExecutionException e) { 1181 if (thrownFromDelegate) { 1182 // wrap exception? 1183 future.setException(e); 1184 } 1185 // otherwise it must have been thrown from a transitive call and the delegate runnable 1186 // should have handled it. 1187 } 1188 } 1189 }; 1190 } 1191 1192 /** 1193 * Returns a new {@code ListenableFuture} whose result is the product of 1194 * applying the given {@code Function} to the result of the given {@code 1195 * Future}. Example: 1196 * 1197 * <pre> {@code 1198 * ListenableFuture<QueryResult> queryFuture = ...; 1199 * Function<QueryResult, List<Row>> rowsFunction = 1200 * new Function<QueryResult, List<Row>>() { 1201 * public List<Row> apply(QueryResult queryResult) { 1202 * return queryResult.getRows(); 1203 * } 1204 * }; 1205 * ListenableFuture<List<Row>> rowsFuture = 1206 * transform(queryFuture, rowsFunction);}</pre> 1207 * 1208 * <p>This overload, which does not accept an executor, uses {@code 1209 * directExecutor}, a dangerous choice in some cases. See the discussion in 1210 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 1211 * documentation. The documentation's warnings about "lightweight listeners" 1212 * refer here to the work done during {@code Function.apply}. 1213 * 1214 * <p>The returned {@code Future} attempts to keep its cancellation state in 1215 * sync with that of the input future. That is, if the returned {@code Future} 1216 * is cancelled, it will attempt to cancel the input, and if the input is 1217 * cancelled, the returned {@code Future} will receive a callback in which it 1218 * will attempt to cancel itself. 1219 * 1220 * <p>An example use of this method is to convert a serializable object 1221 * returned from an RPC into a POJO. 1222 * 1223 * @param input The future to transform 1224 * @param function A Function to transform the results of the provided future 1225 * to the results of the returned future. This will be run in the thread 1226 * that notifies input it is complete. 1227 * @return A future that holds result of the transformation. 1228 * @since 9.0 (in 1.0 as {@code compose}) 1229 */ 1230 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1231 final Function<? super I, ? extends O> function) { 1232 checkNotNull(function); 1233 ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function); 1234 input.addListener(output, directExecutor()); 1235 return output; 1236 } 1237 1238 /** 1239 * Returns a new {@code ListenableFuture} whose result is the product of 1240 * applying the given {@code Function} to the result of the given {@code 1241 * Future}. Example: 1242 * 1243 * <pre> {@code 1244 * ListenableFuture<QueryResult> queryFuture = ...; 1245 * Function<QueryResult, List<Row>> rowsFunction = 1246 * new Function<QueryResult, List<Row>>() { 1247 * public List<Row> apply(QueryResult queryResult) { 1248 * return queryResult.getRows(); 1249 * } 1250 * }; 1251 * ListenableFuture<List<Row>> rowsFuture = 1252 * transform(queryFuture, rowsFunction, executor);}</pre> 1253 * 1254 * <p>When selecting an executor, note that {@code directExecutor} is 1255 * dangerous in some cases. See the discussion in the {@link 1256 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 1257 * The documentation's warnings about "lightweight listeners" refer here to 1258 * the work done during {@code Function.apply}. 1259 * 1260 * <p>The returned {@code Future} attempts to keep its cancellation state in 1261 * sync with that of the input future. That is, if the returned {@code Future} 1262 * is cancelled, it will attempt to cancel the input, and if the input is 1263 * cancelled, the returned {@code Future} will receive a callback in which it 1264 * will attempt to cancel itself. 1265 * 1266 * <p>An example use of this method is to convert a serializable object 1267 * returned from an RPC into a POJO. 1268 * 1269 * @param input The future to transform 1270 * @param function A Function to transform the results of the provided future 1271 * to the results of the returned future. 1272 * @param executor Executor to run the function in. 1273 * @return A future that holds result of the transformation. 1274 * @since 9.0 (in 2.0 as {@code compose}) 1275 */ 1276 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 1277 final Function<? super I, ? extends O> function, Executor executor) { 1278 checkNotNull(function); 1279 ChainingFuture<I, O> output = new ChainingFuture<I, O>(input, function); 1280 input.addListener(output, rejectionPropagatingExecutor(executor, output)); 1281 return output; 1282 } 1283 1284 /** 1285 * Like {@link #transform(ListenableFuture, Function)} except that the 1286 * transformation {@code function} is invoked on each call to 1287 * {@link Future#get() get()} on the returned future. 1288 * 1289 * <p>The returned {@code Future} reflects the input's cancellation 1290 * state directly, and any attempt to cancel the returned Future is likewise 1291 * passed through to the input Future. 1292 * 1293 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 1294 * only apply the timeout to the execution of the underlying {@code Future}, 1295 * <em>not</em> to the execution of the transformation function. 1296 * 1297 * <p>The primary audience of this method is callers of {@code transform} 1298 * who don't have a {@code ListenableFuture} available and 1299 * do not mind repeated, lazy function evaluation. 1300 * 1301 * @param input The future to transform 1302 * @param function A Function to transform the results of the provided future 1303 * to the results of the returned future. 1304 * @return A future that returns the result of the transformation. 1305 * @since 10.0 1306 */ 1307 @GwtIncompatible("TODO") 1308 @CheckReturnValue 1309 public static <I, O> Future<O> lazyTransform(final Future<I> input, 1310 final Function<? super I, ? extends O> function) { 1311 checkNotNull(input); 1312 checkNotNull(function); 1313 return new Future<O>() { 1314 1315 @Override 1316 public boolean cancel(boolean mayInterruptIfRunning) { 1317 return input.cancel(mayInterruptIfRunning); 1318 } 1319 1320 @Override 1321 public boolean isCancelled() { 1322 return input.isCancelled(); 1323 } 1324 1325 @Override 1326 public boolean isDone() { 1327 return input.isDone(); 1328 } 1329 1330 @Override 1331 public O get() throws InterruptedException, ExecutionException { 1332 return applyTransformation(input.get()); 1333 } 1334 1335 @Override 1336 public O get(long timeout, TimeUnit unit) 1337 throws InterruptedException, ExecutionException, TimeoutException { 1338 return applyTransformation(input.get(timeout, unit)); 1339 } 1340 1341 private O applyTransformation(I input) throws ExecutionException { 1342 try { 1343 return function.apply(input); 1344 } catch (Throwable t) { 1345 throw new ExecutionException(t); 1346 } 1347 } 1348 }; 1349 } 1350 1351 /** 1352 * An implementation of {@code ListenableFuture} that also implements 1353 * {@code Runnable} so that it can be used to nest ListenableFutures. 1354 * Once the passed-in {@code ListenableFuture} is complete, it calls the 1355 * passed-in {@code Function} to generate the result. 1356 * 1357 * <p>For historical reasons, this class has a special case in its exception 1358 * handling: If the given {@code AsyncFunction} throws an {@code 1359 * UndeclaredThrowableException}, {@code ChainingListenableFuture} unwraps it 1360 * and uses its <i>cause</i> as the output future's exception, rather than 1361 * using the {@code UndeclaredThrowableException} itself as it would for other 1362 * exception types. The reason for this is that {@code Futures.transform} used 1363 * to require a {@code Function}, whose {@code apply} method is not allowed to 1364 * throw checked exceptions. Nowadays, {@code Futures.transform} has an 1365 * overload that accepts an {@code AsyncFunction}, whose {@code apply} method 1366 * <i>is</i> allowed to throw checked exception. Users who wish to throw 1367 * checked exceptions should use that overload instead, and <a 1368 * href="http://code.google.com/p/guava-libraries/issues/detail?id=1548">we 1369 * should remove the {@code UndeclaredThrowableException} special case</a>. 1370 */ 1371 private abstract static class AbstractChainingFuture<I, O, F> 1372 extends AbstractFuture.TrustedFuture<O> implements Runnable { 1373 // In theory, this field might not be visible to a cancel() call in certain circumstances. For 1374 // details, see the comments on the fields of TimeoutFuture. 1375 @Nullable ListenableFuture<? extends I> inputFuture; 1376 @Nullable F function; 1377 1378 AbstractChainingFuture(ListenableFuture<? extends I> inputFuture, F function) { 1379 this.inputFuture = checkNotNull(inputFuture); 1380 this.function = checkNotNull(function); 1381 } 1382 1383 @Override 1384 public final void run() { 1385 try { 1386 ListenableFuture<? extends I> localInputFuture = inputFuture; 1387 F localFunction = function; 1388 if (isCancelled() | localInputFuture == null | localFunction == null) { 1389 return; 1390 } 1391 inputFuture = null; 1392 function = null; 1393 1394 I sourceResult; 1395 try { 1396 sourceResult = getUninterruptibly(localInputFuture); 1397 } catch (CancellationException e) { 1398 // Cancel this future and return. 1399 // At this point, inputFuture is cancelled and outputFuture doesn't 1400 // exist, so the value of mayInterruptIfRunning is irrelevant. 1401 cancel(false); 1402 return; 1403 } catch (ExecutionException e) { 1404 // Set the cause of the exception as this future's exception 1405 setException(e.getCause()); 1406 return; 1407 } 1408 doTransform(localFunction, sourceResult); 1409 } catch (UndeclaredThrowableException e) { 1410 // Set the cause of the exception as this future's exception 1411 setException(e.getCause()); 1412 } catch (Throwable t) { 1413 // This exception is irrelevant in this thread, but useful for the 1414 // client 1415 setException(t); 1416 } 1417 } 1418 1419 /** Template method for subtypes to actually run the transform. */ 1420 abstract void doTransform(F function, I result) throws Exception; 1421 1422 @Override final void done() { 1423 maybePropagateCancellation(inputFuture); 1424 this.inputFuture = null; 1425 this.function = null; 1426 } 1427 } 1428 1429 /** 1430 * A {@link AbstractChainingFuture} that delegates to an {@link AsyncFunction} and 1431 * {@link #setFuture(ListenableFuture)} to implement {@link #doTransform}. 1432 */ 1433 private static final class AsyncChainingFuture<I, O> 1434 extends AbstractChainingFuture<I, O, AsyncFunction<? super I, ? extends O>> { 1435 AsyncChainingFuture(ListenableFuture<? extends I> inputFuture, 1436 AsyncFunction<? super I, ? extends O> function) { 1437 super(inputFuture, function); 1438 } 1439 1440 @Override 1441 void doTransform(AsyncFunction<? super I, ? extends O> function, I input) throws Exception { 1442 ListenableFuture<? extends O> outputFuture = function.apply(input); 1443 checkNotNull(outputFuture, "AsyncFunction.apply returned null instead of a Future. " 1444 + "Did you mean to return immediateFuture(null)?"); 1445 setFuture(outputFuture); 1446 } 1447 } 1448 1449 /** 1450 * A {@link AbstractChainingFuture} that delegates to a {@link Function} and 1451 * {@link #set(Object)} to implement {@link #doTransform}. 1452 */ 1453 private static final class ChainingFuture<I, O> 1454 extends AbstractChainingFuture<I, O, Function<? super I, ? extends O>> { 1455 1456 ChainingFuture(ListenableFuture<? extends I> inputFuture, 1457 Function<? super I, ? extends O> function) { 1458 super(inputFuture, function); 1459 } 1460 1461 @Override 1462 void doTransform(Function<? super I, ? extends O> function, I input) { 1463 // TODO(lukes): move the UndeclaredThrowable catch block here? 1464 set(function.apply(input)); 1465 } 1466 } 1467 1468 /** 1469 * Returns a new {@code ListenableFuture} whose result is the product of 1470 * calling {@code get()} on the {@code Future} nested within the given {@code 1471 * Future}, effectively chaining the futures one after the other. Example: 1472 * 1473 * <pre> {@code 1474 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 1475 * ListenableFuture<String> dereferenced = dereference(nested);}</pre> 1476 * 1477 * <p>This call has the same cancellation and execution semantics as {@link 1478 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 1479 * Future} attempts to keep its cancellation state in sync with both the 1480 * input {@code Future} and the nested {@code Future}. The transformation 1481 * is very lightweight and therefore takes place in the same thread (either 1482 * the thread that called {@code dereference}, or the thread in which the 1483 * dereferenced future completes). 1484 * 1485 * @param nested The nested future to transform. 1486 * @return A future that holds result of the inner future. 1487 * @since 13.0 1488 */ 1489 @SuppressWarnings({"rawtypes", "unchecked"}) 1490 @CheckReturnValue 1491 public static <V> ListenableFuture<V> dereference( 1492 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 1493 return transformAsync((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 1494 } 1495 1496 /** 1497 * Helper {@code Function} for {@link #dereference}. 1498 */ 1499 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 1500 new AsyncFunction<ListenableFuture<Object>, Object>() { 1501 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 1502 return input; 1503 } 1504 }; 1505 1506 /** 1507 * Creates a new {@code ListenableFuture} whose value is a list containing the 1508 * values of all its input futures, if all succeed. If any input fails, the 1509 * returned future fails immediately. 1510 * 1511 * <p>The list of results is in the same order as the input list. 1512 * 1513 * <p>Canceling this future will attempt to cancel all the component futures, 1514 * and if any of the provided futures fails or is canceled, this one is, 1515 * too. 1516 * 1517 * @param futures futures to combine 1518 * @return a future that provides a list of the results of the component 1519 * futures 1520 * @since 10.0 1521 */ 1522 @Beta 1523 @SafeVarargs 1524 @CheckReturnValue 1525 public static <V> ListenableFuture<List<V>> allAsList( 1526 ListenableFuture<? extends V>... futures) { 1527 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 1528 } 1529 1530 /** 1531 * Creates a new {@code ListenableFuture} whose value is a list containing the 1532 * values of all its input futures, if all succeed. If any input fails, the 1533 * returned future fails immediately. 1534 * 1535 * <p>The list of results is in the same order as the input list. 1536 * 1537 * <p>Canceling this future will attempt to cancel all the component futures, 1538 * and if any of the provided futures fails or is canceled, this one is, 1539 * too. 1540 * 1541 * @param futures futures to combine 1542 * @return a future that provides a list of the results of the component 1543 * futures 1544 * @since 10.0 1545 */ 1546 @Beta 1547 @CheckReturnValue 1548 public static <V> ListenableFuture<List<V>> allAsList( 1549 Iterable<? extends ListenableFuture<? extends V>> futures) { 1550 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 1551 } 1552 1553 /** 1554 * Creates a new {@code ListenableFuture} whose result is set from the 1555 * supplied future when it completes. Cancelling the supplied future 1556 * will also cancel the returned future, but cancelling the returned 1557 * future will have no effect on the supplied future. 1558 * 1559 * @since 15.0 1560 */ 1561 @GwtIncompatible("TODO") 1562 @CheckReturnValue 1563 public static <V> ListenableFuture<V> nonCancellationPropagating( 1564 ListenableFuture<V> future) { 1565 return new NonCancellationPropagatingFuture<V>(future); 1566 } 1567 1568 /** 1569 * A wrapped future that does not propagate cancellation to its delegate. 1570 */ 1571 @GwtIncompatible("TODO") 1572 private static final class NonCancellationPropagatingFuture<V> 1573 extends AbstractFuture.TrustedFuture<V> { 1574 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1575 delegate.addListener(new Runnable() { 1576 @Override public void run() { 1577 // This prevents cancellation from propagating because we don't assign delegate until 1578 // delegate is already done, so calling cancel() on it is a no-op. 1579 setFuture(delegate); 1580 } 1581 }, directExecutor()); 1582 } 1583 } 1584 1585 /** 1586 * Creates a new {@code ListenableFuture} whose value is a list containing the 1587 * values of all its successful input futures. The list of results is in the 1588 * same order as the input list, and if any of the provided futures fails or 1589 * is canceled, its corresponding position will contain {@code null} (which is 1590 * indistinguishable from the future having a successful value of 1591 * {@code null}). 1592 * 1593 * <p>Canceling this future will attempt to cancel all the component futures. 1594 * 1595 * @param futures futures to combine 1596 * @return a future that provides a list of the results of the component 1597 * futures 1598 * @since 10.0 1599 */ 1600 @Beta 1601 @SafeVarargs 1602 @CheckReturnValue 1603 public static <V> ListenableFuture<List<V>> successfulAsList( 1604 ListenableFuture<? extends V>... futures) { 1605 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1606 } 1607 1608 /** 1609 * Creates a new {@code ListenableFuture} whose value is a list containing the 1610 * values of all its successful input futures. The list of results is in the 1611 * same order as the input list, and if any of the provided futures fails or 1612 * is canceled, its corresponding position will contain {@code null} (which is 1613 * indistinguishable from the future having a successful value of 1614 * {@code null}). 1615 * 1616 * <p>Canceling this future will attempt to cancel all the component futures. 1617 * 1618 * @param futures futures to combine 1619 * @return a future that provides a list of the results of the component 1620 * futures 1621 * @since 10.0 1622 */ 1623 @Beta 1624 @CheckReturnValue 1625 public static <V> ListenableFuture<List<V>> successfulAsList( 1626 Iterable<? extends ListenableFuture<? extends V>> futures) { 1627 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1628 } 1629 1630 /** 1631 * Returns a list of delegate futures that correspond to the futures received in the order 1632 * that they complete. Delegate futures return the same value or throw the same exception 1633 * as the corresponding input future returns/throws. 1634 * 1635 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future 1636 * does not correspond to a specific input future until the appropriate number of input 1637 * futures have completed. At that point, it is too late to cancel the input future. 1638 * The input future's result, which cannot be stored into the cancelled delegate future, 1639 * is ignored. 1640 * 1641 * @since 17.0 1642 */ 1643 @Beta 1644 @GwtIncompatible("TODO") 1645 @CheckReturnValue 1646 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 1647 Iterable<? extends ListenableFuture<? extends T>> futures) { 1648 // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an 1649 // ArrayDeque 1650 final ConcurrentLinkedQueue<SettableFuture<T>> delegates = 1651 Queues.newConcurrentLinkedQueue(); 1652 ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder(); 1653 // Using SerializingExecutor here will ensure that each CompletionOrderListener executes 1654 // atomically and therefore that each returned future is guaranteed to be in completion order. 1655 // N.B. there are some cases where the use of this executor could have possibly surprising 1656 // effects when input futures finish at approximately the same time _and_ the output futures 1657 // have directExecutor listeners. In this situation, the listeners may end up running on a 1658 // different thread than if they were attached to the corresponding input future. We believe 1659 // this to be a negligible cost since: 1660 // 1. Using the directExecutor implies that your callback is safe to run on any thread. 1661 // 2. This would likely only be noticeable if you were doing something expensive or blocking on 1662 // a directExecutor listener on one of the output futures which is an antipattern anyway. 1663 SerializingExecutor executor = new SerializingExecutor(directExecutor()); 1664 for (final ListenableFuture<? extends T> future : futures) { 1665 SettableFuture<T> delegate = SettableFuture.create(); 1666 // Must make sure to add the delegate to the queue first in case the future is already done 1667 delegates.add(delegate); 1668 future.addListener(new Runnable() { 1669 @Override public void run() { 1670 delegates.remove().setFuture(future); 1671 } 1672 }, executor); 1673 listBuilder.add(delegate); 1674 } 1675 return listBuilder.build(); 1676 } 1677 1678 /** 1679 * Registers separate success and failure callbacks to be run when the {@code 1680 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1681 * complete} or, if the computation is already complete, immediately. 1682 * 1683 * <p>There is no guaranteed ordering of execution of callbacks, but any 1684 * callback added through this method is guaranteed to be called once the 1685 * computation is complete. 1686 * 1687 * Example: <pre> {@code 1688 * ListenableFuture<QueryResult> future = ...; 1689 * addCallback(future, 1690 * new FutureCallback<QueryResult> { 1691 * public void onSuccess(QueryResult result) { 1692 * storeInCache(result); 1693 * } 1694 * public void onFailure(Throwable t) { 1695 * reportError(t); 1696 * } 1697 * });}</pre> 1698 * 1699 * <p>This overload, which does not accept an executor, uses {@code 1700 * directExecutor}, a dangerous choice in some cases. See the discussion in 1701 * the {@link ListenableFuture#addListener ListenableFuture.addListener} 1702 * documentation. 1703 * 1704 * <p>For a more general interface to attach a completion listener to a 1705 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1706 * 1707 * @param future The future attach the callback to. 1708 * @param callback The callback to invoke when {@code future} is completed. 1709 * @since 10.0 1710 */ 1711 public static <V> void addCallback(ListenableFuture<V> future, 1712 FutureCallback<? super V> callback) { 1713 addCallback(future, callback, directExecutor()); 1714 } 1715 1716 /** 1717 * Registers separate success and failure callbacks to be run when the {@code 1718 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 1719 * complete} or, if the computation is already complete, immediately. 1720 * 1721 * <p>The callback is run in {@code executor}. 1722 * There is no guaranteed ordering of execution of callbacks, but any 1723 * callback added through this method is guaranteed to be called once the 1724 * computation is complete. 1725 * 1726 * Example: <pre> {@code 1727 * ListenableFuture<QueryResult> future = ...; 1728 * Executor e = ... 1729 * addCallback(future, 1730 * new FutureCallback<QueryResult> { 1731 * public void onSuccess(QueryResult result) { 1732 * storeInCache(result); 1733 * } 1734 * public void onFailure(Throwable t) { 1735 * reportError(t); 1736 * } 1737 * }, e);}</pre> 1738 * 1739 * <p>When selecting an executor, note that {@code directExecutor} is 1740 * dangerous in some cases. See the discussion in the {@link 1741 * ListenableFuture#addListener ListenableFuture.addListener} documentation. 1742 * 1743 * <p>For a more general interface to attach a completion listener to a 1744 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 1745 * 1746 * @param future The future attach the callback to. 1747 * @param callback The callback to invoke when {@code future} is completed. 1748 * @param executor The executor to run {@code callback} when the future 1749 * completes. 1750 * @since 10.0 1751 */ 1752 public static <V> void addCallback(final ListenableFuture<V> future, 1753 final FutureCallback<? super V> callback, Executor executor) { 1754 Preconditions.checkNotNull(callback); 1755 Runnable callbackListener = new Runnable() { 1756 @Override 1757 public void run() { 1758 final V value; 1759 try { 1760 // TODO(user): (Before Guava release), validate that this 1761 // is the thing for IE. 1762 value = getUninterruptibly(future); 1763 } catch (ExecutionException e) { 1764 callback.onFailure(e.getCause()); 1765 return; 1766 } catch (RuntimeException e) { 1767 callback.onFailure(e); 1768 return; 1769 } catch (Error e) { 1770 callback.onFailure(e); 1771 return; 1772 } 1773 callback.onSuccess(value); 1774 } 1775 }; 1776 future.addListener(callbackListener, executor); 1777 } 1778 1779 /** 1780 * Returns the result of {@link Future#get()}, converting most exceptions to a 1781 * new instance of the given checked exception type. This reduces boilerplate 1782 * for a common use of {@code Future} in which it is unnecessary to 1783 * programmatically distinguish between exception types or to extract other 1784 * information from the exception instance. 1785 * 1786 * <p>Exceptions from {@code Future.get} are treated as follows: 1787 * <ul> 1788 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1789 * {@code X} if the cause is a checked exception, an {@link 1790 * UncheckedExecutionException} if the cause is a {@code 1791 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1792 * {@code Error}. 1793 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1794 * restoring the interrupt). 1795 * <li>Any {@link CancellationException} is propagated untouched, as is any 1796 * other {@link RuntimeException} (though {@code get} implementations are 1797 * discouraged from throwing such exceptions). 1798 * </ul> 1799 * 1800 * <p>The overall principle is to continue to treat every checked exception as a 1801 * checked exception, every unchecked exception as an unchecked exception, and 1802 * every error as an error. In addition, the cause of any {@code 1803 * ExecutionException} is wrapped in order to ensure that the new stack trace 1804 * matches that of the current thread. 1805 * 1806 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1807 * public constructor that accepts zero or more arguments, all of type {@code 1808 * String} or {@code Throwable} (preferring constructors with at least one 1809 * {@code String}) and calling the constructor via reflection. If the 1810 * exception did not already have a cause, one is set by calling {@link 1811 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1812 * {@code IllegalArgumentException} is thrown. 1813 * 1814 * @throws X if {@code get} throws any checked exception except for an {@code 1815 * ExecutionException} whose cause is not itself a checked exception 1816 * @throws UncheckedExecutionException if {@code get} throws an {@code 1817 * ExecutionException} with a {@code RuntimeException} as its cause 1818 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1819 * with an {@code Error} as its cause 1820 * @throws CancellationException if {@code get} throws a {@code 1821 * CancellationException} 1822 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1823 * RuntimeException} or does not have a suitable constructor 1824 * @since 10.0 1825 * @deprecated Use {@link #getChecked(Future, Class)}. This method will be 1826 * removed in Guava release 20.0. 1827 */ 1828 @Deprecated 1829 @GwtIncompatible("reflection") 1830 public static <V, X extends Exception> V get( 1831 Future<V> future, Class<X> exceptionClass) throws X { 1832 return getChecked(future, exceptionClass); 1833 } 1834 1835 /** 1836 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1837 * exceptions to a new instance of the given checked exception type. This 1838 * reduces boilerplate for a common use of {@code Future} in which it is 1839 * unnecessary to programmatically distinguish between exception types or to 1840 * extract other information from the exception instance. 1841 * 1842 * <p>Exceptions from {@code Future.get} are treated as follows: 1843 * <ul> 1844 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1845 * {@code X} if the cause is a checked exception, an {@link 1846 * UncheckedExecutionException} if the cause is a {@code 1847 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1848 * {@code Error}. 1849 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1850 * restoring the interrupt). 1851 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1852 * <li>Any {@link CancellationException} is propagated untouched, as is any 1853 * other {@link RuntimeException} (though {@code get} implementations are 1854 * discouraged from throwing such exceptions). 1855 * </ul> 1856 * 1857 * <p>The overall principle is to continue to treat every checked exception as a 1858 * checked exception, every unchecked exception as an unchecked exception, and 1859 * every error as an error. In addition, the cause of any {@code 1860 * ExecutionException} is wrapped in order to ensure that the new stack trace 1861 * matches that of the current thread. 1862 * 1863 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1864 * public constructor that accepts zero or more arguments, all of type {@code 1865 * String} or {@code Throwable} (preferring constructors with at least one 1866 * {@code String}) and calling the constructor via reflection. If the 1867 * exception did not already have a cause, one is set by calling {@link 1868 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1869 * {@code IllegalArgumentException} is thrown. 1870 * 1871 * @throws X if {@code get} throws any checked exception except for an {@code 1872 * ExecutionException} whose cause is not itself a checked exception 1873 * @throws UncheckedExecutionException if {@code get} throws an {@code 1874 * ExecutionException} with a {@code RuntimeException} as its cause 1875 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1876 * with an {@code Error} as its cause 1877 * @throws CancellationException if {@code get} throws a {@code 1878 * CancellationException} 1879 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1880 * RuntimeException} or does not have a suitable constructor 1881 * @since 10.0 1882 * @deprecated Use {@link #getChecked(Future, Class, long, TimeUnit)}, noting 1883 * the change in parameter order. This method will be removed in Guava 1884 * release 20.0. 1885 */ 1886 @Deprecated 1887 @GwtIncompatible("reflection") 1888 public static <V, X extends Exception> V get( 1889 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 1890 throws X { 1891 return getChecked(future, exceptionClass, timeout, unit); 1892 } 1893 1894 /** 1895 * Returns the result of {@link Future#get()}, converting most exceptions to a 1896 * new instance of the given checked exception type. This reduces boilerplate 1897 * for a common use of {@code Future} in which it is unnecessary to 1898 * programmatically distinguish between exception types or to extract other 1899 * information from the exception instance. 1900 * 1901 * <p>Exceptions from {@code Future.get} are treated as follows: 1902 * <ul> 1903 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1904 * {@code X} if the cause is a checked exception, an {@link 1905 * UncheckedExecutionException} if the cause is a {@code 1906 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1907 * {@code Error}. 1908 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1909 * restoring the interrupt). 1910 * <li>Any {@link CancellationException} is propagated untouched, as is any 1911 * other {@link RuntimeException} (though {@code get} implementations are 1912 * discouraged from throwing such exceptions). 1913 * </ul> 1914 * 1915 * <p>The overall principle is to continue to treat every checked exception as a 1916 * checked exception, every unchecked exception as an unchecked exception, and 1917 * every error as an error. In addition, the cause of any {@code 1918 * ExecutionException} is wrapped in order to ensure that the new stack trace 1919 * matches that of the current thread. 1920 * 1921 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1922 * public constructor that accepts zero or more arguments, all of type {@code 1923 * String} or {@code Throwable} (preferring constructors with at least one 1924 * {@code String}) and calling the constructor via reflection. If the 1925 * exception did not already have a cause, one is set by calling {@link 1926 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1927 * {@code IllegalArgumentException} is thrown. 1928 * 1929 * @throws X if {@code get} throws any checked exception except for an {@code 1930 * ExecutionException} whose cause is not itself a checked exception 1931 * @throws UncheckedExecutionException if {@code get} throws an {@code 1932 * ExecutionException} with a {@code RuntimeException} as its cause 1933 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1934 * with an {@code Error} as its cause 1935 * @throws CancellationException if {@code get} throws a {@code 1936 * CancellationException} 1937 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1938 * RuntimeException} or does not have a suitable constructor 1939 * @since 19.0 (in 10.0 as {@code get}) 1940 */ 1941 @GwtIncompatible("reflection") 1942 public static <V, X extends Exception> V getChecked( 1943 Future<V> future, Class<X> exceptionClass) throws X { 1944 return FuturesGetChecked.getChecked(future, exceptionClass); 1945 } 1946 1947 /** 1948 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 1949 * exceptions to a new instance of the given checked exception type. This 1950 * reduces boilerplate for a common use of {@code Future} in which it is 1951 * unnecessary to programmatically distinguish between exception types or to 1952 * extract other information from the exception instance. 1953 * 1954 * <p>Exceptions from {@code Future.get} are treated as follows: 1955 * <ul> 1956 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 1957 * {@code X} if the cause is a checked exception, an {@link 1958 * UncheckedExecutionException} if the cause is a {@code 1959 * RuntimeException}, or an {@link ExecutionError} if the cause is an 1960 * {@code Error}. 1961 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 1962 * restoring the interrupt). 1963 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1964 * <li>Any {@link CancellationException} is propagated untouched, as is any 1965 * other {@link RuntimeException} (though {@code get} implementations are 1966 * discouraged from throwing such exceptions). 1967 * </ul> 1968 * 1969 * <p>The overall principle is to continue to treat every checked exception as a 1970 * checked exception, every unchecked exception as an unchecked exception, and 1971 * every error as an error. In addition, the cause of any {@code 1972 * ExecutionException} is wrapped in order to ensure that the new stack trace 1973 * matches that of the current thread. 1974 * 1975 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 1976 * public constructor that accepts zero or more arguments, all of type {@code 1977 * String} or {@code Throwable} (preferring constructors with at least one 1978 * {@code String}) and calling the constructor via reflection. If the 1979 * exception did not already have a cause, one is set by calling {@link 1980 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 1981 * {@code IllegalArgumentException} is thrown. 1982 * 1983 * @throws X if {@code get} throws any checked exception except for an {@code 1984 * ExecutionException} whose cause is not itself a checked exception 1985 * @throws UncheckedExecutionException if {@code get} throws an {@code 1986 * ExecutionException} with a {@code RuntimeException} as its cause 1987 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 1988 * with an {@code Error} as its cause 1989 * @throws CancellationException if {@code get} throws a {@code 1990 * CancellationException} 1991 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 1992 * RuntimeException} or does not have a suitable constructor 1993 * @since 19.0 (in 10.0 as {@code get} and with different parameter order) 1994 */ 1995 @GwtIncompatible("reflection") 1996 public static <V, X extends Exception> V getChecked( 1997 Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) 1998 throws X { 1999 return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit); 2000 } 2001 2002 /** 2003 * Returns the result of calling {@link Future#get()} uninterruptibly on a 2004 * task known not to throw a checked exception. This makes {@code Future} more 2005 * suitable for lightweight, fast-running tasks that, barring bugs in the 2006 * code, will not fail. This gives it exception-handling behavior similar to 2007 * that of {@code ForkJoinTask.join}. 2008 * 2009 * <p>Exceptions from {@code Future.get} are treated as follows: 2010 * <ul> 2011 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 2012 * {@link UncheckedExecutionException} (if the cause is an {@code 2013 * Exception}) or {@link ExecutionError} (if the cause is an {@code 2014 * Error}). 2015 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 2016 * call. The interrupt is restored before {@code getUnchecked} returns. 2017 * <li>Any {@link CancellationException} is propagated untouched. So is any 2018 * other {@link RuntimeException} ({@code get} implementations are 2019 * discouraged from throwing such exceptions). 2020 * </ul> 2021 * 2022 * <p>The overall principle is to eliminate all checked exceptions: to loop to 2023 * avoid {@code InterruptedException}, to pass through {@code 2024 * CancellationException}, and to wrap any exception from the underlying 2025 * computation in an {@code UncheckedExecutionException} or {@code 2026 * ExecutionError}. 2027 * 2028 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 2029 * {@link Uninterruptibles#getUninterruptibly(Future)}. 2030 * 2031 * @throws UncheckedExecutionException if {@code get} throws an {@code 2032 * ExecutionException} with an {@code Exception} as its cause 2033 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 2034 * with an {@code Error} as its cause 2035 * @throws CancellationException if {@code get} throws a {@code 2036 * CancellationException} 2037 * @since 10.0 2038 */ 2039 @GwtIncompatible("TODO") 2040 public static <V> V getUnchecked(Future<V> future) { 2041 checkNotNull(future); 2042 try { 2043 return getUninterruptibly(future); 2044 } catch (ExecutionException e) { 2045 wrapAndThrowUnchecked(e.getCause()); 2046 throw new AssertionError(); 2047 } 2048 } 2049 2050 @GwtIncompatible("TODO") 2051 private static void wrapAndThrowUnchecked(Throwable cause) { 2052 if (cause instanceof Error) { 2053 throw new ExecutionError((Error) cause); 2054 } 2055 /* 2056 * It's a non-Error, non-Exception Throwable. From my survey of such 2057 * classes, I believe that most users intended to extend Exception, so we'll 2058 * treat it like an Exception. 2059 */ 2060 throw new UncheckedExecutionException(cause); 2061 } 2062 2063 /* 2064 * Arguably we don't need a timed getUnchecked because any operation slow 2065 * enough to require a timeout is heavyweight enough to throw a checked 2066 * exception and therefore be inappropriate to use with getUnchecked. Further, 2067 * it's not clear that converting the checked TimeoutException to a 2068 * RuntimeException -- especially to an UncheckedExecutionException, since it 2069 * wasn't thrown by the computation -- makes sense, and if we don't convert 2070 * it, the user still has to write a try-catch block. 2071 * 2072 * If you think you would use this method, let us know. You might also also 2073 * look into the Fork-Join framework: 2074 * http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html 2075 */ 2076 2077 /** Used for {@link #allAsList} and {@link #successfulAsList}. */ 2078 private static final class ListFuture<V> extends CollectionFuture<V, List<V>> { 2079 ListFuture(ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 2080 boolean allMustSucceed) { 2081 init(new ListFutureRunningState(futures, allMustSucceed)); 2082 } 2083 2084 private final class ListFutureRunningState extends CollectionFutureRunningState { 2085 ListFutureRunningState(ImmutableCollection<? extends ListenableFuture<? extends V>> futures, 2086 boolean allMustSucceed) { 2087 super(futures, allMustSucceed); 2088 } 2089 2090 @Override 2091 public List<V> combine(List<Optional<V>> values) { 2092 List<V> result = Lists.newArrayList(); 2093 for (Optional<V> element : values) { 2094 result.add(element != null ? element.orNull() : null); 2095 } 2096 return Collections.unmodifiableList(result); 2097 } 2098 } 2099 } 2100 2101 /** 2102 * A checked future that uses a function to map from exceptions to the 2103 * appropriate checked type. 2104 */ 2105 @GwtIncompatible("TODO") 2106 private static class MappingCheckedFuture<V, X extends Exception> extends 2107 AbstractCheckedFuture<V, X> { 2108 2109 final Function<? super Exception, X> mapper; 2110 2111 MappingCheckedFuture(ListenableFuture<V> delegate, 2112 Function<? super Exception, X> mapper) { 2113 super(delegate); 2114 2115 this.mapper = checkNotNull(mapper); 2116 } 2117 2118 @Override 2119 protected X mapException(Exception e) { 2120 return mapper.apply(e); 2121 } 2122 } 2123}