001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Preconditions.checkState; 019import static com.google.common.util.concurrent.Futures.catchingAsync; 020import static com.google.common.util.concurrent.Futures.transformAsync; 021import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 022import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 023 024import com.google.common.annotations.Beta; 025import com.google.common.annotations.GwtCompatible; 026import com.google.common.annotations.GwtIncompatible; 027import com.google.common.base.Function; 028import com.google.common.base.MoreObjects; 029import com.google.common.base.Preconditions; 030import com.google.common.collect.ImmutableList; 031import com.google.common.util.concurrent.CollectionFuture.ListFuture; 032import com.google.common.util.concurrent.ImmediateFuture.ImmediateCancelledFuture; 033import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedCheckedFuture; 034import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedFuture; 035import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulCheckedFuture; 036import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulFuture; 037import com.google.errorprone.annotations.CanIgnoreReturnValue; 038import java.util.Collection; 039import java.util.List; 040import java.util.concurrent.Callable; 041import java.util.concurrent.CancellationException; 042import java.util.concurrent.ExecutionException; 043import java.util.concurrent.Executor; 044import java.util.concurrent.Future; 045import java.util.concurrent.ScheduledExecutorService; 046import java.util.concurrent.TimeUnit; 047import java.util.concurrent.TimeoutException; 048import java.util.concurrent.atomic.AtomicInteger; 049import javax.annotation.Nullable; 050 051/** 052 * Static utility methods pertaining to the {@link Future} interface. 053 * 054 * <p>Many of these methods use the {@link ListenableFuture} API; consult the Guava User Guide 055 * article on <a href="https://github.com/google/guava/wiki/ListenableFutureExplained">{@code 056 * ListenableFuture}</a>. 057 * 058 * <p>The main purpose of {@code ListenableFuture} is to help you chain together a graph of 059 * asynchronous operations. You can chain them together manually with calls to methods like {@link 060 * Futures#transform(ListenableFuture, Function, Executor) Futures.transform}, but you will often 061 * find it easier to use a framework. Frameworks automate the process, often adding features like 062 * monitoring, debugging, and cancellation. Examples of frameworks include: 063 * 064 * <ul> 065 * <li><a href="http://google.github.io/dagger/producers.html">Dagger Producers</a> 066 * </ul> 067 * 068 * <p>If you do chain your operations manually, you may want to use {@link FluentFuture}. 069 * 070 * @author Kevin Bourrillion 071 * @author Nishant Thakkar 072 * @author Sven Mawson 073 * @since 1.0 074 */ 075@Beta 076@GwtCompatible(emulated = true) 077public final class Futures extends GwtFuturesCatchingSpecialization { 078 079 // A note on memory visibility. 080 // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine) 081 // have two requirements that significantly complicate their design. 082 // 1. Cancellation should propagate from the returned future to the input future(s). 083 // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion. 084 // 085 // A consequence of these requirements is that the delegate futures cannot be stored in 086 // final fields. 087 // 088 // For simplicity the rest of this description will discuss Futures.catching since it is the 089 // simplest instance, though very similar descriptions apply to many other classes in this file. 090 // 091 // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field 092 // 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the 093 // 'inputFuture' field is read and where we will have to consider visibility of the write 094 // operation in the constructor. 095 // 096 // 1. In the listener that performs the callback. In this case it is fine since inputFuture is 097 // assigned prior to calling addListener, and addListener happens-before any invocation of the 098 // listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible 099 // to the listener. 100 // 101 // 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine. 102 // There is currently nothing that enforces that the write to inputFuture in the constructor is 103 // visible to done(). This is because there is no happens before edge between the write and a 104 // (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue, 105 // it would just add an edge such that if done() observed non-null, then it would also 106 // definitely observe all earlier writes, but we still have no guarantee that done() would see 107 // the inital write (just stronger guarantees if it does). 108 // 109 // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html 110 // For a (long) discussion about this specific issue and the general futility of life. 111 // 112 // For the time being we are OK with the problem discussed above since it requires a caller to 113 // introduce a very specific kind of data-race. And given the other operations performed by these 114 // methods that involve volatile read/write operations, in practice there is no issue. Also, the 115 // way in such a visibility issue would surface is most likely as a failure of cancel() to 116 // propagate to the input. Cancellation propagation is fundamentally racy so this is fine. 117 // 118 // Future versions of the JMM may revise safe construction semantics in such a way that we can 119 // safely publish these objects and we won't need this whole discussion. 120 // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs 121 // that should resolve the issue. This comes at the cost of adding more write barriers to the 122 // implementations. 123 124 private Futures() {} 125 126 /** 127 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} and a {@link Function} 128 * that maps from {@link Exception} instances into the appropriate checked type. 129 * 130 * <p><b>Warning:</b> We recommend against using {@code CheckedFuture} in new projects. {@code 131 * CheckedFuture} is difficult to build libraries atop. {@code CheckedFuture} ports of methods 132 * like {@link Futures#transformAsync} have historically had bugs, and some of these bugs are 133 * necessary, unavoidable consequences of the {@code CheckedFuture} API. Additionally, {@code 134 * CheckedFuture} encourages users to take exceptions from one thread and rethrow them in another, 135 * producing confusing stack traces. 136 * 137 * <p>The given mapping function will be applied to an {@link InterruptedException}, a {@link 138 * CancellationException}, or an {@link ExecutionException}. See {@link Future#get()} for details 139 * on the exceptions thrown. 140 * 141 * @since 9.0 (source-compatible since 1.0) 142 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 143 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 144 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 145 * Additionally, it has a surprising policy about which exceptions to map and which to leave 146 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 147 * use, possibly specializing them to the particular exception type they use. We recommend 148 * that most people use {@code ListenableFuture} and perform any exception wrapping 149 * themselves. This method is scheduled for removal from Guava in February 2018. 150 */ 151 @Deprecated 152 @GwtIncompatible // TODO 153 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 154 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 155 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 156 } 157 158 /** 159 * Creates a {@code ListenableFuture} which has its value set immediately upon construction. The 160 * getters just return the value. This {@code Future} can't be canceled or timed out and its 161 * {@code isDone()} method always returns {@code true}. 162 */ 163 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 164 if (value == null) { 165 // This cast is safe because null is assignable to V for all V (i.e. it is covariant) 166 @SuppressWarnings({"unchecked", "rawtypes"}) 167 ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL; 168 return typedNull; 169 } 170 return new ImmediateSuccessfulFuture<V>(value); 171 } 172 173 /** 174 * Returns a {@code CheckedFuture} which has its value set immediately upon construction. 175 * 176 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 177 * returns {@code true}. Calling {@code get()} or {@code checkedGet()} will immediately return the 178 * provided value. 179 * 180 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 181 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 182 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 183 * Additionally, it has a surprising policy about which exceptions to map and which to leave 184 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 185 * use, possibly specializing them to the particular exception type they use. We recommend 186 * that most people use {@code ListenableFuture} and perform any exception wrapping 187 * themselves. This method is scheduled for removal from Guava in February 2018. 188 */ 189 @Deprecated 190 @GwtIncompatible // TODO 191 public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture( 192 @Nullable V value) { 193 return new ImmediateSuccessfulCheckedFuture<V, X>(value); 194 } 195 196 /** 197 * Returns a {@code ListenableFuture} which has an exception set immediately upon construction. 198 * 199 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 200 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 201 * Throwable} wrapped in an {@code ExecutionException}. 202 */ 203 public static <V> ListenableFuture<V> immediateFailedFuture(Throwable throwable) { 204 checkNotNull(throwable); 205 return new ImmediateFailedFuture<V>(throwable); 206 } 207 208 /** 209 * Creates a {@code ListenableFuture} which is cancelled immediately upon construction, so that 210 * {@code isCancelled()} always returns {@code true}. 211 * 212 * @since 14.0 213 */ 214 public static <V> ListenableFuture<V> immediateCancelledFuture() { 215 return new ImmediateCancelledFuture<V>(); 216 } 217 218 /** 219 * Returns a {@code CheckedFuture} which has an exception set immediately upon construction. 220 * 221 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 222 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 223 * Exception} wrapped in an {@code ExecutionException}, and calling {@code checkedGet()} will 224 * throw the provided exception itself. 225 * 226 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 227 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 228 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 229 * Additionally, it has a surprising policy about which exceptions to map and which to leave 230 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 231 * use, possibly specializing them to the particular exception type they use. We recommend 232 * that most people use {@code ListenableFuture} and perform any exception wrapping 233 * themselves. This method is scheduled for removal from Guava in February 2018. 234 */ 235 @Deprecated 236 @GwtIncompatible // TODO 237 public static <V, X extends Exception> CheckedFuture<V, X> immediateFailedCheckedFuture( 238 X exception) { 239 checkNotNull(exception); 240 return new ImmediateFailedCheckedFuture<V, X>(exception); 241 } 242 243 /** 244 * Executes {@code callable} on the specified {@code executor}, returning a {@code Future}. 245 * 246 * @throws RejectedExecutionException if the task cannot be scheduled for execution 247 * @since 23.0 248 */ 249 public static <O> ListenableFuture<O> submitAsync(AsyncCallable<O> callable, Executor executor) { 250 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 251 executor.execute(task); 252 return task; 253 } 254 255 /** 256 * Schedules {@code callable} on the specified {@code executor}, returning a {@code Future}. 257 * 258 * @throws RejectedExecutionException if the task cannot be scheduled for execution 259 * @since 23.0 260 */ 261 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 262 public static <O> ListenableFuture<O> scheduleAsync( 263 AsyncCallable<O> callable, 264 long delay, 265 TimeUnit timeUnit, 266 ScheduledExecutorService executorService) { 267 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 268 final Future<?> scheduled = executorService.schedule(task, delay, timeUnit); 269 task.addListener( 270 new Runnable() { 271 @Override 272 public void run() { 273 // Don't want to interrupt twice 274 scheduled.cancel(false); 275 } 276 }, 277 directExecutor()); 278 return task; 279 } 280 281 /** 282 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 283 * primary input fails with the given {@code exceptionType}, from the result provided by the 284 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 285 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 286 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 287 * Future}. 288 * 289 * <p>Usage example: 290 * 291 * <pre>{@code 292 * ListenableFuture<Integer> fetchCounterFuture = ...; 293 * 294 * // Falling back to a zero counter in case an exception happens when 295 * // processing the RPC to fetch counters. 296 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 297 * fetchCounterFuture, FetchException.class, x -> 0); 298 * }</pre> 299 * 300 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 301 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 302 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 303 * also applicable to heavyweight functions passed to this method. 304 * 305 * @param input the primary input {@code Future} 306 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 307 * type is matched against the input's exception. "The input's exception" means the cause of 308 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 309 * different kind of exception, that exception itself. To avoid hiding bugs and other 310 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 311 * Throwable.class} in particular. 312 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 313 * exception type. The function's argument is the input's exception. "The input's exception" 314 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 315 * {@code get()} throws a different kind of exception, that exception itself. 316 * @since 19.0 317 * @deprecated Use {@linkplain #catching(ListenableFuture, Class, Function, Executor) the overload 318 * that requires an executor}. For identical behavior, pass {@link 319 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 320 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 321 * documentation. This method is scheduled to be removed in April 2018. 322 */ 323 @Deprecated 324 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 325 public static <V, X extends Throwable> ListenableFuture<V> catching( 326 ListenableFuture<? extends V> input, 327 Class<X> exceptionType, 328 Function<? super X, ? extends V> fallback) { 329 return AbstractCatchingFuture.create(input, exceptionType, fallback); 330 } 331 332 /** 333 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 334 * primary input fails with the given {@code exceptionType}, from the result provided by the 335 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 336 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 337 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 338 * Future}. 339 * 340 * <p>Usage example: 341 * 342 * <pre>{@code 343 * ListenableFuture<Integer> fetchCounterFuture = ...; 344 * 345 * // Falling back to a zero counter in case an exception happens when 346 * // processing the RPC to fetch counters. 347 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 348 * fetchCounterFuture, FetchException.class, x -> 0, directExecutor()); 349 * }</pre> 350 * 351 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 352 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 353 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 354 * functions passed to this method. 355 * 356 * @param input the primary input {@code Future} 357 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 358 * type is matched against the input's exception. "The input's exception" means the cause of 359 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 360 * different kind of exception, that exception itself. To avoid hiding bugs and other 361 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 362 * Throwable.class} in particular. 363 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 364 * exception type. The function's argument is the input's exception. "The input's exception" 365 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 366 * {@code get()} throws a different kind of exception, that exception itself. 367 * @param executor the executor that runs {@code fallback} if {@code input} fails 368 * @since 19.0 369 */ 370 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 371 public static <V, X extends Throwable> ListenableFuture<V> catching( 372 ListenableFuture<? extends V> input, 373 Class<X> exceptionType, 374 Function<? super X, ? extends V> fallback, 375 Executor executor) { 376 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 377 } 378 379 /** 380 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 381 * primary input fails with the given {@code exceptionType}, from the result provided by the 382 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 383 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 384 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 385 * {@code Future}. 386 * 387 * <p>Usage examples: 388 * 389 * <pre>{@code 390 * ListenableFuture<Integer> fetchCounterFuture = ...; 391 * 392 * // Falling back to a zero counter in case an exception happens when 393 * // processing the RPC to fetch counters. 394 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 395 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0)); 396 * }</pre> 397 * 398 * <p>The fallback can also choose to propagate the original exception when desired: 399 * 400 * <pre>{@code 401 * ListenableFuture<Integer> fetchCounterFuture = ...; 402 * 403 * // Falling back to a zero counter only in case the exception was a 404 * // TimeoutException. 405 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 406 * fetchCounterFuture, 407 * FetchException.class, 408 * e -> { 409 * if (omitDataOnFetchFailure) { 410 * return immediateFuture(0); 411 * } 412 * throw e; 413 * }); 414 * }</pre> 415 * 416 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 417 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 418 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 419 * also applicable to heavyweight functions passed to this method. (Specifically, {@code 420 * directExecutor} functions should avoid heavyweight operations inside {@code 421 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 422 * completing the returned {@code Future}.) 423 * 424 * @param input the primary input {@code Future} 425 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 426 * type is matched against the input's exception. "The input's exception" means the cause of 427 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 428 * different kind of exception, that exception itself. To avoid hiding bugs and other 429 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 430 * Throwable.class} in particular. 431 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 432 * exception type. The function's argument is the input's exception. "The input's exception" 433 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 434 * {@code get()} throws a different kind of exception, that exception itself. 435 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 436 * @deprecated Use {@linkplain #catchingAsync(ListenableFuture, Class, AsyncFunction, Executor) 437 * the overload that requires an executor}. For identical behavior, pass {@link 438 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 439 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 440 * documentation. This method is scheduled to be removed in April 2018. 441 */ 442 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 443 @Deprecated 444 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 445 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 446 ListenableFuture<? extends V> input, 447 Class<X> exceptionType, 448 AsyncFunction<? super X, ? extends V> fallback) { 449 return AbstractCatchingFuture.create(input, exceptionType, fallback); 450 } 451 452 /** 453 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 454 * primary input fails with the given {@code exceptionType}, from the result provided by the 455 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 456 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 457 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 458 * {@code Future}. 459 * 460 * <p>Usage examples: 461 * 462 * <pre>{@code 463 * ListenableFuture<Integer> fetchCounterFuture = ...; 464 * 465 * // Falling back to a zero counter in case an exception happens when 466 * // processing the RPC to fetch counters. 467 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 468 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0), directExecutor()); 469 * }</pre> 470 * 471 * <p>The fallback can also choose to propagate the original exception when desired: 472 * 473 * <pre>{@code 474 * ListenableFuture<Integer> fetchCounterFuture = ...; 475 * 476 * // Falling back to a zero counter only in case the exception was a 477 * // TimeoutException. 478 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 479 * fetchCounterFuture, 480 * FetchException.class, 481 * e -> { 482 * if (omitDataOnFetchFailure) { 483 * return immediateFuture(0); 484 * } 485 * throw e; 486 * }, 487 * directExecutor()); 488 * }</pre> 489 * 490 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 491 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 492 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 493 * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid 494 * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should 495 * occur in other threads responsible for completing the returned {@code Future}.) 496 * 497 * @param input the primary input {@code Future} 498 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 499 * type is matched against the input's exception. "The input's exception" means the cause of 500 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 501 * different kind of exception, that exception itself. To avoid hiding bugs and other 502 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 503 * Throwable.class} in particular. 504 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 505 * exception type. The function's argument is the input's exception. "The input's exception" 506 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 507 * {@code get()} throws a different kind of exception, that exception itself. 508 * @param executor the executor that runs {@code fallback} if {@code input} fails 509 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 510 */ 511 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 512 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 513 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 514 ListenableFuture<? extends V> input, 515 Class<X> exceptionType, 516 AsyncFunction<? super X, ? extends V> fallback, 517 Executor executor) { 518 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 519 } 520 521 /** 522 * Returns a future that delegates to another but will finish early (via a {@link 523 * TimeoutException} wrapped in an {@link ExecutionException}) if the specified duration expires. 524 * 525 * <p>The delegate future is interrupted and cancelled if it times out. 526 * 527 * @param delegate The future to delegate to. 528 * @param time when to timeout the future 529 * @param unit the time unit of the time parameter 530 * @param scheduledExecutor The executor service to enforce the timeout. 531 * @since 19.0 532 */ 533 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 534 public static <V> ListenableFuture<V> withTimeout( 535 ListenableFuture<V> delegate, 536 long time, 537 TimeUnit unit, 538 ScheduledExecutorService scheduledExecutor) { 539 return TimeoutFuture.create(delegate, time, unit, scheduledExecutor); 540 } 541 542 /** 543 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 544 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 545 * the same exception (and the function is not invoked). 546 * 547 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 548 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 549 * Example usage: 550 * 551 * <pre>{@code 552 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 553 * ListenableFuture<QueryResult> queryFuture = 554 * transformAsync(rowKeyFuture, dataService::readFuture); 555 * }</pre> 556 * 557 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 558 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 559 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 560 * also applicable to heavyweight functions passed to this method. (Specifically, {@code 561 * directExecutor} functions should avoid heavyweight operations inside {@code 562 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 563 * completing the returned {@code Future}.) 564 * 565 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 566 * input future and that of the future returned by the function. That is, if the returned {@code 567 * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two 568 * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to 569 * cancel itself. 570 * 571 * @param input The future to transform 572 * @param function A function to transform the result of the input future to the result of the 573 * output future 574 * @return A future that holds result of the function (if the input succeeded) or the original 575 * input's failure (if not) 576 * @since 19.0 (in 11.0 as {@code transform}) 577 * @deprecated Use {@linkplain #transformAsync(ListenableFuture, AsyncFunction, Executor) the 578 * overload that requires an executor}. For identical behavior, pass {@link 579 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 580 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 581 * documentation. This method is scheduled to be removed in April 2018. 582 */ 583 @Deprecated 584 public static <I, O> ListenableFuture<O> transformAsync( 585 ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) { 586 return AbstractTransformFuture.create(input, function); 587 } 588 589 /** 590 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 591 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 592 * the same exception (and the function is not invoked). 593 * 594 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 595 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 596 * Example usage: 597 * 598 * <pre>{@code 599 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 600 * ListenableFuture<QueryResult> queryFuture = 601 * transformAsync(rowKeyFuture, dataService::readFuture, executor); 602 * }</pre> 603 * 604 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 605 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 606 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 607 * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid 608 * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should 609 * occur in other threads responsible for completing the returned {@code Future}.) 610 * 611 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 612 * input future and that of the future returned by the chain function. That is, if the returned 613 * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the 614 * other two is cancelled, the returned {@code Future} will receive a callback in which it will 615 * attempt to cancel itself. 616 * 617 * @param input The future to transform 618 * @param function A function to transform the result of the input future to the result of the 619 * output future 620 * @param executor Executor to run the function in. 621 * @return A future that holds result of the function (if the input succeeded) or the original 622 * input's failure (if not) 623 * @since 19.0 (in 11.0 as {@code transform}) 624 */ 625 public static <I, O> ListenableFuture<O> transformAsync( 626 ListenableFuture<I> input, 627 AsyncFunction<? super I, ? extends O> function, 628 Executor executor) { 629 return AbstractTransformFuture.create(input, function, executor); 630 } 631 632 /** 633 * Returns a new {@code Future} whose result is derived from the result of the given {@code 634 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 635 * the function is not invoked). Example usage: 636 * 637 * <pre>{@code 638 * ListenableFuture<QueryResult> queryFuture = ...; 639 * ListenableFuture<List<Row>> rowsFuture = 640 * transform(queryFuture, QueryResult::getRows); 641 * }</pre> 642 * 643 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 644 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 645 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 646 * also applicable to heavyweight functions passed to this method. 647 * 648 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 649 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 650 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 651 * in which it will attempt to cancel itself. 652 * 653 * <p>An example use of this method is to convert a serializable object returned from an RPC into 654 * a POJO. 655 * 656 * @param input The future to transform 657 * @param function A Function to transform the results of the provided future to the results of 658 * the returned future. This will be run in the thread that notifies input it is complete. 659 * @return A future that holds result of the transformation. 660 * @since 9.0 (in 1.0 as {@code compose}) 661 * @deprecated Use {@linkplain #transform(ListenableFuture, Function, Executor) the overload that 662 * requires an executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, 663 * but consider whether another executor would be safer, as discussed in the {@link 664 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 665 * scheduled to be removed in April 2018. 666 */ 667 @Deprecated 668 public static <I, O> ListenableFuture<O> transform( 669 ListenableFuture<I> input, Function<? super I, ? extends O> function) { 670 return AbstractTransformFuture.create(input, function); 671 } 672 673 /** 674 * Returns a new {@code Future} whose result is derived from the result of the given {@code 675 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 676 * the function is not invoked). Example usage: 677 * 678 * <pre>{@code 679 * ListenableFuture<QueryResult> queryFuture = ...; 680 * ListenableFuture<List<Row>> rowsFuture = 681 * transform(queryFuture, QueryResult::getRows, executor); 682 * }</pre> 683 * 684 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 685 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 686 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 687 * functions passed to this method. 688 * 689 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 690 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 691 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 692 * in which it will attempt to cancel itself. 693 * 694 * <p>An example use of this method is to convert a serializable object returned from an RPC into 695 * a POJO. 696 * 697 * @param input The future to transform 698 * @param function A Function to transform the results of the provided future to the results of 699 * the returned future. 700 * @param executor Executor to run the function in. 701 * @return A future that holds result of the transformation. 702 * @since 9.0 (in 2.0 as {@code compose}) 703 */ 704 public static <I, O> ListenableFuture<O> transform( 705 ListenableFuture<I> input, Function<? super I, ? extends O> function, Executor executor) { 706 return AbstractTransformFuture.create(input, function, executor); 707 } 708 709 /** 710 * Like {@link #transform(ListenableFuture, Function, Executor)} except that the transformation 711 * {@code function} is invoked on each call to {@link Future#get() get()} on the returned future. 712 * 713 * <p>The returned {@code Future} reflects the input's cancellation state directly, and any 714 * attempt to cancel the returned Future is likewise passed through to the input Future. 715 * 716 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} only apply the timeout 717 * to the execution of the underlying {@code Future}, <em>not</em> to the execution of the 718 * transformation function. 719 * 720 * <p>The primary audience of this method is callers of {@code transform} who don't have a {@code 721 * ListenableFuture} available and do not mind repeated, lazy function evaluation. 722 * 723 * @param input The future to transform 724 * @param function A Function to transform the results of the provided future to the results of 725 * the returned future. 726 * @return A future that returns the result of the transformation. 727 * @since 10.0 728 */ 729 @GwtIncompatible // TODO 730 public static <I, O> Future<O> lazyTransform( 731 final Future<I> input, final Function<? super I, ? extends O> function) { 732 checkNotNull(input); 733 checkNotNull(function); 734 return new Future<O>() { 735 736 @Override 737 public boolean cancel(boolean mayInterruptIfRunning) { 738 return input.cancel(mayInterruptIfRunning); 739 } 740 741 @Override 742 public boolean isCancelled() { 743 return input.isCancelled(); 744 } 745 746 @Override 747 public boolean isDone() { 748 return input.isDone(); 749 } 750 751 @Override 752 public O get() throws InterruptedException, ExecutionException { 753 return applyTransformation(input.get()); 754 } 755 756 @Override 757 public O get(long timeout, TimeUnit unit) 758 throws InterruptedException, ExecutionException, TimeoutException { 759 return applyTransformation(input.get(timeout, unit)); 760 } 761 762 private O applyTransformation(I input) throws ExecutionException { 763 try { 764 return function.apply(input); 765 } catch (Throwable t) { 766 throw new ExecutionException(t); 767 } 768 } 769 }; 770 } 771 772 /** 773 * Returns a new {@code ListenableFuture} whose result is the product of calling {@code get()} on 774 * the {@code Future} nested within the given {@code Future}, effectively chaining the futures one 775 * after the other. Example: 776 * 777 * <pre>{@code 778 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 779 * ListenableFuture<String> dereferenced = dereference(nested); 780 * }</pre> 781 * 782 * <p>Most users will not need this method. To create a {@code Future} that completes with the 783 * result of another {@code Future}, create a {@link SettableFuture}, and call {@link 784 * SettableFuture#setFuture setFuture(otherFuture)} on it. 785 * 786 * <p>{@code dereference} has the same cancellation and execution semantics as {@link 787 * #transformAsync(ListenableFuture, AsyncFunction, Executor)}, in that the returned {@code 788 * Future} attempts to keep its cancellation state in sync with both the input {@code Future} and 789 * the nested {@code Future}. The transformation is very lightweight and therefore takes place in 790 * the same thread (either the thread that called {@code dereference}, or the thread in which the 791 * dereferenced future completes). 792 * 793 * @param nested The nested future to transform. 794 * @return A future that holds result of the inner future. 795 * @since 13.0 796 */ 797 @SuppressWarnings({"rawtypes", "unchecked"}) 798 public static <V> ListenableFuture<V> dereference( 799 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 800 return transformAsync( 801 (ListenableFuture) nested, (AsyncFunction) DEREFERENCER, directExecutor()); 802 } 803 804 /** 805 * Helper {@code Function} for {@link #dereference}. 806 */ 807 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 808 new AsyncFunction<ListenableFuture<Object>, Object>() { 809 @Override 810 public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 811 return input; 812 } 813 }; 814 815 /** 816 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 817 * input futures, if all succeed. If any input fails, the returned future fails immediately. 818 * 819 * <p>The list of results is in the same order as the input list. 820 * 821 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 822 * provided futures fails or is canceled, this one is, too. 823 * 824 * @param futures futures to combine 825 * @return a future that provides a list of the results of the component futures 826 * @since 10.0 827 */ 828 @Beta 829 @SafeVarargs 830 public static <V> ListenableFuture<List<V>> allAsList(ListenableFuture<? extends V>... futures) { 831 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 832 } 833 834 /** 835 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 836 * input futures, if all succeed. If any input fails, the returned future fails immediately. 837 * 838 * <p>The list of results is in the same order as the input list. 839 * 840 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 841 * provided futures fails or is canceled, this one is, too. 842 * 843 * @param futures futures to combine 844 * @return a future that provides a list of the results of the component futures 845 * @since 10.0 846 */ 847 @Beta 848 public static <V> ListenableFuture<List<V>> allAsList( 849 Iterable<? extends ListenableFuture<? extends V>> futures) { 850 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 851 } 852 853 /** 854 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 855 * successful. 856 * 857 * @since 20.0 858 */ 859 @SafeVarargs 860 public static <V> FutureCombiner<V> whenAllComplete(ListenableFuture<? extends V>... futures) { 861 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 862 } 863 864 /** 865 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 866 * successful. 867 * 868 * @since 20.0 869 */ 870 public static <V> FutureCombiner<V> whenAllComplete( 871 Iterable<? extends ListenableFuture<? extends V>> futures) { 872 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 873 } 874 875 /** 876 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 877 * 878 * <p>If any input fails, the returned future fails immediately. 879 * 880 * @since 20.0 881 */ 882 @SafeVarargs 883 public static <V> FutureCombiner<V> whenAllSucceed(ListenableFuture<? extends V>... futures) { 884 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 885 } 886 887 /** 888 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 889 * 890 * <p>If any input fails, the returned future fails immediately. 891 * 892 * @since 20.0 893 */ 894 public static <V> FutureCombiner<V> whenAllSucceed( 895 Iterable<? extends ListenableFuture<? extends V>> futures) { 896 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 897 } 898 899 /** 900 * A helper to create a new {@code ListenableFuture} whose result is generated from a combination 901 * of input futures. 902 * 903 * <p>See {@link #whenAllComplete} and {@link #whenAllSucceed} for how to instantiate this class. 904 * 905 * <p>Example: 906 * 907 * <pre> {@code 908 * final ListenableFuture<Instant> loginDateFuture = 909 * loginService.findLastLoginDate(username); 910 * final ListenableFuture<List<String>> recentCommandsFuture = 911 * recentCommandsService.findRecentCommands(username); 912 * Callable<UsageHistory> usageComputation = 913 * new Callable<UsageHistory>() { 914 * public UsageHistory call() throws Exception { 915 * return new UsageHistory( 916 * username, loginDateFuture.get(), recentCommandsFuture.get()); 917 * } 918 * }; 919 * ListenableFuture<UsageHistory> usageFuture = 920 * Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture) 921 * .call(usageComputation, executor);}</pre> 922 * 923 * @since 20.0 924 */ 925 @Beta 926 @CanIgnoreReturnValue // TODO(cpovirk): Consider removing, especially if we provide run(Runnable) 927 @GwtCompatible 928 public static final class FutureCombiner<V> { 929 private final boolean allMustSucceed; 930 private final ImmutableList<ListenableFuture<? extends V>> futures; 931 932 private FutureCombiner( 933 boolean allMustSucceed, ImmutableList<ListenableFuture<? extends V>> futures) { 934 this.allMustSucceed = allMustSucceed; 935 this.futures = futures; 936 } 937 938 /** 939 * Creates the {@link ListenableFuture} which will return the result of calling {@link 940 * AsyncCallable#call} in {@code combiner} when all futures complete, using the specified {@code 941 * executor}. 942 * 943 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 944 * cancelled. 945 * 946 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 947 * ExecutionException} will be extracted and returned as the cause of the new {@code 948 * ExecutionException} that gets thrown by the returned combined future. 949 * 950 * <p>Canceling this future will attempt to cancel all the component futures. 951 */ 952 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor) { 953 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 954 } 955 956 /** 957 * Like {@link #callAsync(AsyncCallable, Executor)} but using {@linkplain 958 * MoreExecutors#directExecutor direct executor}. 959 * 960 * @deprecated Use {@linkplain #callAsync(AsyncCallable, Executor) the overload that requires an 961 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 962 * consider whether another executor would be safer, as discussed in the {@link 963 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 964 * scheduled to be removed in April 2018. 965 */ 966 @Deprecated 967 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner) { 968 return callAsync(combiner, directExecutor()); 969 } 970 971 /** 972 * Creates the {@link ListenableFuture} which will return the result of calling {@link 973 * Callable#call} in {@code combiner} when all futures complete, using the specified {@code 974 * executor}. 975 * 976 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 977 * cancelled. 978 * 979 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 980 * ExecutionException} will be extracted and returned as the cause of the new {@code 981 * ExecutionException} that gets thrown by the returned combined future. 982 * 983 * <p>Canceling this future will attempt to cancel all the component futures. 984 */ 985 @CanIgnoreReturnValue 986 public <C> ListenableFuture<C> call(Callable<C> combiner, Executor executor) { 987 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 988 } 989 990 /** 991 * Like {@link #call(Callable, Executor)} but using {@linkplain MoreExecutors#directExecutor 992 * direct executor}. 993 * 994 * @deprecated Use {@linkplain #call(Callable, Executor) the overload that requires an 995 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 996 * consider whether another executor would be safer, as discussed in the {@link 997 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 998 * scheduled to be removed in April 2018. 999 */ 1000 @CanIgnoreReturnValue 1001 @Deprecated 1002 public <C> ListenableFuture<C> call(Callable<C> combiner) { 1003 return call(combiner, directExecutor()); 1004 } 1005 1006 /* 1007 * TODO(cpovirk): Evaluate demand for a run(Runnable) version. Would it allow us to remove 1008 * @CanIgnoreReturnValue from the call() methods above? 1009 * https://github.com/google/guava/issues/2371 1010 */ 1011 } 1012 1013 /** 1014 * Returns a {@code ListenableFuture} whose result is set from the supplied future when it 1015 * completes. Cancelling the supplied future will also cancel the returned future, but cancelling 1016 * the returned future will have no effect on the supplied future. 1017 * 1018 * @since 15.0 1019 */ 1020 public static <V> ListenableFuture<V> nonCancellationPropagating(ListenableFuture<V> future) { 1021 if (future.isDone()) { 1022 return future; 1023 } 1024 NonCancellationPropagatingFuture<V> output = new NonCancellationPropagatingFuture<V>(future); 1025 future.addListener(output, directExecutor()); 1026 return output; 1027 } 1028 1029 /** A wrapped future that does not propagate cancellation to its delegate. */ 1030 private static final class NonCancellationPropagatingFuture<V> 1031 extends AbstractFuture.TrustedFuture<V> implements Runnable { 1032 private ListenableFuture<V> delegate; 1033 1034 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1035 this.delegate = delegate; 1036 } 1037 1038 @Override 1039 public void run() { 1040 // This prevents cancellation from propagating because we don't assign delegate until 1041 // delegate is already done, so calling cancel() on it is a no-op. 1042 ListenableFuture<V> localDelegate = delegate; 1043 if (localDelegate != null) { 1044 setFuture(localDelegate); 1045 } 1046 } 1047 1048 @Override 1049 protected String pendingToString() { 1050 ListenableFuture<V> localDelegate = delegate; 1051 if (localDelegate != null) { 1052 return "delegate=[" + localDelegate + "]"; 1053 } 1054 return null; 1055 } 1056 1057 @Override 1058 protected void afterDone() { 1059 delegate = null; 1060 } 1061 } 1062 1063 /** 1064 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1065 * successful input futures. The list of results is in the same order as the input list, and if 1066 * any of the provided futures fails or is canceled, its corresponding position will contain 1067 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1068 * null}). 1069 * 1070 * <p>Canceling this future will attempt to cancel all the component futures. 1071 * 1072 * @param futures futures to combine 1073 * @return a future that provides a list of the results of the component futures 1074 * @since 10.0 1075 */ 1076 @Beta 1077 @SafeVarargs 1078 public static <V> ListenableFuture<List<V>> successfulAsList( 1079 ListenableFuture<? extends V>... futures) { 1080 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1081 } 1082 1083 /** 1084 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1085 * successful input futures. The list of results is in the same order as the input list, and if 1086 * any of the provided futures fails or is canceled, its corresponding position will contain 1087 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1088 * null}). 1089 * 1090 * <p>Canceling this future will attempt to cancel all the component futures. 1091 * 1092 * @param futures futures to combine 1093 * @return a future that provides a list of the results of the component futures 1094 * @since 10.0 1095 */ 1096 @Beta 1097 public static <V> ListenableFuture<List<V>> successfulAsList( 1098 Iterable<? extends ListenableFuture<? extends V>> futures) { 1099 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1100 } 1101 1102 /** 1103 * Returns a list of delegate futures that correspond to the futures received in the order that 1104 * they complete. Delegate futures return the same value or throw the same exception as the 1105 * corresponding input future returns/throws. 1106 * 1107 * <p>"In the order that they complete" means, for practical purposes, about what you would 1108 * expect, but there are some subtleties. First, we do guarantee that, if the output future at 1109 * index n is done, the output future at index n-1 is also done. (But as usual with futures, some 1110 * listeners for future n may complete before some for future n-1.) However, it is possible, if 1111 * one input completes with result X and another later with result Y, for Y to come before X in 1112 * the output future list. (Such races are impossible to solve without global synchronization of 1113 * all future completions. And they should have little practical impact.) 1114 * 1115 * <p>Cancelling a delegate future propagates to input futures once all the delegates complete, 1116 * either from cancellation or because an input future has completed. If N futures are passed in, 1117 * and M delegates are cancelled, the remaining M input futures will be cancelled once N - M of 1118 * the input futures complete. If all the delegates are cancelled, all the input futures will be 1119 * too. 1120 * 1121 * @since 17.0 1122 */ 1123 @Beta 1124 public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder( 1125 Iterable<? extends ListenableFuture<? extends T>> futures) { 1126 // Can't use Iterables.toArray because it's not gwt compatible 1127 final Collection<ListenableFuture<? extends T>> collection; 1128 if (futures instanceof Collection) { 1129 collection = (Collection<ListenableFuture<? extends T>>) futures; 1130 } else { 1131 collection = ImmutableList.copyOf(futures); 1132 } 1133 @SuppressWarnings("unchecked") 1134 ListenableFuture<? extends T>[] copy = 1135 (ListenableFuture<? extends T>[]) 1136 collection.toArray(new ListenableFuture[collection.size()]); 1137 final InCompletionOrderState<T> state = new InCompletionOrderState<>(copy); 1138 ImmutableList.Builder<AbstractFuture<T>> delegatesBuilder = ImmutableList.builder(); 1139 for (int i = 0; i < copy.length; i++) { 1140 delegatesBuilder.add(new InCompletionOrderFuture<T>(state)); 1141 } 1142 1143 final ImmutableList<AbstractFuture<T>> delegates = delegatesBuilder.build(); 1144 for (int i = 0; i < copy.length; i++) { 1145 final int localI = i; 1146 copy[i].addListener( 1147 new Runnable() { 1148 @Override 1149 public void run() { 1150 state.recordInputCompletion(delegates, localI); 1151 } 1152 }, 1153 directExecutor()); 1154 } 1155 1156 @SuppressWarnings("unchecked") 1157 ImmutableList<ListenableFuture<T>> delegatesCast = (ImmutableList) delegates; 1158 return delegatesCast; 1159 } 1160 1161 // This can't be a TrustedFuture, because TrustedFuture has clever optimizations that 1162 // mean cancel won't be called if this Future is passed into setFuture, and then 1163 // cancelled. 1164 private static final class InCompletionOrderFuture<T> extends AbstractFuture<T> { 1165 private InCompletionOrderState<T> state; 1166 1167 private InCompletionOrderFuture(InCompletionOrderState<T> state) { 1168 this.state = state; 1169 } 1170 1171 @Override 1172 public boolean cancel(boolean interruptIfRunning) { 1173 InCompletionOrderState<T> localState = state; 1174 if (super.cancel(interruptIfRunning)) { 1175 localState.recordOutputCancellation(interruptIfRunning); 1176 return true; 1177 } 1178 return false; 1179 } 1180 1181 @Override 1182 public void afterDone() { 1183 state = null; 1184 } 1185 1186 @Override 1187 protected String pendingToString() { 1188 InCompletionOrderState<T> localState = state; 1189 if (localState != null) { 1190 // Don't print the actual array! We don't want inCompletionOrder(list).toString() to have 1191 // quadratic output. 1192 return "inputCount=[" 1193 + localState.inputFutures.length 1194 + "], remaining=[" 1195 + localState.incompleteOutputCount.get() 1196 + "]"; 1197 } 1198 return null; 1199 } 1200 } 1201 1202 private static final class InCompletionOrderState<T> { 1203 // A happens-before edge between the writes of these fields and their reads exists, because 1204 // in order to read these fields, the corresponding write to incompleteOutputCount must have 1205 // been read. 1206 private boolean wasCancelled = false; 1207 private boolean shouldInterrupt = true; 1208 private final AtomicInteger incompleteOutputCount; 1209 private final ListenableFuture<? extends T>[] inputFutures; 1210 private volatile int delegateIndex = 0; 1211 1212 private InCompletionOrderState(ListenableFuture<? extends T>[] inputFutures) { 1213 this.inputFutures = inputFutures; 1214 incompleteOutputCount = new AtomicInteger(inputFutures.length); 1215 } 1216 1217 private void recordOutputCancellation(boolean interruptIfRunning) { 1218 wasCancelled = true; 1219 // If all the futures were cancelled with interruption, cancel the input futures 1220 // with interruption; otherwise cancel without 1221 if (!interruptIfRunning) { 1222 shouldInterrupt = false; 1223 } 1224 recordCompletion(); 1225 } 1226 1227 private void recordInputCompletion( 1228 ImmutableList<AbstractFuture<T>> delegates, int inputFutureIndex) { 1229 ListenableFuture<? extends T> inputFuture = inputFutures[inputFutureIndex]; 1230 // Null out our reference to this future, so it can be GCed 1231 inputFutures[inputFutureIndex] = null; 1232 for (int i = delegateIndex; i < delegates.size(); i++) { 1233 if (delegates.get(i).setFuture(inputFuture)) { 1234 recordCompletion(); 1235 // this is technically unnecessary, but should speed up later accesses 1236 delegateIndex = i + 1; 1237 return; 1238 } 1239 } 1240 // If all the delegates were complete, no reason for the next listener to have to 1241 // go through the whole list. Avoids O(n^2) behavior when the entire output list is 1242 // cancelled. 1243 delegateIndex = delegates.size(); 1244 } 1245 1246 private void recordCompletion() { 1247 if (incompleteOutputCount.decrementAndGet() == 0 && wasCancelled) { 1248 for (ListenableFuture<?> toCancel : inputFutures) { 1249 if (toCancel != null) { 1250 toCancel.cancel(shouldInterrupt); 1251 } 1252 } 1253 } 1254 } 1255 } 1256 1257 /** 1258 * Registers separate success and failure callbacks to be run when the {@code Future}'s 1259 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 1260 * computation is already complete, immediately. 1261 * 1262 * <p>There is no guaranteed ordering of execution of callbacks, but any callback added through 1263 * this method is guaranteed to be called once the computation is complete. 1264 * 1265 * <p>Example: 1266 * 1267 * <pre>{@code 1268 * ListenableFuture<QueryResult> future = ...; 1269 * addCallback(future, 1270 * new FutureCallback<QueryResult>() { 1271 * public void onSuccess(QueryResult result) { 1272 * storeInCache(result); 1273 * } 1274 * public void onFailure(Throwable t) { 1275 * reportError(t); 1276 * } 1277 * }); 1278 * }</pre> 1279 * 1280 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 1281 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 1282 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 1283 * also applicable to heavyweight callbacks passed to this method. 1284 * 1285 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1286 * ListenableFuture#addListener addListener}. 1287 * 1288 * @param future The future attach the callback to. 1289 * @param callback The callback to invoke when {@code future} is completed. 1290 * @since 10.0 1291 * @deprecated Use {@linkplain #addCallback(ListenableFuture, FutureCallback, Executor) the 1292 * overload that requires an executor}. For identical behavior, pass {@link 1293 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 1294 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1295 * documentation. This method is scheduled to be removed in April 2018. 1296 */ 1297 @Deprecated 1298 public static <V> void addCallback( 1299 ListenableFuture<V> future, FutureCallback<? super V> callback) { 1300 addCallback(future, callback, directExecutor()); 1301 } 1302 1303 /** 1304 * Registers separate success and failure callbacks to be run when the {@code Future}'s 1305 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 1306 * computation is already complete, immediately. 1307 * 1308 * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of 1309 * callbacks, but any callback added through this method is guaranteed to be called once the 1310 * computation is complete. 1311 * 1312 * <p>Example: 1313 * 1314 * <pre>{@code 1315 * ListenableFuture<QueryResult> future = ...; 1316 * Executor e = ... 1317 * addCallback(future, 1318 * new FutureCallback<QueryResult>() { 1319 * public void onSuccess(QueryResult result) { 1320 * storeInCache(result); 1321 * } 1322 * public void onFailure(Throwable t) { 1323 * reportError(t); 1324 * } 1325 * }, e); 1326 * }</pre> 1327 * 1328 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 1329 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1330 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 1331 * callbacks passed to this method. 1332 * 1333 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1334 * ListenableFuture#addListener addListener}. 1335 * 1336 * @param future The future attach the callback to. 1337 * @param callback The callback to invoke when {@code future} is completed. 1338 * @param executor The executor to run {@code callback} when the future completes. 1339 * @since 10.0 1340 */ 1341 public static <V> void addCallback( 1342 final ListenableFuture<V> future, 1343 final FutureCallback<? super V> callback, 1344 Executor executor) { 1345 Preconditions.checkNotNull(callback); 1346 future.addListener(new CallbackListener<V>(future, callback), executor); 1347 } 1348 1349 /** See {@link #addCallback(ListenableFuture, FutureCallback, Executor)} for behavioral notes. */ 1350 private static final class CallbackListener<V> implements Runnable { 1351 final Future<V> future; 1352 final FutureCallback<? super V> callback; 1353 1354 CallbackListener(Future<V> future, FutureCallback<? super V> callback) { 1355 this.future = future; 1356 this.callback = callback; 1357 } 1358 1359 @Override 1360 public void run() { 1361 final V value; 1362 try { 1363 value = getDone(future); 1364 } catch (ExecutionException e) { 1365 callback.onFailure(e.getCause()); 1366 return; 1367 } catch (RuntimeException e) { 1368 callback.onFailure(e); 1369 return; 1370 } catch (Error e) { 1371 callback.onFailure(e); 1372 return; 1373 } 1374 callback.onSuccess(value); 1375 } 1376 1377 @Override 1378 public String toString() { 1379 return MoreObjects.toStringHelper(this).addValue(callback).toString(); 1380 } 1381 } 1382 1383 /** 1384 * Returns the result of the input {@code Future}, which must have already completed. 1385 * 1386 * <p>The benefits of this method are twofold. First, the name "getDone" suggests to readers that 1387 * the {@code Future} is already done. Second, if buggy code calls {@code getDone} on a {@code 1388 * Future} that is still pending, the program will throw instead of block. This can be important 1389 * for APIs like {@link #whenAllComplete whenAllComplete(...)}{@code .}{@link 1390 * FutureCombiner#call(Callable) call(...)}, where it is easy to use a new input from the {@code 1391 * call} implementation but forget to add it to the arguments of {@code whenAllComplete}. 1392 * 1393 * <p>If you are looking for a method to determine whether a given {@code Future} is done, use the 1394 * instance method {@link Future#isDone()}. 1395 * 1396 * @throws ExecutionException if the {@code Future} failed with an exception 1397 * @throws CancellationException if the {@code Future} was cancelled 1398 * @throws IllegalStateException if the {@code Future} is not done 1399 * @since 20.0 1400 */ 1401 @CanIgnoreReturnValue 1402 // TODO(cpovirk): Consider calling getDone() in our own code. 1403 public static <V> V getDone(Future<V> future) throws ExecutionException { 1404 /* 1405 * We throw IllegalStateException, since the call could succeed later. Perhaps we "should" throw 1406 * IllegalArgumentException, since the call could succeed with a different argument. Those 1407 * exceptions' docs suggest that either is acceptable. Google's Java Practices page recommends 1408 * IllegalArgumentException here, in part to keep its recommendation simple: Static methods 1409 * should throw IllegalStateException only when they use static state. 1410 * 1411 * 1412 * Why do we deviate here? The answer: We want for fluentFuture.getDone() to throw the same 1413 * exception as Futures.getDone(fluentFuture). 1414 */ 1415 checkState(future.isDone(), "Future was expected to be done: %s", future); 1416 return getUninterruptibly(future); 1417 } 1418 1419 /** 1420 * Returns the result of {@link Future#get()}, converting most exceptions to a new instance of the 1421 * given checked exception type. This reduces boilerplate for a common use of {@code Future} in 1422 * which it is unnecessary to programmatically distinguish between exception types or to extract 1423 * other information from the exception instance. 1424 * 1425 * <p>Exceptions from {@code Future.get} are treated as follows: 1426 * <ul> 1427 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause is 1428 * a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1429 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1430 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1431 * interrupt). 1432 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1433 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1434 * exceptions). 1435 * </ul> 1436 * 1437 * <p>The overall principle is to continue to treat every checked exception as a checked 1438 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1439 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1440 * new stack trace matches that of the current thread. 1441 * 1442 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1443 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1444 * (preferring constructors with at least one {@code String}) and calling the constructor via 1445 * reflection. If the exception did not already have a cause, one is set by calling {@link 1446 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1447 * IllegalArgumentException} is thrown. 1448 * 1449 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1450 * whose cause is not itself a checked exception 1451 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1452 * {@code RuntimeException} as its cause 1453 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1454 * Error} as its cause 1455 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1456 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1457 * does not have a suitable constructor 1458 * @since 19.0 (in 10.0 as {@code get}) 1459 */ 1460 @CanIgnoreReturnValue 1461 @GwtIncompatible // reflection 1462 public static <V, X extends Exception> V getChecked(Future<V> future, Class<X> exceptionClass) 1463 throws X { 1464 return FuturesGetChecked.getChecked(future, exceptionClass); 1465 } 1466 1467 /** 1468 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most exceptions to a new 1469 * instance of the given checked exception type. This reduces boilerplate for a common use of 1470 * {@code Future} in which it is unnecessary to programmatically distinguish between exception 1471 * types or to extract other information from the exception instance. 1472 * 1473 * <p>Exceptions from {@code Future.get} are treated as follows: 1474 * <ul> 1475 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause is 1476 * a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1477 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1478 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1479 * interrupt). 1480 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 1481 * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link 1482 * RuntimeException} (though {@code get} implementations are discouraged from throwing such 1483 * exceptions). 1484 * </ul> 1485 * 1486 * <p>The overall principle is to continue to treat every checked exception as a checked 1487 * exception, every unchecked exception as an unchecked exception, and every error as an error. In 1488 * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the 1489 * new stack trace matches that of the current thread. 1490 * 1491 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor 1492 * that accepts zero or more arguments, all of type {@code String} or {@code Throwable} 1493 * (preferring constructors with at least one {@code String}) and calling the constructor via 1494 * reflection. If the exception did not already have a cause, one is set by calling {@link 1495 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code 1496 * IllegalArgumentException} is thrown. 1497 * 1498 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1499 * whose cause is not itself a checked exception 1500 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1501 * {@code RuntimeException} as its cause 1502 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1503 * Error} as its cause 1504 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1505 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1506 * does not have a suitable constructor 1507 * @since 19.0 (in 10.0 as {@code get} and with different parameter order) 1508 */ 1509 @CanIgnoreReturnValue 1510 @GwtIncompatible // reflection 1511 public static <V, X extends Exception> V getChecked( 1512 Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) throws X { 1513 return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit); 1514 } 1515 1516 /** 1517 * Returns the result of calling {@link Future#get()} uninterruptibly on a task known not to throw 1518 * a checked exception. This makes {@code Future} more suitable for lightweight, fast-running 1519 * tasks that, barring bugs in the code, will not fail. This gives it exception-handling behavior 1520 * similar to that of {@code ForkJoinTask.join}. 1521 * 1522 * <p>Exceptions from {@code Future.get} are treated as follows: 1523 * <ul> 1524 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@link 1525 * UncheckedExecutionException} (if the cause is an {@code Exception}) or {@link 1526 * ExecutionError} (if the cause is an {@code Error}). 1527 * <li>Any {@link InterruptedException} causes a retry of the {@code get} call. The interrupt is 1528 * restored before {@code getUnchecked} returns. 1529 * <li>Any {@link CancellationException} is propagated untouched. So is any other {@link 1530 * RuntimeException} ({@code get} implementations are discouraged from throwing such 1531 * exceptions). 1532 * </ul> 1533 * 1534 * <p>The overall principle is to eliminate all checked exceptions: to loop to avoid {@code 1535 * InterruptedException}, to pass through {@code CancellationException}, and to wrap any exception 1536 * from the underlying computation in an {@code UncheckedExecutionException} or {@code 1537 * ExecutionError}. 1538 * 1539 * <p>For an uninterruptible {@code get} that preserves other exceptions, see {@link 1540 * Uninterruptibles#getUninterruptibly(Future)}. 1541 * 1542 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with an 1543 * {@code Exception} as its cause 1544 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1545 * Error} as its cause 1546 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1547 * @since 10.0 1548 */ 1549 @CanIgnoreReturnValue 1550 @GwtIncompatible // TODO 1551 public static <V> V getUnchecked(Future<V> future) { 1552 checkNotNull(future); 1553 try { 1554 return getUninterruptibly(future); 1555 } catch (ExecutionException e) { 1556 wrapAndThrowUnchecked(e.getCause()); 1557 throw new AssertionError(); 1558 } 1559 } 1560 1561 @GwtIncompatible // TODO 1562 private static void wrapAndThrowUnchecked(Throwable cause) { 1563 if (cause instanceof Error) { 1564 throw new ExecutionError((Error) cause); 1565 } 1566 /* 1567 * It's an Exception. (Or it's a non-Error, non-Exception Throwable. From my survey of such 1568 * classes, I believe that most users intended to extend Exception, so we'll treat it like an 1569 * Exception.) 1570 */ 1571 throw new UncheckedExecutionException(cause); 1572 } 1573 1574 /* 1575 * Arguably we don't need a timed getUnchecked because any operation slow enough to require a 1576 * timeout is heavyweight enough to throw a checked exception and therefore be inappropriate to 1577 * use with getUnchecked. Further, it's not clear that converting the checked TimeoutException to 1578 * a RuntimeException -- especially to an UncheckedExecutionException, since it wasn't thrown by 1579 * the computation -- makes sense, and if we don't convert it, the user still has to write a 1580 * try-catch block. 1581 * 1582 * If you think you would use this method, let us know. You might also also look into the 1583 * Fork-Join framework: http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html 1584 */ 1585 1586 /** 1587 * A checked future that uses a function to map from exceptions to the appropriate checked type. 1588 */ 1589 @GwtIncompatible // TODO 1590 private static class MappingCheckedFuture<V, X extends Exception> 1591 extends AbstractCheckedFuture<V, X> { 1592 1593 final Function<? super Exception, X> mapper; 1594 1595 MappingCheckedFuture(ListenableFuture<V> delegate, Function<? super Exception, X> mapper) { 1596 super(delegate); 1597 1598 this.mapper = checkNotNull(mapper); 1599 } 1600 1601 @Override 1602 protected X mapException(Exception e) { 1603 return mapper.apply(e); 1604 } 1605 } 1606}