001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import com.google.common.annotations.Beta;
022    
023    import java.util.concurrent.CancellationException;
024    import java.util.concurrent.ExecutionException;
025    import java.util.concurrent.Future;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.TimeoutException;
028    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
029    
030    import javax.annotation.Nullable;
031    
032    /**
033     * <p>An abstract implementation of the {@link Future} interface.  This class
034     * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
035     * for tasks other than {@link Runnable}s.  It uses an
036     * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
037     * guarantee thread safety.  It could be used as a base class to
038     * {@code FutureTask}, or any other implementor of the {@code Future} interface.
039     *
040     * <p>This class implements all methods in {@code Future}.  Subclasses should
041     * provide a way to set the result of the computation through the protected
042     * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
043     * {@link #cancel()}.  If subclasses want to implement cancellation they can
044     * override the {@link #cancel(boolean)} method with a real implementation, the
045     * default implementation doesn't support cancellation.
046     *
047     * <p>The state changing methods all return a boolean indicating success or
048     * failure in changing the future's state.  Valid states are running,
049     * completed, failed, or cancelled.  Because this class does not implement
050     * cancellation it is left to the subclass to distinguish between created
051     * and running tasks.
052     *
053     * @author Sven Mawson
054     * @since 1
055     */
056    @Beta
057    public abstract class AbstractFuture<V> implements Future<V> {
058    
059      /** Synchronization control for AbstractFutures. */
060      private final Sync<V> sync = new Sync<V>();
061    
062      /*
063       * Blocks until either the task completes or the timeout expires.  Uses the
064       * sync blocking-with-timeout support provided by AQS.
065       */
066      @Override
067      public V get(long timeout, TimeUnit unit) throws InterruptedException,
068          TimeoutException, ExecutionException {
069        return sync.get(unit.toNanos(timeout));
070      }
071    
072      /*
073       * Blocks until the task completes or we get interrupted. Uses the
074       * interruptible blocking support provided by AQS.
075       */
076      @Override
077      public V get() throws InterruptedException, ExecutionException {
078        return sync.get();
079      }
080    
081      /*
082       * Checks if the sync is not in the running state.
083       */
084      @Override
085      public boolean isDone() {
086        return sync.isDone();
087      }
088    
089      /*
090       * Checks if the sync is in the cancelled state.
091       */
092      @Override
093      public boolean isCancelled() {
094        return sync.isCancelled();
095      }
096    
097      /*
098       * Default implementation of cancel that never cancels the future.
099       * Subclasses should override this to implement cancellation if desired.
100       */
101      @Override
102      public boolean cancel(boolean mayInterruptIfRunning) {
103        return false;
104      }
105    
106      /**
107       * Subclasses should invoke this method to set the result of the computation
108       * to {@code value}.  This will set the state of the future to
109       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
110       * state was successfully changed.
111       *
112       * @param value the value that was the result of the task.
113       * @return true if the state was successfully changed.
114       */
115      protected boolean set(@Nullable V value) {
116        boolean result = sync.set(value);
117        if (result) {
118          done();
119        }
120        return result;
121      }
122    
123      /**
124       * Subclasses should invoke this method to set the result of the computation
125       * to an error, {@code throwable}.  This will set the state of the future to
126       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
127       * state was successfully changed.
128       *
129       * @param throwable the exception that the task failed with.
130       * @return true if the state was successfully changed.
131       * @throws Error if the throwable was an {@link Error}.
132       */
133      protected boolean setException(Throwable throwable) {
134        boolean result = sync.setException(checkNotNull(throwable));
135        if (result) {
136          done();
137        }
138    
139        // If it's an Error, we want to make sure it reaches the top of the
140        // call stack, so we rethrow it.
141        if (throwable instanceof Error) {
142          throw (Error) throwable;
143        }
144        return result;
145      }
146    
147      /**
148       * Subclasses should invoke this method to mark the future as cancelled.
149       * This will set the state of the future to {@link
150       * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
151       * successfully changed.
152       *
153       * @return true if the state was successfully changed.
154       */
155      protected final boolean cancel() {
156        boolean result = sync.cancel();
157        if (result) {
158          done();
159        }
160        return result;
161      }
162    
163      /*
164       * Called by the success, failed, or cancelled methods to indicate that the
165       * value is now available and the latch can be released.  Subclasses can
166       * use this method to deal with any actions that should be undertaken when
167       * the task has completed.
168       */
169      protected void done() {
170        // Default implementation does nothing.
171      }
172    
173      /**
174       * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
175       * private subclass to hold the synchronizer.  This synchronizer is used to
176       * implement the blocking and waiting calls as well as to handle state changes
177       * in a thread-safe manner.  The current state of the future is held in the
178       * Sync state, and the lock is released whenever the state changes to either
179       * {@link #COMPLETED} or {@link #CANCELLED}.
180       *
181       * <p>To avoid races between threads doing release and acquire, we transition
182       * to the final state in two steps.  One thread will successfully CAS from
183       * RUNNING to COMPLETING, that thread will then set the result of the
184       * computation, and only then transition to COMPLETED or CANCELLED.
185       *
186       * <p>We don't use the integer argument passed between acquire methods so we
187       * pass around a -1 everywhere.
188       */
189      static final class Sync<V> extends AbstractQueuedSynchronizer {
190    
191        private static final long serialVersionUID = 0L;
192    
193        /* Valid states. */
194        static final int RUNNING = 0;
195        static final int COMPLETING = 1;
196        static final int COMPLETED = 2;
197        static final int CANCELLED = 4;
198    
199        private V value;
200        private Throwable exception;
201    
202        /*
203         * Acquisition succeeds if the future is done, otherwise it fails.
204         */
205        @Override
206        protected int tryAcquireShared(int ignored) {
207          if (isDone()) {
208            return 1;
209          }
210          return -1;
211        }
212    
213        /*
214         * We always allow a release to go through, this means the state has been
215         * successfully changed and the result is available.
216         */
217        @Override
218        protected boolean tryReleaseShared(int finalState) {
219          setState(finalState);
220          return true;
221        }
222    
223        /**
224         * Blocks until the task is complete or the timeout expires.  Throws a
225         * {@link TimeoutException} if the timer expires, otherwise behaves like
226         * {@link #get()}.
227         */
228        V get(long nanos) throws TimeoutException, CancellationException,
229            ExecutionException, InterruptedException {
230    
231          // Attempt to acquire the shared lock with a timeout.
232          if (!tryAcquireSharedNanos(-1, nanos)) {
233            throw new TimeoutException("Timeout waiting for task.");
234          }
235    
236          return getValue();
237        }
238    
239        /**
240         * Blocks until {@link #complete(Object, Throwable, int)} has been
241         * successfully called.  Throws a {@link CancellationException} if the task
242         * was cancelled, or a {@link ExecutionException} if the task completed with
243         * an error.
244         */
245        V get() throws CancellationException, ExecutionException,
246            InterruptedException {
247    
248          // Acquire the shared lock allowing interruption.
249          acquireSharedInterruptibly(-1);
250          return getValue();
251        }
252    
253        /**
254         * Implementation of the actual value retrieval.  Will return the value
255         * on success, an exception on failure, a cancellation on cancellation, or
256         * an illegal state if the synchronizer is in an invalid state.
257         */
258        private V getValue() throws CancellationException, ExecutionException {
259          int state = getState();
260          switch (state) {
261            case COMPLETED:
262              if (exception != null) {
263                throw new ExecutionException(exception);
264              } else {
265                return value;
266              }
267    
268            case CANCELLED:
269              throw new CancellationException("Task was cancelled.");
270    
271            default:
272              throw new IllegalStateException(
273                  "Error, synchronizer in invalid state: " + state);
274          }
275        }
276    
277        /**
278         * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
279         */
280        boolean isDone() {
281          return (getState() & (COMPLETED | CANCELLED)) != 0;
282        }
283    
284        /**
285         * Checks if the state is {@link #CANCELLED}.
286         */
287        boolean isCancelled() {
288          return getState() == CANCELLED;
289        }
290    
291        /**
292         * Transition to the COMPLETED state and set the value.
293         */
294        boolean set(@Nullable V v) {
295          return complete(v, null, COMPLETED);
296        }
297    
298        /**
299         * Transition to the COMPLETED state and set the exception.
300         */
301        boolean setException(Throwable t) {
302          return complete(null, t, COMPLETED);
303        }
304    
305        /**
306         * Transition to the CANCELLED state.
307         */
308        boolean cancel() {
309          return complete(null, null, CANCELLED);
310        }
311    
312        /**
313         * Implementation of completing a task.  Either {@code v} or {@code t} will
314         * be set but not both.  The {@code finalState} is the state to change to
315         * from {@link #RUNNING}.  If the state is not in the RUNNING state we
316         * return {@code false}.
317         *
318         * @param v the value to set as the result of the computation.
319         * @param t the exception to set as the result of the computation.
320         * @param finalState the state to transition to.
321         */
322        private boolean complete(@Nullable V v, Throwable t, int finalState) {
323          if (compareAndSetState(RUNNING, COMPLETING)) {
324            this.value = v;
325            this.exception = t;
326            releaseShared(finalState);
327            return true;
328          }
329    
330          // The state was not RUNNING, so there are no valid transitions.
331          return false;
332        }
333      }
334    }