/*
 * Copyright (C) 2007 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import com.google.common.annotations.Beta;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import javax.annotation.Nullable;

/**
 * <p>An abstract implementation of the {@link Future} interface.  This class
 * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
 * for tasks other than {@link Runnable}s.  It uses an
 * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
 * guarantee thread safety.  It could be used as a base class to
 * {@code FutureTask}, or any other implementor of the {@code Future} interface.
 *
 * <p>This class implements all methods in {@code Future}.  Subclasses should
 * provide a way to set the result of the computation through the protected
 * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
 * {@link #cancel()}.  If subclasses want to implement cancellation they can
 * override the {@link #cancel(boolean)} method with a real implementation; the
 * default implementation doesn't support cancellation.
 *
 * <p>The state changing methods all return a boolean indicating success or
 * failure in changing the future's state.  Valid states are running,
 * completed, failed, or cancelled.  Because this class does not implement
 * cancellation it is left to the subclass to distinguish between created
 * and running tasks.
 *
 * @author Sven Mawson
 * @since 1
 */
@Beta
public abstract class AbstractFuture<V> implements Future<V> {

  /** Synchronization control for AbstractFutures. */
  private final Sync<V> sync = new Sync<V>();

  /**
   * Blocks until either the task completes or the timeout expires.  Uses the
   * sync blocking-with-timeout support provided by AQS.
   *
   * @param timeout the maximum time to wait, expressed in {@code unit}.
   * @param unit the unit of {@code timeout}; converted to nanoseconds.
   * @return the value the future was completed with.
   * @throws InterruptedException if the waiting thread is interrupted.
   * @throws TimeoutException if the future is not done before the timeout.
   * @throws ExecutionException if the future was completed with an exception.
   * @throws CancellationException if the future was cancelled.
   */
  public V get(long timeout, TimeUnit unit) throws InterruptedException,
      TimeoutException, ExecutionException {
    return sync.get(unit.toNanos(timeout));
  }

  /**
   * Blocks until the task completes or we get interrupted.  Uses the
   * interruptible blocking support provided by AQS.
   *
   * @return the value the future was completed with.
   * @throws InterruptedException if the waiting thread is interrupted.
   * @throws ExecutionException if the future was completed with an exception.
   * @throws CancellationException if the future was cancelled.
   */
  public V get() throws InterruptedException, ExecutionException {
    return sync.get();
  }

  /**
   * Checks if the sync is not in the running state, i.e. whether it has
   * reached {@link AbstractFuture.Sync#COMPLETED} or
   * {@link AbstractFuture.Sync#CANCELLED}.
   */
  public boolean isDone() {
    return sync.isDone();
  }

  /**
   * Checks if the sync is in the cancelled state.
   */
  public boolean isCancelled() {
    return sync.isCancelled();
  }

  /**
   * Default implementation of cancel that never cancels the future.
   * Subclasses should override this to implement cancellation if desired;
   * an override can use {@link #cancel()} to perform the state change.
   *
   * @param mayInterruptIfRunning ignored by this default implementation.
   * @return always {@code false}.
   */
  public boolean cancel(boolean mayInterruptIfRunning) {
    return false;
  }

  /**
   * Subclasses should invoke this method to set the result of the computation
   * to {@code value}.  This will set the state of the future to
   * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
   * state was successfully changed.
   *
   * @param value the value that was the result of the task.
   * @return true if the state was successfully changed.
   */
  protected boolean set(@Nullable V value) {
    boolean result = sync.set(value);
    if (result) {
      done();
    }
    return result;
  }

  /**
   * Subclasses should invoke this method to set the result of the computation
   * to an error, {@code throwable}.  This will set the state of the future to
   * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
   * state was successfully changed.
   *
   * @param throwable the exception that the task failed with; must not be
   *     null.
   * @return true if the state was successfully changed.
   * @throws NullPointerException if {@code throwable} is null.
   * @throws Error if the throwable was an {@link Error}.
   */
  protected boolean setException(Throwable throwable) {
    // A null throwable would otherwise be indistinguishable from a successful
    // completion with a null value (see Sync#complete), silently hiding the
    // failure from callers of get().  Reject it before touching the state.
    if (throwable == null) {
      throw new NullPointerException("throwable");
    }

    boolean result = sync.setException(throwable);
    if (result) {
      done();
    }

    // If it's an Error, we want to make sure it reaches the top of the
    // call stack, so we rethrow it.  This happens whether or not the state
    // change succeeded.
    if (throwable instanceof Error) {
      throw (Error) throwable;
    }
    return result;
  }

  /**
   * Subclasses should invoke this method to mark the future as cancelled.
   * This will set the state of the future to {@link
   * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
   * successfully changed.
   *
   * @return true if the state was successfully changed.
   */
  protected final boolean cancel() {
    boolean result = sync.cancel();
    if (result) {
      done();
    }
    return result;
  }

  /**
   * Called by {@link #set(Object)}, {@link #setException(Throwable)} and
   * {@link #cancel()} after the state has been successfully changed, i.e.
   * once the result is available and waiters have been released.  Subclasses
   * can override this method to deal with any actions that should be
   * undertaken when the task has completed.
   */
  protected void done() {
    // Default implementation does nothing.
  }

