001 /* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkArgument; 020 import static com.google.common.base.Preconditions.checkNotNull; 021 import static com.google.common.base.Preconditions.checkState; 022 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; 025 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; 026 import static java.lang.Thread.currentThread; 027 import static java.util.Arrays.asList; 028 029 import com.google.common.annotations.Beta; 030 import com.google.common.base.Function; 031 import com.google.common.base.Preconditions; 032 import com.google.common.collect.ImmutableList; 033 import com.google.common.collect.Lists; 034 import com.google.common.collect.Ordering; 035 036 import java.lang.reflect.Constructor; 037 import java.lang.reflect.InvocationTargetException; 038 import java.lang.reflect.UndeclaredThrowableException; 039 import java.util.Arrays; 040 import java.util.List; 041 import java.util.concurrent.BlockingQueue; 042 import java.util.concurrent.CancellationException; 043 import java.util.concurrent.CountDownLatch; 044 import java.util.concurrent.ExecutionException; 045 import java.util.concurrent.Executor; 046 import java.util.concurrent.Future; 047 import java.util.concurrent.LinkedBlockingQueue; 048 import java.util.concurrent.TimeUnit; 049 import java.util.concurrent.TimeoutException; 050 import java.util.concurrent.atomic.AtomicInteger; 051 052 import javax.annotation.Nullable; 053 054 /** 055 * Static utility methods pertaining to the {@link Future} interface. 056 * 057 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 058 * Guava User Guide article on <a href= 059 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 060 * {@code ListenableFuture}</a>. 061 * 062 * @author Kevin Bourrillion 063 * @author Nishant Thakkar 064 * @author Sven Mawson 065 * @since 1.0 066 */ 067 @Beta 068 public final class Futures { 069 private Futures() {} 070 071 /** 072 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 073 * and a {@link Function} that maps from {@link Exception} instances into the 074 * appropriate checked type. 075 * 076 * <p>The given mapping function will be applied to an 077 * {@link InterruptedException}, a {@link CancellationException}, or an 078 * {@link ExecutionException} with the actual cause of the exception. 079 * See {@link Future#get()} for details on the exceptions thrown. 080 * 081 * @since 9.0 (source-compatible since 1.0) 082 */ 083 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 084 ListenableFuture<V> future, Function<Exception, X> mapper) { 085 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 086 } 087 088 /** 089 * Creates a {@code ListenableFuture} which has its value set immediately upon 090 * construction. The getters just return the value. This {@code Future} can't 091 * be canceled or timed out and its {@code isDone()} method always returns 092 * {@code true}. 093 */ 094 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 095 SettableFuture<V> future = SettableFuture.create(); 096 future.set(value); 097 return future; 098 } 099 100 /** 101 * Returns a {@code CheckedFuture} which has its value set immediately upon 102 * construction. 103 * 104 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 105 * method always returns {@code true}. Calling {@code get()} or {@code 106 * checkedGet()} will immediately return the provided value. 107 */ 108 public static <V, X extends Exception> CheckedFuture<V, X> 109 immediateCheckedFuture(@Nullable V value) { 110 SettableFuture<V> future = SettableFuture.create(); 111 future.set(value); 112 return Futures.makeChecked(future, new Function<Exception, X>() { 113 @Override 114 public X apply(Exception e) { 115 throw new AssertionError("impossible"); 116 } 117 }); 118 } 119 120 /** 121 * Returns a {@code ListenableFuture} which has an exception set immediately 122 * upon construction. 123 * 124 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 125 * method always returns {@code true}. Calling {@code get()} will immediately 126 * throw the provided {@code Throwable} wrapped in an {@code 127 * ExecutionException}. 128 * 129 * @throws Error if the throwable is an {@link Error}. 130 */ 131 public static <V> ListenableFuture<V> immediateFailedFuture( 132 Throwable throwable) { 133 checkNotNull(throwable); 134 SettableFuture<V> future = SettableFuture.create(); 135 future.setException(throwable); 136 return future; 137 } 138 139 /** 140 * Returns a {@code CheckedFuture} which has an exception set immediately upon 141 * construction. 142 * 143 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 144 * method always returns {@code true}. Calling {@code get()} will immediately 145 * throw the provided {@code Throwable} wrapped in an {@code 146 * ExecutionException}, and calling {@code checkedGet()} will throw the 147 * provided exception itself. 148 * 149 * @throws Error if the throwable is an {@link Error}. 150 */ 151 public static <V, X extends Exception> CheckedFuture<V, X> 152 immediateFailedCheckedFuture(final X exception) { 153 checkNotNull(exception); 154 return makeChecked(Futures.<V>immediateFailedFuture(exception), 155 new Function<Exception, X>() { 156 @Override 157 public X apply(Exception e) { 158 return exception; 159 } 160 }); 161 } 162 163 /** 164 * Returns a new {@code ListenableFuture} whose result is asynchronously 165 * derived from the result of the given {@code Future}. More precisely, the 166 * returned {@code Future} takes its result from a {@code Future} produced by 167 * applying the given {@code AsyncFunction} to the result of the original 168 * {@code Future}. Example: 169 * 170 * <pre> {@code 171 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 172 * AsyncFunction<RowKey, QueryResult> queryFunction = 173 * new AsyncFunction<RowKey, QueryResult>() { 174 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 175 * return dataService.read(rowKey); 176 * } 177 * }; 178 * ListenableFuture<QueryResult> queryFuture = 179 * transform(rowKeyFuture, queryFunction); 180 * }</pre> 181 * 182 * <p>Note: This overload of {@code transform} is designed for cases in which 183 * the work of creating the derived {@code Future} is fast and lightweight, 184 * as the method does not accept an {@code Executor} in which to perform the 185 * the work. (The created {@code Future} itself need not complete quickly.) 186 * For heavier operations, this overload carries some caveats: First, the 187 * thread that {@code function.apply} runs in depends on whether the input 188 * {@code Future} is done at the time {@code transform} is called. In 189 * particular, if called late, {@code transform} will run the operation in 190 * the thread that called {@code transform}. Second, {@code function.apply} 191 * may run in an internal thread of the system responsible for the input 192 * {@code Future}, such as an RPC network thread. Finally, during the 193 * execution of a {@code sameThreadExecutor} {@code function.apply}, all 194 * other registered but unexecuted listeners are prevented from running, even 195 * if those listeners are to run in other executors. 196 * 197 * <p>The returned {@code Future} attempts to keep its cancellation state in 198 * sync with that of the input future and that of the future returned by the 199 * function. That is, if the returned {@code Future} is cancelled, it will 200 * attempt to cancel the other two, and if either of the other two is 201 * cancelled, the returned {@code Future} will receive a callback in which it 202 * will attempt to cancel itself. 203 * 204 * @param input The future to transform 205 * @param function A function to transform the result of the input future 206 * to the result of the output future 207 * @return A future that holds result of the function (if the input succeeded) 208 * or the original input's failure (if not) 209 * @since 11.0 210 */ 211 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 212 AsyncFunction<? super I, ? extends O> function) { 213 return transform(input, function, MoreExecutors.sameThreadExecutor()); 214 } 215 216 /** 217 * Returns a new {@code ListenableFuture} whose result is asynchronously 218 * derived from the result of the given {@code Future}. More precisely, the 219 * returned {@code Future} takes its result from a {@code Future} produced by 220 * applying the given {@code AsyncFunction} to the result of the original 221 * {@code Future}. Example: 222 * 223 * <pre> {@code 224 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 225 * AsyncFunction<RowKey, QueryResult> queryFunction = 226 * new AsyncFunction<RowKey, QueryResult>() { 227 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 228 * return dataService.read(rowKey); 229 * } 230 * }; 231 * ListenableFuture<QueryResult> queryFuture = 232 * transform(rowKeyFuture, queryFunction, executor); 233 * }</pre> 234 * 235 * <p>The returned {@code Future} attempts to keep its cancellation state in 236 * sync with that of the input future and that of the future returned by the 237 * chain function. That is, if the returned {@code Future} is cancelled, it 238 * will attempt to cancel the other two, and if either of the other two is 239 * cancelled, the returned {@code Future} will receive a callback in which it 240 * will attempt to cancel itself. 241 * 242 * <p>Note: For cases in which the work of creating the derived future is 243 * fast and lightweight, consider {@linkplain 244 * Futures#transform(ListenableFuture, Function) the other overload} or 245 * explicit use of {@code sameThreadExecutor}. For heavier derivations, this 246 * choice carries some caveats: First, the thread that {@code function.apply} 247 * runs in depends on whether the input {@code Future} is done at the time 248 * {@code transform} is called. In particular, if called late, {@code 249 * transform} will run the operation in the thread that called {@code 250 * transform}. Second, {@code function.apply} may run in an internal thread 251 * of the system responsible for the input {@code Future}, such as an RPC 252 * network thread. Finally, during the execution of a {@code 253 * sameThreadExecutor} {@code function.apply}, all other registered but 254 * unexecuted listeners are prevented from running, even if those listeners 255 * are to run in other executors. 256 * 257 * @param input The future to transform 258 * @param function A function to transform the result of the input future 259 * to the result of the output future 260 * @param executor Executor to run the function in. 261 * @return A future that holds result of the function (if the input succeeded) 262 * or the original input's failure (if not) 263 * @since 11.0 264 */ 265 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 266 AsyncFunction<? super I, ? extends O> function, 267 Executor executor) { 268 ChainingListenableFuture<I, O> output = 269 new ChainingListenableFuture<I, O>(function, input); 270 input.addListener(output, executor); 271 return output; 272 } 273 274 /** 275 * Returns a new {@code ListenableFuture} whose result is the product of 276 * applying the given {@code Function} to the result of the given {@code 277 * Future}. Example: 278 * 279 * <pre> {@code 280 * ListenableFuture<QueryResult> queryFuture = ...; 281 * Function<QueryResult, List<Row>> rowsFunction = 282 * new Function<QueryResult, List<Row>>() { 283 * public List<Row> apply(QueryResult queryResult) { 284 * return queryResult.getRows(); 285 * } 286 * }; 287 * ListenableFuture<List<Row>> rowsFuture = 288 * transform(queryFuture, rowsFunction); 289 * }</pre> 290 * 291 * <p>Note: This overload of {@code transform} is designed for cases in which 292 * the transformation is fast and lightweight, as the method does not accept 293 * an {@code Executor} in which to perform the the work. For heavier 294 * transformations, this overload carries some caveats: First, the thread 295 * that the transformation runs in depends on whether the input {@code 296 * Future} is done at the time {@code transform} is called. In particular, if 297 * called late, {@code transform} will perform the transformation in the 298 * thread that called {@code transform}. Second, transformations may run in 299 * an internal thread of the system responsible for the input {@code Future}, 300 * such as an RPC network thread. Finally, during the execution of a {@code 301 * sameThreadExecutor} transformation, all other registered but unexecuted 302 * listeners are prevented from running, even if those listeners are to run 303 * in other executors. 304 * 305 * <p>The returned {@code Future} attempts to keep its cancellation state in 306 * sync with that of the input future. That is, if the returned {@code Future} 307 * is cancelled, it will attempt to cancel the input, and if the input is 308 * cancelled, the returned {@code Future} will receive a callback in which it 309 * will attempt to cancel itself. 310 * 311 * <p>An example use of this method is to convert a serializable object 312 * returned from an RPC into a POJO. 313 * 314 * @param input The future to transform 315 * @param function A Function to transform the results of the provided future 316 * to the results of the returned future. This will be run in the thread 317 * that notifies input it is complete. 318 * @return A future that holds result of the transformation. 319 * @since 9.0 (in 1.0 as {@code compose}) 320 */ 321 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 322 final Function<? super I, ? extends O> function) { 323 return transform(input, function, MoreExecutors.sameThreadExecutor()); 324 } 325 326 /** 327 * Returns a new {@code ListenableFuture} whose result is the product of 328 * applying the given {@code Function} to the result of the given {@code 329 * Future}. Example: 330 * 331 * <pre> {@code 332 * ListenableFuture<QueryResult> queryFuture = ...; 333 * Function<QueryResult, List<Row>> rowsFunction = 334 * new Function<QueryResult, List<Row>>() { 335 * public List<Row> apply(QueryResult queryResult) { 336 * return queryResult.getRows(); 337 * } 338 * }; 339 * ListenableFuture<List<Row>> rowsFuture = 340 * transform(queryFuture, rowsFunction, executor); 341 * }</pre> 342 * 343 * <p>The returned {@code Future} attempts to keep its cancellation state in 344 * sync with that of the input future. That is, if the returned {@code Future} 345 * is cancelled, it will attempt to cancel the input, and if the input is 346 * cancelled, the returned {@code Future} will receive a callback in which it 347 * will attempt to cancel itself. 348 * 349 * <p>An example use of this method is to convert a serializable object 350 * returned from an RPC into a POJO. 351 * 352 * <p>Note: For cases in which the transformation is fast and lightweight, 353 * consider {@linkplain Futures#transform(ListenableFuture, Function) the 354 * other overload} or explicit use of {@link 355 * MoreExecutors#sameThreadExecutor}. For heavier transformations, this 356 * choice carries some caveats: First, the thread that the transformation 357 * runs in depends on whether the input {@code Future} is done at the time 358 * {@code transform} is called. In particular, if called late, {@code 359 * transform} will perform the transformation in the thread that called 360 * {@code transform}. Second, transformations may run in an internal thread 361 * of the system responsible for the input {@code Future}, such as an RPC 362 * network thread. Finally, during the execution of a {@code 363 * sameThreadExecutor} transformation, all other registered but unexecuted 364 * listeners are prevented from running, even if those listeners are to run 365 * in other executors. 366 * 367 * @param input The future to transform 368 * @param function A Function to transform the results of the provided future 369 * to the results of the returned future. 370 * @param executor Executor to run the function in. 371 * @return A future that holds result of the transformation. 372 * @since 9.0 (in 2.0 as {@code compose}) 373 */ 374 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 375 final Function<? super I, ? extends O> function, Executor executor) { 376 checkNotNull(function); 377 AsyncFunction<I, O> wrapperFunction 378 = new AsyncFunction<I, O>() { 379 @Override public ListenableFuture<O> apply(I input) { 380 O output = function.apply(input); 381 return immediateFuture(output); 382 } 383 }; 384 return transform(input, wrapperFunction, executor); 385 } 386 387 /** 388 * Like {@link #transform(ListenableFuture, Function)} except that the 389 * transformation {@code function} is invoked on each call to 390 * {@link Future#get() get()} on the returned future. 391 * 392 * <p>The returned {@code Future} reflects the input's cancellation 393 * state directly, and any attempt to cancel the returned Future is likewise 394 * passed through to the input Future. 395 * 396 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 397 * only apply the timeout to the execution of the underlying {@code Future}, 398 * <em>not</em> to the execution of the transformation function. 399 * 400 * <p>The primary audience of this method is callers of {@code transform} 401 * who don't have a {@code ListenableFuture} available and 402 * do not mind repeated, lazy function evaluation. 403 * 404 * @param input The future to transform 405 * @param function A Function to transform the results of the provided future 406 * to the results of the returned future. 407 * @return A future that returns the result of the transformation. 408 * @since 10.0 409 */ 410 @Beta 411 public static <I, O> Future<O> lazyTransform(final Future<I> input, 412 final Function<? super I, ? extends O> function) { 413 checkNotNull(input); 414 checkNotNull(function); 415 return new Future<O>() { 416 417 @Override 418 public boolean cancel(boolean mayInterruptIfRunning) { 419 return input.cancel(mayInterruptIfRunning); 420 } 421 422 @Override 423 public boolean isCancelled() { 424 return input.isCancelled(); 425 } 426 427 @Override 428 public boolean isDone() { 429 return input.isDone(); 430 } 431 432 @Override 433 public O get() throws InterruptedException, ExecutionException { 434 return applyTransformation(input.get()); 435 } 436 437 @Override 438 public O get(long timeout, TimeUnit unit) 439 throws InterruptedException, ExecutionException, TimeoutException { 440 return applyTransformation(input.get(timeout, unit)); 441 } 442 443 private O applyTransformation(I input) throws ExecutionException { 444 try { 445 return function.apply(input); 446 } catch (Throwable t) { 447 throw new ExecutionException(t); 448 } 449 } 450 }; 451 } 452 453 /** 454 * An implementation of {@code ListenableFuture} that also implements 455 * {@code Runnable} so that it can be used to nest ListenableFutures. 456 * Once the passed-in {@code ListenableFuture} is complete, it calls the 457 * passed-in {@code Function} to generate the result. 458 * 459 * <p>If the function throws any checked exceptions, they should be wrapped 460 * in a {@code UndeclaredThrowableException} so that this class can get 461 * access to the cause. 462 */ 463 private static class ChainingListenableFuture<I, O> 464 extends AbstractFuture<O> implements Runnable { 465 466 private AsyncFunction<? super I, ? extends O> function; 467 private ListenableFuture<? extends I> inputFuture; 468 private volatile ListenableFuture<? extends O> outputFuture; 469 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = 470 new LinkedBlockingQueue<Boolean>(1); 471 private final CountDownLatch outputCreated = new CountDownLatch(1); 472 473 private ChainingListenableFuture( 474 AsyncFunction<? super I, ? extends O> function, 475 ListenableFuture<? extends I> inputFuture) { 476 this.function = checkNotNull(function); 477 this.inputFuture = checkNotNull(inputFuture); 478 } 479 480 @Override 481 public boolean cancel(boolean mayInterruptIfRunning) { 482 /* 483 * Our additional cancellation work needs to occur even if 484 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 485 */ 486 if (super.cancel(mayInterruptIfRunning)) { 487 // This should never block since only one thread is allowed to cancel 488 // this Future. 489 putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning); 490 cancel(inputFuture, mayInterruptIfRunning); 491 cancel(outputFuture, mayInterruptIfRunning); 492 return true; 493 } 494 return false; 495 } 496 497 private void cancel(@Nullable Future<?> future, 498 boolean mayInterruptIfRunning) { 499 if (future != null) { 500 future.cancel(mayInterruptIfRunning); 501 } 502 } 503 504 @Override 505 public void run() { 506 try { 507 I sourceResult; 508 try { 509 sourceResult = getUninterruptibly(inputFuture); 510 } catch (CancellationException e) { 511 // Cancel this future and return. 512 // At this point, inputFuture is cancelled and outputFuture doesn't 513 // exist, so the value of mayInterruptIfRunning is irrelevant. 514 cancel(false); 515 return; 516 } catch (ExecutionException e) { 517 // Set the cause of the exception as this future's exception 518 setException(e.getCause()); 519 return; 520 } 521 522 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 523 function.apply(sourceResult); 524 if (isCancelled()) { 525 // Handles the case where cancel was called while the function was 526 // being applied. 527 // There is a gap in cancel(boolean) between calling sync.cancel() 528 // and storing the value of mayInterruptIfRunning, so this thread 529 // needs to block, waiting for that value. 530 outputFuture.cancel( 531 takeUninterruptibly(mayInterruptIfRunningChannel)); 532 this.outputFuture = null; 533 return; 534 } 535 outputFuture.addListener(new Runnable() { 536 @Override 537 public void run() { 538 try { 539 // Here it would have been nice to have had an 540 // UninterruptibleListenableFuture, but we don't want to start a 541 // combinatorial explosion of interfaces, so we have to make do. 542 set(getUninterruptibly(outputFuture)); 543 } catch (CancellationException e) { 544 // Cancel this future and return. 545 // At this point, inputFuture and outputFuture are done, so the 546 // value of mayInterruptIfRunning is irrelevant. 547 cancel(false); 548 return; 549 } catch (ExecutionException e) { 550 // Set the cause of the exception as this future's exception 551 setException(e.getCause()); 552 } finally { 553 // Don't pin inputs beyond completion 554 ChainingListenableFuture.this.outputFuture = null; 555 } 556 } 557 }, MoreExecutors.sameThreadExecutor()); 558 } catch (UndeclaredThrowableException e) { 559 // Set the cause of the exception as this future's exception 560 setException(e.getCause()); 561 } catch (Exception e) { 562 // This exception is irrelevant in this thread, but useful for the 563 // client 564 setException(e); 565 } catch (Error e) { 566 // Propagate errors up ASAP - our superclass will rethrow the error 567 setException(e); 568 } finally { 569 // Don't pin inputs beyond completion 570 function = null; 571 inputFuture = null; 572 // Allow our get routines to examine outputFuture now. 573 outputCreated.countDown(); 574 } 575 } 576 } 577 578 /** 579 * Creates a new {@code ListenableFuture} whose value is a list containing the 580 * values of all its input futures, if all succeed. If any input fails, the 581 * returned future fails. 582 * 583 * <p>The list of results is in the same order as the input list. 584 * 585 * <p>Canceling this future does not cancel any of the component futures; 586 * however, if any of the provided futures fails or is canceled, this one is, 587 * too. 588 * 589 * @param futures futures to combine 590 * @return a future that provides a list of the results of the component 591 * futures 592 * @since 10.0 593 */ 594 @Beta 595 public static <V> ListenableFuture<List<V>> allAsList( 596 ListenableFuture<? extends V>... futures) { 597 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 598 MoreExecutors.sameThreadExecutor()); 599 } 600 601 /** 602 * Creates a new {@code ListenableFuture} whose value is a list containing the 603 * values of all its input futures, if all succeed. If any input fails, the 604 * returned future fails. 605 * 606 * <p>The list of results is in the same order as the input list. 607 * 608 * <p>Canceling this future does not cancel any of the component futures; 609 * however, if any of the provided futures fails or is canceled, this one is, 610 * too. 611 * 612 * @param futures futures to combine 613 * @return a future that provides a list of the results of the component 614 * futures 615 * @since 10.0 616 */ 617 @Beta 618 public static <V> ListenableFuture<List<V>> allAsList( 619 Iterable<? extends ListenableFuture<? extends V>> futures) { 620 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 621 MoreExecutors.sameThreadExecutor()); 622 } 623 624 /** 625 * Creates a new {@code ListenableFuture} whose value is a list containing the 626 * values of all its successful input futures. The list of results is in the 627 * same order as the input list, and if any of the provided futures fails or 628 * is canceled, its corresponding position will contain {@code null} (which is 629 * indistinguishable from the future having a successful value of 630 * {@code null}). 631 * 632 * @param futures futures to combine 633 * @return a future that provides a list of the results of the component 634 * futures 635 * @since 10.0 636 */ 637 @Beta 638 public static <V> ListenableFuture<List<V>> successfulAsList( 639 ListenableFuture<? extends V>... futures) { 640 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 641 MoreExecutors.sameThreadExecutor()); 642 } 643 644 /** 645 * Creates a new {@code ListenableFuture} whose value is a list containing the 646 * values of all its successful input futures. The list of results is in the 647 * same order as the input list, and if any of the provided futures fails or 648 * is canceled, its corresponding position will contain {@code null} (which is 649 * indistinguishable from the future having a successful value of 650 * {@code null}). 651 * 652 * @param futures futures to combine 653 * @return a future that provides a list of the results of the component 654 * futures 655 * @since 10.0 656 */ 657 @Beta 658 public static <V> ListenableFuture<List<V>> successfulAsList( 659 Iterable<? extends ListenableFuture<? extends V>> futures) { 660 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 661 MoreExecutors.sameThreadExecutor()); 662 } 663 664 /** 665 * Registers separate success and failure callbacks to be run when the {@code 666 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 667 * complete} or, if the computation is already complete, immediately. 668 * 669 * <p>There is no guaranteed ordering of execution of callbacks, but any 670 * callback added through this method is guaranteed to be called once the 671 * computation is complete. 672 * 673 * Example: <pre> {@code 674 * ListenableFuture<QueryResult> future = ...; 675 * addCallback(future, 676 * new FutureCallback<QueryResult> { 677 * public void onSuccess(QueryResult result) { 678 * storeInCache(result); 679 * } 680 * public void onFailure(Throwable t) { 681 * reportError(t); 682 * } 683 * });}</pre> 684 * 685 * <p>Note: This overload of {@code addCallback} is designed for cases in 686 * which the callack is fast and lightweight, as the method does not accept 687 * an {@code Executor} in which to perform the the work. For heavier 688 * callbacks, this overload carries some caveats: First, the thread that the 689 * callback runs in depends on whether the input {@code Future} is done at the 690 * time {@code addCallback} is called and on whether the input {@code Future} 691 * is ever cancelled. In particular, {@code addCallback} may execute the 692 * callback in the thread that calls {@code addCallback} or {@code 693 * Future.cancel}. Second, callbacks may run in an internal thread of the 694 * system responsible for the input {@code Future}, such as an RPC network 695 * thread. Finally, during the execution of a {@code sameThreadExecutor} 696 * callback, all other registered but unexecuted listeners are prevented from 697 * running, even if those listeners are to run in other executors. 698 * 699 * <p>For a more general interface to attach a completion listener to a 700 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 701 * 702 * @param future The future attach the callback to. 703 * @param callback The callback to invoke when {@code future} is completed. 704 * @since 10.0 705 */ 706 public static <V> void addCallback(ListenableFuture<V> future, 707 FutureCallback<? super V> callback) { 708 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 709 } 710 711 /** 712 * Registers separate success and failure callbacks to be run when the {@code 713 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 714 * complete} or, if the computation is already complete, immediately. 715 * 716 * <p>The callback is run in {@code executor}. 717 * There is no guaranteed ordering of execution of callbacks, but any 718 * callback added through this method is guaranteed to be called once the 719 * computation is complete. 720 * 721 * Example: <pre> {@code 722 * ListenableFuture<QueryResult> future = ...; 723 * Executor e = ... 724 * addCallback(future, e, 725 * new FutureCallback<QueryResult> { 726 * public void onSuccess(QueryResult result) { 727 * storeInCache(result); 728 * } 729 * public void onFailure(Throwable t) { 730 * reportError(t); 731 * } 732 * });}</pre> 733 * 734 * When the callback is fast and lightweight consider {@linkplain 735 * Futures#addCallback(ListenableFuture, FutureCallback) the other overload} 736 * or explicit use of {@link MoreExecutors#sameThreadExecutor 737 * sameThreadExecutor}. For heavier callbacks, this choice carries some 738 * caveats: First, the thread that the callback runs in depends on whether 739 * the input {@code Future} is done at the time {@code addCallback} is called 740 * and on whether the input {@code Future} is ever cancelled. In particular, 741 * {@code addCallback} may execute the callback in the thread that calls 742 * {@code addCallback} or {@code Future.cancel}. Second, callbacks may run in 743 * an internal thread of the system responsible for the input {@code Future}, 744 * such as an RPC network thread. Finally, during the execution of a {@code 745 * sameThreadExecutor} callback, all other registered but unexecuted 746 * listeners are prevented from running, even if those listeners are to run 747 * in other executors. 748 * 749 * <p>For a more general interface to attach a completion listener to a 750 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 751 * 752 * @param future The future attach the callback to. 753 * @param callback The callback to invoke when {@code future} is completed. 754 * @param executor The executor to run {@code callback} when the future 755 * completes. 756 * @since 10.0 757 */ 758 public static <V> void addCallback(final ListenableFuture<V> future, 759 final FutureCallback<? super V> callback, Executor executor) { 760 Preconditions.checkNotNull(callback); 761 Runnable callbackListener = new Runnable() { 762 @Override 763 public void run() { 764 try { 765 // TODO(user): (Before Guava release), validate that this 766 // is the thing for IE. 767 V value = getUninterruptibly(future); 768 callback.onSuccess(value); 769 } catch (ExecutionException e) { 770 callback.onFailure(e.getCause()); 771 } catch (RuntimeException e) { 772 callback.onFailure(e); 773 } catch (Error e) { 774 callback.onFailure(e); 775 } 776 } 777 }; 778 future.addListener(callbackListener, executor); 779 } 780 781 /** 782 * Returns the result of {@link Future#get()}, converting most exceptions to a 783 * new instance of the given checked exception type. This reduces boilerplate 784 * for a common use of {@code Future} in which it is unnecessary to 785 * programmatically distinguish between exception types or to extract other 786 * information from the exception instance. 787 * 788 * <p>Exceptions from {@code Future.get} are treated as follows: 789 * <ul> 790 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 791 * {@code X} if the cause is a checked exception, an {@link 792 * UncheckedExecutionException} if the cause is a {@code 793 * RuntimeException}, or an {@link ExecutionError} if the cause is an 794 * {@code Error}. 795 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 796 * restoring the interrupt). 797 * <li>Any {@link CancellationException} is propagated untouched, as is any 798 * other {@link RuntimeException} (though {@code get} implementations are 799 * discouraged from throwing such exceptions). 800 * </ul> 801 * 802 * The overall principle is to continue to treat every checked exception as a 803 * checked exception, every unchecked exception as an unchecked exception, and 804 * every error as an error. In addition, the cause of any {@code 805 * ExecutionException} is wrapped in order to ensure that the new stack trace 806 * matches that of the current thread. 807 * 808 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 809 * public constructor that accepts zero or more arguments, all of type {@code 810 * String} or {@code Throwable} (preferring constructors with at least one 811 * {@code String}) and calling the constructor via reflection. If the 812 * exception did not already have a cause, one is set by calling {@link 813 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 814 * {@code IllegalArgumentException} is thrown. 815 * 816 * @throws X if {@code get} throws any checked exception except for an {@code 817 * ExecutionException} whose cause is not itself a checked exception 818 * @throws UncheckedExecutionException if {@code get} throws an {@code 819 * ExecutionException} with a {@code RuntimeException} as its cause 820 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 821 * with an {@code Error} as its cause 822 * @throws CancellationException if {@code get} throws a {@code 823 * CancellationException} 824 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 825 * RuntimeException} or does not have a suitable constructor 826 * @since 10.0 827 */ 828 @Beta 829 public static <V, X extends Exception> V get( 830 Future<V> future, Class<X> exceptionClass) throws X { 831 checkNotNull(future); 832 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 833 "Futures.get exception type (%s) must not be a RuntimeException", 834 exceptionClass); 835 try { 836 return future.get(); 837 } catch (InterruptedException e) { 838 currentThread().interrupt(); 839 throw newWithCause(exceptionClass, e); 840 } catch (ExecutionException e) { 841 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 842 throw new AssertionError(); 843 } 844 } 845 846 /** 847 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 848 * exceptions to a new instance of the given checked exception type. This 849 * reduces boilerplate for a common use of {@code Future} in which it is 850 * unnecessary to programmatically distinguish between exception types or to 851 * extract other information from the exception instance. 852 * 853 * <p>Exceptions from {@code Future.get} are treated as follows: 854 * <ul> 855 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 856 * {@code X} if the cause is a checked exception, an {@link 857 * UncheckedExecutionException} if the cause is a {@code 858 * RuntimeException}, or an {@link ExecutionError} if the cause is an 859 * {@code Error}. 860 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 861 * restoring the interrupt). 862 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 863 * <li>Any {@link CancellationException} is propagated untouched, as is any 864 * other {@link RuntimeException} (though {@code get} implementations are 865 * discouraged from throwing such exceptions). 866 * </ul> 867 * 868 * The overall principle is to continue to treat every checked exception as a 869 * checked exception, every unchecked exception as an unchecked exception, and 870 * every error as an error. In addition, the cause of any {@code 871 * ExecutionException} is wrapped in order to ensure that the new stack trace 872 * matches that of the current thread. 873 * 874 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 875 * public constructor that accepts zero or more arguments, all of type {@code 876 * String} or {@code Throwable} (preferring constructors with at least one 877 * {@code String}) and calling the constructor via reflection. If the 878 * exception did not already have a cause, one is set by calling {@link 879 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 880 * {@code IllegalArgumentException} is thrown. 881 * 882 * @throws X if {@code get} throws any checked exception except for an {@code 883 * ExecutionException} whose cause is not itself a checked exception 884 * @throws UncheckedExecutionException if {@code get} throws an {@code 885 * ExecutionException} with a {@code RuntimeException} as its cause 886 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 887 * with an {@code Error} as its cause 888 * @throws CancellationException if {@code get} throws a {@code 889 * CancellationException} 890 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 891 * RuntimeException} or does not have a suitable constructor 892 * @since 10.0 893 */ 894 @Beta 895 public static <V, X extends Exception> V get( 896 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 897 throws X { 898 checkNotNull(future); 899 checkNotNull(unit); 900 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 901 "Futures.get exception type (%s) must not be a RuntimeException", 902 exceptionClass); 903 try { 904 return future.get(timeout, unit); 905 } catch (InterruptedException e) { 906 currentThread().interrupt(); 907 throw newWithCause(exceptionClass, e); 908 } catch (TimeoutException e) { 909 throw newWithCause(exceptionClass, e); 910 } catch (ExecutionException e) { 911 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 912 throw new AssertionError(); 913 } 914 } 915 916 private static <X extends Exception> void wrapAndThrowExceptionOrError( 917 Throwable cause, Class<X> exceptionClass) throws X { 918 if (cause instanceof Error) { 919 throw new ExecutionError((Error) cause); 920 } 921 if (cause instanceof RuntimeException) { 922 throw new UncheckedExecutionException(cause); 923 } 924 throw newWithCause(exceptionClass, cause); 925 } 926 927 /** 928 * Returns the result of calling {@link Future#get()} uninterruptibly on a 929 * task known not to throw a checked exception. This makes {@code Future} more 930 * suitable for lightweight, fast-running tasks that, barring bugs in the 931 * code, will not fail. This gives it exception-handling behavior similar to 932 * that of {@code ForkJoinTask.join}. 933 * 934 * <p>Exceptions from {@code Future.get} are treated as follows: 935 * <ul> 936 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 937 * {@link UncheckedExecutionException} (if the cause is an {@code 938 * Exception}) or {@link ExecutionError} (if the cause is an {@code 939 * Error}). 940 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 941 * call. The interrupt is restored before {@code getUnchecked} returns. 942 * <li>Any {@link CancellationException} is propagated untouched. So is any 943 * other {@link RuntimeException} ({@code get} implementations are 944 * discouraged from throwing such exceptions). 945 * </ul> 946 * 947 * The overall principle is to eliminate all checked exceptions: to loop to 948 * avoid {@code InterruptedException}, to pass through {@code 949 * CancellationException}, and to wrap any exception from the underlying 950 * computation in an {@code UncheckedExecutionException} or {@code 951 * ExecutionError}. 952 * 953 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 954 * {@link Uninterruptibles#getUninterruptibly(Future)}. 955 * 956 * @throws UncheckedExecutionException if {@code get} throws an {@code 957 * ExecutionException} with an {@code Exception} as its cause 958 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 959 * with an {@code Error} as its cause 960 * @throws CancellationException if {@code get} throws a {@code 961 * CancellationException} 962 * @since 10.0 963 */ 964 @Beta 965 public static <V> V getUnchecked(Future<V> future) { 966 checkNotNull(future); 967 try { 968 return getUninterruptibly(future); 969 } catch (ExecutionException e) { 970 wrapAndThrowUnchecked(e.getCause()); 971 throw new AssertionError(); 972 } 973 } 974 975 private static void wrapAndThrowUnchecked(Throwable cause) { 976 if (cause instanceof Error) { 977 throw new ExecutionError((Error) cause); 978 } 979 /* 980 * It's a non-Error, non-Exception Throwable. From my survey of such 981 * classes, I believe that most users intended to extend Exception, so we'll 982 * treat it like an Exception. 983 */ 984 throw new UncheckedExecutionException(cause); 985 } 986 987 /* 988 * TODO(user): FutureChecker interface for these to be static methods on? If 989 * so, refer to it in the (static-method) Futures.get documentation 990 */ 991 992 /* 993 * Arguably we don't need a timed getUnchecked because any operation slow 994 * enough to require a timeout is heavyweight enough to throw a checked 995 * exception and therefore be inappropriate to use with getUnchecked. Further, 996 * it's not clear that converting the checked TimeoutException to a 997 * RuntimeException -- especially to an UncheckedExecutionException, since it 998 * wasn't thrown by the computation -- makes sense, and if we don't convert 999 * it, the user still has to write a try-catch block. 1000 * 1001 * If you think you would use this method, let us know. 1002 */ 1003 1004 private static <X extends Exception> X newWithCause( 1005 Class<X> exceptionClass, Throwable cause) { 1006 // getConstructors() guarantees this as long as we don't modify the array. 1007 @SuppressWarnings("unchecked") 1008 List<Constructor<X>> constructors = 1009 (List) Arrays.asList(exceptionClass.getConstructors()); 1010 for (Constructor<X> constructor : preferringStrings(constructors)) { 1011 @Nullable X instance = newFromConstructor(constructor, cause); 1012 if (instance != null) { 1013 if (instance.getCause() == null) { 1014 instance.initCause(cause); 1015 } 1016 return instance; 1017 } 1018 } 1019 throw new IllegalArgumentException( 1020 "No appropriate constructor for exception of type " + exceptionClass 1021 + " in response to chained exception", cause); 1022 } 1023 1024 private static <X extends Exception> List<Constructor<X>> 1025 preferringStrings(List<Constructor<X>> constructors) { 1026 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1027 } 1028 1029 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1030 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1031 @Override public Boolean apply(Constructor<?> input) { 1032 return asList(input.getParameterTypes()).contains(String.class); 1033 } 1034 }).reverse(); 1035 1036 @Nullable private static <X> X newFromConstructor( 1037 Constructor<X> constructor, Throwable cause) { 1038 Class<?>[] paramTypes = constructor.getParameterTypes(); 1039 Object[] params = new Object[paramTypes.length]; 1040 for (int i = 0; i < paramTypes.length; i++) { 1041 Class<?> paramType = paramTypes[i]; 1042 if (paramType.equals(String.class)) { 1043 params[i] = cause.toString(); 1044 } else if (paramType.equals(Throwable.class)) { 1045 params[i] = cause; 1046 } else { 1047 return null; 1048 } 1049 } 1050 try { 1051 return constructor.newInstance(params); 1052 } catch (IllegalArgumentException e) { 1053 return null; 1054 } catch (InstantiationException e) { 1055 return null; 1056 } catch (IllegalAccessException e) { 1057 return null; 1058 } catch (InvocationTargetException e) { 1059 return null; 1060 } 1061 } 1062 1063 /** 1064 * Class that implements {@link #allAsList} and {@link #successfulAsList}. 1065 * The idea is to create a (null-filled) List and register a listener with 1066 * each component future to fill out the value in the List when that future 1067 * completes. 1068 */ 1069 private static class ListFuture<V> extends AbstractFuture<List<V>> { 1070 ImmutableList<? extends ListenableFuture<? extends V>> futures; 1071 final boolean allMustSucceed; 1072 final AtomicInteger remaining; 1073 List<V> values; 1074 1075 /** 1076 * Constructor. 1077 * 1078 * @param futures all the futures to build the list from 1079 * @param allMustSucceed whether a single failure or cancellation should 1080 * propagate to this future 1081 * @param listenerExecutor used to run listeners on all the passed in 1082 * futures. 1083 */ 1084 ListFuture( 1085 final ImmutableList<? extends ListenableFuture<? extends V>> futures, 1086 final boolean allMustSucceed, final Executor listenerExecutor) { 1087 this.futures = futures; 1088 this.values = Lists.newArrayListWithCapacity(futures.size()); 1089 this.allMustSucceed = allMustSucceed; 1090 this.remaining = new AtomicInteger(futures.size()); 1091 1092 init(listenerExecutor); 1093 } 1094 1095 private void init(final Executor listenerExecutor) { 1096 // First, schedule cleanup to execute when the Future is done. 1097 addListener(new Runnable() { 1098 @Override 1099 public void run() { 1100 // By now the values array has either been set as the Future's value, 1101 // or (in case of failure) is no longer useful. 1102 ListFuture.this.values = null; 1103 1104 // Let go of the memory held by other futures 1105 ListFuture.this.futures = null; 1106 } 1107 }, MoreExecutors.sameThreadExecutor()); 1108 1109 // Now begin the "real" initialization. 1110 1111 // Corner case: List is empty. 1112 if (futures.isEmpty()) { 1113 set(Lists.newArrayList(values)); 1114 return; 1115 } 1116 1117 // Populate the results list with null initially. 1118 for (int i = 0; i < futures.size(); ++i) { 1119 values.add(null); 1120 } 1121 1122 // Register a listener on each Future in the list to update 1123 // the state of this future. 1124 // Note that if all the futures on the list are done prior to completing 1125 // this loop, the last call to addListener() will callback to 1126 // setOneValue(), transitively call our cleanup listener, and set 1127 // this.futures to null. 1128 // We store a reference to futures to avoid the NPE. 1129 ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures; 1130 for (int i = 0; i < localFutures.size(); i++) { 1131 final ListenableFuture<? extends V> listenable = localFutures.get(i); 1132 final int index = i; 1133 listenable.addListener(new Runnable() { 1134 @Override 1135 public void run() { 1136 setOneValue(index, listenable); 1137 } 1138 }, listenerExecutor); 1139 } 1140 } 1141 1142 /** 1143 * Sets the value at the given index to that of the given future. 1144 */ 1145 private void setOneValue(int index, Future<? extends V> future) { 1146 List<V> localValues = values; 1147 if (isDone() || localValues == null) { 1148 // Some other future failed or has been cancelled, causing this one to 1149 // also be cancelled or have an exception set. This should only happen 1150 // if allMustSucceed is true. 1151 checkState(allMustSucceed, 1152 "Future was done before all dependencies completed"); 1153 return; 1154 } 1155 1156 try { 1157 checkState(future.isDone(), 1158 "Tried to set value from future which is not done"); 1159 localValues.set(index, getUninterruptibly(future)); 1160 } catch (CancellationException e) { 1161 if (allMustSucceed) { 1162 // Set ourselves as cancelled. Let the input futures keep running 1163 // as some of them may be used elsewhere. 1164 // (Currently we don't override interruptTask, so 1165 // mayInterruptIfRunning==false isn't technically necessary.) 1166 cancel(false); 1167 } 1168 } catch (ExecutionException e) { 1169 if (allMustSucceed) { 1170 // As soon as the first one fails, throw the exception up. 1171 // The result of all other inputs is then ignored. 1172 setException(e.getCause()); 1173 } 1174 } catch (RuntimeException e) { 1175 if (allMustSucceed) { 1176 setException(e); 1177 } 1178 } catch (Error e) { 1179 // Propagate errors up ASAP - our superclass will rethrow the error 1180 setException(e); 1181 } finally { 1182 int newRemaining = remaining.decrementAndGet(); 1183 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1184 if (newRemaining == 0) { 1185 localValues = values; 1186 if (localValues != null) { 1187 set(Lists.newArrayList(localValues)); 1188 } else { 1189 checkState(isDone()); 1190 } 1191 } 1192 } 1193 } 1194 1195 } 1196 1197 /** 1198 * A checked future that uses a function to map from exceptions to the 1199 * appropriate checked type. 1200 */ 1201 private static class MappingCheckedFuture<V, X extends Exception> extends 1202 AbstractCheckedFuture<V, X> { 1203 1204 final Function<Exception, X> mapper; 1205 1206 MappingCheckedFuture(ListenableFuture<V> delegate, 1207 Function<Exception, X> mapper) { 1208 super(delegate); 1209 1210 this.mapper = checkNotNull(mapper); 1211 } 1212 1213 @Override 1214 protected X mapException(Exception e) { 1215 return mapper.apply(e); 1216 } 1217 } 1218 }