/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Internal.toNanosSaturated;

import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.J2ktIncompatible;
import com.google.common.base.Function;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.DoNotMock;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
 * A {@link ListenableFuture} that supports fluent chains of operations. For example:
 *
 * <pre>{@code
 * ListenableFuture<Boolean> adminIsLoggedIn =
 *     FluentFuture.from(usersDatabase.getAdminUser())
 *         .transform(User::getId, directExecutor())
 *         .transform(ActivityService::isLoggedIn, threadPool)
 *         .catching(RpcException.class, e -> false, directExecutor());
 * }</pre>
 *
 * <h3>Alternatives</h3>
 *
 * <h4>Frameworks</h4>
 *
 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
 * use a framework. Frameworks automate the process, often adding features like monitoring,
 * debugging, and cancellation. Examples of frameworks include:
 *
 * <ul>
 *   <li><a href="https://dagger.dev/producers.html">Dagger Producers</a>
 * </ul>
 *
 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
 * </h4>
 *
 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
 *
 * <h3>Extension</h3>
 *
 * <p>If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring
 * your own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt
 * an existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
 * forwards to that future and adds the desired methods.
 *
 * @since 23.0
 */
@DoNotMock("Use FluentFuture.from(Futures.immediate*Future) or SettableFuture")
@GwtCompatible(emulated = true)
public abstract class FluentFuture<V extends @Nullable Object>
    extends GwtFluentFutureCatchingSpecialization<V> {

  /**
   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
   *
   * <p>Every override below is {@code final} and simply forwards to the superclass implementation,
   * so subclasses cannot substitute their own behavior for these methods.
   */
  abstract static class TrustedFuture<V extends @Nullable Object> extends FluentFuture<V>
      implements AbstractFuture.Trusted<V> {
    @CanIgnoreReturnValue
    @Override
    @ParametricNullness
    public final V get() throws InterruptedException, ExecutionException {
      return super.get();
    }

    @CanIgnoreReturnValue
    @Override
    @ParametricNullness
    public final V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
      return super.get(timeout, unit);
    }

    @Override
    public final boolean isDone() {
      return super.isDone();
    }

    @Override
    public final boolean isCancelled() {
      return super.isCancelled();
    }

    @Override
    public final void addListener(Runnable listener, Executor executor) {
      super.addListener(listener, executor);
    }

    @CanIgnoreReturnValue
    @Override
    public final boolean cancel(boolean mayInterruptIfRunning) {
      return super.cancel(mayInterruptIfRunning);
    }
  }

  // Package-private constructor: subclasses can be declared only inside this package.
  FluentFuture() {}

  /**
   * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
   *
   * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
   * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
   * original {@code ListenableFuture}.
   *
   * @param future the future to adapt
   * @throws NullPointerException if {@code future} is null
   */
  public static <V extends @Nullable Object> FluentFuture<V> from(ListenableFuture<V> future) {
    // A null argument fails the instanceof test and reaches the wrapping branch; reject it here so
    // that the failure is reported at the call site, as the FluentFuture overload of from() does.
    return future instanceof FluentFuture
        ? (FluentFuture<V>) future
        : new ForwardingFluentFuture<V>(checkNotNull(future));
  }

  /**
   * Simply returns its argument.
   *
   * @deprecated no need to use this
   * @since 28.0
   */
  @Deprecated
  public static <V extends @Nullable Object> FluentFuture<V> from(FluentFuture<V> future) {
    return checkNotNull(future);
  }

  /**
   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
   * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
   * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
   * exception is thrown, this exception is used as the result of the output {@code Future}.
   *
   * <p>Usage example:
   *
   * <pre>{@code
   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
   * // counters.
   * ListenableFuture<Integer> faultTolerantFuture =
   *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
   * #transform}.
   *
   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
   *     type is matched against the input's exception. "The input's exception" means the cause of
   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
   *     different kind of exception, that exception itself. To avoid hiding bugs and other
   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
   *     Throwable.class} in particular.
   * @param fallback the {@link Function} to be called if the input fails with the expected
   *     exception type. The function's argument is the input's exception. "The input's exception"
   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
   *     {@code get()} throws a different kind of exception, that exception itself.
   * @param executor the executor that runs {@code fallback} if the input fails
   */
  @J2ktIncompatible
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public final <X extends Throwable> FluentFuture<V> catching(
      Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
    return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
  }

  /**
   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
   * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
   * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
   * an exception is thrown, this exception is used as the result of the output {@code Future}.
   *
   * <p>Usage examples:
   *
   * <pre>{@code
   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
   * // counters.
   * ListenableFuture<Integer> faultTolerantFuture =
   *     fetchCounters().catchingAsync(
   *         FetchException.class, x -> immediateFuture(0), directExecutor());
   * }</pre>
   *
   * <p>The fallback can also choose to propagate the original exception when desired:
   *
   * <pre>{@code
   * // Falling back to a zero counter only when omitDataOnFetchFailure is set; otherwise the
   * // original FetchException is propagated.
   * ListenableFuture<Integer> faultTolerantFuture =
   *     fetchCounters().catchingAsync(
   *         FetchException.class,
   *         e -> {
   *           if (omitDataOnFetchFailure) {
   *             return immediateFuture(0);
   *           }
   *           throw e;
   *         },
   *         directExecutor());
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
   * completing the returned {@code Future}.)
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
   * #transform}.
   *
   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
   *     type is matched against the input's exception. "The input's exception" means the cause of
   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
   *     different kind of exception, that exception itself. To avoid hiding bugs and other
   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
   *     Throwable.class} in particular.
   * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
   *     exception type. The function's argument is the input's exception. "The input's exception"
   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
   *     {@code get()} throws a different kind of exception, that exception itself.
   * @param executor the executor that runs {@code fallback} if the input fails
   */
  @J2ktIncompatible
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public final <X extends Throwable> FluentFuture<V> catchingAsync(
      Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
    return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
  }

  /**
   * Returns a future that delegates to this future but will finish early (via a {@link
   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
   * If the timeout expires, not only will the output future finish, but also the input future
   * ({@code this}) will be cancelled and interrupted.
   *
   * @param timeout when to time out the future
   * @param scheduledExecutor The executor service to enforce the timeout.
   * @since 33.4.0 (but since 28.0 in the JRE flavor)
   */
  @J2ktIncompatible
  @GwtIncompatible // ScheduledExecutorService
  @SuppressWarnings("Java7ApiChecker")
  @IgnoreJRERequirement // Users will use this only if they're already using Duration.
  public final FluentFuture<V> withTimeout(
      Duration timeout, ScheduledExecutorService scheduledExecutor) {
    // Convert to nanoseconds via toNanosSaturated and defer to the (long, TimeUnit) overload.
    return withTimeout(toNanosSaturated(timeout), TimeUnit.NANOSECONDS, scheduledExecutor);
  }

  /**
   * Returns a future that delegates to this future but will finish early (via a {@link
   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
   * If the timeout expires, not only will the output future finish, but also the input future
   * ({@code this}) will be cancelled and interrupted.
   *
   * @param timeout when to time out the future
   * @param unit the time unit of the {@code timeout} parameter
   * @param scheduledExecutor The executor service to enforce the timeout.
   */
  @J2ktIncompatible
  @GwtIncompatible // ScheduledExecutorService
  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
  public final FluentFuture<V> withTimeout(
      long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
    return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
  }

  /**
   * Returns a new {@code Future} whose result is asynchronously derived from the result of this
   * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
   * same exception (and the function is not invoked).
   *
   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
   * Example usage:
   *
   * <pre>{@code
   * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
   * ListenableFuture<QueryResult> queryFuture =
   *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
   * completing the returned {@code Future}.)
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
   * input future and that of the future returned by the chain function. That is, if the returned
   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
   * attempt to cancel itself.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
   * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
   *
   * @param function A function to transform the result of this future to the result of the output
   *     future
   * @param executor Executor to run the function in.
   * @return A future that holds result of the function (if the input succeeded) or the original
   *     input's failure (if not)
   */
  public final <T extends @Nullable Object> FluentFuture<T> transformAsync(
      AsyncFunction<? super V, T> function, Executor executor) {
    return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
  }

  /**
   * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
   * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
   * the function is not invoked). Example usage:
   *
   * <pre>{@code
   * ListenableFuture<List<Row>> rowsFuture =
   *     queryFuture.transform(QueryResult::getRows, executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method.
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
   * in which it will attempt to cancel itself.
   *
   * <p>An example use of this method is to convert a serializable object returned from an RPC into
   * a POJO.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
   * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
   *
   * @param function A Function to transform the results of this future to the results of the
   *     returned future.
   * @param executor Executor to run the function in.
   * @return A future that holds result of the transformation.
   */
  public final <T extends @Nullable Object> FluentFuture<T> transform(
      Function<? super V, T> function, Executor executor) {
    return (FluentFuture<T>) Futures.transform(this, function, executor);
  }

  /**
   * Registers separate success and failure callbacks to be run when this {@code Future}'s
   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
   * computation is already complete, immediately.
   *
   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
   * callbacks, but any callback added through this method is guaranteed to be called once the
   * computation is complete.
   *
   * <p>Example:
   *
   * <pre>{@code
   * future.addCallback(
   *     new FutureCallback<QueryResult>() {
   *       public void onSuccess(QueryResult result) {
   *         storeInCache(result);
   *       }
   *       public void onFailure(Throwable t) {
   *         reportError(t);
   *       }
   *     }, executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight callbacks passed to this method.
   *
   * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
   * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
   * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
   * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
   *
   * @param callback The callback to invoke when this {@code Future} is completed.
   * @param executor The executor to run {@code callback} when the future completes.
   */
  public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
    Futures.addCallback(this, callback, executor);
  }
}