001/*
002 * Copyright (C) 2018 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.base.Preconditions.checkState;
019import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
020import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
021import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
022import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
023import static com.google.common.util.concurrent.Futures.immediateFuture;
024import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
025import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
026import static java.util.Objects.requireNonNull;
027
028import com.google.common.annotations.J2ktIncompatible;
029import com.google.errorprone.annotations.concurrent.LazyInit;
030import java.util.concurrent.Callable;
031import java.util.concurrent.Executor;
032import java.util.concurrent.atomic.AtomicReference;
033import javax.annotation.CheckForNull;
034import org.checkerframework.checker.nullness.qual.Nullable;
035
036/**
037 * Serializes execution of tasks, somewhat like an "asynchronous {@code synchronized} block." Each
038 * {@linkplain #submit enqueued} callable will not be submitted to its associated executor until the
039 * previous callable has returned -- and, if the previous callable was an {@link AsyncCallable}, not
040 * until the {@code Future} it returned is {@linkplain Future#isDone done} (successful, failed, or
041 * cancelled).
042 *
043 * <p>This class serializes execution of <i>submitted</i> tasks but not any <i>listeners</i> of
044 * those tasks.
045 *
046 * <p>Submitted tasks have a happens-before order as defined in the Java Language Specification.
047 * Tasks execute with the same happens-before order that the function calls to {@link #submit} and
048 * {@link #submitAsync} that submitted those tasks had.
049 *
050 * <p>This class has limited support for cancellation and other "early completions":
051 *
052 * <ul>
053 *   <li>While calls to {@code submit} and {@code submitAsync} return a {@code Future} that can be
054 *       cancelled, cancellation never propagates to a task that has started to run -- neither to
055 *       the callable itself nor to any {@code Future} returned by an {@code AsyncCallable}.
056 *       (However, cancellation can prevent an <i>unstarted</i> task from running.) Therefore, the
057 *       next task will wait for any running callable (or pending {@code Future} returned by an
058 *       {@code AsyncCallable}) to complete, without interrupting it (and without calling {@code
059 *       cancel} on the {@code Future}). So beware: <i>Even if you cancel every preceding {@code
060 *       Future} returned by this class, the next task may still have to wait.</i>.
061 *   <li>Once an {@code AsyncCallable} returns a {@code Future}, this class considers that task to
062 *       be "done" as soon as <i>that</i> {@code Future} completes in any way. Notably, a {@code
063 *       Future} is "completed" even if it is cancelled while its underlying work continues on a
064 *       thread, an RPC, etc. The {@code Future} is also "completed" if it fails "early" -- for
065 *       example, if the deadline expires on a {@code Future} returned from {@link
066 *       Futures#withTimeout} while the {@code Future} it wraps continues its underlying work. So
067 *       beware: <i>Your {@code AsyncCallable} should not complete its {@code Future} until it is
068 *       safe for the next task to start.</i>
069 * </ul>
070 *
071 * <p>This class is similar to {@link MoreExecutors#newSequentialExecutor}. This class is different
072 * in a few ways:
073 *
074 * <ul>
075 *   <li>Each task may be associated with a different executor.
076 *   <li>Tasks may be of type {@code AsyncCallable}.
077 *   <li>Running tasks <i>cannot</i> be interrupted. (Note that {@code newSequentialExecutor} does
078 *       not return {@code Future} objects, so it doesn't support interruption directly, either.
079 *       However, utilities that <i>use</i> that executor have the ability to interrupt tasks
080 *       running on it. This class, by contrast, does not expose an {@code Executor} API.)
081 * </ul>
082 *
083 * <p>If you don't need the features of this class, you may prefer {@code newSequentialExecutor} for
084 * its simplicity and ability to accommodate interruption.
085 *
086 * @since 26.0
087 */
088@ElementTypesAreNonnullByDefault
089@J2ktIncompatible
090public final class ExecutionSequencer {
091
092  private ExecutionSequencer() {}
093
094  /** Creates a new instance. */
095  public static ExecutionSequencer create() {
096    return new ExecutionSequencer();
097  }
098
099  /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
100  private final AtomicReference<ListenableFuture<@Nullable Void>> ref =
101      new AtomicReference<>(immediateVoidFuture());
102
103  private @LazyInit ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue();
104
105  /**
106   * This object is unsafely published, but avoids problematic races by relying exclusively on the
107   * identity equality of its Thread field so that the task field is only accessed by a single
108   * thread.
109   */
110  private static final class ThreadConfinedTaskQueue {
111    /**
112     * This field is only used for identity comparisons with the current thread. Field assignments
113     * are atomic, but do not provide happens-before ordering; however:
114     *
115     * <ul>
116     *   <li>If this field's value == currentThread, we know that it's up to date, because write
117     *       operations in a thread always happen-before subsequent read operations in the same
118     *       thread
119     *   <li>If this field's value == null because of unsafe publication, we know that it isn't the
120     *       object associated with our thread, because if it was the publication wouldn't have been
121     *       unsafe and we'd have seen our thread as the value. This state is also why a new
122     *       ThreadConfinedTaskQueue object must be created for each inline execution, because
123     *       observing a null thread does not mean the object is safe to reuse.
124     *   <li>If this field's value is some other thread object, we know that it's not our thread.
125     *   <li>If this field's value == null because it originally belonged to another thread and that
126     *       thread cleared it, we still know that it's not associated with our thread
127     *   <li>If this field's value == null because it was associated with our thread and was
128     *       cleared, we know that we're not executing inline any more
129     * </ul>
130     *
131     * All the states where thread != currentThread are identical for our purposes, and so even
132     * though it's racy, we don't care which of those values we get, so no need to synchronize.
133     */
134    @CheckForNull @LazyInit Thread thread;
135
136    /** Only used by the thread associated with this object */
137    @CheckForNull Runnable nextTask;
138
139    /** Only used by the thread associated with this object */
140    @CheckForNull Executor nextExecutor;
141  }
142
143  /**
144   * Enqueues a task to run when the previous task (if any) completes.
145   *
146   * <p>Cancellation does not propagate from the output future to a callable that has begun to
147   * execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
148   * {@link Callable#call()} will not be invoked.
149   */
150  public <T extends @Nullable Object> ListenableFuture<T> submit(
151      Callable<T> callable, Executor executor) {
152    checkNotNull(callable);
153    checkNotNull(executor);
154    return submitAsync(
155        new AsyncCallable<T>() {
156          @Override
157          public ListenableFuture<T> call() throws Exception {
158            return immediateFuture(callable.call());
159          }
160
161          @Override
162          public String toString() {
163            return callable.toString();
164          }
165        },
166        executor);
167  }
168
169  /**
170   * Enqueues a task to run when the previous task (if any) completes.
171   *
172   * <p>Cancellation does not propagate from the output future to the future returned from {@code
173   * callable} or a callable that has begun to execute, but if the output future is cancelled before
174   * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
175   */
176  public <T extends @Nullable Object> ListenableFuture<T> submitAsync(
177      AsyncCallable<T> callable, Executor executor) {
178    checkNotNull(callable);
179    checkNotNull(executor);
180    TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this);
181    AsyncCallable<T> task =
182        new AsyncCallable<T>() {
183          @Override
184          public ListenableFuture<T> call() throws Exception {
185            if (!taskExecutor.trySetStarted()) {
186              return immediateCancelledFuture();
187            }
188            return callable.call();
189          }
190
191          @Override
192          public String toString() {
193            return callable.toString();
194          }
195        };
196    /*
197     * Four futures are at play here:
198     * taskFuture is the future tracking the result of the callable.
199     * newFuture is a future that completes after this and all prior tasks are done.
200     * oldFuture is the previous task's newFuture.
201     * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
202     *
203     * newFuture is guaranteed to only complete once all tasks previously submitted to this instance
204     * have completed - namely after oldFuture is done, and taskFuture has either completed or been
205     * cancelled before the callable started execution.
206     */
207    SettableFuture<@Nullable Void> newFuture = SettableFuture.create();
208
209    ListenableFuture<@Nullable Void> oldFuture = ref.getAndSet(newFuture);
210
211    // Invoke our task once the previous future completes.
212    TrustedListenableFutureTask<T> taskFuture = TrustedListenableFutureTask.create(task);
213    oldFuture.addListener(taskFuture, taskExecutor);
214
215    ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);
216
217    // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
218    // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
219    // if the future we return is cancelled, we don't begin execution of the next task until after
220    // oldFuture completes.
221    Runnable listener =
222        () -> {
223          if (taskFuture.isDone()) {
224            // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
225            // a future that eventually came from immediateFuture(null), this doesn't leak
226            // throwables or completion values.
227            newFuture.setFuture(oldFuture);
228          } else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) {
229            // If this CAS succeeds, we know that the provided callable will never be invoked,
230            // so when oldFuture completes it is safe to allow the next submitted task to
231            // proceed. Doing this immediately here lets the next task run without waiting for
232            // the cancelled task's executor to run the noop AsyncCallable.
233            //
234            // ---
235            //
236            // If the CAS fails, the provided callable already started running (or it is about
237            // to). Our contract promises:
238            //
239            // 1. not to execute a new callable until the old one has returned
240            //
241            // If we were to cancel taskFuture, that would let the next task start while the old
242            // one is still running.
243            //
244            // Now, maybe we could tweak our implementation to not start the next task until the
245            // callable actually completes. (We could detect completion in our wrapper
246            // `AsyncCallable task`.) However, our contract also promises:
247            //
248            // 2. not to cancel any Future the user returned from an AsyncCallable
249            //
250            // We promise this because, once we cancel that Future, we would no longer be able to
251            // tell when any underlying work it is doing is done. Thus, we might start a new task
252            // while that underlying work is still running.
253            //
254            // So that is why we cancel only in the case of CAS success.
255            taskFuture.cancel(false);
256          }
257        };
258    // Adding the listener to both futures guarantees that newFuture will always be set. Adding to
259    // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
260    // propagates cancellation if the callable has not yet been invoked.
261    outputFuture.addListener(listener, directExecutor());
262    taskFuture.addListener(listener, directExecutor());
263
264    return outputFuture;
265  }
266
267  enum RunningState {
268    NOT_RUN,
269    CANCELLED,
270    STARTED,
271  }
272
273  /**
274   * This class helps avoid a StackOverflowError when large numbers of tasks are submitted with
275   * {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other
276   * tasks would be called recursively. Here, we detect that the delegate executor is executing
277   * inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class
278   * per call to submit() or submitAsync(), and each instance supports only one call to execute().
279   *
280   * <p>This class would certainly be simpler and easier to reason about if it were built with
281   * ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is
282   * non-static, and is initialized/removed frequently - this causes churn in the Thread specific
283   * hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different
284   * ExecutionSequencer objects interfere with each other, which would be undesirable, in addition
285   * to increasing the memory footprint of every thread that interacted with it. In order to release
286   * entries in thread-specific maps when the ThreadLocal object itself is no longer referenced,
287   * ThreadLocal is usually implemented with a WeakReference, which can have negative performance
288   * properties; for example, calling WeakReference.get() on Android will block during an
289   * otherwise-concurrent GC cycle.
290   */
291  private static final class TaskNonReentrantExecutor extends AtomicReference<RunningState>
292      implements Executor, Runnable {
293
294    /**
295     * Used to update and read the latestTaskQueue field. Set to null once the runnable has been run
296     * or queued.
297     */
298    @CheckForNull ExecutionSequencer sequencer;
299
300    /**
301     * Executor the task was set to run on. Set to null when the task has been queued, run, or
302     * cancelled.
303     */
304    @CheckForNull Executor delegate;
305
306    /**
307     * Set before calling delegate.execute(); set to null once run, so that it can be GCed; this
308     * object may live on after, if submitAsync returns an incomplete future.
309     */
310    @CheckForNull Runnable task;
311
312    /** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */
313    @CheckForNull @LazyInit Thread submitting;
314
315    private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) {
316      super(NOT_RUN);
317      this.delegate = delegate;
318      this.sequencer = sequencer;
319    }
320
321    @Override
322    public void execute(Runnable task) {
323      // If this operation was successfully cancelled already, calling the runnable will be a noop.
324      // This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel,
325      // which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run
326      // without waiting for the user's executor to run our submitted Runnable. However, this can
327      // interact poorly with the reentrancy-avoiding behavior of this executor - when the operation
328      // before the cancelled future completes, it will synchronously complete both the newFuture
329      // from the cancelled operation and its own. This can cause one runnable to queue two tasks,
330      // breaking the invariant this method relies on to iteratively run the next task after the
331      // previous one completes.
332      if (get() == RunningState.CANCELLED) {
333        delegate = null;
334        sequencer = null;
335        return;
336      }
337      submitting = Thread.currentThread();
338
339      try {
340        /*
341         * requireNonNull is safe because we don't null out `sequencer` except:
342         *
343         * - above, where we return (in which case we never get here)
344         *
345         * - in `run`, which can't run until this Runnable is submitted to an executor, which
346         *   doesn't happen until below. (And this Executor -- yes, the object is both a Runnable
347         *   and an Executor -- is used for only a single `execute` call.)
348         */
349        ThreadConfinedTaskQueue submittingTaskQueue = requireNonNull(sequencer).latestTaskQueue;
350        if (submittingTaskQueue.thread == submitting) {
351          sequencer = null;
352          // Submit from inside a reentrant submit. We don't know if this one will be reentrant (and
353          // can't know without submitting something to the executor) so queue to run iteratively.
354          // Task must be null, since each execution on this executor can only produce one more
355          // execution.
356          checkState(submittingTaskQueue.nextTask == null);
357          submittingTaskQueue.nextTask = task;
358          // requireNonNull(delegate) is safe for reasons similar to requireNonNull(sequencer).
359          submittingTaskQueue.nextExecutor = requireNonNull(delegate);
360          delegate = null;
361        } else {
362          // requireNonNull(delegate) is safe for reasons similar to requireNonNull(sequencer).
363          Executor localDelegate = requireNonNull(delegate);
364          delegate = null;
365          this.task = task;
366          localDelegate.execute(this);
367        }
368      } finally {
369        // Important to null this out here - if we did *not* execute inline, we might still
370        // run() on the same thread that called execute() - such as in a thread pool, and think
371        // that it was happening inline. As a side benefit, avoids holding on to the Thread object
372        // longer than necessary.
373        submitting = null;
374      }
375    }
376
377    @SuppressWarnings("ShortCircuitBoolean")
378    @Override
379    public void run() {
380      Thread currentThread = Thread.currentThread();
381      if (currentThread != submitting) {
382        /*
383         * requireNonNull is safe because we set `task` before submitting this Runnable to an
384         * Executor, and we don't null it out until here.
385         */
386        Runnable localTask = requireNonNull(task);
387        task = null;
388        localTask.run();
389        return;
390      }
391      // Executor called reentrantly! Make sure that further calls don't overflow stack. Further
392      // reentrant calls will see that their current thread is the same as the one set in
393      // latestTaskQueue, and queue rather than calling execute() directly.
394      ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue();
395      executingTaskQueue.thread = currentThread;
396      /*
397       * requireNonNull is safe because we don't null out `sequencer` except:
398       *
399       * - after the requireNonNull call below. (And this object has its Runnable.run override
400       *   called only once, just as it has its Executor.execute override called only once.)
401       *
402       * - if we return immediately from `execute` (in which case we never get here)
403       *
404       * - in the "reentrant submit" case of `execute` (in which case we must have started running a
405       *   user task -- which means that we already got past this code (or else we exited early
406       *   above))
407       */
408      // Unconditionally set; there is no risk of throwing away a queued task from another thread,
409      // because in order for the current task to run on this executor the previous task must have
410      // already started execution. Because each task on a TaskNonReentrantExecutor can only produce
411      // one execute() call to another instance from the same ExecutionSequencer, we know by
412      // induction that the task that launched this one must not have added any other runnables to
413      // that thread's queue, and thus we cannot be replacing a TaskAndThread object that would
414      // otherwise have another task queued on to it. Note the exception to this, cancellation, is
415      // specially handled in execute() - execute() calls triggered by cancellation are no-ops, and
416      // thus don't count.
417      requireNonNull(sequencer).latestTaskQueue = executingTaskQueue;
418      sequencer = null;
419      try {
420        // requireNonNull is safe, as discussed above.
421        Runnable localTask = requireNonNull(task);
422        task = null;
423        localTask.run();
424        // Now check if our task attempted to reentrantly execute the next task.
425        Runnable queuedTask;
426        Executor queuedExecutor;
427        // Intentionally using non-short-circuit operator
428        while ((queuedTask = executingTaskQueue.nextTask) != null
429            && (queuedExecutor = executingTaskQueue.nextExecutor) != null) {
430          executingTaskQueue.nextTask = null;
431          executingTaskQueue.nextExecutor = null;
432          queuedExecutor.execute(queuedTask);
433        }
434      } finally {
435        // Null out the thread field, so that we don't leak a reference to Thread, and so that
436        // future `thread == currentThread()` calls from this thread don't incorrectly queue instead
437        // of executing. Don't null out the latestTaskQueue field, because the work done here
438        // may have scheduled more operations on another thread, and if those operations then
439        // trigger reentrant calls that thread will have updated the latestTaskQueue field, and
440        // we'd be interfering with their operation.
441        executingTaskQueue.thread = null;
442      }
443    }
444
445    private boolean trySetStarted() {
446      return compareAndSet(NOT_RUN, STARTED);
447    }
448
449    private boolean trySetCancelled() {
450      return compareAndSet(NOT_RUN, CANCELLED);
451    }
452  }
453}