001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020
021import java.util.concurrent.CancellationException;
022import java.util.concurrent.ExecutionException;
023import java.util.concurrent.Executor;
024import java.util.concurrent.TimeUnit;
025import java.util.concurrent.TimeoutException;
026import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027
028import javax.annotation.Nullable;
029
030/**
031 * An abstract implementation of the {@link ListenableFuture} interface. This
032 * class is preferable to {@link java.util.concurrent.FutureTask} for two
033 * reasons: It implements {@code ListenableFuture}, and it does not implement
034 * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
035 * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
036 * tasks to a {@link ListeningExecutorService}.)
037 *
038 * <p>This class implements all methods in {@code ListenableFuture}.
039 * Subclasses should provide a way to set the result of the computation through
040 * the protected methods {@link #set(Object)} and
041 * {@link #setException(Throwable)}. Subclasses may also override {@link
042 * #interruptTask()}, which will be invoked automatically if a call to {@link
043 * #cancel(boolean) cancel(true)} succeeds in canceling the future.
044 *
045 * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
046 * with concurrency issues and guarantee thread safety.
047 *
048 * <p>The state changing methods all return a boolean indicating success or
049 * failure in changing the future's state.  Valid states are running,
050 * completed, failed, or cancelled.
051 *
052 * <p>This class uses an {@link ExecutionList} to guarantee that all registered
053 * listeners will be executed, either when the future finishes or, for listeners
054 * that are added after the future completes, immediately.
055 * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
056 * are not necessarily executed in the order in which they were added.  (If a
057 * listener is added after the Future is complete, it will be executed
058 * immediately, even if earlier listeners have not been executed. Additionally,
059 * executors need not guarantee FIFO execution, or different listeners may run
060 * in different executors.)
061 *
062 * @author Sven Mawson
063 * @since 1.0
064 */
065public abstract class AbstractFuture<V> implements ListenableFuture<V> {
066
067  /** Synchronization control for AbstractFutures. */
068  private final Sync<V> sync = new Sync<V>();
069
070  // The execution list to hold our executors.
071  private final ExecutionList executionList = new ExecutionList();
072
073  /**
074   * Constructor for use by subclasses.
075   */
076  protected AbstractFuture() {}
077
078  /*
079   * Improve the documentation of when InterruptedException is thrown. Our
080   * behavior matches the JDK's, but the JDK's documentation is misleading.
081   */
082  /**
083   * {@inheritDoc}
084   *
085   * <p>The default {@link AbstractFuture} implementation throws {@code
086   * InterruptedException} if the current thread is interrupted before or during
087   * the call, even if the value is already available.
088   *
089   * @throws InterruptedException if the current thread was interrupted before
090   *     or during the call (optional but recommended).
091   * @throws CancellationException {@inheritDoc}
092   */
093  @Override
094  public V get(long timeout, TimeUnit unit) throws InterruptedException,
095      TimeoutException, ExecutionException {
096    return sync.get(unit.toNanos(timeout));
097  }
098
099  /*
100   * Improve the documentation of when InterruptedException is thrown. Our
101   * behavior matches the JDK's, but the JDK's documentation is misleading.
102   */
103  /**
104   * {@inheritDoc}
105   *
106   * <p>The default {@link AbstractFuture} implementation throws {@code
107   * InterruptedException} if the current thread is interrupted before or during
108   * the call, even if the value is already available.
109   *
110   * @throws InterruptedException if the current thread was interrupted before
111   *     or during the call (optional but recommended).
112   * @throws CancellationException {@inheritDoc}
113   */
114  @Override
115  public V get() throws InterruptedException, ExecutionException {
116    return sync.get();
117  }
118
119  @Override
120  public boolean isDone() {
121    return sync.isDone();
122  }
123
124  @Override
125  public boolean isCancelled() {
126    return sync.isCancelled();
127  }
128
129  @Override
130  public boolean cancel(boolean mayInterruptIfRunning) {
131    if (!sync.cancel(mayInterruptIfRunning)) {
132      return false;
133    }
134    executionList.execute();
135    if (mayInterruptIfRunning) {
136      interruptTask();
137    }
138    return true;
139  }
140
141  /**
142   * Subclasses can override this method to implement interruption of the
143   * future's computation. The method is invoked automatically by a successful
144   * call to {@link #cancel(boolean) cancel(true)}.
145   *
146   * <p>The default implementation does nothing.
147   *
148   * @since 10.0
149   */
150  protected void interruptTask() {
151  }
152
153  /**
154   * Returns true if this future was cancelled with {@code
155   * mayInterruptIfRunning} set to {@code true}.
156   *
157   * @since 14.0
158   */
159  protected final boolean wasInterrupted() {
160    return sync.wasInterrupted();
161  }
162
163  /**
164   * {@inheritDoc}
165   *
166   * @since 10.0
167   */
168  @Override
169  public void addListener(Runnable listener, Executor exec) {
170    executionList.add(listener, exec);
171  }
172
173  /**
174   * Subclasses should invoke this method to set the result of the computation
175   * to {@code value}.  This will set the state of the future to
176   * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
177   * state was successfully changed.
178   *
179   * @param value the value that was the result of the task.
180   * @return true if the state was successfully changed.
181   */
182  protected boolean set(@Nullable V value) {
183    boolean result = sync.set(value);
184    if (result) {
185      executionList.execute();
186    }
187    return result;
188  }
189
190  /**
191   * Subclasses should invoke this method to set the result of the computation
192   * to an error, {@code throwable}.  This will set the state of the future to
193   * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
194   * state was successfully changed.
195   *
196   * @param throwable the exception that the task failed with.
197   * @return true if the state was successfully changed.
198   * @throws Error if the throwable was an {@link Error}.
199   */
200  protected boolean setException(Throwable throwable) {
201    boolean result = sync.setException(checkNotNull(throwable));
202    if (result) {
203      executionList.execute();
204    }
205
206    // If it's an Error, we want to make sure it reaches the top of the
207    // call stack, so we rethrow it.
208    if (throwable instanceof Error) {
209      throw (Error) throwable;
210    }
211    return result;
212  }
213
214  /**
215   * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
216   * private subclass to hold the synchronizer.  This synchronizer is used to
217   * implement the blocking and waiting calls as well as to handle state changes
218   * in a thread-safe manner.  The current state of the future is held in the
219   * Sync state, and the lock is released whenever the state changes to
220   * {@link #COMPLETED}, {@link #CANCELLED}, or {@link #INTERRUPTED}
221   *
222   * <p>To avoid races between threads doing release and acquire, we transition
223   * to the final state in two steps.  One thread will successfully CAS from
224   * RUNNING to COMPLETING, that thread will then set the result of the
225   * computation, and only then transition to COMPLETED, CANCELLED, or
226   * INTERRUPTED.
227   *
228   * <p>We don't use the integer argument passed between acquire methods so we
229   * pass around a -1 everywhere.
230   */
231  static final class Sync<V> extends AbstractQueuedSynchronizer {
232
233    private static final long serialVersionUID = 0L;
234
235    /* Valid states. */
236    static final int RUNNING = 0;
237    static final int COMPLETING = 1;
238    static final int COMPLETED = 2;
239    static final int CANCELLED = 4;
240    static final int INTERRUPTED = 8;
241
242    private V value;
243    private Throwable exception;
244
245    /*
246     * Acquisition succeeds if the future is done, otherwise it fails.
247     */
248    @Override
249    protected int tryAcquireShared(int ignored) {
250      if (isDone()) {
251        return 1;
252      }
253      return -1;
254    }
255
256    /*
257     * We always allow a release to go through, this means the state has been
258     * successfully changed and the result is available.
259     */
260    @Override
261    protected boolean tryReleaseShared(int finalState) {
262      setState(finalState);
263      return true;
264    }
265
266    /**
267     * Blocks until the task is complete or the timeout expires.  Throws a
268     * {@link TimeoutException} if the timer expires, otherwise behaves like
269     * {@link #get()}.
270     */
271    V get(long nanos) throws TimeoutException, CancellationException,
272        ExecutionException, InterruptedException {
273
274      // Attempt to acquire the shared lock with a timeout.
275      if (!tryAcquireSharedNanos(-1, nanos)) {
276        throw new TimeoutException("Timeout waiting for task.");
277      }
278
279      return getValue();
280    }
281
282    /**
283     * Blocks until {@link #complete(Object, Throwable, int)} has been
284     * successfully called.  Throws a {@link CancellationException} if the task
285     * was cancelled, or a {@link ExecutionException} if the task completed with
286     * an error.
287     */
288    V get() throws CancellationException, ExecutionException,
289        InterruptedException {
290
291      // Acquire the shared lock allowing interruption.
292      acquireSharedInterruptibly(-1);
293      return getValue();
294    }
295
296    /**
297     * Implementation of the actual value retrieval.  Will return the value
298     * on success, an exception on failure, a cancellation on cancellation, or
299     * an illegal state if the synchronizer is in an invalid state.
300     */
301    private V getValue() throws CancellationException, ExecutionException {
302      int state = getState();
303      switch (state) {
304        case COMPLETED:
305          if (exception != null) {
306            throw new ExecutionException(exception);
307          } else {
308            return value;
309          }
310
311        case CANCELLED:
312        case INTERRUPTED:
313          throw cancellationExceptionWithCause(
314              "Task was cancelled.", exception);
315
316        default:
317          throw new IllegalStateException(
318              "Error, synchronizer in invalid state: " + state);
319      }
320    }
321
322    /**
323     * Checks if the state is {@link #COMPLETED}, {@link #CANCELLED}, or {@link
324     * INTERRUPTED}.
325     */
326    boolean isDone() {
327      return (getState() & (COMPLETED | CANCELLED | INTERRUPTED)) != 0;
328    }
329
330    /**
331     * Checks if the state is {@link #CANCELLED} or {@link #INTERRUPTED}.
332     */
333    boolean isCancelled() {
334      return (getState() & (CANCELLED | INTERRUPTED)) != 0;
335    }
336
337    /**
338     * Checks if the state is {@link #INTERRUPTED}.
339     */
340    boolean wasInterrupted() {
341      return getState() == INTERRUPTED;
342    }
343
344    /**
345     * Transition to the COMPLETED state and set the value.
346     */
347    boolean set(@Nullable V v) {
348      return complete(v, null, COMPLETED);
349    }
350
351    /**
352     * Transition to the COMPLETED state and set the exception.
353     */
354    boolean setException(Throwable t) {
355      return complete(null, t, COMPLETED);
356    }
357
358    /**
359     * Transition to the CANCELLED or INTERRUPTED state.
360     */
361    boolean cancel(boolean interrupt) {
362      return complete(null, null, interrupt ? INTERRUPTED : CANCELLED);
363    }
364
365    /**
366     * Implementation of completing a task.  Either {@code v} or {@code t} will
367     * be set but not both.  The {@code finalState} is the state to change to
368     * from {@link #RUNNING}.  If the state is not in the RUNNING state we
369     * return {@code false} after waiting for the state to be set to a valid
370     * final state ({@link #COMPLETED}, {@link #CANCELLED}, or {@link
371     * #INTERRUPTED}).
372     *
373     * @param v the value to set as the result of the computation.
374     * @param t the exception to set as the result of the computation.
375     * @param finalState the state to transition to.
376     */
377    private boolean complete(@Nullable V v, @Nullable Throwable t,
378        int finalState) {
379      boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
380      if (doCompletion) {
381        // If this thread successfully transitioned to COMPLETING, set the value
382        // and exception and then release to the final state.
383        this.value = v;
384        // Don't actually construct a CancellationException until necessary.
385        this.exception = ((finalState & (CANCELLED | INTERRUPTED)) != 0)
386            ? new CancellationException("Future.cancel() was called.") : t;
387        releaseShared(finalState);
388      } else if (getState() == COMPLETING) {
389        // If some other thread is currently completing the future, block until
390        // they are done so we can guarantee completion.
391        acquireShared(-1);
392      }
393      return doCompletion;
394    }
395  }
396
397  static final CancellationException cancellationExceptionWithCause(
398      @Nullable String message, @Nullable Throwable cause) {
399    CancellationException exception = new CancellationException(message);
400    exception.initCause(cause);
401    return exception;
402  }
403}