/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.util.concurrent.Internal.toNanosSaturated;
019
020import com.google.common.annotations.Beta;
021import com.google.common.annotations.GwtCompatible;
022import com.google.common.annotations.GwtIncompatible;
023import com.google.common.base.Function;
024import com.google.errorprone.annotations.CanIgnoreReturnValue;
025import com.google.errorprone.annotations.DoNotMock;
026import java.time.Duration;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Executor;
029import java.util.concurrent.ScheduledExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.TimeoutException;
032
033/**
034 * A {@link ListenableFuture} that supports fluent chains of operations. For example:
035 *
036 * <pre>{@code
037 * ListenableFuture<Boolean> adminIsLoggedIn =
038 *     FluentFuture.from(usersDatabase.getAdminUser())
039 *         .transform(User::getId, directExecutor())
040 *         .transform(ActivityService::isLoggedIn, threadPool)
041 *         .catching(RpcException.class, e -> false, directExecutor());
042 * }</pre>
043 *
044 * <h3>Alternatives</h3>
045 *
046 * <h4>Frameworks</h4>
047 *
048 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
049 * use a framework. Frameworks automate the process, often adding features like monitoring,
050 * debugging, and cancellation. Examples of frameworks include:
051 *
052 * <ul>
053 *   <li><a href="http://dagger.dev/producers.html">Dagger Producers</a>
054 * </ul>
055 *
056 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
057 * </h4>
058 *
059 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
060 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
061 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
062 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
063 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
064 *
065 * <h3>Extension</h3>
066 *
067 * If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring your
068 * own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt an
069 * existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
070 * forwards to that future and adds the desired methods.
071 *
072 * @since 23.0
073 */
074@Beta
075@DoNotMock("Use FluentFuture.from(Futures.immediate*Future) or SettableFuture")
076@GwtCompatible(emulated = true)
077public abstract class FluentFuture<V> extends GwtFluentFutureCatchingSpecialization<V> {
078
079  /**
080   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
081   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
082   */
083  abstract static class TrustedFuture<V> extends FluentFuture<V>
084      implements AbstractFuture.Trusted<V> {
085    @CanIgnoreReturnValue
086    @Override
087    public final V get() throws InterruptedException, ExecutionException {
088      return super.get();
089    }
090
091    @CanIgnoreReturnValue
092    @Override
093    public final V get(long timeout, TimeUnit unit)
094        throws InterruptedException, ExecutionException, TimeoutException {
095      return super.get(timeout, unit);
096    }
097
098    @Override
099    public final boolean isDone() {
100      return super.isDone();
101    }
102
103    @Override
104    public final boolean isCancelled() {
105      return super.isCancelled();
106    }
107
108    @Override
109    public final void addListener(Runnable listener, Executor executor) {
110      super.addListener(listener, executor);
111    }
112
113    @CanIgnoreReturnValue
114    @Override
115    public final boolean cancel(boolean mayInterruptIfRunning) {
116      return super.cancel(mayInterruptIfRunning);
117    }
118  }
119
120  FluentFuture() {}
121
122  /**
123   * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
124   *
125   * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
126   * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
127   * original {@code ListenableFuture}.
128   */
129  public static <V> FluentFuture<V> from(ListenableFuture<V> future) {
130    return future instanceof FluentFuture
131        ? (FluentFuture<V>) future
132        : new ForwardingFluentFuture<V>(future);
133  }
134
135  /**
136   * Simply returns its argument.
137   *
138   * @deprecated no need to use this
139   * @since 28.0
140   */
141  @Deprecated
142  public static <V> FluentFuture<V> from(FluentFuture<V> future) {
143    return checkNotNull(future);
144  }
145
146  /**
147   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
148   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
149   * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
150   * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
151   * exception is thrown, this exception is used as the result of the output {@code Future}.
152   *
153   * <p>Usage example:
154   *
155   * <pre>{@code
156   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
157   * // counters.
158   * ListenableFuture<Integer> faultTolerantFuture =
159   *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
160   * }</pre>
161   *
162   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
163   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
164   * listeners are also applicable to heavyweight functions passed to this method.
165   *
166   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
167   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
168   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
169   * #transform}.
170   *
171   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
172   *     type is matched against the input's exception. "The input's exception" means the cause of
173   *     the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
174   *     different kind of exception, that exception itself. To avoid hiding bugs and other
175   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
176   *     Throwable.class} in particular.
177   * @param fallback the {@link Function} to be called if the input fails with the expected
178   *     exception type. The function's argument is the input's exception. "The input's exception"
179   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
180   *     {@code get()} throws a different kind of exception, that exception itself.
181   * @param executor the executor that runs {@code fallback} if the input fails
182   */
183  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
184  public final <X extends Throwable> FluentFuture<V> catching(
185      Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
186    return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
187  }
188
189  /**
190   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
191   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
192   * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
193   * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
194   * an exception is thrown, this exception is used as the result of the output {@code Future}.
195   *
196   * <p>Usage examples:
197   *
198   * <pre>{@code
199   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
200   * // counters.
201   * ListenableFuture<Integer> faultTolerantFuture =
202   *     fetchCounters().catchingAsync(
203   *         FetchException.class, x -> immediateFuture(0), directExecutor());
204   * }</pre>
205   *
206   * <p>The fallback can also choose to propagate the original exception when desired:
207   *
208   * <pre>{@code
209   * // Falling back to a zero counter only in case the exception was a
210   * // TimeoutException.
211   * ListenableFuture<Integer> faultTolerantFuture =
212   *     fetchCounters().catchingAsync(
213   *         FetchException.class,
214   *         e -> {
215   *           if (omitDataOnFetchFailure) {
216   *             return immediateFuture(0);
217   *           }
218   *           throw e;
219   *         },
220   *         directExecutor());
221   * }</pre>
222   *
223   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
224   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
225   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
226   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
227   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
228   * completing the returned {@code Future}.)
229   *
230   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
231   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
232   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
233   * #transform}.
234   *
235   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
236   *     type is matched against the input's exception. "The input's exception" means the cause of
237   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
238   *     different kind of exception, that exception itself. To avoid hiding bugs and other
239   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
240   *     Throwable.class} in particular.
241   * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
242   *     exception type. The function's argument is the input's exception. "The input's exception"
243   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
244   *     {@code get()} throws a different kind of exception, that exception itself.
245   * @param executor the executor that runs {@code fallback} if the input fails
246   */
247  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
248  public final <X extends Throwable> FluentFuture<V> catchingAsync(
249      Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
250    return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
251  }
252
253  /**
254   * Returns a future that delegates to this future but will finish early (via a {@link
255   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
256   * If the timeout expires, not only will the output future finish, but also the input future
257   * ({@code this}) will be cancelled and interrupted.
258   *
259   * @param timeout when to time out the future
260   * @param scheduledExecutor The executor service to enforce the timeout.
261   * @since 28.0
262   */
263  @GwtIncompatible // ScheduledExecutorService
264  public final FluentFuture<V> withTimeout(
265      Duration timeout, ScheduledExecutorService scheduledExecutor) {
266    return withTimeout(toNanosSaturated(timeout), TimeUnit.NANOSECONDS, scheduledExecutor);
267  }
268
269  /**
270   * Returns a future that delegates to this future but will finish early (via a {@link
271   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
272   * If the timeout expires, not only will the output future finish, but also the input future
273   * ({@code this}) will be cancelled and interrupted.
274   *
275   * @param timeout when to time out the future
276   * @param unit the time unit of the time parameter
277   * @param scheduledExecutor The executor service to enforce the timeout.
278   */
279  @GwtIncompatible // ScheduledExecutorService
280  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
281  public final FluentFuture<V> withTimeout(
282      long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
283    return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
284  }
285
286  /**
287   * Returns a new {@code Future} whose result is asynchronously derived from the result of this
288   * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
289   * same exception (and the function is not invoked).
290   *
291   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
292   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
293   * Example usage:
294   *
295   * <pre>{@code
296   * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
297   * ListenableFuture<QueryResult> queryFuture =
298   *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
299   * }</pre>
300   *
301   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
302   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
303   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
304   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
305   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
306   * completing the returned {@code Future}.)
307   *
308   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
309   * input future and that of the future returned by the chain function. That is, if the returned
310   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
311   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
312   * attempt to cancel itself.
313   *
314   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
315   * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
316   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
317   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
318   *
319   * @param function A function to transform the result of this future to the result of the output
320   *     future
321   * @param executor Executor to run the function in.
322   * @return A future that holds result of the function (if the input succeeded) or the original
323   *     input's failure (if not)
324   */
325  public final <T> FluentFuture<T> transformAsync(
326      AsyncFunction<? super V, T> function, Executor executor) {
327    return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
328  }
329
330  /**
331   * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
332   * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
333   * the function is not invoked). Example usage:
334   *
335   * <pre>{@code
336   * ListenableFuture<List<Row>> rowsFuture =
337   *     queryFuture.transform(QueryResult::getRows, executor);
338   * }</pre>
339   *
340   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
341   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
342   * listeners are also applicable to heavyweight functions passed to this method.
343   *
344   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
345   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
346   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
347   * in which it will attempt to cancel itself.
348   *
349   * <p>An example use of this method is to convert a serializable object returned from an RPC into
350   * a POJO.
351   *
352   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
353   * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
354   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
355   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
356   *
357   * @param function A Function to transform the results of this future to the results of the
358   *     returned future.
359   * @param executor Executor to run the function in.
360   * @return A future that holds result of the transformation.
361   */
362  public final <T> FluentFuture<T> transform(Function<? super V, T> function, Executor executor) {
363    return (FluentFuture<T>) Futures.transform(this, function, executor);
364  }
365
366  /**
367   * Registers separate success and failure callbacks to be run when this {@code Future}'s
368   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
369   * computation is already complete, immediately.
370   *
371   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
372   * callbacks, but any callback added through this method is guaranteed to be called once the
373   * computation is complete.
374   *
375   * <p>Example:
376   *
377   * <pre>{@code
378   * future.addCallback(
379   *     new FutureCallback<QueryResult>() {
380   *       public void onSuccess(QueryResult result) {
381   *         storeInCache(result);
382   *       }
383   *       public void onFailure(Throwable t) {
384   *         reportError(t);
385   *       }
386   *     }, executor);
387   * }</pre>
388   *
389   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
390   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
391   * listeners are also applicable to heavyweight callbacks passed to this method.
392   *
393   * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
394   *
395   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
396   * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
397   * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
398   * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
399   *
400   * @param callback The callback to invoke when this {@code Future} is completed.
401   * @param executor The executor to run {@code callback} when the future completes.
402   */
403  public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
404    Futures.addCallback(this, callback, executor);
405  }
406}