001 /* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkArgument; 020 import static com.google.common.base.Preconditions.checkNotNull; 021 import static com.google.common.base.Preconditions.checkState; 022 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor; 023 import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly; 024 import static com.google.common.util.concurrent.Uninterruptibles.putUninterruptibly; 025 import static com.google.common.util.concurrent.Uninterruptibles.takeUninterruptibly; 026 import static java.lang.Thread.currentThread; 027 import static java.util.Arrays.asList; 028 029 import com.google.common.annotations.Beta; 030 import com.google.common.base.Function; 031 import com.google.common.base.Preconditions; 032 import com.google.common.collect.ImmutableList; 033 import com.google.common.collect.Lists; 034 import com.google.common.collect.Ordering; 035 036 import java.lang.reflect.Constructor; 037 import java.lang.reflect.InvocationTargetException; 038 import java.lang.reflect.UndeclaredThrowableException; 039 import java.util.Arrays; 040 import java.util.List; 041 import java.util.concurrent.BlockingQueue; 042 import java.util.concurrent.CancellationException; 043 import java.util.concurrent.CountDownLatch; 044 import java.util.concurrent.ExecutionException; 045 import java.util.concurrent.Executor; 046 import java.util.concurrent.Future; 047 import java.util.concurrent.LinkedBlockingQueue; 048 import java.util.concurrent.TimeUnit; 049 import java.util.concurrent.TimeoutException; 050 import java.util.concurrent.atomic.AtomicInteger; 051 052 import javax.annotation.Nullable; 053 054 /** 055 * Static utility methods pertaining to the {@link Future} interface. 056 * 057 * <p>Many of these methods use the {@link ListenableFuture} API; consult the 058 * Guava User Guide article on <a href= 059 * "http://code.google.com/p/guava-libraries/wiki/ListenableFutureExplained"> 060 * {@code ListenableFuture}</a>. 061 * 062 * @author Kevin Bourrillion 063 * @author Nishant Thakkar 064 * @author Sven Mawson 065 * @since 1.0 066 */ 067 @Beta 068 public final class Futures { 069 private Futures() {} 070 071 /** 072 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 073 * and a {@link Function} that maps from {@link Exception} instances into the 074 * appropriate checked type. 075 * 076 * <p>The given mapping function will be applied to an 077 * {@link InterruptedException}, a {@link CancellationException}, or an 078 * {@link ExecutionException} with the actual cause of the exception. 079 * See {@link Future#get()} for details on the exceptions thrown. 080 * 081 * @since 9.0 (source-compatible since 1.0) 082 */ 083 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 084 ListenableFuture<V> future, Function<Exception, X> mapper) { 085 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 086 } 087 088 /** 089 * Creates a {@code ListenableFuture} which has its value set immediately upon 090 * construction. The getters just return the value. This {@code Future} can't 091 * be canceled or timed out and its {@code isDone()} method always returns 092 * {@code true}. 093 */ 094 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 095 SettableFuture<V> future = SettableFuture.create(); 096 future.set(value); 097 return future; 098 } 099 100 /** 101 * Returns a {@code CheckedFuture} which has its value set immediately upon 102 * construction. 103 * 104 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 105 * method always returns {@code true}. Calling {@code get()} or {@code 106 * checkedGet()} will immediately return the provided value. 107 */ 108 public static <V, X extends Exception> CheckedFuture<V, X> 109 immediateCheckedFuture(@Nullable V value) { 110 SettableFuture<V> future = SettableFuture.create(); 111 future.set(value); 112 return Futures.makeChecked(future, new Function<Exception, X>() { 113 @Override 114 public X apply(Exception e) { 115 throw new AssertionError("impossible"); 116 } 117 }); 118 } 119 120 /** 121 * Returns a {@code ListenableFuture} which has an exception set immediately 122 * upon construction. 123 * 124 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 125 * method always returns {@code true}. Calling {@code get()} will immediately 126 * throw the provided {@code Throwable} wrapped in an {@code 127 * ExecutionException}. 128 * 129 * @throws Error if the throwable is an {@link Error}. 130 */ 131 public static <V> ListenableFuture<V> immediateFailedFuture( 132 Throwable throwable) { 133 checkNotNull(throwable); 134 SettableFuture<V> future = SettableFuture.create(); 135 future.setException(throwable); 136 return future; 137 } 138 139 /** 140 * Returns a {@code CheckedFuture} which has an exception set immediately upon 141 * construction. 142 * 143 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 144 * method always returns {@code true}. Calling {@code get()} will immediately 145 * throw the provided {@code Throwable} wrapped in an {@code 146 * ExecutionException}, and calling {@code checkedGet()} will throw the 147 * provided exception itself. 148 * 149 * @throws Error if the throwable is an {@link Error}. 150 */ 151 public static <V, X extends Exception> CheckedFuture<V, X> 152 immediateFailedCheckedFuture(final X exception) { 153 checkNotNull(exception); 154 return makeChecked(Futures.<V>immediateFailedFuture(exception), 155 new Function<Exception, X>() { 156 @Override 157 public X apply(Exception e) { 158 return exception; 159 } 160 }); 161 } 162 163 /** 164 * Returns a new {@code ListenableFuture} whose result is asynchronously 165 * derived from the result of the given {@code Future}. More precisely, the 166 * returned {@code Future} takes its result from a {@code Future} produced by 167 * applying the given {@code AsyncFunction} to the result of the original 168 * {@code Future}. Example: 169 * 170 * <pre> {@code 171 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 172 * AsyncFunction<RowKey, QueryResult> queryFunction = 173 * new AsyncFunction<RowKey, QueryResult>() { 174 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 175 * return dataService.read(rowKey); 176 * } 177 * }; 178 * ListenableFuture<QueryResult> queryFuture = 179 * transform(rowKeyFuture, queryFunction); 180 * }</pre> 181 * 182 * Note: If the derived {@code Future} is slow or heavyweight to create 183 * (whether the {@code Future} itself is slow or heavyweight to complete is 184 * irrelevant), consider {@linkplain #transform(ListenableFuture, 185 * AsyncFunction, Executor) supplying an executor}. If you do not supply an 186 * executor, {@code transform} will use {@link 187 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 188 * caveats for heavier operations. For example, the call to {@code 189 * function.apply} may run on an unpredictable or undesirable thread: 190 * 191 * <ul> 192 * <li>If the input {@code Future} is done at the time {@code transform} is 193 * called, {@code transform} will call {@code function.apply} inline. 194 * <li>If the input {@code Future} is not yet done, {@code transform} will 195 * schedule {@code function.apply} to be run by the thread that completes the 196 * input {@code Future}, which may be an internal system thread such as an 197 * RPC network thread. 198 * </ul> 199 * 200 * Also note that, regardless of which thread executes {@code 201 * function.apply}, all other registered but unexecuted listeners are 202 * prevented from running during its execution, even if those listeners are 203 * to run in other executors. 204 * 205 * <p>The returned {@code Future} attempts to keep its cancellation state in 206 * sync with that of the input future and that of the future returned by the 207 * function. That is, if the returned {@code Future} is cancelled, it will 208 * attempt to cancel the other two, and if either of the other two is 209 * cancelled, the returned {@code Future} will receive a callback in which it 210 * will attempt to cancel itself. 211 * 212 * @param input The future to transform 213 * @param function A function to transform the result of the input future 214 * to the result of the output future 215 * @return A future that holds result of the function (if the input succeeded) 216 * or the original input's failure (if not) 217 * @since 11.0 218 */ 219 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 220 AsyncFunction<? super I, ? extends O> function) { 221 return transform(input, function, MoreExecutors.sameThreadExecutor()); 222 } 223 224 /** 225 * Returns a new {@code ListenableFuture} whose result is asynchronously 226 * derived from the result of the given {@code Future}. More precisely, the 227 * returned {@code Future} takes its result from a {@code Future} produced by 228 * applying the given {@code AsyncFunction} to the result of the original 229 * {@code Future}. Example: 230 * 231 * <pre> {@code 232 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 233 * AsyncFunction<RowKey, QueryResult> queryFunction = 234 * new AsyncFunction<RowKey, QueryResult>() { 235 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 236 * return dataService.read(rowKey); 237 * } 238 * }; 239 * ListenableFuture<QueryResult> queryFuture = 240 * transform(rowKeyFuture, queryFunction, executor); 241 * }</pre> 242 * 243 * <p>The returned {@code Future} attempts to keep its cancellation state in 244 * sync with that of the input future and that of the future returned by the 245 * chain function. That is, if the returned {@code Future} is cancelled, it 246 * will attempt to cancel the other two, and if either of the other two is 247 * cancelled, the returned {@code Future} will receive a callback in which it 248 * will attempt to cancel itself. 249 * 250 * <p>When the execution of {@code function.apply} is fast and lightweight 251 * (though the {@code Future} it returns need not meet these criteria), 252 * consider {@linkplain #transform(ListenableFuture, AsyncFunction) omitting 253 * the executor} or explicitly specifying {@code sameThreadExecutor}. 254 * However, be aware of the caveats documented in the link above. 255 * 256 * @param input The future to transform 257 * @param function A function to transform the result of the input future 258 * to the result of the output future 259 * @param executor Executor to run the function in. 260 * @return A future that holds result of the function (if the input succeeded) 261 * or the original input's failure (if not) 262 * @since 11.0 263 */ 264 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 265 AsyncFunction<? super I, ? extends O> function, 266 Executor executor) { 267 ChainingListenableFuture<I, O> output = 268 new ChainingListenableFuture<I, O>(function, input); 269 input.addListener(output, executor); 270 return output; 271 } 272 273 /** 274 * Returns a new {@code ListenableFuture} whose result is the product of 275 * applying the given {@code Function} to the result of the given {@code 276 * Future}. Example: 277 * 278 * <pre> {@code 279 * ListenableFuture<QueryResult> queryFuture = ...; 280 * Function<QueryResult, List<Row>> rowsFunction = 281 * new Function<QueryResult, List<Row>>() { 282 * public List<Row> apply(QueryResult queryResult) { 283 * return queryResult.getRows(); 284 * } 285 * }; 286 * ListenableFuture<List<Row>> rowsFuture = 287 * transform(queryFuture, rowsFunction); 288 * }</pre> 289 * 290 * Note: If the transformation is slow or heavyweight, consider {@linkplain 291 * #transform(ListenableFuture, Function, Executor) supplying an executor}. 292 * If you do not supply an executor, {@code transform} will use {@link 293 * MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries some 294 * caveats for heavier operations. For example, the call to {@code 295 * function.apply} may run on an unpredictable or undesirable thread: 296 * 297 * <ul> 298 * <li>If the input {@code Future} is done at the time {@code transform} is 299 * called, {@code transform} will call {@code function.apply} inline. 300 * <li>If the input {@code Future} is not yet done, {@code transform} will 301 * schedule {@code function.apply} to be run by the thread that completes the 302 * input {@code Future}, which may be an internal system thread such as an 303 * RPC network thread. 304 * </ul> 305 * 306 * Also note that, regardless of which thread executes {@code 307 * function.apply}, all other registered but unexecuted listeners are 308 * prevented from running during its execution, even if those listeners are 309 * to run in other executors. 310 * 311 * <p>The returned {@code Future} attempts to keep its cancellation state in 312 * sync with that of the input future. That is, if the returned {@code Future} 313 * is cancelled, it will attempt to cancel the input, and if the input is 314 * cancelled, the returned {@code Future} will receive a callback in which it 315 * will attempt to cancel itself. 316 * 317 * <p>An example use of this method is to convert a serializable object 318 * returned from an RPC into a POJO. 319 * 320 * @param input The future to transform 321 * @param function A Function to transform the results of the provided future 322 * to the results of the returned future. This will be run in the thread 323 * that notifies input it is complete. 324 * @return A future that holds result of the transformation. 325 * @since 9.0 (in 1.0 as {@code compose}) 326 */ 327 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 328 final Function<? super I, ? extends O> function) { 329 return transform(input, function, MoreExecutors.sameThreadExecutor()); 330 } 331 332 /** 333 * Returns a new {@code ListenableFuture} whose result is the product of 334 * applying the given {@code Function} to the result of the given {@code 335 * Future}. Example: 336 * 337 * <pre> {@code 338 * ListenableFuture<QueryResult> queryFuture = ...; 339 * Function<QueryResult, List<Row>> rowsFunction = 340 * new Function<QueryResult, List<Row>>() { 341 * public List<Row> apply(QueryResult queryResult) { 342 * return queryResult.getRows(); 343 * } 344 * }; 345 * ListenableFuture<List<Row>> rowsFuture = 346 * transform(queryFuture, rowsFunction, executor); 347 * }</pre> 348 * 349 * <p>The returned {@code Future} attempts to keep its cancellation state in 350 * sync with that of the input future. That is, if the returned {@code Future} 351 * is cancelled, it will attempt to cancel the input, and if the input is 352 * cancelled, the returned {@code Future} will receive a callback in which it 353 * will attempt to cancel itself. 354 * 355 * <p>An example use of this method is to convert a serializable object 356 * returned from an RPC into a POJO. 357 * 358 * <p>When the transformation is fast and lightweight, consider {@linkplain 359 * #transform(ListenableFuture, Function) omitting the executor} or 360 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 361 * caveats documented in the link above. 362 * 363 * @param input The future to transform 364 * @param function A Function to transform the results of the provided future 365 * to the results of the returned future. 366 * @param executor Executor to run the function in. 367 * @return A future that holds result of the transformation. 368 * @since 9.0 (in 2.0 as {@code compose}) 369 */ 370 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> input, 371 final Function<? super I, ? extends O> function, Executor executor) { 372 checkNotNull(function); 373 AsyncFunction<I, O> wrapperFunction 374 = new AsyncFunction<I, O>() { 375 @Override public ListenableFuture<O> apply(I input) { 376 O output = function.apply(input); 377 return immediateFuture(output); 378 } 379 }; 380 return transform(input, wrapperFunction, executor); 381 } 382 383 /** 384 * Like {@link #transform(ListenableFuture, Function)} except that the 385 * transformation {@code function} is invoked on each call to 386 * {@link Future#get() get()} on the returned future. 387 * 388 * <p>The returned {@code Future} reflects the input's cancellation 389 * state directly, and any attempt to cancel the returned Future is likewise 390 * passed through to the input Future. 391 * 392 * <p>Note that calls to {@linkplain Future#get(long, TimeUnit) timed get} 393 * only apply the timeout to the execution of the underlying {@code Future}, 394 * <em>not</em> to the execution of the transformation function. 395 * 396 * <p>The primary audience of this method is callers of {@code transform} 397 * who don't have a {@code ListenableFuture} available and 398 * do not mind repeated, lazy function evaluation. 399 * 400 * @param input The future to transform 401 * @param function A Function to transform the results of the provided future 402 * to the results of the returned future. 403 * @return A future that returns the result of the transformation. 404 * @since 10.0 405 */ 406 @Beta 407 public static <I, O> Future<O> lazyTransform(final Future<I> input, 408 final Function<? super I, ? extends O> function) { 409 checkNotNull(input); 410 checkNotNull(function); 411 return new Future<O>() { 412 413 @Override 414 public boolean cancel(boolean mayInterruptIfRunning) { 415 return input.cancel(mayInterruptIfRunning); 416 } 417 418 @Override 419 public boolean isCancelled() { 420 return input.isCancelled(); 421 } 422 423 @Override 424 public boolean isDone() { 425 return input.isDone(); 426 } 427 428 @Override 429 public O get() throws InterruptedException, ExecutionException { 430 return applyTransformation(input.get()); 431 } 432 433 @Override 434 public O get(long timeout, TimeUnit unit) 435 throws InterruptedException, ExecutionException, TimeoutException { 436 return applyTransformation(input.get(timeout, unit)); 437 } 438 439 private O applyTransformation(I input) throws ExecutionException { 440 try { 441 return function.apply(input); 442 } catch (Throwable t) { 443 throw new ExecutionException(t); 444 } 445 } 446 }; 447 } 448 449 /** 450 * An implementation of {@code ListenableFuture} that also implements 451 * {@code Runnable} so that it can be used to nest ListenableFutures. 452 * Once the passed-in {@code ListenableFuture} is complete, it calls the 453 * passed-in {@code Function} to generate the result. 454 * 455 * <p>If the function throws any checked exceptions, they should be wrapped 456 * in a {@code UndeclaredThrowableException} so that this class can get 457 * access to the cause. 458 */ 459 private static class ChainingListenableFuture<I, O> 460 extends AbstractFuture<O> implements Runnable { 461 462 private AsyncFunction<? super I, ? extends O> function; 463 private ListenableFuture<? extends I> inputFuture; 464 private volatile ListenableFuture<? extends O> outputFuture; 465 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = 466 new LinkedBlockingQueue<Boolean>(1); 467 private final CountDownLatch outputCreated = new CountDownLatch(1); 468 469 private ChainingListenableFuture( 470 AsyncFunction<? super I, ? extends O> function, 471 ListenableFuture<? extends I> inputFuture) { 472 this.function = checkNotNull(function); 473 this.inputFuture = checkNotNull(inputFuture); 474 } 475 476 @Override 477 public boolean cancel(boolean mayInterruptIfRunning) { 478 /* 479 * Our additional cancellation work needs to occur even if 480 * !mayInterruptIfRunning, so we can't move it into interruptTask(). 481 */ 482 if (super.cancel(mayInterruptIfRunning)) { 483 // This should never block since only one thread is allowed to cancel 484 // this Future. 485 putUninterruptibly(mayInterruptIfRunningChannel, mayInterruptIfRunning); 486 cancel(inputFuture, mayInterruptIfRunning); 487 cancel(outputFuture, mayInterruptIfRunning); 488 return true; 489 } 490 return false; 491 } 492 493 private void cancel(@Nullable Future<?> future, 494 boolean mayInterruptIfRunning) { 495 if (future != null) { 496 future.cancel(mayInterruptIfRunning); 497 } 498 } 499 500 @Override 501 public void run() { 502 try { 503 I sourceResult; 504 try { 505 sourceResult = getUninterruptibly(inputFuture); 506 } catch (CancellationException e) { 507 // Cancel this future and return. 508 // At this point, inputFuture is cancelled and outputFuture doesn't 509 // exist, so the value of mayInterruptIfRunning is irrelevant. 510 cancel(false); 511 return; 512 } catch (ExecutionException e) { 513 // Set the cause of the exception as this future's exception 514 setException(e.getCause()); 515 return; 516 } 517 518 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 519 function.apply(sourceResult); 520 if (isCancelled()) { 521 // Handles the case where cancel was called while the function was 522 // being applied. 523 // There is a gap in cancel(boolean) between calling sync.cancel() 524 // and storing the value of mayInterruptIfRunning, so this thread 525 // needs to block, waiting for that value. 526 outputFuture.cancel( 527 takeUninterruptibly(mayInterruptIfRunningChannel)); 528 this.outputFuture = null; 529 return; 530 } 531 outputFuture.addListener(new Runnable() { 532 @Override 533 public void run() { 534 try { 535 // Here it would have been nice to have had an 536 // UninterruptibleListenableFuture, but we don't want to start a 537 // combinatorial explosion of interfaces, so we have to make do. 538 set(getUninterruptibly(outputFuture)); 539 } catch (CancellationException e) { 540 // Cancel this future and return. 541 // At this point, inputFuture and outputFuture are done, so the 542 // value of mayInterruptIfRunning is irrelevant. 543 cancel(false); 544 return; 545 } catch (ExecutionException e) { 546 // Set the cause of the exception as this future's exception 547 setException(e.getCause()); 548 } finally { 549 // Don't pin inputs beyond completion 550 ChainingListenableFuture.this.outputFuture = null; 551 } 552 } 553 }, MoreExecutors.sameThreadExecutor()); 554 } catch (UndeclaredThrowableException e) { 555 // Set the cause of the exception as this future's exception 556 setException(e.getCause()); 557 } catch (Exception e) { 558 // This exception is irrelevant in this thread, but useful for the 559 // client 560 setException(e); 561 } catch (Error e) { 562 // Propagate errors up ASAP - our superclass will rethrow the error 563 setException(e); 564 } finally { 565 // Don't pin inputs beyond completion 566 function = null; 567 inputFuture = null; 568 // Allow our get routines to examine outputFuture now. 569 outputCreated.countDown(); 570 } 571 } 572 } 573 574 /** 575 * Returns a new {@code ListenableFuture} whose result is the product of 576 * calling {@code get()} on the {@code Future} nested within the given {@code 577 * Future}, effectively chaining the futures one after the other. Example: 578 * 579 * <pre> {@code 580 * SettableFuture<ListenableFuture<String>> nested = SettableFuture.create(); 581 * ListenableFuture<String> dereferenced = dereference(nested); 582 * }</pre> 583 * 584 * <p>This call has the same cancellation and execution semantics as {@link 585 * #transform(ListenableFuture, AsyncFunction)}, in that the returned {@code 586 * Future} attempts to keep its cancellation state in sync with both the 587 * input {@code Future} and the nested {@code Future}. The transformation 588 * is very lightweight and therefore takes place in the thread that called 589 * {@code dereference}. 590 * 591 * @param nested The nested future to transform. 592 * @return A future that holds result of the inner future. 593 * @since 13.0 594 */ 595 @Beta 596 @SuppressWarnings({"rawtypes", "unchecked"}) 597 public static <V> ListenableFuture<V> dereference( 598 ListenableFuture<? extends ListenableFuture<? extends V>> nested) { 599 return Futures.transform((ListenableFuture) nested, (AsyncFunction) DEREFERENCER); 600 } 601 602 /** 603 * Helper {@code Function} for {@link #dereference}. 604 */ 605 private static final AsyncFunction<ListenableFuture<Object>, Object> DEREFERENCER = 606 new AsyncFunction<ListenableFuture<Object>, Object>() { 607 @Override public ListenableFuture<Object> apply(ListenableFuture<Object> input) { 608 return input; 609 } 610 }; 611 612 /** 613 * Creates a new {@code ListenableFuture} whose value is a list containing the 614 * values of all its input futures, if all succeed. If any input fails, the 615 * returned future fails. 616 * 617 * <p>The list of results is in the same order as the input list. 618 * 619 * <p>Canceling this future does not cancel any of the component futures; 620 * however, if any of the provided futures fails or is canceled, this one is, 621 * too. 622 * 623 * @param futures futures to combine 624 * @return a future that provides a list of the results of the component 625 * futures 626 * @since 10.0 627 */ 628 @Beta 629 public static <V> ListenableFuture<List<V>> allAsList( 630 ListenableFuture<? extends V>... futures) { 631 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 632 MoreExecutors.sameThreadExecutor()); 633 } 634 635 /** 636 * Creates a new {@code ListenableFuture} whose value is a list containing the 637 * values of all its input futures, if all succeed. If any input fails, the 638 * returned future fails. 639 * 640 * <p>The list of results is in the same order as the input list. 641 * 642 * <p>Canceling this future does not cancel any of the component futures; 643 * however, if any of the provided futures fails or is canceled, this one is, 644 * too. 645 * 646 * @param futures futures to combine 647 * @return a future that provides a list of the results of the component 648 * futures 649 * @since 10.0 650 */ 651 @Beta 652 public static <V> ListenableFuture<List<V>> allAsList( 653 Iterable<? extends ListenableFuture<? extends V>> futures) { 654 return new ListFuture<V>(ImmutableList.copyOf(futures), true, 655 MoreExecutors.sameThreadExecutor()); 656 } 657 658 /** 659 * Creates a new {@code ListenableFuture} whose value is a list containing the 660 * values of all its successful input futures. The list of results is in the 661 * same order as the input list, and if any of the provided futures fails or 662 * is canceled, its corresponding position will contain {@code null} (which is 663 * indistinguishable from the future having a successful value of 664 * {@code null}). 665 * 666 * @param futures futures to combine 667 * @return a future that provides a list of the results of the component 668 * futures 669 * @since 10.0 670 */ 671 @Beta 672 public static <V> ListenableFuture<List<V>> successfulAsList( 673 ListenableFuture<? extends V>... futures) { 674 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 675 MoreExecutors.sameThreadExecutor()); 676 } 677 678 /** 679 * Creates a new {@code ListenableFuture} whose value is a list containing the 680 * values of all its successful input futures. The list of results is in the 681 * same order as the input list, and if any of the provided futures fails or 682 * is canceled, its corresponding position will contain {@code null} (which is 683 * indistinguishable from the future having a successful value of 684 * {@code null}). 685 * 686 * @param futures futures to combine 687 * @return a future that provides a list of the results of the component 688 * futures 689 * @since 10.0 690 */ 691 @Beta 692 public static <V> ListenableFuture<List<V>> successfulAsList( 693 Iterable<? extends ListenableFuture<? extends V>> futures) { 694 return new ListFuture<V>(ImmutableList.copyOf(futures), false, 695 MoreExecutors.sameThreadExecutor()); 696 } 697 698 /** 699 * Registers separate success and failure callbacks to be run when the {@code 700 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 701 * complete} or, if the computation is already complete, immediately. 702 * 703 * <p>There is no guaranteed ordering of execution of callbacks, but any 704 * callback added through this method is guaranteed to be called once the 705 * computation is complete. 706 * 707 * Example: <pre> {@code 708 * ListenableFuture<QueryResult> future = ...; 709 * addCallback(future, 710 * new FutureCallback<QueryResult> { 711 * public void onSuccess(QueryResult result) { 712 * storeInCache(result); 713 * } 714 * public void onFailure(Throwable t) { 715 * reportError(t); 716 * } 717 * });}</pre> 718 * 719 * Note: If the callback is slow or heavyweight, consider {@linkplain 720 * #addCallback(ListenableFuture, FutureCallback, Executor) supplying an 721 * executor}. If you do not supply an executor, {@code addCallback} will use 722 * {@link MoreExecutors#sameThreadExecutor sameThreadExecutor}, which carries 723 * some caveats for heavier operations. For example, the callback may run on 724 * an unpredictable or undesirable thread: 725 * 726 * <ul> 727 * <li>If the input {@code Future} is done at the time {@code addCallback} is 728 * called, {@code addCallback} will execute the callback inline. 729 * <li>If the input {@code Future} is not yet done, {@code addCallback} will 730 * schedule the callback to be run by the thread that completes the input 731 * {@code Future}, which may be an internal system thread such as an RPC 732 * network thread. 733 * </ul> 734 * 735 * Also note that, regardless of which thread executes the callback, all 736 * other registered but unexecuted listeners are prevented from running 737 * during its execution, even if those listeners are to run in other 738 * executors. 739 * 740 * <p>For a more general interface to attach a completion listener to a 741 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 742 * 743 * @param future The future attach the callback to. 744 * @param callback The callback to invoke when {@code future} is completed. 745 * @since 10.0 746 */ 747 public static <V> void addCallback(ListenableFuture<V> future, 748 FutureCallback<? super V> callback) { 749 addCallback(future, callback, MoreExecutors.sameThreadExecutor()); 750 } 751 752 /** 753 * Registers separate success and failure callbacks to be run when the {@code 754 * Future}'s computation is {@linkplain java.util.concurrent.Future#isDone() 755 * complete} or, if the computation is already complete, immediately. 756 * 757 * <p>The callback is run in {@code executor}. 758 * There is no guaranteed ordering of execution of callbacks, but any 759 * callback added through this method is guaranteed to be called once the 760 * computation is complete. 761 * 762 * Example: <pre> {@code 763 * ListenableFuture<QueryResult> future = ...; 764 * Executor e = ... 765 * addCallback(future, e, 766 * new FutureCallback<QueryResult> { 767 * public void onSuccess(QueryResult result) { 768 * storeInCache(result); 769 * } 770 * public void onFailure(Throwable t) { 771 * reportError(t); 772 * } 773 * });}</pre> 774 * 775 * When the callback is fast and lightweight, consider {@linkplain 776 * #addCallback(ListenableFuture, FutureCallback) omitting the executor} or 777 * explicitly specifying {@code sameThreadExecutor}. However, be aware of the 778 * caveats documented in the link above. 779 * 780 * <p>For a more general interface to attach a completion listener to a 781 * {@code Future}, see {@link ListenableFuture#addListener addListener}. 782 * 783 * @param future The future attach the callback to. 784 * @param callback The callback to invoke when {@code future} is completed. 785 * @param executor The executor to run {@code callback} when the future 786 * completes. 787 * @since 10.0 788 */ 789 public static <V> void addCallback(final ListenableFuture<V> future, 790 final FutureCallback<? super V> callback, Executor executor) { 791 Preconditions.checkNotNull(callback); 792 Runnable callbackListener = new Runnable() { 793 @Override 794 public void run() { 795 try { 796 // TODO(user): (Before Guava release), validate that this 797 // is the thing for IE. 798 V value = getUninterruptibly(future); 799 callback.onSuccess(value); 800 } catch (ExecutionException e) { 801 callback.onFailure(e.getCause()); 802 } catch (RuntimeException e) { 803 callback.onFailure(e); 804 } catch (Error e) { 805 callback.onFailure(e); 806 } 807 } 808 }; 809 future.addListener(callbackListener, executor); 810 } 811 812 /** 813 * Returns the result of {@link Future#get()}, converting most exceptions to a 814 * new instance of the given checked exception type. This reduces boilerplate 815 * for a common use of {@code Future} in which it is unnecessary to 816 * programmatically distinguish between exception types or to extract other 817 * information from the exception instance. 818 * 819 * <p>Exceptions from {@code Future.get} are treated as follows: 820 * <ul> 821 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 822 * {@code X} if the cause is a checked exception, an {@link 823 * UncheckedExecutionException} if the cause is a {@code 824 * RuntimeException}, or an {@link ExecutionError} if the cause is an 825 * {@code Error}. 826 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 827 * restoring the interrupt). 828 * <li>Any {@link CancellationException} is propagated untouched, as is any 829 * other {@link RuntimeException} (though {@code get} implementations are 830 * discouraged from throwing such exceptions). 831 * </ul> 832 * 833 * The overall principle is to continue to treat every checked exception as a 834 * checked exception, every unchecked exception as an unchecked exception, and 835 * every error as an error. In addition, the cause of any {@code 836 * ExecutionException} is wrapped in order to ensure that the new stack trace 837 * matches that of the current thread. 838 * 839 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 840 * public constructor that accepts zero or more arguments, all of type {@code 841 * String} or {@code Throwable} (preferring constructors with at least one 842 * {@code String}) and calling the constructor via reflection. If the 843 * exception did not already have a cause, one is set by calling {@link 844 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 845 * {@code IllegalArgumentException} is thrown. 846 * 847 * @throws X if {@code get} throws any checked exception except for an {@code 848 * ExecutionException} whose cause is not itself a checked exception 849 * @throws UncheckedExecutionException if {@code get} throws an {@code 850 * ExecutionException} with a {@code RuntimeException} as its cause 851 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 852 * with an {@code Error} as its cause 853 * @throws CancellationException if {@code get} throws a {@code 854 * CancellationException} 855 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 856 * RuntimeException} or does not have a suitable constructor 857 * @since 10.0 858 */ 859 @Beta 860 public static <V, X extends Exception> V get( 861 Future<V> future, Class<X> exceptionClass) throws X { 862 checkNotNull(future); 863 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 864 "Futures.get exception type (%s) must not be a RuntimeException", 865 exceptionClass); 866 try { 867 return future.get(); 868 } catch (InterruptedException e) { 869 currentThread().interrupt(); 870 throw newWithCause(exceptionClass, e); 871 } catch (ExecutionException e) { 872 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 873 throw new AssertionError(); 874 } 875 } 876 877 /** 878 * Returns the result of {@link Future#get(long, TimeUnit)}, converting most 879 * exceptions to a new instance of the given checked exception type. This 880 * reduces boilerplate for a common use of {@code Future} in which it is 881 * unnecessary to programmatically distinguish between exception types or to 882 * extract other information from the exception instance. 883 * 884 * <p>Exceptions from {@code Future.get} are treated as follows: 885 * <ul> 886 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 887 * {@code X} if the cause is a checked exception, an {@link 888 * UncheckedExecutionException} if the cause is a {@code 889 * RuntimeException}, or an {@link ExecutionError} if the cause is an 890 * {@code Error}. 891 * <li>Any {@link InterruptedException} is wrapped in an {@code X} (after 892 * restoring the interrupt). 893 * <li>Any {@link TimeoutException} is wrapped in an {@code X}. 894 * <li>Any {@link CancellationException} is propagated untouched, as is any 895 * other {@link RuntimeException} (though {@code get} implementations are 896 * discouraged from throwing such exceptions). 897 * </ul> 898 * 899 * The overall principle is to continue to treat every checked exception as a 900 * checked exception, every unchecked exception as an unchecked exception, and 901 * every error as an error. In addition, the cause of any {@code 902 * ExecutionException} is wrapped in order to ensure that the new stack trace 903 * matches that of the current thread. 904 * 905 * <p>Instances of {@code exceptionClass} are created by choosing an arbitrary 906 * public constructor that accepts zero or more arguments, all of type {@code 907 * String} or {@code Throwable} (preferring constructors with at least one 908 * {@code String}) and calling the constructor via reflection. If the 909 * exception did not already have a cause, one is set by calling {@link 910 * Throwable#initCause(Throwable)} on it. If no such constructor exists, an 911 * {@code IllegalArgumentException} is thrown. 912 * 913 * @throws X if {@code get} throws any checked exception except for an {@code 914 * ExecutionException} whose cause is not itself a checked exception 915 * @throws UncheckedExecutionException if {@code get} throws an {@code 916 * ExecutionException} with a {@code RuntimeException} as its cause 917 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 918 * with an {@code Error} as its cause 919 * @throws CancellationException if {@code get} throws a {@code 920 * CancellationException} 921 * @throws IllegalArgumentException if {@code exceptionClass} extends {@code 922 * RuntimeException} or does not have a suitable constructor 923 * @since 10.0 924 */ 925 @Beta 926 public static <V, X extends Exception> V get( 927 Future<V> future, long timeout, TimeUnit unit, Class<X> exceptionClass) 928 throws X { 929 checkNotNull(future); 930 checkNotNull(unit); 931 checkArgument(!RuntimeException.class.isAssignableFrom(exceptionClass), 932 "Futures.get exception type (%s) must not be a RuntimeException", 933 exceptionClass); 934 try { 935 return future.get(timeout, unit); 936 } catch (InterruptedException e) { 937 currentThread().interrupt(); 938 throw newWithCause(exceptionClass, e); 939 } catch (TimeoutException e) { 940 throw newWithCause(exceptionClass, e); 941 } catch (ExecutionException e) { 942 wrapAndThrowExceptionOrError(e.getCause(), exceptionClass); 943 throw new AssertionError(); 944 } 945 } 946 947 private static <X extends Exception> void wrapAndThrowExceptionOrError( 948 Throwable cause, Class<X> exceptionClass) throws X { 949 if (cause instanceof Error) { 950 throw new ExecutionError((Error) cause); 951 } 952 if (cause instanceof RuntimeException) { 953 throw new UncheckedExecutionException(cause); 954 } 955 throw newWithCause(exceptionClass, cause); 956 } 957 958 /** 959 * Returns the result of calling {@link Future#get()} uninterruptibly on a 960 * task known not to throw a checked exception. This makes {@code Future} more 961 * suitable for lightweight, fast-running tasks that, barring bugs in the 962 * code, will not fail. This gives it exception-handling behavior similar to 963 * that of {@code ForkJoinTask.join}. 964 * 965 * <p>Exceptions from {@code Future.get} are treated as follows: 966 * <ul> 967 * <li>Any {@link ExecutionException} has its <i>cause</i> wrapped in an 968 * {@link UncheckedExecutionException} (if the cause is an {@code 969 * Exception}) or {@link ExecutionError} (if the cause is an {@code 970 * Error}). 971 * <li>Any {@link InterruptedException} causes a retry of the {@code get} 972 * call. The interrupt is restored before {@code getUnchecked} returns. 973 * <li>Any {@link CancellationException} is propagated untouched. So is any 974 * other {@link RuntimeException} ({@code get} implementations are 975 * discouraged from throwing such exceptions). 976 * </ul> 977 * 978 * The overall principle is to eliminate all checked exceptions: to loop to 979 * avoid {@code InterruptedException}, to pass through {@code 980 * CancellationException}, and to wrap any exception from the underlying 981 * computation in an {@code UncheckedExecutionException} or {@code 982 * ExecutionError}. 983 * 984 * <p>For an uninterruptible {@code get} that preserves other exceptions, see 985 * {@link Uninterruptibles#getUninterruptibly(Future)}. 986 * 987 * @throws UncheckedExecutionException if {@code get} throws an {@code 988 * ExecutionException} with an {@code Exception} as its cause 989 * @throws ExecutionError if {@code get} throws an {@code ExecutionException} 990 * with an {@code Error} as its cause 991 * @throws CancellationException if {@code get} throws a {@code 992 * CancellationException} 993 * @since 10.0 994 */ 995 @Beta 996 public static <V> V getUnchecked(Future<V> future) { 997 checkNotNull(future); 998 try { 999 return getUninterruptibly(future); 1000 } catch (ExecutionException e) { 1001 wrapAndThrowUnchecked(e.getCause()); 1002 throw new AssertionError(); 1003 } 1004 } 1005 1006 private static void wrapAndThrowUnchecked(Throwable cause) { 1007 if (cause instanceof Error) { 1008 throw new ExecutionError((Error) cause); 1009 } 1010 /* 1011 * It's a non-Error, non-Exception Throwable. From my survey of such 1012 * classes, I believe that most users intended to extend Exception, so we'll 1013 * treat it like an Exception. 1014 */ 1015 throw new UncheckedExecutionException(cause); 1016 } 1017 1018 /* 1019 * TODO(user): FutureChecker interface for these to be static methods on? If 1020 * so, refer to it in the (static-method) Futures.get documentation 1021 */ 1022 1023 /* 1024 * Arguably we don't need a timed getUnchecked because any operation slow 1025 * enough to require a timeout is heavyweight enough to throw a checked 1026 * exception and therefore be inappropriate to use with getUnchecked. Further, 1027 * it's not clear that converting the checked TimeoutException to a 1028 * RuntimeException -- especially to an UncheckedExecutionException, since it 1029 * wasn't thrown by the computation -- makes sense, and if we don't convert 1030 * it, the user still has to write a try-catch block. 1031 * 1032 * If you think you would use this method, let us know. 1033 */ 1034 1035 private static <X extends Exception> X newWithCause( 1036 Class<X> exceptionClass, Throwable cause) { 1037 // getConstructors() guarantees this as long as we don't modify the array. 1038 @SuppressWarnings("unchecked") 1039 List<Constructor<X>> constructors = 1040 (List) Arrays.asList(exceptionClass.getConstructors()); 1041 for (Constructor<X> constructor : preferringStrings(constructors)) { 1042 @Nullable X instance = newFromConstructor(constructor, cause); 1043 if (instance != null) { 1044 if (instance.getCause() == null) { 1045 instance.initCause(cause); 1046 } 1047 return instance; 1048 } 1049 } 1050 throw new IllegalArgumentException( 1051 "No appropriate constructor for exception of type " + exceptionClass 1052 + " in response to chained exception", cause); 1053 } 1054 1055 private static <X extends Exception> List<Constructor<X>> 1056 preferringStrings(List<Constructor<X>> constructors) { 1057 return WITH_STRING_PARAM_FIRST.sortedCopy(constructors); 1058 } 1059 1060 private static final Ordering<Constructor<?>> WITH_STRING_PARAM_FIRST = 1061 Ordering.natural().onResultOf(new Function<Constructor<?>, Boolean>() { 1062 @Override public Boolean apply(Constructor<?> input) { 1063 return asList(input.getParameterTypes()).contains(String.class); 1064 } 1065 }).reverse(); 1066 1067 @Nullable private static <X> X newFromConstructor( 1068 Constructor<X> constructor, Throwable cause) { 1069 Class<?>[] paramTypes = constructor.getParameterTypes(); 1070 Object[] params = new Object[paramTypes.length]; 1071 for (int i = 0; i < paramTypes.length; i++) { 1072 Class<?> paramType = paramTypes[i]; 1073 if (paramType.equals(String.class)) { 1074 params[i] = cause.toString(); 1075 } else if (paramType.equals(Throwable.class)) { 1076 params[i] = cause; 1077 } else { 1078 return null; 1079 } 1080 } 1081 try { 1082 return constructor.newInstance(params); 1083 } catch (IllegalArgumentException e) { 1084 return null; 1085 } catch (InstantiationException e) { 1086 return null; 1087 } catch (IllegalAccessException e) { 1088 return null; 1089 } catch (InvocationTargetException e) { 1090 return null; 1091 } 1092 } 1093 1094 /** 1095 * Class that implements {@link #allAsList} and {@link #successfulAsList}. 1096 * The idea is to create a (null-filled) List and register a listener with 1097 * each component future to fill out the value in the List when that future 1098 * completes. 1099 */ 1100 private static class ListFuture<V> extends AbstractFuture<List<V>> { 1101 ImmutableList<? extends ListenableFuture<? extends V>> futures; 1102 final boolean allMustSucceed; 1103 final AtomicInteger remaining; 1104 List<V> values; 1105 1106 /** 1107 * Constructor. 1108 * 1109 * @param futures all the futures to build the list from 1110 * @param allMustSucceed whether a single failure or cancellation should 1111 * propagate to this future 1112 * @param listenerExecutor used to run listeners on all the passed in 1113 * futures. 1114 */ 1115 ListFuture( 1116 final ImmutableList<? extends ListenableFuture<? extends V>> futures, 1117 final boolean allMustSucceed, final Executor listenerExecutor) { 1118 this.futures = futures; 1119 this.values = Lists.newArrayListWithCapacity(futures.size()); 1120 this.allMustSucceed = allMustSucceed; 1121 this.remaining = new AtomicInteger(futures.size()); 1122 1123 init(listenerExecutor); 1124 } 1125 1126 private void init(final Executor listenerExecutor) { 1127 // First, schedule cleanup to execute when the Future is done. 1128 addListener(new Runnable() { 1129 @Override 1130 public void run() { 1131 // By now the values array has either been set as the Future's value, 1132 // or (in case of failure) is no longer useful. 1133 ListFuture.this.values = null; 1134 1135 // Let go of the memory held by other futures 1136 ListFuture.this.futures = null; 1137 } 1138 }, MoreExecutors.sameThreadExecutor()); 1139 1140 // Now begin the "real" initialization. 1141 1142 // Corner case: List is empty. 1143 if (futures.isEmpty()) { 1144 set(Lists.newArrayList(values)); 1145 return; 1146 } 1147 1148 // Populate the results list with null initially. 1149 for (int i = 0; i < futures.size(); ++i) { 1150 values.add(null); 1151 } 1152 1153 // Register a listener on each Future in the list to update 1154 // the state of this future. 1155 // Note that if all the futures on the list are done prior to completing 1156 // this loop, the last call to addListener() will callback to 1157 // setOneValue(), transitively call our cleanup listener, and set 1158 // this.futures to null. 1159 // We store a reference to futures to avoid the NPE. 1160 ImmutableList<? extends ListenableFuture<? extends V>> localFutures = futures; 1161 for (int i = 0; i < localFutures.size(); i++) { 1162 final ListenableFuture<? extends V> listenable = localFutures.get(i); 1163 final int index = i; 1164 listenable.addListener(new Runnable() { 1165 @Override 1166 public void run() { 1167 setOneValue(index, listenable); 1168 } 1169 }, listenerExecutor); 1170 } 1171 } 1172 1173 /** 1174 * Sets the value at the given index to that of the given future. 1175 */ 1176 private void setOneValue(int index, Future<? extends V> future) { 1177 List<V> localValues = values; 1178 if (isDone() || localValues == null) { 1179 // Some other future failed or has been cancelled, causing this one to 1180 // also be cancelled or have an exception set. This should only happen 1181 // if allMustSucceed is true. 1182 checkState(allMustSucceed, 1183 "Future was done before all dependencies completed"); 1184 return; 1185 } 1186 1187 try { 1188 checkState(future.isDone(), 1189 "Tried to set value from future which is not done"); 1190 localValues.set(index, getUninterruptibly(future)); 1191 } catch (CancellationException e) { 1192 if (allMustSucceed) { 1193 // Set ourselves as cancelled. Let the input futures keep running 1194 // as some of them may be used elsewhere. 1195 // (Currently we don't override interruptTask, so 1196 // mayInterruptIfRunning==false isn't technically necessary.) 1197 cancel(false); 1198 } 1199 } catch (ExecutionException e) { 1200 if (allMustSucceed) { 1201 // As soon as the first one fails, throw the exception up. 1202 // The result of all other inputs is then ignored. 1203 setException(e.getCause()); 1204 } 1205 } catch (RuntimeException e) { 1206 if (allMustSucceed) { 1207 setException(e); 1208 } 1209 } catch (Error e) { 1210 // Propagate errors up ASAP - our superclass will rethrow the error 1211 setException(e); 1212 } finally { 1213 int newRemaining = remaining.decrementAndGet(); 1214 checkState(newRemaining >= 0, "Less than 0 remaining futures"); 1215 if (newRemaining == 0) { 1216 localValues = values; 1217 if (localValues != null) { 1218 set(Lists.newArrayList(localValues)); 1219 } else { 1220 checkState(isDone()); 1221 } 1222 } 1223 } 1224 } 1225 1226 } 1227 1228 /** 1229 * A checked future that uses a function to map from exceptions to the 1230 * appropriate checked type. 1231 */ 1232 private static class MappingCheckedFuture<V, X extends Exception> extends 1233 AbstractCheckedFuture<V, X> { 1234 1235 final Function<Exception, X> mapper; 1236 1237 MappingCheckedFuture(ListenableFuture<V> delegate, 1238 Function<Exception, X> mapper) { 1239 super(delegate); 1240 1241 this.mapper = checkNotNull(mapper); 1242 } 1243 1244 @Override 1245 protected X mapException(Exception e) { 1246 return mapper.apply(e); 1247 } 1248 } 1249 }