001    /*
002     * Copyright (C) 2007 Google Inc.
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import com.google.common.annotations.Beta;
022    
023    import java.util.concurrent.CancellationException;
024    import java.util.concurrent.ExecutionException;
025    import java.util.concurrent.Future;
026    import java.util.concurrent.TimeUnit;
027    import java.util.concurrent.TimeoutException;
028    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
029    
030    import javax.annotation.Nullable;
031    
032    /**
033     * <p>An abstract implementation of the {@link Future} interface.  This class
034     * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
035     * for tasks other than {@link Runnable}s.  It uses an
036     * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
037     * guarantee thread safety.  It could be used as a base class to
038     * {@code FutureTask}, or any other implementor of the {@code Future} interface.
039     *
040     * <p>This class implements all methods in {@code Future}.  Subclasses should
041     * provide a way to set the result of the computation through the protected
042     * methods {@link #set(Object)}, {@link #setException(Throwable)}, or
043     * {@link #cancel()}.  If subclasses want to implement cancellation they can
044     * override the {@link #cancel(boolean)} method with a real implementation, the
045     * default implementation doesn't support cancellation.
046     *
047     * <p>The state changing methods all return a boolean indicating success or
048     * failure in changing the future's state.  Valid states are running,
049     * completed, failed, or cancelled.  Because this class does not implement
050     * cancellation it is left to the subclass to distinguish between created
051     * and running tasks.
052     *
053     * @author Sven Mawson
054     * @since 1
055     */
056    @Beta
057    public abstract class AbstractFuture<V> implements Future<V> {
058    
059      /** Synchronization control for AbstractFutures. */
060      private final Sync<V> sync = new Sync<V>();
061    
062      /*
063       * Blocks until either the task completes or the timeout expires.  Uses the
064       * sync blocking-with-timeout support provided by AQS.
065       */
066      public V get(long timeout, TimeUnit unit) throws InterruptedException,
067          TimeoutException, ExecutionException {
068        return sync.get(unit.toNanos(timeout));
069      }
070    
071      /*
072       * Blocks until the task completes or we get interrupted. Uses the
073       * interruptible blocking support provided by AQS.
074       */
075      public V get() throws InterruptedException, ExecutionException {
076        return sync.get();
077      }
078    
079      /*
080       * Checks if the sync is not in the running state.
081       */
082      public boolean isDone() {
083        return sync.isDone();
084      }
085    
086      /*
087       * Checks if the sync is in the cancelled state.
088       */
089      public boolean isCancelled() {
090        return sync.isCancelled();
091      }
092    
093      /*
094       * Default implementation of cancel that never cancels the future.
095       * Subclasses should override this to implement cancellation if desired.
096       */
097      public boolean cancel(boolean mayInterruptIfRunning) {
098        return false;
099      }
100    
101      /**
102       * Subclasses should invoke this method to set the result of the computation
103       * to {@code value}.  This will set the state of the future to
104       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
105       * state was successfully changed.
106       *
107       * @param value the value that was the result of the task.
108       * @return true if the state was successfully changed.
109       */
110      protected boolean set(@Nullable V value) {
111        boolean result = sync.set(value);
112        if (result) {
113          done();
114        }
115        return result;
116      }
117    
118      /**
119       * Subclasses should invoke this method to set the result of the computation
120       * to an error, {@code throwable}.  This will set the state of the future to
121       * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
122       * state was successfully changed.
123       *
124       * @param throwable the exception that the task failed with.
125       * @return true if the state was successfully changed.
126       * @throws Error if the throwable was an {@link Error}.
127       */
128      protected boolean setException(Throwable throwable) {
129        boolean result = sync.setException(checkNotNull(throwable));
130        if (result) {
131          done();
132        }
133    
134        // If it's an Error, we want to make sure it reaches the top of the
135        // call stack, so we rethrow it.
136        if (throwable instanceof Error) {
137          throw (Error) throwable;
138        }
139        return result;
140      }
141    
142      /**
143       * Subclasses should invoke this method to mark the future as cancelled.
144       * This will set the state of the future to {@link
145       * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
146       * successfully changed.
147       *
148       * @return true if the state was successfully changed.
149       */
150      protected final boolean cancel() {
151        boolean result = sync.cancel();
152        if (result) {
153          done();
154        }
155        return result;
156      }
157    
158      /*
159       * Called by the success, failed, or cancelled methods to indicate that the
160       * value is now available and the latch can be released.  Subclasses can
161       * use this method to deal with any actions that should be undertaken when
162       * the task has completed.
163       */
164      protected void done() {
165        // Default implementation does nothing.
166      }
167    
168      /**
169       * <p>Following the contract of {@link AbstractQueuedSynchronizer} we create a
170       * private subclass to hold the synchronizer.  This synchronizer is used to
171       * implement the blocking and waiting calls as well as to handle state changes
172       * in a thread-safe manner.  The current state of the future is held in the
173       * Sync state, and the lock is released whenever the state changes to either
174       * {@link #COMPLETED} or {@link #CANCELLED}.
175       *
176       * <p>To avoid races between threads doing release and acquire, we transition
177       * to the final state in two steps.  One thread will successfully CAS from
178       * RUNNING to COMPLETING, that thread will then set the result of the
179       * computation, and only then transition to COMPLETED or CANCELLED.
180       *
181       * <p>We don't use the integer argument passed between acquire methods so we
182       * pass around a -1 everywhere.
183       */
184      static final class Sync<V> extends AbstractQueuedSynchronizer {
185    
186        private static final long serialVersionUID = 0L;
187    
188        /* Valid states. */
189        static final int RUNNING = 0;
190        static final int COMPLETING = 1;
191        static final int COMPLETED = 2;
192        static final int CANCELLED = 4;
193    
194        private V value;
195        private ExecutionException exception;
196    
197        /*
198         * Acquisition succeeds if the future is done, otherwise it fails.
199         */
200        @Override
201        protected int tryAcquireShared(int ignored) {
202          if (isDone()) {
203            return 1;
204          }
205          return -1;
206        }
207    
208        /*
209         * We always allow a release to go through, this means the state has been
210         * successfully changed and the result is available.
211         */
212        @Override
213        protected boolean tryReleaseShared(int finalState) {
214          setState(finalState);
215          return true;
216        }
217    
218        /**
219         * Blocks until the task is complete or the timeout expires.  Throws a
220         * {@link TimeoutException} if the timer expires, otherwise behaves like
221         * {@link #get()}.
222         */
223        V get(long nanos) throws TimeoutException, CancellationException,
224            ExecutionException, InterruptedException {
225    
226          // Attempt to acquire the shared lock with a timeout.
227          if (!tryAcquireSharedNanos(-1, nanos)) {
228            throw new TimeoutException("Timeout waiting for task.");
229          }
230    
231          return getValue();
232        }
233    
234        /**
235         * Blocks until {@link #complete(Object, Throwable, int)} has been
236         * successfully called.  Throws a {@link CancellationException} if the task
237         * was cancelled, or a {@link ExecutionException} if the task completed with
238         * an error.
239         */
240        V get() throws CancellationException, ExecutionException,
241            InterruptedException {
242    
243          // Acquire the shared lock allowing interruption.
244          acquireSharedInterruptibly(-1);
245          return getValue();
246        }
247    
248        /**
249         * Implementation of the actual value retrieval.  Will return the value
250         * on success, an exception on failure, a cancellation on cancellation, or
251         * an illegal state if the synchronizer is in an invalid state.
252         */
253        private V getValue() throws CancellationException, ExecutionException {
254          int state = getState();
255          switch (state) {
256            case COMPLETED:
257              if (exception != null) {
258                throw exception;
259              } else {
260                return value;
261              }
262    
263            case CANCELLED:
264              throw new CancellationException("Task was cancelled.");
265    
266            default:
267              throw new IllegalStateException(
268                  "Error, synchronizer in invalid state: " + state);
269          }
270        }
271    
272        /**
273         * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
274         */
275        boolean isDone() {
276          return (getState() & (COMPLETED | CANCELLED)) != 0;
277        }
278    
279        /**
280         * Checks if the state is {@link #CANCELLED}.
281         */
282        boolean isCancelled() {
283          return getState() == CANCELLED;
284        }
285    
286        /**
287         * Transition to the COMPLETED state and set the value.
288         */
289        boolean set(@Nullable V v) {
290          return complete(v, null, COMPLETED);
291        }
292    
293        /**
294         * Transition to the COMPLETED state and set the exception.
295         */
296        boolean setException(Throwable t) {
297          return complete(null, t, COMPLETED);
298        }
299    
300        /**
301         * Transition to the CANCELLED state.
302         */
303        boolean cancel() {
304          return complete(null, null, CANCELLED);
305        }
306    
307        /**
308         * Implementation of completing a task.  Either {@code v} or {@code t} will
309         * be set but not both.  The {@code finalState} is the state to change to
310         * from {@link #RUNNING}.  If the state is not in the RUNNING state we
311         * return {@code false}.
312         *
313         * @param v the value to set as the result of the computation.
314         * @param t the exception to set as the result of the computation.
315         * @param finalState the state to transition to.
316         */
317        private boolean complete(@Nullable V v, Throwable t, int finalState) {
318          if (compareAndSetState(RUNNING, COMPLETING)) {
319            this.value = v;
320            this.exception = t == null ? null : new ExecutionException(t);
321            releaseShared(finalState);
322            return true;
323          }
324    
325          // The state was not RUNNING, so there are no valid transitions.
326          return false;
327        }
328      }
329    }