001/*
002 * Copyright (C) 2018 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED;
019import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN;
020import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED;
021import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
022import static com.google.common.util.concurrent.Futures.immediateFuture;
023import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
024
025import com.google.common.annotations.Beta;
026import java.util.concurrent.Callable;
027import java.util.concurrent.Executor;
028import java.util.concurrent.atomic.AtomicReference;
029
030/**
031 * Serializes execution of a set of operations. This class guarantees that a submitted callable will
032 * not be called before previously submitted callables (and any {@code Future}s returned from them)
033 * have completed.
034 *
035 * <p>This class implements a superset of the behavior of {@link
036 * MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and
037 * don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead.
038 *
039 * @since 26.0
040 */
041@Beta
042public final class ExecutionSequencer {
043
044  private ExecutionSequencer() {}
045
046  /** Creates a new instance. */
047  public static ExecutionSequencer create() {
048    return new ExecutionSequencer();
049  }
050
051  enum RunningState {
052    NOT_RUN,
053    CANCELLED,
054    STARTED,
055  }
056
057  /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */
058  private final AtomicReference<ListenableFuture<Object>> ref =
059      new AtomicReference<>(immediateFuture(null));
060
061  /**
062   * Enqueues a task to run when the previous task (if any) completes.
063   *
064   * <p>Cancellation does not propagate from the output future to a callable that has begun to
065   * execute, but if the output future is cancelled before {@link Callable#call()} is invoked,
066   * {@link Callable#call()} will not be invoked.
067   */
068  public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) {
069    checkNotNull(callable);
070    return submitAsync(
071        new AsyncCallable<T>() {
072          @Override
073          public ListenableFuture<T> call() throws Exception {
074            return immediateFuture(callable.call());
075          }
076        },
077        executor);
078  }
079
080  /**
081   * Enqueues a task to run when the previous task (if any) completes.
082   *
083   * <p>Cancellation does not propagate from the output future to the future returned from {@code
084   * callable} or a callable that has begun to execute, but if the output future is cancelled before
085   * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked.
086   */
087  public <T> ListenableFuture<T> submitAsync(
088      final AsyncCallable<T> callable, final Executor executor) {
089    checkNotNull(callable);
090    final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN);
091    final AsyncCallable<T> task =
092        new AsyncCallable<T>() {
093          @Override
094          public ListenableFuture<T> call() throws Exception {
095            if (!runningState.compareAndSet(NOT_RUN, STARTED)) {
096              return immediateCancelledFuture();
097            }
098            return callable.call();
099          }
100        };
101    /*
102     * Four futures are at play here:
103     * taskFuture is the future tracking the result of the callable.
104     * newFuture is a future that completes after this and all prior tasks are done.
105     * oldFuture is the previous task's newFuture.
106     * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture.
107     *
108     * newFuture is guaranteed to only complete once all tasks previously submitted to this instance
109     * have completed - namely after oldFuture is done, and taskFuture has either completed or been
110     * cancelled before the callable started execution.
111     */
112    final SettableFuture<Object> newFuture = SettableFuture.create();
113
114    final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture);
115
116    // Invoke our task once the previous future completes.
117    final ListenableFuture<T> taskFuture =
118        Futures.submitAsync(
119            task,
120            new Executor() {
121              @Override
122              public void execute(Runnable runnable) {
123                oldFuture.addListener(runnable, executor);
124              }
125            });
126
127    final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture);
128
129    // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture
130    // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that
131    // if the future we return is cancelled, we don't begin execution of the next task until after
132    // oldFuture completes.
133    Runnable listener =
134        new Runnable() {
135          @Override
136          public void run() {
137            if (taskFuture.isDone()
138                // If this CAS succeeds, we know that the provided callable will never be invoked,
139                // so when oldFuture completes it is safe to allow the next submitted task to
140                // proceed.
141                || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) {
142              // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of
143              // a future that eventually came from immediateFuture(null), this doesn't leak
144              // throwables or completion values.
145              newFuture.setFuture(oldFuture);
146            }
147          }
148        };
149    // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to
150    // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture
151    // propagates cancellation if the callable has not yet been invoked.
152    outputFuture.addListener(listener, directExecutor());
153    taskFuture.addListener(listener, directExecutor());
154
155    return outputFuture;
156  }
157}