001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Preconditions.checkState; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 021 022import com.google.common.annotations.Beta; 023import com.google.common.annotations.GwtCompatible; 024import com.google.common.annotations.GwtIncompatible; 025import com.google.common.base.Function; 026import com.google.common.base.MoreObjects; 027import com.google.common.base.Preconditions; 028import com.google.common.collect.ImmutableList; 029import com.google.common.util.concurrent.CollectionFuture.ListFuture; 030import com.google.common.util.concurrent.ImmediateFuture.ImmediateCancelledFuture; 031import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedCheckedFuture; 032import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedFuture; 033import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulCheckedFuture; 034import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulFuture; 035import com.google.errorprone.annotations.CanIgnoreReturnValue; 036import java.util.Collection; 037import java.util.List; 038import 
java.util.concurrent.Callable; 039import java.util.concurrent.CancellationException; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.Executor; 042import java.util.concurrent.Future; 043import java.util.concurrent.ScheduledExecutorService; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.TimeoutException; 046import java.util.concurrent.atomic.AtomicInteger; 047import org.checkerframework.checker.nullness.compatqual.NullableDecl; 048 049/** 050 * Static utility methods pertaining to the {@link Future} interface. 051 * 052 * <p>Many of these methods use the {@link ListenableFuture} API; consult the Guava User Guide 053 * article on <a href="https://github.com/google/guava/wiki/ListenableFutureExplained">{@code 054 * ListenableFuture}</a>. 055 * 056 * <p>The main purpose of {@code ListenableFuture} is to help you chain together a graph of 057 * asynchronous operations. You can chain them together manually with calls to methods like {@link 058 * Futures#transform(ListenableFuture, Function, Executor) Futures.transform}, but you will often 059 * find it easier to use a framework. Frameworks automate the process, often adding features like 060 * monitoring, debugging, and cancellation. Examples of frameworks include: 061 * 062 * <ul> 063 * <li><a href="http://google.github.io/dagger/producers.html">Dagger Producers</a> 064 * </ul> 065 * 066 * <p>If you do chain your operations manually, you may want to use {@link FluentFuture}. 067 * 068 * @author Kevin Bourrillion 069 * @author Nishant Thakkar 070 * @author Sven Mawson 071 * @since 1.0 072 */ 073@Beta 074@GwtCompatible(emulated = true) 075public final class Futures extends GwtFuturesCatchingSpecialization { 076 077 // A note on memory visibility. 078 // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine) 079 // have two requirements that significantly complicate their design. 080 // 1. 
Cancellation should propagate from the returned future to the input future(s). 081 // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion. 082 // 083 // A consequence of these requirements is that the delegate futures cannot be stored in 084 // final fields. 085 // 086 // For simplicity the rest of this description will discuss Futures.catching since it is the 087 // simplest instance, though very similar descriptions apply to many other classes in this file. 088 // 089 // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field 090 // 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the 091 // 'inputFuture' field is read and where we will have to consider visibility of the write 092 // operation in the constructor. 093 // 094 // 1. In the listener that performs the callback. In this case it is fine since inputFuture is 095 // assigned prior to calling addListener, and addListener happens-before any invocation of the 096 // listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible 097 // to the listener. 098 // 099 // 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine. 100 // There is currently nothing that enforces that the write to inputFuture in the constructor is 101 // visible to done(). This is because there is no happens-before edge between the write and a 102 // (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue, 103 // it would just add an edge such that if done() observed non-null, then it would also 104 // definitely observe all earlier writes, but we still have no guarantee that done() would see 105 // the initial write (just stronger guarantees if it does). 106 // 107 // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html 108 // For a (long) discussion about this specific issue and the general futility of life. 
//
  // For the time being we are OK with the problem discussed above since it requires a caller to
  // introduce a very specific kind of data-race. And given the other operations performed by these
  // methods that involve volatile read/write operations, in practice there is no issue. Also, the
  // way in which such a visibility issue would surface is most likely as a failure of cancel() to
  // propagate to the input. Cancellation propagation is fundamentally racy so this is fine.
  //
  // Future versions of the JMM may revise safe construction semantics in such a way that we can
  // safely publish these objects and we won't need this whole discussion.
  // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs
  // that should resolve the issue. This comes at the cost of adding more write barriers to the
  // implementations.

  // Private: this class is documented as a holder of static utility methods, so outside code has
  // no reason to construct it.
  private Futures() {}

  /**
   * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} and a {@link Function}
   * that maps from {@link Exception} instances into the appropriate checked type.
   *
   * <p><b>Warning:</b> We recommend against using {@code CheckedFuture} in new projects. {@code
   * CheckedFuture} is difficult to build libraries atop. {@code CheckedFuture} ports of methods
   * like {@link Futures#transformAsync} have historically had bugs, and some of these bugs are
   * necessary, unavoidable consequences of the {@code CheckedFuture} API. Additionally, {@code
   * CheckedFuture} encourages users to take exceptions from one thread and rethrow them in another,
   * producing confusing stack traces.
   *
   * <p>The given mapping function will be applied to an {@link InterruptedException}, a {@link
   * CancellationException}, or an {@link ExecutionException}. See {@link Future#get()} for details
   * on the exceptions thrown.
138 * 139 * @since 9.0 (source-compatible since 1.0) 140 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 141 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 142 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 143 * Additionally, it has a surprising policy about which exceptions to map and which to leave 144 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 145 * use, possibly specializing them to the particular exception type they use. We recommend 146 * that most people use {@code ListenableFuture} and perform any exception wrapping 147 * themselves. This method is scheduled for removal from Guava in April 2018. 148 */ 149 // TODO(b/72241575): Remove by 2018-04 150 @Deprecated 151 @GwtIncompatible // TODO 152 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 153 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 154 return new MappingCheckedFuture<>(checkNotNull(future), mapper); 155 } 156 157 /** 158 * Creates a {@code ListenableFuture} which has its value set immediately upon construction. The 159 * getters just return the value. This {@code Future} can't be canceled or timed out and its 160 * {@code isDone()} method always returns {@code true}. 161 */ 162 public static <V> ListenableFuture<V> immediateFuture(@NullableDecl V value) { 163 if (value == null) { 164 // This cast is safe because null is assignable to V for all V (i.e. it is covariant) 165 @SuppressWarnings({"unchecked", "rawtypes"}) 166 ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL; 167 return typedNull; 168 } 169 return new ImmediateSuccessfulFuture<V>(value); 170 } 171 172 /** 173 * Returns a {@code CheckedFuture} which has its value set immediately upon construction. 
174 * 175 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 176 * returns {@code true}. Calling {@code get()} or {@code checkedGet()} will immediately return the 177 * provided value. 178 * 179 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 180 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 181 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 182 * Additionally, it has a surprising policy about which exceptions to map and which to leave 183 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 184 * use, possibly specializing them to the particular exception type they use. We recommend 185 * that most people use {@code ListenableFuture} and perform any exception wrapping 186 * themselves. This method is scheduled for removal from Guava in April 2018. 187 */ 188 // TODO(b/72241893): Remove by 2018-04 189 @Deprecated 190 @GwtIncompatible // TODO 191 public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture( 192 @NullableDecl V value) { 193 return new ImmediateSuccessfulCheckedFuture<>(value); 194 } 195 196 /** 197 * Returns a {@code ListenableFuture} which has an exception set immediately upon construction. 198 * 199 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 200 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 201 * Throwable} wrapped in an {@code ExecutionException}. 202 */ 203 public static <V> ListenableFuture<V> immediateFailedFuture(Throwable throwable) { 204 checkNotNull(throwable); 205 return new ImmediateFailedFuture<V>(throwable); 206 } 207 208 /** 209 * Creates a {@code ListenableFuture} which is cancelled immediately upon construction, so that 210 * {@code isCancelled()} always returns {@code true}. 
211 * 212 * @since 14.0 213 */ 214 public static <V> ListenableFuture<V> immediateCancelledFuture() { 215 return new ImmediateCancelledFuture<V>(); 216 } 217 218 /** 219 * Returns a {@code CheckedFuture} which has an exception set immediately upon construction. 220 * 221 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 222 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 223 * Exception} wrapped in an {@code ExecutionException}, and calling {@code checkedGet()} will 224 * throw the provided exception itself. 225 * 226 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 227 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 228 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 229 * Additionally, it has a surprising policy about which exceptions to map and which to leave 230 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 231 * use, possibly specializing them to the particular exception type they use. We recommend 232 * that most people use {@code ListenableFuture} and perform any exception wrapping 233 * themselves. This method is scheduled for removal from Guava in April 2018. 234 */ 235 // TODO(b/72241500): Remove by 2018-04 236 @Deprecated 237 @GwtIncompatible // TODO 238 public static <V, X extends Exception> CheckedFuture<V, X> immediateFailedCheckedFuture( 239 X exception) { 240 checkNotNull(exception); 241 return new ImmediateFailedCheckedFuture<>(exception); 242 } 243 244 /** 245 * Executes {@code callable} on the specified {@code executor}, returning a {@code Future}. 
246 * 247 * @throws RejectedExecutionException if the task cannot be scheduled for execution 248 * @since 23.0 249 */ 250 public static <O> ListenableFuture<O> submitAsync(AsyncCallable<O> callable, Executor executor) { 251 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 252 executor.execute(task); 253 return task; 254 } 255 256 /** 257 * Schedules {@code callable} on the specified {@code executor}, returning a {@code Future}. 258 * 259 * @throws RejectedExecutionException if the task cannot be scheduled for execution 260 * @since 23.0 261 */ 262 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 263 public static <O> ListenableFuture<O> scheduleAsync( 264 AsyncCallable<O> callable, 265 long delay, 266 TimeUnit timeUnit, 267 ScheduledExecutorService executorService) { 268 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 269 final Future<?> scheduled = executorService.schedule(task, delay, timeUnit); 270 task.addListener( 271 new Runnable() { 272 @Override 273 public void run() { 274 // Don't want to interrupt twice 275 scheduled.cancel(false); 276 } 277 }, 278 directExecutor()); 279 return task; 280 } 281 282 /** 283 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 284 * primary input fails with the given {@code exceptionType}, from the result provided by the 285 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 286 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 287 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 288 * Future}. 289 * 290 * <p>Usage example: 291 * 292 * <pre>{@code 293 * ListenableFuture<Integer> fetchCounterFuture = ...; 294 * 295 * // Falling back to a zero counter in case an exception happens when 296 * // processing the RPC to fetch counters. 
297 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 298 * fetchCounterFuture, FetchException.class, x -> 0); 299 * }</pre> 300 * 301 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 302 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 303 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 304 * also applicable to heavyweight functions passed to this method. 305 * 306 * @param input the primary input {@code Future} 307 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 308 * type is matched against the input's exception. "The input's exception" means the cause of 309 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 310 * different kind of exception, that exception itself. To avoid hiding bugs and other 311 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 312 * Throwable.class} in particular. 313 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 314 * exception type. The function's argument is the input's exception. "The input's exception" 315 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 316 * {@code get()} throws a different kind of exception, that exception itself. 317 * @since 19.0 318 * @deprecated Use {@linkplain #catching(ListenableFuture, Class, Function, Executor) the overload 319 * that requires an executor}. For identical behavior, pass {@link 320 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 321 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 322 * documentation. This method is scheduled to be removed in April 2018. 
*/
  @Deprecated
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public static <V, X extends Throwable> ListenableFuture<V> catching(
      ListenableFuture<? extends V> input,
      Class<X> exceptionType,
      Function<? super X, ? extends V> fallback) {
    // This overload has no executor parameter, so the fallback is run with directExecutor().
    return AbstractCatchingFuture.create(input, exceptionType, fallback, directExecutor());
  }

  /**
   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
   * primary input fails with the given {@code exceptionType}, from the result provided by the
   * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so
   * if the primary input succeeds, it is never invoked. If, during the invocation of {@code
   * fallback}, an exception is thrown, this exception is used as the result of the output {@code
   * Future}.
   *
   * <p>Usage example:
   *
   * <pre>{@code
   * ListenableFuture<Integer> fetchCounterFuture = ...;
   *
   * // Falling back to a zero counter in case an exception happens when
   * // processing the RPC to fetch counters.
   * ListenableFuture<Integer> faultTolerantFuture = Futures.catching(
   *     fetchCounterFuture, FetchException.class, x -> 0, directExecutor());
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
   * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
   * functions passed to this method.
   *
   * @param input the primary input {@code Future}
   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
   *     type is matched against the input's exception. "The input's exception" means the cause of
   *     the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
   *     different kind of exception, that exception itself. To avoid hiding bugs and other
   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
   *     Throwable.class} in particular.
   * @param fallback the {@link Function} to be called if {@code input} fails with the expected
   *     exception type. The function's argument is the input's exception. "The input's exception"
   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
   *     {@code get()} throws a different kind of exception, that exception itself.
   * @param executor the executor that runs {@code fallback} if {@code input} fails
   * @return a {@code Future} whose result comes from {@code input} or, if {@code input} fails with
   *     {@code exceptionType}, from {@code fallback}
   * @since 19.0
   */
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public static <V, X extends Throwable> ListenableFuture<V> catching(
      ListenableFuture<? extends V> input,
      Class<X> exceptionType,
      Function<? super X, ? extends V> fallback,
      Executor executor) {
    // Pure forwarder: the arguments are handed to AbstractCatchingFuture.create unchanged.
    return AbstractCatchingFuture.create(input, exceptionType, fallback, executor);
  }

  /**
   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
   * primary input fails with the given {@code exceptionType}, from the result provided by the
   * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has
   * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of
   * {@code fallback}, an exception is thrown, this exception is used as the result of the output
   * {@code Future}.
   *
   * <p>Usage examples:
   *
   * <pre>{@code
   * ListenableFuture<Integer> fetchCounterFuture = ...;
   *
   * // Falling back to a zero counter in case an exception happens when
   * // processing the RPC to fetch counters.
395 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 396 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0)); 397 * }</pre> 398 * 399 * <p>The fallback can also choose to propagate the original exception when desired: 400 * 401 * <pre>{@code 402 * ListenableFuture<Integer> fetchCounterFuture = ...; 403 * 404 * // Falling back to a zero counter only when omitDataOnFetchFailure is set; 405 * // otherwise the original exception is rethrown. 406 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 407 * fetchCounterFuture, 408 * FetchException.class, 409 * e -> { 410 * if (omitDataOnFetchFailure) { 411 * return immediateFuture(0); 412 * } 413 * throw e; 414 * }); 415 * }</pre> 416 * 417 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 418 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 419 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 420 * also applicable to heavyweight functions passed to this method. (Specifically, {@code 421 * directExecutor} functions should avoid heavyweight operations inside {@code 422 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 423 * completing the returned {@code Future}.) 424 * 425 * @param input the primary input {@code Future} 426 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 427 * type is matched against the input's exception. "The input's exception" means the cause of 428 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 429 * different kind of exception, that exception itself. To avoid hiding bugs and other 430 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 431 * Throwable.class} in particular. 
* @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected
   *     exception type. The function's argument is the input's exception. "The input's exception"
   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
   *     {@code get()} throws a different kind of exception, that exception itself.
   * @since 19.0 (similar functionality in 14.0 as {@code withFallback})
   * @deprecated Use {@linkplain #catchingAsync(ListenableFuture, Class, AsyncFunction, Executor)
   *     the overload that requires an executor}. For identical behavior, pass {@link
   *     MoreExecutors#directExecutor}, but consider whether another executor would be safer, as
   *     discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener}
   *     documentation. This method is scheduled to be removed in April 2018.
   */
  @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue
  @Deprecated
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public static <V, X extends Throwable> ListenableFuture<V> catchingAsync(
      ListenableFuture<? extends V> input,
      Class<X> exceptionType,
      AsyncFunction<? super X, ? extends V> fallback) {
    // This overload has no executor parameter, so the fallback is run with directExecutor().
    return AbstractCatchingFuture.create(input, exceptionType, fallback, directExecutor());
  }

  /**
   * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the
   * primary input fails with the given {@code exceptionType}, from the result provided by the
   * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has
   * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of
   * {@code fallback}, an exception is thrown, this exception is used as the result of the output
   * {@code Future}.
460 * 461 * <p>Usage examples: 462 * 463 * <pre>{@code 464 * ListenableFuture<Integer> fetchCounterFuture = ...; 465 * 466 * // Falling back to a zero counter in case an exception happens when 467 * // processing the RPC to fetch counters. 468 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 469 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0), directExecutor()); 470 * }</pre> 471 * 472 * <p>The fallback can also choose to propagate the original exception when desired: 473 * 474 * <pre>{@code 475 * ListenableFuture<Integer> fetchCounterFuture = ...; 476 * 477 * // Falling back to a zero counter only when omitDataOnFetchFailure is set; 478 * // otherwise the original exception is rethrown. 479 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 480 * fetchCounterFuture, 481 * FetchException.class, 482 * e -> { 483 * if (omitDataOnFetchFailure) { 484 * return immediateFuture(0); 485 * } 486 * throw e; 487 * }, 488 * directExecutor()); 489 * }</pre> 490 * 491 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 492 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 493 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 494 * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid 495 * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should 496 * occur in other threads responsible for completing the returned {@code Future}.) 497 * 498 * @param input the primary input {@code Future} 499 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 500 * type is matched against the input's exception. "The input's exception" means the cause of 501 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 502 * different kind of exception, that exception itself. 
To avoid hiding bugs and other
   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
   *     Throwable.class} in particular.
   * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected
   *     exception type. The function's argument is the input's exception. "The input's exception"
   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
   *     {@code get()} throws a different kind of exception, that exception itself.
   * @param executor the executor that runs {@code fallback} if {@code input} fails
   * @since 19.0 (similar functionality in 14.0 as {@code withFallback})
   */
  @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public static <V, X extends Throwable> ListenableFuture<V> catchingAsync(
      ListenableFuture<? extends V> input,
      Class<X> exceptionType,
      AsyncFunction<? super X, ? extends V> fallback,
      Executor executor) {
    // Pure forwarder: the arguments are handed to AbstractCatchingFuture.create unchanged.
    return AbstractCatchingFuture.create(input, exceptionType, fallback, executor);
  }

  /**
   * Returns a future that delegates to another but will finish early (via a {@link
   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified duration expires.
   *
   * <p>The delegate future is interrupted and cancelled if it times out.
   *
   * @param delegate The future to delegate to.
   * @param time the duration after which the future times out, expressed in {@code unit}
   * @param unit the time unit of the time parameter
   * @param scheduledExecutor The executor service to enforce the timeout.
* @since 19.0
   */
  @GwtIncompatible // java.util.concurrent.ScheduledExecutorService
  public static <V> ListenableFuture<V> withTimeout(
      ListenableFuture<V> delegate,
      long time,
      TimeUnit unit,
      ScheduledExecutorService scheduledExecutor) {
    // Pure forwarder: all timeout handling is delegated to TimeoutFuture.create.
    return TimeoutFuture.create(delegate, time, unit, scheduledExecutor);
  }

  /**
   * Returns a new {@code Future} whose result is asynchronously derived from the result of the
   * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with
   * the same exception (and the function is not invoked).
   *
   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
   * Example usage:
   *
   * <pre>{@code
   * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
   * ListenableFuture<QueryResult> queryFuture =
   *     transformAsync(rowKeyFuture, dataService::readFuture);
   * }</pre>
   *
   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
   * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are
   * also applicable to heavyweight functions passed to this method. (Specifically, {@code
   * directExecutor} functions should avoid heavyweight operations inside {@code
   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
   * completing the returned {@code Future}.)
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
   * input future and that of the future returned by the function.
That is, if the returned {@code
   * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two
   * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to
   * cancel itself.
   *
   * @param input The future to transform
   * @param function A function to transform the result of the input future to the result of the
   *     output future
   * @return A future that holds the result of the function (if the input succeeded) or the
   *     original input's failure (if not)
   * @since 19.0 (in 11.0 as {@code transform})
   * @deprecated Use {@linkplain #transformAsync(ListenableFuture, AsyncFunction, Executor) the
   *     overload that requires an executor}. For identical behavior, pass {@link
   *     MoreExecutors#directExecutor}, but consider whether another executor would be safer, as
   *     discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener}
   *     documentation. This method is scheduled to be removed in April 2018.
   */
  @Deprecated
  public static <I, O> ListenableFuture<O> transformAsync(
      ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) {
    // This overload has no executor parameter, so the function is run with directExecutor().
    return AbstractTransformFuture.create(input, function, directExecutor());
  }

  /**
   * Returns a new {@code Future} whose result is asynchronously derived from the result of the
   * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with
   * the same exception (and the function is not invoked).
   *
   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
* Example usage:
   *
   * <pre>{@code
   * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
   * ListenableFuture<QueryResult> queryFuture =
   *     transformAsync(rowKeyFuture, dataService::readFuture, executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
   * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
   * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid
   * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should
   * occur in other threads responsible for completing the returned {@code Future}.)
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
   * input future and that of the future returned by the function. That is, if the returned
   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
   * attempt to cancel itself.
   *
   * @param input The future to transform
   * @param function A function to transform the result of the input future to the result of the
   *     output future
   * @param executor Executor to run the function in.
   * @return A future that holds the result of the function (if the input succeeded) or the
   *     original input's failure (if not)
   * @since 19.0 (in 11.0 as {@code transform})
   */
  public static <I, O> ListenableFuture<O> transformAsync(
      ListenableFuture<I> input,
      AsyncFunction<? super I, ? extends O> function,
      Executor executor) {
    // Pure delegation: all three arguments are passed straight to AbstractTransformFuture.create.
    return AbstractTransformFuture.create(input, function, executor);
  }

  /**
   * Returns a new {@code Future} whose result is derived from the result of the given {@code
   * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and
   * the function is not invoked). Example usage:
   *
   * <pre>{@code
   * ListenableFuture<QueryResult> queryFuture = ...;
   * ListenableFuture<List<Row>> rowsFuture =
   *     transform(queryFuture, QueryResult::getRows);
   * }</pre>
   *
   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
   * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are
   * also applicable to heavyweight functions passed to this method.
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
   * in which it will attempt to cancel itself.
   *
   * <p>An example use of this method is to convert a serializable object returned from an RPC into
   * a POJO.
   *
   * @param input The future to transform
   * @param function A Function to transform the results of the provided future to the results of
   *     the returned future. This will be run in the thread that notifies input it is complete.
   * @return A future that holds result of the transformation.
   * @since 9.0 (in 1.0 as {@code compose})
   * @deprecated Use {@linkplain #transform(ListenableFuture, Function, Executor) the overload that
   *     requires an executor}.
For identical behavior, pass {@link MoreExecutors#directExecutor}, 664 * but consider whether another executor would be safer, as discussed in the {@link 665 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 666 * scheduled to be removed in April 2018. 667 */ 668 @Deprecated 669 public static <I, O> ListenableFuture<O> transform( 670 ListenableFuture<I> input, Function<? super I, ? extends O> function) { 671 return AbstractTransformFuture.create(input, function, directExecutor()); 672 } 673 674 /** 675 * Returns a new {@code Future} whose result is derived from the result of the given {@code 676 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 677 * the function is not invoked). Example usage: 678 * 679 * <pre>{@code 680 * ListenableFuture<QueryResult> queryFuture = ...; 681 * ListenableFuture<List<Row>> rowsFuture = 682 * transform(queryFuture, QueryResult::getRows, executor); 683 * }</pre> 684 * 685 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 686 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 687 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 688 * functions passed to this method. 689 * 690 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 691 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 692 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 693 * in which it will attempt to cancel itself. 694 * 695 * <p>An example use of this method is to convert a serializable object returned from an RPC into 696 * a POJO. 697 * 698 * @param input The future to transform 699 * @param function A Function to transform the results of the provided future to the results of 700 * the returned future. 
701 * @param executor Executor to run the function in. 702 * @return A future that holds result of the transformation. 703 * @since 9.0 (in 2.0 as {@code compose}) 704 */ 705 public static <I, O> ListenableFuture<O> transform( 706 ListenableFuture<I> input, Function<? super I, ? extends O> function, Executor executor) { 707 return AbstractTransformFuture.create(input, function, executor); 708 } 709 710 /** 711 * Like {@link #transform(ListenableFuture, Function, Executor)} except that the transformation 712 * {@code function} is invoked on each call to {@link Future#get() get()} on the returned future. 713 * 714 * <p>The returned {@code Future} reflects the input's cancellation state directly, and any 715 * attempt to cancel the returned Future is likewise passed through to the input Future. 716 * 717 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} only apply the timeout 718 * to the execution of the underlying {@code Future}, <em>not</em> to the execution of the 719 * transformation function. 720 * 721 * <p>The primary audience of this method is callers of {@code transform} who don't have a {@code 722 * ListenableFuture} available and do not mind repeated, lazy function evaluation. 723 * 724 * @param input The future to transform 725 * @param function A Function to transform the results of the provided future to the results of 726 * the returned future. 727 * @return A future that returns the result of the transformation. 728 * @since 10.0 729 */ 730 @GwtIncompatible // TODO 731 public static <I, O> Future<O> lazyTransform( 732 final Future<I> input, final Function<? super I, ? 
extends O> function) { 733 checkNotNull(input); 734 checkNotNull(function); 735 return new Future<O>() { 736 737 @Override 738 public boolean cancel(boolean mayInterruptIfRunning) { 739 return input.cancel(mayInterruptIfRunning); 740 } 741 742 @Override 743 public boolean isCancelled() { 744 return input.isCancelled(); 745 } 746 747 @Override 748 public boolean isDone() { 749 return input.isDone(); 750 } 751 752 @Override 753 public O get() throws InterruptedException, ExecutionException { 754 return applyTransformation(input.get()); 755 } 756 757 @Override 758 public O get(long timeout, TimeUnit unit) 759 throws InterruptedException, ExecutionException, TimeoutException { 760 return applyTransformation(input.get(timeout, unit)); 761 } 762 763 private O applyTransformation(I input) throws ExecutionException { 764 try { 765 return function.apply(input); 766 } catch (Throwable t) { 767 throw new ExecutionException(t); 768 } 769 } 770 }; 771 } 772 773 /** 774 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 775 * input futures, if all succeed. 776 * 777 * <p>The list of results is in the same order as the input list. 778 * 779 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 780 * provided futures fails or is canceled, this one is, too. 781 * 782 * @param futures futures to combine 783 * @return a future that provides a list of the results of the component futures 784 * @since 10.0 785 */ 786 @Beta 787 @SafeVarargs 788 public static <V> ListenableFuture<List<V>> allAsList(ListenableFuture<? extends V>... futures) { 789 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 790 } 791 792 /** 793 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 794 * input futures, if all succeed. 795 * 796 * <p>The list of results is in the same order as the input list. 
797 * 798 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 799 * provided futures fails or is canceled, this one is, too. 800 * 801 * @param futures futures to combine 802 * @return a future that provides a list of the results of the component futures 803 * @since 10.0 804 */ 805 @Beta 806 public static <V> ListenableFuture<List<V>> allAsList( 807 Iterable<? extends ListenableFuture<? extends V>> futures) { 808 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 809 } 810 811 /** 812 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 813 * successful. 814 * 815 * @since 20.0 816 */ 817 @SafeVarargs 818 public static <V> FutureCombiner<V> whenAllComplete(ListenableFuture<? extends V>... futures) { 819 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 820 } 821 822 /** 823 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 824 * successful. 825 * 826 * @since 20.0 827 */ 828 public static <V> FutureCombiner<V> whenAllComplete( 829 Iterable<? extends ListenableFuture<? extends V>> futures) { 830 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 831 } 832 833 /** 834 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 835 * 836 * <p>If any input fails, the returned future fails immediately. 837 * 838 * @since 20.0 839 */ 840 @SafeVarargs 841 public static <V> FutureCombiner<V> whenAllSucceed(ListenableFuture<? extends V>... futures) { 842 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 843 } 844 845 /** 846 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 847 * 848 * <p>If any input fails, the returned future fails immediately. 849 * 850 * @since 20.0 851 */ 852 public static <V> FutureCombiner<V> whenAllSucceed( 853 Iterable<? extends ListenableFuture<? 
extends V>> futures) { 854 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 855 } 856 857 /** 858 * A helper to create a new {@code ListenableFuture} whose result is generated from a combination 859 * of input futures. 860 * 861 * <p>See {@link #whenAllComplete} and {@link #whenAllSucceed} for how to instantiate this class. 862 * 863 * <p>Example: 864 * 865 * <pre>{@code 866 * final ListenableFuture<Instant> loginDateFuture = 867 * loginService.findLastLoginDate(username); 868 * final ListenableFuture<List<String>> recentCommandsFuture = 869 * recentCommandsService.findRecentCommands(username); 870 * Callable<UsageHistory> usageComputation = 871 * new Callable<UsageHistory>() { 872 * public UsageHistory call() throws Exception { 873 * return new UsageHistory( 874 * username, loginDateFuture.get(), recentCommandsFuture.get()); 875 * } 876 * }; 877 * ListenableFuture<UsageHistory> usageFuture = 878 * Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture) 879 * .call(usageComputation, executor); 880 * }</pre> 881 * 882 * @since 20.0 883 */ 884 @Beta 885 @CanIgnoreReturnValue // TODO(cpovirk): Consider removing, especially if we provide run(Runnable) 886 @GwtCompatible 887 public static final class FutureCombiner<V> { 888 private final boolean allMustSucceed; 889 private final ImmutableList<ListenableFuture<? extends V>> futures; 890 891 private FutureCombiner( 892 boolean allMustSucceed, ImmutableList<ListenableFuture<? extends V>> futures) { 893 this.allMustSucceed = allMustSucceed; 894 this.futures = futures; 895 } 896 897 /** 898 * Creates the {@link ListenableFuture} which will return the result of calling {@link 899 * AsyncCallable#call} in {@code combiner} when all futures complete, using the specified {@code 900 * executor}. 901 * 902 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 903 * cancelled. 
904 * 905 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 906 * ExecutionException} will be extracted and returned as the cause of the new {@code 907 * ExecutionException} that gets thrown by the returned combined future. 908 * 909 * <p>Canceling this future will attempt to cancel all the component futures. 910 */ 911 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor) { 912 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 913 } 914 915 /** 916 * Like {@link #callAsync(AsyncCallable, Executor)} but using {@linkplain 917 * MoreExecutors#directExecutor direct executor}. 918 * 919 * @deprecated Use {@linkplain #callAsync(AsyncCallable, Executor) the overload that requires an 920 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 921 * consider whether another executor would be safer, as discussed in the {@link 922 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 923 * scheduled to be removed in April 2018. 924 */ 925 @Deprecated 926 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner) { 927 return callAsync(combiner, directExecutor()); 928 } 929 930 /** 931 * Creates the {@link ListenableFuture} which will return the result of calling {@link 932 * Callable#call} in {@code combiner} when all futures complete, using the specified {@code 933 * executor}. 934 * 935 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 936 * cancelled. 937 * 938 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 939 * ExecutionException} will be extracted and returned as the cause of the new {@code 940 * ExecutionException} that gets thrown by the returned combined future. 941 * 942 * <p>Canceling this future will attempt to cancel all the component futures. 
943 */ 944 @CanIgnoreReturnValue // TODO(cpovirk): Remove this 945 public <C> ListenableFuture<C> call(Callable<C> combiner, Executor executor) { 946 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 947 } 948 949 /** 950 * Like {@link #call(Callable, Executor)} but using {@linkplain MoreExecutors#directExecutor 951 * direct executor}. 952 * 953 * @deprecated Use {@linkplain #call(Callable, Executor) the overload that requires an 954 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 955 * consider whether another executor would be safer, as discussed in the {@link 956 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 957 * scheduled to be removed in April 2018. 958 */ 959 @CanIgnoreReturnValue // TODO(cpovirk): Remove this 960 @Deprecated 961 public <C> ListenableFuture<C> call(Callable<C> combiner) { 962 return call(combiner, directExecutor()); 963 } 964 965 /** 966 * Creates the {@link ListenableFuture} which will return the result of running {@code combiner} 967 * when all Futures complete. {@code combiner} will run using {@code executor}. 968 * 969 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 970 * cancelled. 971 * 972 * <p>Canceling this Future will attempt to cancel all the component futures. 973 * 974 * @since 23.6 975 */ 976 public ListenableFuture<?> run(final Runnable combiner, Executor executor) { 977 return call( 978 new Callable<Void>() { 979 @Override 980 public Void call() throws Exception { 981 combiner.run(); 982 return null; 983 } 984 }, 985 executor); 986 } 987 } 988 989 /** 990 * Returns a {@code ListenableFuture} whose result is set from the supplied future when it 991 * completes. Cancelling the supplied future will also cancel the returned future, but cancelling 992 * the returned future will have no effect on the supplied future. 
993 * 994 * @since 15.0 995 */ 996 public static <V> ListenableFuture<V> nonCancellationPropagating(ListenableFuture<V> future) { 997 if (future.isDone()) { 998 return future; 999 } 1000 NonCancellationPropagatingFuture<V> output = new NonCancellationPropagatingFuture<>(future); 1001 future.addListener(output, directExecutor()); 1002 return output; 1003 } 1004 1005 /** A wrapped future that does not propagate cancellation to its delegate. */ 1006 private static final class NonCancellationPropagatingFuture<V> 1007 extends AbstractFuture.TrustedFuture<V> implements Runnable { 1008 private ListenableFuture<V> delegate; 1009 1010 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1011 this.delegate = delegate; 1012 } 1013 1014 @Override 1015 public void run() { 1016 // This prevents cancellation from propagating because we don't assign delegate until 1017 // delegate is already done, so calling cancel() on it is a no-op. 1018 ListenableFuture<V> localDelegate = delegate; 1019 if (localDelegate != null) { 1020 setFuture(localDelegate); 1021 } 1022 } 1023 1024 @Override 1025 protected String pendingToString() { 1026 ListenableFuture<V> localDelegate = delegate; 1027 if (localDelegate != null) { 1028 return "delegate=[" + localDelegate + "]"; 1029 } 1030 return null; 1031 } 1032 1033 @Override 1034 protected void afterDone() { 1035 delegate = null; 1036 } 1037 } 1038 1039 /** 1040 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1041 * successful input futures. The list of results is in the same order as the input list, and if 1042 * any of the provided futures fails or is canceled, its corresponding position will contain 1043 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1044 * null}). 1045 * 1046 * <p>Canceling this future will attempt to cancel all the component futures. 
1047 * 1048 * @param futures futures to combine 1049 * @return a future that provides a list of the results of the component futures 1050 * @since 10.0 1051 */ 1052 @Beta 1053 @SafeVarargs 1054 public static <V> ListenableFuture<List<V>> successfulAsList( 1055 ListenableFuture<? extends V>... futures) { 1056 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1057 } 1058 1059 /** 1060 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1061 * successful input futures. The list of results is in the same order as the input list, and if 1062 * any of the provided futures fails or is canceled, its corresponding position will contain 1063 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1064 * null}). 1065 * 1066 * <p>Canceling this future will attempt to cancel all the component futures. 1067 * 1068 * @param futures futures to combine 1069 * @return a future that provides a list of the results of the component futures 1070 * @since 10.0 1071 */ 1072 @Beta 1073 public static <V> ListenableFuture<List<V>> successfulAsList( 1074 Iterable<? extends ListenableFuture<? extends V>> futures) { 1075 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1076 } 1077 1078 /** 1079 * Returns a list of delegate futures that correspond to the futures received in the order that 1080 * they complete. Delegate futures return the same value or throw the same exception as the 1081 * corresponding input future returns/throws. 1082 * 1083 * <p>"In the order that they complete" means, for practical purposes, about what you would 1084 * expect, but there are some subtleties. First, we do guarantee that, if the output future at 1085 * index n is done, the output future at index n-1 is also done. (But as usual with futures, some 1086 * listeners for future n may complete before some for future n-1.) 
* However, it is possible, if
   * one input completes with result X and another later with result Y, for Y to come before X in
   * the output future list. (Such races are impossible to solve without global synchronization of
   * all future completions. And they should have little practical impact.)
   *
   * <p>Cancelling a delegate future propagates to input futures once all the delegates complete,
   * either from cancellation or because an input future has completed. If N futures are passed in,
   * and M delegates are cancelled, the remaining M input futures will be cancelled once N - M of
   * the input futures complete. If all the delegates are cancelled, all the input futures will be
   * too.
   *
   * @since 17.0
   */
  @Beta
  public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
      Iterable<? extends ListenableFuture<? extends T>> futures) {
    // Can't use Iterables.toArray because it's not gwt compatible
    final Collection<ListenableFuture<? extends T>> collection;
    if (futures instanceof Collection) {
      collection = (Collection<ListenableFuture<? extends T>>) futures;
    } else {
      collection = ImmutableList.copyOf(futures);
    }
    // Snapshot the inputs into an array; the shared state below nulls out slots as inputs finish.
    @SuppressWarnings("unchecked")
    ListenableFuture<? extends T>[] copy =
        (ListenableFuture<? extends T>[])
            collection.toArray(new ListenableFuture[collection.size()]);
    final InCompletionOrderState<T> state = new InCompletionOrderState<>(copy);
    // One output (delegate) future per input, all sharing the same state object.
    ImmutableList.Builder<AbstractFuture<T>> delegatesBuilder = ImmutableList.builder();
    for (int i = 0; i < copy.length; i++) {
      delegatesBuilder.add(new InCompletionOrderFuture<T>(state));
    }

    final ImmutableList<AbstractFuture<T>> delegates = delegatesBuilder.build();
    // When input i completes, the state hands its result to the first delegate that accepts it.
    for (int i = 0; i < copy.length; i++) {
      final int localI = i;
      copy[i].addListener(
          new Runnable() {
            @Override
            public void run() {
              state.recordInputCompletion(delegates, localI);
            }
          },
          directExecutor());
    }

    // Expose the delegates under their ListenableFuture interface type.
    @SuppressWarnings("unchecked")
    ImmutableList<ListenableFuture<T>> delegatesCast = (ImmutableList) delegates;
    return delegatesCast;
  }

  // This can't be a TrustedFuture, because TrustedFuture has clever optimizations that
  // mean cancel won't be called if this Future is passed into setFuture, and then
  // cancelled.
  private static final class InCompletionOrderFuture<T> extends AbstractFuture<T> {
    // Shared bookkeeping for all outputs of one inCompletionOrder call; cleared in afterDone().
    private InCompletionOrderState<T> state;

    private InCompletionOrderFuture(InCompletionOrderState<T> state) {
      this.state = state;
    }

    @Override
    public boolean cancel(boolean interruptIfRunning) {
      // The field is read into a local before super.cancel(). NOTE(review): presumably because a
      // successful cancel triggers afterDone(), which nulls the field — confirm against
      // AbstractFuture.
      InCompletionOrderState<T> localState = state;
      if (super.cancel(interruptIfRunning)) {
        localState.recordOutputCancellation(interruptIfRunning);
        return true;
      }
      return false;
    }

    @Override
    protected void afterDone() {
      state = null;
    }

    @Override
    protected String pendingToString() {
      InCompletionOrderState<T> localState = state;
      if (localState != null) {
        // Don't print the actual array! We don't want inCompletionOrder(list).toString() to have
        // quadratic output.
        return "inputCount=["
            + localState.inputFutures.length
            + "], remaining=["
            + localState.incompleteOutputCount.get()
            + "]";
      }
      return null;
    }
  }

  /**
   * Bookkeeping shared by every output future of a single {@code inCompletionOrder} call: the
   * not-yet-completed inputs, the number of outputs still pending, and whether (and how) any
   * output was cancelled.
   */
  private static final class InCompletionOrderState<T> {
    // A happens-before edge between the writes of these fields and their reads exists, because
    // in order to read these fields, the corresponding write to incompleteOutputCount must have
    // been read.
    private boolean wasCancelled = false;
    private boolean shouldInterrupt = true;
    // Starts at the number of inputs; decremented once per recordCompletion() call.
    private final AtomicInteger incompleteOutputCount;
    // Slots are set to null as the corresponding inputs complete.
    private final ListenableFuture<? extends T>[] inputFutures;
    // Index from which recordInputCompletion starts looking for a delegate that is still pending.
    private volatile int delegateIndex = 0;

    private InCompletionOrderState(ListenableFuture<? extends T>[] inputFutures) {
      this.inputFutures = inputFutures;
      incompleteOutputCount = new AtomicInteger(inputFutures.length);
    }

    /** Called when an output future has been successfully cancelled. */
    private void recordOutputCancellation(boolean interruptIfRunning) {
      wasCancelled = true;
      // If all the futures were cancelled with interruption, cancel the input futures
      // with interruption; otherwise cancel without
      if (!interruptIfRunning) {
        shouldInterrupt = false;
      }
      recordCompletion();
    }

    /**
     * Called when the input at {@code inputFutureIndex} completes: passes it to the first delegate,
     * scanning from {@code delegateIndex}, whose {@code setFuture} call succeeds.
     */
    private void recordInputCompletion(
        ImmutableList<AbstractFuture<T>> delegates, int inputFutureIndex) {
      ListenableFuture<? extends T> inputFuture = inputFutures[inputFutureIndex];
      // Null out our reference to this future, so it can be GCed
      inputFutures[inputFutureIndex] = null;
      for (int i = delegateIndex; i < delegates.size(); i++) {
        if (delegates.get(i).setFuture(inputFuture)) {
          recordCompletion();
          // this is technically unnecessary, but should speed up later accesses
          delegateIndex = i + 1;
          return;
        }
      }
      // If all the delegates were complete, no reason for the next listener to have to
      // go through the whole list. Avoids O(n^2) behavior when the entire output list is
      // cancelled.
      delegateIndex = delegates.size();
    }

    /**
     * Counts one more completed output. When the count reaches zero and at least one output was
     * cancelled, cancels every input whose slot has not been nulled out.
     */
    private void recordCompletion() {
      if (incompleteOutputCount.decrementAndGet() == 0 && wasCancelled) {
        for (ListenableFuture<?> toCancel : inputFutures) {
          if (toCancel != null) {
            toCancel.cancel(shouldInterrupt);
          }
        }
      }
    }
  }

  /**
   * Registers separate success and failure callbacks to be run when the {@code Future}'s
   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
   * computation is already complete, immediately.
   *
   * <p>There is no guaranteed ordering of execution of callbacks, but any callback added through
   * this method is guaranteed to be called once the computation is complete.
   *
   * <p>Example:
   *
   * <pre>{@code
   * ListenableFuture<QueryResult> future = ...;
   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
   *       public void onSuccess(QueryResult result) {
   *         storeInCache(result);
   *       }
   *       public void onFailure(Throwable t) {
   *         reportError(t);
   *       }
   *     });
   * }</pre>
   *
   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
   * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are
   * also applicable to heavyweight callbacks passed to this method.
   *
   * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link
   * ListenableFuture#addListener addListener}.
   *
   * @param future The future to attach the callback to.
   * @param callback The callback to invoke when {@code future} is completed.
* @since 10.0
   * @deprecated Use {@linkplain #addCallback(ListenableFuture, FutureCallback, Executor) the
   *     overload that requires an executor}. For identical behavior, pass {@link
   *     MoreExecutors#directExecutor}, but consider whether another executor would be safer, as
   *     discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener}
   *     documentation. This method is scheduled to be removed in April 2018.
   */
  @Deprecated
  public static <V> void addCallback(
      ListenableFuture<V> future, FutureCallback<? super V> callback) {
    // Forwards to the executor overload with directExecutor() as the executor.
    addCallback(future, callback, directExecutor());
  }

  /**
   * Registers separate success and failure callbacks to be run when the {@code Future}'s
   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
   * computation is already complete, immediately.
   *
   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
   * callbacks, but any callback added through this method is guaranteed to be called once the
   * computation is complete.
   *
   * <p>Example:
   *
   * <pre>{@code
   * ListenableFuture<QueryResult> future = ...;
   * Executor e = ...
   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
   *       public void onSuccess(QueryResult result) {
   *         storeInCache(result);
   *       }
   *       public void onFailure(Throwable t) {
   *         reportError(t);
   *       }
   *     }, e);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener}
   * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight
   * callbacks passed to this method.
1308 * 1309 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1310 * ListenableFuture#addListener addListener}. 1311 * 1312 * @param future The future attach the callback to. 1313 * @param callback The callback to invoke when {@code future} is completed. 1314 * @param executor The executor to run {@code callback} when the future completes. 1315 * @since 10.0 1316 */ 1317 public static <V> void addCallback( 1318 final ListenableFuture<V> future, 1319 final FutureCallback<? super V> callback, 1320 Executor executor) { 1321 Preconditions.checkNotNull(callback); 1322 future.addListener(new CallbackListener<V>(future, callback), executor); 1323 } 1324 1325 /** See {@link #addCallback(ListenableFuture, FutureCallback, Executor)} for behavioral notes. */ 1326 private static final class CallbackListener<V> implements Runnable { 1327 final Future<V> future; 1328 final FutureCallback<? super V> callback; 1329 1330 CallbackListener(Future<V> future, FutureCallback<? super V> callback) { 1331 this.future = future; 1332 this.callback = callback; 1333 } 1334 1335 @Override 1336 public void run() { 1337 final V value; 1338 try { 1339 value = getDone(future); 1340 } catch (ExecutionException e) { 1341 callback.onFailure(e.getCause()); 1342 return; 1343 } catch (RuntimeException | Error e) { 1344 callback.onFailure(e); 1345 return; 1346 } 1347 callback.onSuccess(value); 1348 } 1349 1350 @Override 1351 public String toString() { 1352 return MoreObjects.toStringHelper(this).addValue(callback).toString(); 1353 } 1354 } 1355 1356 /** 1357 * Returns the result of the input {@code Future}, which must have already completed. 1358 * 1359 * <p>The benefits of this method are twofold. First, the name "getDone" suggests to readers that 1360 * the {@code Future} is already done. Second, if buggy code calls {@code getDone} on a {@code 1361 * Future} that is still pending, the program will throw instead of block. 
* This can be important
   * for APIs like {@link #whenAllComplete whenAllComplete(...)}{@code .}{@link
   * FutureCombiner#call(Callable, Executor) call(...)}, where it is easy to use a new input from
   * the {@code call} implementation but forget to add it to the arguments of {@code
   * whenAllComplete}.
   *
   * <p>If you are looking for a method to determine whether a given {@code Future} is done, use the
   * instance method {@link Future#isDone()}.
   *
   * @param future the future to read; it must already be done
   * @return the value the completed {@code Future} holds
   * @throws ExecutionException if the {@code Future} failed with an exception
   * @throws CancellationException if the {@code Future} was cancelled
   * @throws IllegalStateException if the {@code Future} is not done
   * @since 20.0
   */
  @CanIgnoreReturnValue
  // TODO(cpovirk): Consider calling getDone() in our own code.
  public static <V> V getDone(Future<V> future) throws ExecutionException {
    /*
     * We throw IllegalStateException, since the call could succeed later. Perhaps we "should" throw
     * IllegalArgumentException, since the call could succeed with a different argument. Those
     * exceptions' docs suggest that either is acceptable. Google's Java Practices page recommends
     * IllegalArgumentException here, in part to keep its recommendation simple: Static methods
     * should throw IllegalStateException only when they use static state.
     *
     *
     * Why do we deviate here? The answer: We want for fluentFuture.getDone() to throw the same
     * exception as Futures.getDone(fluentFuture).
     */
    checkState(future.isDone(), "Future was expected to be done: %s", future);
    // The isDone() check above has already passed, so the future is complete at this point.
    return getUninterruptibly(future);
  }

  /**
   * Returns the result of {@link Future#get()}, converting most exceptions to a new instance of the
   * given checked exception type.
This reduces boilerplate for a common use of {@code Future} in 1396 * which it is unnecessary to programmatically distinguish between exception types or to extract 1397 * other information from the exception instance. 1398 * 1399 * <p>Exceptions from {@code Future.get} are treated as follows: 1400 * 1401 * <ul> 1402 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause 1403 * is a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1404 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1405 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1406 * interrupt). 1407 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1408 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1409 * exceptions). 1410 * </ul> 1411 * 1412 * <p>The overall principle is to continue to treat every checked exception as a checked 1413 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1414 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1415 * new stack trace matches that of the current thread. 1416 * 1417 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1418 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1419 * (preferring constructors with at least one {@code String}) and calling the constructor via 1420 * reflection. If the exception did not already have a cause, one is set by calling {@link 1421 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1422 * IllegalArgumentException} is thrown. 
1423 * 1424 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1425 * whose cause is not itself a checked exception 1426 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1427 * {@code RuntimeException} as its cause 1428 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1429 * Error} as its cause 1430 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1431 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1432 * does not have a suitable constructor 1433 * @since 19.0 (in 10.0 as {@code get}) 1434 */ 1435 @CanIgnoreReturnValue 1436 @GwtIncompatible // reflection 1437 public static <V, X extends Exception> V getChecked(Future<V> future, Class<X> exceptionClass) 1438 throws X { 1439 return FuturesGetChecked.getChecked(future, exceptionClass); 1440 } 1441 1442 /** 1443 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most exceptions to a new 1444 * instance of the given checked exception type. This reduces boilerplate for a common use of 1445 * {@code Future} in which it is unnecessary to programmatically distinguish between exception 1446 * types or to extract other information from the exception instance. 1447 * 1448 * <p>Exceptions from {@code Future.get} are treated as follows: 1449 * 1450 * <ul> 1451 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause 1452 * is a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1453 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1454 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1455 * interrupt). 1456 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 
1457 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1458 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1459 * exceptions). 1460 * </ul> 1461 * 1462 * <p>The overall principle is to continue to treat every checked exception as a checked 1463 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1464 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1465 * new stack trace matches that of the current thread. 1466 * 1467 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1468 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1469 * (preferring constructors with at least one {@code String}) and calling the constructor via 1470 * reflection. If the exception did not already have a cause, one is set by calling {@link 1471 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1472 * IllegalArgumentException} is thrown. 
1473 * 1474 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1475 * whose cause is not itself a checked exception 1476 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1477 * {@code RuntimeException} as its cause 1478 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1479 * Error} as its cause 1480 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1481 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1482 * does not have a suitable constructor 1483 * @since 19.0 (in 10.0 as {@code get} and with different parameter order) 1484 */ 1485 @CanIgnoreReturnValue 1486 @GwtIncompatible // reflection 1487 public static <V, X extends Exception> V getChecked( 1488 Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) throws X { 1489 return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit); 1490 } 1491 1492 /** 1493 * Returns the result of calling {@link Future#get()} uninterruptibly on a task known not to throw 1494 * a checked exception. This makes {@code Future} more suitable for lightweight, fast-running 1495 * tasks that, barring bugs in the code, will not fail. This gives it exception-handling behavior 1496 * similar to that of {@code ForkJoinTask.join}. 1497 * 1498 * <p>Exceptions from {@code Future.get} are treated as follows: 1499 * 1500 * <ul> 1501 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@link 1502 * UncheckedExecutionException} (if the cause is an {@code Exception}) or {@link 1503 * ExecutionError} (if the cause is an {@code Error}). 1504 * <li>Any {@link InterruptedException} causes a retry of the {@code get} call. The interrupt is 1505 * restored before {@code getUnchecked} returns. 1506 * <li>Any {@link CancellationException} is propagated untouched. 
So is any other {@link 1507 * RuntimeException} ({@code get} implementations are discouraged from throwing such 1508 * exceptions). 1509 * </ul> 1510 * 1511 * <p>The overall principle is to eliminate all checked exceptions: to loop to avoid {@code 1512 * InterruptedException}, to pass through {@code CancellationException}, and to wrap any exception 1513 * from the underlying computation in an {@code UncheckedExecutionException} or {@code 1514 * ExecutionError}. 1515 * 1516 * <p>For an uninterruptible {@code get} that preserves other exceptions, see {@link 1517 * Uninterruptibles#getUninterruptibly(Future)}. 1518 * 1519 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with an 1520 * {@code Exception} as its cause 1521 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1522 * Error} as its cause 1523 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1524 * @since 10.0 1525 */ 1526 @CanIgnoreReturnValue 1527 public static <V> V getUnchecked(Future<V> future) { 1528 checkNotNull(future); 1529 try { 1530 return getUninterruptibly(future); 1531 } catch (ExecutionException e) { 1532 wrapAndThrowUnchecked(e.getCause()); 1533 throw new AssertionError(); 1534 } 1535 } 1536 1537 private static void wrapAndThrowUnchecked(Throwable cause) { 1538 if (cause instanceof Error) { 1539 throw new ExecutionError((Error) cause); 1540 } 1541 /* 1542 * It's an Exception. (Or it's a non-Error, non-Exception Throwable. From my survey of such 1543 * classes, I believe that most users intended to extend Exception, so we'll treat it like an 1544 * Exception.) 1545 */ 1546 throw new UncheckedExecutionException(cause); 1547 } 1548 1549 /* 1550 * Arguably we don't need a timed getUnchecked because any operation slow enough to require a 1551 * timeout is heavyweight enough to throw a checked exception and therefore be inappropriate to 1552 * use with getUnchecked. 
Further, it's not clear that converting the checked TimeoutException to
   * a RuntimeException -- especially to an UncheckedExecutionException, since it wasn't thrown by
   * the computation -- makes sense, and if we don't convert it, the user still has to write a
   * try-catch block.
   *
   * If you think you would use this method, let us know. You might also look into the
   * Fork-Join framework: http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
   */

  /**
   * A checked future whose {@code mapException} delegates to a caller-supplied function, which
   * maps each exception to the appropriate checked type {@code X}.
   */
  @GwtIncompatible // TODO
  private static class MappingCheckedFuture<V, X extends Exception>
      extends AbstractCheckedFuture<V, X> {

    final Function<? super Exception, X> mapper;

    MappingCheckedFuture(ListenableFuture<V> delegate, Function<? super Exception, X> mapper) {
      super(delegate);
      // The null check runs after the superclass constructor, as in the combined form.
      checkNotNull(mapper);
      this.mapper = mapper;
    }

    @Override
    protected X mapException(Exception exception) {
      return mapper.apply(exception);
    }
  }
}