001 /* 002 * Copyright (C) 2006 Google Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 import static java.util.concurrent.TimeUnit.NANOSECONDS; 021 022 import com.google.common.annotations.Beta; 023 import com.google.common.base.Function; 024 025 import java.lang.reflect.UndeclaredThrowableException; 026 import java.util.concurrent.CancellationException; 027 import java.util.concurrent.ExecutionException; 028 import java.util.concurrent.Executor; 029 import java.util.concurrent.Future; 030 import java.util.concurrent.TimeUnit; 031 import java.util.concurrent.TimeoutException; 032 import java.util.concurrent.atomic.AtomicBoolean; 033 034 import javax.annotation.Nullable; 035 036 /** 037 * Static utility methods pertaining to the {@link Future} interface. 038 * 039 * @author Kevin Bourrillion 040 * @author Nishant Thakkar 041 * @author Sven Mawson 042 * @since 1 043 */ 044 @Beta 045 public class Futures { 046 private Futures() {} 047 048 /** 049 * Returns an uninterruptible view of a {@code Future}. If a thread is 050 * interrupted during an attempt to {@code get()} from the returned future, it 051 * continues to wait on the result until it is available or the timeout 052 * elapses, and only then re-interrupts the thread. 053 */ 054 public static <V> UninterruptibleFuture<V> makeUninterruptible( 055 final Future<V> future) { 056 checkNotNull(future); 057 if (future instanceof UninterruptibleFuture) { 058 return (UninterruptibleFuture<V>) future; 059 } 060 return new UninterruptibleFuture<V>() { 061 public boolean cancel(boolean mayInterruptIfRunning) { 062 return future.cancel(mayInterruptIfRunning); 063 } 064 public boolean isCancelled() { 065 return future.isCancelled(); 066 } 067 public boolean isDone() { 068 return future.isDone(); 069 } 070 071 public V get(long timeoutDuration, TimeUnit timeoutUnit) 072 throws TimeoutException, ExecutionException { 073 boolean interrupted = false; 074 try { 075 long timeoutNanos = timeoutUnit.toNanos(timeoutDuration); 076 long end = System.nanoTime() + timeoutNanos; 077 while (true) { 078 try { 079 return future.get(timeoutNanos, NANOSECONDS); 080 } catch (InterruptedException e) { 081 // Future treats negative timeouts just like zero. 082 timeoutNanos = end - System.nanoTime(); 083 interrupted = true; 084 } 085 } 086 } finally { 087 if (interrupted) { 088 Thread.currentThread().interrupt(); 089 } 090 } 091 } 092 093 public V get() throws ExecutionException { 094 boolean interrupted = false; 095 try { 096 while (true) { 097 try { 098 return future.get(); 099 } catch (InterruptedException ignored) { 100 interrupted = true; 101 } 102 } 103 } finally { 104 if (interrupted) { 105 Thread.currentThread().interrupt(); 106 } 107 } 108 } 109 }; 110 } 111 112 /** 113 * Creates a {@link ListenableFuture} out of a normal {@link Future}. The 114 * returned future will create a thread to wait for the source future to 115 * complete before executing the listeners. 116 * 117 * <p>Callers who have a future that subclasses 118 * {@link java.util.concurrent.FutureTask} may want to instead subclass 119 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture} 120 * functionality to the standard {@code FutureTask} implementation. 121 */ 122 public static <T> ListenableFuture<T> makeListenable(Future<T> future) { 123 if (future instanceof ListenableFuture) { 124 return (ListenableFuture<T>) future; 125 } 126 return new ListenableFutureAdapter<T>(future); 127 } 128 129 /** 130 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a 131 * {@link Function} that maps from {@link Exception} instances into the 132 * appropriate checked type. 133 * 134 * <p>The given mapping function will be applied to an 135 * {@link InterruptedException}, a {@link CancellationException}, or an 136 * {@link ExecutionException} with the actual cause of the exception. 137 * See {@link Future#get()} for details on the exceptions thrown. 138 */ 139 public static <T, E extends Exception> CheckedFuture<T, E> makeChecked( 140 Future<T> future, Function<Exception, E> mapper) { 141 return new MappingCheckedFuture<T, E>(makeListenable(future), mapper); 142 } 143 144 /** 145 * Creates a {@code ListenableFuture} which has its value set immediately upon 146 * construction. The getters just return the value. This {@code Future} can't 147 * be canceled or timed out and its {@code isDone()} method always returns 148 * {@code true}. It's useful for returning something that implements the 149 * {@code ListenableFuture} interface but already has the result. 150 */ 151 public static <T> ListenableFuture<T> immediateFuture(@Nullable T value) { 152 ValueFuture<T> future = ValueFuture.create(); 153 future.set(value); 154 return future; 155 } 156 157 /** 158 * Creates a {@code CheckedFuture} which has its value set immediately upon 159 * construction. The getters just return the value. This {@code Future} can't 160 * be canceled or timed out and its {@code isDone()} method always returns 161 * {@code true}. It's useful for returning something that implements the 162 * {@code CheckedFuture} interface but already has the result. 163 */ 164 public static <T, E extends Exception> CheckedFuture<T, E> 165 immediateCheckedFuture(@Nullable T value) { 166 ValueFuture<T> future = ValueFuture.create(); 167 future.set(value); 168 return Futures.makeChecked(future, new Function<Exception, E>() { 169 public E apply(Exception e) { 170 throw new AssertionError("impossible"); 171 } 172 }); 173 } 174 175 /** 176 * Creates a {@code ListenableFuture} which has an exception set immediately 177 * upon construction. The getters just return the value. This {@code Future} 178 * can't be canceled or timed out and its {@code isDone()} method always 179 * returns {@code true}. It's useful for returning something that implements 180 * the {@code ListenableFuture} interface but already has a failed 181 * result. Calling {@code get()} will throw the provided {@code Throwable} 182 * (wrapped in an {@code ExecutionException}). 183 * 184 * @throws Error if the throwable was an {@link Error}. 185 */ 186 public static <T> ListenableFuture<T> immediateFailedFuture( 187 Throwable throwable) { 188 checkNotNull(throwable); 189 ValueFuture<T> future = ValueFuture.create(); 190 future.setException(throwable); 191 return future; 192 } 193 194 /** 195 * Creates a {@code CheckedFuture} which has an exception set immediately 196 * upon construction. The getters just return the value. This {@code Future} 197 * can't be canceled or timed out and its {@code isDone()} method always 198 * returns {@code true}. It's useful for returning something that implements 199 * the {@code CheckedFuture} interface but already has a failed result. 200 * Calling {@code get()} will throw the provided {@code Throwable} (wrapped in 201 * an {@code ExecutionException}) and calling {@code checkedGet()} will throw 202 * the provided exception itself. 203 * 204 * @throws Error if the throwable was an {@link Error}. 205 */ 206 public static <T, E extends Exception> CheckedFuture<T, E> 207 immediateFailedCheckedFuture(final E exception) { 208 checkNotNull(exception); 209 return makeChecked(Futures.<T>immediateFailedFuture(exception), 210 new Function<Exception, E>() { 211 public E apply(Exception e) { 212 return exception; 213 } 214 }); 215 } 216 217 /** 218 * Creates a new {@code ListenableFuture} that wraps another 219 * {@code ListenableFuture}. The result of the new future is the result of 220 * the provided function called on the result of the provided future. 221 * The resulting future doesn't interrupt when aborted. 222 * 223 * <p>TODO: Add a version that accepts a normal {@code Future} 224 * 225 * <p>The typical use for this method would be when a RPC call is dependent on 226 * the results of another RPC. One would call the first RPC (input), create a 227 * function that calls another RPC based on input's result, and then call 228 * chain on input and that function to get a {@code ListenableFuture} of 229 * the result. 230 * 231 * @param input The future to chain 232 * @param function A function to chain the results of the provided future 233 * to the results of the returned future. This will be run in the thread 234 * that notifies input it is complete. 235 * @return A future that holds result of the chain. 236 */ 237 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 238 Function<? super I, ? extends ListenableFuture<? extends O>> function) { 239 return chain(input, function, MoreExecutors.sameThreadExecutor()); 240 } 241 242 /** 243 * Creates a new {@code ListenableFuture} that wraps another 244 * {@code ListenableFuture}. The result of the new future is the result of 245 * the provided function called on the result of the provided future. 246 * The resulting future doesn't interrupt when aborted. 247 * 248 * <p>This version allows an arbitrary executor to be passed in for running 249 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor}, 250 * the thread chained Function executes in will be whichever thread set the 251 * result of the input Future, which may be the network thread in the case of 252 * RPC-based Futures. 253 * 254 * @param input The future to chain 255 * @param function A function to chain the results of the provided future 256 * to the results of the returned future. 257 * @param exec Executor to run the function in. 258 * @return A future that holds result of the chain. 259 */ 260 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input, 261 Function<? super I, ? extends ListenableFuture<? extends O>> function, 262 Executor exec) { 263 ChainingListenableFuture<I, O> chain = 264 new ChainingListenableFuture<I, O>(function, input); 265 input.addListener(chain, exec); 266 return chain; 267 } 268 269 /** 270 * Creates a new {@code ListenableFuture} that wraps another 271 * {@code ListenableFuture}. The result of the new future is the result of 272 * the provided function called on the result of the provided future. 273 * The resulting future doesn't interrupt when aborted. 274 * 275 * <p>An example use of this method is to convert a serializable object 276 * returned from an RPC into a POJO. 277 * 278 * @param future The future to compose 279 * @param function A Function to compose the results of the provided future 280 * to the results of the returned future. This will be run in the thread 281 * that notifies input it is complete. 282 * @return A future that holds result of the composition. 283 */ 284 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, 285 final Function<? super I, ? extends O> function) { 286 return compose(future, function, MoreExecutors.sameThreadExecutor()); 287 } 288 289 /** 290 * Creates a new {@code ListenableFuture} that wraps another 291 * {@code ListenableFuture}. The result of the new future is the result of 292 * the provided function called on the result of the provided future. 293 * The resulting future doesn't interrupt when aborted. 294 * 295 * <p>An example use of this method is to convert a serializable object 296 * returned from an RPC into a POJO. 297 * 298 * <p>This version allows an arbitrary executor to be passed in for running 299 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor}, 300 * the thread chained Function executes in will be whichever thread set the 301 * result of the input Future, which may be the network thread in the case of 302 * RPC-based Futures. 303 * 304 * @param future The future to compose 305 * @param function A Function to compose the results of the provided future 306 * to the results of the returned future. 307 * @param exec Executor to run the function in. 308 * @return A future that holds result of the composition. 309 * @since 2 310 */ 311 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future, 312 final Function<? super I, ? extends O> function, Executor exec) { 313 checkNotNull(function); 314 Function<I, ListenableFuture<O>> wrapperFunction 315 = new Function<I, ListenableFuture<O>>() { 316 @Override public ListenableFuture<O> apply(I input) { 317 O output = function.apply(input); 318 return immediateFuture(output); 319 } 320 }; 321 return chain(future, wrapperFunction, exec); 322 } 323 324 /** 325 * Creates a new {@code Future} that wraps another {@code Future}. 326 * The result of the new future is the result of the provided function called 327 * on the result of the provided future. 328 * 329 * <p>An example use of this method is to convert a Future that produces a 330 * handle to an object to a future that produces the object itself. 331 * 332 * <p>Each call to {@code Future<O>.get(*)} results in a call to 333 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it 334 * is assumed that {@code Future<I>.get(*)} is idempotent. 335 * 336 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned 337 * future, the timeout only applies to the future passed in to this method. 338 * Any additional time taken by applying {@code function} is not considered. 339 * 340 * @param future The future to compose 341 * @param function A Function to compose the results of the provided future 342 * to the results of the returned future. This will be run in the thread 343 * that calls one of the varieties of {@code get()}. 344 * @return A future that computes result of the composition. 345 */ 346 public static <I, O> Future<O> compose(final Future<I> future, 347 final Function<? super I, ? extends O> function) { 348 checkNotNull(future); 349 checkNotNull(function); 350 return new Future<O>() { 351 352 /* 353 * Concurrency detail: 354 * 355 * <p>To preserve the idempotency of calls to this.get(*) calls to the 356 * function are only applied once. A lock is required to prevent multiple 357 * applications of the function. The calls to future.get(*) are performed 358 * outside the lock, as is required to prevent calls to 359 * get(long, TimeUnit) to persist beyond their timeout. 360 * 361 * <p>Calls to future.get(*) on every call to this.get(*) also provide 362 * the cancellation behavior for this. 363 * 364 * <p>(Consider: in thread A, call get(), in thread B call get(long, 365 * TimeUnit). Thread B may have to wait for Thread A to finish, which 366 * would be unacceptable.) 367 * 368 * <p>Note that each call to Future<O>.get(*) results in a call to 369 * Future<I>.get(*), but the function is only applied once, so 370 * Future<I>.get(*) is assumed to be idempotent. 371 */ 372 373 private final Object lock = new Object(); 374 private boolean set = false; 375 private O value = null; 376 377 @Override 378 public O get() throws InterruptedException, ExecutionException { 379 return apply(future.get()); 380 } 381 382 @Override 383 public O get(long timeout, TimeUnit unit) throws InterruptedException, 384 ExecutionException, TimeoutException { 385 return apply(future.get(timeout, unit)); 386 } 387 388 private O apply(I raw) { 389 synchronized(lock) { 390 if (!set) { 391 value = function.apply(raw); 392 set = true; 393 } 394 return value; 395 } 396 } 397 398 @Override 399 public boolean cancel(boolean mayInterruptIfRunning) { 400 return future.cancel(mayInterruptIfRunning); 401 } 402 403 @Override 404 public boolean isCancelled() { 405 return future.isCancelled(); 406 } 407 408 @Override 409 public boolean isDone() { 410 return future.isDone(); 411 } 412 }; 413 } 414 415 /** 416 * An implementation of {@code ListenableFuture} that also implements 417 * {@code Runnable} so that it can be used to nest ListenableFutures. 418 * Once the passed-in {@code ListenableFuture} is complete, it calls the 419 * passed-in {@code Function} to generate the result. 420 * The resulting future doesn't interrupt when aborted. 421 * 422 * <p>If the function throws any checked exceptions, they should be wrapped 423 * in a {@code UndeclaredThrowableException} so that this class can get 424 * access to the cause. 425 */ 426 private static class ChainingListenableFuture<I, O> 427 extends AbstractListenableFuture<O> implements Runnable { 428 429 private Function<? super I, ? extends ListenableFuture<? extends O>> 430 function; 431 private UninterruptibleFuture<? extends I> inputFuture; 432 433 private ChainingListenableFuture( 434 Function<? super I, ? extends ListenableFuture<? extends O>> function, 435 ListenableFuture<? extends I> inputFuture) { 436 this.function = checkNotNull(function); 437 this.inputFuture = makeUninterruptible(inputFuture); 438 } 439 440 public boolean cancel(boolean mayInterruptIfRunning) { 441 Future<? extends I> future = inputFuture; 442 if (future != null) { 443 return future.cancel(mayInterruptIfRunning); 444 } 445 return false; 446 } 447 448 public void run() { 449 try { 450 I sourceResult; 451 try { 452 sourceResult = inputFuture.get(); 453 } catch (CancellationException e) { 454 // Cancel this future and return. 455 cancel(); 456 return; 457 } catch (ExecutionException e) { 458 // Set the cause of the exception as this future's exception 459 setException(e.getCause()); 460 return; 461 } 462 463 final ListenableFuture<? extends O> outputFuture = 464 function.apply(sourceResult); 465 outputFuture.addListener(new Runnable() { 466 public void run() { 467 try { 468 // Here it would have been nice to have had an 469 // UninterruptibleListenableFuture, but we don't want to start a 470 // combinatorial explosion of interfaces, so we have to make do. 471 set(makeUninterruptible(outputFuture).get()); 472 } catch (ExecutionException e) { 473 // Set the cause of the exception as this future's exception 474 setException(e.getCause()); 475 } 476 } 477 }, MoreExecutors.sameThreadExecutor()); 478 } catch (UndeclaredThrowableException e) { 479 // Set the cause of the exception as this future's exception 480 setException(e.getCause()); 481 } catch (RuntimeException e) { 482 // This exception is irrelevant in this thread, but useful for the 483 // client 484 setException(e); 485 } catch (Error e) { 486 // This seems evil, but the client needs to know an error occured and 487 // the error needs to be propagated ASAP. 488 setException(e); 489 throw e; 490 } finally { 491 // Don't pin inputs beyond completion 492 function = null; 493 inputFuture = null; 494 } 495 } 496 } 497 498 /** 499 * A checked future that uses a function to map from exceptions to the 500 * appropriate checked type. 501 */ 502 private static class MappingCheckedFuture<T, E extends Exception> extends 503 AbstractCheckedFuture<T, E> { 504 505 final Function<Exception, E> mapper; 506 507 MappingCheckedFuture(ListenableFuture<T> delegate, 508 Function<Exception, E> mapper) { 509 super(delegate); 510 511 this.mapper = checkNotNull(mapper); 512 } 513 514 @Override 515 protected E mapException(Exception e) { 516 return mapper.apply(e); 517 } 518 } 519 520 /** 521 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This 522 * will wait on the future to finish, and when it completes, run the 523 * listeners. This implementation will wait on the source future 524 * indefinitely, so if the source future never completes, the adapter will 525 * never complete either. 526 * 527 * <p>If the delegate future is interrupted or throws an unexpected unchecked 528 * exception, the listeners will not be invoked. 529 */ 530 private static class ListenableFutureAdapter<T> extends ForwardingFuture<T> 531 implements ListenableFuture<T> { 532 533 private static final Executor adapterExecutor = 534 java.util.concurrent.Executors.newCachedThreadPool(); 535 536 // The execution list to hold our listeners. 537 private final ExecutionList executionList = new ExecutionList(); 538 539 // This allows us to only start up a thread waiting on the delegate future 540 // when the first listener is added. 541 private final AtomicBoolean hasListeners = new AtomicBoolean(false); 542 543 // The delegate future. 544 private final Future<T> delegate; 545 546 ListenableFutureAdapter(final Future<T> delegate) { 547 this.delegate = checkNotNull(delegate); 548 } 549 550 @Override 551 protected Future<T> delegate() { 552 return delegate; 553 } 554 555 @Override 556 public void addListener(Runnable listener, Executor exec) { 557 558 // When a listener is first added, we run a task that will wait for 559 // the delegate to finish, and when it is done will run the listeners. 560 if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) { 561 adapterExecutor.execute(new Runnable() { 562 @Override 563 public void run() { 564 try { 565 delegate.get(); 566 } catch (CancellationException e) { 567 // The task was cancelled, so it is done, run the listeners. 568 } catch (InterruptedException e) { 569 // This thread was interrupted. This should never happen, so we 570 // throw an IllegalStateException. 571 throw new IllegalStateException("Adapter thread interrupted!", e); 572 } catch (ExecutionException e) { 573 // The task caused an exception, so it is done, run the listeners. 574 } 575 executionList.run(); 576 } 577 }); 578 } 579 executionList.add(listener, exec); 580 } 581 } 582 }