001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 021 import com.google.common.annotations.Beta; 022 023 import java.util.concurrent.CancellationException; 024 import java.util.concurrent.ExecutionException; 025 import java.util.concurrent.Future; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.TimeoutException; 028 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 029 030 import javax.annotation.Nullable; 031 032 /** 033 * <p>An abstract implementation of the {@link Future} interface. This class 034 * is an abstraction of {@link java.util.concurrent.FutureTask} to support use 035 * for tasks other than {@link Runnable}s. It uses an 036 * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and 037 * guarantee thread safety. It could be used as a base class to 038 * {@code FutureTask}, or any other implementor of the {@code Future} interface. 039 * 040 * <p>This class implements all methods in {@code Future}. Subclasses should 041 * provide a way to set the result of the computation through the protected 042 * methods {@link #set(Object)}, {@link #setException(Throwable)}, or 043 * {@link #cancel()}. If subclasses want to implement cancellation they can 044 * override the {@link #cancel(boolean)} method with a real implementation, the 045 * default implementation doesn't support cancellation. 046 * 047 * <p>The state changing methods all return a boolean indicating success or 048 * failure in changing the future's state. Valid states are running, 049 * completed, failed, or cancelled. Because this class does not implement 050 * cancellation it is left to the subclass to distinguish between created 051 * and running tasks. 052 * 053 * @author Sven Mawson 054 * @since 1 055 */ 056 @Beta 057 public abstract class AbstractFuture<V> implements Future<V> { 058 059 /** Synchronization control for AbstractFutures. */ 060 private final Sync<V> sync = new Sync<V>(); 061 062 /* 063 * Blocks until either the task completes or the timeout expires. Uses the 064 * sync blocking-with-timeout support provided by AQS. 065 */ 066 @Override 067 public V get(long timeout, TimeUnit unit) throws InterruptedException, 068 TimeoutException, ExecutionException { 069 return sync.get(unit.toNanos(timeout)); 070 } 071 072 /* 073 * Blocks until the task completes or we get interrupted. Uses the 074 * interruptible blocking support provided by AQS. 075 */ 076 @Override 077 public V get() throws InterruptedException, ExecutionException { 078 return sync.get(); 079 } 080 081 /* 082 * Checks if the sync is not in the running state. 083 */ 084 @Override 085 public boolean isDone() { 086 return sync.isDone(); 087 } 088 089 /* 090 * Checks if the sync is in the cancelled state. 091 */ 092 @Override 093 public boolean isCancelled() { 094 return sync.isCancelled(); 095 } 096 097 /* 098 * Default implementation of cancel that never cancels the future. 099 * Subclasses should override this to implement cancellation if desired. 100 */ 101 @Override 102 public boolean cancel(boolean mayInterruptIfRunning) { 103 return false; 104 } 105 106 /** 107 * Subclasses should invoke this method to set the result of the computation 108 * to {@code value}. This will set the state of the future to 109 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 110 * state was successfully changed. 111 * 112 * @param value the value that was the result of the task. 113 * @return true if the state was successfully changed. 114 */ 115 protected boolean set(@Nullable V value) { 116 boolean result = sync.set(value); 117 if (result) { 118 done(); 119 } 120 return result; 121 } 122 123 /** 124 * Subclasses should invoke this method to set the result of the computation 125 * to an error, {@code throwable}. This will set the state of the future to 126 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 127 * state was successfully changed. 128 * 129 * @param throwable the exception that the task failed with. 130 * @return true if the state was successfully changed. 131 * @throws Error if the throwable was an {@link Error}. 132 */ 133 protected boolean setException(Throwable throwable) { 134 boolean result = sync.setException(checkNotNull(throwable)); 135 if (result) { 136 done(); 137 } 138 139 // If it's an Error, we want to make sure it reaches the top of the 140 // call stack, so we rethrow it. 141 if (throwable instanceof Error) { 142 throw (Error) throwable; 143 } 144 return result; 145 } 146 147 /** 148 * Subclasses should invoke this method to mark the future as cancelled. 149 * This will set the state of the future to {@link 150 * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was 151 * successfully changed. 152 * 153 * @return true if the state was successfully changed. 154 */ 155 protected final boolean cancel() { 156 boolean result = sync.cancel(); 157 if (result) { 158 done(); 159 } 160 return result; 161 } 162 163 /* 164 * Called by the success, failed, or cancelled methods to indicate that the 165 * value is now available and the latch can be released. Subclasses can 166 * use this method to deal with any actions that should be undertaken when 167 * the task has completed. 168 */ 169 protected void done() { 170 // Default implementation does nothing. 171 } 172 173 /** 174 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a 175 * private subclass to hold the synchronizer. This synchronizer is used to 176 * implement the blocking and waiting calls as well as to handle state changes 177 * in a thread-safe manner. The current state of the future is held in the 178 * Sync state, and the lock is released whenever the state changes to either 179 * {@link #COMPLETED} or {@link #CANCELLED}. 180 * 181 * <p>To avoid races between threads doing release and acquire, we transition 182 * to the final state in two steps. One thread will successfully CAS from 183 * RUNNING to COMPLETING, that thread will then set the result of the 184 * computation, and only then transition to COMPLETED or CANCELLED. 185 * 186 * <p>We don't use the integer argument passed between acquire methods so we 187 * pass around a -1 everywhere. 188 */ 189 static final class Sync<V> extends AbstractQueuedSynchronizer { 190 191 private static final long serialVersionUID = 0L; 192 193 /* Valid states. */ 194 static final int RUNNING = 0; 195 static final int COMPLETING = 1; 196 static final int COMPLETED = 2; 197 static final int CANCELLED = 4; 198 199 private V value; 200 private Throwable exception; 201 202 /* 203 * Acquisition succeeds if the future is done, otherwise it fails. 204 */ 205 @Override 206 protected int tryAcquireShared(int ignored) { 207 if (isDone()) { 208 return 1; 209 } 210 return -1; 211 } 212 213 /* 214 * We always allow a release to go through, this means the state has been 215 * successfully changed and the result is available. 216 */ 217 @Override 218 protected boolean tryReleaseShared(int finalState) { 219 setState(finalState); 220 return true; 221 } 222 223 /** 224 * Blocks until the task is complete or the timeout expires. Throws a 225 * {@link TimeoutException} if the timer expires, otherwise behaves like 226 * {@link #get()}. 227 */ 228 V get(long nanos) throws TimeoutException, CancellationException, 229 ExecutionException, InterruptedException { 230 231 // Attempt to acquire the shared lock with a timeout. 232 if (!tryAcquireSharedNanos(-1, nanos)) { 233 throw new TimeoutException("Timeout waiting for task."); 234 } 235 236 return getValue(); 237 } 238 239 /** 240 * Blocks until {@link #complete(Object, Throwable, int)} has been 241 * successfully called. Throws a {@link CancellationException} if the task 242 * was cancelled, or a {@link ExecutionException} if the task completed with 243 * an error. 244 */ 245 V get() throws CancellationException, ExecutionException, 246 InterruptedException { 247 248 // Acquire the shared lock allowing interruption. 249 acquireSharedInterruptibly(-1); 250 return getValue(); 251 } 252 253 /** 254 * Implementation of the actual value retrieval. Will return the value 255 * on success, an exception on failure, a cancellation on cancellation, or 256 * an illegal state if the synchronizer is in an invalid state. 257 */ 258 private V getValue() throws CancellationException, ExecutionException { 259 int state = getState(); 260 switch (state) { 261 case COMPLETED: 262 if (exception != null) { 263 throw new ExecutionException(exception); 264 } else { 265 return value; 266 } 267 268 case CANCELLED: 269 throw new CancellationException("Task was cancelled."); 270 271 default: 272 throw new IllegalStateException( 273 "Error, synchronizer in invalid state: " + state); 274 } 275 } 276 277 /** 278 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. 279 */ 280 boolean isDone() { 281 return (getState() & (COMPLETED | CANCELLED)) != 0; 282 } 283 284 /** 285 * Checks if the state is {@link #CANCELLED}. 286 */ 287 boolean isCancelled() { 288 return getState() == CANCELLED; 289 } 290 291 /** 292 * Transition to the COMPLETED state and set the value. 293 */ 294 boolean set(@Nullable V v) { 295 return complete(v, null, COMPLETED); 296 } 297 298 /** 299 * Transition to the COMPLETED state and set the exception. 300 */ 301 boolean setException(Throwable t) { 302 return complete(null, t, COMPLETED); 303 } 304 305 /** 306 * Transition to the CANCELLED state. 307 */ 308 boolean cancel() { 309 return complete(null, null, CANCELLED); 310 } 311 312 /** 313 * Implementation of completing a task. Either {@code v} or {@code t} will 314 * be set but not both. The {@code finalState} is the state to change to 315 * from {@link #RUNNING}. If the state is not in the RUNNING state we 316 * return {@code false}. 317 * 318 * @param v the value to set as the result of the computation. 319 * @param t the exception to set as the result of the computation. 320 * @param finalState the state to transition to. 321 */ 322 private boolean complete(@Nullable V v, Throwable t, int finalState) { 323 if (compareAndSetState(RUNNING, COMPLETING)) { 324 this.value = v; 325 this.exception = t; 326 releaseShared(finalState); 327 return true; 328 } 329 330 // The state was not RUNNING, so there are no valid transitions. 331 return false; 332 } 333 } 334 }