001 /* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 import static java.util.concurrent.TimeUnit.NANOSECONDS; 021 022 import com.google.common.annotations.Beta; 023 import com.google.common.base.Function; 024 025 import java.lang.reflect.UndeclaredThrowableException; 026 import java.util.List; 027 import java.util.concurrent.BlockingQueue; 028 import java.util.concurrent.CancellationException; 029 import java.util.concurrent.CountDownLatch; 030 import java.util.concurrent.ExecutionException; 031 import java.util.concurrent.Executor; 032 import java.util.concurrent.Executors; 033 import java.util.concurrent.Future; 034 import java.util.concurrent.LinkedBlockingQueue; 035 import java.util.concurrent.ThreadFactory; 036 import java.util.concurrent.TimeUnit; 037 import java.util.concurrent.TimeoutException; 038 import java.util.concurrent.atomic.AtomicBoolean; 039 040 import javax.annotation.Nullable; 041 042 /** 043 * Static utility methods pertaining to the {@link Future} interface. 044 * 045 * @author Kevin Bourrillion 046 * @author Nishant Thakkar 047 * @author Sven Mawson 048 * @since 1 049 */ 050 @Beta 051 public final class Futures { 052 private Futures() {} 053 054 /** 055 * Returns an uninterruptible view of a {@code Future}. If a thread is 056 * interrupted during an attempt to {@code get()} from the returned future, it 057 * continues to wait on the result until it is available or the timeout 058 * elapses, and only then re-interrupts the thread. 059 */ 060 public static <V> UninterruptibleFuture<V> makeUninterruptible( 061 final Future<V> future) { 062 checkNotNull(future); 063 if (future instanceof UninterruptibleFuture<?>) { 064 return (UninterruptibleFuture<V>) future; 065 } 066 return new UninterruptibleFuture<V>() { 067 @Override 068 public boolean cancel(boolean mayInterruptIfRunning) { 069 return future.cancel(mayInterruptIfRunning); 070 } 071 @Override 072 public boolean isCancelled() { 073 return future.isCancelled(); 074 } 075 @Override 076 public boolean isDone() { 077 return future.isDone(); 078 } 079 080 @Override 081 public V get(long originalTimeout, TimeUnit originalUnit) 082 throws TimeoutException, ExecutionException { 083 boolean interrupted = false; 084 try { 085 long end = System.nanoTime() + originalUnit.toNanos(originalTimeout); 086 while (true) { 087 try { 088 // Future treats negative timeouts just like zero. 089 return future.get(end - System.nanoTime(), NANOSECONDS); 090 } catch (InterruptedException e) { 091 interrupted = true; 092 } 093 } 094 } finally { 095 if (interrupted) { 096 Thread.currentThread().interrupt(); 097 } 098 } 099 } 100 101 @Override 102 public V get() throws ExecutionException { 103 boolean interrupted = false; 104 try { 105 while (true) { 106 try { 107 return future.get(); 108 } catch (InterruptedException ignored) { 109 interrupted = true; 110 } 111 } 112 } finally { 113 if (interrupted) { 114 Thread.currentThread().interrupt(); 115 } 116 } 117 } 118 }; 119 } 120 121 /** 122 * 123 * <p>Creates a {@link ListenableFuture} out of a normal {@link Future}. The 124 * returned future will create a thread to wait for the source future to 125 * complete before executing the listeners. 126 * 127 * <p><b>Warning:</b> If the input future does not already implement {@link 128 * ListenableFuture}, the returned future will emulate {@link 129 * ListenableFuture#addListener} by taking a thread from an internal, 130 * unbounded pool at the first call to {@code addListener} and holding it 131 * until the future is {@linkplain Future#isDone() done}. 132 * 133 * <p>Callers who have a future that subclasses 134 * {@link java.util.concurrent.FutureTask} may want to instead subclass 135 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture} 136 * functionality to the standard {@code FutureTask} implementation. 137 */ 138 public static <V> ListenableFuture<V> makeListenable( 139 Future<V> future) { 140 if (future instanceof ListenableFuture<?>) { 141 return (ListenableFuture<V>) future; 142 } 143 return new ListenableFutureAdapter<V>(future); 144 } 145 146 static <V> ListenableFuture<V> makeListenable( 147 Future<V> future, Executor executor) { 148 checkNotNull(executor); 149 if (future instanceof ListenableFuture<?>) { 150 return (ListenableFuture<V>) future; 151 } 152 return new ListenableFutureAdapter<V>(future, executor); 153 } 154 155 /** 156 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a 157 * {@link Function} that maps from {@link Exception} instances into the 158 * appropriate checked type. 159 * 160 * <p><b>Warning:</b> If the input future does not implement {@link 161 * ListenableFuture}, the returned future will emulate {@link 162 * ListenableFuture#addListener} by taking a thread from an internal, 163 * unbounded pool at the first call to {@code addListener} and holding it 164 * until the future is {@linkplain Future#isDone() done}. 165 * 166 * <p>The given mapping function will be applied to an 167 * {@link InterruptedException}, a {@link CancellationException}, or an 168 * {@link ExecutionException} with the actual cause of the exception. 169 * See {@link Future#get()} for details on the exceptions thrown. 170 */ 171 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 172 Future<V> future, Function<Exception, X> mapper) { 173 return new MappingCheckedFuture<V, X>(makeListenable(future), mapper); 174 } 175 176 /** 177 * Creates a {@link CheckedFuture} out of a normal {@link ListenableFuture} 178 * and a {@link Function} that maps from {@link Exception} instances into the 179 * appropriate checked type. 180 * 181 * <p>The given mapping function will be applied to an 182 * {@link InterruptedException}, a {@link CancellationException}, or an 183 * {@link ExecutionException} with the actual cause of the exception. 184 * See {@link Future#get()} for details on the exceptions thrown. 185 * 186 * @since 9 (source-compatible since release 1) 187 */ 188 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 189 ListenableFuture<V> future, Function<Exception, X> mapper) { 190 return new MappingCheckedFuture<V, X>(checkNotNull(future), mapper); 191 } 192 193 /** 194 * Creates a {@code ListenableFuture} which has its value set immediately upon 195 * construction. The getters just return the value. This {@code Future} can't 196 * be canceled or timed out and its {@code isDone()} method always returns 197 * {@code true}. 198 */ 199 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 200 SettableFuture<V> future = SettableFuture.create(); 201 future.set(value); 202 return future; 203 } 204 205 /** 206 * Returns a {@code CheckedFuture} which has its value set immediately upon 207 * construction. 208 * 209 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 210 * method always returns {@code true}. Calling {@code get()} or {@code 211 * checkedGet()} will immediately return the provided value. 212 */ 213 public static <V, X extends Exception> CheckedFuture<V, X> 214 immediateCheckedFuture(@Nullable V value) { 215 SettableFuture<V> future = SettableFuture.create(); 216 future.set(value); 217 return Futures.makeChecked(future, new Function<Exception, X>() { 218 @Override 219 public X apply(Exception e) { 220 throw new AssertionError("impossible"); 221 } 222 }); 223 } 224 225 /** 226 * Returns a {@code ListenableFuture} which has an exception set immediately 227 * upon construction. 228 * 229 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 230 * method always returns {@code true}. Calling {@code get()} will immediately 231 * throw the provided {@code Throwable} wrapped in an {@code 232 * ExecutionException}. 233 * 234 * @throws Error if the throwable is an {@link Error}. 235 */ 236 public static <V> ListenableFuture<V> immediateFailedFuture( 237 Throwable throwable) { 238 checkNotNull(throwable); 239 SettableFuture<V> future = SettableFuture.create(); 240 future.setException(throwable); 241 return future; 242 } 243 244 /** 245 * Returns a {@code CheckedFuture} which has an exception set immediately upon 246 * construction. 247 * 248 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 249 * method always returns {@code true}. Calling {@code get()} will immediately 250 * throw the provided {@code Throwable} wrapped in an {@code 251 * ExecutionException}, and calling {@code checkedGet()} will throw the 252 * provided exception itself. 253 * 254 * @throws Error if the throwable is an {@link Error}. 255 */ 256 public static <V, X extends Exception> CheckedFuture<V, X> 257 immediateFailedCheckedFuture(final X exception) { 258 checkNotNull(exception); 259 return makeChecked(Futures.<V>immediateFailedFuture(exception), 260 new Function<Exception, X>() { 261 @Override 262 public X apply(Exception e) { 263 return exception; 264 } 265 }); 266 } 267 268 /** 269 * Returns a new {@code ListenableFuture} whose result is asynchronously 270 * derived from the result of the given {@code Future}. More precisely, the 271 * returned {@code Future} takes its result from a {@code Future} produced by 272 * applying the given {@code Function} to the result of the original {@code 273 * Future}. Example: 274 * 275 * <pre> {@code 276 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 277 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = 278 * new Function<RowKey, ListenableFuture<QueryResult>>() { 279 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 280 * return dataService.read(rowKey); 281 * } 282 * }; 283 * ListenableFuture<QueryResult> queryFuture = 284 * chain(queryFuture, queryFunction); 285 * }</pre> 286 * 287 * <p>Successful cancellation of either the input future or the result of 288 * function application will cause the returned future to be cancelled. 289 * Cancelling the returned future will succeed if it is currently running. 290 * In this case, attempts will be made to cancel the input future and the 291 * result of the function, however there is no guarantee of success. 292 * 293 * <p>TODO: Add a version that accepts a normal {@code Future} 294 * 295 * <p>The typical use for this method would be when a RPC call is dependent on 296 * the results of another RPC. One would call the first RPC (input), create a 297 * function that calls another RPC based on input's result, and then call 298 * chain on input and that function to get a {@code ListenableFuture} of 299 * the result. 300 * 301 * @param input The future to chain 302 * @param function A function to chain the results of the provided future 303 * to the results of the returned future. This will be run in the thread 304 * that notifies input it is complete. 305 * @return A future that holds result of the chain. 306 */ 307 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 308 Function<? super I, ? extends ListenableFuture<? extends O>> function) { 309 return chain(input, function, MoreExecutors.sameThreadExecutor()); 310 } 311 312 /** 313 * Returns a new {@code ListenableFuture} whose result is asynchronously 314 * derived from the result of the given {@code Future}. More precisely, the 315 * returned {@code Future} takes its result from a {@code Future} produced by 316 * applying the given {@code Function} to the result of the original {@code 317 * Future}. Example: 318 * 319 * <pre> {@code 320 * ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query); 321 * Function<RowKey, ListenableFuture<QueryResult>> queryFunction = 322 * new Function<RowKey, ListenableFuture<QueryResult>>() { 323 * public ListenableFuture<QueryResult> apply(RowKey rowKey) { 324 * return dataService.read(rowKey); 325 * } 326 * }; 327 * ListenableFuture<QueryResult> queryFuture = 328 * chain(queryFuture, queryFunction, executor); 329 * }</pre> 330 * 331 * <p>Successful cancellation of either the input future or the result of 332 * function application will cause the returned future to be cancelled. 333 * Cancelling the returned future will succeed if it is currently running. 334 * In this case, attempts will be made to cancel the input future and the 335 * result of the function, however there is no guarantee of success. 336 * 337 * <p>This version allows an arbitrary executor to be passed in for running 338 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor}, 339 * the thread chained Function executes in will be whichever thread set the 340 * result of the input Future, which may be the network thread in the case of 341 * RPC-based Futures. 342 * 343 * @param input The future to chain 344 * @param function A function to chain the results of the provided future 345 * to the results of the returned future. 346 * @param exec Executor to run the function in. 347 * @return A future that holds result of the chain. 348 */ 349 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 350 Function<? super I, ? extends ListenableFuture<? extends O>> function, 351 Executor exec) { 352 ChainingListenableFuture<I, O> chain = 353 new ChainingListenableFuture<I, O>(function, input); 354 input.addListener(chain, exec); 355 return chain; 356 } 357 358 /** 359 * Returns a new {@code ListenableFuture} whose result is the product of 360 * applying the given {@code Function} to the result of the given {@code 361 * Future}. Example: 362 * 363 * <pre> {@code 364 * ListenableFuture<QueryResult> queryFuture = ...; 365 * Function<QueryResult, List<Row>> rowsFunction = 366 * new Function<QueryResult, List<Row>>() { 367 * public List<Row> apply(QueryResult queryResult) { 368 * return queryResult.getRows(); 369 * } 370 * }; 371 * ListenableFuture<List<Row>> rowsFuture = 372 * transform(queryFuture, rowsFunction); 373 * }</pre> 374 * 375 * <p>Successful cancellation of the input future will cause the returned 376 * future to be cancelled. Cancelling the returned future will succeed if it 377 * is currently running. In this case, an attempt will be made to cancel the 378 * input future, however there is no guarantee of success. 379 * 380 * <p>An example use of this method is to convert a serializable object 381 * returned from an RPC into a POJO. 382 * 383 * @param future The future to compose 384 * @param function A Function to compose the results of the provided future 385 * to the results of the returned future. This will be run in the thread 386 * that notifies input it is complete. 387 * @return A future that holds result of the composition. 388 * @since 9 (in version 1 as {@code compose}) 389 */ 390 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, 391 final Function<? super I, ? extends O> function) { 392 return transform(future, function, MoreExecutors.sameThreadExecutor()); 393 } 394 395 /** 396 * Returns a new {@code ListenableFuture} whose result is the product of 397 * applying the given {@code Function} to the result of the given {@code 398 * Future}. Example: 399 * 400 * <pre> {@code 401 * ListenableFuture<QueryResult> queryFuture = ...; 402 * Function<QueryResult, List<Row>> rowsFunction = 403 * new Function<QueryResult, List<Row>>() { 404 * public List<Row> apply(QueryResult queryResult) { 405 * return queryResult.getRows(); 406 * } 407 * }; 408 * ListenableFuture<List<Row>> rowsFuture = 409 * transform(queryFuture, rowsFunction, executor); 410 * }</pre> 411 * 412 * <p>Successful cancellation of the input future will cause the returned 413 * future to be cancelled. Cancelling the returned future will succeed if it 414 * is currently running. In this case, an attempt will be made to cancel the 415 * input future, however there is no guarantee of success. 416 * 417 * <p>An example use of this method is to convert a serializable object 418 * returned from an RPC into a POJO. 419 * 420 * <p>This version allows an arbitrary executor to be passed in for running 421 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor}, 422 * the thread chained Function executes in will be whichever thread set the 423 * result of the input Future, which may be the network thread in the case of 424 * RPC-based Futures. 425 * 426 * @param future The future to compose 427 * @param function A Function to compose the results of the provided future 428 * to the results of the returned future. 429 * @param exec Executor to run the function in. 430 * @return A future that holds result of the composition. 431 * @since 9 (in version 2 as {@code compose}) 432 */ 433 public static <I, O> ListenableFuture<O> transform(ListenableFuture<I> future, 434 final Function<? super I, ? extends O> function, Executor exec) { 435 checkNotNull(function); 436 Function<I, ListenableFuture<O>> wrapperFunction 437 = new Function<I, ListenableFuture<O>>() { 438 @Override public ListenableFuture<O> apply(I input) { 439 O output = function.apply(input); 440 return immediateFuture(output); 441 } 442 }; 443 return chain(future, wrapperFunction, exec); 444 } 445 446 /** 447 * Returns a new {@code Future} whose result is the product of applying the 448 * given {@code Function} to the result of the given {@code Future}. Example: 449 * 450 * <pre> {@code 451 * Future<QueryResult> queryFuture = ...; 452 * Function<QueryResult, List<Row>> rowsFunction = 453 * new Function<QueryResult, List<Row>>() { 454 * public List<Row> apply(QueryResult queryResult) { 455 * return queryResult.getRows(); 456 * } 457 * }; 458 * Future<List<Row>> rowsFuture = transform(queryFuture, rowsFunction); 459 * }</pre> 460 * 461 * <p>Each call to {@code Future<O>.get(*)} results in a call to 462 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it 463 * is assumed that {@code Future<I>.get(*)} is idempotent. 464 * 465 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned 466 * future, the timeout only applies to the future passed in to this method. 467 * Any additional time taken by applying {@code function} is not considered. 468 * (Exception: If the input future is a {@link ListenableFuture}, timeouts 469 * will be strictly enforced.) 470 * 471 * @param future The future to compose 472 * @param function A Function to compose the results of the provided future 473 * to the results of the returned future. This will be run in the thread 474 * that calls one of the varieties of {@code get()}. 475 * @return A future that computes result of the composition. 476 * @since 9 (in version 1 as {@code compose}) 477 */ 478 public static <I, O> Future<O> transform(final Future<I> future, 479 final Function<? super I, ? extends O> function) { 480 if (future instanceof ListenableFuture) { 481 return transform((ListenableFuture<I>) future, function); 482 } 483 checkNotNull(future); 484 checkNotNull(function); 485 return new Future<O>() { 486 487 /* 488 * Concurrency detail: 489 * 490 * <p>To preserve the idempotency of calls to this.get(*) calls to the 491 * function are only applied once. A lock is required to prevent multiple 492 * applications of the function. The calls to future.get(*) are performed 493 * outside the lock, as is required to prevent calls to 494 * get(long, TimeUnit) to persist beyond their timeout. 495 * 496 * <p>Calls to future.get(*) on every call to this.get(*) also provide 497 * the cancellation behavior for this. 498 * 499 * <p>(Consider: in thread A, call get(), in thread B call get(long, 500 * TimeUnit). Thread B may have to wait for Thread A to finish, which 501 * would be unacceptable.) 502 * 503 * <p>Note that each call to Future<O>.get(*) results in a call to 504 * Future<I>.get(*), but the function is only applied once, so 505 * Future<I>.get(*) is assumed to be idempotent. 506 */ 507 508 private final Object lock = new Object(); 509 private boolean set = false; 510 private O value = null; 511 private ExecutionException exception = null; 512 513 @Override 514 public O get() throws InterruptedException, ExecutionException { 515 return apply(future.get()); 516 } 517 518 @Override 519 public O get(long timeout, TimeUnit unit) throws InterruptedException, 520 ExecutionException, TimeoutException { 521 return apply(future.get(timeout, unit)); 522 } 523 524 private O apply(I raw) throws ExecutionException { 525 synchronized (lock) { 526 if (!set) { 527 try { 528 value = function.apply(raw); 529 } catch (RuntimeException e) { 530 exception = new ExecutionException(e); 531 } catch (Error e) { 532 exception = new ExecutionException(e); 533 } 534 set = true; 535 } 536 537 if (exception != null) { 538 throw exception; 539 } 540 return value; 541 } 542 } 543 544 @Override 545 public boolean cancel(boolean mayInterruptIfRunning) { 546 return future.cancel(mayInterruptIfRunning); 547 } 548 549 @Override 550 public boolean isCancelled() { 551 return future.isCancelled(); 552 } 553 554 @Override 555 public boolean isDone() { 556 return future.isDone(); 557 } 558 }; 559 } 560 561 /** 562 * An implementation of {@code ListenableFuture} that also implements 563 * {@code Runnable} so that it can be used to nest ListenableFutures. 564 * Once the passed-in {@code ListenableFuture} is complete, it calls the 565 * passed-in {@code Function} to generate the result. 566 * 567 * <p>If the function throws any checked exceptions, they should be wrapped 568 * in a {@code UndeclaredThrowableException} so that this class can get 569 * access to the cause. 570 */ 571 private static class ChainingListenableFuture<I, O> 572 extends AbstractListenableFuture<O> implements Runnable { 573 574 private Function<? super I, ? extends ListenableFuture<? extends O>> 575 function; 576 private ListenableFuture<? extends I> inputFuture; 577 private volatile ListenableFuture<? extends O> outputFuture; 578 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = 579 new LinkedBlockingQueue<Boolean>(1); 580 private final CountDownLatch outputCreated = new CountDownLatch(1); 581 582 private ChainingListenableFuture( 583 Function<? super I, ? extends ListenableFuture<? extends O>> function, 584 ListenableFuture<? extends I> inputFuture) { 585 this.function = checkNotNull(function); 586 this.inputFuture = checkNotNull(inputFuture); 587 } 588 589 /** 590 * Delegate the get() to the input and output futures, in case 591 * their implementations defer starting computation until their 592 * own get() is invoked. 593 */ 594 @Override 595 public O get() throws InterruptedException, ExecutionException { 596 if (!isDone()) { 597 // Invoking get on the inputFuture will ensure our own run() 598 // method below is invoked as a listener when inputFuture sets 599 // its value. Therefore when get() returns we should then see 600 // the outputFuture be created. 601 ListenableFuture<? extends I> inputFuture = this.inputFuture; 602 if (inputFuture != null) { 603 inputFuture.get(); 604 } 605 606 // If our listener was scheduled to run on an executor we may 607 // need to wait for our listener to finish running before the 608 // outputFuture has been constructed by the function. 609 outputCreated.await(); 610 611 // Like above with the inputFuture, we have a listener on 612 // the outputFuture that will set our own value when its 613 // value is set. Invoking get will ensure the output can 614 // complete and invoke our listener, so that we can later 615 // get the result. 616 ListenableFuture<? extends O> outputFuture = this.outputFuture; 617 if (outputFuture != null) { 618 outputFuture.get(); 619 } 620 } 621 return super.get(); 622 } 623 624 /** 625 * Delegate the get() to the input and output futures, in case 626 * their implementations defer starting computation until their 627 * own get() is invoked. 628 */ 629 @Override 630 public O get(long timeout, TimeUnit unit) throws TimeoutException, 631 ExecutionException, InterruptedException { 632 if (!isDone()) { 633 // Use a single time unit so we can decrease remaining timeout 634 // as we wait for various phases to complete. 635 if (unit != NANOSECONDS) { 636 timeout = NANOSECONDS.convert(timeout, unit); 637 unit = NANOSECONDS; 638 } 639 640 // Invoking get on the inputFuture will ensure our own run() 641 // method below is invoked as a listener when inputFuture sets 642 // its value. Therefore when get() returns we should then see 643 // the outputFuture be created. 644 ListenableFuture<? extends I> inputFuture = this.inputFuture; 645 if (inputFuture != null) { 646 long start = System.nanoTime(); 647 inputFuture.get(timeout, unit); 648 timeout -= Math.max(0, System.nanoTime() - start); 649 } 650 651 // If our listener was scheduled to run on an executor we may 652 // need to wait for our listener to finish running before the 653 // outputFuture has been constructed by the function. 654 long start = System.nanoTime(); 655 if (!outputCreated.await(timeout, unit)) { 656 throw new TimeoutException(); 657 } 658 timeout -= Math.max(0, System.nanoTime() - start); 659 660 // Like above with the inputFuture, we have a listener on 661 // the outputFuture that will set our own value when its 662 // value is set. Invoking get will ensure the output can 663 // complete and invoke our listener, so that we can later 664 // get the result. 665 ListenableFuture<? extends O> outputFuture = this.outputFuture; 666 if (outputFuture != null) { 667 outputFuture.get(timeout, unit); 668 } 669 } 670 return super.get(timeout, unit); 671 } 672 673 @Override 674 public boolean cancel(boolean mayInterruptIfRunning) { 675 if (cancel()) { 676 try { 677 // This should never block since only one thread is allowed to cancel 678 // this Future. 679 mayInterruptIfRunningChannel.put(mayInterruptIfRunning); 680 } catch (InterruptedException ignored) { 681 Thread.currentThread().interrupt(); 682 } 683 cancel(inputFuture, mayInterruptIfRunning); 684 cancel(outputFuture, mayInterruptIfRunning); 685 return true; 686 } 687 return false; 688 } 689 690 private void cancel(@Nullable Future<?> future, 691 boolean mayInterruptIfRunning) { 692 if (future != null) { 693 future.cancel(mayInterruptIfRunning); 694 } 695 } 696 697 @Override 698 public void run() { 699 try { 700 I sourceResult; 701 try { 702 sourceResult = makeUninterruptible(inputFuture).get(); 703 } catch (CancellationException e) { 704 // Cancel this future and return. 705 cancel(); 706 return; 707 } catch (ExecutionException e) { 708 // Set the cause of the exception as this future's exception 709 setException(e.getCause()); 710 return; 711 } 712 713 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 714 function.apply(sourceResult); 715 if (isCancelled()) { 716 // Handles the case where cancel was called while the function was 717 // being applied. 718 try { 719 // There is a gap in cancel(boolean) between calling cancel() and 720 // storing the value of mayInterruptIfRunning, so this thread needs 721 // to block, waiting for that value. 722 outputFuture.cancel(mayInterruptIfRunningChannel.take()); 723 } catch (InterruptedException ignored) { 724 Thread.currentThread().interrupt(); 725 } 726 this.outputFuture = null; 727 return; 728 } 729 outputFuture.addListener(new Runnable() { 730 @Override 731 public void run() { 732 try { 733 // Here it would have been nice to have had an 734 // UninterruptibleListenableFuture, but we don't want to start a 735 // combinatorial explosion of interfaces, so we have to make do. 736 set(makeUninterruptible(outputFuture).get()); 737 } catch (CancellationException e) { 738 // Cancel this future and return. 739 cancel(); 740 return; 741 } catch (ExecutionException e) { 742 // Set the cause of the exception as this future's exception 743 setException(e.getCause()); 744 } finally { 745 // Don't pin inputs beyond completion 746 ChainingListenableFuture.this.outputFuture = null; 747 } 748 } 749 }, MoreExecutors.sameThreadExecutor()); 750 } catch (UndeclaredThrowableException e) { 751 // Set the cause of the exception as this future's exception 752 setException(e.getCause()); 753 } catch (RuntimeException e) { 754 // This exception is irrelevant in this thread, but useful for the 755 // client 756 setException(e); 757 } catch (Error e) { 758 // Propagate errors up ASAP - our superclass will rethrow the error 759 setException(e); 760 } finally { 761 // Don't pin inputs beyond completion 762 function = null; 763 inputFuture = null; 764 // Allow our get routines to examine outputFuture now. 765 outputCreated.countDown(); 766 } 767 } 768 } 769 770 /** 771 * A checked future that uses a function to map from exceptions to the 772 * appropriate checked type. 773 */ 774 private static class MappingCheckedFuture<V, X extends Exception> extends 775 AbstractCheckedFuture<V, X> { 776 777 final Function<Exception, X> mapper; 778 779 MappingCheckedFuture(ListenableFuture<V> delegate, 780 Function<Exception, X> mapper) { 781 super(delegate); 782 783 this.mapper = checkNotNull(mapper); 784 } 785 786 @Override 787 protected X mapException(Exception e) { 788 return mapper.apply(e); 789 } 790 } 791 792 /** 793 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This 794 * will wait on the future to finish, and when it completes, run the 795 * listeners. This implementation will wait on the source future 796 * indefinitely, so if the source future never completes, the adapter will 797 * never complete either. 798 * 799 * <p>If the delegate future is interrupted or throws an unexpected unchecked 800 * exception, the listeners will not be invoked. 801 */ 802 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V> 803 implements ListenableFuture<V> { 804 805 private static final ThreadFactory threadFactory = 806 new ThreadFactoryBuilder() 807 .setNameFormat("ListenableFutureAdapter-thread-%d") 808 .build(); 809 private static final Executor defaultAdapterExecutor = 810 Executors.newCachedThreadPool(threadFactory); 811 812 private final Executor adapterExecutor; 813 814 // The execution list to hold our listeners. 815 private final ExecutionList executionList = new ExecutionList(); 816 817 // This allows us to only start up a thread waiting on the delegate future 818 // when the first listener is added. 819 private final AtomicBoolean hasListeners = new AtomicBoolean(false); 820 821 // The delegate future. 822 private final Future<V> delegate; 823 824 ListenableFutureAdapter(Future<V> delegate) { 825 this(delegate, defaultAdapterExecutor); 826 } 827 828 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) { 829 this.delegate = checkNotNull(delegate); 830 this.adapterExecutor = checkNotNull(adapterExecutor); 831 } 832 833 @Override 834 protected Future<V> delegate() { 835 return delegate; 836 } 837 838 @Override 839 public void addListener(Runnable listener, Executor exec) { 840 executionList.add(listener, exec); 841 842 // When a listener is first added, we run a task that will wait for 843 // the delegate to finish, and when it is done will run the listeners. 844 if (hasListeners.compareAndSet(false, true)) { 845 if (delegate.isDone()) { 846 // If the delegate is already done, run the execution list 847 // immediately on the current thread. 848 executionList.run(); 849 return; 850 } 851 852 adapterExecutor.execute(new Runnable() { 853 @Override 854 public void run() { 855 try { 856 delegate.get(); 857 } catch (Error e) { 858 throw e; 859 } catch (InterruptedException e) { 860 // This thread was interrupted. This should never happen, so we 861 // throw an IllegalStateException. 862 Thread.currentThread().interrupt(); 863 throw new IllegalStateException("Adapter thread interrupted!", e); 864 } catch (Throwable e) { 865 // ExecutionException / CancellationException / RuntimeException 866 // The task is done, run the listeners. 867 } 868 executionList.run(); 869 } 870 }); 871 } 872 } 873 } 874 }