  /**
   * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
   * private subclass to hold the synchronizer.  This synchronizer is used to
   * implement the blocking and waiting calls as well as to handle state changes
   * in a thread-safe manner.  The current state of the future is held in the
   * Sync state, and the lock is released whenever the state changes to either
   * {@link #COMPLETED} or {@link #CANCELLED}.
   *
   * <p>To avoid races between threads doing release and acquire, we transition
   * to the final state in two steps.  One thread will successfully CAS from
   * RUNNING to COMPLETING, that thread will then set the result of the
   * computation, and only then transition to COMPLETED or CANCELLED.
   *
   * <p>We don't use the integer argument passed between acquire methods so we
   * pass around a -1 everywhere.
   */
  static final class Sync<V> extends AbstractQueuedSynchronizer {

    private static final long serialVersionUID = 0L;

    /* Valid states. */
    static final int RUNNING = 0;
    static final int COMPLETING = 1;
    static final int COMPLETED = 2;
    static final int CANCELLED = 4;

    /*
     * Result fields.  They are written only by the thread that wins the
     * RUNNING -> COMPLETING CAS in complete(), before the final state is
     * published through setState(); readers consult getState() first.
     */
    private V value;
    private ExecutionException exception;

    /*
     * Acquisition succeeds if the future is done, otherwise it fails.
     */
    @Override
    protected int tryAcquireShared(int ignored) {
      if (isDone()) {
        return 1;
      }
      return -1;
    }

    /*
     * We always allow a release to go through, this means the state has been
     * successfully changed and the result is available.  The release argument
     * carries the final state to store.
     */
    @Override
    protected boolean tryReleaseShared(int finalState) {
      setState(finalState);
      return true;
    }

    /**
     * Blocks until the task is complete or the timeout expires.  Throws a
     * {@link TimeoutException} if the timer expires, otherwise behaves like
     * {@link #get()}.
     *
     * @param nanos the maximum time to wait, in nanoseconds.
     */
    V get(long nanos) throws TimeoutException, CancellationException,
        ExecutionException, InterruptedException {

      // Attempt to acquire the shared lock with a timeout.
      if (!tryAcquireSharedNanos(-1, nanos)) {
        throw new TimeoutException("Timeout waiting for task.");
      }

      return getValue();
    }

    /**
     * Blocks until {@link #complete(Object, Throwable, int)} has been
     * successfully called.  Throws a {@link CancellationException} if the task
     * was cancelled, or a {@link ExecutionException} if the task completed with
     * an error.
     */
    V get() throws CancellationException, ExecutionException,
        InterruptedException {

      // Acquire the shared lock allowing interruption.
      acquireSharedInterruptibly(-1);
      return getValue();
    }

    /**
     * Implementation of the actual value retrieval.  Will return the value
     * on success, an exception on failure, a cancellation on cancellation, or
     * an illegal state if the synchronizer is in an invalid state.
     */
    private V getValue() throws CancellationException, ExecutionException {
      int state = getState();
      switch (state) {
        case COMPLETED:
          if (exception != null) {
            throw exception;
          } else {
            return value;
          }

        case CANCELLED:
          throw new CancellationException("Task was cancelled.");

        default:
          throw new IllegalStateException(
              "Error, synchronizer in invalid state: " + state);
      }
    }

    /**
     * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
     */
    boolean isDone() {
      return (getState() & (COMPLETED | CANCELLED)) != 0;
    }

    /**
     * Checks if the state is {@link #CANCELLED}.
     */
    boolean isCancelled() {
      return getState() == CANCELLED;
    }

    /**
     * Transition to the COMPLETED state and set the value.
     */
    boolean set(V v) {
      return complete(v, null, COMPLETED);
    }

    /**
     * Transition to the COMPLETED state and set the exception.
     */
    boolean setException(Throwable t) {
      return complete(null, t, COMPLETED);
    }

    /**
     * Transition to the CANCELLED state.
     */
    boolean cancel() {
      return complete(null, null, CANCELLED);
    }

    /**
     * Implementation of completing a task.  Either {@code v} or {@code t} will
     * be set but not both.  The {@code finalState} is the state to change to
     * from {@link #RUNNING}.  If the state is not in the RUNNING state we
     * return {@code false}.
     *
     * @param v the value to set as the result of the computation.
     * @param t the exception to set as the result of the computation; when
     *     non-null it is wrapped in an {@link ExecutionException}.
     * @param finalState the state to transition to.
     * @return true if this call performed the transition out of RUNNING.
     */
    private boolean complete(V v, Throwable t, int finalState) {
      if (compareAndSetState(RUNNING, COMPLETING)) {
        // Only the thread that won the CAS gets here, so the result fields
        // are written exactly once, before the final state is published.
        this.value = v;
        this.exception = t == null ? null : new ExecutionException(t);
        releaseShared(finalState);
        return true;
      }

      // The state was not RUNNING, so there are no valid transitions.
      return false;
    }
  }
}