/*
 * Copyright (C) 2006 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.util.concurrent.Internal.toNanosSaturated;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtCompatible;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.base.Function;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.DoNotMock;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * A {@link ListenableFuture} that supports fluent chains of operations. For example:
 *
 * <pre>{@code
 * ListenableFuture<Boolean> adminIsLoggedIn =
 *     FluentFuture.from(usersDatabase.getAdminUser())
 *         .transform(User::getId, directExecutor())
 *         .transform(ActivityService::isLoggedIn, threadPool)
 *         .catching(RpcException.class, e -> false, directExecutor());
 * }</pre>
 *
 * <h3>Alternatives</h3>
 *
 * <h4>Frameworks</h4>
 *
 * <p>When chaining together a graph of asynchronous operations, you will often find it easier to
 * use a framework. Frameworks automate the process, often adding features like monitoring,
 * debugging, and cancellation. Examples of frameworks include:
 *
 * <ul>
 *   <li><a href="https://dagger.dev/producers.html">Dagger Producers</a>
 * </ul>
 *
 * <h4>{@link java.util.concurrent.CompletableFuture} / {@link java.util.concurrent.CompletionStage}
 * </h4>
 *
 * <p>Users of {@code CompletableFuture} will likely want to continue using {@code
 * CompletableFuture}. {@code FluentFuture} is targeted at people who use {@code ListenableFuture},
 * who can't use Java 8, or who want an API more focused than {@code CompletableFuture}. (If you
 * need to adapt between {@code CompletableFuture} and {@code ListenableFuture}, consider <a
 * href="https://github.com/lukas-krecan/future-converter">Future Converter</a>.)
 *
 * <h3>Extension</h3>
 *
 * <p>If you want a class like {@code FluentFuture} but with extra methods, we recommend declaring
 * your own subclass of {@link ListenableFuture}, complete with a method like {@link #from} to adapt
 * an existing {@code ListenableFuture}, implemented atop a {@link ForwardingListenableFuture} that
 * forwards to that future and adds the desired methods.
 *
 * @since 23.0
 */
@Beta
@DoNotMock("Use FluentFuture.from(Futures.immediate*Future) or SettableFuture")
@GwtCompatible(emulated = true)
public abstract class FluentFuture<V> extends GwtFluentFutureCatchingSpecialization<V> {

  /**
   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
   *
   * <p>Every override below is {@code final} and simply forwards to the superclass implementation,
   * so subclasses of this type cannot replace these methods.
   */
  abstract static class TrustedFuture<V> extends FluentFuture<V>
      implements AbstractFuture.Trusted<V> {
    @CanIgnoreReturnValue
    @Override
    public final V get() throws InterruptedException, ExecutionException {
      return super.get();
    }

    @CanIgnoreReturnValue
    @Override
    public final V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
      return super.get(timeout, unit);
    }

    @Override
    public final boolean isDone() {
      return super.isDone();
    }

    @Override
    public final boolean isCancelled() {
      return super.isCancelled();
    }

    @Override
    public final void addListener(Runnable listener, Executor executor) {
      super.addListener(listener, executor);
    }

