001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Preconditions.checkState; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 021 022import com.google.common.annotations.Beta; 023import com.google.common.annotations.GwtCompatible; 024import com.google.common.annotations.GwtIncompatible; 025import com.google.common.base.Function; 026import com.google.common.base.MoreObjects; 027import com.google.common.base.Preconditions; 028import com.google.common.collect.ImmutableList; 029import com.google.common.util.concurrent.CollectionFuture.ListFuture; 030import com.google.common.util.concurrent.ImmediateFuture.ImmediateCancelledFuture; 031import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedCheckedFuture; 032import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedFuture; 033import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulCheckedFuture; 034import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulFuture; 035import com.google.errorprone.annotations.CanIgnoreReturnValue; 036import com.google.errorprone.annotations.DoNotCall; 037import java.util.Collection; 038import java.util.List; 039import java.util.concurrent.Callable; 040import java.util.concurrent.CancellationException; 041import java.util.concurrent.ExecutionException; 042import java.util.concurrent.Executor; 043import java.util.concurrent.Future; 044import java.util.concurrent.ScheduledExecutorService; 045import java.util.concurrent.TimeUnit; 046import java.util.concurrent.TimeoutException; 047import java.util.concurrent.atomic.AtomicInteger; 048import org.checkerframework.checker.nullness.compatqual.NullableDecl; 049 050/** 051 * Static utility methods pertaining to the {@link Future} interface. 052 * 053 * <p>Many of these methods use the {@link ListenableFuture} API; consult the Guava User Guide 054 * article on <a href="https://github.com/google/guava/wiki/ListenableFutureExplained">{@code 055 * ListenableFuture}</a>. 056 * 057 * <p>The main purpose of {@code ListenableFuture} is to help you chain together a graph of 058 * asynchronous operations. You can chain them together manually with calls to methods like {@link 059 * Futures#transform(ListenableFuture, Function, Executor) Futures.transform}, but you will often 060 * find it easier to use a framework. Frameworks automate the process, often adding features like 061 * monitoring, debugging, and cancellation. Examples of frameworks include: 062 * 063 * <ul> 064 * <li><a href="http://google.github.io/dagger/producers.html">Dagger Producers</a> 065 * </ul> 066 * 067 * <p>If you do chain your operations manually, you may want to use {@link FluentFuture}. 068 * 069 * @author Kevin Bourrillion 070 * @author Nishant Thakkar 071 * @author Sven Mawson 072 * @since 1.0 073 */ 074@Beta 075@GwtCompatible(emulated = true) 076public final class Futures extends GwtFuturesCatchingSpecialization { 077 078 // A note on memory visibility. 079 // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine) 080 // have two requirements that significantly complicate their design. 081 // 1. Cancellation should propagate from the returned future to the input future(s). 082 // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion. 083 // 084 // A consequence of these requirements is that the delegate futures cannot be stored in 085 // final fields. 086 // 087 // For simplicity the rest of this description will discuss Futures.catching since it is the 088 // simplest instance, though very similar descriptions apply to many other classes in this file. 089 // 090 // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field 091 // 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the 092 // 'inputFuture' field is read and where we will have to consider visibility of the write 093 // operation in the constructor. 094 // 095 // 1. In the listener that performs the callback. In this case it is fine since inputFuture is 096 // assigned prior to calling addListener, and addListener happens-before any invocation of the 097 // listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible 098 // to the listener. 099 // 100 // 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine. 101 // There is currently nothing that enforces that the write to inputFuture in the constructor is 102 // visible to done(). This is because there is no happens before edge between the write and a 103 // (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue, 104 // it would just add an edge such that if done() observed non-null, then it would also 105 // definitely observe all earlier writes, but we still have no guarantee that done() would see 106 // the inital write (just stronger guarantees if it does). 107 // 108 // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html 109 // For a (long) discussion about this specific issue and the general futility of life. 110 // 111 // For the time being we are OK with the problem discussed above since it requires a caller to 112 // introduce a very specific kind of data-race. And given the other operations performed by these 113 // methods that involve volatile read/write operations, in practice there is no issue. Also, the 114 // way in such a visibility issue would surface is most likely as a failure of cancel() to 115 // propagate to the input. Cancellation propagation is fundamentally racy so this is fine. 116 // 117 // Future versions of the JMM may revise safe construction semantics in such a way that we can 118 // safely publish these objects and we won't need this whole discussion. 119 // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs 120 // that should resolve the issue. This comes at the cost of adding more write barriers to the 121 // implementations. 122 123 private Futures() {} 124 125 /** 126 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} and a {@link Function} 127 * that maps from {@link Exception} instances into the appropriate checked type. 128 * 129 * <p><b>Warning:</b> We recommend against using {@code CheckedFuture} in new projects. {@code 130 * CheckedFuture} is difficult to build libraries atop. {@code CheckedFuture} ports of methods 131 * like {@link Futures#transformAsync} have historically had bugs, and some of these bugs are 132 * necessary, unavoidable consequences of the {@code CheckedFuture} API. Additionally, {@code 133 * CheckedFuture} encourages users to take exceptions from one thread and rethrow them in another, 134 * producing confusing stack traces. 135 * 136 * <p>The given mapping function will be applied to an {@link InterruptedException}, a {@link 137 * CancellationException}, or an {@link ExecutionException}. See {@link Future#get()} for details 138 * on the exceptions thrown. 139 * 140 * @since 9.0 (source-compatible since 1.0) 141 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 142 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 143 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 144 * Additionally, it has a surprising policy about which exceptions to map and which to leave 145 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 146 * use, possibly specializing them to the particular exception type they use. We recommend 147 * that most people use {@code ListenableFuture} and perform any exception wrapping 148 * themselves. This method is scheduled for removal from Guava in July 2018. 149 */ 150 // TODO(b/72241575): Remove by 2018-07 151 @Deprecated 152 @GwtIncompatible // TODO 153 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 154 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 155 return new MappingCheckedFuture<>(checkNotNull(future), mapper); 156 } 157 158 /** 159 * Creates a {@code ListenableFuture} which has its value set immediately upon construction. The 160 * getters just return the value. This {@code Future} can't be canceled or timed out and its 161 * {@code isDone()} method always returns {@code true}. 162 */ 163 public static <V> ListenableFuture<V> immediateFuture(@NullableDecl V value) { 164 if (value == null) { 165 // This cast is safe because null is assignable to V for all V (i.e. it is covariant) 166 @SuppressWarnings({"unchecked", "rawtypes"}) 167 ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL; 168 return typedNull; 169 } 170 return new ImmediateSuccessfulFuture<V>(value); 171 } 172 173 /** 174 * Returns a {@code CheckedFuture} which has its value set immediately upon construction. 175 * 176 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 177 * returns {@code true}. Calling {@code get()} or {@code checkedGet()} will immediately return the 178 * provided value. 179 * 180 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 181 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 182 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 183 * Additionally, it has a surprising policy about which exceptions to map and which to leave 184 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 185 * use, possibly specializing them to the particular exception type they use. We recommend 186 * that most people use {@code ListenableFuture} and perform any exception wrapping 187 * themselves. This method is scheduled for removal from Guava in July 2018. 188 */ 189 // TODO(b/72241893): Remove by 2018-07 190 @Deprecated 191 @GwtIncompatible // TODO 192 public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture( 193 @NullableDecl V value) { 194 return new ImmediateSuccessfulCheckedFuture<>(value); 195 } 196 197 /** 198 * Returns a {@code ListenableFuture} which has an exception set immediately upon construction. 199 * 200 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 201 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 202 * Throwable} wrapped in an {@code ExecutionException}. 203 */ 204 public static <V> ListenableFuture<V> immediateFailedFuture(Throwable throwable) { 205 checkNotNull(throwable); 206 return new ImmediateFailedFuture<V>(throwable); 207 } 208 209 /** 210 * Creates a {@code ListenableFuture} which is cancelled immediately upon construction, so that 211 * {@code isCancelled()} always returns {@code true}. 212 * 213 * @since 14.0 214 */ 215 public static <V> ListenableFuture<V> immediateCancelledFuture() { 216 return new ImmediateCancelledFuture<V>(); 217 } 218 219 /** 220 * Returns a {@code CheckedFuture} which has an exception set immediately upon construction. 221 * 222 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 223 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 224 * Exception} wrapped in an {@code ExecutionException}, and calling {@code checkedGet()} will 225 * throw the provided exception itself. 226 * 227 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 228 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 229 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 230 * Additionally, it has a surprising policy about which exceptions to map and which to leave 231 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 232 * use, possibly specializing them to the particular exception type they use. We recommend 233 * that most people use {@code ListenableFuture} and perform any exception wrapping 234 * themselves. This method is scheduled for removal from Guava in July 2018. 235 */ 236 // TODO(b/72241500): Remove by 2018-07 237 @Deprecated 238 @GwtIncompatible // TODO 239 public static <V, X extends Exception> CheckedFuture<V, X> immediateFailedCheckedFuture( 240 X exception) { 241 checkNotNull(exception); 242 return new ImmediateFailedCheckedFuture<>(exception); 243 } 244 245 /** 246 * Executes {@code callable} on the specified {@code executor}, returning a {@code Future}. 247 * 248 * @throws RejectedExecutionException if the task cannot be scheduled for execution 249 * @since 23.0 250 */ 251 public static <O> ListenableFuture<O> submitAsync(AsyncCallable<O> callable, Executor executor) { 252 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 253 executor.execute(task); 254 return task; 255 } 256 257 /** 258 * Schedules {@code callable} on the specified {@code executor}, returning a {@code Future}. 259 * 260 * @throws RejectedExecutionException if the task cannot be scheduled for execution 261 * @since 23.0 262 */ 263 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 264 public static <O> ListenableFuture<O> scheduleAsync( 265 AsyncCallable<O> callable, 266 long delay, 267 TimeUnit timeUnit, 268 ScheduledExecutorService executorService) { 269 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 270 final Future<?> scheduled = executorService.schedule(task, delay, timeUnit); 271 task.addListener( 272 new Runnable() { 273 @Override 274 public void run() { 275 // Don't want to interrupt twice 276 scheduled.cancel(false); 277 } 278 }, 279 directExecutor()); 280 return task; 281 } 282 283 /** 284 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 285 * primary input fails with the given {@code exceptionType}, from the result provided by the 286 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 287 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 288 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 289 * Future}. 290 * 291 * <p>Usage example: 292 * 293 * <pre>{@code 294 * ListenableFuture<Integer> fetchCounterFuture = ...; 295 * 296 * // Falling back to a zero counter in case an exception happens when 297 * // processing the RPC to fetch counters. 298 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 299 * fetchCounterFuture, FetchException.class, x -> 0); 300 * }</pre> 301 * 302 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 303 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 304 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 305 * also applicable to heavyweight functions passed to this method. 306 * 307 * @param input the primary input {@code Future} 308 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 309 * type is matched against the input's exception. "The input's exception" means the cause of 310 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 311 * different kind of exception, that exception itself. To avoid hiding bugs and other 312 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 313 * Throwable.class} in particular. 314 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 315 * exception type. The function's argument is the input's exception. "The input's exception" 316 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 317 * {@code get()} throws a different kind of exception, that exception itself. 318 * @since 19.0 319 * @deprecated Use {@linkplain #catching(ListenableFuture, Class, Function, Executor) the overload 320 * that requires an executor}. For identical behavior, pass {@link 321 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 322 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 323 * documentation. This method is scheduled to be removed in July 2018. 324 */ 325 @Deprecated 326 @DoNotCall 327 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 328 public static <V, X extends Throwable> ListenableFuture<V> catching( 329 ListenableFuture<? extends V> input, 330 Class<X> exceptionType, 331 Function<? super X, ? extends V> fallback) { 332 return AbstractCatchingFuture.create(input, exceptionType, fallback, directExecutor()); 333 } 334 335 /** 336 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 337 * primary input fails with the given {@code exceptionType}, from the result provided by the 338 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 339 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 340 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 341 * Future}. 342 * 343 * <p>Usage example: 344 * 345 * <pre>{@code 346 * ListenableFuture<Integer> fetchCounterFuture = ...; 347 * 348 * // Falling back to a zero counter in case an exception happens when 349 * // processing the RPC to fetch counters. 350 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 351 * fetchCounterFuture, FetchException.class, x -> 0, directExecutor()); 352 * }</pre> 353 * 354 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 355 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 356 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 357 * functions passed to this method. 358 * 359 * @param input the primary input {@code Future} 360 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 361 * type is matched against the input's exception. "The input's exception" means the cause of 362 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 363 * different kind of exception, that exception itself. To avoid hiding bugs and other 364 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 365 * Throwable.class} in particular. 366 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 367 * exception type. The function's argument is the input's exception. "The input's exception" 368 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 369 * {@code get()} throws a different kind of exception, that exception itself. 370 * @param executor the executor that runs {@code fallback} if {@code input} fails 371 * @since 19.0 372 */ 373 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 374 public static <V, X extends Throwable> ListenableFuture<V> catching( 375 ListenableFuture<? extends V> input, 376 Class<X> exceptionType, 377 Function<? super X, ? extends V> fallback, 378 Executor executor) { 379 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 380 } 381 382 /** 383 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 384 * primary input fails with the given {@code exceptionType}, from the result provided by the 385 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 386 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 387 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 388 * {@code Future}. 389 * 390 * <p>Usage examples: 391 * 392 * <pre>{@code 393 * ListenableFuture<Integer> fetchCounterFuture = ...; 394 * 395 * // Falling back to a zero counter in case an exception happens when 396 * // processing the RPC to fetch counters. 397 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 398 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0)); 399 * }</pre> 400 * 401 * <p>The fallback can also choose to propagate the original exception when desired: 402 * 403 * <pre>{@code 404 * ListenableFuture<Integer> fetchCounterFuture = ...; 405 * 406 * // Falling back to a zero counter only in case the exception was a 407 * // TimeoutException. 408 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 409 * fetchCounterFuture, 410 * FetchException.class, 411 * e -> { 412 * if (omitDataOnFetchFailure) { 413 * return immediateFuture(0); 414 * } 415 * throw e; 416 * }); 417 * }</pre> 418 * 419 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 420 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 421 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 422 * also applicable to heavyweight functions passed to this method. (Specifically, {@code 423 * directExecutor} functions should avoid heavyweight operations inside {@code 424 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 425 * completing the returned {@code Future}.) 426 * 427 * @param input the primary input {@code Future} 428 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 429 * type is matched against the input's exception. "The input's exception" means the cause of 430 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 431 * different kind of exception, that exception itself. To avoid hiding bugs and other 432 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 433 * Throwable.class} in particular. 434 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 435 * exception type. The function's argument is the input's exception. "The input's exception" 436 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 437 * {@code get()} throws a different kind of exception, that exception itself. 438 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 439 * @deprecated Use {@linkplain #catchingAsync(ListenableFuture, Class, AsyncFunction, Executor) 440 * the overload that requires an executor}. For identical behavior, pass {@link 441 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 442 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 443 * documentation. This method is scheduled to be removed in July 2018. 444 */ 445 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 446 @Deprecated 447 @DoNotCall 448 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 449 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 450 ListenableFuture<? extends V> input, 451 Class<X> exceptionType, 452 AsyncFunction<? super X, ? extends V> fallback) { 453 return AbstractCatchingFuture.create(input, exceptionType, fallback, directExecutor()); 454 } 455 456 /** 457 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 458 * primary input fails with the given {@code exceptionType}, from the result provided by the 459 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 460 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 461 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 462 * {@code Future}. 463 * 464 * <p>Usage examples: 465 * 466 * <pre>{@code 467 * ListenableFuture<Integer> fetchCounterFuture = ...; 468 * 469 * // Falling back to a zero counter in case an exception happens when 470 * // processing the RPC to fetch counters. 471 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 472 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0), directExecutor()); 473 * }</pre> 474 * 475 * <p>The fallback can also choose to propagate the original exception when desired: 476 * 477 * <pre>{@code 478 * ListenableFuture<Integer> fetchCounterFuture = ...; 479 * 480 * // Falling back to a zero counter only in case the exception was a 481 * // TimeoutException. 482 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 483 * fetchCounterFuture, 484 * FetchException.class, 485 * e -> { 486 * if (omitDataOnFetchFailure) { 487 * return immediateFuture(0); 488 * } 489 * throw e; 490 * }, 491 * directExecutor()); 492 * }</pre> 493 * 494 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 495 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 496 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 497 * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid 498 * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should 499 * occur in other threads responsible for completing the returned {@code Future}.) 500 * 501 * @param input the primary input {@code Future} 502 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 503 * type is matched against the input's exception. "The input's exception" means the cause of 504 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 505 * different kind of exception, that exception itself. To avoid hiding bugs and other 506 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 507 * Throwable.class} in particular. 508 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 509 * exception type. The function's argument is the input's exception. "The input's exception" 510 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 511 * {@code get()} throws a different kind of exception, that exception itself. 512 * @param executor the executor that runs {@code fallback} if {@code input} fails 513 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 514 */ 515 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 516 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 517 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 518 ListenableFuture<? extends V> input, 519 Class<X> exceptionType, 520 AsyncFunction<? super X, ? extends V> fallback, 521 Executor executor) { 522 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 523 } 524 525 /** 526 * Returns a future that delegates to another but will finish early (via a {@link 527 * TimeoutException} wrapped in an {@link ExecutionException}) if the specified duration expires. 528 * 529 * <p>The delegate future is interrupted and cancelled if it times out. 530 * 531 * @param delegate The future to delegate to. 532 * @param time when to timeout the future 533 * @param unit the time unit of the time parameter 534 * @param scheduledExecutor The executor service to enforce the timeout. 535 * @since 19.0 536 */ 537 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 538 public static <V> ListenableFuture<V> withTimeout( 539 ListenableFuture<V> delegate, 540 long time, 541 TimeUnit unit, 542 ScheduledExecutorService scheduledExecutor) { 543 return TimeoutFuture.create(delegate, time, unit, scheduledExecutor); 544 } 545 546 /** 547 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 548 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 549 * the same exception (and the function is not invoked). 550 * 551 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 552 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 553 * Example usage: 554 * 555 * <pre>{@code 556 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 557 * ListenableFuture<QueryResult> queryFuture = 558 * transformAsync(rowKeyFuture, dataService::readFuture); 559 * }</pre> 560 * 561 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 562 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 563 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 564 * also applicable to heavyweight functions passed to this method. (Specifically, {@code 565 * directExecutor} functions should avoid heavyweight operations inside {@code 566 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 567 * completing the returned {@code Future}.) 568 * 569 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 570 * input future and that of the future returned by the function. That is, if the returned {@code 571 * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two 572 * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to 573 * cancel itself. 574 * 575 * @param input The future to transform 576 * @param function A function to transform the result of the input future to the result of the 577 * output future 578 * @return A future that holds result of the function (if the input succeeded) or the original 579 * input's failure (if not) 580 * @since 19.0 (in 11.0 as {@code transform}) 581 * @deprecated Use {@linkplain #transformAsync(ListenableFuture, AsyncFunction, Executor) the 582 * overload that requires an executor}. For identical behavior, pass {@link 583 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 584 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 585 * documentation. This method is scheduled to be removed in July 2018. 586 */ 587 @Deprecated 588 @DoNotCall 589 public static <I, O> ListenableFuture<O> transformAsync( 590 ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) { 591 return AbstractTransformFuture.create(input, function, directExecutor()); 592 } 593 594 /** 595 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 596 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 597 * the same exception (and the function is not invoked). 598 * 599 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 600 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 601 * Example usage: 602 * 603 * <pre>{@code 604 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 605 * ListenableFuture<QueryResult> queryFuture = 606 * transformAsync(rowKeyFuture, dataService::readFuture, executor); 607 * }</pre> 608 * 609 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 610 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 611 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 612 * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid 613 * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should 614 * occur in other threads responsible for completing the returned {@code Future}.) 615 * 616 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 617 * input future and that of the future returned by the chain function. That is, if the returned 618 * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the 619 * other two is cancelled, the returned {@code Future} will receive a callback in which it will 620 * attempt to cancel itself. 621 * 622 * @param input The future to transform 623 * @param function A function to transform the result of the input future to the result of the 624 * output future 625 * @param executor Executor to run the function in. 626 * @return A future that holds result of the function (if the input succeeded) or the original 627 * input's failure (if not) 628 * @since 19.0 (in 11.0 as {@code transform}) 629 */ 630 public static <I, O> ListenableFuture<O> transformAsync( 631 ListenableFuture<I> input, 632 AsyncFunction<? super I, ? extends O> function, 633 Executor executor) { 634 return AbstractTransformFuture.create(input, function, executor); 635 } 636 637 /** 638 * Returns a new {@code Future} whose result is derived from the result of the given {@code 639 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 640 * the function is not invoked). Example usage: 641 * 642 * <pre>{@code 643 * ListenableFuture<QueryResult> queryFuture = ...; 644 * ListenableFuture<List<Row>> rowsFuture = 645 * transform(queryFuture, QueryResult::getRows); 646 * }</pre> 647 * 648 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 649 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 650 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 651 * also applicable to heavyweight functions passed to this method. 652 * 653 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 654 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 655 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 656 * in which it will attempt to cancel itself. 657 * 658 * <p>An example use of this method is to convert a serializable object returned from an RPC into 659 * a POJO. 660 * 661 * @param input The future to transform 662 * @param function A Function to transform the results of the provided future to the results of 663 * the returned future. This will be run in the thread that notifies input it is complete. 664 * @return A future that holds result of the transformation. 665 * @since 9.0 (in 1.0 as {@code compose}) 666 * @deprecated Use {@linkplain #transform(ListenableFuture, Function, Executor) the overload that 667 * requires an executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, 668 * but consider whether another executor would be safer, as discussed in the {@link 669 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 670 * scheduled to be removed in July 2018. 671 */ 672 @Deprecated 673 @DoNotCall 674 public static <I, O> ListenableFuture<O> transform( 675 ListenableFuture<I> input, Function<? super I, ? extends O> function) { 676 return AbstractTransformFuture.create(input, function, directExecutor()); 677 } 678 679 /** 680 * Returns a new {@code Future} whose result is derived from the result of the given {@code 681 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 682 * the function is not invoked). Example usage: 683 * 684 * <pre>{@code 685 * ListenableFuture<QueryResult> queryFuture = ...; 686 * ListenableFuture<List<Row>> rowsFuture = 687 * transform(queryFuture, QueryResult::getRows, executor); 688 * }</pre> 689 * 690 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 691 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 692 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 693 * functions passed to this method. 694 * 695 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 696 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 697 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 698 * in which it will attempt to cancel itself. 699 * 700 * <p>An example use of this method is to convert a serializable object returned from an RPC into 701 * a POJO. 702 * 703 * @param input The future to transform 704 * @param function A Function to transform the results of the provided future to the results of 705 * the returned future. 706 * @param executor Executor to run the function in. 707 * @return A future that holds result of the transformation. 708 * @since 9.0 (in 2.0 as {@code compose}) 709 */ 710 public static <I, O> ListenableFuture<O> transform( 711 ListenableFuture<I> input, Function<? super I, ? extends O> function, Executor executor) { 712 return AbstractTransformFuture.create(input, function, executor); 713 } 714 715 /** 716 * Like {@link #transform(ListenableFuture, Function, Executor)} except that the transformation 717 * {@code function} is invoked on each call to {@link Future#get() get()} on the returned future. 718 * 719 * <p>The returned {@code Future} reflects the input's cancellation state directly, and any 720 * attempt to cancel the returned Future is likewise passed through to the input Future. 721 * 722 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} only apply the timeout 723 * to the execution of the underlying {@code Future}, <em>not</em> to the execution of the 724 * transformation function. 725 * 726 * <p>The primary audience of this method is callers of {@code transform} who don't have a {@code 727 * ListenableFuture} available and do not mind repeated, lazy function evaluation. 728 * 729 * @param input The future to transform 730 * @param function A Function to transform the results of the provided future to the results of 731 * the returned future. 732 * @return A future that returns the result of the transformation. 733 * @since 10.0 734 */ 735 @GwtIncompatible // TODO 736 public static <I, O> Future<O> lazyTransform( 737 final Future<I> input, final Function<? super I, ? extends O> function) { 738 checkNotNull(input); 739 checkNotNull(function); 740 return new Future<O>() { 741 742 @Override 743 public boolean cancel(boolean mayInterruptIfRunning) { 744 return input.cancel(mayInterruptIfRunning); 745 } 746 747 @Override 748 public boolean isCancelled() { 749 return input.isCancelled(); 750 } 751 752 @Override 753 public boolean isDone() { 754 return input.isDone(); 755 } 756 757 @Override 758 public O get() throws InterruptedException, ExecutionException { 759 return applyTransformation(input.get()); 760 } 761 762 @Override 763 public O get(long timeout, TimeUnit unit) 764 throws InterruptedException, ExecutionException, TimeoutException { 765 return applyTransformation(input.get(timeout, unit)); 766 } 767 768 private O applyTransformation(I input) throws ExecutionException { 769 try { 770 return function.apply(input); 771 } catch (Throwable t) { 772 throw new ExecutionException(t); 773 } 774 } 775 }; 776 } 777 778 /** 779 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 780 * input futures, if all succeed. 781 * 782 * <p>The list of results is in the same order as the input list. 783 * 784 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 785 * provided futures fails or is canceled, this one is, too. 786 * 787 * @param futures futures to combine 788 * @return a future that provides a list of the results of the component futures 789 * @since 10.0 790 */ 791 @Beta 792 @SafeVarargs 793 public static <V> ListenableFuture<List<V>> allAsList(ListenableFuture<? extends V>... futures) { 794 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 795 } 796 797 /** 798 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 799 * input futures, if all succeed. 800 * 801 * <p>The list of results is in the same order as the input list. 802 * 803 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 804 * provided futures fails or is canceled, this one is, too. 805 * 806 * @param futures futures to combine 807 * @return a future that provides a list of the results of the component futures 808 * @since 10.0 809 */ 810 @Beta 811 public static <V> ListenableFuture<List<V>> allAsList( 812 Iterable<? extends ListenableFuture<? extends V>> futures) { 813 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 814 } 815 816 /** 817 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 818 * successful. 819 * 820 * @since 20.0 821 */ 822 @SafeVarargs 823 public static <V> FutureCombiner<V> whenAllComplete(ListenableFuture<? extends V>... futures) { 824 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 825 } 826 827 /** 828 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 829 * successful. 830 * 831 * @since 20.0 832 */ 833 public static <V> FutureCombiner<V> whenAllComplete( 834 Iterable<? extends ListenableFuture<? extends V>> futures) { 835 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 836 } 837 838 /** 839 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 840 * 841 * <p>If any input fails, the returned future fails immediately. 842 * 843 * @since 20.0 844 */ 845 @SafeVarargs 846 public static <V> FutureCombiner<V> whenAllSucceed(ListenableFuture<? extends V>... futures) { 847 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 848 } 849 850 /** 851 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 852 * 853 * <p>If any input fails, the returned future fails immediately. 854 * 855 * @since 20.0 856 */ 857 public static <V> FutureCombiner<V> whenAllSucceed( 858 Iterable<? extends ListenableFuture<? extends V>> futures) { 859 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 860 } 861 862 /** 863 * A helper to create a new {@code ListenableFuture} whose result is generated from a combination 864 * of input futures. 865 * 866 * <p>See {@link #whenAllComplete} and {@link #whenAllSucceed} for how to instantiate this class. 867 * 868 * <p>Example: 869 * 870 * <pre>{@code 871 * final ListenableFuture<Instant> loginDateFuture = 872 * loginService.findLastLoginDate(username); 873 * final ListenableFuture<List<String>> recentCommandsFuture = 874 * recentCommandsService.findRecentCommands(username); 875 * Callable<UsageHistory> usageComputation = 876 * new Callable<UsageHistory>() { 877 * public UsageHistory call() throws Exception { 878 * return new UsageHistory( 879 * username, loginDateFuture.get(), recentCommandsFuture.get()); 880 * } 881 * }; 882 * ListenableFuture<UsageHistory> usageFuture = 883 * Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture) 884 * .call(usageComputation, executor); 885 * }</pre> 886 * 887 * @since 20.0 888 */ 889 @Beta 890 @CanIgnoreReturnValue // TODO(cpovirk): Consider removing, especially if we provide run(Runnable) 891 @GwtCompatible 892 public static final class FutureCombiner<V> { 893 private final boolean allMustSucceed; 894 private final ImmutableList<ListenableFuture<? extends V>> futures; 895 896 private FutureCombiner( 897 boolean allMustSucceed, ImmutableList<ListenableFuture<? extends V>> futures) { 898 this.allMustSucceed = allMustSucceed; 899 this.futures = futures; 900 } 901 902 /** 903 * Creates the {@link ListenableFuture} which will return the result of calling {@link 904 * AsyncCallable#call} in {@code combiner} when all futures complete, using the specified {@code 905 * executor}. 906 * 907 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 908 * cancelled. 909 * 910 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 911 * ExecutionException} will be extracted and returned as the cause of the new {@code 912 * ExecutionException} that gets thrown by the returned combined future. 913 * 914 * <p>Canceling this future will attempt to cancel all the component futures. 915 */ 916 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor) { 917 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 918 } 919 920 /** 921 * Like {@link #callAsync(AsyncCallable, Executor)} but using {@linkplain 922 * MoreExecutors#directExecutor direct executor}. 923 * 924 * @deprecated Use {@linkplain #callAsync(AsyncCallable, Executor) the overload that requires an 925 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 926 * consider whether another executor would be safer, as discussed in the {@link 927 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 928 * scheduled to be removed in July 2018. 929 */ 930 @Deprecated 931 @DoNotCall 932 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner) { 933 return callAsync(combiner, directExecutor()); 934 } 935 936 /** 937 * Creates the {@link ListenableFuture} which will return the result of calling {@link 938 * Callable#call} in {@code combiner} when all futures complete, using the specified {@code 939 * executor}. 940 * 941 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 942 * cancelled. 943 * 944 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 945 * ExecutionException} will be extracted and returned as the cause of the new {@code 946 * ExecutionException} that gets thrown by the returned combined future. 947 * 948 * <p>Canceling this future will attempt to cancel all the component futures. 949 */ 950 @CanIgnoreReturnValue // TODO(cpovirk): Remove this 951 public <C> ListenableFuture<C> call(Callable<C> combiner, Executor executor) { 952 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 953 } 954 955 /** 956 * Like {@link #call(Callable, Executor)} but using {@linkplain MoreExecutors#directExecutor 957 * direct executor}. 958 * 959 * @deprecated Use {@linkplain #call(Callable, Executor) the overload that requires an 960 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 961 * consider whether another executor would be safer, as discussed in the {@link 962 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 963 * scheduled to be removed in July 2018. 964 */ 965 @CanIgnoreReturnValue // TODO(cpovirk): Remove this 966 @Deprecated 967 @DoNotCall 968 public <C> ListenableFuture<C> call(Callable<C> combiner) { 969 return call(combiner, directExecutor()); 970 } 971 972 /** 973 * Creates the {@link ListenableFuture} which will return the result of running {@code combiner} 974 * when all Futures complete. {@code combiner} will run using {@code executor}. 975 * 976 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 977 * cancelled. 978 * 979 * <p>Canceling this Future will attempt to cancel all the component futures. 980 * 981 * @since 23.6 982 */ 983 public ListenableFuture<?> run(final Runnable combiner, Executor executor) { 984 return call( 985 new Callable<Void>() { 986 @Override 987 public Void call() throws Exception { 988 combiner.run(); 989 return null; 990 } 991 }, 992 executor); 993 } 994 } 995 996 /** 997 * Returns a {@code ListenableFuture} whose result is set from the supplied future when it 998 * completes. Cancelling the supplied future will also cancel the returned future, but cancelling 999 * the returned future will have no effect on the supplied future. 1000 * 1001 * @since 15.0 1002 */ 1003 public static <V> ListenableFuture<V> nonCancellationPropagating(ListenableFuture<V> future) { 1004 if (future.isDone()) { 1005 return future; 1006 } 1007 NonCancellationPropagatingFuture<V> output = new NonCancellationPropagatingFuture<>(future); 1008 future.addListener(output, directExecutor()); 1009 return output; 1010 } 1011 1012 /** A wrapped future that does not propagate cancellation to its delegate. */ 1013 private static final class NonCancellationPropagatingFuture<V> 1014 extends AbstractFuture.TrustedFuture<V> implements Runnable { 1015 private ListenableFuture<V> delegate; 1016 1017 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1018 this.delegate = delegate; 1019 } 1020 1021 @Override 1022 public void run() { 1023 // This prevents cancellation from propagating because we don't assign delegate until 1024 // delegate is already done, so calling cancel() on it is a no-op. 1025 ListenableFuture<V> localDelegate = delegate; 1026 if (localDelegate != null) { 1027 setFuture(localDelegate); 1028 } 1029 } 1030 1031 @Override 1032 protected String pendingToString() { 1033 ListenableFuture<V> localDelegate = delegate; 1034 if (localDelegate != null) { 1035 return "delegate=[" + localDelegate + "]"; 1036 } 1037 return null; 1038 } 1039 1040 @Override 1041 protected void afterDone() { 1042 delegate = null; 1043 } 1044 } 1045 1046 /** 1047 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1048 * successful input futures. The list of results is in the same order as the input list, and if 1049 * any of the provided futures fails or is canceled, its corresponding position will contain 1050 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1051 * null}). 1052 * 1053 * <p>Canceling this future will attempt to cancel all the component futures. 1054 * 1055 * @param futures futures to combine 1056 * @return a future that provides a list of the results of the component futures 1057 * @since 10.0 1058 */ 1059 @Beta 1060 @SafeVarargs 1061 public static <V> ListenableFuture<List<V>> successfulAsList( 1062 ListenableFuture<? extends V>... futures) { 1063 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1064 } 1065 1066 /** 1067 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1068 * successful input futures. The list of results is in the same order as the input list, and if 1069 * any of the provided futures fails or is canceled, its corresponding position will contain 1070 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1071 * null}). 1072 * 1073 * <p>Canceling this future will attempt to cancel all the component futures. 1074 * 1075 * @param futures futures to combine 1076 * @return a future that provides a list of the results of the component futures 1077 * @since 10.0 1078 */ 1079 @Beta 1080 public static <V> ListenableFuture<List<V>> successfulAsList( 1081 Iterable<? extends ListenableFuture<? extends V>> futures) { 1082 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1083 } 1084 1085 /** 1086 * Returns a list of delegate futures that correspond to the futures received in the order that 1087 * they complete. Delegate futures return the same value or throw the same exception as the 1088 * corresponding input future returns/throws. 1089 * 1090 * <p>"In the order that they complete" means, for practical purposes, about what you would 1091 * expect, but there are some subtleties. First, we do guarantee that, if the output future at 1092 * index n is done, the output future at index n-1 is also done. (But as usual with futures, some 1093 * listeners for future n may complete before some for future n-1.) However, it is possible, if 1094 * one input completes with result X and another later with result Y, for Y to come before X in 1095 * the output future list. (Such races are impossible to solve without global synchronization of 1096 * all future completions. And they should have little practical impact.) 1097 * 1098 * <p>Cancelling a delegate future propagates to input futures once all the delegates complete, 1099 * either from cancellation or because an input future has completed. If N futures are passed in, 1100 * and M delegates are cancelled, the remaining M input futures will be cancelled once N - M of 1101 * the input futures complete. If all the delegates are cancelled, all the input futures will be 1102 * too. 1103 * 1104 * @since 17.0 1105 */ 1106 @Beta 1107 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 1108 Iterable<? extends ListenableFuture<? extends T>> futures) { 1109 // Can't use Iterables.toArray because it's not gwt compatible 1110 final Collection<ListenableFuture<? extends T>> collection; 1111 if (futures instanceof Collection) { 1112 collection = (Collection<ListenableFuture<? extends T>>) futures; 1113 } else { 1114 collection = ImmutableList.copyOf(futures); 1115 } 1116 @SuppressWarnings("unchecked") 1117 ListenableFuture<? extends T>[] copy = 1118 (ListenableFuture<? extends T>[]) 1119 collection.toArray(new ListenableFuture[collection.size()]); 1120 final InCompletionOrderState<T> state = new InCompletionOrderState<>(copy); 1121 ImmutableList.Builder<AbstractFuture<T>> delegatesBuilder = ImmutableList.builder(); 1122 for (int i = 0; i < copy.length; i++) { 1123 delegatesBuilder.add(new InCompletionOrderFuture<T>(state)); 1124 } 1125 1126 final ImmutableList<AbstractFuture<T>> delegates = delegatesBuilder.build(); 1127 for (int i = 0; i < copy.length; i++) { 1128 final int localI = i; 1129 copy[i].addListener( 1130 new Runnable() { 1131 @Override 1132 public void run() { 1133 state.recordInputCompletion(delegates, localI); 1134 } 1135 }, 1136 directExecutor()); 1137 } 1138 1139 @SuppressWarnings("unchecked") 1140 ImmutableList<ListenableFuture<T>> delegatesCast = (ImmutableList) delegates; 1141 return delegatesCast; 1142 } 1143 1144 // This can't be a TrustedFuture, because TrustedFuture has clever optimizations that 1145 // mean cancel won't be called if this Future is passed into setFuture, and then 1146 // cancelled. 1147 private static final class InCompletionOrderFuture<T> extends AbstractFuture<T> { 1148 private InCompletionOrderState<T> state; 1149 1150 private InCompletionOrderFuture(InCompletionOrderState<T> state) { 1151 this.state = state; 1152 } 1153 1154 @Override 1155 public boolean cancel(boolean interruptIfRunning) { 1156 InCompletionOrderState<T> localState = state; 1157 if (super.cancel(interruptIfRunning)) { 1158 localState.recordOutputCancellation(interruptIfRunning); 1159 return true; 1160 } 1161 return false; 1162 } 1163 1164 @Override 1165 protected void afterDone() { 1166 state = null; 1167 } 1168 1169 @Override 1170 protected String pendingToString() { 1171 InCompletionOrderState<T> localState = state; 1172 if (localState != null) { 1173 // Don't print the actual array! We don't want inCompletionOrder(list).toString() to have 1174 // quadratic output. 1175 return "inputCount=[" 1176 + localState.inputFutures.length 1177 + "], remaining=[" 1178 + localState.incompleteOutputCount.get() 1179 + "]"; 1180 } 1181 return null; 1182 } 1183 } 1184 1185 private static final class InCompletionOrderState<T> { 1186 // A happens-before edge between the writes of these fields and their reads exists, because 1187 // in order to read these fields, the corresponding write to incompleteOutputCount must have 1188 // been read. 1189 private boolean wasCancelled = false; 1190 private boolean shouldInterrupt = true; 1191 private final AtomicInteger incompleteOutputCount; 1192 private final ListenableFuture<? extends T>[] inputFutures; 1193 private volatile int delegateIndex = 0; 1194 1195 private InCompletionOrderState(ListenableFuture<? extends T>[] inputFutures) { 1196 this.inputFutures = inputFutures; 1197 incompleteOutputCount = new AtomicInteger(inputFutures.length); 1198 } 1199 1200 private void recordOutputCancellation(boolean interruptIfRunning) { 1201 wasCancelled = true; 1202 // If all the futures were cancelled with interruption, cancel the input futures 1203 // with interruption; otherwise cancel without 1204 if (!interruptIfRunning) { 1205 shouldInterrupt = false; 1206 } 1207 recordCompletion(); 1208 } 1209 1210 private void recordInputCompletion( 1211 ImmutableList<AbstractFuture<T>> delegates, int inputFutureIndex) { 1212 ListenableFuture<? extends T> inputFuture = inputFutures[inputFutureIndex]; 1213 // Null out our reference to this future, so it can be GCed 1214 inputFutures[inputFutureIndex] = null; 1215 for (int i = delegateIndex; i < delegates.size(); i++) { 1216 if (delegates.get(i).setFuture(inputFuture)) { 1217 recordCompletion(); 1218 // this is technically unnecessary, but should speed up later accesses 1219 delegateIndex = i + 1; 1220 return; 1221 } 1222 } 1223 // If all the delegates were complete, no reason for the next listener to have to 1224 // go through the whole list. Avoids O(n^2) behavior when the entire output list is 1225 // cancelled. 1226 delegateIndex = delegates.size(); 1227 } 1228 1229 private void recordCompletion() { 1230 if (incompleteOutputCount.decrementAndGet() == 0 && wasCancelled) { 1231 for (ListenableFuture<?> toCancel : inputFutures) { 1232 if (toCancel != null) { 1233 toCancel.cancel(shouldInterrupt); 1234 } 1235 } 1236 } 1237 } 1238 } 1239 1240 /** 1241 * Registers separate success and failure callbacks to be run when the {@code Future}'s 1242 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 1243 * computation is already complete, immediately. 1244 * 1245 * <p>There is no guaranteed ordering of execution of callbacks, but any callback added through 1246 * this method is guaranteed to be called once the computation is complete. 1247 * 1248 * <p>Example: 1249 * 1250 * <pre>{@code 1251 * ListenableFuture<QueryResult> future = ...; 1252 * addCallback(future, 1253 * new FutureCallback<QueryResult>() { 1254 * public void onSuccess(QueryResult result) { 1255 * storeInCache(result); 1256 * } 1257 * public void onFailure(Throwable t) { 1258 * reportError(t); 1259 * } 1260 * }); 1261 * }</pre> 1262 * 1263 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 1264 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 1265 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 1266 * also applicable to heavyweight callbacks passed to this method. 1267 * 1268 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1269 * ListenableFuture#addListener addListener}. 1270 * 1271 * @param future The future attach the callback to. 1272 * @param callback The callback to invoke when {@code future} is completed. 1273 * @since 10.0 1274 * @deprecated Use {@linkplain #addCallback(ListenableFuture, FutureCallback, Executor) the 1275 * overload that requires an executor}. For identical behavior, pass {@link 1276 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 1277 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1278 * documentation. This method is scheduled to be removed in July 2018. 1279 */ 1280 @Deprecated 1281 @DoNotCall 1282 public static <V> void addCallback( 1283 ListenableFuture<V> future, FutureCallback<? super V> callback) { 1284 addCallback(future, callback, directExecutor()); 1285 } 1286 1287 /** 1288 * Registers separate success and failure callbacks to be run when the {@code Future}'s 1289 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 1290 * computation is already complete, immediately. 1291 * 1292 * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of 1293 * callbacks, but any callback added through this method is guaranteed to be called once the 1294 * computation is complete. 1295 * 1296 * <p>Example: 1297 * 1298 * <pre>{@code 1299 * ListenableFuture<QueryResult> future = ...; 1300 * Executor e = ... 1301 * addCallback(future, 1302 * new FutureCallback<QueryResult>() { 1303 * public void onSuccess(QueryResult result) { 1304 * storeInCache(result); 1305 * } 1306 * public void onFailure(Throwable t) { 1307 * reportError(t); 1308 * } 1309 * }, e); 1310 * }</pre> 1311 * 1312 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 1313 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1314 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 1315 * callbacks passed to this method. 1316 * 1317 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1318 * ListenableFuture#addListener addListener}. 1319 * 1320 * @param future The future attach the callback to. 1321 * @param callback The callback to invoke when {@code future} is completed. 1322 * @param executor The executor to run {@code callback} when the future completes. 1323 * @since 10.0 1324 */ 1325 public static <V> void addCallback( 1326 final ListenableFuture<V> future, 1327 final FutureCallback<? super V> callback, 1328 Executor executor) { 1329 Preconditions.checkNotNull(callback); 1330 future.addListener(new CallbackListener<V>(future, callback), executor); 1331 } 1332 1333 /** See {@link #addCallback(ListenableFuture, FutureCallback, Executor)} for behavioral notes. */ 1334 private static final class CallbackListener<V> implements Runnable { 1335 final Future<V> future; 1336 final FutureCallback<? super V> callback; 1337 1338 CallbackListener(Future<V> future, FutureCallback<? super V> callback) { 1339 this.future = future; 1340 this.callback = callback; 1341 } 1342 1343 @Override 1344 public void run() { 1345 final V value; 1346 try { 1347 value = getDone(future); 1348 } catch (ExecutionException e) { 1349 callback.onFailure(e.getCause()); 1350 return; 1351 } catch (RuntimeException | Error e) { 1352 callback.onFailure(e); 1353 return; 1354 } 1355 callback.onSuccess(value); 1356 } 1357 1358 @Override 1359 public String toString() { 1360 return MoreObjects.toStringHelper(this).addValue(callback).toString(); 1361 } 1362 } 1363 1364 /** 1365 * Returns the result of the input {@code Future}, which must have already completed. 1366 * 1367 * <p>The benefits of this method are twofold. First, the name "getDone" suggests to readers that 1368 * the {@code Future} is already done. Second, if buggy code calls {@code getDone} on a {@code 1369 * Future} that is still pending, the program will throw instead of block. This can be important 1370 * for APIs like {@link #whenAllComplete whenAllComplete(...)}{@code .}{@link 1371 * FutureCombiner#call(Callable, Executor) call(...)}, where it is easy to use a new input from 1372 * the {@code call} implementation but forget to add it to the arguments of {@code 1373 * whenAllComplete}. 1374 * 1375 * <p>If you are looking for a method to determine whether a given {@code Future} is done, use the 1376 * instance method {@link Future#isDone()}. 1377 * 1378 * @throws ExecutionException if the {@code Future} failed with an exception 1379 * @throws CancellationException if the {@code Future} was cancelled 1380 * @throws IllegalStateException if the {@code Future} is not done 1381 * @since 20.0 1382 */ 1383 @CanIgnoreReturnValue 1384 // TODO(cpovirk): Consider calling getDone() in our own code. 1385 public static <V> V getDone(Future<V> future) throws ExecutionException { 1386 /* 1387 * We throw IllegalStateException, since the call could succeed later. Perhaps we "should" throw 1388 * IllegalArgumentException, since the call could succeed with a different argument. Those 1389 * exceptions' docs suggest that either is acceptable. Google's Java Practices page recommends 1390 * IllegalArgumentException here, in part to keep its recommendation simple: Static methods 1391 * should throw IllegalStateException only when they use static state. 1392 * 1393 * 1394 * Why do we deviate here? The answer: We want for fluentFuture.getDone() to throw the same 1395 * exception as Futures.getDone(fluentFuture). 1396 */ 1397 checkState(future.isDone(), "Future was expected to be done: %s", future); 1398 return getUninterruptibly(future); 1399 } 1400 1401 /** 1402 * Returns the result of {@link Future#get()}, converting most exceptions to a new instance of the 1403 * given checked exception type. This reduces boilerplate for a common use of {@code Future} in 1404 * which it is unnecessary to programmatically distinguish between exception types or to extract 1405 * other information from the exception instance. 1406 * 1407 * <p>Exceptions from {@code Future.get} are treated as follows: 1408 * 1409 * <ul> 1410 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause 1411 * is a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1412 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1413 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1414 * interrupt). 1415 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1416 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1417 * exceptions). 1418 * </ul> 1419 * 1420 * <p>The overall principle is to continue to treat every checked exception as a checked 1421 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1422 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1423 * new stack trace matches that of the current thread. 1424 * 1425 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1426 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1427 * (preferring constructors with at least one {@code String}) and calling the constructor via 1428 * reflection. If the exception did not already have a cause, one is set by calling {@link 1429 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1430 * IllegalArgumentException} is thrown. 1431 * 1432 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1433 * whose cause is not itself a checked exception 1434 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1435 * {@code RuntimeException} as its cause 1436 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1437 * Error} as its cause 1438 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1439 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1440 * does not have a suitable constructor 1441 * @since 19.0 (in 10.0 as {@code get}) 1442 */ 1443 @CanIgnoreReturnValue 1444 @GwtIncompatible // reflection 1445 public static <V, X extends Exception> V getChecked(Future<V> future, Class<X> exceptionClass) 1446 throws X { 1447 return FuturesGetChecked.getChecked(future, exceptionClass); 1448 } 1449 1450 /** 1451 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most exceptions to a new 1452 * instance of the given checked exception type. This reduces boilerplate for a common use of 1453 * {@code Future} in which it is unnecessary to programmatically distinguish between exception 1454 * types or to extract other information from the exception instance. 1455 * 1456 * <p>Exceptions from {@code Future.get} are treated as follows: 1457 * 1458 * <ul> 1459 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause 1460 * is a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1461 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1462 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1463 * interrupt). 1464 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1465 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1466 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1467 * exceptions). 1468 * </ul> 1469 * 1470 * <p>The overall principle is to continue to treat every checked exception as a checked 1471 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1472 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1473 * new stack trace matches that of the current thread. 1474 * 1475 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1476 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1477 * (preferring constructors with at least one {@code String}) and calling the constructor via 1478 * reflection. If the exception did not already have a cause, one is set by calling {@link 1479 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1480 * IllegalArgumentException} is thrown. 1481 * 1482 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1483 * whose cause is not itself a checked exception 1484 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1485 * {@code RuntimeException} as its cause 1486 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1487 * Error} as its cause 1488 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1489 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1490 * does not have a suitable constructor 1491 * @since 19.0 (in 10.0 as {@code get} and with different parameter order) 1492 */ 1493 @CanIgnoreReturnValue 1494 @GwtIncompatible // reflection 1495 public static <V, X extends Exception> V getChecked( 1496 Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) throws X { 1497 return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit); 1498 } 1499 1500 /** 1501 * Returns the result of calling {@link Future#get()} uninterruptibly on a task known not to throw 1502 * a checked exception. This makes {@code Future} more suitable for lightweight, fast-running 1503 * tasks that, barring bugs in the code, will not fail. This gives it exception-handling behavior 1504 * similar to that of {@code ForkJoinTask.join}. 1505 * 1506 * <p>Exceptions from {@code Future.get} are treated as follows: 1507 * 1508 * <ul> 1509 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@link 1510 * UncheckedExecutionException} (if the cause is an {@code Exception}) or {@link 1511 * ExecutionError} (if the cause is an {@code Error}). 1512 * <li>Any {@link InterruptedException} causes a retry of the {@code get} call. The interrupt is 1513 * restored before {@code getUnchecked} returns. 1514 * <li>Any {@link CancellationException} is propagated untouched. So is any other {@link 1515 * RuntimeException} ({@code get} implementations are discouraged from throwing such 1516 * exceptions). 1517 * </ul> 1518 * 1519 * <p>The overall principle is to eliminate all checked exceptions: to loop to avoid {@code 1520 * InterruptedException}, to pass through {@code CancellationException}, and to wrap any exception 1521 * from the underlying computation in an {@code UncheckedExecutionException} or {@code 1522 * ExecutionError}. 1523 * 1524 * <p>For an uninterruptible {@code get} that preserves other exceptions, see {@link 1525 * Uninterruptibles#getUninterruptibly(Future)}. 1526 * 1527 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with an 1528 * {@code Exception} as its cause 1529 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1530 * Error} as its cause 1531 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1532 * @since 10.0 1533 */ 1534 @CanIgnoreReturnValue 1535 public static <V> V getUnchecked(Future<V> future) { 1536 checkNotNull(future); 1537 try { 1538 return getUninterruptibly(future); 1539 } catch (ExecutionException e) { 1540 wrapAndThrowUnchecked(e.getCause()); 1541 throw new AssertionError(); 1542 } 1543 } 1544 1545 private static void wrapAndThrowUnchecked(Throwable cause) { 1546 if (cause instanceof Error) { 1547 throw new ExecutionError((Error) cause); 1548 } 1549 /* 1550 * It's an Exception. (Or it's a non-Error, non-Exception Throwable. From my survey of such 1551 * classes, I believe that most users intended to extend Exception, so we'll treat it like an 1552 * Exception.) 1553 */ 1554 throw new UncheckedExecutionException(cause); 1555 } 1556 1557 /* 1558 * Arguably we don't need a timed getUnchecked because any operation slow enough to require a 1559 * timeout is heavyweight enough to throw a checked exception and therefore be inappropriate to 1560 * use with getUnchecked. Further, it's not clear that converting the checked TimeoutException to 1561 * a RuntimeException -- especially to an UncheckedExecutionException, since it wasn't thrown by 1562 * the computation -- makes sense, and if we don't convert it, the user still has to write a 1563 * try-catch block. 1564 * 1565 * If you think you would use this method, let us know. You might also also look into the 1566 * Fork-Join framework: http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html 1567 */ 1568 1569 /** 1570 * A checked future that uses a function to map from exceptions to the appropriate checked type. 1571 */ 1572 @GwtIncompatible // TODO 1573 private static class MappingCheckedFuture<V, X extends Exception> 1574 extends AbstractCheckedFuture<V, X> { 1575 1576 final Function<? super Exception, X> mapper; 1577 1578 MappingCheckedFuture(ListenableFuture<V> delegate, Function<? super Exception, X> mapper) { 1579 super(delegate); 1580 1581 this.mapper = checkNotNull(mapper); 1582 } 1583 1584 @Override 1585 protected X mapException(Exception e) { 1586 return mapper.apply(e); 1587 } 1588 } 1589}