001 /*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkNotNull;
020
021 import com.google.common.annotations.Beta;
022
023 import java.util.concurrent.CancellationException;
024 import java.util.concurrent.ExecutionException;
025 import java.util.concurrent.Future;
026 import java.util.concurrent.TimeUnit;
027 import java.util.concurrent.TimeoutException;
028 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
029
030 import javax.annotation.Nullable;
031
032 /**
033 * <p>An abstract implementation of the {@link Future} interface. This class
034 * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
035 * for tasks other than {@link Runnable}s. It uses an
036 * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
037 * guarantee thread safety. It could be used as a base class to
038 * {@code FutureTask}, or any other implementor of the {@code Future} interface.
039 *
040 * <p>This class implements all methods in {@code Future}. Subclasses should
041 * provide a way to set the result of the computation through the protected
042 * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
043 * {@link #cancel()}. If subclasses want to implement cancellation they can
044 * override the {@link #cancel(boolean)} method with a real implementation, the
045 * default implementation doesn't support cancellation.
046 *
047 * <p>The state changing methods all return a boolean indicating success or
048 * failure in changing the future's state. Valid states are running,
049 * completed, failed, or cancelled. Because this class does not implement
050 * cancellation it is left to the subclass to distinguish between created
051 * and running tasks.
052 *
053 * @author Sven Mawson
054 * @since 1
055 */
056 @Beta
057 public abstract class AbstractFuture<V> implements Future<V> {
058
059 /** Synchronization control for AbstractFutures. */
060 private final Sync<V> sync = new Sync<V>();
061
062 /*
063 * Blocks until either the task completes or the timeout expires. Uses the
064 * sync blocking-with-timeout support provided by AQS.
065 */
066 @Override
067 public V get(long timeout, TimeUnit unit) throws InterruptedException,
068 TimeoutException, ExecutionException {
069 return sync.get(unit.toNanos(timeout));
070 }
071
072 /*
073 * Blocks until the task completes or we get interrupted. Uses the
074 * interruptible blocking support provided by AQS.
075 */
076 @Override
077 public V get() throws InterruptedException, ExecutionException {
078 return sync.get();
079 }
080
081 /*
082 * Checks if the sync is not in the running state.
083 */
084 @Override
085 public boolean isDone() {
086 return sync.isDone();
087 }
088
089 /*
090 * Checks if the sync is in the cancelled state.
091 */
092 @Override
093 public boolean isCancelled() {
094 return sync.isCancelled();
095 }
096
097 /*
098 * Default implementation of cancel that never cancels the future.
099 * Subclasses should override this to implement cancellation if desired.
100 */
101 @Override
102 public boolean cancel(boolean mayInterruptIfRunning) {
103 return false;
104 }
105
106 /**
107 * Subclasses should invoke this method to set the result of the computation
108 * to {@code value}. This will set the state of the future to
109 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
110 * state was successfully changed.
111 *
112 * @param value the value that was the result of the task.
113 * @return true if the state was successfully changed.
114 */
115 protected boolean set(@Nullable V value) {
116 boolean result = sync.set(value);
117 if (result) {
118 done();
119 }
120 return result;
121 }
122
123 /**
124 * Subclasses should invoke this method to set the result of the computation
125 * to an error, {@code throwable}. This will set the state of the future to
126 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
127 * state was successfully changed.
128 *
129 * @param throwable the exception that the task failed with.
130 * @return true if the state was successfully changed.
131 * @throws Error if the throwable was an {@link Error}.
132 */
133 protected boolean setException(Throwable throwable) {
134 boolean result = sync.setException(checkNotNull(throwable));
135 if (result) {
136 done();
137 }
138
139 // If it's an Error, we want to make sure it reaches the top of the
140 // call stack, so we rethrow it.
141 if (throwable instanceof Error) {
142 throw (Error) throwable;
143 }
144 return result;
145 }
146
147 /**
148 * Subclasses should invoke this method to mark the future as cancelled.
149 * This will set the state of the future to {@link
150 * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
151 * successfully changed.
152 *
153 * @return true if the state was successfully changed.
154 */
155 protected final boolean cancel() {
156 boolean result = sync.cancel();
157 if (result) {
158 done();
159 }
160 return result;
161 }
162
163 /*
164 * Called by the success, failed, or cancelled methods to indicate that the
165 * value is now available and the latch can be released. Subclasses can
166 * use this method to deal with any actions that should be undertaken when
167 * the task has completed.
168 */
169 protected void done() {
170 // Default implementation does nothing.
171 }
172
173 /**
174 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
175 * private subclass to hold the synchronizer. This synchronizer is used to
176 * implement the blocking and waiting calls as well as to handle state changes
177 * in a thread-safe manner. The current state of the future is held in the
178 * Sync state, and the lock is released whenever the state changes to either
179 * {@link #COMPLETED} or {@link #CANCELLED}.
180 *
181 * <p>To avoid races between threads doing release and acquire, we transition
182 * to the final state in two steps. One thread will successfully CAS from
183 * RUNNING to COMPLETING, that thread will then set the result of the
184 * computation, and only then transition to COMPLETED or CANCELLED.
185 *
186 * <p>We don't use the integer argument passed between acquire methods so we
187 * pass around a -1 everywhere.
188 */
189 static final class Sync<V> extends AbstractQueuedSynchronizer {
190
191 private static final long serialVersionUID = 0L;
192
193 /* Valid states. */
194 static final int RUNNING = 0;
195 static final int COMPLETING = 1;
196 static final int COMPLETED = 2;
197 static final int CANCELLED = 4;
198
199 private V value;
200 private Throwable exception;
201
202 /*
203 * Acquisition succeeds if the future is done, otherwise it fails.
204 */
205 @Override
206 protected int tryAcquireShared(int ignored) {
207 if (isDone()) {
208 return 1;
209 }
210 return -1;
211 }
212
213 /*
214 * We always allow a release to go through, this means the state has been
215 * successfully changed and the result is available.
216 */
217 @Override
218 protected boolean tryReleaseShared(int finalState) {
219 setState(finalState);
220 return true;
221 }
222
223 /**
224 * Blocks until the task is complete or the timeout expires. Throws a
225 * {@link TimeoutException} if the timer expires, otherwise behaves like
226 * {@link #get()}.
227 */
228 V get(long nanos) throws TimeoutException, CancellationException,
229 ExecutionException, InterruptedException {
230
231 // Attempt to acquire the shared lock with a timeout.
232 if (!tryAcquireSharedNanos(-1, nanos)) {
233 throw new TimeoutException("Timeout waiting for task.");
234 }
235
236 return getValue();
237 }
238
239 /**
240 * Blocks until {@link #complete(Object, Throwable, int)} has been
241 * successfully called. Throws a {@link CancellationException} if the task
242 * was cancelled, or a {@link ExecutionException} if the task completed with
243 * an error.
244 */
245 V get() throws CancellationException, ExecutionException,
246 InterruptedException {
247
248 // Acquire the shared lock allowing interruption.
249 acquireSharedInterruptibly(-1);
250 return getValue();
251 }
252
253 /**
254 * Implementation of the actual value retrieval. Will return the value
255 * on success, an exception on failure, a cancellation on cancellation, or
256 * an illegal state if the synchronizer is in an invalid state.
257 */
258 private V getValue() throws CancellationException, ExecutionException {
259 int state = getState();
260 switch (state) {
261 case COMPLETED:
262 if (exception != null) {
263 throw new ExecutionException(exception);
264 } else {
265 return value;
266 }
267
268 case CANCELLED:
269 throw new CancellationException("Task was cancelled.");
270
271 default:
272 throw new IllegalStateException(
273 "Error, synchronizer in invalid state: " + state);
274 }
275 }
276
277 /**
278 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
279 */
280 boolean isDone() {
281 return (getState() & (COMPLETED | CANCELLED)) != 0;
282 }
283
284 /**
285 * Checks if the state is {@link #CANCELLED}.
286 */
287 boolean isCancelled() {
288 return getState() == CANCELLED;
289 }
290
291 /**
292 * Transition to the COMPLETED state and set the value.
293 */
294 boolean set(@Nullable V v) {
295 return complete(v, null, COMPLETED);
296 }
297
298 /**
299 * Transition to the COMPLETED state and set the exception.
300 */
301 boolean setException(Throwable t) {
302 return complete(null, t, COMPLETED);
303 }
304
305 /**
306 * Transition to the CANCELLED state.
307 */
308 boolean cancel() {
309 return complete(null, null, CANCELLED);
310 }
311
312 /**
313 * Implementation of completing a task. Either {@code v} or {@code t} will
314 * be set but not both. The {@code finalState} is the state to change to
315 * from {@link #RUNNING}. If the state is not in the RUNNING state we
316 * return {@code false}.
317 *
318 * @param v the value to set as the result of the computation.
319 * @param t the exception to set as the result of the computation.
320 * @param finalState the state to transition to.
321 */
322 private boolean complete(@Nullable V v, Throwable t, int finalState) {
323 if (compareAndSetState(RUNNING, COMPLETING)) {
324 this.value = v;
325 this.exception = t;
326 releaseShared(finalState);
327 return true;
328 }
329
330 // The state was not RUNNING, so there are no valid transitions.
331 return false;
332 }
333 }
334 }