001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Preconditions.checkState; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 021 022import com.google.common.annotations.Beta; 023import com.google.common.annotations.GwtCompatible; 024import com.google.common.annotations.GwtIncompatible; 025import com.google.common.base.Function; 026import com.google.common.base.MoreObjects; 027import com.google.common.base.Preconditions; 028import com.google.common.collect.ImmutableList; 029import com.google.common.util.concurrent.CollectionFuture.ListFuture; 030import com.google.common.util.concurrent.ImmediateFuture.ImmediateCancelledFuture; 031import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedCheckedFuture; 032import com.google.common.util.concurrent.ImmediateFuture.ImmediateFailedFuture; 033import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulCheckedFuture; 034import com.google.common.util.concurrent.ImmediateFuture.ImmediateSuccessfulFuture; 035import com.google.errorprone.annotations.CanIgnoreReturnValue; 036import java.util.Collection; 037import java.util.List; 038import 
java.util.concurrent.Callable; 039import java.util.concurrent.CancellationException; 040import java.util.concurrent.ExecutionException; 041import java.util.concurrent.Executor; 042import java.util.concurrent.Future; 043import java.util.concurrent.ScheduledExecutorService; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.TimeoutException; 046import java.util.concurrent.atomic.AtomicInteger; 047import javax.annotation.Nullable; 048 049/** 050 * Static utility methods pertaining to the {@link Future} interface. 051 * 052 * <p>Many of these methods use the {@link ListenableFuture} API; consult the Guava User Guide 053 * article on <a href="https://github.com/google/guava/wiki/ListenableFutureExplained">{@code 054 * ListenableFuture}</a>. 055 * 056 * <p>The main purpose of {@code ListenableFuture} is to help you chain together a graph of 057 * asynchronous operations. You can chain them together manually with calls to methods like {@link 058 * Futures#transform(ListenableFuture, Function, Executor) Futures.transform}, but you will often 059 * find it easier to use a framework. Frameworks automate the process, often adding features like 060 * monitoring, debugging, and cancellation. Examples of frameworks include: 061 * 062 * <ul> 063 * <li><a href="http://google.github.io/dagger/producers.html">Dagger Producers</a> 064 * </ul> 065 * 066 * <p>If you do chain your operations manually, you may want to use {@link FluentFuture}. 067 * 068 * @author Kevin Bourrillion 069 * @author Nishant Thakkar 070 * @author Sven Mawson 071 * @since 1.0 072 */ 073@Beta 074@GwtCompatible(emulated = true) 075public final class Futures extends GwtFuturesCatchingSpecialization { 076 077 // A note on memory visibility. 078 // Many of the utilities in this class (transform, withFallback, withTimeout, asList, combine) 079 // have two requirements that significantly complicate their design. 080 // 1. 
Cancellation should propagate from the returned future to the input future(s). 081 // 2. The returned futures shouldn't unnecessarily 'pin' their inputs after completion. 082 // 083 // A consequence of these requirements is that the delegate futures cannot be stored in 084 // final fields. 085 // 086 // For simplicity the rest of this description will discuss Futures.catching since it is the 087 // simplest instance, though very similar descriptions apply to many other classes in this file. 088 // 089 // In the constructor of AbstractCatchingFuture, the delegate future is assigned to a field 090 // 'inputFuture'. That field is non-final and non-volatile. There are 2 places where the 091 // 'inputFuture' field is read and where we will have to consider visibility of the write 092 // operation in the constructor. 093 // 094 // 1. In the listener that performs the callback. In this case it is fine since inputFuture is 095 // assigned prior to calling addListener, and addListener happens-before any invocation of the 096 // listener. Notably, this means that 'volatile' is unnecessary to make 'inputFuture' visible 097 // to the listener. 098 // 099 // 2. In done() where we may propagate cancellation to the input. In this case it is _not_ fine. 100 // There is currently nothing that enforces that the write to inputFuture in the constructor is 101 // visible to done(). This is because there is no happens before edge between the write and a 102 // (hypothetical) unsafe read by our caller. Note: adding 'volatile' does not fix this issue, 103 // it would just add an edge such that if done() observed non-null, then it would also 104 // definitely observe all earlier writes, but we still have no guarantee that done() would see 105 // the initial write (just stronger guarantees if it does). 106 // 107 // See: http://cs.oswego.edu/pipermail/concurrency-interest/2015-January/013800.html 108 // For a (long) discussion about this specific issue and the general futility of life. 
109 // 110 // For the time being we are OK with the problem discussed above since it requires a caller to 111 // introduce a very specific kind of data-race. And given the other operations performed by these 112 // methods that involve volatile read/write operations, in practice there is no issue. Also, the 113 // way in such a visibility issue would surface is most likely as a failure of cancel() to 114 // propagate to the input. Cancellation propagation is fundamentally racy so this is fine. 115 // 116 // Future versions of the JMM may revise safe construction semantics in such a way that we can 117 // safely publish these objects and we won't need this whole discussion. 118 // TODO(user,lukes): consider adding volatile to all these fields since in current known JVMs 119 // that should resolve the issue. This comes at the cost of adding more write barriers to the 120 // implementations. 121 122 private Futures() {} 123 124 /** 125 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} and a {@link Function} 126 * that maps from {@link Exception} instances into the appropriate checked type. 127 * 128 * <p><b>Warning:</b> We recommend against using {@code CheckedFuture} in new projects. {@code 129 * CheckedFuture} is difficult to build libraries atop. {@code CheckedFuture} ports of methods 130 * like {@link Futures#transformAsync} have historically had bugs, and some of these bugs are 131 * necessary, unavoidable consequences of the {@code CheckedFuture} API. Additionally, {@code 132 * CheckedFuture} encourages users to take exceptions from one thread and rethrow them in another, 133 * producing confusing stack traces. 134 * 135 * <p>The given mapping function will be applied to an {@link InterruptedException}, a {@link 136 * CancellationException}, or an {@link ExecutionException}. See {@link Future#get()} for details 137 * on the exceptions thrown. 
138 * 139 * @since 9.0 (source-compatible since 1.0) 140 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 141 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 142 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 143 * Additionally, it has a surprising policy about which exceptions to map and which to leave 144 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 145 * use, possibly specializing them to the particular exception type they use. We recommend 146 * that most people use {@code ListenableFuture} and perform any exception wrapping 147 * themselves. This method is scheduled for removal from Guava in February 2018. 148 */ 149 @Deprecated 150 @GwtIncompatible // TODO 151 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 152 ListenableFuture<V> future, Function<? super Exception, X> mapper) { 153 return new MappingCheckedFuture<>(checkNotNull(future), mapper); 154 } 155 156 /** 157 * Creates a {@code ListenableFuture} which has its value set immediately upon construction. The 158 * getters just return the value. This {@code Future} can't be canceled or timed out and its 159 * {@code isDone()} method always returns {@code true}. 160 */ 161 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 162 if (value == null) { 163 // This cast is safe because null is assignable to V for all V (i.e. it is covariant) 164 @SuppressWarnings({"unchecked", "rawtypes"}) 165 ListenableFuture<V> typedNull = (ListenableFuture) ImmediateSuccessfulFuture.NULL; 166 return typedNull; 167 } 168 return new ImmediateSuccessfulFuture<V>(value); 169 } 170 171 /** 172 * Returns a {@code CheckedFuture} which has its value set immediately upon construction. 
173 * 174 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 175 * returns {@code true}. Calling {@code get()} or {@code checkedGet()} will immediately return the 176 * provided value. 177 * 178 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 179 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 180 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 181 * Additionally, it has a surprising policy about which exceptions to map and which to leave 182 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 183 * use, possibly specializing them to the particular exception type they use. We recommend 184 * that most people use {@code ListenableFuture} and perform any exception wrapping 185 * themselves. This method is scheduled for removal from Guava in February 2018. 186 */ 187 @Deprecated 188 @GwtIncompatible // TODO 189 public static <V, X extends Exception> CheckedFuture<V, X> immediateCheckedFuture( 190 @Nullable V value) { 191 return new ImmediateSuccessfulCheckedFuture<>(value); 192 } 193 194 /** 195 * Returns a {@code ListenableFuture} which has an exception set immediately upon construction. 196 * 197 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 198 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 199 * Throwable} wrapped in an {@code ExecutionException}. 200 */ 201 public static <V> ListenableFuture<V> immediateFailedFuture(Throwable throwable) { 202 checkNotNull(throwable); 203 return new ImmediateFailedFuture<V>(throwable); 204 } 205 206 /** 207 * Creates a {@code ListenableFuture} which is cancelled immediately upon construction, so that 208 * {@code isCancelled()} always returns {@code true}. 
209 * 210 * @since 14.0 211 */ 212 public static <V> ListenableFuture<V> immediateCancelledFuture() { 213 return new ImmediateCancelledFuture<V>(); 214 } 215 216 /** 217 * Returns a {@code CheckedFuture} which has an exception set immediately upon construction. 218 * 219 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} method always 220 * returns {@code true}. Calling {@code get()} will immediately throw the provided {@code 221 * Exception} wrapped in an {@code ExecutionException}, and calling {@code checkedGet()} will 222 * throw the provided exception itself. 223 * 224 * @deprecated {@link CheckedFuture} cannot properly support the chained operations that are the 225 * primary goal of {@link ListenableFuture}. {@code CheckedFuture} also encourages users to 226 * rethrow exceptions from one thread in another thread, producing misleading stack traces. 227 * Additionally, it has a surprising policy about which exceptions to map and which to leave 228 * untouched. Guava users who want a {@code CheckedFuture} can fork the classes for their own 229 * use, possibly specializing them to the particular exception type they use. We recommend 230 * that most people use {@code ListenableFuture} and perform any exception wrapping 231 * themselves. This method is scheduled for removal from Guava in February 2018. 232 */ 233 @Deprecated 234 @GwtIncompatible // TODO 235 public static <V, X extends Exception> CheckedFuture<V, X> immediateFailedCheckedFuture( 236 X exception) { 237 checkNotNull(exception); 238 return new ImmediateFailedCheckedFuture<>(exception); 239 } 240 241 /** 242 * Executes {@code callable} on the specified {@code executor}, returning a {@code Future}. 
243 * 244 * @throws RejectedExecutionException if the task cannot be scheduled for execution 245 * @since 23.0 246 */ 247 public static <O> ListenableFuture<O> submitAsync(AsyncCallable<O> callable, Executor executor) { 248 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 249 executor.execute(task); 250 return task; 251 } 252 253 /** 254 * Schedules {@code callable} on the specified {@code executor}, returning a {@code Future}. 255 * 256 * @throws RejectedExecutionException if the task cannot be scheduled for execution 257 * @since 23.0 258 */ 259 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 260 public static <O> ListenableFuture<O> scheduleAsync( 261 AsyncCallable<O> callable, 262 long delay, 263 TimeUnit timeUnit, 264 ScheduledExecutorService executorService) { 265 TrustedListenableFutureTask<O> task = TrustedListenableFutureTask.create(callable); 266 final Future<?> scheduled = executorService.schedule(task, delay, timeUnit); 267 task.addListener( 268 new Runnable() { 269 @Override 270 public void run() { 271 // Don't want to interrupt twice 272 scheduled.cancel(false); 273 } 274 }, 275 directExecutor()); 276 return task; 277 } 278 279 /** 280 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 281 * primary input fails with the given {@code exceptionType}, from the result provided by the 282 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 283 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 284 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 285 * Future}. 286 * 287 * <p>Usage example: 288 * 289 * <pre>{@code 290 * ListenableFuture<Integer> fetchCounterFuture = ...; 291 * 292 * // Falling back to a zero counter in case an exception happens when 293 * // processing the RPC to fetch counters. 
294 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 295 * fetchCounterFuture, FetchException.class, x -> 0); 296 * }</pre> 297 * 298 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 299 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 300 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 301 * also applicable to heavyweight functions passed to this method. 302 * 303 * @param input the primary input {@code Future} 304 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 305 * type is matched against the input's exception. "The input's exception" means the cause of 306 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 307 * different kind of exception, that exception itself. To avoid hiding bugs and other 308 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 309 * Throwable.class} in particular. 310 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 311 * exception type. The function's argument is the input's exception. "The input's exception" 312 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 313 * {@code get()} throws a different kind of exception, that exception itself. 314 * @since 19.0 315 * @deprecated Use {@linkplain #catching(ListenableFuture, Class, Function, Executor) the overload 316 * that requires an executor}. For identical behavior, pass {@link 317 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 318 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 319 * documentation. This method is scheduled to be removed in April 2018. 
320 */ 321 @Deprecated 322 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 323 public static <V, X extends Throwable> ListenableFuture<V> catching( 324 ListenableFuture<? extends V> input, 325 Class<X> exceptionType, 326 Function<? super X, ? extends V> fallback) { 327 return AbstractCatchingFuture.create(input, exceptionType, fallback, directExecutor()); 328 } 329 330 /** 331 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 332 * primary input fails with the given {@code exceptionType}, from the result provided by the 333 * {@code fallback}. {@link Function#apply} is not invoked until the primary input has failed, so 334 * if the primary input succeeds, it is never invoked. If, during the invocation of {@code 335 * fallback}, an exception is thrown, this exception is used as the result of the output {@code 336 * Future}. 337 * 338 * <p>Usage example: 339 * 340 * <pre>{@code 341 * ListenableFuture<Integer> fetchCounterFuture = ...; 342 * 343 * // Falling back to a zero counter in case an exception happens when 344 * // processing the RPC to fetch counters. 345 * ListenableFuture<Integer> faultTolerantFuture = Futures.catching( 346 * fetchCounterFuture, FetchException.class, x -> 0, directExecutor()); 347 * }</pre> 348 * 349 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 350 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 351 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 352 * functions passed to this method. 353 * 354 * @param input the primary input {@code Future} 355 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 356 * type is matched against the input's exception. 
"The input's exception" means the cause of 357 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 358 * different kind of exception, that exception itself. To avoid hiding bugs and other 359 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 360 * Throwable.class} in particular. 361 * @param fallback the {@link Function} to be called if {@code input} fails with the expected 362 * exception type. The function's argument is the input's exception. "The input's exception" 363 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 364 * {@code get()} throws a different kind of exception, that exception itself. 365 * @param executor the executor that runs {@code fallback} if {@code input} fails 366 * @since 19.0 367 */ 368 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 369 public static <V, X extends Throwable> ListenableFuture<V> catching( 370 ListenableFuture<? extends V> input, 371 Class<X> exceptionType, 372 Function<? super X, ? extends V> fallback, 373 Executor executor) { 374 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 375 } 376 377 /** 378 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 379 * primary input fails with the given {@code exceptionType}, from the result provided by the 380 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 381 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 382 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 383 * {@code Future}. 384 * 385 * <p>Usage examples: 386 * 387 * <pre>{@code 388 * ListenableFuture<Integer> fetchCounterFuture = ...; 389 * 390 * // Falling back to a zero counter in case an exception happens when 391 * // processing the RPC to fetch counters. 
392 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 393 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0)); 394 * }</pre> 395 * 396 * <p>The fallback can also choose to propagate the original exception when desired: 397 * 398 * <pre>{@code 399 * ListenableFuture<Integer> fetchCounterFuture = ...; 400 * 401 * // Falling back to a zero counter only in case the exception was a 402 * // TimeoutException. 403 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 404 * fetchCounterFuture, 405 * FetchException.class, 406 * e -> { 407 * if (omitDataOnFetchFailure) { 408 * return immediateFuture(0); 409 * } 410 * throw e; 411 * }); 412 * }</pre> 413 * 414 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 415 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 416 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 417 * also applicable to heavyweight functions passed to this method. (Specifically, {@code 418 * directExecutor} functions should avoid heavyweight operations inside {@code 419 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 420 * completing the returned {@code Future}.) 421 * 422 * @param input the primary input {@code Future} 423 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 424 * type is matched against the input's exception. "The input's exception" means the cause of 425 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 426 * different kind of exception, that exception itself. To avoid hiding bugs and other 427 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 428 * Throwable.class} in particular. 
429 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 430 * exception type. The function's argument is the input's exception. "The input's exception" 431 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 432 * {@code get()} throws a different kind of exception, that exception itself. 433 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 434 * @deprecated Use {@linkplain #catchingAsync(ListenableFuture, Class, AsyncFunction, Executor) 435 * the overload that requires an executor}. For identical behavior, pass {@link 436 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 437 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 438 * documentation. This method is scheduled to be removed in April 2018. 439 */ 440 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 441 @Deprecated 442 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 443 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 444 ListenableFuture<? extends V> input, 445 Class<X> exceptionType, 446 AsyncFunction<? super X, ? extends V> fallback) { 447 return AbstractCatchingFuture.create(input, exceptionType, fallback, directExecutor()); 448 } 449 450 /** 451 * Returns a {@code Future} whose result is taken from the given primary {@code input} or, if the 452 * primary input fails with the given {@code exceptionType}, from the result provided by the 453 * {@code fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has 454 * failed, so if the primary input succeeds, it is never invoked. If, during the invocation of 455 * {@code fallback}, an exception is thrown, this exception is used as the result of the output 456 * {@code Future}. 
457 * 458 * <p>Usage examples: 459 * 460 * <pre>{@code 461 * ListenableFuture<Integer> fetchCounterFuture = ...; 462 * 463 * // Falling back to a zero counter in case an exception happens when 464 * // processing the RPC to fetch counters. 465 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 466 * fetchCounterFuture, FetchException.class, x -> immediateFuture(0), directExecutor()); 467 * }</pre> 468 * 469 * <p>The fallback can also choose to propagate the original exception when desired: 470 * 471 * <pre>{@code 472 * ListenableFuture<Integer> fetchCounterFuture = ...; 473 * 474 * // Falling back to a zero counter only in case the exception was a 475 * // TimeoutException. 476 * ListenableFuture<Integer> faultTolerantFuture = Futures.catchingAsync( 477 * fetchCounterFuture, 478 * FetchException.class, 479 * e -> { 480 * if (omitDataOnFetchFailure) { 481 * return immediateFuture(0); 482 * } 483 * throw e; 484 * }, 485 * directExecutor()); 486 * }</pre> 487 * 488 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 489 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 490 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 491 * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid 492 * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should 493 * occur in other threads responsible for completing the returned {@code Future}.) 494 * 495 * @param input the primary input {@code Future} 496 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 497 * type is matched against the input's exception. "The input's exception" means the cause of 498 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 499 * different kind of exception, that exception itself. 
To avoid hiding bugs and other 500 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 501 * Throwable.class} in particular. 502 * @param fallback the {@link AsyncFunction} to be called if {@code input} fails with the expected 503 * exception type. The function's argument is the input's exception. "The input's exception" 504 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 505 * {@code get()} throws a different kind of exception, that exception itself. 506 * @param executor the executor that runs {@code fallback} if {@code input} fails 507 * @since 19.0 (similar functionality in 14.0 as {@code withFallback}) 508 */ 509 @CanIgnoreReturnValue // TODO(kak): @CheckReturnValue 510 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 511 public static <V, X extends Throwable> ListenableFuture<V> catchingAsync( 512 ListenableFuture<? extends V> input, 513 Class<X> exceptionType, 514 AsyncFunction<? super X, ? extends V> fallback, 515 Executor executor) { 516 return AbstractCatchingFuture.create(input, exceptionType, fallback, executor); 517 } 518 519 /** 520 * Returns a future that delegates to another but will finish early (via a {@link 521 * TimeoutException} wrapped in an {@link ExecutionException}) if the specified duration expires. 522 * 523 * <p>The delegate future is interrupted and cancelled if it times out. 524 * 525 * @param delegate The future to delegate to. 526 * @param time when to timeout the future 527 * @param unit the time unit of the time parameter 528 * @param scheduledExecutor The executor service to enforce the timeout. 
529 * @since 19.0 530 */ 531 @GwtIncompatible // java.util.concurrent.ScheduledExecutorService 532 public static <V> ListenableFuture<V> withTimeout( 533 ListenableFuture<V> delegate, 534 long time, 535 TimeUnit unit, 536 ScheduledExecutorService scheduledExecutor) { 537 return TimeoutFuture.create(delegate, time, unit, scheduledExecutor); 538 } 539 540 /** 541 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 542 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 543 * the same exception (and the function is not invoked). 544 * 545 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 546 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 547 * Example usage: 548 * 549 * <pre>{@code 550 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 551 * ListenableFuture<QueryResult> queryFuture = 552 * transformAsync(rowKeyFuture, dataService::readFuture); 553 * }</pre> 554 * 555 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 556 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 557 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 558 * also applicable to heavyweight functions passed to this method. (Specifically, {@code 559 * directExecutor} functions should avoid heavyweight operations inside {@code 560 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 561 * completing the returned {@code Future}.) 562 * 563 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 564 * input future and that of the future returned by the function. 
That is, if the returned {@code 565 * Future} is cancelled, it will attempt to cancel the other two, and if either of the other two 566 * is cancelled, the returned {@code Future} will receive a callback in which it will attempt to 567 * cancel itself. 568 * 569 * @param input The future to transform 570 * @param function A function to transform the result of the input future to the result of the 571 * output future 572 * @return A future that holds result of the function (if the input succeeded) or the original 573 * input's failure (if not) 574 * @since 19.0 (in 11.0 as {@code transform}) 575 * @deprecated Use {@linkplain #transformAsync(ListenableFuture, AsyncFunction, Executor) the 576 * overload that requires an executor}. For identical behavior, pass {@link 577 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 578 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 579 * documentation. This method is scheduled to be removed in April 2018. 580 */ 581 @Deprecated 582 public static <I, O> ListenableFuture<O> transformAsync( 583 ListenableFuture<I> input, AsyncFunction<? super I, ? extends O> function) { 584 return AbstractTransformFuture.create(input, function, directExecutor()); 585 } 586 587 /** 588 * Returns a new {@code Future} whose result is asynchronously derived from the result of the 589 * given {@code Future}. If the given {@code Future} fails, the returned {@code Future} fails with 590 * the same exception (and the function is not invoked). 591 * 592 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 593 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 
594 * Example usage: 595 * 596 * <pre>{@code 597 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 598 * ListenableFuture<QueryResult> queryFuture = 599 * transformAsync(rowKeyFuture, dataService::readFuture, executor); 600 * }</pre> 601 * 602 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 603 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 604 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 605 * functions passed to this method. (Specifically, {@code directExecutor} functions should avoid 606 * heavyweight operations inside {@code AsyncFunction.apply}. Any heavyweight operations should 607 * occur in other threads responsible for completing the returned {@code Future}.) 608 * 609 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 610 * input future and that of the future returned by the chain function. That is, if the returned 611 * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the 612 * other two is cancelled, the returned {@code Future} will receive a callback in which it will 613 * attempt to cancel itself. 614 * 615 * @param input The future to transform 616 * @param function A function to transform the result of the input future to the result of the 617 * output future 618 * @param executor Executor to run the function in. 619 * @return A future that holds result of the function (if the input succeeded) or the original 620 * input's failure (if not) 621 * @since 19.0 (in 11.0 as {@code transform}) 622 */ 623 public static <I, O> ListenableFuture<O> transformAsync( 624 ListenableFuture<I> input, 625 AsyncFunction<? super I, ? 
extends O> function, 626 Executor executor) { 627 return AbstractTransformFuture.create(input, function, executor); 628 } 629 630 /** 631 * Returns a new {@code Future} whose result is derived from the result of the given {@code 632 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 633 * the function is not invoked). Example usage: 634 * 635 * <pre>{@code 636 * ListenableFuture<QueryResult> queryFuture = ...; 637 * ListenableFuture<List<Row>> rowsFuture = 638 * transform(queryFuture, QueryResult::getRows); 639 * }</pre> 640 * 641 * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous 642 * choice in some cases. See the discussion in the {@link ListenableFuture#addListener 643 * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are 644 * also applicable to heavyweight functions passed to this method. 645 * 646 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 647 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 648 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 649 * in which it will attempt to cancel itself. 650 * 651 * <p>An example use of this method is to convert a serializable object returned from an RPC into 652 * a POJO. 653 * 654 * @param input The future to transform 655 * @param function A Function to transform the results of the provided future to the results of 656 * the returned future. This will be run in the thread that notifies input it is complete. 657 * @return A future that holds result of the transformation. 658 * @since 9.0 (in 1.0 as {@code compose}) 659 * @deprecated Use {@linkplain #transform(ListenableFuture, Function, Executor) the overload that 660 * requires an executor}. 
For identical behavior, pass {@link MoreExecutors#directExecutor}, 661 * but consider whether another executor would be safer, as discussed in the {@link 662 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 663 * scheduled to be removed in April 2018. 664 */ 665 @Deprecated 666 public static <I, O> ListenableFuture<O> transform( 667 ListenableFuture<I> input, Function<? super I, ? extends O> function) { 668 return AbstractTransformFuture.create(input, function, directExecutor()); 669 } 670 671 /** 672 * Returns a new {@code Future} whose result is derived from the result of the given {@code 673 * Future}. If {@code input} fails, the returned {@code Future} fails with the same exception (and 674 * the function is not invoked). Example usage: 675 * 676 * <pre>{@code 677 * ListenableFuture<QueryResult> queryFuture = ...; 678 * ListenableFuture<List<Row>> rowsFuture = 679 * transform(queryFuture, QueryResult::getRows, executor); 680 * }</pre> 681 * 682 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 683 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 684 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 685 * functions passed to this method. 686 * 687 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 688 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 689 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 690 * in which it will attempt to cancel itself. 691 * 692 * <p>An example use of this method is to convert a serializable object returned from an RPC into 693 * a POJO. 694 * 695 * @param input The future to transform 696 * @param function A Function to transform the results of the provided future to the results of 697 * the returned future. 
698 * @param executor Executor to run the function in. 699 * @return A future that holds result of the transformation. 700 * @since 9.0 (in 2.0 as {@code compose}) 701 */ 702 public static <I, O> ListenableFuture<O> transform( 703 ListenableFuture<I> input, Function<? super I, ? extends O> function, Executor executor) { 704 return AbstractTransformFuture.create(input, function, executor); 705 } 706 707 /** 708 * Like {@link #transform(ListenableFuture, Function, Executor)} except that the transformation 709 * {@code function} is invoked on each call to {@link Future#get() get()} on the returned future. 710 * 711 * <p>The returned {@code Future} reflects the input's cancellation state directly, and any 712 * attempt to cancel the returned Future is likewise passed through to the input Future. 713 * 714 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} only apply the timeout 715 * to the execution of the underlying {@code Future}, <em>not</em> to the execution of the 716 * transformation function. 717 * 718 * <p>The primary audience of this method is callers of {@code transform} who don't have a {@code 719 * ListenableFuture} available and do not mind repeated, lazy function evaluation. 720 * 721 * @param input The future to transform 722 * @param function A Function to transform the results of the provided future to the results of 723 * the returned future. 724 * @return A future that returns the result of the transformation. 725 * @since 10.0 726 */ 727 @GwtIncompatible // TODO 728 public static <I, O> Future<O> lazyTransform( 729 final Future<I> input, final Function<? super I, ? 
extends O> function) { 730 checkNotNull(input); 731 checkNotNull(function); 732 return new Future<O>() { 733 734 @Override 735 public boolean cancel(boolean mayInterruptIfRunning) { 736 return input.cancel(mayInterruptIfRunning); 737 } 738 739 @Override 740 public boolean isCancelled() { 741 return input.isCancelled(); 742 } 743 744 @Override 745 public boolean isDone() { 746 return input.isDone(); 747 } 748 749 @Override 750 public O get() throws InterruptedException, ExecutionException { 751 return applyTransformation(input.get()); 752 } 753 754 @Override 755 public O get(long timeout, TimeUnit unit) 756 throws InterruptedException, ExecutionException, TimeoutException { 757 return applyTransformation(input.get(timeout, unit)); 758 } 759 760 private O applyTransformation(I input) throws ExecutionException { 761 try { 762 return function.apply(input); 763 } catch (Throwable t) { 764 throw new ExecutionException(t); 765 } 766 } 767 }; 768 } 769 770 /** 771 * Returns a new {@code ListenableFuture} whose result is the product of calling {@code get()} on 772 * the {@code Future} nested within the given {@code Future}, effectively chaining the futures one 773 * after the other. Example: 774 * 775 * <pre>{@code 776 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 777 * ListenableFuture<String> dereferenced = dereference(nested); 778 * }</pre> 779 * 780 * <p>Most users will not need this method. To create a {@code Future} that completes with the 781 * result of another {@code Future}, create a {@link SettableFuture}, and call {@link 782 * SettableFuture#setFuture setFuture(otherFuture)} on it. 783 * 784 * <p>{@code dereference} has the same cancellation and execution semantics as {@link 785 * #transformAsync(ListenableFuture, AsyncFunction, Executor)}, in that the returned {@code 786 * Future} attempts to keep its cancellation state in sync with both the input {@code Future} and 787 * the nested {@code Future}. 
The transformation is very lightweight and therefore takes place in 788 * the same thread (either the thread that called {@code dereference}, or the thread in which the 789 * dereferenced future completes). 790 * 791 * @deprecated Use {@link #submitAsync(AsyncCallable, Executor)} or {@link 792 * SettableFuture#setFuture} instead. Or, if you're dereferencing the output of {@link 793 * #transform} or {@link #catching}, switch to {@link #transformAsync} or {@link 794 * #catchingAsync} (and likewise for similar APIs). If the cancellation of this method's 795 * output future races with completion of the outer input future, cancellation may not be 796 * propagated to the inner input future. This method is scheduled to be removed in January 797 * 2018. 798 * @param nested The nested future to transform. 799 * @return A future that holds result of the inner future. 800 * @since 13.0 801 */ 802 @Deprecated 803 @SuppressWarnings({"rawtypes", "unchecked"}) 804 public static <V> ListenableFuture<V> dereference( 805 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 806 return transformAsync( 807 (ListenableFuture) nested, (AsyncFunction) DEREFERENCER, directExecutor()); 808 } 809 810 /** 811 * Helper {@code Function} for {@link #dereference}. 812 */ 813 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 814 new AsyncFunction<ListenableFuture<Object>, Object>() { 815 @Override 816 public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 817 return input; 818 } 819 }; 820 821 /** 822 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 823 * input futures, if all succeed. 824 * 825 * <p>The list of results is in the same order as the input list. 826 * 827 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 828 * provided futures fails or is canceled, this one is, too. 
829 * 830 * @param futures futures to combine 831 * @return a future that provides a list of the results of the component futures 832 * @since 10.0 833 */ 834 @Beta 835 @SafeVarargs 836 public static <V> ListenableFuture<List<V>> allAsList(ListenableFuture<? extends V>... futures) { 837 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 838 } 839 840 /** 841 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 842 * input futures, if all succeed. 843 * 844 * <p>The list of results is in the same order as the input list. 845 * 846 * <p>Canceling this future will attempt to cancel all the component futures, and if any of the 847 * provided futures fails or is canceled, this one is, too. 848 * 849 * @param futures futures to combine 850 * @return a future that provides a list of the results of the component futures 851 * @since 10.0 852 */ 853 @Beta 854 public static <V> ListenableFuture<List<V>> allAsList( 855 Iterable<? extends ListenableFuture<? extends V>> futures) { 856 return new ListFuture<V>(ImmutableList.copyOf(futures), true); 857 } 858 859 /** 860 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 861 * successful. 862 * 863 * @since 20.0 864 */ 865 @SafeVarargs 866 public static <V> FutureCombiner<V> whenAllComplete(ListenableFuture<? extends V>... futures) { 867 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 868 } 869 870 /** 871 * Creates a {@link FutureCombiner} that processes the completed futures whether or not they're 872 * successful. 873 * 874 * @since 20.0 875 */ 876 public static <V> FutureCombiner<V> whenAllComplete( 877 Iterable<? extends ListenableFuture<? extends V>> futures) { 878 return new FutureCombiner<V>(false, ImmutableList.copyOf(futures)); 879 } 880 881 /** 882 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 
883 * 884 * <p>If any input fails, the returned future fails immediately. 885 * 886 * @since 20.0 887 */ 888 @SafeVarargs 889 public static <V> FutureCombiner<V> whenAllSucceed(ListenableFuture<? extends V>... futures) { 890 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 891 } 892 893 /** 894 * Creates a {@link FutureCombiner} requiring that all passed in futures are successful. 895 * 896 * <p>If any input fails, the returned future fails immediately. 897 * 898 * @since 20.0 899 */ 900 public static <V> FutureCombiner<V> whenAllSucceed( 901 Iterable<? extends ListenableFuture<? extends V>> futures) { 902 return new FutureCombiner<V>(true, ImmutableList.copyOf(futures)); 903 } 904 905 /** 906 * A helper to create a new {@code ListenableFuture} whose result is generated from a combination 907 * of input futures. 908 * 909 * <p>See {@link #whenAllComplete} and {@link #whenAllSucceed} for how to instantiate this class. 910 * 911 * <p>Example: 912 * 913 * <pre> {@code 914 * final ListenableFuture<Instant> loginDateFuture = 915 * loginService.findLastLoginDate(username); 916 * final ListenableFuture<List<String>> recentCommandsFuture = 917 * recentCommandsService.findRecentCommands(username); 918 * Callable<UsageHistory> usageComputation = 919 * new Callable<UsageHistory>() { 920 * public UsageHistory call() throws Exception { 921 * return new UsageHistory( 922 * username, loginDateFuture.get(), recentCommandsFuture.get()); 923 * } 924 * }; 925 * ListenableFuture<UsageHistory> usageFuture = 926 * Futures.whenAllSucceed(loginDateFuture, recentCommandsFuture) 927 * .call(usageComputation, executor);}</pre> 928 * 929 * @since 20.0 930 */ 931 @Beta 932 @CanIgnoreReturnValue // TODO(cpovirk): Consider removing, especially if we provide run(Runnable) 933 @GwtCompatible 934 public static final class FutureCombiner<V> { 935 private final boolean allMustSucceed; 936 private final ImmutableList<ListenableFuture<? 
extends V>> futures; 937 938 private FutureCombiner( 939 boolean allMustSucceed, ImmutableList<ListenableFuture<? extends V>> futures) { 940 this.allMustSucceed = allMustSucceed; 941 this.futures = futures; 942 } 943 944 /** 945 * Creates the {@link ListenableFuture} which will return the result of calling {@link 946 * AsyncCallable#call} in {@code combiner} when all futures complete, using the specified {@code 947 * executor}. 948 * 949 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 950 * cancelled. 951 * 952 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 953 * ExecutionException} will be extracted and returned as the cause of the new {@code 954 * ExecutionException} that gets thrown by the returned combined future. 955 * 956 * <p>Canceling this future will attempt to cancel all the component futures. 957 */ 958 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner, Executor executor) { 959 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 960 } 961 962 /** 963 * Like {@link #callAsync(AsyncCallable, Executor)} but using {@linkplain 964 * MoreExecutors#directExecutor direct executor}. 965 * 966 * @deprecated Use {@linkplain #callAsync(AsyncCallable, Executor) the overload that requires an 967 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 968 * consider whether another executor would be safer, as discussed in the {@link 969 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 970 * scheduled to be removed in April 2018. 
971 */ 972 @Deprecated 973 public <C> ListenableFuture<C> callAsync(AsyncCallable<C> combiner) { 974 return callAsync(combiner, directExecutor()); 975 } 976 977 /** 978 * Creates the {@link ListenableFuture} which will return the result of calling {@link 979 * Callable#call} in {@code combiner} when all futures complete, using the specified {@code 980 * executor}. 981 * 982 * <p>If the combiner throws a {@code CancellationException}, the returned future will be 983 * cancelled. 984 * 985 * <p>If the combiner throws an {@code ExecutionException}, the cause of the thrown {@code 986 * ExecutionException} will be extracted and returned as the cause of the new {@code 987 * ExecutionException} that gets thrown by the returned combined future. 988 * 989 * <p>Canceling this future will attempt to cancel all the component futures. 990 */ 991 @CanIgnoreReturnValue 992 public <C> ListenableFuture<C> call(Callable<C> combiner, Executor executor) { 993 return new CombinedFuture<C>(futures, allMustSucceed, executor, combiner); 994 } 995 996 /** 997 * Like {@link #call(Callable, Executor)} but using {@linkplain MoreExecutors#directExecutor 998 * direct executor}. 999 * 1000 * @deprecated Use {@linkplain #call(Callable, Executor) the overload that requires an 1001 * executor}. For identical behavior, pass {@link MoreExecutors#directExecutor}, but 1002 * consider whether another executor would be safer, as discussed in the {@link 1003 * ListenableFuture#addListener ListenableFuture.addListener} documentation. This method is 1004 * scheduled to be removed in April 2018. 1005 */ 1006 @CanIgnoreReturnValue 1007 @Deprecated 1008 public <C> ListenableFuture<C> call(Callable<C> combiner) { 1009 return call(combiner, directExecutor()); 1010 } 1011 1012 /* 1013 * TODO(cpovirk): Evaluate demand for a run(Runnable) version. Would it allow us to remove 1014 * @CanIgnoreReturnValue from the call() methods above? 
1015 * https://github.com/google/guava/issues/2371 1016 */ 1017 } 1018 1019 /** 1020 * Returns a {@code ListenableFuture} whose result is set from the supplied future when it 1021 * completes. Cancelling the supplied future will also cancel the returned future, but cancelling 1022 * the returned future will have no effect on the supplied future. 1023 * 1024 * @since 15.0 1025 */ 1026 public static <V> ListenableFuture<V> nonCancellationPropagating(ListenableFuture<V> future) { 1027 if (future.isDone()) { 1028 return future; 1029 } 1030 NonCancellationPropagatingFuture<V> output = new NonCancellationPropagatingFuture<>(future); 1031 future.addListener(output, directExecutor()); 1032 return output; 1033 } 1034 1035 /** A wrapped future that does not propagate cancellation to its delegate. */ 1036 private static final class NonCancellationPropagatingFuture<V> 1037 extends AbstractFuture.TrustedFuture<V> implements Runnable { 1038 private ListenableFuture<V> delegate; 1039 1040 NonCancellationPropagatingFuture(final ListenableFuture<V> delegate) { 1041 this.delegate = delegate; 1042 } 1043 1044 @Override 1045 public void run() { 1046 // This prevents cancellation from propagating because we don't assign delegate until 1047 // delegate is already done, so calling cancel() on it is a no-op. 1048 ListenableFuture<V> localDelegate = delegate; 1049 if (localDelegate != null) { 1050 setFuture(localDelegate); 1051 } 1052 } 1053 1054 @Override 1055 protected String pendingToString() { 1056 ListenableFuture<V> localDelegate = delegate; 1057 if (localDelegate != null) { 1058 return "delegate=[" + localDelegate + "]"; 1059 } 1060 return null; 1061 } 1062 1063 @Override 1064 protected void afterDone() { 1065 delegate = null; 1066 } 1067 } 1068 1069 /** 1070 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1071 * successful input futures. 
The list of results is in the same order as the input list, and if 1072 * any of the provided futures fails or is canceled, its corresponding position will contain 1073 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1074 * null}). 1075 * 1076 * <p>Canceling this future will attempt to cancel all the component futures. 1077 * 1078 * @param futures futures to combine 1079 * @return a future that provides a list of the results of the component futures 1080 * @since 10.0 1081 */ 1082 @Beta 1083 @SafeVarargs 1084 public static <V> ListenableFuture<List<V>> successfulAsList( 1085 ListenableFuture<? extends V>... futures) { 1086 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1087 } 1088 1089 /** 1090 * Creates a new {@code ListenableFuture} whose value is a list containing the values of all its 1091 * successful input futures. The list of results is in the same order as the input list, and if 1092 * any of the provided futures fails or is canceled, its corresponding position will contain 1093 * {@code null} (which is indistinguishable from the future having a successful value of {@code 1094 * null}). 1095 * 1096 * <p>Canceling this future will attempt to cancel all the component futures. 1097 * 1098 * @param futures futures to combine 1099 * @return a future that provides a list of the results of the component futures 1100 * @since 10.0 1101 */ 1102 @Beta 1103 public static <V> ListenableFuture<List<V>> successfulAsList( 1104 Iterable<? extends ListenableFuture<? extends V>> futures) { 1105 return new ListFuture<V>(ImmutableList.copyOf(futures), false); 1106 } 1107 1108 /** 1109 * Returns a list of delegate futures that correspond to the futures received in the order that 1110 * they complete. Delegate futures return the same value or throw the same exception as the 1111 * corresponding input future returns/throws. 
*
   * <p>"In the order that they complete" means, for practical purposes, about what you would
   * expect, but there are some subtleties. First, we do guarantee that, if the output future at
   * index n is done, the output future at index n-1 is also done. (But as usual with futures, some
   * listeners for future n may complete before some for future n-1.) However, it is possible, if
   * one input completes with result X and another later with result Y, for Y to come before X in
   * the output future list. (Such races are impossible to solve without global synchronization of
   * all future completions. And they should have little practical impact.)
   *
   * <p>Cancelling a delegate future propagates to input futures once all the delegates complete,
   * either from cancellation or because an input future has completed. If N futures are passed in,
   * and M delegates are cancelled, the remaining M input futures will be cancelled once N - M of
   * the input futures complete. If all the delegates are cancelled, all the input futures will be
   * too.
   *
   * @param futures the input futures; they are copied into an array before any listener is added
   * @return one delegate future per input future, in the order described above
   * @since 17.0
   */
  @Beta
  public static <T> ImmutableList<ListenableFuture<T>> inCompletionOrder(
      Iterable<? extends ListenableFuture<? extends T>> futures) {
    // Can't use Iterables.toArray because it's not gwt compatible
    final Collection<ListenableFuture<? extends T>> collection;
    if (futures instanceof Collection) {
      collection = (Collection<ListenableFuture<? extends T>>) futures;
    } else {
      collection = ImmutableList.copyOf(futures);
    }
    @SuppressWarnings("unchecked")
    ListenableFuture<? extends T>[] copy =
        (ListenableFuture<? extends T>[])
            collection.toArray(new ListenableFuture[collection.size()]);
    // One shared state object tracks the inputs and the number of unfinished delegates.
    final InCompletionOrderState<T> state = new InCompletionOrderState<>(copy);
    // Create exactly as many delegates as there are inputs; all of them share the state.
    ImmutableList.Builder<AbstractFuture<T>> delegatesBuilder = ImmutableList.builder();
    for (int i = 0; i < copy.length; i++) {
      delegatesBuilder.add(new InCompletionOrderFuture<T>(state));
    }

    final ImmutableList<AbstractFuture<T>> delegates = delegatesBuilder.build();
    // Each input reports its own completion (by index) to the shared state, which then picks
    // the delegate that receives the result.
    for (int i = 0; i < copy.length; i++) {
      final int localI = i;
      copy[i].addListener(
          new Runnable() {
            @Override
            public void run() {
              state.recordInputCompletion(delegates, localI);
            }
          },
          directExecutor());
    }

    // The delegates are exposed to callers only through the ListenableFuture interface.
    @SuppressWarnings("unchecked")
    ImmutableList<ListenableFuture<T>> delegatesCast = (ImmutableList) delegates;
    return delegatesCast;
  }

  /**
   * A delegate future returned by {@link #inCompletionOrder}. It reports its own cancellation to
   * the shared {@link InCompletionOrderState}.
   */
  // This can't be a TrustedFuture, because TrustedFuture has clever optimizations that
  // mean cancel won't be called if this Future is passed into setFuture, and then
  // cancelled.
  private static final class InCompletionOrderFuture<T> extends AbstractFuture<T> {
    // Shared state; cleared in afterDone().
    private InCompletionOrderState<T> state;

    private InCompletionOrderFuture(InCompletionOrderState<T> state) {
      this.state = state;
    }

    @Override
    public boolean cancel(boolean interruptIfRunning) {
      // Read the field before calling super.cancel(): afterDone() sets it to null.
      // NOTE(review): presumably afterDone() runs as part of a successful super.cancel() -
      // confirm against AbstractFuture.
      InCompletionOrderState<T> localState = state;
      if (super.cancel(interruptIfRunning)) {
        localState.recordOutputCancellation(interruptIfRunning);
        return true;
      }
      return false;
    }

    @Override
    protected void afterDone() {
      state = null;
    }

    @Override
    protected String pendingToString() {
      InCompletionOrderState<T> localState = state;
      if (localState != null) {
        // Don't print the actual array! We don't want inCompletionOrder(list).toString() to have
        // quadratic output.
        return "inputCount=["
            + localState.inputFutures.length
            + "], remaining=["
            + localState.incompleteOutputCount.get()
            + "]";
      }
      return null;
    }
  }

  /**
   * State shared by all delegates created by a single {@link #inCompletionOrder} call: the input
   * futures, the number of delegates that are not yet complete, and the cancellation flags.
   */
  private static final class InCompletionOrderState<T> {
    // A happens-before edge between the writes of these fields and their reads exists, because
    // in order to read these fields, the corresponding write to incompleteOutputCount must have
    // been read.
    // Set to true by recordOutputCancellation, i.e. once any delegate has been cancelled.
    private boolean wasCancelled = false;
    // Becomes false as soon as one delegate is cancelled without interruption.
    private boolean shouldInterrupt = true;
    // Starts at the number of inputs; decremented once per call to recordCompletion.
    private final AtomicInteger incompleteOutputCount;
    // Entries are replaced by null as the corresponding inputs complete.
    private final ListenableFuture<? extends T>[] inputFutures;
    // Index at which recordInputCompletion starts looking for a delegate to complete.
    private volatile int delegateIndex = 0;

    private InCompletionOrderState(ListenableFuture<? extends T>[] inputFutures) {
      this.inputFutures = inputFutures;
      incompleteOutputCount = new AtomicInteger(inputFutures.length);
    }

    /** Records that a delegate was cancelled and counts that delegate as complete. */
    private void recordOutputCancellation(boolean interruptIfRunning) {
      wasCancelled = true;
      // If all the futures were cancelled with interruption, cancel the input futures
      // with interruption; otherwise cancel without
      if (!interruptIfRunning) {
        shouldInterrupt = false;
      }
      recordCompletion();
    }

    /**
     * Hands the completed input at {@code inputFutureIndex} to the first delegate, searching from
     * {@code delegateIndex}, whose {@code setFuture} call succeeds.
     */
    private void recordInputCompletion(
        ImmutableList<AbstractFuture<T>> delegates, int inputFutureIndex) {
      ListenableFuture<? extends T> inputFuture = inputFutures[inputFutureIndex];
      // Null out our reference to this future, so it can be GCed
      inputFutures[inputFutureIndex] = null;
      for (int i = delegateIndex; i < delegates.size(); i++) {
        if (delegates.get(i).setFuture(inputFuture)) {
          recordCompletion();
          // this is technically unnecessary, but should speed up later accesses
          delegateIndex = i + 1;
          return;
        }
      }
      // If all the delegates were complete, no reason for the next listener to have to
      // go through the whole list. Avoids O(n^2) behavior when the entire output list is
      // cancelled.
      delegateIndex = delegates.size();
    }

    /**
     * Counts one delegate as complete. When the count reaches zero and at least one delegate was
     * cancelled, cancels every input future that is still referenced.
     */
    private void recordCompletion() {
      if (incompleteOutputCount.decrementAndGet() == 0 && wasCancelled) {
        for (ListenableFuture<?> toCancel : inputFutures) {
          if (toCancel != null) {
            toCancel.cancel(shouldInterrupt);
          }
        }
      }
    }
  }

  /**
   * Registers separate success and failure callbacks to be run when the {@code Future}'s
   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
   * computation is already complete, immediately.
   *
   * <p>There is no guaranteed ordering of execution of callbacks, but any callback added through
   * this method is guaranteed to be called once the computation is complete.
   *
   * <p>Example:
   *
   * <pre>{@code
   * ListenableFuture<QueryResult> future = ...;
   * addCallback(future,
   *     new FutureCallback<QueryResult>() {
   *       public void onSuccess(QueryResult result) {
   *         storeInCache(result);
   *       }
   *       public void onFailure(Throwable t) {
   *         reportError(t);
   *       }
   *     });
   * }</pre>
   *
   * <p>This overload, which does not accept an executor, uses {@code directExecutor}, a dangerous
   * choice in some cases. See the discussion in the {@link ListenableFuture#addListener
   * ListenableFuture.addListener} documentation. All its warnings about heavyweight listeners are
   * also applicable to heavyweight callbacks passed to this method.
   *
   * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link
   * ListenableFuture#addListener addListener}.
   *
   * @param future The future to attach the callback to.
   * @param callback The callback to invoke when {@code future} is completed.
1296 * @since 10.0 1297 * @deprecated Use {@linkplain #addCallback(ListenableFuture, FutureCallback, Executor) the 1298 * overload that requires an executor}. For identical behavior, pass {@link 1299 * MoreExecutors#directExecutor}, but consider whether another executor would be safer, as 1300 * discussed in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1301 * documentation. This method is scheduled to be removed in April 2018. 1302 */ 1303 @Deprecated 1304 public static <V> void addCallback( 1305 ListenableFuture<V> future, FutureCallback<? super V> callback) { 1306 addCallback(future, callback, directExecutor()); 1307 } 1308 1309 /** 1310 * Registers separate success and failure callbacks to be run when the {@code Future}'s 1311 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 1312 * computation is already complete, immediately. 1313 * 1314 * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of 1315 * callbacks, but any callback added through this method is guaranteed to be called once the 1316 * computation is complete. 1317 * 1318 * <p>Example: 1319 * 1320 * <pre>{@code 1321 * ListenableFuture<QueryResult> future = ...; 1322 * Executor e = ... 1323 * addCallback(future, 1324 * new FutureCallback<QueryResult>() { 1325 * public void onSuccess(QueryResult result) { 1326 * storeInCache(result); 1327 * } 1328 * public void onFailure(Throwable t) { 1329 * reportError(t); 1330 * } 1331 * }, e); 1332 * }</pre> 1333 * 1334 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 1335 * the discussion in the {@link ListenableFuture#addListener ListenableFuture.addListener} 1336 * documentation. All its warnings about heavyweight listeners are also applicable to heavyweight 1337 * callbacks passed to this method. 
1338 * 1339 * <p>For a more general interface to attach a completion listener to a {@code Future}, see {@link 1340 * ListenableFuture#addListener addListener}. 1341 * 1342 * @param future The future attach the callback to. 1343 * @param callback The callback to invoke when {@code future} is completed. 1344 * @param executor The executor to run {@code callback} when the future completes. 1345 * @since 10.0 1346 */ 1347 public static <V> void addCallback( 1348 final ListenableFuture<V> future, 1349 final FutureCallback<? super V> callback, 1350 Executor executor) { 1351 Preconditions.checkNotNull(callback); 1352 future.addListener(new CallbackListener<V>(future, callback), executor); 1353 } 1354 1355 /** See {@link #addCallback(ListenableFuture, FutureCallback, Executor)} for behavioral notes. */ 1356 private static final class CallbackListener<V> implements Runnable { 1357 final Future<V> future; 1358 final FutureCallback<? super V> callback; 1359 1360 CallbackListener(Future<V> future, FutureCallback<? super V> callback) { 1361 this.future = future; 1362 this.callback = callback; 1363 } 1364 1365 @Override 1366 public void run() { 1367 final V value; 1368 try { 1369 value = getDone(future); 1370 } catch (ExecutionException e) { 1371 callback.onFailure(e.getCause()); 1372 return; 1373 } catch (RuntimeException | Error e) { 1374 callback.onFailure(e); 1375 return; 1376 } 1377 callback.onSuccess(value); 1378 } 1379 1380 @Override 1381 public String toString() { 1382 return MoreObjects.toStringHelper(this).addValue(callback).toString(); 1383 } 1384 } 1385 1386 /** 1387 * Returns the result of the input {@code Future}, which must have already completed. 1388 * 1389 * <p>The benefits of this method are twofold. First, the name "getDone" suggests to readers that 1390 * the {@code Future} is already done. Second, if buggy code calls {@code getDone} on a {@code 1391 * Future} that is still pending, the program will throw instead of block. 
This can be important 1392 * for APIs like {@link #whenAllComplete whenAllComplete(...)}{@code .}{@link 1393 * FutureCombiner#call(Callable, Executor) call(...)}, where it is easy to use a new input from 1394 * the {@code call} implementation but forget to add it to the arguments of {@code 1395 * whenAllComplete}. 1396 * 1397 * <p>If you are looking for a method to determine whether a given {@code Future} is done, use the 1398 * instance method {@link Future#isDone()}. 1399 * 1400 * @throws ExecutionException if the {@code Future} failed with an exception 1401 * @throws CancellationException if the {@code Future} was cancelled 1402 * @throws IllegalStateException if the {@code Future} is not done 1403 * @since 20.0 1404 */ 1405 @CanIgnoreReturnValue 1406 // TODO(cpovirk): Consider calling getDone() in our own code. 1407 public static <V> V getDone(Future<V> future) throws ExecutionException { 1408 /* 1409 * We throw IllegalStateException, since the call could succeed later. Perhaps we "should" throw 1410 * IllegalArgumentException, since the call could succeed with a different argument. Those 1411 * exceptions' docs suggest that either is acceptable. Google's Java Practices page recommends 1412 * IllegalArgumentException here, in part to keep its recommendation simple: Static methods 1413 * should throw IllegalStateException only when they use static state. 1414 * 1415 * 1416 * Why do we deviate here? The answer: We want for fluentFuture.getDone() to throw the same 1417 * exception as Futures.getDone(fluentFuture). 1418 */ 1419 checkState(future.isDone(), "Future was expected to be done: %s", future); 1420 return getUninterruptibly(future); 1421 } 1422 1423 /** 1424 * Returns the result of {@link Future#get()}, converting most exceptions to a new instance of the 1425 * given checked exception type. 
   * This reduces boilerplate for a common use of {@code Future} in
   * which it is unnecessary to programmatically distinguish between exception types or to extract
   * other information from the exception instance.
   *
   * <p>Exceptions from {@code Future.get} are treated as follows:
   * <ul>
   * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause is
   *     a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code
   *     RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}.
   * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the
   *     interrupt).
   * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link
   *     RuntimeException} (though {@code get} implementations are discouraged from throwing such
   *     exceptions).
   * </ul>
   *
   * <p>The overall principle is to continue to treat every checked exception as a checked
   * exception, every unchecked exception as an unchecked exception, and every error as an error. In
   * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the
   * new stack trace matches that of the current thread.
   *
   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor
   * that accepts zero or more arguments, all of type {@code String} or {@code Throwable}
   * (preferring constructors with at least one {@code String}) and calling the constructor via
   * reflection. If the exception did not already have a cause, one is set by calling {@link
   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code
   * IllegalArgumentException} is thrown.
