/*
 * Copyright (C) 2007 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

import javax.annotation.Nullable;

/**
 * A skeletal implementation of the {@link ListenableFuture} interface. Prefer
 * this class over {@link java.util.concurrent.FutureTask} for two reasons: it
 * implements {@code ListenableFuture}, and it does not implement
 * {@code Runnable}. (If you want a {@code Runnable} implementation of
 * {@code ListenableFuture}, create a {@link ListenableFutureTask}, or submit
 * your tasks to a {@link ListeningExecutorService}.)
 *
 * <p>Every method of {@code ListenableFuture} is implemented here. Subclasses
 * are expected to publish the outcome of the computation through the
 * protected methods {@link #set(Object)} and {@link #setException(Throwable)}.
 * Subclasses may also override {@link #interruptTask()}, which is invoked
 * automatically when a call to {@link #cancel(boolean) cancel(true)} succeeds
 * in cancelling the future.
 *
 * <p>{@code AbstractFuture} relies on an {@link AbstractQueuedSynchronizer}
 * for its concurrency control and thread safety.
 *
 * <p>Each state-changing method returns a boolean telling the caller whether
 * it was the one that changed the future's state. Internally the future is
 * running, completing, completed, or cancelled; a failed computation is
 * recorded as "completed" together with the exception that caused the
 * failure.
 *
 * <p>Listeners are kept in an {@link ExecutionList}, which guarantees that
 * every registered listener is executed: either when the future finishes or,
 * for listeners added after the future has completed, immediately.
 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
 * are not necessarily executed in the order in which they were added. (A
 * listener added after the future is complete is executed immediately, even
 * if earlier listeners have not been executed yet. Additionally, executors
 * need not guarantee FIFO execution, and different listeners may run in
 * different executors.)
 *
 * @author Sven Mawson
 * @since 1.0
 */
public abstract class AbstractFuture<V> implements ListenableFuture<V> {

  /** Synchronizer that owns the state, the result and all blocking waits. */
  private final Sync<V> sync = new Sync<V>();

  // Listener/executor pairs to run once this future reaches a final state.
  private final ExecutionList executionList = new ExecutionList();

  /**
   * Sole constructor, intended for invocation by subclasses.
   */
  protected AbstractFuture() {}

