001/*
002 * Copyright (C) 2006 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.util.concurrent.Internal.toNanosSaturated;
019
020import com.google.common.annotations.GwtCompatible;
021import com.google.common.annotations.GwtIncompatible;
022import com.google.common.annotations.J2ktIncompatible;
023import com.google.common.base.Function;
024import com.google.errorprone.annotations.CanIgnoreReturnValue;
025import com.google.errorprone.annotations.DoNotMock;
026import com.google.errorprone.annotations.InlineMe;
027import java.time.Duration;
028import java.util.concurrent.ExecutionException;
029import java.util.concurrent.Executor;
030import java.util.concurrent.ScheduledExecutorService;
031import java.util.concurrent.TimeUnit;
032import java.util.concurrent.TimeoutException;
033import org.jspecify.annotations.Nullable;
034
035/**
036 * A {@link ListenableFuture} that supports fluent chains of operations. For example:
037 *
038 * <pre>{@code
039 * ListenableFuture<Boolean> adminIsLoggedIn =
040 *     FluentFuture.from(usersDatabase.getAdminUser())
041 *         .transform(User::getId, directExecutor())
042 *         .transform(ActivityService::isLoggedIn, threadPool)
043 *         .catching(RpcException.class, e -> false, directExecutor());
044 * }</pre>
045 *
046 * <h3>Alternatives</h3>
047 *
048 * <h4>Frameworks</h4>
049 *
050 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
051 * use a framework. Frameworks automate the process, often adding features like monitoring,
052 * debugging, and cancellation. Examples of frameworks include:
053 *
054 * <ul>
055 *   <li><a href="https://dagger.dev/producers.html">Dagger Producers</a>
056 * </ul>
057 *
058 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
059 * </h4>
060 *
061 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
062 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
063 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
064 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
065 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
066 *
067 * <h3>Extension</h3>
068 *
069 * If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring your
070 * own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt an
071 * existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
072 * forwards to that future and adds the desired methods.
073 *
074 * @since 23.0
075 */
076@DoNotMock("Use FluentFuture.from(Futures.immediate*Future) or SettableFuture")
077@GwtCompatible(emulated = true)
078public abstract class FluentFuture<V extends @Nullable Object>
079    extends GwtFluentFutureCatchingSpecialization<V> {
080
081  /**
082   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
083   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
084   */
085  abstract static class TrustedFuture<V extends @Nullable Object> extends FluentFuture<V>
086      implements AbstractFuture.Trusted<V> {
087    @CanIgnoreReturnValue
088    @Override
089    @ParametricNullness
090    public final V get() throws InterruptedException, ExecutionException {
091      return super.get();
092    }
093
094    @CanIgnoreReturnValue
095    @Override
096    @ParametricNullness
097    public final V get(long timeout, TimeUnit unit)
098        throws InterruptedException, ExecutionException, TimeoutException {
099      return super.get(timeout, unit);
100    }
101
102    @Override
103    public final boolean isDone() {
104      return super.isDone();
105    }
106
107    @Override
108    public final boolean isCancelled() {
109      return super.isCancelled();
110    }
111
112    @Override
113    public final void addListener(Runnable listener, Executor executor) {
114      super.addListener(listener, executor);
115    }
116
117    @CanIgnoreReturnValue
118    @Override
119    public final boolean cancel(boolean mayInterruptIfRunning) {
120      return super.cancel(mayInterruptIfRunning);
121    }
122  }
123
124  FluentFuture() {}
125
126  /**
127   * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
128   *
129   * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
130   * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
131   * original {@code ListenableFuture}.
132   */
133  public static <V extends @Nullable Object> FluentFuture<V> from(ListenableFuture<V> future) {
134    return future instanceof FluentFuture
135        ? (FluentFuture<V>) future
136        : new ForwardingFluentFuture<V>(future);
137  }
138
139  /**
140   * Simply returns its argument.
141   *
142   * @deprecated no need to use this
143   * @since 28.0
144   */
145  @InlineMe(
146      replacement = "checkNotNull(future)",
147      staticImports = "com.google.common.base.Preconditions.checkNotNull")
148  @Deprecated
149  public static <V extends @Nullable Object> FluentFuture<V> from(FluentFuture<V> future) {
150    return checkNotNull(future);
151  }
152
153  /**
154   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
155   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
156   * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
157   * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
158   * exception is thrown, this exception is used as the result of the output {@code Future}.
159   *
160   * <p>Usage example:
161   *
162   * <pre>{@code
163   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
164   * // counters.
165   * ListenableFuture<Integer> faultTolerantFuture =
166   *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
167   * }</pre>
168   *
169   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
170   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
171   * listeners are also applicable to heavyweight functions passed to this method.
172   *
173   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
174   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
175   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
176   * #transform}.
177   *
178   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
179   *     type is matched against the input's exception. "The input's exception" means the cause of
180   *     the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a
181   *     different kind of exception, that exception itself. To avoid hiding bugs and other
182   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
183   *     Throwable.class} in particular.
184   * @param fallback the {@link Function} to be called if the input fails with the expected
185   *     exception type. The function's argument is the input's exception. "The input's exception"
186   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
187   *     {@code get()} throws a different kind of exception, that exception itself.
188   * @param executor the executor that runs {@code fallback} if the input fails
189   */
190  @J2ktIncompatible
191  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
192  public final <X extends Throwable> FluentFuture<V> catching(
193      Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
194    return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
195  }
196
197  /**
198   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
199   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
200   * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
201   * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
202   * an exception is thrown, this exception is used as the result of the output {@code Future}.
203   *
204   * <p>Usage examples:
205   *
206   * <pre>{@code
207   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
208   * // counters.
209   * ListenableFuture<Integer> faultTolerantFuture =
210   *     fetchCounters().catchingAsync(
211   *         FetchException.class, x -> immediateFuture(0), directExecutor());
212   * }</pre>
213   *
214   * <p>The fallback can also choose to propagate the original exception when desired:
215   *
216   * <pre>{@code
217   * // Falling back to a zero counter only in case the exception was a
218   * // TimeoutException.
219   * ListenableFuture<Integer> faultTolerantFuture =
220   *     fetchCounters().catchingAsync(
221   *         FetchException.class,
222   *         e -> {
223   *           if (omitDataOnFetchFailure) {
224   *             return immediateFuture(0);
225   *           }
226   *           throw e;
227   *         },
228   *         directExecutor());
229   * }</pre>
230   *
231   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
232   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
233   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
234   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
235   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
236   * completing the returned {@code Future}.)
237   *
238   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
239   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
240   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
241   * #transform}.
242   *
243   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
244   *     type is matched against the input's exception. "The input's exception" means the cause of
245   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
246   *     different kind of exception, that exception itself. To avoid hiding bugs and other
247   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
248   *     Throwable.class} in particular.
249   * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
250   *     exception type. The function's argument is the input's exception. "The input's exception"
251   *     means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if
252   *     {@code get()} throws a different kind of exception, that exception itself.
253   * @param executor the executor that runs {@code fallback} if the input fails
254   */
255  @J2ktIncompatible
256  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
257  public final <X extends Throwable> FluentFuture<V> catchingAsync(
258      Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
259    return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
260  }
261
262  /**
263   * Returns a future that delegates to this future but will finish early (via a {@link
264   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
265   * If the timeout expires, not only will the output future finish, but also the input future
266   * ({@code this}) will be cancelled and interrupted.
267   *
268   * @param timeout when to time out the future
269   * @param scheduledExecutor The executor service to enforce the timeout.
270   * @since 33.4.0 (but since 28.0 in the JRE flavor)
271   */
272  @J2ktIncompatible
273  @GwtIncompatible // ScheduledExecutorService
274  @SuppressWarnings("Java7ApiChecker")
275  @IgnoreJRERequirement // Users will use this only if they're already using Duration.
276  public final FluentFuture<V> withTimeout(
277      Duration timeout, ScheduledExecutorService scheduledExecutor) {
278    return withTimeout(toNanosSaturated(timeout), TimeUnit.NANOSECONDS, scheduledExecutor);
279  }
280
281  /**
282   * Returns a future that delegates to this future but will finish early (via a {@link
283   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
284   * If the timeout expires, not only will the output future finish, but also the input future
285   * ({@code this}) will be cancelled and interrupted.
286   *
287   * @param timeout when to time out the future
288   * @param unit the time unit of the time parameter
289   * @param scheduledExecutor The executor service to enforce the timeout.
290   */
291  @J2ktIncompatible
292  @GwtIncompatible // ScheduledExecutorService
293  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
294  public final FluentFuture<V> withTimeout(
295      long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
296    return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
297  }
298
299  /**
300   * Returns a new {@code Future} whose result is asynchronously derived from the result of this
301   * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
302   * same exception (and the function is not invoked).
303   *
304   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
305   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
306   * Example usage:
307   *
308   * <pre>{@code
309   * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
310   * ListenableFuture<QueryResult> queryFuture =
311   *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
312   * }</pre>
313   *
314   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
315   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
316   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
317   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
318   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
319   * completing the returned {@code Future}.)
320   *
321   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
322   * input future and that of the future returned by the chain function. That is, if the returned
323   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
324   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
325   * attempt to cancel itself.
326   *
327   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
328   * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
329   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
330   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
331   *
332   * @param function A function to transform the result of this future to the result of the output
333   *     future
334   * @param executor Executor to run the function in.
335   * @return A future that holds result of the function (if the input succeeded) or the original
336   *     input's failure (if not)
337   */
338  public final <T extends @Nullable Object> FluentFuture<T> transformAsync(
339      AsyncFunction<? super V, T> function, Executor executor) {
340    return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
341  }
342
343  /**
344   * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
345   * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
346   * the function is not invoked). Example usage:
347   *
348   * <pre>{@code
349   * ListenableFuture<List<Row>> rowsFuture =
350   *     queryFuture.transform(QueryResult::getRows, executor);
351   * }</pre>
352   *
353   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
354   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
355   * listeners are also applicable to heavyweight functions passed to this method.
356   *
357   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
358   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
359   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
360   * in which it will attempt to cancel itself.
361   *
362   * <p>An example use of this method is to convert a serializable object returned from an RPC into
363   * a POJO.
364   *
365   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
366   * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
367   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
368   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
369   *
370   * @param function A Function to transform the results of this future to the results of the
371   *     returned future.
372   * @param executor Executor to run the function in.
373   * @return A future that holds result of the transformation.
374   */
375  public final <T extends @Nullable Object> FluentFuture<T> transform(
376      Function<? super V, T> function, Executor executor) {
377    return (FluentFuture<T>) Futures.transform(this, function, executor);
378  }
379
380  /**
381   * Registers separate success and failure callbacks to be run when this {@code Future}'s
382   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
383   * computation is already complete, immediately.
384   *
385   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
386   * callbacks, but any callback added through this method is guaranteed to be called once the
387   * computation is complete.
388   *
389   * <p>Example:
390   *
391   * <pre>{@code
392   * future.addCallback(
393   *     new FutureCallback<QueryResult>() {
394   *       public void onSuccess(QueryResult result) {
395   *         storeInCache(result);
396   *       }
397   *       public void onFailure(Throwable t) {
398   *         reportError(t);
399   *       }
400   *     }, executor);
401   * }</pre>
402   *
403   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
404   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
405   * listeners are also applicable to heavyweight callbacks passed to this method.
406   *
407   * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
408   *
409   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
410   * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
411   * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
412   * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
413   *
414   * @param callback The callback to invoke when this {@code Future} is completed.
415   * @param executor The executor to run {@code callback} when the future completes.
416   */
417  public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
418    Futures.addCallback(this, callback, executor);
419  }
420}