    @CanIgnoreReturnValue
    @Override
    public final boolean cancel(boolean mayInterruptIfRunning) {
      return super.cancel(mayInterruptIfRunning);
    }
  }

  // Package-private (no access modifier): code outside this package cannot subclass FluentFuture
  // directly.
  FluentFuture() {}

  /**
   * Converts the given {@code ListenableFuture} to an equivalent {@code FluentFuture}.
   *
   * <p>If the given {@code ListenableFuture} is already a {@code FluentFuture}, it is returned
   * directly. If not, it is wrapped in a {@code FluentFuture} that delegates all calls to the
   * original {@code ListenableFuture}.
   */
  public static <V> FluentFuture<V> from(ListenableFuture<V> future) {
    return future instanceof FluentFuture
        ? (FluentFuture<V>) future
        : new ForwardingFluentFuture<V>(future);
  }

  /**
   * Simply returns its argument.
   *
   * @deprecated no need to use this
   * @since 28.0
   */
  @Deprecated
  public static <V> FluentFuture<V> from(FluentFuture<V> future) {
    return checkNotNull(future);
  }

  /**
   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
   * fallback}. {@link Function#apply} is not invoked until the primary input has failed, so if the
   * primary input succeeds, it is never invoked. If, during the invocation of {@code fallback}, an
   * exception is thrown, this exception is used as the result of the output {@code Future}.
   *
   * <p>Usage example:
   *
   * <pre>{@code
   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
   * // counters.
   * ListenableFuture<Integer> faultTolerantFuture =
   *     fetchCounters().catching(FetchException.class, x -> 0, directExecutor());
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
   * #transform}.
   *
   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
   *     type is matched against the input's exception. "The input's exception" means the cause of
   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
   *     different kind of exception, that exception itself. To avoid hiding bugs and other
   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
   *     Throwable.class} in particular.
   * @param fallback the {@link Function} to be called if the input fails with the expected
   *     exception type. The function's argument is the input's exception. "The input's exception"
   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
   *     {@code get()} throws a different kind of exception, that exception itself.
   * @param executor the executor that runs {@code fallback} if the input fails
   */
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public final <X extends Throwable> FluentFuture<V> catching(
      Class<X> exceptionType, Function<? super X, ? extends V> fallback, Executor executor) {
    // The downcast relies on Futures.catching handing back a FluentFuture instance.
    return (FluentFuture<V>) Futures.catching(this, exceptionType, fallback, executor);
  }

  /**
   * Returns a {@code Future} whose result is taken from this {@code Future} or, if this {@code
   * Future} fails with the given {@code exceptionType}, from the result provided by the {@code
   * fallback}. {@link AsyncFunction#apply} is not invoked until the primary input has failed, so if
   * the primary input succeeds, it is never invoked. If, during the invocation of {@code fallback},
   * an exception is thrown, this exception is used as the result of the output {@code Future}.
   *
   * <p>Usage examples:
   *
   * <pre>{@code
   * // Falling back to a zero counter in case an exception happens when processing the RPC to fetch
   * // counters.
   * ListenableFuture<Integer> faultTolerantFuture =
   *     fetchCounters().catchingAsync(
   *         FetchException.class, x -> immediateFuture(0), directExecutor());
   * }</pre>
   *
   * <p>The fallback can also choose to propagate the original exception when desired:
   *
   * <pre>{@code
   * // Falling back to a zero counter only if omitDataOnFetchFailure is set; otherwise the
   * // original exception is rethrown.
   * ListenableFuture<Integer> faultTolerantFuture =
   *     fetchCounters().catchingAsync(
   *         FetchException.class,
   *         e -> {
   *           if (omitDataOnFetchFailure) {
   *             return immediateFuture(0);
   *           }
   *           throw e;
   *         },
   *         directExecutor());
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
   * completing the returned {@code Future}.)
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#exceptionally}. It
   * can also serve some of the use cases of {@link java.util.concurrent.CompletableFuture#handle}
   * and {@link java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link
   * #transform}.
   *
   * @param exceptionType the exception type that triggers use of {@code fallback}. The exception
   *     type is matched against the input's exception. "The input's exception" means the cause of
   *     the {@link ExecutionException} thrown by {@code this.get()} or, if {@code get()} throws a
   *     different kind of exception, that exception itself. To avoid hiding bugs and other
   *     unrecoverable errors, callers should prefer more specific types, avoiding {@code
   *     Throwable.class} in particular.
   * @param fallback the {@link AsyncFunction} to be called if the input fails with the expected
   *     exception type. The function's argument is the input's exception. "The input's exception"
   *     means the cause of the {@link ExecutionException} thrown by {@code this.get()} or, if
   *     {@code get()} throws a different kind of exception, that exception itself.
   * @param executor the executor that runs {@code fallback} if the input fails
   */
  @Partially.GwtIncompatible("AVAILABLE but requires exceptionType to be Throwable.class")
  public final <X extends Throwable> FluentFuture<V> catchingAsync(
      Class<X> exceptionType, AsyncFunction<? super X, ? extends V> fallback, Executor executor) {
    // The downcast relies on Futures.catchingAsync handing back a FluentFuture instance.
    return (FluentFuture<V>) Futures.catchingAsync(this, exceptionType, fallback, executor);
  }

  /**
   * Returns a future that delegates to this future but will finish early (via a {@link
   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
   * If the timeout expires, not only will the output future finish, but also the input future
   * ({@code this}) will be cancelled and interrupted.
   *
   * <p>The duration is converted to nanoseconds and handed to {@link #withTimeout(long, TimeUnit,
   * ScheduledExecutorService)}.
   *
   * @param timeout when to time out the future
   * @param scheduledExecutor The executor service to enforce the timeout.
   * @since 28.0
   */
  @GwtIncompatible // ScheduledExecutorService
  public final FluentFuture<V> withTimeout(
      Duration timeout, ScheduledExecutorService scheduledExecutor) {
    return withTimeout(toNanosSaturated(timeout), TimeUnit.NANOSECONDS, scheduledExecutor);
  }

  /**
   * Returns a future that delegates to this future but will finish early (via a {@link
   * TimeoutException} wrapped in an {@link ExecutionException}) if the specified timeout expires.
   * If the timeout expires, not only will the output future finish, but also the input future
   * ({@code this}) will be cancelled and interrupted.
   *
   * @param timeout when to time out the future
   * @param unit the time unit of the {@code timeout} parameter
   * @param scheduledExecutor The executor service to enforce the timeout.
   */
  @GwtIncompatible // ScheduledExecutorService
  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
  public final FluentFuture<V> withTimeout(
      long timeout, TimeUnit unit, ScheduledExecutorService scheduledExecutor) {
    // The downcast relies on Futures.withTimeout handing back a FluentFuture instance.
    return (FluentFuture<V>) Futures.withTimeout(this, timeout, unit, scheduledExecutor);
  }

  /**
   * Returns a new {@code Future} whose result is asynchronously derived from the result of this
   * {@code Future}. If the input {@code Future} fails, the returned {@code Future} fails with the
   * same exception (and the function is not invoked).
   *
   * <p>More precisely, the returned {@code Future} takes its result from a {@code Future} produced
   * by applying the given {@code AsyncFunction} to the result of the original {@code Future}.
   * Example usage:
   *
   * <pre>{@code
   * FluentFuture<RowKey> rowKeyFuture = FluentFuture.from(indexService.lookUp(query));
   * ListenableFuture<QueryResult> queryFuture =
   *     rowKeyFuture.transformAsync(dataService::readFuture, executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method. (Specifically,
   * {@code directExecutor} functions should avoid heavyweight operations inside {@code
   * AsyncFunction.apply}. Any heavyweight operations should occur in other threads responsible for
   * completing the returned {@code Future}.)
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
   * input future and that of the future returned by the chain function. That is, if the returned
   * {@code Future} is cancelled, it will attempt to cancel the other two, and if either of the
   * other two is cancelled, the returned {@code Future} will receive a callback in which it will
   * attempt to cancel itself.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenCompose} and
   * {@link java.util.concurrent.CompletableFuture#thenComposeAsync}. It can also serve some of the
   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
   *
   * @param function A function to transform the result of this future to the result of the output
   *     future
   * @param executor Executor to run the function in.
   * @return A future that holds result of the function (if the input succeeded) or the original
   *     input's failure (if not)
   */
  public final <T> FluentFuture<T> transformAsync(
      AsyncFunction<? super V, T> function, Executor executor) {
    // The downcast relies on Futures.transformAsync handing back a FluentFuture instance.
    return (FluentFuture<T>) Futures.transformAsync(this, function, executor);
  }

  /**
   * Returns a new {@code Future} whose result is derived from the result of this {@code Future}. If
   * this input {@code Future} fails, the returned {@code Future} fails with the same exception (and
   * the function is not invoked). Example usage:
   *
   * <pre>{@code
   * ListenableFuture<List<Row>> rowsFuture =
   *     queryFuture.transform(QueryResult::getRows, executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight functions passed to this method.
   *
   * <p>The returned {@code Future} attempts to keep its cancellation state in sync with that of the
   * input future. That is, if the returned {@code Future} is cancelled, it will attempt to cancel
   * the input, and if the input is cancelled, the returned {@code Future} will receive a callback
   * in which it will attempt to cancel itself.
   *
   * <p>An example use of this method is to convert a serializable object returned from an RPC into
   * a POJO.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#thenApply} and
   * {@link java.util.concurrent.CompletableFuture#thenApplyAsync}. It can also serve some of the
   * use cases of {@link java.util.concurrent.CompletableFuture#handle} and {@link
   * java.util.concurrent.CompletableFuture#handleAsync} when used along with {@link #catching}.
   *
   * @param function A Function to transform the results of this future to the results of the
   *     returned future.
   * @param executor Executor to run the function in.
   * @return A future that holds result of the transformation.
   */
  public final <T> FluentFuture<T> transform(Function<? super V, T> function, Executor executor) {
    // The downcast relies on Futures.transform handing back a FluentFuture instance.
    return (FluentFuture<T>) Futures.transform(this, function, executor);
  }

  /**
   * Registers separate success and failure callbacks to be run when this {@code Future}'s
   * computation is {@linkplain java.util.concurrent.Future#isDone() complete} or, if the
   * computation is already complete, immediately.
   *
   * <p>The callback is run on {@code executor}. There is no guaranteed ordering of execution of
   * callbacks, but any callback added through this method is guaranteed to be called once the
   * computation is complete.
   *
   * <p>Example:
   *
   * <pre>{@code
   * future.addCallback(
   *     new FutureCallback<QueryResult>() {
   *       public void onSuccess(QueryResult result) {
   *         storeInCache(result);
   *       }
   *       public void onFailure(Throwable t) {
   *         reportError(t);
   *       }
   *     }, executor);
   * }</pre>
   *
   * <p>When selecting an executor, note that {@code directExecutor} is dangerous in some cases. See
   * the discussion in the {@link #addListener} documentation. All its warnings about heavyweight
   * listeners are also applicable to heavyweight callbacks passed to this method.
   *
   * <p>For a more general interface to attach a completion listener, see {@link #addListener}.
   *
   * <p>This method is similar to {@link java.util.concurrent.CompletableFuture#whenComplete} and
   * {@link java.util.concurrent.CompletableFuture#whenCompleteAsync}. It also serves the use case
   * of {@link java.util.concurrent.CompletableFuture#thenAccept} and {@link
   * java.util.concurrent.CompletableFuture#thenAcceptAsync}.
   *
   * @param callback The callback to invoke when this {@code Future} is completed.
   * @param executor The executor to run {@code callback} when the future completes.
   */
  public final void addCallback(FutureCallback<? super V> callback, Executor executor) {
    Futures.addCallback(this, callback, executor);
  }
}