001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018
019import com.google.common.annotations.Beta;
020import com.google.common.annotations.GwtCompatible;
021import com.google.common.annotations.GwtIncompatible;
022import com.google.common.base.Function;
023import com.google.errorprone.annotations.CanIgnoreReturnValue;
024import java.util.concurrent.ExecutionException;
025import java.util.concurrent.Executor;
026import java.util.concurrent.ScheduledExecutorService;
027import java.util.concurrent.TimeUnit;
028import java.util.concurrent.TimeoutException;
029
030/**
031 * A {@link ListenableFuture} that supports fluent chains of operations. For example:
032 *
033 * <pre>{@code
034 * ListenableFuture<Boolean> adminIsLoggedIn =
035 *     FluentFuture.from(usersDatabase.getAdminUser())
036 *         .transform(User::getId, directExecutor())
037 *         .transform(ActivityService::isLoggedIn, threadPool)
038 *         .catching(RpcException.class, e -> false, directExecutor());
039 * }</pre>
040 *
041 * <h3>Alternatives</h3>
042 *
043 * <h4>Frameworks</h4>
044 *
045 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
046 * use a framework. Frameworks automate the process, often adding features like monitoring,
047 * debugging, and cancellation. Examples of frameworks include:
048 *
049 * <ul>
 *   <li><a href="https://dagger.dev/producers.html">Dagger Producers</a>
051 * </ul>
052 *
053 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
054 * </h4>
055 *
056 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
057 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
058 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
059 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
060 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
061 *
062 * <h3>Extension</h3>
063 *
 * <p>If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring
 * your own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt
 * an existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
 * forwards to that future and adds the desired methods.
068 *
069 * @since 23.0
070 */
071@Beta
072@GwtCompatible(emulated = true)
073public abstract class FluentFuture<V> extends GwtFluentFutureCatchingSpecialization<V> {
074
075  /**
076   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
077   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
078   */
079  abstract static class TrustedFuture<V> extends FluentFuture<V>
080      implements AbstractFuture.Trusted<V> {
081    @CanIgnoreReturnValue
082    @Override
083    public final V get() throws InterruptedException, ExecutionException {
084      return super.get();
085    }
086
087    @CanIgnoreReturnValue
088    @Override
089    public final V get(long timeout, TimeUnit unit)
090        throws InterruptedException, ExecutionException, TimeoutException {
091      return super.get(timeout, unit);
092    }
093
094    @Override
095    public final boolean isDone() {
096      return super.isDone();
097    }
098
099    @Override
100    public final boolean isCancelled() {
101      return super.isCancelled();
102    }
103
104    @Override
105    public final void addListener(Runnable listener, Executor executor) {
106      super.addListener(listener, executor);
107    }
108
109    @CanIgnoreReturnValue
110    @Override
111    public final boolean cancel(boolean mayInterruptIfRunning) {
112      return super.cancel(mayInterruptIfRunning);
113    }
114  }
115
116  FluentFuture() {}
117
118  /**
119   * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
120   *
121   * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
122   * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
123   * original {@code ListenableFuture}.
124   */
125  public static <V> FluentFuture<V> from(ListenableFuture<V> future) {
126    return future instanceof FluentFuture
127        ? (FluentFuture<V>) future
128        : new ForwardingFluentFuture<V>(future);
129  }
130
131  /**
132   * Simply returns its argument.
133   *
134   * @deprecated no need to use this
135   * @since 28.0
136   */
137  @Deprecated
138  public static <V> FluentFuture<V> from(FluentFuture<V> future) {
139    return checkNotNull(future);
140  }
141
142  /**
143   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
144   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
145   * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
146   * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
147   * exception is thrown, this exception is used as the result of the output {@code Future}.
148   *
149   * <p>Usage example:
150   *
151   * <pre>{@code
152   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
153   * // counters.
154   * ListenableFuture<Integer> faultTolerantFuture =
155   *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
156   * }</pre>
157   *
158   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
159   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
160   * listeners are also applicable to heavyweight functions passed to this method.
161   *
162   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
163   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
164   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
165   * #transform}.
166   *
167   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
168   *     type is matched against the input's exception. "The input's exception" means the cause of
169   *     the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
170   *     different kind of exception, that exception itself. To avoid hiding bugs and other
171   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
172   *     Throwable.class} in particular.
173   * @param fallback the {@link Function} to be called if the input fails with the expected
174   *     exception type. The function's argument is the input's exception. "The input's exception"
175   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
176   *     {@code get()} throws a different kind of exception, that exception itself.
177   * @param executor the executor that runs {@code fallback} if the input fails
178   */
179  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
180  public final <X extends Throwable> FluentFuture<V> catching(
181      Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
182    return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
183  }
184
185  /**
186   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
187   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
188   * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
189   * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
190   * an exception is thrown, this exception is used as the result of the output {@code Future}.
191   *
192   * <p>Usage examples:
193   *
194   * <pre>{@code
195   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
196   * // counters.
197   * ListenableFuture<Integer> faultTolerantFuture =
198   *     fetchCounters().catchingAsync(
199   *         FetchException.class, x -> immediateFuture(0), directExecutor());
200   * }</pre>
201   *
202   * <p>The fallback can also choose to propagate the original exception when desired:
203   *
204   * <pre>{@code
205   * // Falling back to a zero counter only in case the exception was a
206   * // TimeoutException.
207   * ListenableFuture<Integer> faultTolerantFuture =
208   *     fetchCounters().catchingAsync(
209   *         FetchException.class,
210   *         e -> {
211   *           if (omitDataOnFetchFailure) {
212   *             return immediateFuture(0);
213   *           }
214   *           throw e;
215   *         },
216   *         directExecutor());
217   * }</pre>
218   *
219   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
220   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
221   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
222   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
223   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
224   * completing the returned {@code Future}.)
225   *
226   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
227   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
228   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
229   * #transform}.
230   *
231   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
232   *     type is matched against the input's exception. "The input's exception" means the cause of
233   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
234   *     different kind of exception, that exception itself. To avoid hiding bugs and other
235   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
236   *     Throwable.class} in particular.
237   * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
238   *     exception type. The function's argument is the input's exception. "The input's exception"
239   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
240   *     {@code get()} throws a different kind of exception, that exception itself.
241   * @param executor the executor that runs {@code fallback} if the input fails
242   */
243  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
244  public final <X extends Throwable> FluentFuture<V> catchingAsync(
245      Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
246    return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
247  }
248
249  /**
250   * Returns a future that delegates to this future but will finish early (via a {@link
251   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
252   * If the timeout expires, not only will the output future finish, but also the input future
253   * ({@code this}) will be cancelled and interrupted.
254   *
255   * @param timeout when to time out the future
256   * @param unit the time unit of the time parameter
257   * @param scheduledExecutor The executor service to enforce the timeout.
258   */
259  @GwtIncompatible // ScheduledExecutorService
260  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
261  public final FluentFuture<V> withTimeout(
262      long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
263    return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
264  }
265
266  /**
267   * Returns a new {@code Future} whose result is asynchronously derived from the result of this
268   * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
269   * same exception (and the function is not invoked).
270   *
271   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
272   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
273   * Example usage:
274   *
275   * <pre>{@code
276   * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
277   * ListenableFuture<QueryResult> queryFuture =
278   *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
279   * }</pre>
280   *
281   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
282   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
283   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
284   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
285   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
286   * completing the returned {@code Future}.)
287   *
288   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
289   * input future and that of the future returned by the chain function. That is, if the returned
290   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
291   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
292   * attempt to cancel itself.
293   *
294   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
295   * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
296   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
297   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
298   *
299   * @param function A function to transform the result of this future to the result of the output
300   *     future
301   * @param executor Executor to run the function in.
302   * @return A future that holds result of the function (if the input succeeded) or the original
303   *     input's failure (if not)
304   */
305  public final <T> FluentFuture<T> transformAsync(
306      AsyncFunction<? super V, T> function, Executor executor) {
307    return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
308  }
309
310  /**
311   * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
312   * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
313   * the function is not invoked). Example usage:
314   *
315   * <pre>{@code
316   * ListenableFuture<List<Row>> rowsFuture =
317   *     queryFuture.transform(QueryResult::getRows, executor);
318   * }</pre>
319   *
320   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
321   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
322   * listeners are also applicable to heavyweight functions passed to this method.
323   *
324   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
325   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
326   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
327   * in which it will attempt to cancel itself.
328   *
329   * <p>An example use of this method is to convert a serializable object returned from an RPC into
330   * a POJO.
331   *
332   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
333   * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
334   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
335   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
336   *
337   * @param function A Function to transform the results of this future to the results of the
338   *     returned future.
339   * @param executor Executor to run the function in.
340   * @return A future that holds result of the transformation.
341   */
342  public final <T> FluentFuture<T> transform(Function<? super V, T> function, Executor executor) {
343    return (FluentFuture<T>) Futures.transform(this, function, executor);
344  }
345
346  /**
347   * Registers separate success and failure callbacks to be run when this {@code Future}'s
348   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
349   * computation is already complete, immediately.
350   *
351   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
352   * callbacks, but any callback added through this method is guaranteed to be called once the
353   * computation is complete.
354   *
355   * <p>Example:
356   *
357   * <pre>{@code
358   * future.addCallback(
359   *     new FutureCallback<QueryResult>() {
360   *       public void onSuccess(QueryResult result) {
361   *         storeInCache(result);
362   *       }
363   *       public void onFailure(Throwable t) {
364   *         reportError(t);
365   *       }
366   *     }, executor);
367   * }</pre>
368   *
369   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
370   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
371   * listeners are also applicable to heavyweight callbacks passed to this method.
372   *
373   * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
374   *
375   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
376   * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
377   * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
378   * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
379   *
380   * @param callback The callback to invoke when this {@code Future} is completed.
381   * @param executor The executor to run {@code callback} when the future completes.
382   */
383  public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
384    Futures.addCallback(this, callback, executor);
385  }
386}