001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Preconditions.checkState; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 021 022import com.google.common.annotations.Beta; 023import com.google.common.annotations.GwtCompatible; 024import com.google.common.annotations.GwtIncompatible; 025import com.google.common.base.Function; 026import com.google.common.base.Preconditions; 027import com.google.common.collect.ImmutableList; 028import com.google.common.collect.Queues; 029import com.google.common.util.concurrent.CollectionFuture.ListFuture; 030import com.google.common.util.concurrent.ImmediateFuture.ImmediateCancelledFuture; 031import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedCheckedFuture; 032import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedFuture; 033import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulCheckedFuture; 034import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulFuture; 035import com.google.errorprone.annotations.CanIgnoreReturnValue; 036import java.util.List; 037import java.util.concurrent.Callable; 038import java.util.concurrent.CancellationException; 039import java.util.concurrent.ConcurrentLinkedQueue; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.Executor; 042import java.util.concurrent.Future; 043import java.util.concurrent.ScheduledExecutorService; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.TimeoutException; 046import javax.annotation.Nullable; 047 048/** 049 * Static utility methods pertaining to the {@link Future} interface. 050 * 051 * <p>Many of these methods use the {@link ListenableFuture} API; consult the Guava User Guide 052 * article on <a href="https://github.com/google/guava/wiki/ListenableFutureExplained"> 053 * {@code ListenableFuture}</a>. 054 * 055 * @author Kevin Bourrillion 056 * @author Nishant Thakkar 057 * @author Sven Mawson 058 * @since 1.0 059 */ 060@Beta 061@GwtCompatible(emulated = true) 062public final class Futures extends GwtFuturesCatchingSpecialization { 063 064 // A note on memory visibility. 065 // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine) 066 // have two requirements that significantly complicate their design. 067 // 1. Cancellation should propagate from the returned future to the input future(s). 068 // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion. 069 // 070 // A consequence of these requirements is that the delegate futures cannot be stored in 071 // final fields. 072 // 073 // For simplicity the rest of this description will discuss Futures.catching since it is the 074 // simplest instance, though very similar descriptions apply to many other classes in this file. 075 // 076 // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field 077 // 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the 078 // 'inputFuture' field is read and where we will have to consider visibility of the write 079 // operation in the constructor. 080 // 081 // 1. In the listener that performs the callback. In this case it is fine since inputFuture is 082 // assigned prior to calling addListener, and addListener happens-before any invocation of the 083 // listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible 084 // to the listener. 085 // 086 // 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine. 087 // There is currently nothing that enforces that the write to inputFuture in the constructor is 088 // visible to done(). This is because there is no happens before edge between the write and a 089 // (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue, 090 // it would just add an edge such that if done() observed non-null, then it would also 091 // definitely observe all earlier writes, but we still have no guarantee that done() would see 092 // the inital write (just stronger guarantees if it does). 093 // 094 // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html 095 // For a (long) discussion about this specific issue and the general futility of life. 096 // 097 // For the time being we are OK with the problem discussed above since it requires a caller to 098 // introduce a very specific kind of data-race. And given the other operations performed by these 099 // methods that involve volatile read/write operations, in practice there is no issue. Also, the 100 // way in such a visibility issue would surface is most likely as a failure of cancel() to 101 // propagate to the input. Cancellation propagation is fundamentally racy so this is fine. 102 // 103 // Future versions of the JMM may revise safe construction semantics in such a way that we can 104 // safely publish these objects and we won't need this whole discussion. 105 // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs 106 // that should resolve the issue. This comes at the cost of adding more write barriers to the 107 // implementations. 108 109 private Futures() {} 110 111 /** 112 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} and a {@link Function} 113 * that maps from {@link Exception} instances into the appropriate checked type. 114 * 115 * <p><b>Warning:</b> We recommend against using {@code CheckedFuture} in new projects. {@code 116 * CheckedFuture} is difficult to build libraries atop. {@code CheckedFuture} ports of methods 117 * like {@link Futures#transformAsync} have historically had bugs, and some of these bugs are 118 * necessary, unavoidable consequences of the {@code CheckedFuture} API. Additionally, {@code 119 * CheckedFuture} encourages users to take exceptions from one thread and rethrow them in another, 120 * producing confusing stack traces. 121 * 122 * <p>The given mapping function will be applied to an {@link InterruptedException}, a {@link 123 * CancellationException}, or an {@link ExecutionException}. See {@link Future#get()} for details 124 * on the exceptions thrown. 125 * 126 * @since 9.0 (source-compatible since 1.0) 127 */ 128 @GwtIncompatible // TODO 129 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 130 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 131 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 132 } 133 134 /** 135 * Creates a {@code ListenableFuture} which has its value set immediately upon construction. The 136 * getters just return the value. This {@code Future} can't be canceled or timed out and its 137 * {@code isDone()} method always returns {@code true}. 138 */ 139 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 140 if (value == null) { 141 // This cast is safe because null is assignable to V for all V (i.e. it is covariant) 142 @SuppressWarnings({"unchecked", "rawtypes"}) 143 ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL; 144 return typedNull; 145 } 146 return new ImmediateSuccessfulFuture<V>(value); 147 } 148 149 /** 150 * Returns a {@code CheckedFuture} which has its value set immediately upon construction. 151 * 152 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 153 * returns {@code true}. Calling {@code get()} or {@code checkedGet()} will immediately return the 154 * provided value. 155 */ 156 @GwtIncompatible // TODO 157 public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture( 158 @Nullable V value) { 159 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 160 } 161 162 /** 163 * Returns a {@code ListenableFuture} which has an exception set immediately upon construction. 164 * 165 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 166 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 167 * Throwable} wrapped in an {@code ExecutionException}. 168 */ 169 public static <V> ListenableFuture<V> immediateFailedFuture(Throwable throwable) { 170 checkNotNull(throwable); 171 return new ImmediateFailedFuture<V>(throwable); 172 } 173 174 /** 175 * Creates a {@code ListenableFuture} which is cancelled immediately upon construction, so that 176 * {@code isCancelled()} always returns {@code true}. 177 * 178 * @since 14.0 179 */ 180 public static <V> ListenableFuture<V> immediateCancelledFuture() { 181 return new ImmediateCancelledFuture<V>(); 182 } 183 184 /** 185 * Returns a {@code CheckedFuture} which has an exception set immediately upon construction. 186 * 187 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 188 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 189 * Exception} wrapped in an {@code ExecutionException}, and calling {@code checkedGet()} will 190 * throw the provided exception itself. 191 */ 192 @GwtIncompatible // TODO 193 public static <V, X extends Exception> CheckedFuture<V, X> immediateFailedCheckedFuture( 194 X exception) { 195 checkNotNull(exception); 196 return new ImmediateFailedCheckedFuture<V, X>(exception); 197 } 198 199 /** 200 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 201 * primary input fails with the given {@code exceptionType}, from the result provided by the 202 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 203 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 204 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 205 * Future}. 206 * 207 * <p>Usage example: 208 * 209 * <pre> {@code 210 * ListenableFuture<Integer> fetchCounterFuture = ...; 211 * 212 * // Falling back to a zero counter in case an exception happens when 213 * // processing the RPC to fetch counters. 214 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 215 * fetchCounterFuture, FetchException.class, 216 * new Function<FetchException, Integer>() { 217 * public Integer apply(FetchException e) { 218 * return 0; 219 * } 220 * });}</pre> 221 * 222 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 223 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 224 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 225 * listeners" refer here to the work done during {@code Function.apply}. 226 * 227 * @param input the primary input {@code Future} 228 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 229 * type is matched against the input's exception. "The input's exception" means the cause of 230 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 231 * different kind of exception, that exception itself. To avoid hiding bugs and other 232 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 233 * Throwable.class} in particular. 234 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 235 * exception type. The function's argument is the input's exception. "The input's exception" 236 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 237 * {@code get()} throws a different kind of exception, that exception itself. 238 * @since 19.0 239 */ 240 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 241 public static <V, X extends Throwable> ListenableFuture<V> catching( 242 ListenableFuture<? extends V> input, 243 Class<X> exceptionType, 244 Function<? super X, ? extends V> fallback) { 245 return AbstractCatchingFuture.create(input, exceptionType, fallback); 246 } 247 248 /** 249 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 250 * primary input fails with the given {@code exceptionType}, from the result provided by the 251 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 252 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 253 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 254 * Future}. 255 * 256 * <p>Usage example: 257 * 258 * <pre> {@code 259 * ListenableFuture<Integer> fetchCounterFuture = ...; 260 * 261 * // Falling back to a zero counter in case an exception happens when 262 * // processing the RPC to fetch counters. 263 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 264 * fetchCounterFuture, FetchException.class, 265 * new Function<FetchException, Integer>() { 266 * public Integer apply(FetchException e) { 267 * return 0; 268 * } 269 * }, directExecutor());}</pre> 270 * 271 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 272 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 273 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 274 * work done during {@code Function.apply}. 275 * 276 * @param input the primary input {@code Future} 277 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 278 * type is matched against the input's exception. "The input's exception" means the cause of 279 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 280 * different kind of exception, that exception itself. To avoid hiding bugs and other 281 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 282 * Throwable.class} in particular. 283 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 284 * exception type. The function's argument is the input's exception. "The input's exception" 285 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 286 * {@code get()} throws a different kind of exception, that exception itself. 287 * @param executor the executor that runs {@code fallback} if {@code input} fails 288 * @since 19.0 289 */ 290 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 291 public static <V, X extends Throwable> ListenableFuture<V> catching( 292 ListenableFuture<? extends V> input, 293 Class<X> exceptionType, 294 Function<? super X, ? extends V> fallback, 295 Executor executor) { 296 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 297 } 298 299 /** 300 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 301 * primary input fails with the given {@code exceptionType}, from the result provided by the 302 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 303 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 304 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 305 * {@code Future}. 306 * 307 * <p>Usage examples: 308 * 309 * <pre> {@code 310 * ListenableFuture<Integer> fetchCounterFuture = ...; 311 * 312 * // Falling back to a zero counter in case an exception happens when 313 * // processing the RPC to fetch counters. 314 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 315 * fetchCounterFuture, FetchException.class, 316 * new AsyncFunction<FetchException, Integer>() { 317 * public ListenableFuture<Integer> apply(FetchException e) { 318 * return immediateFuture(0); 319 * } 320 * });}</pre> 321 * 322 * <p>The fallback can also choose to propagate the original exception when desired: 323 * 324 * <pre> {@code 325 * ListenableFuture<Integer> fetchCounterFuture = ...; 326 * 327 * // Falling back to a zero counter only in case the exception was a 328 * // TimeoutException. 329 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 330 * fetchCounterFuture, FetchException.class, 331 * new AsyncFunction<FetchException, Integer>() { 332 * public ListenableFuture<Integer> apply(FetchException e) 333 * throws FetchException { 334 * if (omitDataOnFetchFailure) { 335 * return immediateFuture(0); 336 * } 337 * throw e; 338 * } 339 * });}</pre> 340 * 341 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 342 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 343 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 344 * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done 345 * to complete the returned {@code Future}. 346 * 347 * @param input the primary input {@code Future} 348 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 349 * type is matched against the input's exception. "The input's exception" means the cause of 350 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 351 * different kind of exception, that exception itself. To avoid hiding bugs and other 352 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 353 * Throwable.class} in particular. 354 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 355 * exception type. The function's argument is the input's exception. "The input's exception" 356 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 357 * {@code get()} throws a different kind of exception, that exception itself. 358 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 359 */ 360 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 361 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 362 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 363 ListenableFuture<? extends V> input, 364 Class<X> exceptionType, 365 AsyncFunction<? super X, ? extends V> fallback) { 366 return AbstractCatchingFuture.create(input, exceptionType, fallback); 367 } 368 369 /** 370 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 371 * primary input fails with the given {@code exceptionType}, from the result provided by the 372 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 373 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 374 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 375 * {@code Future}. 376 * 377 * <p>Usage examples: 378 * 379 * <pre> {@code 380 * ListenableFuture<Integer> fetchCounterFuture = ...; 381 * 382 * // Falling back to a zero counter in case an exception happens when 383 * // processing the RPC to fetch counters. 384 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 385 * fetchCounterFuture, FetchException.class, 386 * new AsyncFunction<FetchException, Integer>() { 387 * public ListenableFuture<Integer> apply(FetchException e) { 388 * return immediateFuture(0); 389 * } 390 * }, directExecutor());}</pre> 391 * 392 * <p>The fallback can also choose to propagate the original exception when desired: 393 * 394 * <pre> {@code 395 * ListenableFuture<Integer> fetchCounterFuture = ...; 396 * 397 * // Falling back to a zero counter only in case the exception was a 398 * // TimeoutException. 399 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 400 * fetchCounterFuture, FetchException.class, 401 * new AsyncFunction<FetchException, Integer>() { 402 * public ListenableFuture<Integer> apply(FetchException e) 403 * throws FetchException { 404 * if (omitDataOnFetchFailure) { 405 * return immediateFuture(0); 406 * } 407 * throw e; 408 * } 409 * }, directExecutor());}</pre> 410 * 411 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 412 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 413 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 414 * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned 415 * {@code Future}. 416 * 417 * @param input the primary input {@code Future} 418 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 419 * type is matched against the input's exception. "The input's exception" means the cause of 420 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 421 * different kind of exception, that exception itself. To avoid hiding bugs and other 422 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 423 * Throwable.class} in particular. 424 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 425 * exception type. The function's argument is the input's exception. "The input's exception" 426 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 427 * {@code get()} throws a different kind of exception, that exception itself. 428 * @param executor the executor that runs {@code fallback} if {@code input} fails 429 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 430 */ 431 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 432 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 433 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 434 ListenableFuture<? extends V> input, 435 Class<X> exceptionType, 436 AsyncFunction<? super X, ? extends V> fallback, 437 Executor executor) { 438 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 439 } 440 441 /** 442 * Returns a future that delegates to another but will finish early (via a {@link 443 * TimeoutException} wrapped in an {@link ExecutionException}) if the specified duration expires. 444 * 445 * <p>The delegate future is interrupted and cancelled if it times out. 446 * 447 * @param delegate The future to delegate to. 448 * @param time when to timeout the future 449 * @param unit the time unit of the time parameter 450 * @param scheduledExecutor The executor service to enforce the timeout. 451 * 452 * @since 19.0 453 */ 454 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 455 public static <V> ListenableFuture<V> withTimeout( 456 ListenableFuture<V> delegate, 457 long time, 458 TimeUnit unit, 459 ScheduledExecutorService scheduledExecutor) { 460 return TimeoutFuture.create(delegate, time, unit, scheduledExecutor); 461 } 462 463 /** 464 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 465 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 466 * the same exception (and the function is not invoked). 467 * 468 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 469 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 470 * Example usage: 471 * 472 * <pre> {@code 473 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 474 * AsyncFunction<RowKey, QueryResult> queryFunction = 475 * new AsyncFunction<RowKey, QueryResult>() { 476 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 477 * return dataService.read(rowKey); 478 * } 479 * }; 480 * ListenableFuture<QueryResult> queryFuture = 481 * transformAsync(rowKeyFuture, queryFunction);}</pre> 482 * 483 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 484 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 485 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 486 * listeners" refer here to the work done during {@code AsyncFunction.apply}, not to any work done 487 * to complete the returned {@code Future}. 488 * 489 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 490 * input future and that of the future returned by the function. That is, if the returned {@code 491 * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two 492 * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to 493 * cancel itself. 494 * 495 * @param input The future to transform 496 * @param function A function to transform the result of the input future to the result of the 497 * output future 498 * @return A future that holds result of the function (if the input succeeded) or the original 499 * input's failure (if not) 500 * @since 19.0 (in 11.0 as {@code transform}) 501 */ 502 public static <I, O> ListenableFuture<O> transformAsync( 503 ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) { 504 return AbstractTransformFuture.create(input, function); 505 } 506 507 /** 508 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 509 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 510 * the same exception (and the function is not invoked). 511 * 512 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 513 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 514 * Example usage: 515 * 516 * <pre> {@code 517 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 518 * AsyncFunction<RowKey, QueryResult> queryFunction = 519 * new AsyncFunction<RowKey, QueryResult>() { 520 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 521 * return dataService.read(rowKey); 522 * } 523 * }; 524 * ListenableFuture<QueryResult> queryFuture = 525 * transformAsync(rowKeyFuture, queryFunction, executor);}</pre> 526 * 527 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 528 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 529 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 530 * work done during {@code AsyncFunction.apply}, not to any work done to complete the returned 531 * {@code Future}. 532 * 533 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 534 * input future and that of the future returned by the chain function. That is, if the returned 535 * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the 536 * other two is cancelled, the returned {@code Future} will receive a callback in which it will 537 * attempt to cancel itself. 538 * 539 * @param input The future to transform 540 * @param function A function to transform the result of the input future to the result of the 541 * output future 542 * @param executor Executor to run the function in. 543 * @return A future that holds result of the function (if the input succeeded) or the original 544 * input's failure (if not) 545 * @since 19.0 (in 11.0 as {@code transform}) 546 */ 547 public static <I, O> ListenableFuture<O> transformAsync( 548 ListenableFuture<I> input, 549 AsyncFunction<? super I, ? extends O> function, 550 Executor executor) { 551 return AbstractTransformFuture.create(input, function, executor); 552 } 553 554 /** 555 * Returns a new {@code Future} whose result is derived from the result of the given {@code 556 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 557 * the function is not invoked). Example usage: 558 * 559 * <pre> {@code 560 * ListenableFuture<QueryResult> queryFuture = ...; 561 * Function<QueryResult, List<Row>> rowsFunction = 562 * new Function<QueryResult, List<Row>>() { 563 * public List<Row> apply(QueryResult queryResult) { 564 * return queryResult.getRows(); 565 * } 566 * }; 567 * ListenableFuture<List<Row>> rowsFuture = 568 * transform(queryFuture, rowsFunction);}</pre> 569 * 570 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 571 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 572 * ListenableFuture.addListener} documentation. The documentation's warnings about "lightweight 573 * listeners" refer here to the work done during {@code Function.apply}. 574 * 575 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 576 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 577 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 578 * in which it will attempt to cancel itself. 579 * 580 * <p>An example use of this method is to convert a serializable object returned from an RPC into 581 * a POJO. 582 * 583 * @param input The future to transform 584 * @param function A Function to transform the results of the provided future to the results of 585 * the returned future. This will be run in the thread that notifies input it is complete. 586 * @return A future that holds result of the transformation. 587 * @since 9.0 (in 1.0 as {@code compose}) 588 */ 589 public static <I, O> ListenableFuture<O> transform( 590 ListenableFuture<I> input, Function<? super I, ? extends O> function) { 591 return AbstractTransformFuture.create(input, function); 592 } 593 594 /** 595 * Returns a new {@code Future} whose result is derived from the result of the given {@code 596 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 597 * the function is not invoked). Example usage: 598 * 599 * <pre> {@code 600 * ListenableFuture<QueryResult> queryFuture = ...; 601 * Function<QueryResult, List<Row>> rowsFunction = 602 * new Function<QueryResult, List<Row>>() { 603 * public List<Row> apply(QueryResult queryResult) { 604 * return queryResult.getRows(); 605 * } 606 * }; 607 * ListenableFuture<List<Row>> rowsFuture = 608 * transform(queryFuture, rowsFunction, executor);}</pre> 609 * 610 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 611 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 612 * documentation. The documentation's warnings about "lightweight listeners" refer here to the 613 * work done during {@code Function.apply}. 614 * 615 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 616 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 617 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 618 * in which it will attempt to cancel itself. 619 * 620 * <p>An example use of this method is to convert a serializable object returned from an RPC into 621 * a POJO. 622 * 623 * @param input The future to transform 624 * @param function A Function to transform the results of the provided future to the results of 625 * the returned future. 626 * @param executor Executor to run the function in. 627 * @return A future that holds result of the transformation. 628 * @since 9.0 (in 2.0 as {@code compose}) 629 */ 630 public static <I, O> ListenableFuture<O> transform( 631 ListenableFuture<I> input, Function<? super I, ? extends O> function, Executor executor) { 632 return AbstractTransformFuture.create(input, function, executor); 633 } 634 635 /** 636 * Like {@link #transform(ListenableFuture, Function)} except that the transformation {@code 637 * function} is invoked on each call to {@link Future#get() get()} on the returned future. 638 * 639 * <p>The returned {@code Future} reflects the input's cancellation state directly, and any 640 * attempt to cancel the returned Future is likewise passed through to the input Future. 641 * 642 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} only apply the timeout 643 * to the execution of the underlying {@code Future}, <em>not</em> to the execution of the 644 * transformation function. 645 * 646 * <p>The primary audience of this method is callers of {@code transform} who don't have a {@code 647 * ListenableFuture} available and do not mind repeated, lazy function evaluation. 648 * 649 * @param input The future to transform 650 * @param function A Function to transform the results of the provided future to the results of 651 * the returned future. 652 * @return A future that returns the result of the transformation. 653 * @since 10.0 654 */ 655 @GwtIncompatible // TODO 656 public static <I, O> Future<O> lazyTransform( 657 final Future<I> input, final Function<? super I, ? extends O> function) { 658 checkNotNull(input); 659 checkNotNull(function); 660 return new Future<O>() { 661 662 @Override 663 public boolean cancel(boolean mayInterruptIfRunning) { 664 return input.cancel(mayInterruptIfRunning); 665 } 666 667 @Override 668 public boolean isCancelled() { 669 return input.isCancelled(); 670 } 671 672 @Override 673 public boolean isDone() { 674 return input.isDone(); 675 } 676 677 @Override 678 public O get() throws InterruptedException, ExecutionException { 679 return applyTransformation(input.get()); 680 } 681 682 @Override 683 public O get(long timeout, TimeUnit unit) 684 throws InterruptedException, ExecutionException, TimeoutException { 685 return applyTransformation(input.get(timeout, unit)); 686 } 687 688 private O applyTransformation(I input) throws ExecutionException { 689 try { 690 return function.apply(input); 691 } catch (Throwable t) { 692 throw new ExecutionException(t); 693 } 694 } 695 }; 696 } 697 698 /** 699 * Returns a new {@code ListenableFuture} whose result is the product of calling {@code get()} on 700 * the {@code Future} nested within the given {@code Future}, effectively chaining the futures one 701 * after the other. Example: 702 * 703 * <pre> {@code 704 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 705 * ListenableFuture<String> dereferenced = dereference(nested);}</pre> 706 * 707 * <p>Most users will not need this method. To create a {@code Future} that completes with the 708 * result of another {@code Future}, create a {@link SettableFuture}, and call {@link 709 * SettableFuture#setFuture setFuture(otherFuture)} on it. 710 * 711 * <p>{@code dereference} has the same cancellation and execution semantics as {@link 712 * #transformAsync(ListenableFuture, AsyncFunction)}, in that the returned {@code Future} 713 * attempts to keep its cancellation state in sync with both the input {@code Future} and the 714 * nested {@code Future}. The transformation is very lightweight and therefore takes place in 715 * the same thread (either the thread that called {@code dereference}, or the thread in which 716 * the dereferenced future completes). 717 * 718 * @param nested The nested future to transform. 719 * @return A future that holds result of the inner future. 720 * @since 13.0 721 */ 722 @SuppressWarnings({"rawtypes", "unchecked"}) 723 public static <V> ListenableFuture<V> dereference( 724 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 725 return transformAsync((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 726 } 727 728 /** 729 * Helper {@code Function} for {@link #dereference}. 730 */ 731 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 732 new AsyncFunction<ListenableFuture<Object>, Object>() { 733 @Override 734 public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 735 return input; 736 } 737 }; 738 739 /** 740 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 741 * input futures, if all succeed. If any input fails, the returned future fails immediately. 742 * 743 * <p>The list of results is in the same order as the input list. 744 * 745 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 746 * provided futures fails or is canceled, this one is, too. 747 * 748 * @param futures futures to combine 749 * @return a future that provides a list of the results of the component futures 750 * @since 10.0 751 */ 752 @Beta 753 @SafeVarargs 754 public static <V> ListenableFuture<List<V>> allAsList(ListenableFuture<? extends V>... futures) { 755 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 756 } 757 758 /** 759 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 760 * input futures, if all succeed. If any input fails, the returned future fails immediately. 761 * 762 * <p>The list of results is in the same order as the input list. 763 * 764 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 765 * provided futures fails or is canceled, this one is, too. 766 * 767 * @param futures futures to combine 768 * @return a future that provides a list of the results of the component futures 769 * @since 10.0 770 */ 771 @Beta 772 public static <V> ListenableFuture<List<V>> allAsList( 773 Iterable<? extends ListenableFuture<? extends V>> futures) { 774 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 775 } 776 777 /** 778 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 779 * successful. 780 * 781 * @since 20.0 782 */ 783 @SafeVarargs 784 public static <V> FutureCombiner<V> whenAllComplete(ListenableFuture<? extends V>... futures) { 785 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 786 } 787 788 /** 789 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 790 * successful. 791 * 792 * @since 20.0 793 */ 794 public static <V> FutureCombiner<V> whenAllComplete( 795 Iterable<? extends ListenableFuture<? extends V>> futures) { 796 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 797 } 798 799 /** 800 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 801 * 802 * <p>If any input fails, the returned future fails immediately. 803 * 804 * @since 20.0 805 */ 806 @SafeVarargs 807 public static <V> FutureCombiner<V> whenAllSucceed(ListenableFuture<? extends V>... futures) { 808 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 809 } 810 811 /** 812 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 813 * 814 * <p>If any input fails, the returned future fails immediately. 815 * 816 * @since 20.0 817 */ 818 public static <V> FutureCombiner<V> whenAllSucceed( 819 Iterable<? extends ListenableFuture<? extends V>> futures) { 820 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 821 } 822 823 /** 824 * A helper to create a new {@code ListenableFuture} whose result is generated from a combination 825 * of input futures. 826 * 827 * <p>See {@link #whenAllComplete} and {@link #whenAllSucceed} for how to instantiate this class. 828 * 829 * <p>Example: 830 * 831 * <pre> {@code 832 * final ListenableFuture<Instant> loginDateFuture = 833 * loginService.findLastLoginDate(username); 834 * final ListenableFuture<List<String>> recentCommandsFuture = 835 * recentCommandsService.findRecentCommands(username); 836 * Callable<UsageHistory> usageComputation = 837 * new Callable<UsageHistory>() { 838 * public UsageHistory call() throws Exception { 839 * return new UsageHistory( 840 * username, loginDateFuture.get(), recentCommandsFuture.get()); 841 * } 842 * }; 843 * ListenableFuture<UsageHistory> usageFuture = 844 * Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture) 845 * .call(usageComputation, executor);}</pre> 846 * 847 * @since 20.0 848 */ 849 @Beta 850 @CanIgnoreReturnValue // TODO(cpovirk): Consider removing, especially if we provide run(Runnable) 851 @GwtCompatible 852 public static final class FutureCombiner<V> { 853 private final boolean allMustSucceed; 854 private final ImmutableList<ListenableFuture<? extends V>> futures; 855 856 private FutureCombiner( 857 boolean allMustSucceed, ImmutableList<ListenableFuture<? extends V>> futures) { 858 this.allMustSucceed = allMustSucceed; 859 this.futures = futures; 860 } 861 862 /** 863 * Creates the {@link ListenableFuture} which will return the result of calling {@link 864 * AsyncCallable#call} in {@code combiner} when all futures complete, using the specified {@code 865 * executor}. 866 * 867 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 868 * cancelled. 869 * 870 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 871 * ExecutionException} will be extracted and returned as the cause of the new {@code 872 * ExecutionException} that gets thrown by the returned combined future. 873 * 874 * <p>Canceling this future will attempt to cancel all the component futures. 875 */ 876 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor) { 877 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 878 } 879 880 /** 881 * Like {@link #callAsync(AsyncCallable, Executor)} but using {@linkplain 882 * MoreExecutors#directExecutor direct executor}. 883 */ 884 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner) { 885 return callAsync(combiner, directExecutor()); 886 } 887 888 /** 889 * Creates the {@link ListenableFuture} which will return the result of calling {@link 890 * Callable#call} in {@code combiner} when all futures complete, using the specified {@code 891 * executor}. 892 * 893 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 894 * cancelled. 895 * 896 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 897 * ExecutionException} will be extracted and returned as the cause of the new {@code 898 * ExecutionException} that gets thrown by the returned combined future. 899 * 900 * <p>Canceling this future will attempt to cancel all the component futures. 901 */ 902 @CanIgnoreReturnValue 903 public <C> ListenableFuture<C> call(Callable<C> combiner, Executor executor) { 904 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 905 } 906 907 /** 908 * Like {@link #call(Callable, Executor)} but using {@linkplain MoreExecutors#directExecutor 909 * direct executor}. 910 */ 911 @CanIgnoreReturnValue 912 public <C> ListenableFuture<C> call(Callable<C> combiner) { 913 return call(combiner, directExecutor()); 914 } 915 916 /* 917 * TODO(cpovirk): Evaluate demand for a run(Runnable) version. Would it allow us to remove 918 * @CanIgnoreReturnValue from the call() methods above? 919 * https://github.com/google/guava/issues/2371 920 */ 921 } 922 923 /** 924 * Creates a new {@code ListenableFuture} whose result is set from the supplied future when it 925 * completes. Cancelling the supplied future will also cancel the returned future, but cancelling 926 * the returned future will have no effect on the supplied future. 927 * 928 * @since 15.0 929 */ 930 public static <V> ListenableFuture<V> nonCancellationPropagating(ListenableFuture<V> future) { 931 return new NonCancellationPropagatingFuture<V>(future); 932 } 933 934 /** 935 * A wrapped future that does not propagate cancellation to its delegate. 936 */ 937 private static final class NonCancellationPropagatingFuture<V> 938 extends AbstractFuture.TrustedFuture<V> { 939 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 940 delegate.addListener( 941 new Runnable() { 942 @Override 943 public void run() { 944 // This prevents cancellation from propagating because we don't assign delegate until 945 // delegate is already done, so calling cancel() on it is a no-op. 946 setFuture(delegate); 947 } 948 }, 949 directExecutor()); 950 } 951 } 952 953 /** 954 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 955 * successful input futures. The list of results is in the same order as the input list, and if 956 * any of the provided futures fails or is canceled, its corresponding position will contain 957 * {@code null} (which is indistinguishable from the future having a successful value of {@code 958 * null}). 959 * 960 * <p>Canceling this future will attempt to cancel all the component futures. 961 * 962 * @param futures futures to combine 963 * @return a future that provides a list of the results of the component futures 964 * @since 10.0 965 */ 966 @Beta 967 @SafeVarargs 968 public static <V> ListenableFuture<List<V>> successfulAsList( 969 ListenableFuture<? extends V>... futures) { 970 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 971 } 972 973 /** 974 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 975 * successful input futures. The list of results is in the same order as the input list, and if 976 * any of the provided futures fails or is canceled, its corresponding position will contain 977 * {@code null} (which is indistinguishable from the future having a successful value of {@code 978 * null}). 979 * 980 * <p>Canceling this future will attempt to cancel all the component futures. 981 * 982 * @param futures futures to combine 983 * @return a future that provides a list of the results of the component futures 984 * @since 10.0 985 */ 986 @Beta 987 public static <V> ListenableFuture<List<V>> successfulAsList( 988 Iterable<? extends ListenableFuture<? extends V>> futures) { 989 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 990 } 991 992 /** 993 * Returns a list of delegate futures that correspond to the futures received in the order that 994 * they complete. Delegate futures return the same value or throw the same exception as the 995 * corresponding input future returns/throws. 996 * 997 * <p>Cancelling a delegate future has no effect on any input future, since the delegate future 998 * does not correspond to a specific input future until the appropriate number of input futures 999 * have completed. At that point, it is too late to cancel the input future. The input future's 1000 * result, which cannot be stored into the cancelled delegate future, is ignored. 1001 * 1002 * @since 17.0 1003 */ 1004 @Beta 1005 @GwtIncompatible // TODO 1006 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 1007 Iterable<? extends ListenableFuture<? extends T>> futures) { 1008 // A CLQ may be overkill here. We could save some pointers/memory by synchronizing on an 1009 // ArrayDeque 1010 final ConcurrentLinkedQueue<SettableFuture<T>> delegates = Queues.newConcurrentLinkedQueue(); 1011 ImmutableList.Builder<ListenableFuture<T>> listBuilder = ImmutableList.builder(); 1012 // Using SerializingExecutor here will ensure that each CompletionOrderListener executes 1013 // atomically and therefore that each returned future is guaranteed to be in completion order. 1014 // N.B. there are some cases where the use of this executor could have possibly surprising 1015 // effects when input futures finish at approximately the same time _and_ the output futures 1016 // have directExecutor listeners. In this situation, the listeners may end up running on a 1017 // different thread than if they were attached to the corresponding input future. We believe 1018 // this to be a negligible cost since: 1019 // 1. Using the directExecutor implies that your callback is safe to run on any thread. 1020 // 2. This would likely only be noticeable if you were doing something expensive or blocking on 1021 // a directExecutor listener on one of the output futures which is an antipattern anyway. 1022 SerializingExecutor executor = new SerializingExecutor(directExecutor()); 1023 for (final ListenableFuture<? extends T> future : futures) { 1024 SettableFuture<T> delegate = SettableFuture.create(); 1025 // Must make sure to add the delegate to the queue first in case the future is already done 1026 delegates.add(delegate); 1027 future.addListener( 1028 new Runnable() { 1029 @Override 1030 public void run() { 1031 delegates.remove().setFuture(future); 1032 } 1033 }, 1034 executor); 1035 listBuilder.add(delegate); 1036 } 1037 return listBuilder.build(); 1038 } 1039 1040 /** 1041 * Registers separate success and failure callbacks to be run when the {@code Future}'s 1042 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 1043 * computation is already complete, immediately. 1044 * 1045 * <p>There is no guaranteed ordering of execution of callbacks, but any callback added through 1046 * this method is guaranteed to be called once the computation is complete. 1047 * 1048 * Example: <pre> {@code 1049 * ListenableFuture<QueryResult> future = ...; 1050 * addCallback(future, 1051 * new FutureCallback<QueryResult>() { 1052 * public void onSuccess(QueryResult result) { 1053 * storeInCache(result); 1054 * } 1055 * public void onFailure(Throwable t) { 1056 * reportError(t); 1057 * } 1058 * });}</pre> 1059 * 1060 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 1061 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 1062 * ListenableFuture.addListener} documentation. 1063 * 1064 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1065 * ListenableFuture#addListener addListener}. 1066 * 1067 * @param future The future attach the callback to. 1068 * @param callback The callback to invoke when {@code future} is completed. 1069 * @since 10.0 1070 */ 1071 public static <V> void addCallback( 1072 ListenableFuture<V> future, FutureCallback<? super V> callback) { 1073 addCallback(future, callback, directExecutor()); 1074 } 1075 1076 /** 1077 * Registers separate success and failure callbacks to be run when the {@code Future}'s 1078 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 1079 * computation is already complete, immediately. 1080 * 1081 * <p>The callback is run in {@code executor}. There is no guaranteed ordering of execution of 1082 * callbacks, but any callback added through this method is guaranteed to be called once the 1083 * computation is complete. 1084 * 1085 * Example: <pre> {@code 1086 * ListenableFuture<QueryResult> future = ...; 1087 * Executor e = ... 1088 * addCallback(future, 1089 * new FutureCallback<QueryResult>() { 1090 * public void onSuccess(QueryResult result) { 1091 * storeInCache(result); 1092 * } 1093 * public void onFailure(Throwable t) { 1094 * reportError(t); 1095 * } 1096 * }, e);}</pre> 1097 * 1098 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 1099 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1100 * documentation. 1101 * 1102 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1103 * ListenableFuture#addListener addListener}. 1104 * 1105 * @param future The future attach the callback to. 1106 * @param callback The callback to invoke when {@code future} is completed. 1107 * @param executor The executor to run {@code callback} when the future completes. 1108 * @since 10.0 1109 */ 1110 public static <V> void addCallback( 1111 final ListenableFuture<V> future, 1112 final FutureCallback<? super V> callback, 1113 Executor executor) { 1114 Preconditions.checkNotNull(callback); 1115 Runnable callbackListener = 1116 new Runnable() { 1117 @Override 1118 public void run() { 1119 final V value; 1120 try { 1121 value = getDone(future); 1122 } catch (ExecutionException e) { 1123 callback.onFailure(e.getCause()); 1124 return; 1125 } catch (RuntimeException e) { 1126 callback.onFailure(e); 1127 return; 1128 } catch (Error e) { 1129 callback.onFailure(e); 1130 return; 1131 } 1132 callback.onSuccess(value); 1133 } 1134 }; 1135 future.addListener(callbackListener, executor); 1136 } 1137 1138 /** 1139 * Returns the result of the input {@code Future}, which must have already completed. 1140 * 1141 * <p>The benefits of this method are twofold. First, the name "getDone" suggests to readers that 1142 * the {@code Future} is already done. Second, if buggy code calls {@code getDone} on a {@code 1143 * Future} that is still pending, the program will throw instead of block. This can be important 1144 * for APIs like {@link whenAllComplete whenAllComplete(...)}{@code .}{@link 1145 * FutureCombiner#call(Callable) call(...)}, where it is easy to use a new input from the {@code 1146 * call} implementation but forget to add it to the arguments of {@code whenAllComplete}. 1147 * 1148 * <p>If you are looking for a method to determine whether a given {@code Future} is done, use the 1149 * instance method {@link Future#isDone()}. 1150 * 1151 * @throws ExecutionException if the {@code Future} failed with an exception 1152 * @throws CancellationException if the {@code Future} was cancelled 1153 * @throws IllegalStateException if the {@code Future} is not done 1154 * @since 20.0 1155 */ 1156 @CanIgnoreReturnValue 1157 // TODO(cpovirk): Consider calling getDone() in our own code. 1158 public static <V> V getDone(Future<V> future) throws ExecutionException { 1159 /* 1160 * We throw IllegalStateException, since the call could succeed later. Perhaps we "should" throw 1161 * IllegalArgumentException, since the call could succeed with a different argument. Those 1162 * exceptions' docs suggest that either is acceptable. Google's Java Practices page recommends 1163 * IllegalArgumentException here, in part to keep its recommendation simple: Static methods 1164 * should throw IllegalStateException only when they use static state. 1165 * 1166 * 1167 * Why do we deviate here? The answer: We want for fluentFuture.getDone() to throw the same 1168 * exception as Futures.getDone(fluentFuture). 1169 */ 1170 checkState(future.isDone(), "Future was expected to be done: %s", future); 1171 return getUninterruptibly(future); 1172 } 1173 1174 /** 1175 * Returns the result of {@link Future#get()}, converting most exceptions to a new instance of the 1176 * given checked exception type. This reduces boilerplate for a common use of {@code Future} in 1177 * which it is unnecessary to programmatically distinguish between exception types or to extract 1178 * other information from the exception instance. 1179 * 1180 * <p>Exceptions from {@code Future.get} are treated as follows: 1181 * <ul> 1182 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause is 1183 * a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1184 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1185 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1186 * interrupt). 1187 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1188 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1189 * exceptions). 1190 * </ul> 1191 * 1192 * <p>The overall principle is to continue to treat every checked exception as a checked 1193 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1194 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1195 * new stack trace matches that of the current thread. 1196 * 1197 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1198 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1199 * (preferring constructors with at least one {@code String}) and calling the constructor via 1200 * reflection. If the exception did not already have a cause, one is set by calling {@link 1201 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1202 * IllegalArgumentException} is thrown. 1203 * 1204 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1205 * whose cause is not itself a checked exception 1206 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1207 * {@code RuntimeException} as its cause 1208 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1209 * Error} as its cause 1210 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1211 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1212 * does not have a suitable constructor 1213 * @since 19.0 (in 10.0 as {@code get}) 1214 */ 1215 @CanIgnoreReturnValue 1216 @GwtIncompatible // reflection 1217 public static <V, X extends Exception> V getChecked(Future<V> future, Class<X> exceptionClass) 1218 throws X { 1219 return FuturesGetChecked.getChecked(future, exceptionClass); 1220 } 1221 1222 /** 1223 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most exceptions to a new 1224 * instance of the given checked exception type. This reduces boilerplate for a common use of 1225 * {@code Future} in which it is unnecessary to programmatically distinguish between exception 1226 * types or to extract other information from the exception instance. 1227 * 1228 * <p>Exceptions from {@code Future.get} are treated as follows: 1229 * <ul> 1230 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause is 1231 * a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1232 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1233 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1234 * interrupt). 1235 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1236 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1237 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1238 * exceptions). 1239 * </ul> 1240 * 1241 * <p>The overall principle is to continue to treat every checked exception as a checked 1242 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1243 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1244 * new stack trace matches that of the current thread. 1245 * 1246 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1247 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1248 * (preferring constructors with at least one {@code String}) and calling the constructor via 1249 * reflection. If the exception did not already have a cause, one is set by calling {@link 1250 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1251 * IllegalArgumentException} is thrown. 1252 * 1253 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1254 * whose cause is not itself a checked exception 1255 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1256 * {@code RuntimeException} as its cause 1257 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1258 * Error} as its cause 1259 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1260 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1261 * does not have a suitable constructor 1262 * @since 19.0 (in 10.0 as {@code get} and with different parameter order) 1263 */ 1264 @CanIgnoreReturnValue 1265 @GwtIncompatible // reflection 1266 public static <V, X extends Exception> V getChecked( 1267 Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) throws X { 1268 return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit); 1269 } 1270 1271 /** 1272 * Returns the result of calling {@link Future#get()} uninterruptibly on a task known not to throw 1273 * a checked exception. This makes {@code Future} more suitable for lightweight, fast-running 1274 * tasks that, barring bugs in the code, will not fail. This gives it exception-handling behavior 1275 * similar to that of {@code ForkJoinTask.join}. 1276 * 1277 * <p>Exceptions from {@code Future.get} are treated as follows: 1278 * <ul> 1279 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@link 1280 * UncheckedExecutionException} (if the cause is an {@code Exception}) or {@link 1281 * ExecutionError} (if the cause is an {@code Error}). 1282 * <li>Any {@link InterruptedException} causes a retry of the {@code get} call. The interrupt is 1283 * restored before {@code getUnchecked} returns. 1284 * <li>Any {@link CancellationException} is propagated untouched. So is any other {@link 1285 * RuntimeException} ({@code get} implementations are discouraged from throwing such 1286 * exceptions). 1287 * </ul> 1288 * 1289 * <p>The overall principle is to eliminate all checked exceptions: to loop to avoid {@code 1290 * InterruptedException}, to pass through {@code CancellationException}, and to wrap any exception 1291 * from the underlying computation in an {@code UncheckedExecutionException} or {@code 1292 * ExecutionError}. 1293 * 1294 * <p>For an uninterruptible {@code get} that preserves other exceptions, see {@link 1295 * Uninterruptibles#getUninterruptibly(Future)}. 1296 * 1297 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with an 1298 * {@code Exception} as its cause 1299 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1300 * Error} as its cause 1301 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1302 * @since 10.0 1303 */ 1304 @CanIgnoreReturnValue 1305 @GwtIncompatible // TODO 1306 public static <V> V getUnchecked(Future<V> future) { 1307 checkNotNull(future); 1308 try { 1309 return getUninterruptibly(future); 1310 } catch (ExecutionException e) { 1311 wrapAndThrowUnchecked(e.getCause()); 1312 throw new AssertionError(); 1313 } 1314 } 1315 1316 @GwtIncompatible // TODO 1317 private static void wrapAndThrowUnchecked(Throwable cause) { 1318 if (cause instanceof Error) { 1319 throw new ExecutionError((Error) cause); 1320 } 1321 /* 1322 * It's an Exception. (Or it's a non-Error, non-Exception Throwable. From my survey of such 1323 * classes, I believe that most users intended to extend Exception, so we'll treat it like an 1324 * Exception.) 1325 */ 1326 throw new UncheckedExecutionException(cause); 1327 } 1328 1329 /* 1330 * Arguably we don't need a timed getUnchecked because any operation slow enough to require a 1331 * timeout is heavyweight enough to throw a checked exception and therefore be inappropriate to 1332 * use with getUnchecked. Further, it's not clear that converting the checked TimeoutException to 1333 * a RuntimeException -- especially to an UncheckedExecutionException, since it wasn't thrown by 1334 * the computation -- makes sense, and if we don't convert it, the user still has to write a 1335 * try-catch block. 1336 * 1337 * If you think you would use this method, let us know. You might also also look into the 1338 * Fork-Join framework: http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html 1339 */ 1340 1341 /** 1342 * A checked future that uses a function to map from exceptions to the appropriate checked type. 1343 */ 1344 @GwtIncompatible // TODO 1345 private static class MappingCheckedFuture<V, X extends Exception> 1346 extends AbstractCheckedFuture<V, X> { 1347 1348 final Function<? super Exception, X> mapper; 1349 1350 MappingCheckedFuture(ListenableFuture<V> delegate, Function<? super Exception, X> mapper) { 1351 super(delegate); 1352 1353 this.mapper = checkNotNull(mapper); 1354 } 1355 1356 @Override 1357 protected X mapException(Exception e) { 1358 return mapper.apply(e); 1359 } 1360 } 1361}