001/* 002 * Copyright (C) 2006 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.util.concurrent.Internal.toNanosSaturated; 019 020import com.google.common.annotations.GwtCompatible; 021import com.google.common.annotations.GwtIncompatible; 022import com.google.common.annotations.J2ktIncompatible; 023import com.google.common.base.Function; 024import com.google.errorprone.annotations.CanIgnoreReturnValue; 025import com.google.errorprone.annotations.DoNotMock; 026import com.google.errorprone.annotations.InlineMe; 027import java.time.Duration; 028import java.util.concurrent.ExecutionException; 029import java.util.concurrent.Executor; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.TimeUnit; 032import java.util.concurrent.TimeoutException; 033import org.jspecify.annotations.Nullable; 034 035/** 036 * A {@link ListenableFuture} that supports fluent chains of operations. For example: 037 * 038 * <pre>{@code 039 * ListenableFuture<Boolean> adminIsLoggedIn = 040 * FluentFuture.from(usersDatabase.getAdminUser()) 041 * .transform(User::getId, directExecutor()) 042 * .transform(ActivityService::isLoggedIn, threadPool) 043 * .catching(RpcException.class, e -> false, directExecutor()); 044 * }</pre> 045 * 046 * <h3>Alternatives</h3> 047 * 048 * <h4>Frameworks</h4> 049 * 050 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to 051 * use a framework. Frameworks automate the process, often adding features like monitoring, 052 * debugging, and cancellation. Examples of frameworks include: 053 * 054 * <ul> 055 * <li><a href="https://dagger.dev/producers.html">Dagger Producers</a> 056 * </ul> 057 * 058 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage} 059 * </h4> 060 * 061 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code 062 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture}, 063 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you 064 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a 065 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.) 066 * 067 * <h3>Extension</h3> 068 * 069 * If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring your 070 * own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt an 071 * existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that 072 * forwards to that future and adds the desired methods. 073 * 074 * @since 23.0 075 */ 076@DoNotMock("Use FluentFuture.from(Futures.immediate*Future) or SettableFuture") 077@GwtCompatible(emulated = true) 078public abstract class FluentFuture<V extends @Nullable Object> 079 extends GwtFluentFutureCatchingSpecialization<V> { 080 081 /** 082 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 083 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 084 */ 085 abstract static class TrustedFuture<V extends @Nullable Object> extends FluentFuture<V> 086 implements AbstractFuture.Trusted<V> { 087 @CanIgnoreReturnValue 088 @Override 089 @ParametricNullness 090 public final V get() throws InterruptedException, ExecutionException { 091 return super.get(); 092 } 093 094 @CanIgnoreReturnValue 095 @Override 096 @ParametricNullness 097 public final V get(long timeout, TimeUnit unit) 098 throws InterruptedException, ExecutionException, TimeoutException { 099 return super.get(timeout, unit); 100 } 101 102 @Override 103 public final boolean isDone() { 104 return super.isDone(); 105 } 106 107 @Override 108 public final boolean isCancelled() { 109 return super.isCancelled(); 110 } 111 112 @Override 113 public final void addListener(Runnable listener, Executor executor) { 114 super.addListener(listener, executor); 115 } 116 117 @CanIgnoreReturnValue 118 @Override 119 public final boolean cancel(boolean mayInterruptIfRunning) { 120 return super.cancel(mayInterruptIfRunning); 121 } 122 } 123 124 FluentFuture() {} 125 126 /** 127 * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}. 128 * 129 * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned 130 * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the 131 * original {@code ListenableFuture}. 132 */ 133 public static <V extends @Nullable Object> FluentFuture<V> from(ListenableFuture<V> future) { 134 return future instanceof FluentFuture 135 ? (FluentFuture<V>) future 136 : new ForwardingFluentFuture<V>(future); 137 } 138 139 /** 140 * Simply returns its argument. 141 * 142 * @deprecated no need to use this 143 * @since 28.0 144 */ 145 @InlineMe( 146 replacement = "checkNotNull(future)", 147 staticImports = "com.google.common.base.Preconditions.checkNotNull") 148 @Deprecated 149 public static <V extends @Nullable Object> FluentFuture<V> from(FluentFuture<V> future) { 150 return checkNotNull(future); 151 } 152 153 /** 154 * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code 155 * Future} fails with the given {@code exceptionType}, from the result provided by the {@code 156 * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the 157 * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an 158 * exception is thrown, this exception is used as the result of the output {@code Future}. 159 * 160 * <p>Usage example: 161 * 162 * <pre>{@code 163 * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch 164 * // counters. 165 * ListenableFuture<Integer> faultTolerantFuture = 166 * fetchCounters().catching(FetchException.class, x -> 0, directExecutor()); 167 * }</pre> 168 * 169 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 170 * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight 171 * listeners are also applicable to heavyweight functions passed to this method. 172 * 173 * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It 174 * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle} 175 * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link 176 * #transform}. 177 * 178 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 179 * type is matched against the input's exception. "The input's exception" means the cause of 180 * the {@link ExecutionException} thrown by {@code input.get()} or, if {@code get()} throws a 181 * different kind of exception, that exception itself. To avoid hiding bugs and other 182 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 183 * Throwable.class} in particular. 184 * @param fallback the {@link Function} to be called if the input fails with the expected 185 * exception type. The function's argument is the input's exception. "The input's exception" 186 * means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if 187 * {@code get()} throws a different kind of exception, that exception itself. 188 * @param executor the executor that runs {@code fallback} if the input fails 189 */ 190 @J2ktIncompatible 191 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 192 public final <X extends Throwable> FluentFuture<V> catching( 193 Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) { 194 return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor); 195 } 196 197 /** 198 * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code 199 * Future} fails with the given {@code exceptionType}, from the result provided by the {@code 200 * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if 201 * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, 202 * an exception is thrown, this exception is used as the result of the output {@code Future}. 203 * 204 * <p>Usage examples: 205 * 206 * <pre>{@code 207 * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch 208 * // counters. 209 * ListenableFuture<Integer> faultTolerantFuture = 210 * fetchCounters().catchingAsync( 211 * FetchException.class, x -> immediateFuture(0), directExecutor()); 212 * }</pre> 213 * 214 * <p>The fallback can also choose to propagate the original exception when desired: 215 * 216 * <pre>{@code 217 * // Falling back to a zero counter only in case the exception was a 218 * // TimeoutException. 219 * ListenableFuture<Integer> faultTolerantFuture = 220 * fetchCounters().catchingAsync( 221 * FetchException.class, 222 * e -> { 223 * if (omitDataOnFetchFailure) { 224 * return immediateFuture(0); 225 * } 226 * throw e; 227 * }, 228 * directExecutor()); 229 * }</pre> 230 * 231 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 232 * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight 233 * listeners are also applicable to heavyweight functions passed to this method. (Specifically, 234 * {@code directExecutor} functions should avoid heavyweight operations inside {@code 235 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 236 * completing the returned {@code Future}.) 237 * 238 * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It 239 * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle} 240 * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link 241 * #transform}. 242 * 243 * @param exceptionType the exception type that triggers use of {@code fallback}. The exception 244 * type is matched against the input's exception. "The input's exception" means the cause of 245 * the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a 246 * different kind of exception, that exception itself. To avoid hiding bugs and other 247 * unrecoverable errors, callers should prefer more specific types, avoiding {@code 248 * Throwable.class} in particular. 249 * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected 250 * exception type. The function's argument is the input's exception. "The input's exception" 251 * means the cause of the {@link ExecutionException} thrown by {@code input.get()} or, if 252 * {@code get()} throws a different kind of exception, that exception itself. 253 * @param executor the executor that runs {@code fallback} if the input fails 254 */ 255 @J2ktIncompatible 256 @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class") 257 public final <X extends Throwable> FluentFuture<V> catchingAsync( 258 Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) { 259 return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor); 260 } 261 262 /** 263 * Returns a future that delegates to this future but will finish early (via a {@link 264 * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires. 265 * If the timeout expires, not only will the output future finish, but also the input future 266 * ({@code this}) will be cancelled and interrupted. 267 * 268 * @param timeout when to time out the future 269 * @param scheduledExecutor The executor service to enforce the timeout. 270 * @since 33.4.0 (but since 28.0 in the JRE flavor) 271 */ 272 @J2ktIncompatible 273 @GwtIncompatible // ScheduledExecutorService 274 @SuppressWarnings("Java7ApiChecker") 275 @IgnoreJRERequirement // Users will use this only if they're already using Duration. 276 public final FluentFuture<V> withTimeout( 277 Duration timeout, ScheduledExecutorService scheduledExecutor) { 278 return withTimeout(toNanosSaturated(timeout), TimeUnit.NANOSECONDS, scheduledExecutor); 279 } 280 281 /** 282 * Returns a future that delegates to this future but will finish early (via a {@link 283 * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires. 284 * If the timeout expires, not only will the output future finish, but also the input future 285 * ({@code this}) will be cancelled and interrupted. 286 * 287 * @param timeout when to time out the future 288 * @param unit the time unit of the time parameter 289 * @param scheduledExecutor The executor service to enforce the timeout. 290 */ 291 @J2ktIncompatible 292 @GwtIncompatible // ScheduledExecutorService 293 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 294 public final FluentFuture<V> withTimeout( 295 long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) { 296 return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor); 297 } 298 299 /** 300 * Returns a new {@code Future} whose result is asynchronously derived from the result of this 301 * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the 302 * same exception (and the function is not invoked). 303 * 304 * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced 305 * by applying the given {@code AsyncFunction} to the result of the original {@code Future}. 306 * Example usage: 307 * 308 * <pre>{@code 309 * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query)); 310 * ListenableFuture<QueryResult> queryFuture = 311 * rowKeyFuture.transformAsync(dataService::readFuture, executor); 312 * }</pre> 313 * 314 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 315 * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight 316 * listeners are also applicable to heavyweight functions passed to this method. (Specifically, 317 * {@code directExecutor} functions should avoid heavyweight operations inside {@code 318 * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for 319 * completing the returned {@code Future}.) 320 * 321 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 322 * input future and that of the future returned by the chain function. That is, if the returned 323 * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the 324 * other two is cancelled, the returned {@code Future} will receive a callback in which it will 325 * attempt to cancel itself. 326 * 327 * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and 328 * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the 329 * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link 330 * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}. 331 * 332 * @param function A function to transform the result of this future to the result of the output 333 * future 334 * @param executor Executor to run the function in. 335 * @return A future that holds result of the function (if the input succeeded) or the original 336 * input's failure (if not) 337 */ 338 public final <T extends @Nullable Object> FluentFuture<T> transformAsync( 339 AsyncFunction<? super V, T> function, Executor executor) { 340 return (FluentFuture<T>) Futures.transformAsync(this, function, executor); 341 } 342 343 /** 344 * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If 345 * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and 346 * the function is not invoked). Example usage: 347 * 348 * <pre>{@code 349 * ListenableFuture<List<Row>> rowsFuture = 350 * queryFuture.transform(QueryResult::getRows, executor); 351 * }</pre> 352 * 353 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 354 * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight 355 * listeners are also applicable to heavyweight functions passed to this method. 356 * 357 * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the 358 * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel 359 * the input, and if the input is cancelled, the returned {@code Future} will receive a callback 360 * in which it will attempt to cancel itself. 361 * 362 * <p>An example use of this method is to convert a serializable object returned from an RPC into 363 * a POJO. 364 * 365 * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and 366 * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the 367 * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link 368 * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}. 369 * 370 * @param function A Function to transform the results of this future to the results of the 371 * returned future. 372 * @param executor Executor to run the function in. 373 * @return A future that holds result of the transformation. 374 */ 375 public final <T extends @Nullable Object> FluentFuture<T> transform( 376 Function<? super V, T> function, Executor executor) { 377 return (FluentFuture<T>) Futures.transform(this, function, executor); 378 } 379 380 /** 381 * Registers separate success and failure callbacks to be run when this {@code Future}'s 382 * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the 383 * computation is already complete, immediately. 384 * 385 * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of 386 * callbacks, but any callback added through this method is guaranteed to be called once the 387 * computation is complete. 388 * 389 * <p>Example: 390 * 391 * <pre>{@code 392 * future.addCallback( 393 * new FutureCallback<QueryResult>() { 394 * public void onSuccess(QueryResult result) { 395 * storeInCache(result); 396 * } 397 * public void onFailure(Throwable t) { 398 * reportError(t); 399 * } 400 * }, executor); 401 * }</pre> 402 * 403 * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See 404 * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight 405 * listeners are also applicable to heavyweight callbacks passed to this method. 406 * 407 * <p>For a more general interface to attach a completion listener, see {@link #addListener}. 408 * 409 * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and 410 * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case 411 * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link 412 * java.util.concurrent.CompletableFuture#thenAcceptAsync}. 413 * 414 * @param callback The callback to invoke when this {@code Future} is completed. 415 * @param executor The executor to run {@code callback} when the future completes. 416 */ 417 public final void addCallback(FutureCallback<? super V> callback, Executor executor) { 418 Futures.addCallback(this, callback, executor); 419 } 420}