001 /* 002 * Copyright (C) 2006 Google Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 import static java.util.concurrent.TimeUnit.NANOSECONDS; 021 022 import com.google.common.annotations.Beta; 023 import com.google.common.base.Function; 024 025 import java.lang.reflect.UndeclaredThrowableException; 026 import java.util.concurrent.BlockingQueue; 027 import java.util.concurrent.CancellationException; 028 import java.util.concurrent.CountDownLatch; 029 import java.util.concurrent.ExecutionException; 030 import java.util.concurrent.Executor; 031 import java.util.concurrent.Executors; 032 import java.util.concurrent.Future; 033 import java.util.concurrent.LinkedBlockingQueue; 034 import java.util.concurrent.ThreadFactory; 035 import java.util.concurrent.TimeUnit; 036 import java.util.concurrent.TimeoutException; 037 import java.util.concurrent.atomic.AtomicBoolean; 038 039 import javax.annotation.Nullable; 040 041 /** 042 * Static utility methods pertaining to the {@link Future} interface. 043 * 044 * @author Kevin Bourrillion 045 * @author Nishant Thakkar 046 * @author Sven Mawson 047 * @since 1 048 */ 049 @Beta 050 public final class Futures { 051 private Futures() {} 052 053 /** 054 * Returns an uninterruptible view of a {@code Future}. If a thread is 055 * interrupted during an attempt to {@code get()} from the returned future, it 056 * continues to wait on the result until it is available or the timeout 057 * elapses, and only then re-interrupts the thread. 058 */ 059 public static <V> UninterruptibleFuture<V> makeUninterruptible( 060 final Future<V> future) { 061 checkNotNull(future); 062 if (future instanceof UninterruptibleFuture<?>) { 063 return (UninterruptibleFuture<V>) future; 064 } 065 return new UninterruptibleFuture<V>() { 066 public boolean cancel(boolean mayInterruptIfRunning) { 067 return future.cancel(mayInterruptIfRunning); 068 } 069 public boolean isCancelled() { 070 return future.isCancelled(); 071 } 072 public boolean isDone() { 073 return future.isDone(); 074 } 075 076 public V get(long originalTimeout, TimeUnit originalUnit) 077 throws TimeoutException, ExecutionException { 078 boolean interrupted = false; 079 try { 080 long end = System.nanoTime() + originalUnit.toNanos(originalTimeout); 081 while (true) { 082 try { 083 // Future treats negative timeouts just like zero. 084 return future.get(end - System.nanoTime(), NANOSECONDS); 085 } catch (InterruptedException e) { 086 interrupted = true; 087 } 088 } 089 } finally { 090 if (interrupted) { 091 Thread.currentThread().interrupt(); 092 } 093 } 094 } 095 096 public V get() throws ExecutionException { 097 boolean interrupted = false; 098 try { 099 while (true) { 100 try { 101 return future.get(); 102 } catch (InterruptedException ignored) { 103 interrupted = true; 104 } 105 } 106 } finally { 107 if (interrupted) { 108 Thread.currentThread().interrupt(); 109 } 110 } 111 } 112 }; 113 } 114 115 /** 116 * Creates a {@link ListenableFuture} out of a normal {@link Future}. The 117 * returned future will create a thread to wait for the source future to 118 * complete before executing the listeners. 119 * 120 * <p>Callers who have a future that subclasses 121 * {@link java.util.concurrent.FutureTask} may want to instead subclass 122 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture} 123 * functionality to the standard {@code FutureTask} implementation. 124 */ 125 public static <V> ListenableFuture<V> makeListenable(Future<V> future) { 126 if (future instanceof ListenableFuture<?>) { 127 return (ListenableFuture<V>) future; 128 } 129 return new ListenableFutureAdapter<V>(future); 130 } 131 132 /** 133 * Creates a {@link ListenableFuture} out of a normal {@link Future} and uses 134 * the given {@link Executor} to get the value of the Future. The 135 * returned future will create a thread using the given executor to wait for 136 * the source future to complete before executing the listeners. 137 * 138 * <p>Callers who have a future that subclasses 139 * {@link java.util.concurrent.FutureTask} may want to instead subclass 140 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture} 141 * functionality to the standard {@code FutureTask} implementation. 142 */ 143 static <V> ListenableFuture<V> makeListenable( 144 Future<V> future, Executor executor) { 145 checkNotNull(executor); 146 if (future instanceof ListenableFuture<?>) { 147 return (ListenableFuture<V>) future; 148 } 149 return new ListenableFutureAdapter<V>(future, executor); 150 } 151 152 /** 153 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a 154 * {@link Function} that maps from {@link Exception} instances into the 155 * appropriate checked type. 156 * 157 * <p>The given mapping function will be applied to an 158 * {@link InterruptedException}, a {@link CancellationException}, or an 159 * {@link ExecutionException} with the actual cause of the exception. 160 * See {@link Future#get()} for details on the exceptions thrown. 161 */ 162 public static <V, X extends Exception> CheckedFuture<V, X> makeChecked( 163 Future<V> future, Function<Exception, X> mapper) { 164 return new MappingCheckedFuture<V, X>(makeListenable(future), mapper); 165 } 166 167 /** 168 * Creates a {@code ListenableFuture} which has its value set immediately upon 169 * construction. The getters just return the value. This {@code Future} can't 170 * be canceled or timed out and its {@code isDone()} method always returns 171 * {@code true}. It's useful for returning something that implements the 172 * {@code ListenableFuture} interface but already has the result. 173 */ 174 public static <V> ListenableFuture<V> immediateFuture(@Nullable V value) { 175 ValueFuture<V> future = ValueFuture.create(); 176 future.set(value); 177 return future; 178 } 179 180 /** 181 * Returns a {@code CheckedFuture} which has its value set immediately upon 182 * construction. 183 * 184 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 185 * method always returns {@code true}. Calling {@code get()} or {@code 186 * checkedGet()} will immediately return the provided value. 187 */ 188 public static <V, X extends Exception> CheckedFuture<V, X> 189 immediateCheckedFuture(@Nullable V value) { 190 ValueFuture<V> future = ValueFuture.create(); 191 future.set(value); 192 return Futures.makeChecked(future, new Function<Exception, X>() { 193 public X apply(Exception e) { 194 throw new AssertionError("impossible"); 195 } 196 }); 197 } 198 199 /** 200 * Returns a {@code ListenableFuture} which has an exception set immediately 201 * upon construction. 202 * 203 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 204 * method always returns {@code true}. Calling {@code get()} will immediately 205 * throw the provided {@code Throwable} wrapped in an {@code 206 * ExecutionException}. 207 * 208 * @throws Error if the throwable is an {@link Error}. 209 */ 210 public static <V> ListenableFuture<V> immediateFailedFuture( 211 Throwable throwable) { 212 checkNotNull(throwable); 213 ValueFuture<V> future = ValueFuture.create(); 214 future.setException(throwable); 215 return future; 216 } 217 218 /** 219 * Returns a {@code CheckedFuture} which has an exception set immediately upon 220 * construction. 221 * 222 * <p>The returned {@code Future} can't be cancelled, and its {@code isDone()} 223 * method always returns {@code true}. Calling {@code get()} will immediately 224 * throw the provided {@code Throwable} wrapped in an {@code 225 * ExecutionException}, and calling {@code checkedGet()} will throw the 226 * provided exception itself. 227 * 228 * @throws Error if the throwable is an {@link Error}. 229 */ 230 public static <V, X extends Exception> CheckedFuture<V, X> 231 immediateFailedCheckedFuture(final X exception) { 232 checkNotNull(exception); 233 return makeChecked(Futures.<V>immediateFailedFuture(exception), 234 new Function<Exception, X>() { 235 public X apply(Exception e) { 236 return exception; 237 } 238 }); 239 } 240 241 /** 242 * Returns a new {@code ListenableFuture} whose result is asynchronously 243 * derived from the result of the given {@code Future}. More precisely, the 244 * returned {@code Future} takes its result from a {@code Future} produced by 245 * applying the given {@code Function} to the result of the original {@code 246 * Future}. 247 * 248 * <p>Successful cancellation of either the input future or the result of 249 * function application will cause the returned future to be cancelled. 250 * Cancelling the returned future will succeed if it is currently running. 251 * In this case, attempts will be made to cancel the input future and the 252 * result of the function, however there is no guarantee of success. 253 * 254 * <p>TODO: Add a version that accepts a normal {@code Future} 255 * 256 * <p>The typical use for this method would be when a RPC call is dependent on 257 * the results of another RPC. One would call the first RPC (input), create a 258 * function that calls another RPC based on input's result, and then call 259 * chain on input and that function to get a {@code ListenableFuture} of 260 * the result. 261 * 262 * @param input The future to chain 263 * @param function A function to chain the results of the provided future 264 * to the results of the returned future. This will be run in the thread 265 * that notifies input it is complete. 266 * @return A future that holds result of the chain. 267 */ 268 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 269 Function<? super I, ? extends ListenableFuture<? extends O>> function) { 270 return chain(input, function, MoreExecutors.sameThreadExecutor()); 271 } 272 273 /** 274 * Returns a new {@code ListenableFuture} whose result is asynchronously 275 * derived from the result of the given {@code Future}. More precisely, the 276 * returned {@code Future} takes its result from a {@code Future} produced by 277 * applying the given {@code Function} to the result of the original {@code 278 * Future}. 279 * 280 * <p>Successful cancellation of either the input future or the result of 281 * function application will cause the returned future to be cancelled. 282 * Cancelling the returned future will succeed if it is currently running. 283 * In this case, attempts will be made to cancel the input future and the 284 * result of the function, however there is no guarantee of success. 285 * 286 * <p>This version allows an arbitrary executor to be passed in for running 287 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor}, 288 * the thread chained Function executes in will be whichever thread set the 289 * result of the input Future, which may be the network thread in the case of 290 * RPC-based Futures. 291 * 292 * @param input The future to chain 293 * @param function A function to chain the results of the provided future 294 * to the results of the returned future. 295 * @param exec Executor to run the function in. 296 * @return A future that holds result of the chain. 297 */ 298 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 299 Function<? super I, ? extends ListenableFuture<? extends O>> function, 300 Executor exec) { 301 ChainingListenableFuture<I, O> chain = 302 new ChainingListenableFuture<I, O>(function, input); 303 input.addListener(chain, exec); 304 return chain; 305 } 306 307 /** 308 * Returns a new {@code ListenableFuture} whose result is the product of 309 * applying the given {@code Function} to the result of the given {@code 310 * Future}. 311 * 312 * <p>Successful cancellation of the input future will cause the returned 313 * future to be cancelled. Cancelling the returned future will succeed if it 314 * is currently running. In this case, an attempt will be made to cancel the 315 * input future, however there is no guarantee of success. 316 * 317 * <p>An example use of this method is to convert a serializable object 318 * returned from an RPC into a POJO. 319 * 320 * @param future The future to compose 321 * @param function A Function to compose the results of the provided future 322 * to the results of the returned future. This will be run in the thread 323 * that notifies input it is complete. 324 * @return A future that holds result of the composition. 325 */ 326 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, 327 final Function<? super I, ? extends O> function) { 328 return compose(future, function, MoreExecutors.sameThreadExecutor()); 329 } 330 331 /** 332 * Returns a new {@code ListenableFuture} whose result is the product of 333 * applying the given {@code Function} to the result of the given {@code 334 * Future}. 335 * 336 * <p>Successful cancellation of the input future will cause the returned 337 * future to be cancelled. Cancelling the returned future will succeed if it 338 * is currently running. In this case, an attempt will be made to cancel the 339 * input future, however there is no guarantee of success. 340 * 341 * <p>An example use of this method is to convert a serializable object 342 * returned from an RPC into a POJO. 343 * 344 * <p>This version allows an arbitrary executor to be passed in for running 345 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor}, 346 * the thread chained Function executes in will be whichever thread set the 347 * result of the input Future, which may be the network thread in the case of 348 * RPC-based Futures. 349 * 350 * @param future The future to compose 351 * @param function A Function to compose the results of the provided future 352 * to the results of the returned future. 353 * @param exec Executor to run the function in. 354 * @return A future that holds result of the composition. 355 * @since 2 356 */ 357 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, 358 final Function<? super I, ? extends O> function, Executor exec) { 359 checkNotNull(function); 360 Function<I, ListenableFuture<O>> wrapperFunction 361 = new Function<I, ListenableFuture<O>>() { 362 @Override public ListenableFuture<O> apply(I input) { 363 O output = function.apply(input); 364 return immediateFuture(output); 365 } 366 }; 367 return chain(future, wrapperFunction, exec); 368 } 369 370 /** 371 * Returns a new {@code Future} whose result is the product of applying the 372 * given {@code Function} to the result of the given {@code Future}. 373 * 374 * <p>An example use of this method is to convert a Future that produces a 375 * handle to an object to a future that produces the object itself. 376 * 377 * <p>Each call to {@code Future<O>.get(*)} results in a call to 378 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it 379 * is assumed that {@code Future<I>.get(*)} is idempotent. 380 * 381 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned 382 * future, the timeout only applies to the future passed in to this method. 383 * Any additional time taken by applying {@code function} is not considered. 384 * (Exception: If the input future is a {@link ListenableFuture}, timeouts 385 * will be strictly enforced.) 386 * 387 * @param future The future to compose 388 * @param function A Function to compose the results of the provided future 389 * to the results of the returned future. This will be run in the thread 390 * that calls one of the varieties of {@code get()}. 391 * @return A future that computes result of the composition. 392 */ 393 public static <I, O> Future<O> compose(final Future<I> future, 394 final Function<? super I, ? extends O> function) { 395 if (future instanceof ListenableFuture) { 396 return compose((ListenableFuture<I>) future, function); 397 } 398 checkNotNull(future); 399 checkNotNull(function); 400 return new Future<O>() { 401 402 /* 403 * Concurrency detail: 404 * 405 * <p>To preserve the idempotency of calls to this.get(*) calls to the 406 * function are only applied once. A lock is required to prevent multiple 407 * applications of the function. The calls to future.get(*) are performed 408 * outside the lock, as is required to prevent calls to 409 * get(long, TimeUnit) to persist beyond their timeout. 410 * 411 * <p>Calls to future.get(*) on every call to this.get(*) also provide 412 * the cancellation behavior for this. 413 * 414 * <p>(Consider: in thread A, call get(), in thread B call get(long, 415 * TimeUnit). Thread B may have to wait for Thread A to finish, which 416 * would be unacceptable.) 417 * 418 * <p>Note that each call to Future<O>.get(*) results in a call to 419 * Future<I>.get(*), but the function is only applied once, so 420 * Future<I>.get(*) is assumed to be idempotent. 421 */ 422 423 private final Object lock = new Object(); 424 private boolean set = false; 425 private O value = null; 426 private ExecutionException exception = null; 427 428 @Override 429 public O get() throws InterruptedException, ExecutionException { 430 return apply(future.get()); 431 } 432 433 @Override 434 public O get(long timeout, TimeUnit unit) throws InterruptedException, 435 ExecutionException, TimeoutException { 436 return apply(future.get(timeout, unit)); 437 } 438 439 private O apply(I raw) throws ExecutionException { 440 synchronized (lock) { 441 if (!set) { 442 try { 443 value = function.apply(raw); 444 } catch (RuntimeException e) { 445 exception = new ExecutionException(e); 446 } catch (Error e) { 447 exception = new ExecutionException(e); 448 } 449 set = true; 450 } 451 452 if (exception != null) { 453 throw exception; 454 } 455 return value; 456 } 457 } 458 459 @Override 460 public boolean cancel(boolean mayInterruptIfRunning) { 461 return future.cancel(mayInterruptIfRunning); 462 } 463 464 @Override 465 public boolean isCancelled() { 466 return future.isCancelled(); 467 } 468 469 @Override 470 public boolean isDone() { 471 return future.isDone(); 472 } 473 }; 474 } 475 476 /** 477 * An implementation of {@code ListenableFuture} that also implements 478 * {@code Runnable} so that it can be used to nest ListenableFutures. 479 * Once the passed-in {@code ListenableFuture} is complete, it calls the 480 * passed-in {@code Function} to generate the result. 481 * 482 * <p>If the function throws any checked exceptions, they should be wrapped 483 * in a {@code UndeclaredThrowableException} so that this class can get 484 * access to the cause. 485 */ 486 private static class ChainingListenableFuture<I, O> 487 extends AbstractListenableFuture<O> implements Runnable { 488 489 private Function<? super I, ? extends ListenableFuture<? extends O>> 490 function; 491 private ListenableFuture<? extends I> inputFuture; 492 private volatile ListenableFuture<? extends O> outputFuture; 493 private final BlockingQueue<Boolean> mayInterruptIfRunningChannel = 494 new LinkedBlockingQueue<Boolean>(1); 495 private final CountDownLatch outputCreated = new CountDownLatch(1); 496 497 private ChainingListenableFuture( 498 Function<? super I, ? extends ListenableFuture<? extends O>> function, 499 ListenableFuture<? extends I> inputFuture) { 500 this.function = checkNotNull(function); 501 this.inputFuture = checkNotNull(inputFuture); 502 } 503 504 /** 505 * Delegate the get() to the input and output futures, in case 506 * their implementations defer starting computation until their 507 * own get() is invoked. 508 */ 509 @Override 510 public O get() throws InterruptedException, ExecutionException { 511 if (!isDone()) { 512 // Invoking get on the inputFuture will ensure our own run() 513 // method below is invoked as a listener when inputFuture sets 514 // its value. Therefore when get() returns we should then see 515 // the outputFuture be created. 516 ListenableFuture<? extends I> inputFuture = this.inputFuture; 517 if (inputFuture != null) { 518 inputFuture.get(); 519 } 520 521 // If our listener was scheduled to run on an executor we may 522 // need to wait for our listener to finish running before the 523 // outputFuture has been constructed by the function. 524 outputCreated.await(); 525 526 // Like above with the inputFuture, we have a listener on 527 // the outputFuture that will set our own value when its 528 // value is set. Invoking get will ensure the output can 529 // complete and invoke our listener, so that we can later 530 // get the result. 531 ListenableFuture<? extends O> outputFuture = this.outputFuture; 532 if (outputFuture != null) { 533 outputFuture.get(); 534 } 535 } 536 return super.get(); 537 } 538 539 /** 540 * Delegate the get() to the input and output futures, in case 541 * their implementations defer starting computation until their 542 * own get() is invoked. 543 */ 544 @Override 545 public O get(long timeout, TimeUnit unit) throws TimeoutException, 546 ExecutionException, InterruptedException { 547 if (!isDone()) { 548 // Use a single time unit so we can decrease remaining timeout 549 // as we wait for various phases to complete. 550 if (unit != NANOSECONDS) { 551 timeout = NANOSECONDS.convert(timeout, unit); 552 unit = NANOSECONDS; 553 } 554 555 // Invoking get on the inputFuture will ensure our own run() 556 // method below is invoked as a listener when inputFuture sets 557 // its value. Therefore when get() returns we should then see 558 // the outputFuture be created. 559 ListenableFuture<? extends I> inputFuture = this.inputFuture; 560 if (inputFuture != null) { 561 long start = System.nanoTime(); 562 inputFuture.get(timeout, unit); 563 timeout -= Math.max(0, System.nanoTime() - start); 564 } 565 566 // If our listener was scheduled to run on an executor we may 567 // need to wait for our listener to finish running before the 568 // outputFuture has been constructed by the function. 569 long start = System.nanoTime(); 570 if (!outputCreated.await(timeout, unit)) { 571 throw new TimeoutException(); 572 } 573 timeout -= Math.max(0, System.nanoTime() - start); 574 575 // Like above with the inputFuture, we have a listener on 576 // the outputFuture that will set our own value when its 577 // value is set. Invoking get will ensure the output can 578 // complete and invoke our listener, so that we can later 579 // get the result. 580 ListenableFuture<? extends O> outputFuture = this.outputFuture; 581 if (outputFuture != null) { 582 outputFuture.get(timeout, unit); 583 } 584 } 585 return super.get(timeout, unit); 586 } 587 588 @Override 589 public boolean cancel(boolean mayInterruptIfRunning) { 590 if (cancel()) { 591 try { 592 // This should never block since only one thread is allowed to cancel 593 // this Future. 594 mayInterruptIfRunningChannel.put(mayInterruptIfRunning); 595 } catch (InterruptedException ignored) { 596 Thread.currentThread().interrupt(); 597 } 598 cancel(inputFuture, mayInterruptIfRunning); 599 cancel(outputFuture, mayInterruptIfRunning); 600 return true; 601 } 602 return false; 603 } 604 605 private void cancel(@Nullable Future<?> future, 606 boolean mayInterruptIfRunning) { 607 if (future != null) { 608 future.cancel(mayInterruptIfRunning); 609 } 610 } 611 612 public void run() { 613 try { 614 I sourceResult; 615 try { 616 sourceResult = makeUninterruptible(inputFuture).get(); 617 } catch (CancellationException e) { 618 // Cancel this future and return. 619 cancel(); 620 return; 621 } catch (ExecutionException e) { 622 // Set the cause of the exception as this future's exception 623 setException(e.getCause()); 624 return; 625 } 626 627 final ListenableFuture<? extends O> outputFuture = this.outputFuture = 628 function.apply(sourceResult); 629 if (isCancelled()) { 630 // Handles the case where cancel was called while the function was 631 // being applied. 632 try { 633 // There is a gap in cancel(boolean) between calling cancel() and 634 // storing the value of mayInterruptIfRunning, so this thread needs 635 // to block, waiting for that value. 636 outputFuture.cancel(mayInterruptIfRunningChannel.take()); 637 } catch (InterruptedException ignored) { 638 Thread.currentThread().interrupt(); 639 } 640 this.outputFuture = null; 641 return; 642 } 643 outputFuture.addListener(new Runnable() { 644 public void run() { 645 try { 646 // Here it would have been nice to have had an 647 // UninterruptibleListenableFuture, but we don't want to start a 648 // combinatorial explosion of interfaces, so we have to make do. 649 set(makeUninterruptible(outputFuture).get()); 650 } catch (CancellationException e) { 651 // Cancel this future and return. 652 cancel(); 653 return; 654 } catch (ExecutionException e) { 655 // Set the cause of the exception as this future's exception 656 setException(e.getCause()); 657 } finally { 658 // Don't pin inputs beyond completion 659 ChainingListenableFuture.this.outputFuture = null; 660 } 661 } 662 }, MoreExecutors.sameThreadExecutor()); 663 } catch (UndeclaredThrowableException e) { 664 // Set the cause of the exception as this future's exception 665 setException(e.getCause()); 666 } catch (RuntimeException e) { 667 // This exception is irrelevant in this thread, but useful for the 668 // client 669 setException(e); 670 } catch (Error e) { 671 // Propagate errors up ASAP - our superclass will rethrow the error 672 setException(e); 673 } finally { 674 // Don't pin inputs beyond completion 675 function = null; 676 inputFuture = null; 677 // Allow our get routines to examine outputFuture now. 678 outputCreated.countDown(); 679 } 680 } 681 } 682 683 /** 684 * A checked future that uses a function to map from exceptions to the 685 * appropriate checked type. 686 */ 687 private static class MappingCheckedFuture<V, X extends Exception> extends 688 AbstractCheckedFuture<V, X> { 689 690 final Function<Exception, X> mapper; 691 692 MappingCheckedFuture(ListenableFuture<V> delegate, 693 Function<Exception, X> mapper) { 694 super(delegate); 695 696 this.mapper = checkNotNull(mapper); 697 } 698 699 @Override 700 protected X mapException(Exception e) { 701 return mapper.apply(e); 702 } 703 } 704 705 /** 706 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This 707 * will wait on the future to finish, and when it completes, run the 708 * listeners. This implementation will wait on the source future 709 * indefinitely, so if the source future never completes, the adapter will 710 * never complete either. 711 * 712 * <p>If the delegate future is interrupted or throws an unexpected unchecked 713 * exception, the listeners will not be invoked. 714 */ 715 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V> 716 implements ListenableFuture<V> { 717 718 private static final ThreadFactory threadFactory = 719 new ThreadFactoryBuilder() 720 .setNameFormat("ListenableFutureAdapter-thread-%d") 721 .build(); 722 private static final Executor defaultAdapterExecutor = 723 Executors.newCachedThreadPool(threadFactory); 724 725 private final Executor adapterExecutor; 726 727 // The execution list to hold our listeners. 728 private final ExecutionList executionList = new ExecutionList(); 729 730 // This allows us to only start up a thread waiting on the delegate future 731 // when the first listener is added. 732 private final AtomicBoolean hasListeners = new AtomicBoolean(false); 733 734 // The delegate future. 735 private final Future<V> delegate; 736 737 ListenableFutureAdapter(Future<V> delegate) { 738 this(delegate, defaultAdapterExecutor); 739 } 740 741 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) { 742 this.delegate = checkNotNull(delegate); 743 this.adapterExecutor = checkNotNull(adapterExecutor); 744 } 745 746 @Override 747 protected Future<V> delegate() { 748 return delegate; 749 } 750 751 @Override 752 public void addListener(Runnable listener, Executor exec) { 753 executionList.add(listener, exec); 754 755 // When a listener is first added, we run a task that will wait for 756 // the delegate to finish, and when it is done will run the listeners. 757 if (hasListeners.compareAndSet(false, true)) { 758 if (delegate.isDone()) { 759 // If the delegate is already done, run the execution list 760 // immediately on the current thread. 761 executionList.run(); 762 return; 763 } 764 765 adapterExecutor.execute(new Runnable() { 766 @Override 767 public void run() { 768 try { 769 delegate.get(); 770 } catch (Error e) { 771 throw e; 772 } catch (InterruptedException e) { 773 // This thread was interrupted. This should never happen, so we 774 // throw an IllegalStateException. 775 Thread.currentThread().interrupt(); 776 throw new IllegalStateException("Adapter thread interrupted!", e); 777 } catch (Throwable e) { 778 // ExecutionException / CancellationException / RuntimeException 779 // The task is done, run the listeners. 780 } 781 executionList.run(); 782 } 783 }); 784 } 785 } 786 } 787 }