001/* 002 * Copyright (C) 2018 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Preconditions.checkState; 019import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; 020import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; 021import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; 022import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 023import static com.google.common.util.concurrent.Futures.immediateFuture; 024import static com.google.common.util.concurrent.Futures.immediateVoidFuture; 025import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 026import static java.util.Objects.requireNonNull; 027 028import com.google.common.annotations.J2ktIncompatible; 029import com.google.errorprone.annotations.concurrent.LazyInit; 030import java.util.concurrent.Callable; 031import java.util.concurrent.Executor; 032import java.util.concurrent.atomic.AtomicReference; 033import javax.annotation.CheckForNull; 034import org.checkerframework.checker.nullness.qual.Nullable; 035 036/** 037 * Serializes execution of tasks, somewhat like an "asynchronous {@code synchronized} block." Each 038 * {@linkplain #submit enqueued} callable will not be submitted to its associated executor until the 039 * previous callable has returned -- and, if the previous callable was an {@link AsyncCallable}, not 040 * until the {@code Future} it returned is {@linkplain Future#isDone done} (successful, failed, or 041 * cancelled). 042 * 043 * <p>This class serializes execution of <i>submitted</i> tasks but not any <i>listeners</i> of 044 * those tasks. 045 * 046 * <p>Submitted tasks have a happens-before order as defined in the Java Language Specification. 047 * Tasks execute with the same happens-before order that the function calls to {@link #submit} and 048 * {@link #submitAsync} that submitted those tasks had. 049 * 050 * <p>This class has limited support for cancellation and other "early completions": 051 * 052 * <ul> 053 * <li>While calls to {@code submit} and {@code submitAsync} return a {@code Future} that can be 054 * cancelled, cancellation never propagates to a task that has started to run -- neither to 055 * the callable itself nor to any {@code Future} returned by an {@code AsyncCallable}. 056 * (However, cancellation can prevent an <i>unstarted</i> task from running.) Therefore, the 057 * next task will wait for any running callable (or pending {@code Future} returned by an 058 * {@code AsyncCallable}) to complete, without interrupting it (and without calling {@code 059 * cancel} on the {@code Future}). So beware: <i>Even if you cancel every preceding {@code 060 * Future} returned by this class, the next task may still have to wait.</i>. 061 * <li>Once an {@code AsyncCallable} returns a {@code Future}, this class considers that task to 062 * be "done" as soon as <i>that</i> {@code Future} completes in any way. Notably, a {@code 063 * Future} is "completed" even if it is cancelled while its underlying work continues on a 064 * thread, an RPC, etc. The {@code Future} is also "completed" if it fails "early" -- for 065 * example, if the deadline expires on a {@code Future} returned from {@link 066 * Futures#withTimeout} while the {@code Future} it wraps continues its underlying work. So 067 * beware: <i>Your {@code AsyncCallable} should not complete its {@code Future} until it is 068 * safe for the next task to start.</i> 069 * </ul> 070 * 071 * <p>This class is similar to {@link MoreExecutors#newSequentialExecutor}. This class is different 072 * in a few ways: 073 * 074 * <ul> 075 * <li>Each task may be associated with a different executor. 076 * <li>Tasks may be of type {@code AsyncCallable}. 077 * <li>Running tasks <i>cannot</i> be interrupted. (Note that {@code newSequentialExecutor} does 078 * not return {@code Future} objects, so it doesn't support interruption directly, either. 079 * However, utilities that <i>use</i> that executor have the ability to interrupt tasks 080 * running on it. This class, by contrast, does not expose an {@code Executor} API.) 081 * </ul> 082 * 083 * <p>If you don't need the features of this class, you may prefer {@code newSequentialExecutor} for 084 * its simplicity and ability to accommodate interruption. 085 * 086 * @since 26.0 087 */ 088@ElementTypesAreNonnullByDefault 089@J2ktIncompatible 090public final class ExecutionSequencer { 091 092 private ExecutionSequencer() {} 093 094 /** Creates a new instance. */ 095 public static ExecutionSequencer create() { 096 return new ExecutionSequencer(); 097 } 098 099 /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ 100 private final AtomicReference<ListenableFuture<@Nullable Void>> ref = 101 new AtomicReference<>(immediateVoidFuture()); 102 103 private @LazyInit ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue(); 104 105 /** 106 * This object is unsafely published, but avoids problematic races by relying exclusively on the 107 * identity equality of its Thread field so that the task field is only accessed by a single 108 * thread. 109 */ 110 private static final class ThreadConfinedTaskQueue { 111 /** 112 * This field is only used for identity comparisons with the current thread. Field assignments 113 * are atomic, but do not provide happens-before ordering; however: 114 * 115 * <ul> 116 * <li>If this field's value == currentThread, we know that it's up to date, because write 117 * operations in a thread always happen-before subsequent read operations in the same 118 * thread 119 * <li>If this field's value == null because of unsafe publication, we know that it isn't the 120 * object associated with our thread, because if it was the publication wouldn't have been 121 * unsafe and we'd have seen our thread as the value. This state is also why a new 122 * ThreadConfinedTaskQueue object must be created for each inline execution, because 123 * observing a null thread does not mean the object is safe to reuse. 124 * <li>If this field's value is some other thread object, we know that it's not our thread. 125 * <li>If this field's value == null because it originally belonged to another thread and that 126 * thread cleared it, we still know that it's not associated with our thread 127 * <li>If this field's value == null because it was associated with our thread and was 128 * cleared, we know that we're not executing inline any more 129 * </ul> 130 * 131 * All the states where thread != currentThread are identical for our purposes, and so even 132 * though it's racy, we don't care which of those values we get, so no need to synchronize. 133 */ 134 @CheckForNull @LazyInit Thread thread; 135 136 /** Only used by the thread associated with this object */ 137 @CheckForNull Runnable nextTask; 138 139 /** Only used by the thread associated with this object */ 140 @CheckForNull Executor nextExecutor; 141 } 142 143 /** 144 * Enqueues a task to run when the previous task (if any) completes. 145 * 146 * <p>Cancellation does not propagate from the output future to a callable that has begun to 147 * execute, but if the output future is cancelled before {@link Callable#call()} is invoked, 148 * {@link Callable#call()} will not be invoked. 149 */ 150 public <T extends @Nullable Object> ListenableFuture<T> submit( 151 Callable<T> callable, Executor executor) { 152 checkNotNull(callable); 153 checkNotNull(executor); 154 return submitAsync( 155 new AsyncCallable<T>() { 156 @Override 157 public ListenableFuture<T> call() throws Exception { 158 return immediateFuture(callable.call()); 159 } 160 161 @Override 162 public String toString() { 163 return callable.toString(); 164 } 165 }, 166 executor); 167 } 168 169 /** 170 * Enqueues a task to run when the previous task (if any) completes. 171 * 172 * <p>Cancellation does not propagate from the output future to the future returned from {@code 173 * callable} or a callable that has begun to execute, but if the output future is cancelled before 174 * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. 175 */ 176 public <T extends @Nullable Object> ListenableFuture<T> submitAsync( 177 AsyncCallable<T> callable, Executor executor) { 178 checkNotNull(callable); 179 checkNotNull(executor); 180 TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this); 181 AsyncCallable<T> task = 182 new AsyncCallable<T>() { 183 @Override 184 public ListenableFuture<T> call() throws Exception { 185 if (!taskExecutor.trySetStarted()) { 186 return immediateCancelledFuture(); 187 } 188 return callable.call(); 189 } 190 191 @Override 192 public String toString() { 193 return callable.toString(); 194 } 195 }; 196 /* 197 * Four futures are at play here: 198 * taskFuture is the future tracking the result of the callable. 199 * newFuture is a future that completes after this and all prior tasks are done. 200 * oldFuture is the previous task's newFuture. 201 * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. 202 * 203 * newFuture is guaranteed to only complete once all tasks previously submitted to this instance 204 * have completed - namely after oldFuture is done, and taskFuture has either completed or been 205 * cancelled before the callable started execution. 206 */ 207 SettableFuture<@Nullable Void> newFuture = SettableFuture.create(); 208 209 ListenableFuture<@Nullable Void> oldFuture = ref.getAndSet(newFuture); 210 211 // Invoke our task once the previous future completes. 212 TrustedListenableFutureTask<T> taskFuture = TrustedListenableFutureTask.create(task); 213 oldFuture.addListener(taskFuture, taskExecutor); 214 215 ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture); 216 217 // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture 218 // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that 219 // if the future we return is cancelled, we don't begin execution of the next task until after 220 // oldFuture completes. 221 Runnable listener = 222 () -> { 223 if (taskFuture.isDone()) { 224 // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of 225 // a future that eventually came from immediateFuture(null), this doesn't leak 226 // throwables or completion values. 227 newFuture.setFuture(oldFuture); 228 } else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) { 229 // If this CAS succeeds, we know that the provided callable will never be invoked, 230 // so when oldFuture completes it is safe to allow the next submitted task to 231 // proceed. Doing this immediately here lets the next task run without waiting for 232 // the cancelled task's executor to run the noop AsyncCallable. 233 // 234 // --- 235 // 236 // If the CAS fails, the provided callable already started running (or it is about 237 // to). Our contract promises: 238 // 239 // 1. not to execute a new callable until the old one has returned 240 // 241 // If we were to cancel taskFuture, that would let the next task start while the old 242 // one is still running. 243 // 244 // Now, maybe we could tweak our implementation to not start the next task until the 245 // callable actually completes. (We could detect completion in our wrapper 246 // `AsyncCallable task`.) However, our contract also promises: 247 // 248 // 2. not to cancel any Future the user returned from an AsyncCallable 249 // 250 // We promise this because, once we cancel that Future, we would no longer be able to 251 // tell when any underlying work it is doing is done. Thus, we might start a new task 252 // while that underlying work is still running. 253 // 254 // So that is why we cancel only in the case of CAS success. 255 taskFuture.cancel(false); 256 } 257 }; 258 // Adding the listener to both futures guarantees that newFuture will always be set. Adding to 259 // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture 260 // propagates cancellation if the callable has not yet been invoked. 261 outputFuture.addListener(listener, directExecutor()); 262 taskFuture.addListener(listener, directExecutor()); 263 264 return outputFuture; 265 } 266 267 enum RunningState { 268 NOT_RUN, 269 CANCELLED, 270 STARTED, 271 } 272 273 /** 274 * This class helps avoid a StackOverflowError when large numbers of tasks are submitted with 275 * {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other 276 * tasks would be called recursively. Here, we detect that the delegate executor is executing 277 * inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class 278 * per call to submit() or submitAsync(), and each instance supports only one call to execute(). 279 * 280 * <p>This class would certainly be simpler and easier to reason about if it were built with 281 * ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is 282 * non-static, and is initialized/removed frequently - this causes churn in the Thread specific 283 * hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different 284 * ExecutionSequencer objects interfere with each other, which would be undesirable, in addition 285 * to increasing the memory footprint of every thread that interacted with it. In order to release 286 * entries in thread-specific maps when the ThreadLocal object itself is no longer referenced, 287 * ThreadLocal is usually implemented with a WeakReference, which can have negative performance 288 * properties; for example, calling WeakReference.get() on Android will block during an 289 * otherwise-concurrent GC cycle. 290 */ 291 private static final class TaskNonReentrantExecutor extends AtomicReference<RunningState> 292 implements Executor, Runnable { 293 294 /** 295 * Used to update and read the latestTaskQueue field. Set to null once the runnable has been run 296 * or queued. 297 */ 298 @CheckForNull ExecutionSequencer sequencer; 299 300 /** 301 * Executor the task was set to run on. Set to null when the task has been queued, run, or 302 * cancelled. 303 */ 304 @CheckForNull Executor delegate; 305 306 /** 307 * Set before calling delegate.execute(); set to null once run, so that it can be GCed; this 308 * object may live on after, if submitAsync returns an incomplete future. 309 */ 310 @CheckForNull Runnable task; 311 312 /** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */ 313 @CheckForNull @LazyInit Thread submitting; 314 315 private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) { 316 super(NOT_RUN); 317 this.delegate = delegate; 318 this.sequencer = sequencer; 319 } 320 321 @Override 322 public void execute(Runnable task) { 323 // If this operation was successfully cancelled already, calling the runnable will be a noop. 324 // This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel, 325 // which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run 326 // without waiting for the user's executor to run our submitted Runnable. However, this can 327 // interact poorly with the reentrancy-avoiding behavior of this executor - when the operation 328 // before the cancelled future completes, it will synchronously complete both the newFuture 329 // from the cancelled operation and its own. This can cause one runnable to queue two tasks, 330 // breaking the invariant this method relies on to iteratively run the next task after the 331 // previous one completes. 332 if (get() == RunningState.CANCELLED) { 333 delegate = null; 334 sequencer = null; 335 return; 336 } 337 submitting = Thread.currentThread(); 338 339 try { 340 /* 341 * requireNonNull is safe because we don't null out `sequencer` except: 342 * 343 * - above, where we return (in which case we never get here) 344 * 345 * - in `run`, which can't run until this Runnable is submitted to an executor, which 346 * doesn't happen until below. (And this Executor -- yes, the object is both a Runnable 347 * and an Executor -- is used for only a single `execute` call.) 348 */ 349 ThreadConfinedTaskQueue submittingTaskQueue = requireNonNull(sequencer).latestTaskQueue; 350 if (submittingTaskQueue.thread == submitting) { 351 sequencer = null; 352 // Submit from inside a reentrant submit. We don't know if this one will be reentrant (and 353 // can't know without submitting something to the executor) so queue to run iteratively. 354 // Task must be null, since each execution on this executor can only produce one more 355 // execution. 356 checkState(submittingTaskQueue.nextTask == null); 357 submittingTaskQueue.nextTask = task; 358 // requireNonNull(delegate) is safe for reasons similar to requireNonNull(sequencer). 359 submittingTaskQueue.nextExecutor = requireNonNull(delegate); 360 delegate = null; 361 } else { 362 // requireNonNull(delegate) is safe for reasons similar to requireNonNull(sequencer). 363 Executor localDelegate = requireNonNull(delegate); 364 delegate = null; 365 this.task = task; 366 localDelegate.execute(this); 367 } 368 } finally { 369 // Important to null this out here - if we did *not* execute inline, we might still 370 // run() on the same thread that called execute() - such as in a thread pool, and think 371 // that it was happening inline. As a side benefit, avoids holding on to the Thread object 372 // longer than necessary. 373 submitting = null; 374 } 375 } 376 377 @SuppressWarnings("ShortCircuitBoolean") 378 @Override 379 public void run() { 380 Thread currentThread = Thread.currentThread(); 381 if (currentThread != submitting) { 382 /* 383 * requireNonNull is safe because we set `task` before submitting this Runnable to an 384 * Executor, and we don't null it out until here. 385 */ 386 Runnable localTask = requireNonNull(task); 387 task = null; 388 localTask.run(); 389 return; 390 } 391 // Executor called reentrantly! Make sure that further calls don't overflow stack. Further 392 // reentrant calls will see that their current thread is the same as the one set in 393 // latestTaskQueue, and queue rather than calling execute() directly. 394 ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue(); 395 executingTaskQueue.thread = currentThread; 396 /* 397 * requireNonNull is safe because we don't null out `sequencer` except: 398 * 399 * - after the requireNonNull call below. (And this object has its Runnable.run override 400 * called only once, just as it has its Executor.execute override called only once.) 401 * 402 * - if we return immediately from `execute` (in which case we never get here) 403 * 404 * - in the "reentrant submit" case of `execute` (in which case we must have started running a 405 * user task -- which means that we already got past this code (or else we exited early 406 * above)) 407 */ 408 // Unconditionally set; there is no risk of throwing away a queued task from another thread, 409 // because in order for the current task to run on this executor the previous task must have 410 // already started execution. Because each task on a TaskNonReentrantExecutor can only produce 411 // one execute() call to another instance from the same ExecutionSequencer, we know by 412 // induction that the task that launched this one must not have added any other runnables to 413 // that thread's queue, and thus we cannot be replacing a TaskAndThread object that would 414 // otherwise have another task queued on to it. Note the exception to this, cancellation, is 415 // specially handled in execute() - execute() calls triggered by cancellation are no-ops, and 416 // thus don't count. 417 requireNonNull(sequencer).latestTaskQueue = executingTaskQueue; 418 sequencer = null; 419 try { 420 // requireNonNull is safe, as discussed above. 421 Runnable localTask = requireNonNull(task); 422 task = null; 423 localTask.run(); 424 // Now check if our task attempted to reentrantly execute the next task. 425 Runnable queuedTask; 426 Executor queuedExecutor; 427 // Intentionally using non-short-circuit operator 428 while ((queuedTask = executingTaskQueue.nextTask) != null 429 && (queuedExecutor = executingTaskQueue.nextExecutor) != null) { 430 executingTaskQueue.nextTask = null; 431 executingTaskQueue.nextExecutor = null; 432 queuedExecutor.execute(queuedTask); 433 } 434 } finally { 435 // Null out the thread field, so that we don't leak a reference to Thread, and so that 436 // future `thread == currentThread()` calls from this thread don't incorrectly queue instead 437 // of executing. Don't null out the latestTaskQueue field, because the work done here 438 // may have scheduled more operations on another thread, and if those operations then 439 // trigger reentrant calls that thread will have updated the latestTaskQueue field, and 440 // we'd be interfering with their operation. 441 executingTaskQueue.thread = null; 442 } 443 } 444 445 private boolean trySetStarted() { 446 return compareAndSet(NOT_RUN, STARTED); 447 } 448 449 private boolean trySetCancelled() { 450 return compareAndSet(NOT_RUN, CANCELLED); 451 } 452 } 453}