001/*
002 * Copyright (C) 2018 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.base.Preconditions.checkState;
019import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
020import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
021import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
022import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
023import static com.google.common.util.concurrent.Futures.immediateFuture;
024import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
025import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
026
027import com.google.common.annotations.Beta;
028import java.util.concurrent.Callable;
029import java.util.concurrent.Executor;
030import java.util.concurrent.atomic.AtomicReference;
031
032/**
033 * Serializes execution of tasks, somewhat like an "asynchronous {@code synchronized} block." Each
034 * {@linkplain #submit enqueued} callable will not be submitted to its associated executor until the
035 * previous callable has returned -- and, if the previous callable was an {@link AsyncCallable}, not
036 * until the {@code Future} it returned is {@linkplain Future#isDone done} (successful, failed, or
037 * cancelled).
038 *
039 * <p>This class has limited support for cancellation and other "early completion":
040 *
041 * <ul>
042 *   <li>While calls to {@code submit} and {@code submitAsync} return a {@code Future} that can be
043 *       cancelled, cancellation never propagates to a task that has started to run -- neither to
044 *       the callable itself nor to any {@code Future} returned by an {@code AsyncCallable}.
045 *       (However, cancellation can prevent an <i>unstarted</i> task from running.) Therefore, the
046 *       next task will wait for any running callable (or pending {@code Future} returned by an
047 *       {@code AsyncCallable}) to complete, without interrupting it (and without calling {@code
048 *       cancel} on the {@code Future}). So beware: <i>Even if you cancel every precededing {@code
049 *       Future} returned by this class, the next task may still have to wait.</i>.
050 *   <li>Once an {@code AsyncCallable} returns a {@code Future}, this class considers that task to
051 *       be "done" as soon as <i>that</i> {@code Future} completes in any way. Notably, a {@code
052 *       Future} is "completed" even if it is cancelled while its underlying work continues on a
053 *       thread, an RPC, etc. The {@code Future} is also "completed" if it fails "early" -- for
054 *       example, if the deadline expires on a {@code Future} returned from {@link
055 *       Futures#withTimeout} while the {@code Future} it wraps continues its underlying work. So
056 *       beware: <i>Your {@code AsyncCallable} should not complete its {@code Future} until it is
057 *       safe for the next task to start.</i>
058 * </ul>
059 *
060 * <p>An additional limitation: this class serializes execution of <i>tasks</i> but not any
061 * <i>listeners</i> of those tasks.
062 *
063 * <p>This class is similar to {@link MoreExecutors#newSequentialExecutor}. This class is different
064 * in a few ways:
065 *
066 * <ul>
067 *   <li>Each task may be associated with a different executor.
068 *   <li>Tasks may be of type {@code AsyncCallable}.
069 *   <li>Running tasks <i>cannot</i> be interrupted. (Note that {@code newSequentialExecutor} does
070 *       not return {@code Future} objects, so it doesn't support interruption directly, either.
071 *       However, utilities that <i>use</i> that executor have the ability to interrupt tasks
072 *       running on it. This class, by contrast, does not expose an {@code Executor} API.)
073 * </ul>
074 *
075 * <p>If you don't need the features of this class, you may prefer {@code newSequentialExecutor} for
076 * its simplicity and ability to accommodate interruption.
077 *
078 * @since 26.0
079 */
080@Beta
081public final class ExecutionSequencer {
082
083  private ExecutionSequencer() {}
084
085  /** Creates a new instance. */
086  public static ExecutionSequencer create() {
087    return new ExecutionSequencer();
088  }
089
090  /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
091  private final AtomicReference<ListenableFuture<Void>> ref =
092      new AtomicReference<>(immediateVoidFuture());
093
094  private ThreadConfinedTaskQueue latestTaskQueue = new ThreadConfinedTaskQueue();
095
096  /**
097   * This object is unsafely published, but avoids problematic races by relying exclusively on the
098   * identity equality of its Thread field so that the task field is only accessed by a single
099   * thread.
100   */
101  private static final class ThreadConfinedTaskQueue {
102    /**
103     * This field is only used for identity comparisons with the current thread. Field assignments
104     * are atomic, but do not provide happens-before ordering; however:
105     *
106     * <ul>
107     *   <li>If this field's value == currentThread, we know that it's up to date, because write
108     *       operations in a thread always happen-before subsequent read operations in the same
109     *       thread
110     *   <li>If this field's value == null because of unsafe publication, we know that it isn't the
111     *       object associated with our thread, because if it was the publication wouldn't have been
112     *       unsafe and we'd have seen our thread as the value. This state is also why a new
113     *       ThreadConfinedTaskQueue object must be created for each inline execution, because
114     *       observing a null thread does not mean the object is safe to reuse.
115     *   <li>If this field's value is some other thread object, we know that it's not our thread.
116     *   <li>If this field's value == null because it originally belonged to another thread and that
117     *       thread cleared it, we still know that it's not associated with our thread
118     *   <li>If this field's value == null because it was associated with our thread and was
119     *       cleared, we know that we're not executing inline any more
120     * </ul>
121     *
122     * All the states where thread != currentThread are identical for our purposes, and so even
123     * though it's racy, we don't care which of those values we get, so no need to synchronize.
124     */
125    Thread thread;
126    /** Only used by the thread associated with this object */
127    Runnable nextTask;
128    /** Only used by the thread associated with this object */
129    Executor nextExecutor;
130  }
131
132  /**
133   * Enqueues a task to run when the previous task (if any) completes.
134   *
135   * <p>Cancellation does not propagate from the output future to a callable that has begun to
136   * execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
137   * {@link Callable#call()} will not be invoked.
138   */
139  public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
140    checkNotNull(callable);
141    checkNotNull(executor);
142    return submitAsync(
143        new AsyncCallable<T>() {
144          @Override
145          public ListenableFuture<T> call() throws Exception {
146            return immediateFuture(callable.call());
147          }
148
149          @Override
150          public String toString() {
151            return callable.toString();
152          }
153        },
154        executor);
155  }
156
157  /**
158   * Enqueues a task to run when the previous task (if any) completes.
159   *
160   * <p>Cancellation does not propagate from the output future to the future returned from {@code
161   * callable} or a callable that has begun to execute, but if the output future is cancelled before
162   * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
163   */
164  public <T> ListenableFuture<T> submitAsync(
165      final AsyncCallable<T> callable, final Executor executor) {
166    checkNotNull(callable);
167    checkNotNull(executor);
168    final TaskNonReentrantExecutor taskExecutor = new TaskNonReentrantExecutor(executor, this);
169    final AsyncCallable<T> task =
170        new AsyncCallable<T>() {
171          @Override
172          public ListenableFuture<T> call() throws Exception {
173            if (!taskExecutor.trySetStarted()) {
174              return immediateCancelledFuture();
175            }
176            return callable.call();
177          }
178
179          @Override
180          public String toString() {
181            return callable.toString();
182          }
183        };
184    /*
185     * Four futures are at play here:
186     * taskFuture is the future tracking the result of the callable.
187     * newFuture is a future that completes after this and all prior tasks are done.
188     * oldFuture is the previous task's newFuture.
189     * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
190     *
191     * newFuture is guaranteed to only complete once all tasks previously submitted to this instance
192     * have completed - namely after oldFuture is done, and taskFuture has either completed or been
193     * cancelled before the callable started execution.
194     */
195    final SettableFuture<Void> newFuture = SettableFuture.create();
196
197    final ListenableFuture<Void> oldFuture = ref.getAndSet(newFuture);
198
199    // Invoke our task once the previous future completes.
200    final TrustedListenableFutureTask<T> taskFuture = TrustedListenableFutureTask.create(task);
201    oldFuture.addListener(taskFuture, taskExecutor);
202
203    final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);
204
205    // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
206    // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
207    // if the future we return is cancelled, we don't begin execution of the next task until after
208    // oldFuture completes.
209    Runnable listener =
210        new Runnable() {
211          @Override
212          public void run() {
213            if (taskFuture.isDone()) {
214              // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
215              // a future that eventually came from immediateFuture(null), this doesn't leak
216              // throwables or completion values.
217              newFuture.setFuture(oldFuture);
218            } else if (outputFuture.isCancelled() && taskExecutor.trySetCancelled()) {
219              // If this CAS succeeds, we know that the provided callable will never be invoked,
220              // so when oldFuture completes it is safe to allow the next submitted task to
221              // proceed. Doing this immediately here lets the next task run without waiting for
222              // the cancelled task's executor to run the noop AsyncCallable.
223              //
224              // ---
225              //
226              // If the CAS fails, the provided callable already started running (or it is about
227              // to). Our contract promises:
228              //
229              // 1. not to execute a new callable until the old one has returned
230              //
231              // If we were to cancel taskFuture, that would let the next task start while the old
232              // one is still running.
233              //
234              // Now, maybe we could tweak our implementation to not start the next task until the
235              // callable actually completes. (We could detect completion in our wrapper
236              // `AsyncCallable task`.) However, our contract also promises:
237              //
238              // 2. not to cancel any Future the user returned from an AsyncCallable
239              //
240              // We promise this because, once we cancel that Future, we would no longer be able to
241              // tell when any underlying work it is doing is done. Thus, we might start a new task
242              // while that underlying work is still running.
243              //
244              // So that is why we cancel only in the case of CAS success.
245              taskFuture.cancel(false);
246            }
247          }
248        };
249    // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to
250    // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
251    // propagates cancellation if the callable has not yet been invoked.
252    outputFuture.addListener(listener, directExecutor());
253    taskFuture.addListener(listener, directExecutor());
254
255    return outputFuture;
256  }
257
258  enum RunningState {
259    NOT_RUN,
260    CANCELLED,
261    STARTED,
262  }
263
264  /**
265   * This class helps avoid a StackOverflowError when large numbers of tasks are submitted with
266   * {@link MoreExecutors#directExecutor}. Normally, when the first future completes, all the other
267   * tasks would be called recursively. Here, we detect that the delegate executor is executing
268   * inline, and maintain a queue to dispatch tasks iteratively. There is one instance of this class
269   * per call to submit() or submitAsync(), and each instance supports only one call to execute().
270   *
271   * <p>This class would certainly be simpler and easier to reason about if it were built with
272   * ThreadLocal; however, ThreadLocal is not well optimized for the case where the ThreadLocal is
273   * non-static, and is initialized/removed frequently - this causes churn in the Thread specific
274   * hashmaps. Using a static ThreadLocal to avoid that overhead would mean that different
275   * ExecutionSequencer objects interfere with each other, which would be undesirable, in addition
276   * to increasing the memory footprint of every thread that interacted with it. In order to release
277   * entries in thread-specific maps when the ThreadLocal object itself is no longer referenced,
278   * ThreadLocal is usually implemented with a WeakReference, which can have negative performance
279   * properties; for example, calling WeakReference.get() on Android will block during an
280   * otherwise-concurrent GC cycle.
281   */
282  @SuppressWarnings("ShouldNotSubclass") // Saving an allocation here is worth it
283  private static final class TaskNonReentrantExecutor extends AtomicReference<RunningState>
284      implements Executor, Runnable {
285
286    /**
287     * Used to update and read the latestTaskQueue field. Set to null once the runnable has been run
288     * or queued.
289     */
290    ExecutionSequencer sequencer;
291
292    /**
293     * Executor the task was set to run on. Set to null when the task has been queued, run, or
294     * cancelled.
295     */
296    Executor delegate;
297
298    /**
299     * Set before calling delegate.execute(); set to null once run, so that it can be GCed; this
300     * object may live on after, if submitAsync returns an incomplete future.
301     */
302    Runnable task;
303
304    /** Thread that called execute(). Set in execute, cleared when delegate.execute() returns. */
305    Thread submitting;
306
307    private TaskNonReentrantExecutor(Executor delegate, ExecutionSequencer sequencer) {
308      super(NOT_RUN);
309      this.delegate = delegate;
310      this.sequencer = sequencer;
311    }
312
313    @Override
314    public void execute(Runnable task) {
315      // If this operation was successfully cancelled already, calling the runnable will be a noop.
316      // This also avoids a race where if outputFuture is cancelled, it will call taskFuture.cancel,
317      // which will call newFuture.setFuture(oldFuture), to allow the next task in the queue to run
318      // without waiting for the user's executor to run our submitted Runnable. However, this can
319      // interact poorly with the reentrancy-avoiding behavior of this executor - when the operation
320      // before the cancelled future completes, it will synchronously complete both the newFuture
321      // from the cancelled operation and its own. This can cause one runnable to queue two tasks,
322      // breaking the invariant this method relies on to iteratively run the next task after the
323      // previous one completes.
324      if (get() == RunningState.CANCELLED) {
325        delegate = null;
326        sequencer = null;
327        return;
328      }
329      submitting = Thread.currentThread();
330      try {
331        ThreadConfinedTaskQueue submittingTaskQueue = sequencer.latestTaskQueue;
332        if (submittingTaskQueue.thread == submitting) {
333          sequencer = null;
334          // Submit from inside a reentrant submit. We don't know if this one will be reentrant (and
335          // can't know without submitting something to the executor) so queue to run iteratively.
336          // Task must be null, since each execution on this executor can only produce one more
337          // execution.
338          checkState(submittingTaskQueue.nextTask == null);
339          submittingTaskQueue.nextTask = task;
340          submittingTaskQueue.nextExecutor = delegate;
341          delegate = null;
342        } else {
343          Executor localDelegate = delegate;
344          delegate = null;
345          this.task = task;
346          localDelegate.execute(this);
347        }
348      } finally {
349        // Important to null this out here - if we did *not* execute inline, we might still
350        // run() on the same thread that called execute() - such as in a thread pool, and think
351        // that it was happening inline. As a side benefit, avoids holding on to the Thread object
352        // longer than necessary.
353        submitting = null;
354      }
355    }
356
357    @SuppressWarnings("ShortCircuitBoolean")
358    @Override
359    public void run() {
360      Thread currentThread = Thread.currentThread();
361      if (currentThread != submitting) {
362        Runnable localTask = task;
363        task = null;
364        localTask.run();
365        return;
366      }
367      // Executor called reentrantly! Make sure that further calls don't overflow stack. Further
368      // reentrant calls will see that their current thread is the same as the one set in
369      // latestTaskQueue, and queue rather than calling execute() directly.
370      ThreadConfinedTaskQueue executingTaskQueue = new ThreadConfinedTaskQueue();
371      executingTaskQueue.thread = currentThread;
372      // Unconditionally set; there is no risk of throwing away a queued task from another thread,
373      // because in order for the current task to run on this executor the previous task must have
374      // already started execution. Because each task on a TaskNonReentrantExecutor can only produce
375      // one execute() call to another instance from the same ExecutionSequencer, we know by
376      // induction that the task that launched this one must not have added any other runnables to
377      // that thread's queue, and thus we cannot be replacing a TaskAndThread object that would
378      // otherwise have another task queued on to it. Note the exception to this, cancellation, is
379      // specially handled in execute() - execute() calls triggered by cancellation are no-ops, and
380      // thus don't count.
381      sequencer.latestTaskQueue = executingTaskQueue;
382      sequencer = null;
383      try {
384        Runnable localTask = task;
385        task = null;
386        localTask.run();
387        // Now check if our task attempted to reentrantly execute the next task.
388        Runnable queuedTask;
389        Executor queuedExecutor;
390        // Intentionally using non-short-circuit operator
391        while ((queuedTask = executingTaskQueue.nextTask) != null
392            & (queuedExecutor = executingTaskQueue.nextExecutor) != null) {
393          executingTaskQueue.nextTask = null;
394          executingTaskQueue.nextExecutor = null;
395          queuedExecutor.execute(queuedTask);
396        }
397      } finally {
398        // Null out the thread field, so that we don't leak a reference to Thread, and so that
399        // future `thread == currentThread()` calls from this thread don't incorrectly queue instead
400        // of executing. Don't null out the latestTaskQueue field, because the work done here
401        // may have scheduled more operations on another thread, and if those operations then
402        // trigger reentrant calls that thread will have updated the latestTaskQueue field, and
403        // we'd be interfering with their operation.
404        executingTaskQueue.thread = null;
405      }
406    }
407
408    private boolean trySetStarted() {
409      return compareAndSet(NOT_RUN, STARTED);
410    }
411
412    private boolean trySetCancelled() {
413      return compareAndSet(NOT_RUN, CANCELLED);
414    }
415  }
416}