  /*
   * The Javadoc below is more explicit than the JDK's about when
   * InterruptedException is thrown. The behavior matches the JDK's; only the
   * JDK's documentation is misleading.
   */
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code
   * InterruptedException} if the current thread is interrupted before or during
   * the call, even if the value is already available.
   *
   * @throws InterruptedException if the current thread was interrupted before
   *     or during the call (optional but recommended).
   * @throws CancellationException {@inheritDoc}
   */
  @Override
  public V get(long timeout, TimeUnit unit) throws InterruptedException,
      TimeoutException, ExecutionException {
    // The synchronizer only understands nanoseconds.
    long timeoutNanos = unit.toNanos(timeout);
    return sync.get(timeoutNanos);
  }

  /*
   * The Javadoc below is more explicit than the JDK's about when
   * InterruptedException is thrown. The behavior matches the JDK's; only the
   * JDK's documentation is misleading.
   */
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code
   * InterruptedException} if the current thread is interrupted before or during
   * the call, even if the value is already available.
   *
   * @throws InterruptedException if the current thread was interrupted before
   *     or during the call (optional but recommended).
   * @throws CancellationException {@inheritDoc}
   */
  @Override
  public V get() throws InterruptedException, ExecutionException {
    return sync.get();
  }

  @Override
  public boolean isDone() {
    return sync.isDone();
  }

  @Override
  public boolean isCancelled() {
    return sync.isCancelled();
  }

  /**
   * Attempts to move this future to the cancelled state. When the attempt
   * succeeds the registered listeners are executed first and, if
   * {@code mayInterruptIfRunning} is {@code true}, {@link #interruptTask()} is
   * invoked afterwards.
   *
   * @param mayInterruptIfRunning whether {@link #interruptTask()} should be
   *     invoked after a successful cancellation.
   * @return {@code true} if this call cancelled the future, {@code false} if
   *     the future had already left the running state.
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = sync.cancel();
    if (cancelled) {
      executionList.execute();
      if (mayInterruptIfRunning) {
        interruptTask();
      }
    }
    return cancelled;
  }

  /**
   * Hook for subclasses that are able to interrupt the computation backing
   * this future. It is invoked automatically by a successful call to
   * {@link #cancel(boolean) cancel(true)}.
   *
   * <p>The default implementation does nothing.
   *
   * @since 10.0
   */
  protected void interruptTask() {
  }

  /**
   * {@inheritDoc}
   *
   * @since 10.0
   */
  @Override
  public void addListener(Runnable listener, Executor exec) {
    executionList.add(listener, exec);
  }

  /**
   * Subclasses should invoke this method to set the result of the computation
   * to {@code value}. This moves the future to
   * {@link AbstractFuture.Sync#COMPLETED} and, only if this call performed the
   * state change, runs the listeners.
   *
   * @param value the value that was the result of the task.
   * @return true if the state was successfully changed.
   */
  protected boolean set(@Nullable V value) {
    if (!sync.set(value)) {
      return false;
    }
    executionList.execute();
    return true;
  }

  /**
   * Subclasses should invoke this method to set the result of the computation
   * to an error, {@code throwable}. This moves the future to
   * {@link AbstractFuture.Sync#COMPLETED} and, only if this call performed the
   * state change, runs the listeners.
   *
   * @param throwable the exception that the task failed with.
   * @return true if the state was successfully changed.
   * @throws Error if the throwable was an {@link Error}.
   */
  protected boolean setException(Throwable throwable) {
    checkNotNull(throwable);
    boolean stateChanged = sync.setException(throwable);
    if (stateChanged) {
      executionList.execute();
    }

    // An Error must reach the top of the call stack, so it is rethrown here
    // regardless of whether this call changed the state.
    if (throwable instanceof Error) {
      throw (Error) throwable;
    }
    return stateChanged;
  }

  /**
   * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
   * private subclass to hold the synchronizer. It implements the blocking and
   * waiting calls and performs the state changes in a thread-safe manner. The
   * current state of the future is held in the synchronizer state, and the
   * lock is released whenever the state changes to either {@link #COMPLETED}
   * or {@link #CANCELLED}.
   *
   * <p>To avoid races between threads doing release and acquire, the
   * transition to a final state takes two steps. Exactly one thread
   * successfully CASes from RUNNING to COMPLETING; that thread then stores the
   * result of the computation, and only afterwards moves to COMPLETED or
   * CANCELLED.
   *
   * <p>The integer argument passed between the acquire methods is unused, so
   * -1 is passed everywhere.
   */
  static final class Sync<V> extends AbstractQueuedSynchronizer {

    private static final long serialVersionUID = 0L;

    /* Valid states. COMPLETED and CANCELLED are distinct bits; see isDone(). */
    static final int RUNNING = 0;
    static final int COMPLETING = 1;
    static final int COMPLETED = 2;
    static final int CANCELLED = 4;

    /*
     * Outcome of the computation. Both fields are written only by the thread
     * that won the RUNNING -> COMPLETING transition, and before that thread
     * publishes the final state through releaseShared.
     */
    private V value;
    private Throwable exception;

    /*
     * Acquisition succeeds (positive return) once the future is done and
     * fails (negative return) until then.
     */
    @Override
    protected int tryAcquireShared(int ignored) {
      return isDone() ? 1 : -1;
    }

    /*
     * A release is always allowed to go through: it means the state has been
     * successfully changed and the result is available.
     */
    @Override
    protected boolean tryReleaseShared(int finalState) {
      setState(finalState);
      return true;
    }

    /**
     * Blocks until the task is complete or the timeout expires. Throws a
     * {@link TimeoutException} if the timer expires, otherwise behaves like
     * {@link #get()}.
     */
    V get(long nanos) throws TimeoutException, CancellationException,
        ExecutionException, InterruptedException {

      // Wait for the shared lock, giving up once the timeout has elapsed.
      boolean acquired = tryAcquireSharedNanos(-1, nanos);
      if (acquired) {
        return getValue();
      }
      throw new TimeoutException("Timeout waiting for task.");
    }

    /**
     * Blocks until {@link #complete(Object, Throwable, int)} has been
     * successfully called. Throws a {@link CancellationException} if the task
     * was cancelled, or a {@link ExecutionException} if the task completed with
     * an error.
     */
    V get() throws CancellationException, ExecutionException,
        InterruptedException {

      // Wait for the shared lock; the wait can be interrupted.
      acquireSharedInterruptibly(-1);
      return getValue();
    }

    /**
     * Performs the actual retrieval of the outcome. Returns the value on
     * success, throws an {@link ExecutionException} wrapping the stored
     * exception on failure, a {@link CancellationException} on cancellation,
     * or an {@link IllegalStateException} if the synchronizer is in any other
     * state.
     */
    private V getValue() throws CancellationException, ExecutionException {
      int state = getState();
      if (state == COMPLETED) {
        if (exception == null) {
          return value;
        }
        throw new ExecutionException(exception);
      }
      if (state == CANCELLED) {
        throw new CancellationException("Task was cancelled.");
      }
      throw new IllegalStateException(
          "Error, synchronizer in invalid state: " + state);
    }

    /**
     * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
     */
    boolean isDone() {
      final int doneMask = COMPLETED | CANCELLED;
      return (getState() & doneMask) != 0;
    }

    /**
     * Checks if the state is {@link #CANCELLED}.
     */
    boolean isCancelled() {
      return getState() == CANCELLED;
    }

    /**
     * Transition to the COMPLETED state and set the value.
     */
    boolean set(@Nullable V v) {
      return complete(v, null, COMPLETED);
    }

    /**
     * Transition to the COMPLETED state and set the exception.
     */
    boolean setException(Throwable t) {
      return complete(null, t, COMPLETED);
    }

    /**
     * Transition to the CANCELLED state.
     */
    boolean cancel() {
      return complete(null, null, CANCELLED);
    }

    /**
     * Implementation of completing a task. At most one of {@code v} and
     * {@code t} is set, never both. The {@code finalState} is the state to
     * change to from {@link #RUNNING}. If the synchronizer is no longer in the
     * RUNNING state, this method returns {@code false}, first waiting for a
     * completion that is in progress on another thread to reach a final state
     * ({@link #COMPLETED} or {@link #CANCELLED}).
     *
     * @param v the value to set as the result of the computation.
     * @param t the exception to set as the result of the computation.
     * @param finalState the state to transition to.
     * @return true if this call performed the transition.
     */
    private boolean complete(@Nullable V v, @Nullable Throwable t,
        int finalState) {
      if (!compareAndSetState(RUNNING, COMPLETING)) {
        // Lost the race. If another thread is in the middle of completing the
        // future, block until it is done so that completion is guaranteed by
        // the time this method returns.
        if (getState() == COMPLETING) {
          acquireShared(-1);
        }
        return false;
      }

      // This thread owns the completion: store the outcome first, and only
      // then release to the final state.
      this.value = v;
      this.exception = t;
      releaseShared(finalState);
      return true;
    }
  }
}