001/* 002 * Copyright (C) 2018 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Preconditions.checkState; 019import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; 020import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; 021import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; 022import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 023import static com.google.common.util.concurrent.Futures.immediateFuture; 024import static com.google.common.util.concurrent.Futures.immediateVoidFuture; 025import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 026 027import com.google.common.annotations.Beta; 028import java.util.concurrent.Callable; 029import java.util.concurrent.Executor; 030import java.util.concurrent.atomic.AtomicReference; 031 032/** 033 * Serializes execution of tasks, somewhat like an "asynchronous {@code synchronized} block." Each 034 * {@linkplain #submit enqueued} callable will not be submitted to its associated executor until the 035 * previous callable has returned -- and, if the previous callable was an {@link AsyncCallable}, not 036 * until the {@code Future} it returned is {@linkplain Future#isDone done} (successful, failed, or 037 * cancelled). 038 * 039 * <p>This class has limited support for cancellation and other "early completion": 040 * 041 * <ul> 042 * <li>While calls to {@code submit} and {@code submitAsync} return a {@code Future} that can be 043 * cancelled, cancellation never propagates to a task that has started to run -- neither to 044 * the callable itself nor to any {@code Future} returned by an {@code AsyncCallable}. 045 * (However, cancellation can prevent an <i>unstarted</i> task from running.) Therefore, the 046 * next task will wait for any running callable (or pending {@code Future} returned by an 047 * {@code AsyncCallable}) to complete, without interrupting it (and without calling {@code 048 * cancel} on the {@code Future}). So beware: <i>Even if you cancel every precededing {@code 049 * Future} returned by this class, the next task may still have to wait.</i>. 050 * <li>Once an {@code AsyncCallable} returns a {@code Future}, this class considers that task to 051 * be "done" as soon as <i>that</i> {@code Future} completes in any way. Notably, a {@code 052 * Future} is "completed" even if it is cancelled while its underlying work continues on a 053 * thread, an RPC, etc. The {@code Future} is also "completed" if it fails "early" -- for 054 * example, if the deadline expires on a {@code Future} returned from {@link 055 * Futures#withTimeout} while the {@code Future} it wraps continues its underlying work. So 056 * beware: <i>Your {@code AsyncCallable} should not complete its {@code Future} until it is 057 * safe for the next task to start.</i> 058 * </ul> 059 * 060 * <p>An additional limitation: this class serializes execution of <i>tasks</i> but not any 061 * <i>listeners</i> of those tasks. 062 * 063 * <p>This class is similar to {@link MoreExecutors#newSequentialExecutor}. This class is different 064 * in a few ways: 065 * 066 * <ul> 067 * <li>Each task may be associated with a different executor. 068 * <li>Tasks may be of type {@code AsyncCallable}. 069 * <li>Running tasks <i>cannot</i> be interrupted. (Note that {@code newSequentialExecutor} does 070 * not return {@code Future} objects, so it doesn't support interruption directly, either. 071 * However, utilities that <i>use</i> that executor have the ability to interrupt tasks 072 * running on it. This class, by contrast, does not expose an {@code Executor} API.) 073 * </ul> 074 * 075 * <p>If you don't need the features of this class, you may prefer {@code newSequentialExecutor} for 076 * its simplicity and ability to accommodate interruption. 077 * 078 * @since 26.0 079 */ 080@Beta 081public final class ExecutionSequencer { 082 083 private ExecutionSequencer() {} 084 085 /** Creates a new instance. */ 086 public static ExecutionSequencer create() { 087 return new ExecutionSequencer(); 088 } 089 090 /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ 091 private final AtomicReference<ListenableFuture<Void>> ref = 092 new AtomicReference<>(immediateVoidFuture()); 093 094 private ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue(); 095 096 /** 097 * This object is unsafely published, but avoids problematic races by relying exclusively on the 098 * identity equality of its Thread field so that the task field is only accessed by a single 099 * thread. 100 */ 101 private static final class ThreadConfinedTaskQueue { 102 /** 103 * This field is only used for identity comparisons with the current thread. Field assignments 104 * are atomic, but do not provide happens-before ordering; however: 105 * 106 * <ul> 107 * <li>If this field's value == currentThread, we know that it's up to date, because write 108 * operations in a thread always happen-before subsequent read operations in the same 109 * thread 110 * <li>If this field's value == null because of unsafe publication, we know that it isn't the 111 * object associated with our thread, because if it was the publication wouldn't have been 112 * unsafe and we'd have seen our thread as the value. This state is also why a new 113 * ThreadConfinedTaskQueue object must be created for each inline execution, because 114 * observing a null thread does not mean the object is safe to reuse. 115 * <li>If this field's value is some other thread object, we know that it's not our thread. 116 * <li>If this field's value == null because it originally belonged to another thread and that 117 * thread cleared it, we still know that it's not associated with our thread 118 * <li>If this field's value == null because it was associated with our thread and was 119 * cleared, we know that we're not executing inline any more 120 * </ul> 121 * 122 * All the states where thread != currentThread are identical for our purposes, and so even 123 * though it's racy, we don't care which of those values we get, so no need to synchronize. 124 */ 125 Thread thread; 126 /** Only used by the thread associated with this object */ 127 Runnable nextTask; 128 /** Only used by the thread associated with this object */ 129 Executor nextExecutor; 130 } 131 132 /** 133 * Enqueues a task to run when the previous task (if any) completes. 134 * 135 * <p>Cancellation does not propagate from the output future to a callable that has begun to 136 * execute, but if the output future is cancelled before {@link Callable#call()} is invoked, 137 * {@link Callable#call()} will not be invoked. 138 */ 139 public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) { 140 checkNotNull(callable); 141 checkNotNull(executor); 142 return submitAsync( 143 new AsyncCallable<T>() { 144 @Override 145 public ListenableFuture<T> call() throws Exception { 146 return immediateFuture(callable.call()); 147 } 148 149 @Override 150 public String toString() { 151 return callable.toString(); 152 } 153 }, 154 executor); 155 } 156 157 /** 158 * Enqueues a task to run when the previous task (if any) completes. 159 * 160 * <p>Cancellation does not propagate from the output future to the future returned from {@code 161 * callable} or a callable that has begun to execute, but if the output future is cancelled before 162 * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. 163 */ 164 public <T> ListenableFuture<T> submitAsync( 165 final AsyncCallable<T> callable, final Executor executor) { 166 checkNotNull(callable); 167 checkNotNull(executor); 168 final TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this); 169 final AsyncCallable<T> task = 170 new AsyncCallable<T>() { 171 @Override 172 public ListenableFuture<T> call() throws Exception { 173 if (!taskExecutor.trySetStarted()) { 174 return immediateCancelledFuture(); 175 } 176 return callable.call(); 177 } 178 179 @Override 180 public String toString() { 181 return callable.toString(); 182 } 183 }; 184 /* 185 * Four futures are at play here: 186 * taskFuture is the future tracking the result of the callable. 187 * newFuture is a future that completes after this and all prior tasks are done. 188 * oldFuture is the previous task's newFuture. 189 * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. 190 * 191 * newFuture is guaranteed to only complete once all tasks previously submitted to this instance 192 * have completed - namely after oldFuture is done, and taskFuture has either completed or been 193 * cancelled before the callable started execution. 194 */ 195 final SettableFuture<Void> newFuture = SettableFuture.create(); 196 197 final ListenableFuture<Void> oldFuture = ref.getAndSet(newFuture); 198 199 // Invoke our task once the previous future completes. 200 final TrustedListenableFutureTask<T> taskFuture = TrustedListenableFutureTask.create(task); 201 oldFuture.addListener(taskFuture, taskExecutor); 202 203 final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture); 204 205 // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture 206 // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that 207 // if the future we return is cancelled, we don't begin execution of the next task until after 208 // oldFuture completes. 209 Runnable listener = 210 new Runnable() { 211 @Override 212 public void run() { 213 if (taskFuture.isDone()) { 214 // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of 215 // a future that eventually came from immediateFuture(null), this doesn't leak 216 // throwables or completion values. 217 newFuture.setFuture(oldFuture); 218 } else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) { 219 // If this CAS succeeds, we know that the provided callable will never be invoked, 220 // so when oldFuture completes it is safe to allow the next submitted task to 221 // proceed. Doing this immediately here lets the next task run without waiting for 222 // the cancelled task's executor to run the noop AsyncCallable. 223 // 224 // --- 225 // 226 // If the CAS fails, the provided callable already started running (or it is about 227 // to). Our contract promises: 228 // 229 // 1. not to execute a new callable until the old one has returned 230 // 231 // If we were to cancel taskFuture, that would let the next task start while the old 232 // one is still running. 233 // 234 // Now, maybe we could tweak our implementation to not start the next task until the 235 // callable actually completes. (We could detect completion in our wrapper 236 // `AsyncCallable task`.) However, our contract also promises: 237 // 238 // 2. not to cancel any Future the user returned from an AsyncCallable 239 // 240 // We promise this because, once we cancel that Future, we would no longer be able to 241 // tell when any underlying work it is doing is done. Thus, we might start a new task 242 // while that underlying work is still running. 243 // 244 // So that is why we cancel only in the case of CAS success. 245 taskFuture.cancel(false); 246 } 247 } 248 }; 249 // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to 250 // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture 251 // propagates cancellation if the callable has not yet been invoked. 252 outputFuture.addListener(listener, directExecutor()); 253 taskFuture.addListener(listener, directExecutor()); 254 255 return outputFuture; 256 } 257 258 enum RunningState { 259 NOT_RUN, 260 CANCELLED, 261 STARTED, 262 } 263 264 /** 265 * This class helps avoid a StackOverflowError when large numbers of tasks are submitted with 266 * {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other 267 * tasks would be called recursively. Here, we detect that the delegate executor is executing 268 * inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class 269 * per call to submit() or submitAsync(), and each instance supports only one call to execute(). 270 * 271 * <p>This class would certainly be simpler and easier to reason about if it were built with 272 * ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is 273 * non-static, and is initialized/removed frequently - this causes churn in the Thread specific 274 * hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different 275 * ExecutionSequencer objects interfere with each other, which would be undesirable, in addition 276 * to increasing the memory footprint of every thread that interacted with it. In order to release 277 * entries in thread-specific maps when the ThreadLocal object itself is no longer referenced, 278 * ThreadLocal is usually implemented with a WeakReference, which can have negative performance 279 * properties; for example, calling WeakReference.get() on Android will block during an 280 * otherwise-concurrent GC cycle. 281 */ 282 @SuppressWarnings("ShouldNotSubclass") // Saving an allocation here is worth it 283 private static final class TaskNonReentrantExecutor extends AtomicReference<RunningState> 284 implements Executor, Runnable { 285 286 /** 287 * Used to update and read the latestTaskQueue field. Set to null once the runnable has been run 288 * or queued. 289 */ 290 ExecutionSequencer sequencer; 291 292 /** 293 * Executor the task was set to run on. Set to null when the task has been queued, run, or 294 * cancelled. 295 */ 296 Executor delegate; 297 298 /** 299 * Set before calling delegate.execute(); set to null once run, so that it can be GCed; this 300 * object may live on after, if submitAsync returns an incomplete future. 301 */ 302 Runnable task; 303 304 /** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */ 305 Thread submitting; 306 307 private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) { 308 super(NOT_RUN); 309 this.delegate = delegate; 310 this.sequencer = sequencer; 311 } 312 313 @Override 314 public void execute(Runnable task) { 315 // If this operation was successfully cancelled already, calling the runnable will be a noop. 316 // This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel, 317 // which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run 318 // without waiting for the user's executor to run our submitted Runnable. However, this can 319 // interact poorly with the reentrancy-avoiding behavior of this executor - when the operation 320 // before the cancelled future completes, it will synchronously complete both the newFuture 321 // from the cancelled operation and its own. This can cause one runnable to queue two tasks, 322 // breaking the invariant this method relies on to iteratively run the next task after the 323 // previous one completes. 324 if (get() == RunningState.CANCELLED) { 325 delegate = null; 326 sequencer = null; 327 return; 328 } 329 submitting = Thread.currentThread(); 330 try { 331 ThreadConfinedTaskQueue submittingTaskQueue = sequencer.latestTaskQueue; 332 if (submittingTaskQueue.thread == submitting) { 333 sequencer = null; 334 // Submit from inside a reentrant submit. We don't know if this one will be reentrant (and 335 // can't know without submitting something to the executor) so queue to run iteratively. 336 // Task must be null, since each execution on this executor can only produce one more 337 // execution. 338 checkState(submittingTaskQueue.nextTask == null); 339 submittingTaskQueue.nextTask = task; 340 submittingTaskQueue.nextExecutor = delegate; 341 delegate = null; 342 } else { 343 Executor localDelegate = delegate; 344 delegate = null; 345 this.task = task; 346 localDelegate.execute(this); 347 } 348 } finally { 349 // Important to null this out here - if we did *not* execute inline, we might still 350 // run() on the same thread that called execute() - such as in a thread pool, and think 351 // that it was happening inline. As a side benefit, avoids holding on to the Thread object 352 // longer than necessary. 353 submitting = null; 354 } 355 } 356 357 @SuppressWarnings("ShortCircuitBoolean") 358 @Override 359 public void run() { 360 Thread currentThread = Thread.currentThread(); 361 if (currentThread != submitting) { 362 Runnable localTask = task; 363 task = null; 364 localTask.run(); 365 return; 366 } 367 // Executor called reentrantly! Make sure that further calls don't overflow stack. Further 368 // reentrant calls will see that their current thread is the same as the one set in 369 // latestTaskQueue, and queue rather than calling execute() directly. 370 ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue(); 371 executingTaskQueue.thread = currentThread; 372 // Unconditionally set; there is no risk of throwing away a queued task from another thread, 373 // because in order for the current task to run on this executor the previous task must have 374 // already started execution. Because each task on a TaskNonReentrantExecutor can only produce 375 // one execute() call to another instance from the same ExecutionSequencer, we know by 376 // induction that the task that launched this one must not have added any other runnables to 377 // that thread's queue, and thus we cannot be replacing a TaskAndThread object that would 378 // otherwise have another task queued on to it. Note the exception to this, cancellation, is 379 // specially handled in execute() - execute() calls triggered by cancellation are no-ops, and 380 // thus don't count. 381 sequencer.latestTaskQueue = executingTaskQueue; 382 sequencer = null; 383 try { 384 Runnable localTask = task; 385 task = null; 386 localTask.run(); 387 // Now check if our task attempted to reentrantly execute the next task. 388 Runnable queuedTask; 389 Executor queuedExecutor; 390 // Intentionally using non-short-circuit operator 391 while ((queuedTask = executingTaskQueue.nextTask) != null 392 & (queuedExecutor = executingTaskQueue.nextExecutor) != null) { 393 executingTaskQueue.nextTask = null; 394 executingTaskQueue.nextExecutor = null; 395 queuedExecutor.execute(queuedTask); 396 } 397 } finally { 398 // Null out the thread field, so that we don't leak a reference to Thread, and so that 399 // future `thread == currentThread()` calls from this thread don't incorrectly queue instead 400 // of executing. Don't null out the latestTaskQueue field, because the work done here 401 // may have scheduled more operations on another thread, and if those operations then 402 // trigger reentrant calls that thread will have updated the latestTaskQueue field, and 403 // we'd be interfering with their operation. 404 executingTaskQueue.thread = null; 405 } 406 } 407 408 private boolean trySetStarted() { 409 return compareAndSet(NOT_RUN, STARTED); 410 } 411 412 private boolean trySetCancelled() { 413 return compareAndSet(NOT_RUN, CANCELLED); 414 } 415 } 416}