1452 * 1453 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1454 * whose cause is not itself a checked exception 1455 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1456 * {@code RuntimeException} as its cause 1457 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1458 * Error} as its cause 1459 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1460 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1461 * does not have a suitable constructor 1462 * @since 19.0 (in 10.0 as {@code get}) 1463 */ 1464 @CanIgnoreReturnValue 1465 @GwtIncompatible // reflection 1466 public static <V, X extends Exception> V getChecked(Future<V> future, Class<X> exceptionClass) 1467 throws X { 1468 return FuturesGetChecked.getChecked(future, exceptionClass); 1469 } 1470 1471 /** 1472 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most exceptions to a new 1473 * instance of the given checked exception type. This reduces boilerplate for a common use of 1474 * {@code Future} in which it is unnecessary to programmatically distinguish between exception 1475 * types or to extract other information from the exception instance. 1476 * 1477 * <p>Exceptions from {@code Future.get} are treated as follows: 1478 * <ul> 1479 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@code X} if the cause is 1480 * a checked exception, an {@link UncheckedExecutionException} if the cause is a {@code 1481 * RuntimeException}, or an {@link ExecutionError} if the cause is an {@code Error}. 1482 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after restoring the 1483 * interrupt). 1484 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 
   * <li>Any {@link CancellationException} is propagated untouched, as is any other {@link
   *     RuntimeException} (though {@code get} implementations are discouraged from throwing such
   *     exceptions).
   * </ul>
   *
   * <p>The overall principle is to continue to treat every checked exception as a checked
   * exception, every unchecked exception as an unchecked exception, and every error as an error. In
   * addition, the cause of any {@code ExecutionException} is wrapped in order to ensure that the
   * new stack trace matches that of the current thread.
   *
   * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary public constructor
   * that accepts zero or more arguments, all of type {@code String} or {@code Throwable}
   * (preferring constructors with at least one {@code String}) and calling the constructor via
   * reflection. If the exception did not already have a cause, one is set by calling {@link
   * Throwable#initCause(Throwable)} on it. If no such constructor exists, an {@code
   * IllegalArgumentException} is thrown.
