001 /*
002 * Copyright (C) 2006 Google Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkNotNull;
020 import static java.util.concurrent.TimeUnit.NANOSECONDS;
021
022 import com.google.common.annotations.Beta;
023 import com.google.common.base.Function;
024
025 import java.lang.reflect.UndeclaredThrowableException;
026 import java.util.concurrent.CancellationException;
027 import java.util.concurrent.ExecutionException;
028 import java.util.concurrent.Executor;
029 import java.util.concurrent.Future;
030 import java.util.concurrent.TimeUnit;
031 import java.util.concurrent.TimeoutException;
032 import java.util.concurrent.atomic.AtomicBoolean;
033
034 import javax.annotation.Nullable;
035
036 /**
037 * Static utility methods pertaining to the {@link Future} interface.
038 *
039 * @author Kevin Bourrillion
040 * @author Nishant Thakkar
041 * @author Sven Mawson
042 * @since 1
043 */
044 @Beta
045 public class Futures {
// Not instantiable: this class only hosts static utility methods.
private Futures() {
}
047
048 /**
049 * Returns an uninterruptible view of a {@code Future}. If a thread is
050 * interrupted during an attempt to {@code get()} from the returned future, it
051 * continues to wait on the result until it is available or the timeout
052 * elapses, and only then re-interrupts the thread.
053 */
054 public static <V> UninterruptibleFuture<V> makeUninterruptible(
055 final Future<V> future) {
056 checkNotNull(future);
057 if (future instanceof UninterruptibleFuture) {
058 return (UninterruptibleFuture<V>) future;
059 }
060 return new UninterruptibleFuture<V>() {
061 public boolean cancel(boolean mayInterruptIfRunning) {
062 return future.cancel(mayInterruptIfRunning);
063 }
064 public boolean isCancelled() {
065 return future.isCancelled();
066 }
067 public boolean isDone() {
068 return future.isDone();
069 }
070
071 public V get(long timeoutDuration, TimeUnit timeoutUnit)
072 throws TimeoutException, ExecutionException {
073 boolean interrupted = false;
074 try {
075 long timeoutNanos = timeoutUnit.toNanos(timeoutDuration);
076 long end = System.nanoTime() + timeoutNanos;
077 while (true) {
078 try {
079 return future.get(timeoutNanos, NANOSECONDS);
080 } catch (InterruptedException e) {
081 // Future treats negative timeouts just like zero.
082 timeoutNanos = end - System.nanoTime();
083 interrupted = true;
084 }
085 }
086 } finally {
087 if (interrupted) {
088 Thread.currentThread().interrupt();
089 }
090 }
091 }
092
093 public V get() throws ExecutionException {
094 boolean interrupted = false;
095 try {
096 while (true) {
097 try {
098 return future.get();
099 } catch (InterruptedException ignored) {
100 interrupted = true;
101 }
102 }
103 } finally {
104 if (interrupted) {
105 Thread.currentThread().interrupt();
106 }
107 }
108 }
109 };
110 }
111
112 /**
113 * Creates a {@link ListenableFuture} out of a normal {@link Future}. The
114 * returned future will create a thread to wait for the source future to
115 * complete before executing the listeners.
116 *
117 * <p>Callers who have a future that subclasses
118 * {@link java.util.concurrent.FutureTask} may want to instead subclass
119 * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
120 * functionality to the standard {@code FutureTask} implementation.
121 */
122 public static <T> ListenableFuture<T> makeListenable(Future<T> future) {
123 if (future instanceof ListenableFuture) {
124 return (ListenableFuture<T>) future;
125 }
126 return new ListenableFutureAdapter<T>(future);
127 }
128
129 /**
130 * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
131 * {@link Function} that maps from {@link Exception} instances into the
132 * appropriate checked type.
133 *
134 * <p>The given mapping function will be applied to an
135 * {@link InterruptedException}, a {@link CancellationException}, or an
136 * {@link ExecutionException} with the actual cause of the exception.
137 * See {@link Future#get()} for details on the exceptions thrown.
138 */
139 public static <T, E extends Exception> CheckedFuture<T, E> makeChecked(
140 Future<T> future, Function<Exception, E> mapper) {
141 return new MappingCheckedFuture<T, E>(makeListenable(future), mapper);
142 }
143
144 /**
145 * Creates a {@code ListenableFuture} which has its value set immediately upon
146 * construction. The getters just return the value. This {@code Future} can't
147 * be canceled or timed out and its {@code isDone()} method always returns
148 * {@code true}. It's useful for returning something that implements the
149 * {@code ListenableFuture} interface but already has the result.
150 */
151 public static <T> ListenableFuture<T> immediateFuture(@Nullable T value) {
152 ValueFuture<T> future = ValueFuture.create();
153 future.set(value);
154 return future;
155 }
156
157 /**
158 * Creates a {@code CheckedFuture} which has its value set immediately upon
159 * construction. The getters just return the value. This {@code Future} can't
160 * be canceled or timed out and its {@code isDone()} method always returns
161 * {@code true}. It's useful for returning something that implements the
162 * {@code CheckedFuture} interface but already has the result.
163 */
164 public static <T, E extends Exception> CheckedFuture<T, E>
165 immediateCheckedFuture(@Nullable T value) {
166 ValueFuture<T> future = ValueFuture.create();
167 future.set(value);
168 return Futures.makeChecked(future, new Function<Exception, E>() {
169 public E apply(Exception e) {
170 throw new AssertionError("impossible");
171 }
172 });
173 }
174
175 /**
176 * Creates a {@code ListenableFuture} which has an exception set immediately
177 * upon construction. The getters just return the value. This {@code Future}
178 * can't be canceled or timed out and its {@code isDone()} method always
179 * returns {@code true}. It's useful for returning something that implements
180 * the {@code ListenableFuture} interface but already has a failed
181 * result. Calling {@code get()} will throw the provided {@code Throwable}
182 * (wrapped in an {@code ExecutionException}).
183 *
184 * @throws Error if the throwable was an {@link Error}.
185 */
186 public static <T> ListenableFuture<T> immediateFailedFuture(
187 Throwable throwable) {
188 checkNotNull(throwable);
189 ValueFuture<T> future = ValueFuture.create();
190 future.setException(throwable);
191 return future;
192 }
193
194 /**
195 * Creates a {@code CheckedFuture} which has an exception set immediately
196 * upon construction. The getters just return the value. This {@code Future}
197 * can't be canceled or timed out and its {@code isDone()} method always
198 * returns {@code true}. It's useful for returning something that implements
199 * the {@code CheckedFuture} interface but already has a failed result.
200 * Calling {@code get()} will throw the provided {@code Throwable} (wrapped in
201 * an {@code ExecutionException}) and calling {@code checkedGet()} will throw
202 * the provided exception itself.
203 *
204 * @throws Error if the throwable was an {@link Error}.
205 */
206 public static <T, E extends Exception> CheckedFuture<T, E>
207 immediateFailedCheckedFuture(final E exception) {
208 checkNotNull(exception);
209 return makeChecked(Futures.<T>immediateFailedFuture(exception),
210 new Function<Exception, E>() {
211 public E apply(Exception e) {
212 return exception;
213 }
214 });
215 }
216
217 /**
218 * Creates a new {@code ListenableFuture} that wraps another
219 * {@code ListenableFuture}. The result of the new future is the result of
220 * the provided function called on the result of the provided future.
221 * The resulting future doesn't interrupt when aborted.
222 *
223 * <p>TODO: Add a version that accepts a normal {@code Future}
224 *
225 * <p>The typical use for this method would be when a RPC call is dependent on
226 * the results of another RPC. One would call the first RPC (input), create a
227 * function that calls another RPC based on input's result, and then call
228 * chain on input and that function to get a {@code ListenableFuture} of
229 * the result.
230 *
231 * @param input The future to chain
232 * @param function A function to chain the results of the provided future
233 * to the results of the returned future. This will be run in the thread
234 * that notifies input it is complete.
235 * @return A future that holds result of the chain.
236 */
237 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
238 Function<? super I, ? extends ListenableFuture<? extends O>> function) {
239 return chain(input, function, MoreExecutors.sameThreadExecutor());
240 }
241
242 /**
243 * Creates a new {@code ListenableFuture} that wraps another
244 * {@code ListenableFuture}. The result of the new future is the result of
245 * the provided function called on the result of the provided future.
246 * The resulting future doesn't interrupt when aborted.
247 *
248 * <p>This version allows an arbitrary executor to be passed in for running
249 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
250 * the thread chained Function executes in will be whichever thread set the
251 * result of the input Future, which may be the network thread in the case of
252 * RPC-based Futures.
253 *
254 * @param input The future to chain
255 * @param function A function to chain the results of the provided future
256 * to the results of the returned future.
257 * @param exec Executor to run the function in.
258 * @return A future that holds result of the chain.
259 */
260 public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
261 Function<? super I, ? extends ListenableFuture<? extends O>> function,
262 Executor exec) {
263 ChainingListenableFuture<I, O> chain =
264 new ChainingListenableFuture<I, O>(function, input);
265 input.addListener(chain, exec);
266 return chain;
267 }
268
269 /**
270 * Creates a new {@code ListenableFuture} that wraps another
271 * {@code ListenableFuture}. The result of the new future is the result of
272 * the provided function called on the result of the provided future.
273 * The resulting future doesn't interrupt when aborted.
274 *
275 * <p>An example use of this method is to convert a serializable object
276 * returned from an RPC into a POJO.
277 *
278 * @param future The future to compose
279 * @param function A Function to compose the results of the provided future
280 * to the results of the returned future. This will be run in the thread
281 * that notifies input it is complete.
282 * @return A future that holds result of the composition.
283 */
284 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
285 final Function<? super I, ? extends O> function) {
286 return compose(future, function, MoreExecutors.sameThreadExecutor());
287 }
288
289 /**
290 * Creates a new {@code ListenableFuture} that wraps another
291 * {@code ListenableFuture}. The result of the new future is the result of
292 * the provided function called on the result of the provided future.
293 * The resulting future doesn't interrupt when aborted.
294 *
295 * <p>An example use of this method is to convert a serializable object
296 * returned from an RPC into a POJO.
297 *
298 * <p>This version allows an arbitrary executor to be passed in for running
299 * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
300 * the thread chained Function executes in will be whichever thread set the
301 * result of the input Future, which may be the network thread in the case of
302 * RPC-based Futures.
303 *
304 * @param future The future to compose
305 * @param function A Function to compose the results of the provided future
306 * to the results of the returned future.
307 * @param exec Executor to run the function in.
308 * @return A future that holds result of the composition.
309 * @since 2
310 */
311 public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
312 final Function<? super I, ? extends O> function, Executor exec) {
313 checkNotNull(function);
314 Function<I, ListenableFuture<O>> wrapperFunction
315 = new Function<I, ListenableFuture<O>>() {
316 @Override public ListenableFuture<O> apply(I input) {
317 O output = function.apply(input);
318 return immediateFuture(output);
319 }
320 };
321 return chain(future, wrapperFunction, exec);
322 }
323
324 /**
325 * Creates a new {@code Future} that wraps another {@code Future}.
326 * The result of the new future is the result of the provided function called
327 * on the result of the provided future.
328 *
329 * <p>An example use of this method is to convert a Future that produces a
330 * handle to an object to a future that produces the object itself.
331 *
332 * <p>Each call to {@code Future<O>.get(*)} results in a call to
333 * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
334 * is assumed that {@code Future<I>.get(*)} is idempotent.
335 *
336 * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
337 * future, the timeout only applies to the future passed in to this method.
338 * Any additional time taken by applying {@code function} is not considered.
339 *
340 * @param future The future to compose
341 * @param function A Function to compose the results of the provided future
342 * to the results of the returned future. This will be run in the thread
343 * that calls one of the varieties of {@code get()}.
344 * @return A future that computes result of the composition.
345 */
346 public static <I, O> Future<O> compose(final Future<I> future,
347 final Function<? super I, ? extends O> function) {
348 checkNotNull(future);
349 checkNotNull(function);
350 return new Future<O>() {
351
352 /*
353 * Concurrency detail:
354 *
355 * <p>To preserve the idempotency of calls to this.get(*) calls to the
356 * function are only applied once. A lock is required to prevent multiple
357 * applications of the function. The calls to future.get(*) are performed
358 * outside the lock, as is required to prevent calls to
359 * get(long, TimeUnit) to persist beyond their timeout.
360 *
361 * <p>Calls to future.get(*) on every call to this.get(*) also provide
362 * the cancellation behavior for this.
363 *
364 * <p>(Consider: in thread A, call get(), in thread B call get(long,
365 * TimeUnit). Thread B may have to wait for Thread A to finish, which
366 * would be unacceptable.)
367 *
368 * <p>Note that each call to Future<O>.get(*) results in a call to
369 * Future<I>.get(*), but the function is only applied once, so
370 * Future<I>.get(*) is assumed to be idempotent.
371 */
372
373 private final Object lock = new Object();
374 private boolean set = false;
375 private O value = null;
376
377 @Override
378 public O get() throws InterruptedException, ExecutionException {
379 return apply(future.get());
380 }
381
382 @Override
383 public O get(long timeout, TimeUnit unit) throws InterruptedException,
384 ExecutionException, TimeoutException {
385 return apply(future.get(timeout, unit));
386 }
387
388 private O apply(I raw) {
389 synchronized(lock) {
390 if (!set) {
391 value = function.apply(raw);
392 set = true;
393 }
394 return value;
395 }
396 }
397
398 @Override
399 public boolean cancel(boolean mayInterruptIfRunning) {
400 return future.cancel(mayInterruptIfRunning);
401 }
402
403 @Override
404 public boolean isCancelled() {
405 return future.isCancelled();
406 }
407
408 @Override
409 public boolean isDone() {
410 return future.isDone();
411 }
412 };
413 }
414
415 /**
416 * An implementation of {@code ListenableFuture} that also implements
417 * {@code Runnable} so that it can be used to nest ListenableFutures.
418 * Once the passed-in {@code ListenableFuture} is complete, it calls the
419 * passed-in {@code Function} to generate the result.
420 * The resulting future doesn't interrupt when aborted.
421 *
422 * <p>If the function throws any checked exceptions, they should be wrapped
423 * in a {@code UndeclaredThrowableException} so that this class can get
424 * access to the cause.
425 */
426 private static class ChainingListenableFuture<I, O>
427 extends AbstractListenableFuture<O> implements Runnable {
428
429 private Function<? super I, ? extends ListenableFuture<? extends O>>
430 function;
431 private UninterruptibleFuture<? extends I> inputFuture;
432
433 private ChainingListenableFuture(
434 Function<? super I, ? extends ListenableFuture<? extends O>> function,
435 ListenableFuture<? extends I> inputFuture) {
436 this.function = checkNotNull(function);
437 this.inputFuture = makeUninterruptible(inputFuture);
438 }
439
440 public boolean cancel(boolean mayInterruptIfRunning) {
441 Future<? extends I> future = inputFuture;
442 if (future != null) {
443 return future.cancel(mayInterruptIfRunning);
444 }
445 return false;
446 }
447
448 public void run() {
449 try {
450 I sourceResult;
451 try {
452 sourceResult = inputFuture.get();
453 } catch (CancellationException e) {
454 // Cancel this future and return.
455 cancel();
456 return;
457 } catch (ExecutionException e) {
458 // Set the cause of the exception as this future's exception
459 setException(e.getCause());
460 return;
461 }
462
463 final ListenableFuture<? extends O> outputFuture =
464 function.apply(sourceResult);
465 outputFuture.addListener(new Runnable() {
466 public void run() {
467 try {
468 // Here it would have been nice to have had an
469 // UninterruptibleListenableFuture, but we don't want to start a
470 // combinatorial explosion of interfaces, so we have to make do.
471 set(makeUninterruptible(outputFuture).get());
472 } catch (ExecutionException e) {
473 // Set the cause of the exception as this future's exception
474 setException(e.getCause());
475 }
476 }
477 }, MoreExecutors.sameThreadExecutor());
478 } catch (UndeclaredThrowableException e) {
479 // Set the cause of the exception as this future's exception
480 setException(e.getCause());
481 } catch (RuntimeException e) {
482 // This exception is irrelevant in this thread, but useful for the
483 // client
484 setException(e);
485 } catch (Error e) {
486 // This seems evil, but the client needs to know an error occured and
487 // the error needs to be propagated ASAP.
488 setException(e);
489 throw e;
490 } finally {
491 // Don't pin inputs beyond completion
492 function = null;
493 inputFuture = null;
494 }
495 }
496 }
497
498 /**
499 * A checked future that uses a function to map from exceptions to the
500 * appropriate checked type.
501 */
502 private static class MappingCheckedFuture<T, E extends Exception> extends
503 AbstractCheckedFuture<T, E> {
504
505 final Function<Exception, E> mapper;
506
507 MappingCheckedFuture(ListenableFuture<T> delegate,
508 Function<Exception, E> mapper) {
509 super(delegate);
510
511 this.mapper = checkNotNull(mapper);
512 }
513
514 @Override
515 protected E mapException(Exception e) {
516 return mapper.apply(e);
517 }
518 }
519
520 /**
521 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This
522 * will wait on the future to finish, and when it completes, run the
523 * listeners. This implementation will wait on the source future
524 * indefinitely, so if the source future never completes, the adapter will
525 * never complete either.
526 *
527 * <p>If the delegate future is interrupted or throws an unexpected unchecked
528 * exception, the listeners will not be invoked.
529 */
530 private static class ListenableFutureAdapter<T> extends ForwardingFuture<T>
531 implements ListenableFuture<T> {
532
533 private static final Executor adapterExecutor =
534 java.util.concurrent.Executors.newCachedThreadPool();
535
536 // The execution list to hold our listeners.
537 private final ExecutionList executionList = new ExecutionList();
538
539 // This allows us to only start up a thread waiting on the delegate future
540 // when the first listener is added.
541 private final AtomicBoolean hasListeners = new AtomicBoolean(false);
542
543 // The delegate future.
544 private final Future<T> delegate;
545
546 ListenableFutureAdapter(final Future<T> delegate) {
547 this.delegate = checkNotNull(delegate);
548 }
549
550 @Override
551 protected Future<T> delegate() {
552 return delegate;
553 }
554
555 @Override
556 public void addListener(Runnable listener, Executor exec) {
557
558 // When a listener is first added, we run a task that will wait for
559 // the delegate to finish, and when it is done will run the listeners.
560 if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) {
561 adapterExecutor.execute(new Runnable() {
562 @Override
563 public void run() {
564 try {
565 delegate.get();
566 } catch (CancellationException e) {
567 // The task was cancelled, so it is done, run the listeners.
568 } catch (InterruptedException e) {
569 // This thread was interrupted. This should never happen, so we
570 // throw an IllegalStateException.
571 throw new IllegalStateException("Adapter thread interrupted!", e);
572 } catch (ExecutionException e) {
573 // The task caused an exception, so it is done, run the listeners.
574 }
575 executionList.run();
576 }
577 });
578 }
579 executionList.add(listener, exec);
580 }
581 }
582 }