001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.util.concurrent.Internal.saturatedToNanos;
019
020import com.google.common.annotations.Beta;
021import com.google.common.annotations.GwtCompatible;
022import com.google.common.annotations.GwtIncompatible;
023import com.google.common.base.Function;
024import com.google.errorprone.annotations.CanIgnoreReturnValue;
025import java.time.Duration;
026import java.util.concurrent.ExecutionException;
027import java.util.concurrent.Executor;
028import java.util.concurrent.ScheduledExecutorService;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.TimeoutException;
031
032/**
033 * A {@link ListenableFuture} that supports fluent chains of operations. For example:
034 *
035 * <pre>{@code
036 * ListenableFuture<Boolean> adminIsLoggedIn =
037 *     FluentFuture.from(usersDatabase.getAdminUser())
038 *         .transform(User::getId, directExecutor())
039 *         .transform(ActivityService::isLoggedIn, threadPool)
040 *         .catching(RpcException.class, e -> false, directExecutor());
041 * }</pre>
042 *
043 * <h3>Alternatives</h3>
044 *
045 * <h4>Frameworks</h4>
046 *
047 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
048 * use a framework. Frameworks automate the process, often adding features like monitoring,
049 * debugging, and cancellation. Examples of frameworks include:
050 *
051 * <ul>
052 *   <li><a href="http://dagger.dev/producers.html">Dagger Producers</a>
053 * </ul>
054 *
055 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
056 * </h4>
057 *
058 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
059 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
060 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
061 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
062 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
063 *
064 * <h3>Extension</h3>
065 *
066 * If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring your
067 * own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt an
068 * existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
069 * forwards to that future and adds the desired methods.
070 *
071 * @since 23.0
072 */
073@Beta
074@GwtCompatible(emulated = true)
075public abstract class FluentFuture<V> extends GwtFluentFutureCatchingSpecialization<V> {
076
077  /**
078   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
079   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
080   */
081  abstract static class TrustedFuture<V> extends FluentFuture<V>
082      implements AbstractFuture.Trusted<V> {
083    @CanIgnoreReturnValue
084    @Override
085    public final V get() throws InterruptedException, ExecutionException {
086      return super.get();
087    }
088
089    @CanIgnoreReturnValue
090    @Override
091    public final V get(long timeout, TimeUnit unit)
092        throws InterruptedException, ExecutionException, TimeoutException {
093      return super.get(timeout, unit);
094    }
095
096    @Override
097    public final boolean isDone() {
098      return super.isDone();
099    }
100
101    @Override
102    public final boolean isCancelled() {
103      return super.isCancelled();
104    }
105
106    @Override
107    public final void addListener(Runnable listener, Executor executor) {
108      super.addListener(listener, executor);
109    }
110
111    @CanIgnoreReturnValue
112    @Override
113    public final boolean cancel(boolean mayInterruptIfRunning) {
114      return super.cancel(mayInterruptIfRunning);
115    }
116  }
117
118  FluentFuture() {}
119
120  /**
121   * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
122   *
123   * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
124   * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
125   * original {@code ListenableFuture}.
126   */
127  public static <V> FluentFuture<V> from(ListenableFuture<V> future) {
128    return future instanceof FluentFuture
129        ? (FluentFuture<V>) future
130        : new ForwardingFluentFuture<V>(future);
131  }
132
133  /**
134   * Simply returns its argument.
135   *
136   * @deprecated no need to use this
137   * @since 28.0
138   */
139  @Deprecated
140  public static <V> FluentFuture<V> from(FluentFuture<V> future) {
141    return checkNotNull(future);
142  }
143
144  /**
145   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
146   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
147   * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
148   * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
149   * exception is thrown, this exception is used as the result of the output {@code Future}.
150   *
151   * <p>Usage example:
152   *
153   * <pre>{@code
154   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
155   * // counters.
156   * ListenableFuture<Integer> faultTolerantFuture =
157   *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
158   * }</pre>
159   *
160   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
161   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
162   * listeners are also applicable to heavyweight functions passed to this method.
163   *
164   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
165   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
166   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
167   * #transform}.
168   *
169   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
170   *     type is matched against the input's exception. "The input's exception" means the cause of
171   *     the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
172   *     different kind of exception, that exception itself. To avoid hiding bugs and other
173   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
174   *     Throwable.class} in particular.
175   * @param fallback the {@link Function} to be called if the input fails with the expected
176   *     exception type. The function's argument is the input's exception. "The input's exception"
177   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
178   *     {@code get()} throws a different kind of exception, that exception itself.
179   * @param executor the executor that runs {@code fallback} if the input fails
180   */
181  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
182  public final <X extends Throwable> FluentFuture<V> catching(
183      Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
184    return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
185  }
186
187  /**
188   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
189   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
190   * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
191   * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
192   * an exception is thrown, this exception is used as the result of the output {@code Future}.
193   *
194   * <p>Usage examples:
195   *
196   * <pre>{@code
197   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
198   * // counters.
199   * ListenableFuture<Integer> faultTolerantFuture =
200   *     fetchCounters().catchingAsync(
201   *         FetchException.class, x -> immediateFuture(0), directExecutor());
202   * }</pre>
203   *
204   * <p>The fallback can also choose to propagate the original exception when desired:
205   *
206   * <pre>{@code
207   * // Falling back to a zero counter only in case the exception was a
208   * // TimeoutException.
209   * ListenableFuture<Integer> faultTolerantFuture =
210   *     fetchCounters().catchingAsync(
211   *         FetchException.class,
212   *         e -> {
213   *           if (omitDataOnFetchFailure) {
214   *             return immediateFuture(0);
215   *           }
216   *           throw e;
217   *         },
218   *         directExecutor());
219   * }</pre>
220   *
221   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
222   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
223   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
224   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
225   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
226   * completing the returned {@code Future}.)
227   *
228   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
229   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
230   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
231   * #transform}.
232   *
233   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
234   *     type is matched against the input's exception. "The input's exception" means the cause of
235   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
236   *     different kind of exception, that exception itself. To avoid hiding bugs and other
237   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
238   *     Throwable.class} in particular.
239   * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
240   *     exception type. The function's argument is the input's exception. "The input's exception"
241   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
242   *     {@code get()} throws a different kind of exception, that exception itself.
243   * @param executor the executor that runs {@code fallback} if the input fails
244   */
245  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
246  public final <X extends Throwable> FluentFuture<V> catchingAsync(
247      Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
248    return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
249  }
250
251  /**
252   * Returns a future that delegates to this future but will finish early (via a {@link
253   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
254   * If the timeout expires, not only will the output future finish, but also the input future
255   * ({@code this}) will be cancelled and interrupted.
256   *
257   * @param timeout when to time out the future
258   * @param scheduledExecutor The executor service to enforce the timeout.
259   * @since 28.0
260   */
261  @GwtIncompatible // ScheduledExecutorService
262  public final FluentFuture<V> withTimeout(
263      Duration timeout, ScheduledExecutorService scheduledExecutor) {
264    return withTimeout(saturatedToNanos(timeout), TimeUnit.NANOSECONDS, scheduledExecutor);
265  }
266
267  /**
268   * Returns a future that delegates to this future but will finish early (via a {@link
269   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
270   * If the timeout expires, not only will the output future finish, but also the input future
271   * ({@code this}) will be cancelled and interrupted.
272   *
273   * @param timeout when to time out the future
274   * @param unit the time unit of the time parameter
275   * @param scheduledExecutor The executor service to enforce the timeout.
276   */
277  @GwtIncompatible // ScheduledExecutorService
278  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
279  public final FluentFuture<V> withTimeout(
280      long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
281    return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
282  }
283
284  /**
285   * Returns a new {@code Future} whose result is asynchronously derived from the result of this
286   * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
287   * same exception (and the function is not invoked).
288   *
289   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
290   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
291   * Example usage:
292   *
293   * <pre>{@code
294   * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
295   * ListenableFuture<QueryResult> queryFuture =
296   *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
297   * }</pre>
298   *
299   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
300   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
301   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
302   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
303   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
304   * completing the returned {@code Future}.)
305   *
306   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
307   * input future and that of the future returned by the chain function. That is, if the returned
308   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
309   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
310   * attempt to cancel itself.
311   *
312   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
313   * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
314   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
315   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
316   *
317   * @param function A function to transform the result of this future to the result of the output
318   *     future
319   * @param executor Executor to run the function in.
320   * @return A future that holds result of the function (if the input succeeded) or the original
321   *     input's failure (if not)
322   */
323  public final <T> FluentFuture<T> transformAsync(
324      AsyncFunction<? super V, T> function, Executor executor) {
325    return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
326  }
327
328  /**
329   * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
330   * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
331   * the function is not invoked). Example usage:
332   *
333   * <pre>{@code
334   * ListenableFuture<List<Row>> rowsFuture =
335   *     queryFuture.transform(QueryResult::getRows, executor);
336   * }</pre>
337   *
338   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
339   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
340   * listeners are also applicable to heavyweight functions passed to this method.
341   *
342   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
343   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
344   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
345   * in which it will attempt to cancel itself.
346   *
347   * <p>An example use of this method is to convert a serializable object returned from an RPC into
348   * a POJO.
349   *
350   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
351   * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
352   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
353   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
354   *
355   * @param function A Function to transform the results of this future to the results of the
356   *     returned future.
357   * @param executor Executor to run the function in.
358   * @return A future that holds result of the transformation.
359   */
360  public final <T> FluentFuture<T> transform(Function<? super V, T> function, Executor executor) {
361    return (FluentFuture<T>) Futures.transform(this, function, executor);
362  }
363
364  /**
365   * Registers separate success and failure callbacks to be run when this {@code Future}'s
366   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
367   * computation is already complete, immediately.
368   *
369   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
370   * callbacks, but any callback added through this method is guaranteed to be called once the
371   * computation is complete.
372   *
373   * <p>Example:
374   *
375   * <pre>{@code
376   * future.addCallback(
377   *     new FutureCallback<QueryResult>() {
378   *       public void onSuccess(QueryResult result) {
379   *         storeInCache(result);
380   *       }
381   *       public void onFailure(Throwable t) {
382   *         reportError(t);
383   *       }
384   *     }, executor);
385   * }</pre>
386   *
387   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
388   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
389   * listeners are also applicable to heavyweight callbacks passed to this method.
390   *
391   * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
392   *
393   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
394   * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
395   * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
396   * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
397   *
398   * @param callback The callback to invoke when this {@code Future} is completed.
399   * @param executor The executor to run {@code callback} when the future completes.
400   */
401  public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
402    Futures.addCallback(this, callback, executor);
403  }
404}