001 /* 002 * Copyright (C) 2007 Google Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 021 import com.google.common.annotations.Beta; 022 023 import java.util.concurrent.CancellationException; 024 import java.util.concurrent.ExecutionException; 025 import java.util.concurrent.Future; 026 import java.util.concurrent.TimeUnit; 027 import java.util.concurrent.TimeoutException; 028 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 029 030 import javax.annotation.Nullable; 031 032 /** 033 * <p>An abstract implementation of the {@link Future} interface. This class 034 * is an abstraction of {@link java.util.concurrent.FutureTask} to support use 035 * for tasks other than {@link Runnable}s. It uses an 036 * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and 037 * guarantee thread safety. It could be used as a base class to 038 * {@code FutureTask}, or any other implementor of the {@code Future} interface. 039 * 040 * <p>This class implements all methods in {@code Future}. Subclasses should 041 * provide a way to set the result of the computation through the protected 042 * methods {@link #set(Object)}, {@link #setException(Throwable)}, or 043 * {@link #cancel()}. If subclasses want to implement cancellation they can 044 * override the {@link #cancel(boolean)} method with a real implementation, the 045 * default implementation doesn't support cancellation. 046 * 047 * <p>The state changing methods all return a boolean indicating success or 048 * failure in changing the future's state. Valid states are running, 049 * completed, failed, or cancelled. Because this class does not implement 050 * cancellation it is left to the subclass to distinguish between created 051 * and running tasks. 052 * 053 * @author Sven Mawson 054 * @since 1 055 */ 056 @Beta 057 public abstract class AbstractFuture<V> implements Future<V> { 058 059 /** Synchronization control for AbstractFutures. */ 060 private final Sync<V> sync = new Sync<V>(); 061 062 /* 063 * Blocks until either the task completes or the timeout expires. Uses the 064 * sync blocking-with-timeout support provided by AQS. 065 */ 066 public V get(long timeout, TimeUnit unit) throws InterruptedException, 067 TimeoutException, ExecutionException { 068 return sync.get(unit.toNanos(timeout)); 069 } 070 071 /* 072 * Blocks until the task completes or we get interrupted. Uses the 073 * interruptible blocking support provided by AQS. 074 */ 075 public V get() throws InterruptedException, ExecutionException { 076 return sync.get(); 077 } 078 079 /* 080 * Checks if the sync is not in the running state. 081 */ 082 public boolean isDone() { 083 return sync.isDone(); 084 } 085 086 /* 087 * Checks if the sync is in the cancelled state. 088 */ 089 public boolean isCancelled() { 090 return sync.isCancelled(); 091 } 092 093 /* 094 * Default implementation of cancel that never cancels the future. 095 * Subclasses should override this to implement cancellation if desired. 096 */ 097 public boolean cancel(boolean mayInterruptIfRunning) { 098 return false; 099 } 100 101 /** 102 * Subclasses should invoke this method to set the result of the computation 103 * to {@code value}. This will set the state of the future to 104 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 105 * state was successfully changed. 106 * 107 * @param value the value that was the result of the task. 108 * @return true if the state was successfully changed. 109 */ 110 protected boolean set(@Nullable V value) { 111 boolean result = sync.set(value); 112 if (result) { 113 done(); 114 } 115 return result; 116 } 117 118 /** 119 * Subclasses should invoke this method to set the result of the computation 120 * to an error, {@code throwable}. This will set the state of the future to 121 * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the 122 * state was successfully changed. 123 * 124 * @param throwable the exception that the task failed with. 125 * @return true if the state was successfully changed. 126 * @throws Error if the throwable was an {@link Error}. 127 */ 128 protected boolean setException(Throwable throwable) { 129 boolean result = sync.setException(checkNotNull(throwable)); 130 if (result) { 131 done(); 132 } 133 134 // If it's an Error, we want to make sure it reaches the top of the 135 // call stack, so we rethrow it. 136 if (throwable instanceof Error) { 137 throw (Error) throwable; 138 } 139 return result; 140 } 141 142 /** 143 * Subclasses should invoke this method to mark the future as cancelled. 144 * This will set the state of the future to {@link 145 * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was 146 * successfully changed. 147 * 148 * @return true if the state was successfully changed. 149 */ 150 protected final boolean cancel() { 151 boolean result = sync.cancel(); 152 if (result) { 153 done(); 154 } 155 return result; 156 } 157 158 /* 159 * Called by the success, failed, or cancelled methods to indicate that the 160 * value is now available and the latch can be released. Subclasses can 161 * use this method to deal with any actions that should be undertaken when 162 * the task has completed. 163 */ 164 protected void done() { 165 // Default implementation does nothing. 166 } 167 168 /** 169 * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a 170 * private subclass to hold the synchronizer. This synchronizer is used to 171 * implement the blocking and waiting calls as well as to handle state changes 172 * in a thread-safe manner. The current state of the future is held in the 173 * Sync state, and the lock is released whenever the state changes to either 174 * {@link #COMPLETED} or {@link #CANCELLED}. 175 * 176 * <p>To avoid races between threads doing release and acquire, we transition 177 * to the final state in two steps. One thread will successfully CAS from 178 * RUNNING to COMPLETING, that thread will then set the result of the 179 * computation, and only then transition to COMPLETED or CANCELLED. 180 * 181 * <p>We don't use the integer argument passed between acquire methods so we 182 * pass around a -1 everywhere. 183 */ 184 static final class Sync<V> extends AbstractQueuedSynchronizer { 185 186 private static final long serialVersionUID = 0L; 187 188 /* Valid states. */ 189 static final int RUNNING = 0; 190 static final int COMPLETING = 1; 191 static final int COMPLETED = 2; 192 static final int CANCELLED = 4; 193 194 private V value; 195 private ExecutionException exception; 196 197 /* 198 * Acquisition succeeds if the future is done, otherwise it fails. 199 */ 200 @Override 201 protected int tryAcquireShared(int ignored) { 202 if (isDone()) { 203 return 1; 204 } 205 return -1; 206 } 207 208 /* 209 * We always allow a release to go through, this means the state has been 210 * successfully changed and the result is available. 211 */ 212 @Override 213 protected boolean tryReleaseShared(int finalState) { 214 setState(finalState); 215 return true; 216 } 217 218 /** 219 * Blocks until the task is complete or the timeout expires. Throws a 220 * {@link TimeoutException} if the timer expires, otherwise behaves like 221 * {@link #get()}. 222 */ 223 V get(long nanos) throws TimeoutException, CancellationException, 224 ExecutionException, InterruptedException { 225 226 // Attempt to acquire the shared lock with a timeout. 227 if (!tryAcquireSharedNanos(-1, nanos)) { 228 throw new TimeoutException("Timeout waiting for task."); 229 } 230 231 return getValue(); 232 } 233 234 /** 235 * Blocks until {@link #complete(Object, Throwable, int)} has been 236 * successfully called. Throws a {@link CancellationException} if the task 237 * was cancelled, or a {@link ExecutionException} if the task completed with 238 * an error. 239 */ 240 V get() throws CancellationException, ExecutionException, 241 InterruptedException { 242 243 // Acquire the shared lock allowing interruption. 244 acquireSharedInterruptibly(-1); 245 return getValue(); 246 } 247 248 /** 249 * Implementation of the actual value retrieval. Will return the value 250 * on success, an exception on failure, a cancellation on cancellation, or 251 * an illegal state if the synchronizer is in an invalid state. 252 */ 253 private V getValue() throws CancellationException, ExecutionException { 254 int state = getState(); 255 switch (state) { 256 case COMPLETED: 257 if (exception != null) { 258 throw exception; 259 } else { 260 return value; 261 } 262 263 case CANCELLED: 264 throw new CancellationException("Task was cancelled."); 265 266 default: 267 throw new IllegalStateException( 268 "Error, synchronizer in invalid state: " + state); 269 } 270 } 271 272 /** 273 * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. 274 */ 275 boolean isDone() { 276 return (getState() & (COMPLETED | CANCELLED)) != 0; 277 } 278 279 /** 280 * Checks if the state is {@link #CANCELLED}. 281 */ 282 boolean isCancelled() { 283 return getState() == CANCELLED; 284 } 285 286 /** 287 * Transition to the COMPLETED state and set the value. 288 */ 289 boolean set(@Nullable V v) { 290 return complete(v, null, COMPLETED); 291 } 292 293 /** 294 * Transition to the COMPLETED state and set the exception. 295 */ 296 boolean setException(Throwable t) { 297 return complete(null, t, COMPLETED); 298 } 299 300 /** 301 * Transition to the CANCELLED state. 302 */ 303 boolean cancel() { 304 return complete(null, null, CANCELLED); 305 } 306 307 /** 308 * Implementation of completing a task. Either {@code v} or {@code t} will 309 * be set but not both. The {@code finalState} is the state to change to 310 * from {@link #RUNNING}. If the state is not in the RUNNING state we 311 * return {@code false}. 312 * 313 * @param v the value to set as the result of the computation. 314 * @param t the exception to set as the result of the computation. 315 * @param finalState the state to transition to. 316 */ 317 private boolean complete(@Nullable V v, Throwable t, int finalState) { 318 if (compareAndSetState(RUNNING, COMPLETING)) { 319 this.value = v; 320 this.exception = t == null ? null : new ExecutionException(t); 321 releaseShared(finalState); 322 return true; 323 } 324 325 // The state was not RUNNING, so there are no valid transitions. 326 return false; 327 } 328 } 329 }