001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.util.concurrent.Internal.toNanosSaturated;
019
020import com.google.common.annotations.Beta;
021import com.google.common.annotations.GwtCompatible;
022import com.google.common.annotations.GwtIncompatible;
023import com.google.common.base.Function;
024import com.google.errorprone.annotations.CanIgnoreReturnValue;
025import com.google.errorprone.annotations.DoNotMock;
026import java.time.Duration;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Executor;
029import java.util.concurrent.ScheduledExecutorService;
030import java.util.concurrent.TimeUnit;
031import java.util.concurrent.TimeoutException;
032import org.checkerframework.checker.nullness.qual.Nullable;
033
034/**
035 * A {@link ListenableFuture} that supports fluent chains of operations. For example:
036 *
037 * <pre>{@code
038 * ListenableFuture<Boolean> adminIsLoggedIn =
039 *     FluentFuture.from(usersDatabase.getAdminUser())
040 *         .transform(User::getId, directExecutor())
041 *         .transform(ActivityService::isLoggedIn, threadPool)
042 *         .catching(RpcException.class, e -> false, directExecutor());
043 * }</pre>
044 *
045 * <h3>Alternatives</h3>
046 *
047 * <h4>Frameworks</h4>
048 *
049 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
050 * use a framework. Frameworks automate the process, often adding features like monitoring,
051 * debugging, and cancellation. Examples of frameworks include:
052 *
053 * <ul>
054 *   <li><a href="https://dagger.dev/producers.html">Dagger Producers</a>
055 * </ul>
056 *
057 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
058 * </h4>
059 *
060 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
061 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
062 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
063 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
064 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
065 *
066 * <h3>Extension</h3>
067 *
068 * If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring your
069 * own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt an
070 * existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
071 * forwards to that future and adds the desired methods.
072 *
073 * @since 23.0
074 */
075@Beta
076@DoNotMock("Use FluentFuture.from(Futures.immediate*Future) or SettableFuture")
077@GwtCompatible(emulated = true)
078@ElementTypesAreNonnullByDefault
079public abstract class FluentFuture<V extends @Nullable Object>
080    extends GwtFluentFutureCatchingSpecialization<V> {
081
082  /**
083   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
084   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
085   */
086  abstract static class TrustedFuture<V extends @Nullable Object> extends FluentFuture<V>
087      implements AbstractFuture.Trusted<V> {
088    @CanIgnoreReturnValue
089    @Override
090    @ParametricNullness
091    public final V get() throws InterruptedException, ExecutionException {
092      return super.get();
093    }
094
095    @CanIgnoreReturnValue
096    @Override
097    @ParametricNullness
098    public final V get(long timeout, TimeUnit unit)
099        throws InterruptedException, ExecutionException, TimeoutException {
100      return super.get(timeout, unit);
101    }
102
103    @Override
104    public final boolean isDone() {
105      return super.isDone();
106    }
107
108    @Override
109    public final boolean isCancelled() {
110      return super.isCancelled();
111    }
112
113    @Override
114    public final void addListener(Runnable listener, Executor executor) {
115      super.addListener(listener, executor);
116    }
117
118    @CanIgnoreReturnValue
119    @Override
120    public final boolean cancel(boolean mayInterruptIfRunning) {
121      return super.cancel(mayInterruptIfRunning);
122    }
123  }
124
125  FluentFuture() {}
126
127  /**
128   * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
129   *
130   * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
131   * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
132   * original {@code ListenableFuture}.
133   */
134  public static <V extends @Nullable Object> FluentFuture<V> from(ListenableFuture<V> future) {
135    return future instanceof FluentFuture
136        ? (FluentFuture<V>) future
137        : new ForwardingFluentFuture<V>(future);
138  }
139
140  /**
141   * Simply returns its argument.
142   *
143   * @deprecated no need to use this
144   * @since 28.0
145   */
146  @Deprecated
147  public static <V extends @Nullable Object> FluentFuture<V> from(FluentFuture<V> future) {
148    return checkNotNull(future);
149  }
150
151  /**
152   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
153   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
154   * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
155   * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
156   * exception is thrown, this exception is used as the result of the output {@code Future}.
157   *
158   * <p>Usage example:
159   *
160   * <pre>{@code
161   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
162   * // counters.
163   * ListenableFuture<Integer> faultTolerantFuture =
164   *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
165   * }</pre>
166   *
167   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
168   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
169   * listeners are also applicable to heavyweight functions passed to this method.
170   *
171   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
172   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
173   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
174   * #transform}.
175   *
176   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
177   *     type is matched against the input's exception. "The input's exception" means the cause of
178   *     the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
179   *     different kind of exception, that exception itself. To avoid hiding bugs and other
180   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
181   *     Throwable.class} in particular.
182   * @param fallback the {@link Function} to be called if the input fails with the expected
183   *     exception type. The function's argument is the input's exception. "The input's exception"
184   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
185   *     {@code get()} throws a different kind of exception, that exception itself.
186   * @param executor the executor that runs {@code fallback} if the input fails
187   */
188  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
189  public final <X extends Throwable> FluentFuture<V> catching(
190      Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
191    return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
192  }
193
194  /**
195   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
196   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
197   * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
198   * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
199   * an exception is thrown, this exception is used as the result of the output {@code Future}.
200   *
201   * <p>Usage examples:
202   *
203   * <pre>{@code
204   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
205   * // counters.
206   * ListenableFuture<Integer> faultTolerantFuture =
207   *     fetchCounters().catchingAsync(
208   *         FetchException.class, x -> immediateFuture(0), directExecutor());
209   * }</pre>
210   *
211   * <p>The fallback can also choose to propagate the original exception when desired:
212   *
213   * <pre>{@code
214   * // Falling back to a zero counter only in case the exception was a
215   * // TimeoutException.
216   * ListenableFuture<Integer> faultTolerantFuture =
217   *     fetchCounters().catchingAsync(
218   *         FetchException.class,
219   *         e -> {
220   *           if (omitDataOnFetchFailure) {
221   *             return immediateFuture(0);
222   *           }
223   *           throw e;
224   *         },
225   *         directExecutor());
226   * }</pre>
227   *
228   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
229   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
230   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
231   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
232   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
233   * completing the returned {@code Future}.)
234   *
235   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
236   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
237   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
238   * #transform}.
239   *
240   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
241   *     type is matched against the input's exception. "The input's exception" means the cause of
242   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
243   *     different kind of exception, that exception itself. To avoid hiding bugs and other
244   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
245   *     Throwable.class} in particular.
246   * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
247   *     exception type. The function's argument is the input's exception. "The input's exception"
248   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
249   *     {@code get()} throws a different kind of exception, that exception itself.
250   * @param executor the executor that runs {@code fallback} if the input fails
251   */
252  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
253  public final <X extends Throwable> FluentFuture<V> catchingAsync(
254      Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
255    return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
256  }
257
258  /**
259   * Returns a future that delegates to this future but will finish early (via a {@link
260   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
261   * If the timeout expires, not only will the output future finish, but also the input future
262   * ({@code this}) will be cancelled and interrupted.
263   *
264   * @param timeout when to time out the future
265   * @param scheduledExecutor The executor service to enforce the timeout.
266   * @since 28.0
267   */
268  @GwtIncompatible // ScheduledExecutorService
269  public final FluentFuture<V> withTimeout(
270      Duration timeout, ScheduledExecutorService scheduledExecutor) {
271    return withTimeout(toNanosSaturated(timeout), TimeUnit.NANOSECONDS, scheduledExecutor);
272  }
273
274  /**
275   * Returns a future that delegates to this future but will finish early (via a {@link
276   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
277   * If the timeout expires, not only will the output future finish, but also the input future
278   * ({@code this}) will be cancelled and interrupted.
279   *
280   * @param timeout when to time out the future
281   * @param unit the time unit of the time parameter
282   * @param scheduledExecutor The executor service to enforce the timeout.
283   */
284  @GwtIncompatible // ScheduledExecutorService
285  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
286  public final FluentFuture<V> withTimeout(
287      long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
288    return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
289  }
290
291  /**
292   * Returns a new {@code Future} whose result is asynchronously derived from the result of this
293   * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
294   * same exception (and the function is not invoked).
295   *
296   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
297   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
298   * Example usage:
299   *
300   * <pre>{@code
301   * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
302   * ListenableFuture<QueryResult> queryFuture =
303   *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
304   * }</pre>
305   *
306   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
307   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
308   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
309   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
310   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
311   * completing the returned {@code Future}.)
312   *
313   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
314   * input future and that of the future returned by the chain function. That is, if the returned
315   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
316   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
317   * attempt to cancel itself.
318   *
319   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
320   * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
321   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
322   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
323   *
324   * @param function A function to transform the result of this future to the result of the output
325   *     future
326   * @param executor Executor to run the function in.
327   * @return A future that holds result of the function (if the input succeeded) or the original
328   *     input's failure (if not)
329   */
330  public final <T extends @Nullable Object> FluentFuture<T> transformAsync(
331      AsyncFunction<? super V, T> function, Executor executor) {
332    return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
333  }
334
335  /**
336   * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
337   * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
338   * the function is not invoked). Example usage:
339   *
340   * <pre>{@code
341   * ListenableFuture<List<Row>> rowsFuture =
342   *     queryFuture.transform(QueryResult::getRows, executor);
343   * }</pre>
344   *
345   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
346   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
347   * listeners are also applicable to heavyweight functions passed to this method.
348   *
349   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
350   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
351   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
352   * in which it will attempt to cancel itself.
353   *
354   * <p>An example use of this method is to convert a serializable object returned from an RPC into
355   * a POJO.
356   *
357   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
358   * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
359   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
360   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
361   *
362   * @param function A Function to transform the results of this future to the results of the
363   *     returned future.
364   * @param executor Executor to run the function in.
365   * @return A future that holds result of the transformation.
366   */
367  public final <T extends @Nullable Object> FluentFuture<T> transform(
368      Function<? super V, T> function, Executor executor) {
369    return (FluentFuture<T>) Futures.transform(this, function, executor);
370  }
371
372  /**
373   * Registers separate success and failure callbacks to be run when this {@code Future}'s
374   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
375   * computation is already complete, immediately.
376   *
377   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
378   * callbacks, but any callback added through this method is guaranteed to be called once the
379   * computation is complete.
380   *
381   * <p>Example:
382   *
383   * <pre>{@code
384   * future.addCallback(
385   *     new FutureCallback<QueryResult>() {
386   *       public void onSuccess(QueryResult result) {
387   *         storeInCache(result);
388   *       }
389   *       public void onFailure(Throwable t) {
390   *         reportError(t);
391   *       }
392   *     }, executor);
393   * }</pre>
394   *
395   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
396   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
397   * listeners are also applicable to heavyweight callbacks passed to this method.
398   *
399   * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
400   *
401   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
402   * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
403   * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
404   * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
405   *
406   * @param callback The callback to invoke when this {@code Future} is completed.
407   * @param executor The executor to run {@code callback} when the future completes.
408   */
409  public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
410    Futures.addCallback(this, callback, executor);
411  }
412}