001    /*
002     * Copyright (C) 2007 Google Inc.
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import com.google.common.annotations.Beta;
020    
021    import java.util.concurrent.CancellationException;
022    import java.util.concurrent.ExecutionException;
023    import java.util.concurrent.Future;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.TimeoutException;
026    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
027    
028    import javax.annotation.Nullable;
029    
030    /**
031     * <p>An abstract implementation of the {@link Future} interface.  This class
032     * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
033     * for tasks other than {@link Runnable}s.  It uses an
034     * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
035     * guarantee thread safety.  It could be used as a base class to
036     * {@code FutureTask}, or any other implementor of the {@code Future} interface.
037     *
038     * <p>This class implements all methods in {@code Future}.  Subclasses should
039     * provide a way to set the result of the computation through the protected
040     * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
041     * {@link #cancel()}.  If subclasses want to implement cancellation they can
042     * override the {@link #cancel(boolean)} method with a real implementation, the
043     * default implementation doesn't support cancellation.
044     *
045     * <p>The state changing methods all return a boolean indicating success or
046     * failure in changing the future's state.  Valid states are running,
047     * completed, failed, or cancelled.  Because this class does not implement
048     * cancellation it is left to the subclass to distinguish between created
049     * and running tasks.
050     *
051     * @author Sven Mawson
052     * @since 1
053     */
054    @Beta
055    public abstract class AbstractFuture<V> implements Future<V> {
056    
057      /** Synchronization control for AbstractFutures. */
058      private final Sync<V> sync = new Sync<V>();
059    
060      /*
061       * Blocks until either the task completes or the timeout expires.  Uses the
062       * sync blocking-with-timeout support provided by AQS.
063       */
064      public V get(long timeout, TimeUnit unit) throws InterruptedException,
065          TimeoutException, ExecutionException {
066        return sync.get(unit.toNanos(timeout));
067      }
068    
069      /*
070       * Blocks until the task completes or we get interrupted. Uses the
071       * interruptible blocking support provided by AQS.
072       */
073      public V get() throws InterruptedException, ExecutionException {
074        return sync.get();
075      }
076    
077      /*
078       * Checks if the sync is not in the running state.
079       */
080      public boolean isDone() {
081        return sync.isDone();
082      }
083    
084      /*
085       * Checks if the sync is in the cancelled state.
086       */
087      public boolean isCancelled() {
088        return sync.isCancelled();
089      }
090    
091      /*
092       * Default implementation of cancel that never cancels the future.
093       * Subclasses should override this to implement cancellation if desired.
094       */
095      public boolean cancel(boolean mayInterruptIfRunning) {
096        return false;
097      }
098    
099      /**
100       * Subclasses should invoke this method to set the result of the computation
101       * to {@code value}.  This will set the state of the future to
102       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
103       * state was successfully changed.
104       *
105       * @param value the value that was the result of the task.
106       * @return true if the state was successfully changed.
107       */
108      protected boolean set(@Nullable V value) {
109        boolean result = sync.set(value);
110        if (result) {
111          done();
112        }
113        return result;
114      }
115    
116      /**
117       * Subclasses should invoke this method to set the result of the computation
118       * to an error, {@code throwable}.  This will set the state of the future to
119       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
120       * state was successfully changed.
121       *
122       * @param throwable the exception that the task failed with.
123       * @return true if the state was successfully changed.
124       * @throws Error if the throwable was an {@link Error}.
125       */
126      protected boolean setException(Throwable throwable) {
127        boolean result = sync.setException(throwable);
128        if (result) {
129          done();
130        }
131    
132        // If it's an Error, we want to make sure it reaches the top of the
133        // call stack, so we rethrow it.
134        if (throwable instanceof Error) {
135          throw (Error) throwable;
136        }
137        return result;
138      }
139    
140      /**
141       * Subclasses should invoke this method to mark the future as cancelled.
142       * This will set the state of the future to {@link
143       * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
144       * successfully changed.
145       *
146       * @return true if the state was successfully changed.
147       */
148      protected final boolean cancel() {
149        boolean result = sync.cancel();
150        if (result) {
151          done();
152        }
153        return result;
154      }
155    
156      /*
157       * Called by the success, failed, or cancelled methods to indicate that the
158       * value is now available and the latch can be released.  Subclasses can
159       * use this method to deal with any actions that should be undertaken when
160       * the task has completed.
161       */
162      protected void done() {
163        // Default implementation does nothing.
164      }
165    
166      /**
167       * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
168       * private subclass to hold the synchronizer.  This synchronizer is used to
169       * implement the blocking and waiting calls as well as to handle state changes
170       * in a thread-safe manner.  The current state of the future is held in the
171       * Sync state, and the lock is released whenever the state changes to either
172       * {@link #COMPLETED} or {@link #CANCELLED}.
173       *
174       * <p>To avoid races between threads doing release and acquire, we transition
175       * to the final state in two steps.  One thread will successfully CAS from
176       * RUNNING to COMPLETING, that thread will then set the result of the
177       * computation, and only then transition to COMPLETED or CANCELLED.
178       *
179       * <p>We don't use the integer argument passed between acquire methods so we
180       * pass around a -1 everywhere.
181       */
182      static final class Sync<V> extends AbstractQueuedSynchronizer {
183    
184        private static final long serialVersionUID = 0L;
185    
186        /* Valid states. */
187        static final int RUNNING = 0;
188        static final int COMPLETING = 1;
189        static final int COMPLETED = 2;
190        static final int CANCELLED = 4;
191    
192        private V value;
193        private ExecutionException exception;
194    
195        /*
196         * Acquisition succeeds if the future is done, otherwise it fails.
197         */
198        @Override
199        protected int tryAcquireShared(int ignored) {
200          if (isDone()) {
201            return 1;
202          }
203          return -1;
204        }
205    
206        /*
207         * We always allow a release to go through, this means the state has been
208         * successfully changed and the result is available.
209         */
210        @Override
211        protected boolean tryReleaseShared(int finalState) {
212          setState(finalState);
213          return true;
214        }
215    
216        /**
217         * Blocks until the task is complete or the timeout expires.  Throws a
218         * {@link TimeoutException} if the timer expires, otherwise behaves like
219         * {@link #get()}.
220         */
221        V get(long nanos) throws TimeoutException, CancellationException,
222            ExecutionException, InterruptedException {
223    
224          // Attempt to acquire the shared lock with a timeout.
225          if (!tryAcquireSharedNanos(-1, nanos)) {
226            throw new TimeoutException("Timeout waiting for task.");
227          }
228    
229          return getValue();
230        }
231    
232        /**
233         * Blocks until {@link #complete(Object, Throwable, int)} has been
234         * successfully called.  Throws a {@link CancellationException} if the task
235         * was cancelled, or a {@link ExecutionException} if the task completed with
236         * an error.
237         */
238        V get() throws CancellationException, ExecutionException,
239            InterruptedException {
240    
241          // Acquire the shared lock allowing interruption.
242          acquireSharedInterruptibly(-1);
243          return getValue();
244        }
245    
246        /**
247         * Implementation of the actual value retrieval.  Will return the value
248         * on success, an exception on failure, a cancellation on cancellation, or
249         * an illegal state if the synchronizer is in an invalid state.
250         */
251        private V getValue() throws CancellationException, ExecutionException {
252          int state = getState();
253          switch (state) {
254            case COMPLETED:
255              if (exception != null) {
256                throw exception;
257              } else {
258                return value;
259              }
260    
261            case CANCELLED:
262              throw new CancellationException("Task was cancelled.");
263    
264            default:
265              throw new IllegalStateException(
266                  "Error, synchronizer in invalid state: " + state);
267          }
268        }
269    
270        /**
271         * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
272         */
273        boolean isDone() {
274          return (getState() & (COMPLETED | CANCELLED)) != 0;
275        }
276    
277        /**
278         * Checks if the state is {@link #CANCELLED}.
279         */
280        boolean isCancelled() {
281          return getState() == CANCELLED;
282        }
283    
284        /**
285         * Transition to the COMPLETED state and set the value.
286         */
287        boolean set(V v) {
288          return complete(v, null, COMPLETED);
289        }
290    
291        /**
292         * Transition to the COMPLETED state and set the exception.
293         */
294        boolean setException(Throwable t) {
295          return complete(null, t, COMPLETED);
296        }
297    
298        /**
299         * Transition to the CANCELLED state.
300         */
301        boolean cancel() {
302          return complete(null, null, CANCELLED);
303        }
304    
305        /**
306         * Implementation of completing a task.  Either {@code v} or {@code t} will
307         * be set but not both.  The {@code finalState} is the state to change to
308         * from {@link #RUNNING}.  If the state is not in the RUNNING state we
309         * return {@code false}.
310         *
311         * @param v the value to set as the result of the computation.
312         * @param t the exception to set as the result of the computation.
313         * @param finalState the state to transition to.
314         */
315        private boolean complete(V v, Throwable t, int finalState) {
316          if (compareAndSetState(RUNNING, COMPLETING)) {
317            this.value = v;
318            this.exception = t == null ? null : new ExecutionException(t);
319            releaseShared(finalState);
320            return true;
321          }
322    
323          // The state was not RUNNING, so there are no valid transitions.
324          return false;
325        }
326      }
327    }