001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import java.util.concurrent.CancellationException;
022    import java.util.concurrent.ExecutionException;
023    import java.util.concurrent.Executor;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.TimeoutException;
026    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027    
028    import javax.annotation.Nullable;
029    
030    /**
031     * An abstract implementation of the {@link ListenableFuture} interface. This
032     * class is preferable to {@link java.util.concurrent.FutureTask} for two
033     * reasons: It implements {@code ListenableFuture}, and it does not implement
034     * {@code Runnable}. (If you want a {@code Runnable} implementation of {@code
035     * ListenableFuture}, create a {@link ListenableFutureTask}, or submit your
036     * tasks to a {@link ListeningExecutorService}.)
037     *
038     * <p>This class implements all methods in {@code ListenableFuture}.
039     * Subclasses should provide a way to set the result of the computation through
040     * the protected methods {@link #set(Object)} and
041     * {@link #setException(Throwable)}. Subclasses may also override {@link
042     * #interruptTask()}, which will be invoked automatically if a call to {@link
043     * #cancel(boolean) cancel(true)} succeeds in canceling the future.
044     *
045     * <p>{@code AbstractFuture} uses an {@link AbstractQueuedSynchronizer} to deal
046     * with concurrency issues and guarantee thread safety.
047     *
048     * <p>The state changing methods all return a boolean indicating success or
049     * failure in changing the future's state.  Valid states are running,
050     * completed, failed, or cancelled.
051     *
052     * <p>This class uses an {@link ExecutionList} to guarantee that all registered
053     * listeners will be executed, either when the future finishes or, for listeners
054     * that are added after the future completes, immediately.
055     * {@code Runnable}-{@code Executor} pairs are stored in the execution list but
056     * are not necessarily executed in the order in which they were added.  (If a
057     * listener is added after the Future is complete, it will be executed
058     * immediately, even if earlier listeners have not been executed. Additionally,
059     * executors need not guarantee FIFO execution, or different listeners may run
060     * in different executors.)
061     *
062     * @author Sven Mawson
063     * @since 1.0
064     */
065    public abstract class AbstractFuture<V> implements ListenableFuture<V> {
066    
067      /** Synchronization control for AbstractFutures. */
068      private final Sync<V> sync = new Sync<V>();
069    
070      // The execution list to hold our executors.
071      private final ExecutionList executionList = new ExecutionList();
072    
073      /**
074       * Constructor for use by subclasses.
075       */
076      protected AbstractFuture() {}
077    
078      /*
079       * Improve the documentation of when InterruptedException is thrown. Our
080       * behavior matches the JDK's, but the JDK's documentation is misleading.
081       */
082      /**
083       * {@inheritDoc}
084       *
085       * <p>The default {@link AbstractFuture} implementation throws {@code
086       * InterruptedException} if the current thread is interrupted before or during
087       * the call, even if the value is already available.
088       *
089       * @throws InterruptedException if the current thread was interrupted before
090       *     or during the call (optional but recommended).
091       * @throws CancellationException {@inheritDoc}
092       */
093      @Override
094      public V get(long timeout, TimeUnit unit) throws InterruptedException,
095          TimeoutException, ExecutionException {
096        return sync.get(unit.toNanos(timeout));
097      }
098    
099      /*
100       * Improve the documentation of when InterruptedException is thrown. Our
101       * behavior matches the JDK's, but the JDK's documentation is misleading.
102       */
103      /**
104       * {@inheritDoc}
105       *
106       * <p>The default {@link AbstractFuture} implementation throws {@code
107       * InterruptedException} if the current thread is interrupted before or during
108       * the call, even if the value is already available.
109       *
110       * @throws InterruptedException if the current thread was interrupted before
111       *     or during the call (optional but recommended).
112       * @throws CancellationException {@inheritDoc}
113       */
114      @Override
115      public V get() throws InterruptedException, ExecutionException {
116        return sync.get();
117      }
118    
119      @Override
120      public boolean isDone() {
121        return sync.isDone();
122      }
123    
124      @Override
125      public boolean isCancelled() {
126        return sync.isCancelled();
127      }
128    
129      @Override
130      public boolean cancel(boolean mayInterruptIfRunning) {
131        if (!sync.cancel()) {
132          return false;
133        }
134        executionList.execute();
135        if (mayInterruptIfRunning) {
136          interruptTask();
137        }
138        return true;
139      }
140    
141      /**
142       * Subclasses can override this method to implement interruption of the
143       * future's computation. The method is invoked automatically by a successful
144       * call to {@link #cancel(boolean) cancel(true)}.
145       *
146       * <p>The default implementation does nothing.
147       *
148       * @since 10.0
149       */
150      protected void interruptTask() {
151      }
152    
153      /**
154       * {@inheritDoc}
155       *
156       * @since 10.0
157       */
158      @Override
159      public void addListener(Runnable listener, Executor exec) {
160        executionList.add(listener, exec);
161      }
162    
163      /**
164       * Subclasses should invoke this method to set the result of the computation
165       * to {@code value}.  This will set the state of the future to
166       * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
167       * state was successfully changed.
168       *
169       * @param value the value that was the result of the task.
170       * @return true if the state was successfully changed.
171       */
172      protected boolean set(@Nullable V value) {
173        boolean result = sync.set(value);
174        if (result) {
175          executionList.execute();
176        }
177        return result;
178      }
179    
180      /**
181       * Subclasses should invoke this method to set the result of the computation
182       * to an error, {@code throwable}.  This will set the state of the future to
183       * {@link AbstractFuture.Sync#COMPLETED} and invoke the listeners if the
184       * state was successfully changed.
185       *
186       * @param throwable the exception that the task failed with.
187       * @return true if the state was successfully changed.
188       * @throws Error if the throwable was an {@link Error}.
189       */
190      protected boolean setException(Throwable throwable) {
191        boolean result = sync.setException(checkNotNull(throwable));
192        if (result) {
193          executionList.execute();
194        }
195    
196        // If it's an Error, we want to make sure it reaches the top of the
197        // call stack, so we rethrow it.
198        if (throwable instanceof Error) {
199          throw (Error) throwable;
200        }
201        return result;
202      }
203    
204      /**
205       * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
206       * private subclass to hold the synchronizer.  This synchronizer is used to
207       * implement the blocking and waiting calls as well as to handle state changes
208       * in a thread-safe manner.  The current state of the future is held in the
209       * Sync state, and the lock is released whenever the state changes to either
210       * {@link #COMPLETED} or {@link #CANCELLED}.
211       *
212       * <p>To avoid races between threads doing release and acquire, we transition
213       * to the final state in two steps.  One thread will successfully CAS from
214       * RUNNING to COMPLETING, that thread will then set the result of the
215       * computation, and only then transition to COMPLETED or CANCELLED.
216       *
217       * <p>We don't use the integer argument passed between acquire methods so we
218       * pass around a -1 everywhere.
219       */
220      static final class Sync<V> extends AbstractQueuedSynchronizer {
221    
222        private static final long serialVersionUID = 0L;
223    
224        /* Valid states. */
225        static final int RUNNING = 0;
226        static final int COMPLETING = 1;
227        static final int COMPLETED = 2;
228        static final int CANCELLED = 4;
229    
230        private V value;
231        private Throwable exception;
232    
233        /*
234         * Acquisition succeeds if the future is done, otherwise it fails.
235         */
236        @Override
237        protected int tryAcquireShared(int ignored) {
238          if (isDone()) {
239            return 1;
240          }
241          return -1;
242        }
243    
244        /*
245         * We always allow a release to go through, this means the state has been
246         * successfully changed and the result is available.
247         */
248        @Override
249        protected boolean tryReleaseShared(int finalState) {
250          setState(finalState);
251          return true;
252        }
253    
254        /**
255         * Blocks until the task is complete or the timeout expires.  Throws a
256         * {@link TimeoutException} if the timer expires, otherwise behaves like
257         * {@link #get()}.
258         */
259        V get(long nanos) throws TimeoutException, CancellationException,
260            ExecutionException, InterruptedException {
261    
262          // Attempt to acquire the shared lock with a timeout.
263          if (!tryAcquireSharedNanos(-1, nanos)) {
264            throw new TimeoutException("Timeout waiting for task.");
265          }
266    
267          return getValue();
268        }
269    
270        /**
271         * Blocks until {@link #complete(Object, Throwable, int)} has been
272         * successfully called.  Throws a {@link CancellationException} if the task
273         * was cancelled, or a {@link ExecutionException} if the task completed with
274         * an error.
275         */
276        V get() throws CancellationException, ExecutionException,
277            InterruptedException {
278    
279          // Acquire the shared lock allowing interruption.
280          acquireSharedInterruptibly(-1);
281          return getValue();
282        }
283    
284        /**
285         * Implementation of the actual value retrieval.  Will return the value
286         * on success, an exception on failure, a cancellation on cancellation, or
287         * an illegal state if the synchronizer is in an invalid state.
288         */
289        private V getValue() throws CancellationException, ExecutionException {
290          int state = getState();
291          switch (state) {
292            case COMPLETED:
293              if (exception != null) {
294                throw new ExecutionException(exception);
295              } else {
296                return value;
297              }
298    
299            case CANCELLED:
300              throw new CancellationException("Task was cancelled.");
301    
302            default:
303              throw new IllegalStateException(
304                  "Error, synchronizer in invalid state: " + state);
305          }
306        }
307    
308        /**
309         * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
310         */
311        boolean isDone() {
312          return (getState() & (COMPLETED | CANCELLED)) != 0;
313        }
314    
315        /**
316         * Checks if the state is {@link #CANCELLED}.
317         */
318        boolean isCancelled() {
319          return getState() == CANCELLED;
320        }
321    
322        /**
323         * Transition to the COMPLETED state and set the value.
324         */
325        boolean set(@Nullable V v) {
326          return complete(v, null, COMPLETED);
327        }
328    
329        /**
330         * Transition to the COMPLETED state and set the exception.
331         */
332        boolean setException(Throwable t) {
333          return complete(null, t, COMPLETED);
334        }
335    
336        /**
337         * Transition to the CANCELLED state.
338         */
339        boolean cancel() {
340          return complete(null, null, CANCELLED);
341        }
342    
343        /**
344         * Implementation of completing a task.  Either {@code v} or {@code t} will
345         * be set but not both.  The {@code finalState} is the state to change to
346         * from {@link #RUNNING}.  If the state is not in the RUNNING state we
347         * return {@code false} after waiting for the state to be set to a valid
348         * final state ({@link #COMPLETED} or {@link #CANCELLED}).
349         *
350         * @param v the value to set as the result of the computation.
351         * @param t the exception to set as the result of the computation.
352         * @param finalState the state to transition to.
353         */
354        private boolean complete(@Nullable V v, @Nullable Throwable t,
355            int finalState) {
356          boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
357          if (doCompletion) {
358            // If this thread successfully transitioned to COMPLETING, set the value
359            // and exception and then release to the final state.
360            this.value = v;
361            this.exception = t;
362            releaseShared(finalState);
363          } else if (getState() == COMPLETING) {
364            // If some other thread is currently completing the future, block until
365            // they are done so we can guarantee completion.
366            acquireShared(-1);
367          }
368          return doCompletion;
369        }
370      }
371    }