001    /*
002     * Copyright (C) 2006 Google Inc.
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    import static java.util.concurrent.TimeUnit.NANOSECONDS;
021    
022    import com.google.common.annotations.Beta;
023    import com.google.common.base.Function;
024    
025    import java.lang.reflect.UndeclaredThrowableException;
026    import java.util.concurrent.CancellationException;
027    import java.util.concurrent.ExecutionException;
028    import java.util.concurrent.Executor;
029    import java.util.concurrent.Future;
030    import java.util.concurrent.TimeUnit;
031    import java.util.concurrent.TimeoutException;
032    import java.util.concurrent.atomic.AtomicBoolean;
033    
034    import javax.annotation.Nullable;
035    
036    /**
037     * Static utility methods pertaining to the {@link Future} interface.
038     *
039     * @author Kevin Bourrillion
040     * @author Nishant Thakkar
041     * @author Sven Mawson
042     * @since 1
043     */
044    @Beta
045    public class Futures {
046      private Futures() {}
047    
048      /**
049       * Returns an uninterruptible view of a {@code Future}. If a thread is
050       * interrupted during an attempt to {@code get()} from the returned future, it
051       * continues to wait on the result until it is available or the timeout
052       * elapses, and only then re-interrupts the thread.
053       */
054      public static <V> UninterruptibleFuture<V> makeUninterruptible(
055          final Future<V> future) {
056        checkNotNull(future);
057        if (future instanceof UninterruptibleFuture) {
058          return (UninterruptibleFuture<V>) future;
059        }
060        return new UninterruptibleFuture<V>() {
061          public boolean cancel(boolean mayInterruptIfRunning) {
062            return future.cancel(mayInterruptIfRunning);
063          }
064          public boolean isCancelled() {
065            return future.isCancelled();
066          }
067          public boolean isDone() {
068            return future.isDone();
069          }
070    
071          public V get(long timeoutDuration, TimeUnit timeoutUnit)
072              throws TimeoutException, ExecutionException {
073            boolean interrupted = false;
074            try {
075              long timeoutNanos = timeoutUnit.toNanos(timeoutDuration);
076              long end = System.nanoTime() + timeoutNanos;
077              while (true) {
078                try {
079                  return future.get(timeoutNanos, NANOSECONDS);
080                } catch (InterruptedException e) {
081                  // Future treats negative timeouts just like zero.
082                  timeoutNanos = end - System.nanoTime();
083                  interrupted = true;
084                }
085              }
086            } finally {
087              if (interrupted) {
088                Thread.currentThread().interrupt();
089              }
090            }
091          }
092    
093          public V get() throws ExecutionException {
094            boolean interrupted = false;
095            try {
096              while (true) {
097                try {
098                  return future.get();
099                } catch (InterruptedException ignored) {
100                  interrupted = true;
101                }
102              }
103            } finally {
104              if (interrupted) {
105                Thread.currentThread().interrupt();
106              }
107            }
108          }
109        };
110      }
111    
112      /**
113       * Creates a {@link ListenableFuture} out of a normal {@link Future}. The
114       * returned future will create a thread to wait for the source future to
115       * complete before executing the listeners.
116       *
117       * <p>Callers who have a future that subclasses
118       * {@link java.util.concurrent.FutureTask} may want to instead subclass
119       * {@link ListenableFutureTask}, which adds the {@link ListenableFuture}
120       * functionality to the standard {@code FutureTask} implementation.
121       */
122      public static <T> ListenableFuture<T> makeListenable(Future<T> future) {
123        if (future instanceof ListenableFuture) {
124          return (ListenableFuture<T>) future;
125        }
126        return new ListenableFutureAdapter<T>(future);
127      }
128    
129      /**
130       * Creates a {@link CheckedFuture} out of a normal {@link Future} and a
131       * {@link Function} that maps from {@link Exception} instances into the
132       * appropriate checked type.
133       *
134       * <p>The given mapping function will be applied to an
135       * {@link InterruptedException}, a {@link CancellationException}, or an
136       * {@link ExecutionException} with the actual cause of the exception.
137       * See {@link Future#get()} for details on the exceptions thrown.
138       */
139      public static <T, E extends Exception> CheckedFuture<T, E> makeChecked(
140          Future<T> future, Function<Exception, E> mapper) {
141        return new MappingCheckedFuture<T, E>(makeListenable(future), mapper);
142      }
143    
144      /**
145       * Creates a {@code ListenableFuture} which has its value set immediately upon
146       * construction. The getters just return the value. This {@code Future} can't
147       * be canceled or timed out and its {@code isDone()} method always returns
148       * {@code true}. It's useful for returning something that implements the
149       * {@code ListenableFuture} interface but already has the result.
150       */
151      public static <T> ListenableFuture<T> immediateFuture(@Nullable T value) {
152        ValueFuture<T> future = ValueFuture.create();
153        future.set(value);
154        return future;
155      }
156    
157      /**
158       * Creates a {@code CheckedFuture} which has its value set immediately upon
159       * construction. The getters just return the value. This {@code Future} can't
160       * be canceled or timed out and its {@code isDone()} method always returns
161       * {@code true}. It's useful for returning something that implements the
162       * {@code CheckedFuture} interface but already has the result.
163       */
164      public static <T, E extends Exception> CheckedFuture<T, E>
165          immediateCheckedFuture(@Nullable T value) {
166        ValueFuture<T> future = ValueFuture.create();
167        future.set(value);
168        return Futures.makeChecked(future, new Function<Exception, E>() {
169          public E apply(Exception e) {
170            throw new AssertionError("impossible");
171          }
172        });
173      }
174    
175      /**
176       * Creates a {@code ListenableFuture} which has an exception set immediately
177       * upon construction. The getters just return the value. This {@code Future}
178       * can't be canceled or timed out and its {@code isDone()} method always
179       * returns {@code true}. It's useful for returning something that implements
180       * the {@code ListenableFuture} interface but already has a failed
181       * result. Calling {@code get()} will throw the provided {@code Throwable}
182       * (wrapped in an {@code ExecutionException}).
183       *
184       * @throws Error if the throwable was an {@link Error}.
185       */
186      public static <T> ListenableFuture<T> immediateFailedFuture(
187          Throwable throwable) {
188        checkNotNull(throwable);
189        ValueFuture<T> future = ValueFuture.create();
190        future.setException(throwable);
191        return future;
192      }
193    
194      /**
195       * Creates a {@code CheckedFuture} which has an exception set immediately
196       * upon construction. The getters just return the value. This {@code Future}
197       * can't be canceled or timed out and its {@code isDone()} method always
198       * returns {@code true}. It's useful for returning something that implements
199       * the {@code CheckedFuture} interface but already has a failed result.
200       * Calling {@code get()} will throw the provided {@code Throwable} (wrapped in
201       * an {@code ExecutionException}) and calling {@code checkedGet()} will throw
202       * the provided exception itself.
203       *
204       * @throws Error if the throwable was an {@link Error}.
205       */
206      public static <T, E extends Exception> CheckedFuture<T, E>
207          immediateFailedCheckedFuture(final E exception) {
208        checkNotNull(exception);
209        return makeChecked(Futures.<T>immediateFailedFuture(exception),
210            new Function<Exception, E>() {
211              public E apply(Exception e) {
212                return exception;
213              }
214            });
215      }
216    
217      /**
218       * Creates a new {@code ListenableFuture} that wraps another
219       * {@code ListenableFuture}.  The result of the new future is the result of
220       * the provided function called on the result of the provided future.
221       * The resulting future doesn't interrupt when aborted.
222       *
223       * <p>TODO: Add a version that accepts a normal {@code Future}
224       *
225       * <p>The typical use for this method would be when a RPC call is dependent on
226       * the results of another RPC.  One would call the first RPC (input), create a
227       * function that calls another RPC based on input's result, and then call
228       * chain on input and that function to get a {@code ListenableFuture} of
229       * the result.
230       *
231       * @param input The future to chain
232       * @param function A function to chain the results of the provided future
233       *     to the results of the returned future.  This will be run in the thread
234       *     that notifies input it is complete.
235       * @return A future that holds result of the chain.
236       */
237      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
238          Function<? super I, ? extends ListenableFuture<? extends O>> function) {
239        return chain(input, function, MoreExecutors.sameThreadExecutor());
240      }
241    
242      /**
243       * Creates a new {@code ListenableFuture} that wraps another
244       * {@code ListenableFuture}.  The result of the new future is the result of
245       * the provided function called on the result of the provided future.
246       * The resulting future doesn't interrupt when aborted.
247       *
248       * <p>This version allows an arbitrary executor to be passed in for running
249       * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
250       * the thread chained Function executes in will be whichever thread set the
251       * result of the input Future, which may be the network thread in the case of
252       * RPC-based Futures.
253       *
254       * @param input The future to chain
255       * @param function A function to chain the results of the provided future
256       *     to the results of the returned future.
257       * @param exec Executor to run the function in.
258       * @return A future that holds result of the chain.
259       */
260      public static <I, O> ListenableFuture<O> chain(ListenableFuture<I> input,
261          Function<? super I, ? extends ListenableFuture<? extends O>> function,
262          Executor exec) {
263        ChainingListenableFuture<I, O> chain =
264            new ChainingListenableFuture<I, O>(function, input);
265        input.addListener(chain, exec);
266        return chain;
267      }
268    
269      /**
270       * Creates a new {@code ListenableFuture} that wraps another
271       * {@code ListenableFuture}.  The result of the new future is the result of
272       * the provided function called on the result of the provided future.
273       * The resulting future doesn't interrupt when aborted.
274       *
275       * <p>An example use of this method is to convert a serializable object
276       * returned from an RPC into a POJO.
277       *
278       * @param future The future to compose
279       * @param function A Function to compose the results of the provided future
280       *     to the results of the returned future.  This will be run in the thread
281       *     that notifies input it is complete.
282       * @return A future that holds result of the composition.
283       */
284      public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
285          final Function<? super I, ? extends O> function) {
286        return compose(future, function, MoreExecutors.sameThreadExecutor());
287      }
288    
289      /**
290       * Creates a new {@code ListenableFuture} that wraps another
291       * {@code ListenableFuture}.  The result of the new future is the result of
292       * the provided function called on the result of the provided future.
293       * The resulting future doesn't interrupt when aborted.
294       *
295       * <p>An example use of this method is to convert a serializable object
296       * returned from an RPC into a POJO.
297       *
298       * <p>This version allows an arbitrary executor to be passed in for running
299       * the chained Function. When using {@link MoreExecutors#sameThreadExecutor},
300       * the thread chained Function executes in will be whichever thread set the
301       * result of the input Future, which may be the network thread in the case of
302       * RPC-based Futures.
303       *
304       * @param future The future to compose
305       * @param function A Function to compose the results of the provided future
306       *     to the results of the returned future.
307       * @param exec Executor to run the function in.
308       * @return A future that holds result of the composition.
309       * @since 2
310       */
311      public static <I, O> ListenableFuture<O> compose(ListenableFuture<I> future,
312          final Function<? super I, ? extends O> function, Executor exec) {
313        checkNotNull(function);
314        Function<I, ListenableFuture<O>> wrapperFunction
315            = new Function<I, ListenableFuture<O>>() {
316                @Override public ListenableFuture<O> apply(I input) {
317                  O output = function.apply(input);
318                  return immediateFuture(output);
319                }
320            };
321        return chain(future, wrapperFunction, exec);
322      }
323    
324      /**
325       * Creates a new {@code Future} that wraps another {@code Future}.
326       * The result of the new future is the result of the provided function called
327       * on the result of the provided future.
328       *
329       * <p>An example use of this method is to convert a Future that produces a
330       * handle to an object to a future that produces the object itself.
331       *
332       * <p>Each call to {@code Future<O>.get(*)} results in a call to
333       * {@code Future<I>.get(*)}, but {@code function} is only applied once, so it
334       * is assumed that {@code Future<I>.get(*)} is idempotent.
335       *
336       * <p>When calling {@link Future#get(long, TimeUnit)} on the returned
337       * future, the timeout only applies to the future passed in to this method.
338       * Any additional time taken by applying {@code function} is not considered.
339       *
340       * @param future The future to compose
341       * @param function A Function to compose the results of the provided future
342       *     to the results of the returned future.  This will be run in the thread
343       *     that calls one of the varieties of {@code get()}.
344       * @return A future that computes result of the composition.
345       */
346      public static <I, O> Future<O> compose(final Future<I> future,
347          final Function<? super I, ? extends O> function) {
348        checkNotNull(future);
349        checkNotNull(function);
350        return new Future<O>() {
351    
352          /*
353           * Concurrency detail:
354           *
355           * <p>To preserve the idempotency of calls to this.get(*) calls to the
356           * function are only applied once. A lock is required to prevent multiple
357           * applications of the function. The calls to future.get(*) are performed
358           * outside the lock, as is required to prevent calls to
359           * get(long, TimeUnit) to persist beyond their timeout.
360           *
361           * <p>Calls to future.get(*) on every call to this.get(*) also provide
362           * the cancellation behavior for this.
363           *
364           * <p>(Consider: in thread A, call get(), in thread B call get(long,
365           * TimeUnit). Thread B may have to wait for Thread A to finish, which
366           * would be unacceptable.)
367           *
368           * <p>Note that each call to Future<O>.get(*) results in a call to
369           * Future<I>.get(*), but the function is only applied once, so
370           * Future<I>.get(*) is assumed to be idempotent.
371           */
372    
373          private final Object lock = new Object();
374          private boolean set = false;
375          private O value = null;
376    
377          @Override
378          public O get() throws InterruptedException, ExecutionException {
379            return apply(future.get());
380          }
381    
382          @Override
383          public O get(long timeout, TimeUnit unit) throws InterruptedException,
384              ExecutionException, TimeoutException {
385            return apply(future.get(timeout, unit));
386          }
387    
388          private O apply(I raw) {
389            synchronized(lock) {
390              if (!set) {
391                value = function.apply(raw);
392                set = true;
393              }
394              return value;
395            }
396          }
397    
398          @Override
399          public boolean cancel(boolean mayInterruptIfRunning) {
400            return future.cancel(mayInterruptIfRunning);
401          }
402    
403          @Override
404          public boolean isCancelled() {
405            return future.isCancelled();
406          }
407    
408          @Override
409          public boolean isDone() {
410            return future.isDone();
411          }
412        };
413      }
414    
415      /**
416       * An implementation of {@code ListenableFuture} that also implements
417       * {@code Runnable} so that it can be used to nest ListenableFutures.
418       * Once the passed-in {@code ListenableFuture} is complete, it calls the
419       * passed-in {@code Function} to generate the result.
420       * The resulting future doesn't interrupt when aborted.
421       *
422       * <p>If the function throws any checked exceptions, they should be wrapped
423       * in a {@code UndeclaredThrowableException} so that this class can get
424       * access to the cause.
425       */
426      private static class ChainingListenableFuture<I, O>
427          extends AbstractListenableFuture<O> implements Runnable {
428    
429        private Function<? super I, ? extends ListenableFuture<? extends O>>
430            function;
431        private UninterruptibleFuture<? extends I> inputFuture;
432    
433        private ChainingListenableFuture(
434            Function<? super I, ? extends ListenableFuture<? extends O>> function,
435            ListenableFuture<? extends I> inputFuture) {
436          this.function = checkNotNull(function);
437          this.inputFuture = makeUninterruptible(inputFuture);
438        }
439    
440        public boolean cancel(boolean mayInterruptIfRunning) {
441          Future<? extends I> future = inputFuture;
442          if (future != null) {
443            return future.cancel(mayInterruptIfRunning);
444          }
445          return false;
446        }
447    
448        public void run() {
449          try {
450            I sourceResult;
451            try {
452              sourceResult = inputFuture.get();
453            } catch (CancellationException e) {
454              // Cancel this future and return.
455              cancel();
456              return;
457            } catch (ExecutionException e) {
458              // Set the cause of the exception as this future's exception
459              setException(e.getCause());
460              return;
461            }
462    
463            final ListenableFuture<? extends O> outputFuture =
464                function.apply(sourceResult);
465            outputFuture.addListener(new Runnable() {
466                public void run() {
467                  try {
468                    // Here it would have been nice to have had an
469                    // UninterruptibleListenableFuture, but we don't want to start a
470                    // combinatorial explosion of interfaces, so we have to make do.
471                    set(makeUninterruptible(outputFuture).get());
472                  } catch (ExecutionException e) {
473                    // Set the cause of the exception as this future's exception
474                    setException(e.getCause());
475                  }
476                }
477              }, MoreExecutors.sameThreadExecutor());
478          } catch (UndeclaredThrowableException e) {
479            // Set the cause of the exception as this future's exception
480            setException(e.getCause());
481          } catch (RuntimeException e) {
482            // This exception is irrelevant in this thread, but useful for the
483            // client
484            setException(e);
485          } catch (Error e) {
486            // This seems evil, but the client needs to know an error occured and
487            // the error needs to be propagated ASAP.
488            setException(e);
489            throw e;
490          } finally {
491            // Don't pin inputs beyond completion
492            function = null;
493            inputFuture = null;
494          }
495        }
496      }
497    
498      /**
499       * A checked future that uses a function to map from exceptions to the
500       * appropriate checked type.
501       */
502      private static class MappingCheckedFuture<T, E extends Exception> extends
503          AbstractCheckedFuture<T, E> {
504    
505        final Function<Exception, E> mapper;
506    
507        MappingCheckedFuture(ListenableFuture<T> delegate,
508            Function<Exception, E> mapper) {
509          super(delegate);
510    
511          this.mapper = checkNotNull(mapper);
512        }
513    
514        @Override
515        protected E mapException(Exception e) {
516          return mapper.apply(e);
517        }
518      }
519    
520      /**
521       * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
522       * will wait on the future to finish, and when it completes, run the
523       * listeners.  This implementation will wait on the source future
524       * indefinitely, so if the source future never completes, the adapter will
525       * never complete either.
526       *
527       * <p>If the delegate future is interrupted or throws an unexpected unchecked
528       * exception, the listeners will not be invoked.
529       */
530      private static class ListenableFutureAdapter<T> extends ForwardingFuture<T>
531          implements ListenableFuture<T> {
532    
533        private static final Executor adapterExecutor =
534            java.util.concurrent.Executors.newCachedThreadPool();
535    
536        // The execution list to hold our listeners.
537        private final ExecutionList executionList = new ExecutionList();
538    
539        // This allows us to only start up a thread waiting on the delegate future
540        // when the first listener is added.
541        private final AtomicBoolean hasListeners = new AtomicBoolean(false);
542    
543        // The delegate future.
544        private final Future<T> delegate;
545    
546        ListenableFutureAdapter(final Future<T> delegate) {
547          this.delegate = checkNotNull(delegate);
548        }
549    
550        @Override
551        protected Future<T> delegate() {
552          return delegate;
553        }
554    
555        @Override
556        public void addListener(Runnable listener, Executor exec) {
557    
558          // When a listener is first added, we run a task that will wait for
559          // the delegate to finish, and when it is done will run the listeners.
560          if (!hasListeners.get() && hasListeners.compareAndSet(false, true)) {
561            adapterExecutor.execute(new Runnable() {
562              @Override
563              public void run() {
564                try {
565                  delegate.get();
566                } catch (CancellationException e) {
567                  // The task was cancelled, so it is done, run the listeners.
568                } catch (InterruptedException e) {
569                  // This thread was interrupted.  This should never happen, so we
570                  // throw an IllegalStateException.
571                  throw new IllegalStateException("Adapter thread interrupted!", e);
572                } catch (ExecutionException e) {
573                  // The task caused an exception, so it is done, run the listeners.
574                }
575                executionList.run();
576              }
577            });
578          }
579          executionList.add(listener, exec);
580        }
581      }
582    }