/*
 * Copyright (C) 2018 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
019import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
020import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
021import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
022import static com.google.common.util.concurrent.Futures.immediateFuture;
023import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
024
025import com.google.common.annotations.Beta;
026import java.util.concurrent.Callable;
027import java.util.concurrent.Executor;
028import java.util.concurrent.atomic.AtomicReference;
029
030/**
031 * Serializes execution of a set of operations. This class guarantees that a submitted callable will
032 * not be called before previously submitted callables (and any {@code Future}s returned from them)
033 * have completed.
034 *
035 * <p>This class implements a superset of the behavior of {@link
036 * MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and
037 * don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead.
038 *
039 * @since 26.0
040 */
041@Beta
042public final class ExecutionSequencer {
043
044  private ExecutionSequencer() {}
045
046  /** Creates a new instance. */
047  public static ExecutionSequencer create() {
048    return new ExecutionSequencer();
049  }
050
051  enum RunningState {
052    NOT_RUN,
053    CANCELLED,
054    STARTED,
055  }
056
057  /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
058  private final AtomicReference<ListenableFuture<Object>> ref =
059      new AtomicReference<>(immediateFuture(null));
060
061  /**
062   * Enqueues a task to run when the previous task (if any) completes.
063   *
064   * <p>Cancellation does not propagate from the output future to a callable that has begun to
065   * execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
066   * {@link Callable#call()} will not be invoked.
067   */
068  public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
069    checkNotNull(callable);
070    return submitAsync(
071        new AsyncCallable<T>() {
072          @Override
073          public ListenableFuture<T> call() throws Exception {
074            return immediateFuture(callable.call());
075          }
076
077          @Override
078          public String toString() {
079            return callable.toString();
080          }
081        },
082        executor);
083  }
084
085  /**
086   * Enqueues a task to run when the previous task (if any) completes.
087   *
088   * <p>Cancellation does not propagate from the output future to the future returned from {@code
089   * callable} or a callable that has begun to execute, but if the output future is cancelled before
090   * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
091   */
092  public <T> ListenableFuture<T> submitAsync(
093      final AsyncCallable<T> callable, final Executor executor) {
094    checkNotNull(callable);
095    final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN);
096    final AsyncCallable<T> task =
097        new AsyncCallable<T>() {
098          @Override
099          public ListenableFuture<T> call() throws Exception {
100            if (!runningState.compareAndSet(NOT_RUN, STARTED)) {
101              return immediateCancelledFuture();
102            }
103            return callable.call();
104          }
105
106          @Override
107          public String toString() {
108            return callable.toString();
109          }
110        };
111    /*
112     * Four futures are at play here:
113     * taskFuture is the future tracking the result of the callable.
114     * newFuture is a future that completes after this and all prior tasks are done.
115     * oldFuture is the previous task's newFuture.
116     * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
117     *
118     * newFuture is guaranteed to only complete once all tasks previously submitted to this instance
119     * have completed - namely after oldFuture is done, and taskFuture has either completed or been
120     * cancelled before the callable started execution.
121     */
122    final SettableFuture<Object> newFuture = SettableFuture.create();
123
124    final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture);
125
126    // Invoke our task once the previous future completes.
127    final ListenableFuture<T> taskFuture =
128        Futures.submitAsync(
129            task,
130            new Executor() {
131              @Override
132              public void execute(Runnable runnable) {
133                oldFuture.addListener(runnable, executor);
134              }
135            });
136
137    final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);
138
139    // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
140    // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
141    // if the future we return is cancelled, we don't begin execution of the next task until after
142    // oldFuture completes.
143    Runnable listener =
144        new Runnable() {
145          @Override
146          public void run() {
147            if (taskFuture.isDone()
148                // If this CAS succeeds, we know that the provided callable will never be invoked,
149                // so when oldFuture completes it is safe to allow the next submitted task to
150                // proceed.
151                || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) {
152              // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
153              // a future that eventually came from immediateFuture(null), this doesn't leak
154              // throwables or completion values.
155              newFuture.setFuture(oldFuture);
156            }
157          }
158        };
159    // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to
160    // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
161    // propagates cancellation if the callable has not yet been invoked.
162    outputFuture.addListener(listener, directExecutor());
163    taskFuture.addListener(listener, directExecutor());
164
165    return outputFuture;
166  }
167}