1501 * 1502 * @throws X if {@code get} throws any checked exception except for an {@code ExecutionException} 1503 * whose cause is not itself a checked exception 1504 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with a 1505 * {@code RuntimeException} as its cause 1506 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1507 * Error} as its cause 1508 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1509 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code RuntimeException} or 1510 * does not have a suitable constructor 1511 * @since 19.0 (in 10.0 as {@code get} and with different parameter order) 1512 */ 1513 @CanIgnoreReturnValue 1514 @GwtIncompatible // reflection 1515 public static <V, X extends Exception> V getChecked( 1516 Future<V> future, Class<X> exceptionClass, long timeout, TimeUnit unit) throws X { 1517 return FuturesGetChecked.getChecked(future, exceptionClass, timeout, unit); 1518 } 1519 1520 /** 1521 * Returns the result of calling {@link Future#get()} uninterruptibly on a task known not to throw 1522 * a checked exception. This makes {@code Future} more suitable for lightweight, fast-running 1523 * tasks that, barring bugs in the code, will not fail. This gives it exception-handling behavior 1524 * similar to that of {@code ForkJoinTask.join}. 1525 * 1526 * <p>Exceptions from {@code Future.get} are treated as follows: 1527 * <ul> 1528 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an {@link 1529 * UncheckedExecutionException} (if the cause is an {@code Exception}) or {@link 1530 * ExecutionError} (if the cause is an {@code Error}). 1531 * <li>Any {@link InterruptedException} causes a retry of the {@code get} call. The interrupt is 1532 * restored before {@code getUnchecked} returns. 1533 * <li>Any {@link CancellationException} is propagated untouched. 
So is any other {@link 1534 * RuntimeException} ({@code get} implementations are discouraged from throwing such 1535 * exceptions). 1536 * </ul> 1537 * 1538 * <p>The overall principle is to eliminate all checked exceptions: to loop to avoid {@code 1539 * InterruptedException}, to pass through {@code CancellationException}, and to wrap any exception 1540 * from the underlying computation in an {@code UncheckedExecutionException} or {@code 1541 * ExecutionError}. 1542 * 1543 * <p>For an uninterruptible {@code get} that preserves other exceptions, see {@link 1544 * Uninterruptibles#getUninterruptibly(Future)}. 1545 * 1546 * @throws UncheckedExecutionException if {@code get} throws an {@code ExecutionException} with an 1547 * {@code Exception} as its cause 1548 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} with an {@code 1549 * Error} as its cause 1550 * @throws CancellationException if {@code get} throws a {@code CancellationException} 1551 * @since 10.0 1552 */ 1553 @CanIgnoreReturnValue 1554 @GwtIncompatible // TODO 1555 public static <V> V getUnchecked(Future<V> future) { 1556 checkNotNull(future); 1557 try { 1558 return getUninterruptibly(future); 1559 } catch (ExecutionException e) { 1560 wrapAndThrowUnchecked(e.getCause()); 1561 throw new AssertionError(); 1562 } 1563 } 1564 1565 @GwtIncompatible // TODO 1566 private static void wrapAndThrowUnchecked(Throwable cause) { 1567 if (cause instanceof Error) { 1568 throw new ExecutionError((Error) cause); 1569 } 1570 /* 1571 * It's an Exception. (Or it's a non-Error, non-Exception Throwable. From my survey of such 1572 * classes, I believe that most users intended to extend Exception, so we'll treat it like an 1573 * Exception.) 
1574 */ 1575 throw new UncheckedExecutionException(cause); 1576 } 1577 1578 /* 1579 * Arguably we don't need a timed getUnchecked because any operation slow enough to require a 1580 * timeout is heavyweight enough to throw a checked exception and therefore be inappropriate to 1581 * use with getUnchecked. Further, it's not clear that converting the checked TimeoutException to 1582 * a RuntimeException -- especially to an UncheckedExecutionException, since it wasn't thrown by 1583 * the computation -- makes sense, and if we don't convert it, the user still has to write a 1584 * try-catch block. 1585 * 1586 * If you think you would use this method, let us know. You might also also look into the 1587 * Fork-Join framework: http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html 1588 */ 1589 1590 /** 1591 * A checked future that uses a function to map from exceptions to the appropriate checked type. 1592 */ 1593 @GwtIncompatible // TODO 1594 private static class MappingCheckedFuture<V, X extends Exception> 1595 extends AbstractCheckedFuture<V, X> { 1596 1597 final Function<? super Exception, X> mapper; 1598 1599 MappingCheckedFuture(ListenableFuture<V> delegate, Function<? super Exception, X> mapper) { 1600 super(delegate); 1601 1602 this.mapper = checkNotNull(mapper); 1603 } 1604 1605 @Override 1606 protected X mapException(Exception e) { 1607 return mapper.apply(e); 1608 } 1609 } 1610}