001/* 002 * Copyright (C) 2018 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.CANCELLED; 019import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.NOT_RUN; 020import static com.google.common.util.concurrent.ExecutionSequencer.RunningState.STARTED; 021import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 022import static com.google.common.util.concurrent.Futures.immediateFuture; 023import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 024 025import com.google.common.annotations.Beta; 026import java.util.concurrent.Callable; 027import java.util.concurrent.Executor; 028import java.util.concurrent.atomic.AtomicReference; 029 030/** 031 * Serializes execution of a set of operations. This class guarantees that a submitted callable will 032 * not be called before previously submitted callables (and any {@code Future}s returned from them) 033 * have completed. 034 * 035 * <p>This class implements a superset of the behavior of {@link 036 * MoreExecutors#newSequentialExecutor}. If your tasks all run on the same underlying executor and 037 * don't need to wait for {@code Future}s returned from {@code AsyncCallable}s, use it instead. 038 * 039 * @since 26.0 040 */ 041@Beta 042public final class ExecutionSequencer { 043 044 private ExecutionSequencer() {} 045 046 /** Creates a new instance. */ 047 public static ExecutionSequencer create() { 048 return new ExecutionSequencer(); 049 } 050 051 enum RunningState { 052 NOT_RUN, 053 CANCELLED, 054 STARTED, 055 } 056 057 /** This reference acts as a pointer tracking the head of a linked list of ListenableFutures. */ 058 private final AtomicReference<ListenableFuture<Object>> ref = 059 new AtomicReference<>(immediateFuture(null)); 060 061 /** 062 * Enqueues a task to run when the previous task (if any) completes. 063 * 064 * <p>Cancellation does not propagate from the output future to a callable that has begun to 065 * execute, but if the output future is cancelled before {@link Callable#call()} is invoked, 066 * {@link Callable#call()} will not be invoked. 067 */ 068 public <T> ListenableFuture<T> submit(final Callable<T> callable, Executor executor) { 069 checkNotNull(callable); 070 return submitAsync( 071 new AsyncCallable<T>() { 072 @Override 073 public ListenableFuture<T> call() throws Exception { 074 return immediateFuture(callable.call()); 075 } 076 077 @Override 078 public String toString() { 079 return callable.toString(); 080 } 081 }, 082 executor); 083 } 084 085 /** 086 * Enqueues a task to run when the previous task (if any) completes. 087 * 088 * <p>Cancellation does not propagate from the output future to the future returned from {@code 089 * callable} or a callable that has begun to execute, but if the output future is cancelled before 090 * {@link AsyncCallable#call()} is invoked, {@link AsyncCallable#call()} will not be invoked. 091 */ 092 public <T> ListenableFuture<T> submitAsync( 093 final AsyncCallable<T> callable, final Executor executor) { 094 checkNotNull(callable); 095 final AtomicReference<RunningState> runningState = new AtomicReference<>(NOT_RUN); 096 final AsyncCallable<T> task = 097 new AsyncCallable<T>() { 098 @Override 099 public ListenableFuture<T> call() throws Exception { 100 if (!runningState.compareAndSet(NOT_RUN, STARTED)) { 101 return immediateCancelledFuture(); 102 } 103 return callable.call(); 104 } 105 106 @Override 107 public String toString() { 108 return callable.toString(); 109 } 110 }; 111 /* 112 * Four futures are at play here: 113 * taskFuture is the future tracking the result of the callable. 114 * newFuture is a future that completes after this and all prior tasks are done. 115 * oldFuture is the previous task's newFuture. 116 * outputFuture is the future we return to the caller, a nonCancellationPropagating taskFuture. 117 * 118 * newFuture is guaranteed to only complete once all tasks previously submitted to this instance 119 * have completed - namely after oldFuture is done, and taskFuture has either completed or been 120 * cancelled before the callable started execution. 121 */ 122 final SettableFuture<Object> newFuture = SettableFuture.create(); 123 124 final ListenableFuture<?> oldFuture = ref.getAndSet(newFuture); 125 126 // Invoke our task once the previous future completes. 127 final ListenableFuture<T> taskFuture = 128 Futures.submitAsync( 129 task, 130 new Executor() { 131 @Override 132 public void execute(Runnable runnable) { 133 oldFuture.addListener(runnable, executor); 134 } 135 }); 136 137 final ListenableFuture<T> outputFuture = Futures.nonCancellationPropagating(taskFuture); 138 139 // newFuture's lifetime is determined by taskFuture, which can't complete before oldFuture 140 // unless taskFuture is cancelled, in which case it falls back to oldFuture. This ensures that 141 // if the future we return is cancelled, we don't begin execution of the next task until after 142 // oldFuture completes. 143 Runnable listener = 144 new Runnable() { 145 @Override 146 public void run() { 147 if (taskFuture.isDone() 148 // If this CAS succeeds, we know that the provided callable will never be invoked, 149 // so when oldFuture completes it is safe to allow the next submitted task to 150 // proceed. 151 || (outputFuture.isCancelled() && runningState.compareAndSet(NOT_RUN, CANCELLED))) { 152 // Since the value of oldFuture can only ever be immediateFuture(null) or setFuture of 153 // a future that eventually came from immediateFuture(null), this doesn't leak 154 // throwables or completion values. 155 newFuture.setFuture(oldFuture); 156 } 157 } 158 }; 159 // Adding the listener to both futures guarantees that newFuture will aways be set. Adding to 160 // taskFuture guarantees completion if the callable is invoked, and adding to outputFuture 161 // propagates cancellation if the callable has not yet been invoked. 162 outputFuture.addListener(listener, directExecutor()); 163 taskFuture.addListener(listener, directExecutor()); 164 165 return outputFuture; 166 } 167}