001 /*
002 * Copyright (C) 2007 Google Inc.
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import com.google.common.annotations.Beta;
020
021 import java.util.concurrent.CancellationException;
022 import java.util.concurrent.ExecutionException;
023 import java.util.concurrent.Future;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.TimeoutException;
026 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027
028 import javax.annotation.Nullable;
029
030 /**
031 * <p>An abstract implementation of the {@link Future} interface. This class
032 * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
033 * for tasks other than {@link Runnable}s. It uses an
034 * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
035 * guarantee thread safety. It could be used as a base class to
036 * {@code FutureTask}, or any other implementor of the {@code Future} interface.
037 *
038 * <p>This class implements all methods in {@code Future}. Subclasses should
039 * provide a way to set the result of the computation through the protected
040 * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
041 * {@link #cancel()}. If subclasses want to implement cancellation they can
042 * override the {@link #cancel(boolean)} method with a real implementation, the
043 * default implementation doesn't support cancellation.
044 *
045 * <p>The state changing methods all return a boolean indicating success or
046 * failure in changing the future's state. Valid states are running,
047 * completed, failed, or cancelled. Because this class does not implement
048 * cancellation it is left to the subclass to distinguish between created
049 * and running tasks.
050 *
051 * @author Sven Mawson
052 * @since 1
053 */
054 @Beta
055 public abstract class AbstractFuture<V> implements Future<V> {
056
057 /** Synchronization control for AbstractFutures. */
058 private final Sync<V> sync = new Sync<V>();
059
060 /*
061 * Blocks until either the task completes or the timeout expires. Uses the
062 * sync blocking-with-timeout support provided by AQS.
063 */
064 public V get(long timeout, TimeUnit unit) throws InterruptedException,
065 TimeoutException, ExecutionException {
066 return sync.get(unit.toNanos(timeout));
067 }
068
069 /*
070 * Blocks until the task completes or we get interrupted. Uses the
071 * interruptible blocking support provided by AQS.
072 */
073 public V get() throws InterruptedException, ExecutionException {
074 return sync.get();
075 }
076
077 /*
078 * Checks if the sync is not in the running state.
079 */
080 public boolean isDone() {
081 return sync.isDone();
082 }
083
084 /*
085 * Checks if the sync is in the cancelled state.
086 */
087 public boolean isCancelled() {
088 return sync.isCancelled();
089 }
090
091 /*
092 * Default implementation of cancel that never cancels the future.
093 * Subclasses should override this to implement cancellation if desired.
094 */
095 public boolean cancel(boolean mayInterruptIfRunning) {
096 return false;
097 }
098
099 /**
100 * Subclasses should invoke this method to set the result of the computation
101 * to {@code value}. This will set the state of the future to
102 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
103 * state was successfully changed.
104 *
105 * @param value the value that was the result of the task.
106 * @return true if the state was successfully changed.
107 */
108 protected boolean set(@Nullable V value) {
109 boolean result = sync.set(value);
110 if (result) {
111 done();
112 }
113 return result;
114 }
115
116 /**
117 * Subclasses should invoke this method to set the result of the computation
118 * to an error, {@code throwable}. This will set the state of the future to
119 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
120 * state was successfully changed.
121 *
122 * @param throwable the exception that the task failed with.
123 * @return true if the state was successfully changed.
124 * @throws Error if the throwable was an {@link Error}.
125 */
126 protected boolean setException(Throwable throwable) {
127 boolean result = sync.setException(throwable);
128 if (result) {
129 done();
130 }
131
132 // If it's an Error, we want to make sure it reaches the top of the
133 // call stack, so we rethrow it.
134 if (throwable instanceof Error) {
135 throw (Error) throwable;
136 }
137 return result;
138 }
139
140 /**
141 * Subclasses should invoke this method to mark the future as cancelled.
142 * This will set the state of the future to {@link
143 * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
144 * successfully changed.
145 *
146 * @return true if the state was successfully changed.
147 */
148 protected final boolean cancel() {
149 boolean result = sync.cancel();
150 if (result) {
151 done();
152 }
153 return result;
154 }
155
156 /*
157 * Called by the success, failed, or cancelled methods to indicate that the
158 * value is now available and the latch can be released. Subclasses can
159 * use this method to deal with any actions that should be undertaken when
160 * the task has completed.
161 */
162 protected void done() {
163 // Default implementation does nothing.
164 }
165
166 /**
167 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
168 * private subclass to hold the synchronizer. This synchronizer is used to
169 * implement the blocking and waiting calls as well as to handle state changes
170 * in a thread-safe manner. The current state of the future is held in the
171 * Sync state, and the lock is released whenever the state changes to either
172 * {@link #COMPLETED} or {@link #CANCELLED}.
173 *
174 * <p>To avoid races between threads doing release and acquire, we transition
175 * to the final state in two steps. One thread will successfully CAS from
176 * RUNNING to COMPLETING, that thread will then set the result of the
177 * computation, and only then transition to COMPLETED or CANCELLED.
178 *
179 * <p>We don't use the integer argument passed between acquire methods so we
180 * pass around a -1 everywhere.
181 */
182 static final class Sync<V> extends AbstractQueuedSynchronizer {
183
184 private static final long serialVersionUID = 0L;
185
186 /* Valid states. */
187 static final int RUNNING = 0;
188 static final int COMPLETING = 1;
189 static final int COMPLETED = 2;
190 static final int CANCELLED = 4;
191
192 private V value;
193 private ExecutionException exception;
194
195 /*
196 * Acquisition succeeds if the future is done, otherwise it fails.
197 */
198 @Override
199 protected int tryAcquireShared(int ignored) {
200 if (isDone()) {
201 return 1;
202 }
203 return -1;
204 }
205
206 /*
207 * We always allow a release to go through, this means the state has been
208 * successfully changed and the result is available.
209 */
210 @Override
211 protected boolean tryReleaseShared(int finalState) {
212 setState(finalState);
213 return true;
214 }
215
216 /**
217 * Blocks until the task is complete or the timeout expires. Throws a
218 * {@link TimeoutException} if the timer expires, otherwise behaves like
219 * {@link #get()}.
220 */
221 V get(long nanos) throws TimeoutException, CancellationException,
222 ExecutionException, InterruptedException {
223
224 // Attempt to acquire the shared lock with a timeout.
225 if (!tryAcquireSharedNanos(-1, nanos)) {
226 throw new TimeoutException("Timeout waiting for task.");
227 }
228
229 return getValue();
230 }
231
232 /**
233 * Blocks until {@link #complete(Object, Throwable, int)} has been
234 * successfully called. Throws a {@link CancellationException} if the task
235 * was cancelled, or a {@link ExecutionException} if the task completed with
236 * an error.
237 */
238 V get() throws CancellationException, ExecutionException,
239 InterruptedException {
240
241 // Acquire the shared lock allowing interruption.
242 acquireSharedInterruptibly(-1);
243 return getValue();
244 }
245
246 /**
247 * Implementation of the actual value retrieval. Will return the value
248 * on success, an exception on failure, a cancellation on cancellation, or
249 * an illegal state if the synchronizer is in an invalid state.
250 */
251 private V getValue() throws CancellationException, ExecutionException {
252 int state = getState();
253 switch (state) {
254 case COMPLETED:
255 if (exception != null) {
256 throw exception;
257 } else {
258 return value;
259 }
260
261 case CANCELLED:
262 throw new CancellationException("Task was cancelled.");
263
264 default:
265 throw new IllegalStateException(
266 "Error, synchronizer in invalid state: " + state);
267 }
268 }
269
270 /**
271 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
272 */
273 boolean isDone() {
274 return (getState() & (COMPLETED | CANCELLED)) != 0;
275 }
276
277 /**
278 * Checks if the state is {@link #CANCELLED}.
279 */
280 boolean isCancelled() {
281 return getState() == CANCELLED;
282 }
283
284 /**
285 * Transition to the COMPLETED state and set the value.
286 */
287 boolean set(V v) {
288 return complete(v, null, COMPLETED);
289 }
290
291 /**
292 * Transition to the COMPLETED state and set the exception.
293 */
294 boolean setException(Throwable t) {
295 return complete(null, t, COMPLETED);
296 }
297
298 /**
299 * Transition to the CANCELLED state.
300 */
301 boolean cancel() {
302 return complete(null, null, CANCELLED);
303 }
304
305 /**
306 * Implementation of completing a task. Either {@code v} or {@code t} will
307 * be set but not both. The {@code finalState} is the state to change to
308 * from {@link #RUNNING}. If the state is not in the RUNNING state we
309 * return {@code false}.
310 *
311 * @param v the value to set as the result of the computation.
312 * @param t the exception to set as the result of the computation.
313 * @param finalState the state to transition to.
314 */
315 private boolean complete(V v, Throwable t, int finalState) {
316 if (compareAndSetState(RUNNING, COMPLETING)) {
317 this.value = v;
318 this.exception = t == null ? null : new ExecutionException(t);
319 releaseShared(finalState);
320 return true;
321 }
322
323 // The state was not RUNNING, so there are no valid transitions.
324 return false;
325 }
326 }
327 }