001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
022
023import com.google.common.annotations.Beta;
024import com.google.common.annotations.GwtCompatible;
025import com.google.common.base.Throwables;
026
027import java.security.AccessController;
028import java.security.PrivilegedActionException;
029import java.security.PrivilegedExceptionAction;
030import java.util.concurrent.CancellationException;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.Executor;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
037import java.util.concurrent.locks.LockSupport;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041import javax.annotation.Nullable;
042
043/**
044 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More
045 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture},
046 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an
047 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, Function)
048 * Futures.transform} and {@link Futures#catching(ListenableFuture, Class, Function)
049 * Futures.catching}.
050 *
051 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way
052 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link
053 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override
054 * {@link #interruptTask()}, which will be invoked automatically if a call to {@link
055 * #cancel(boolean) cancel(true)} succeeds in canceling the future. Subclasses should rarely
056 * override other methods.
057 *
058 * @author Sven Mawson
059 * @author Luke Sandberg
060 * @since 1.0
061 */
062@GwtCompatible(emulated = true)
063public abstract class AbstractFuture<V> implements ListenableFuture<V> {
  /**
   * Whether {@link #cancel} attaches a freshly created cause to the {@code Cancellation} it
   * stores. Read once, when this class is initialized, from the system property
   * {@code guava.concurrent.generate_cancellation_cause}; treated as {@code false} when the
   * property is absent.
   */
  private static final boolean GENERATE_CANCELLATION_CAUSES =
      Boolean.parseBoolean(
          System.getProperty("guava.concurrent.generate_cancellation_cause", "false"));
067
068  /**
069   * A less abstract subclass of AbstractFuture.  This can be used to optimize setFuture by ensuring
070   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
071   */
072  abstract static class TrustedFuture<V> extends AbstractFuture<V> {
073    // N.B. cancel is not overridden to be final, because many future utilities need to override
074    // cancel in order to propagate cancellation to other futures.
075    // TODO(lukes): with maybePropagateCancellation this is no longer really true.  Track down the
076    // final few cases and eliminate their overrides of cancel()
077
078    @Override public final V get() throws InterruptedException, ExecutionException {
079      return super.get();
080    }
081
082    @Override public final V get(long timeout, TimeUnit unit)
083        throws InterruptedException, ExecutionException, TimeoutException {
084      return super.get(timeout, unit);
085    }
086
087    @Override public final boolean isDone() {
088      return super.isDone();
089    }
090
091    @Override public final boolean isCancelled() {
092      return super.isCancelled();
093    }
094
095    @Override public final void addListener(Runnable listener, Executor executor) {
096      super.addListener(listener, executor);
097    }
098  }
099
  // Logger to log exceptions caught when running listeners.  It is also used by the static
  // initializer below to report that the preferred atomic helpers could not be created.
  private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());

  // A heuristic for timed gets.  If the remaining timeout is less than this many nanoseconds,
  // spin instead of blocking.  This value is what AbstractQueuedSynchronizer uses.
  private static final long SPIN_THRESHOLD_NANOS = 1000L;

  // The strategy object through which every compare-and-set on the value, listeners and waiters
  // fields (and the relaxed writes to Waiter.thread / Waiter.next) is performed.  Selected exactly
  // once by the static initializer below.
  private static final AtomicHelper ATOMIC_HELPER;
108
  // Selects the AtomicHelper implementation, trying the candidates in order of preference:
  //   1. UnsafeAtomicHelper
  //   2. SafeAtomicHelper (built on AtomicReferenceFieldUpdater)
  //   3. SynchronizedHelper (last resort; both earlier failures are logged at SEVERE)
  static {
    AtomicHelper helper;

    try {
      helper = new UnsafeAtomicHelper();
    } catch (Throwable unsafeFailure) {
      // catch absolutely everything and fall through to our 'SafeAtomicHelper'
      // The access control checks that ARFU does means the caller class has to be AbstractFuture
      // instead of SafeAtomicHelper, so we annoyingly define these here
      try {
        // The string arguments are the names of the fields being updated reflectively; they must
        // match the field declarations in Waiter and AbstractFuture exactly.
        helper = new SafeAtomicHelper(
            newUpdater(Waiter.class, Thread.class, "thread"),
            newUpdater(Waiter.class, Waiter.class, "next"),
            newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
            newUpdater(AbstractFuture.class, Listener.class, "listeners"),
            newUpdater(AbstractFuture.class, Object.class, "value"));
      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
        // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
        // For these users fallback to a suboptimal implementation, based on synchronized.  This
        // will be a definite performance hit to those users.
        // Both failures are reported, so the reason each preferred helper was rejected is kept.
        log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
        log.log(Level.SEVERE, "SafeAtomicHelper is broken!", atomicReferenceFieldUpdaterFailure);
        helper = new SynchronizedHelper();
      }
    }
    ATOMIC_HELPER = helper;

    // Prevent rare disastrous classloading in first call to LockSupport.park.
    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
    // Referencing the class literal is the whole point; the local itself is never read.
    @SuppressWarnings("unused")
    Class<?> ensureLoaded = LockSupport.class;
  }
142
143  /**
144   * Waiter links form a Treiber stack, in the {@link #waiters} field.
145   */
146  private static final class Waiter {
147    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
148
149    @Nullable volatile Thread thread;
150    @Nullable volatile Waiter next;
151
152    // Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
153    // before the ATOMIC_HELPER.  Apparently this is possible on some android platforms.
154    Waiter(boolean unused) {}
155
156    Waiter() {
157      // avoid volatile write, write is made visible by subsequent CAS on waiters field
158      ATOMIC_HELPER.putThread(this, Thread.currentThread());
159    }
160
161    // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
162    // field.
163    void setNext(Waiter next) {
164      ATOMIC_HELPER.putNext(this, next);
165    }
166
167    void unpark() {
168      // This is racy with removeWaiter.  The consequence of the race is that we may spuriously
169      // call unpark even though the thread has already removed itself from the list.  But even if
170      // we did use a CAS, that race would still exist (it would just be ever so slightly smaller).
171      Thread w = thread;
172      if (w != null) {
173        thread = null;
174        LockSupport.unpark(w);
175      }
176    }
177  }
178
  /**
   * Marks the given node as 'deleted' (by nulling its {@code thread} field) and then scans the
   * list to unlink all deleted nodes.  This is an O(n) operation in the common case (and O(n^2) in
   * the worst), but we are saved by two things.
   * <ul>
   *   <li>This is only called when a waiting thread times out or is interrupted.  Both of which
   *       should be rare.
   *   <li>The waiters list should be very short.
   * </ul>
   *
   * <p>The scan is abandoned without unlinking anything further if the head of the list is found
   * to be {@link Waiter#TOMBSTONE}.
   *
   * @param node the waiter to remove; the {@code get} methods pass the node they pushed themselves
   */
  private void removeWaiter(Waiter node) {
    node.thread = null;  // mark as 'deleted'
    restart: while (true) {
      // pred is the most recent live (non-deleted) node seen in this pass, or null while every
      // node seen so far has been deleted, i.e. we are still effectively at the head.
      Waiter pred = null;
      Waiter curr = waiters;
      if (curr == Waiter.TOMBSTONE) {
        return;  // give up if someone is calling complete
      }
      Waiter succ;
      while (curr != null) {
        succ = curr.next;
        if (curr.thread != null) {  // we aren't unlinking this node, update pred.
          pred = curr;
        } else if (pred != null) { // We are unlinking this node and it has a predecessor.
          pred.next = succ;
          if (pred.thread == null) {  // We raced with another node that unlinked pred. Restart.
            continue restart;
          }
        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) {  // We are unlinking head
          continue restart;  // We raced with an add or complete
        }
        curr = succ;
      }
      break;  // reached the end of the list without needing to restart
    }
  }
215
216  /** Listeners also form a stack through the {@link #listeners} field. */
217  private static final class Listener {
218    static final Listener TOMBSTONE = new Listener(null, null);
219    final Runnable task;
220    final Executor executor;
221
222    // writes to next are made visible by subsequent CAS's on the listeners field
223    @Nullable Listener next;
224
225    Listener(Runnable task, Executor executor) {
226      this.task = task;
227      this.executor = executor;
228    }
229  }
230
  /**
   * A special value to represent {@code null}. It is stored in the {@code value} field when the
   * future completes with a {@code null} result, because a literal {@code null} there means "not
   * yet set"; {@code getDoneValue} translates it back to {@code null}.
   */
  private static final Object NULL = new Object();
233
234  /** A special value to represent failure, when {@link #setException} is called successfully. */
235  private static final class Failure {
236    static final Failure FALLBACK_INSTANCE = new Failure(
237        new Throwable("Failure occurred while trying to finish a future.") {
238          @Override public synchronized Throwable fillInStackTrace() {
239            return this;  // no stack trace
240          }
241        });
242    final Throwable exception;
243
244    Failure(Throwable exception) {
245      this.exception = checkNotNull(exception);
246    }
247  }
248
249  /** A special value to represent cancellation and the 'wasInterrupted' bit. */
250  private static final class Cancellation {
251    final boolean wasInterrupted;
252    @Nullable final Throwable cause;
253
254    Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
255      this.wasInterrupted = wasInterrupted;
256      this.cause = cause;
257    }
258  }
259
260  /** A special value that encodes the 'setFuture' state. */
261  private final class SetFuture implements Runnable {
262    final ListenableFuture<? extends V> future;
263
264    SetFuture(ListenableFuture<? extends V> future) {
265      this.future = future;
266    }
267
268    @Override public void run() {
269      if (value != this) {
270        // nothing to do, we must have been cancelled
271        return;
272      }
273      completeWithFuture(future, this);
274    }
275  }
276
  // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is
  // available.
  /**
   * This field encodes the current state of the future.
   *
   * <p>The valid values are:
   * <ul>
   *   <li>{@code null} initial state, nothing has happened.
   *   <li>{@link Cancellation} terminal state, {@code cancel} was called.
   *   <li>{@link Failure} terminal state, {@code setException} was called.
   *   <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
   *   <li>{@link #NULL} terminal state, {@code set(null)} was called.
   *   <li>Any other non-null value, terminal state, {@code set} was called with a non-null
   *       argument.
   * </ul>
   *
   * <p>The future counts as done exactly when this field is neither {@code null} nor a
   * {@link SetFuture}. All state transitions are made with {@code ATOMIC_HELPER.casValue}.
   */
  private volatile Object value;

  /**
   * All listeners: the head of the stack of registered listeners. {@code addListener} pushes onto
   * it unless it observes {@link Listener#TOMBSTONE}, in which case it runs the listener
   * immediately.
   */
  private volatile Listener listeners;

  /**
   * All waiting threads: the head of the stack of threads parked in {@code get}. The {@code get}
   * methods stop enqueueing once they observe {@link Waiter#TOMBSTONE}.
   */
  private volatile Waiter waiters;

  /**
   * Constructor for use by subclasses.
   */
  protected AbstractFuture() {}
305
  /*
   * Why the get methods below carry their own Javadoc instead of simply inheriting it: we want to
   * improve the documentation of when InterruptedException is thrown. Our behavior matches the
   * JDK's, but the JDK's documentation is misleading.
   */
310
311  // Gets and Timed Gets
312  //
313  // * Be responsive to interruption
314  // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the
315  //   waiters field.
316  // * Future completion is defined by when #value becomes non-null/non SetFuture
317  // * Future completion can be observed if the waiters field contains a TOMBSTONE
318
319  // Timed Get
320  // There are a few design constraints to consider
321  // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I
322  //   have observed 12 micros on 64 bit linux systems to wake up a parked thread).  So if the
323  //   timeout is small we shouldn't park().  This needs to be traded off with the cpu overhead of
324  //   spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
325  //   similar purposes.
326  // * We want to behave reasonably for timeouts of 0
327  // * We are more responsive to completion than timeouts.  This is because parkNanos depends on
328  //   system scheduling and as such we could either miss our deadline, or unpark() could be delayed
329  //   so that it looks like we timed out even though we didn't.  For comparison FutureTask respects
330  //   completion preferably and AQS is non-deterministic (depends on where in the queue the waiter
331  //   is).  If we wanted to be strict about it, we could store the unpark() time in the Waiter
332  //   node and we could use that to make a decision about whether or not we timed out prior to
333  //   being unparked.
334
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code
   * InterruptedException} if the current thread is interrupted before or during
   * the call, even if the value is already available.
   *
   * <p>If the future is not already done and {@code timeout} is zero or negative, this method
   * throws {@code TimeoutException} without waiting.
   *
   * @param timeout the maximum time to wait
   * @param unit the unit of {@code timeout}; must not be null
   * @throws InterruptedException if the current thread was interrupted before
   *     or during the call (optional but recommended).
   * @throws CancellationException {@inheritDoc}
   * @throws TimeoutException if the future was not done before the timeout elapsed
   */
  @Override
  public V get(long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException, ExecutionException {
    // NOTE: if timeout <= 0, remainingNanos will be <= 0, so (unless the future is already done)
    // both the parking branch and the spin loop at the bottom are skipped and we throw a
    // TimeoutException.
    long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    Object localValue = value;
    // Non-short-circuit '&' is safe here: instanceof on null is simply false.
    if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
      return getDoneValue(localValue);
    }
    // we delay calling nanoTime until we know we will need to either park or spin
    final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
    // Phase 1: the timeout is long enough to be worth parking.  The label lets the inner loop
    // 'break' out of this whole if-statement and fall through to the spin phase below.
    long_wait_loop: if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
      Waiter oldHead = waiters;
      if (oldHead != Waiter.TOMBSTONE) {
        Waiter node = new Waiter();
        // Push our node onto the waiter stack, retrying while other threads win the CAS.
        do {
          node.setNext(oldHead);
          if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
            // We are on the stack; park until done, interrupted, or (nearly) out of time.
            while (true) {
              LockSupport.parkNanos(this, remainingNanos);
              // Check interruption first, if we woke up due to interruption we need to honor that.
              if (Thread.interrupted()) {
                removeWaiter(node);
                throw new InterruptedException();
              }

              // Otherwise re-read and check doneness.  If we loop then the wakeup was spurious or
              // early: the future is not done and there is still enough time left to park again.
              localValue = value;
              if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
                return getDoneValue(localValue);
              }

              // timed out?
              remainingNanos = endNanos - System.nanoTime();
              if (remainingNanos < SPIN_THRESHOLD_NANOS) {
                // Remove the waiter, one way or another we are done parking this thread.
                removeWaiter(node);
                break long_wait_loop;  // jump down to the busy wait loop
              }
            }
          }
          oldHead = waiters;  // re-read and loop.
        } while (oldHead != Waiter.TOMBSTONE);
      }
      // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
      // waiter.
      return getDoneValue(value);
    }
    // Phase 2: if we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no
    // node on the waiters list.  Busy-wait for whatever time remains (possibly none).
    while (remainingNanos > 0) {
      localValue = value;
      if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
        return getDoneValue(localValue);
      }
      if (Thread.interrupted()) {
        throw new InterruptedException();
      }
      remainingNanos = endNanos - System.nanoTime();
    }
    throw new TimeoutException();
  }
413
  /*
   * Why the method below carries its own Javadoc instead of simply inheriting it: we want to
   * improve the documentation of when InterruptedException is thrown. Our behavior matches the
   * JDK's, but the JDK's documentation is misleading.
   */
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code
   * InterruptedException} if the current thread is interrupted before or during
   * the call, even if the value is already available.
   *
   * @throws InterruptedException if the current thread was interrupted before
   *     or during the call (optional but recommended).
   * @throws CancellationException {@inheritDoc}
   */
  @Override public V get() throws InterruptedException, ExecutionException {
    // The interrupt check comes before the value check, which is why an interrupted caller gets
    // an InterruptedException even from an already-completed future.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    Object localValue = value;
    // Non-short-circuit '&' is safe here: instanceof on null is simply false.
    if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
      return getDoneValue(localValue);
    }
    Waiter oldHead = waiters;
    if (oldHead != Waiter.TOMBSTONE) {
      Waiter node = new Waiter();
      // Push our node onto the waiter stack, retrying while other threads win the CAS.
      do {
        node.setNext(oldHead);
        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
          // we are on the stack, now wait for completion.
          while (true) {
            LockSupport.park(this);
            // Check interruption first, if we woke up due to interruption we need to honor that.
            if (Thread.interrupted()) {
              removeWaiter(node);
              throw new InterruptedException();
            }
            // Otherwise re-read and check doneness.  If we loop then it must have been a spurious
            // wakeup
            localValue = value;
            if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
              return getDoneValue(localValue);
            }
          }
        }
        oldHead = waiters;  // re-read and loop.
      } while (oldHead != Waiter.TOMBSTONE);
    }
    // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
    // waiter.
    return getDoneValue(value);
  }
466
467  /**
468   * Unboxes {@code obj}.  Assumes that obj is not {@code null} or a {@link SetFuture}.
469   */
470  private V getDoneValue(Object obj) throws ExecutionException {
471    // While this seems like it might be too branch-y, simple benchmarking proves it to be
472    // unmeasurable (comparing done AbstractFutures with immediateFuture)
473    if (obj instanceof Cancellation) {
474      throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause);
475    } else if (obj instanceof Failure) {
476      throw new ExecutionException(((Failure) obj).exception);
477    } else if (obj == NULL) {
478      return null;
479    } else {
480      @SuppressWarnings("unchecked")  // this is the only other option
481      V asV = (V) obj;
482      return asV;
483    }
484  }
485
486  @Override
487  public boolean isDone() {
488    final Object localValue = value;
489    return localValue != null & !(localValue instanceof AbstractFuture.SetFuture);
490  }
491
492  @Override
493  public boolean isCancelled() {
494    final Object localValue = value;
495    return localValue instanceof Cancellation;
496  }
497
  /**
   * {@inheritDoc}
   *
   * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain
   * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate
   * {@code Future} that was supplied in the {@code setFuture} call.
   *
   * @param mayInterruptIfRunning recorded in the stored cancellation state; when {@code true} and
   *     the cancellation succeeds, {@link #interruptTask} is invoked
   * @return {@code true} if this call moved the future into the cancelled state, {@code false} if
   *     the future had already reached a terminal state
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    Object localValue = value;
    // Cancellation is only possible from the initial (null) or the intermediate SetFuture state.
    if (localValue == null | localValue instanceof AbstractFuture.SetFuture) {
      // Try to delay allocating the exception.  At this point we may still lose the CAS, but it is
      // certainly less likely.
      // TODO(lukes): this exception actually makes cancellation significantly more expensive :(
      // I wonder if we should consider removing it or providing a mechanism to not do it.
      Throwable cause = GENERATE_CANCELLATION_CAUSES ? newCancellationCause() : null;
      Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
      do {
        if (ATOMIC_HELPER.casValue(this, localValue, valueToSet)) {
          // We call interruptTask before calling complete(), which is consistent with
          // FutureTask
          if (mayInterruptIfRunning) {
            interruptTask();
          }
          complete();
          if (localValue instanceof AbstractFuture.SetFuture) {
            // propagate cancellation to the future set in setFuture, this is racy, and we don't
            // care if we are successful or not.
            ((AbstractFuture<?>.SetFuture) localValue).future.cancel(mayInterruptIfRunning);
          }
          return true;
        }
        // value changed, reread
        localValue = value;
        // localValue cannot be null at this point, because value can only change from null to
        // non-null. So if value changed (and it did since we lost the CAS), then it cannot be
        // null.  We only retry if the new state is a SetFuture, which is still cancellable.
      } while (localValue instanceof AbstractFuture.SetFuture);
    }
    return false;
  }
538
539  /**
540   * Returns an exception to be used as the cause of the CancellationException thrown by
541   * {@link #get}.
542   *
543   * <p>Note: this method may be called speculatively.  There is no guarantee that the future will
544   * be cancelled if this method is called.
545   */
546  private Throwable newCancellationCause() {
547    return new CancellationException("Future.cancel() was called.");
548  }
549
550  /**
551   * Subclasses can override this method to implement interruption of the
552   * future's computation. The method is invoked automatically by a successful
553   * call to {@link #cancel(boolean) cancel(true)}.
554   *
555   * <p>The default implementation does nothing.
556   *
557   * @since 10.0
558   */
559  protected void interruptTask() {
560  }
561
562  /**
563   * Returns true if this future was cancelled with {@code
564   * mayInterruptIfRunning} set to {@code true}.
565   *
566   * @since 14.0
567   */
568  protected final boolean wasInterrupted() {
569    final Object localValue = value;
570    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
571  }
572
573  /**
574   * {@inheritDoc}
575   *
576   * @since 10.0
577   */
578  @Override
579  public void addListener(Runnable listener, Executor executor) {
580    checkNotNull(listener, "Runnable was null.");
581    checkNotNull(executor, "Executor was null.");
582    Listener oldHead = listeners;
583    if (oldHead != Listener.TOMBSTONE) {
584      Listener newNode = new Listener(listener, executor);
585      do {
586        newNode.next = oldHead;
587        if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
588          return;
589        }
590        oldHead = listeners;  // re-read
591      } while (oldHead != Listener.TOMBSTONE);
592    }
593    // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
594    // the listener.
595    executeListener(listener, executor);
596  }
597
598  /**
599   * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or
600   * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns,
601   * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was
602   * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code
603   * Future} may have previously been set asynchronously, in which case its result may not be known
604   * yet. That result, though not yet known, cannot by overridden by a call to a {@code set*}
605   * method, only by a call to {@link #cancel}.
606   *
607   * @param value the value to be used as the result
608   * @return true if the attempt was accepted, completing the {@code Future}
609   */
610  protected boolean set(@Nullable V value) {
611    Object valueToSet = value == null ? NULL : value;
612    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
613      complete();
614      return true;
615    }
616    return false;
617  }
618
619  /**
620   * Sets the failed result of this {@code Future} unless this {@code Future} has already been
621   * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this
622   * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b>
623   * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the
624   * {@code Future} may have previously been set asynchronously, in which case its result may not be
625   * known yet. That result, though not yet known, cannot by overridden by a call to a {@code set*}
626   * method, only by a call to {@link #cancel}.
627   *
628   * @param throwable the exception to be used as the failed result
629   * @return true if the attempt was accepted, completing the {@code Future}
630   */
631  protected boolean setException(Throwable throwable) {
632    Object valueToSet = new Failure(checkNotNull(throwable));
633    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
634      complete();
635      return true;
636    }
637    return false;
638  }
639
640  /**
641   * Sets the result of this {@code Future} to match the supplied input {@code Future} once the
642   * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set
643   * (including "set asynchronously," defined below).
644   *
645   * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call
646   * is accepted, then this future is guaranteed to have been completed with the supplied future by
647   * the time this method returns. If the supplied future is not done and the call is accepted, then
648   * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known,
649   * cannot by overridden by a call to a {@code set*} method, only by a call to {@link #cancel}.
650   *
651   * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later
652   * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to
653   * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code
654   * Future}.
655   *
656   * @param future the future to delegate to
657   * @return true if the attempt was accepted, indicating that the {@code Future} was not previously
658   *     cancelled or set.
659   * @since 19.0
660   */
661  @Beta protected boolean setFuture(ListenableFuture<? extends V> future) {
662    checkNotNull(future);
663    Object localValue = value;
664    if (localValue == null) {
665      if (future.isDone()) {
666        return completeWithFuture(future, null);
667      }
668      SetFuture valueToSet = new SetFuture(future);
669      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
670        // the listener is responsible for calling completeWithFuture, directExecutor is appropriate
671        // since all we are doing is unpacking a completed future which should be fast.
672        try {
673          future.addListener(valueToSet, directExecutor());
674        } catch (Throwable t) {
675          // addListener has thrown an exception!  SetFuture.run can't throw any exceptions so this
676          // must have been caused by addListener itself.  The most likely explanation is a
677          // misconfigured mock.  Try to switch to Failure.
678          Failure failure;
679          try {
680            failure = new Failure(t);
681          } catch (Throwable oomMostLikely) {
682            failure = Failure.FALLBACK_INSTANCE;
683          }
684          // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok.
685          ATOMIC_HELPER.casValue(this, valueToSet, failure);
686        }
687        return true;
688      }
689      localValue = value;  // we lost the cas, fall through and maybe cancel
690    }
691    // The future has already been set to something.  If it is cancellation we should cancel the
692    // incoming future.
693    if (localValue instanceof Cancellation) {
694      // we don't care if it fails, this is best-effort.
695      future.cancel(((Cancellation) localValue).wasInterrupted);
696    }
697    return false;
698  }
699
700  /**
701   * Called when a future passed via setFuture has completed.
702   *
703   * @param future the done future to complete this future with.
704   * @param expected the expected value of the {@link #value} field.
705   */
706  private boolean completeWithFuture(ListenableFuture<? extends V> future, Object expected) {
707    Object valueToSet;
708    if (future instanceof TrustedFuture) {
709      // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
710      // override .get() (since it is final) and therefore this is equivalent to calling .get()
711      // and unpacking the exceptions like we do below (just much faster because it is a single
712      // field read instead of a read, several branches and possibly creating exceptions).
713      valueToSet = ((AbstractFuture<?>) future).value;
714    } else {
715      // Otherwise calculate valueToSet by calling .get()
716      try {
717        V v = Uninterruptibles.getUninterruptibly(future);
718        valueToSet = v == null ? NULL : v;
719      } catch (ExecutionException exception) {
720        valueToSet = new Failure(exception.getCause());
721      } catch (CancellationException cancellation) {
722        valueToSet = new Cancellation(false, cancellation);
723      } catch (Throwable t) {
724        valueToSet = new Failure(t);
725      }
726    }
727    // The only way this can fail is if we raced with another thread calling cancel(). If we lost
728    // that race then there is nothing to do.
729    if (ATOMIC_HELPER.casValue(AbstractFuture.this, expected, valueToSet)) {
730      complete();
731      return true;
732    }
733    return false;
734  }
735
736  /** Unblocks all threads and runs all listeners. */
737  private void complete() {
738    for (Waiter currentWaiter = clearWaiters();
739        currentWaiter != null;
740        currentWaiter = currentWaiter.next) {
741      currentWaiter.unpark();
742    }
743    // We need to reverse the list to handle buggy listeners that depend on ordering.
744    Listener currentListener = clearListeners();
745    Listener reversedList = null;
746    while (currentListener != null) {
747      Listener tmp = currentListener;
748      currentListener = currentListener.next;
749      tmp.next = reversedList;
750      reversedList = tmp;
751    }
752    for (; reversedList != null; reversedList = reversedList.next) {
753      executeListener(reversedList.task, reversedList.executor);
754    }
755    // We call this after the listeners on the theory that done() will only be used for 'cleanup'
756    // oriented tasks (e.g. clearing fields) and so can wait behind listeners which may be executing
757    // more important work.  A counter argument would be that done() is trusted code and therefore
758    // it would be safe to run before potentially slow or poorly behaved listeners.  Reevaluate this
759    // once we have more examples of done() implementations.
760    done();
761  }
762
763  /**
764   * Callback method that is called immediately after the future is completed.
765   *
766   * <p>This is called exactly once, after all listeners have executed.  By default it does nothing.
767   */
768  void done() {}
769
770  /**
771   * Returns the exception that this {@code Future} completed with. This includes completion through
772   * a call to {@link setException} or {@link setFuture}{@code (failedFuture)} but not cancellation.
773   *
774   * @throws RuntimeException if the {@code Future} has not failed
775   */
776  final Throwable trustedGetException() {
777    return ((Failure) value).exception;
778  }
779
780  /**
781   * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts)
782   * the given future (if available).
783   *
784   * <p>This method should be used only when this future is completed. It is designed to be called
785   * from {@code done}.
786   */
787  final void maybePropagateCancellation(@Nullable Future<?> related) {
788    if (related != null & isCancelled()) {
789      related.cancel(wasInterrupted());
790    }
791  }
792
793  /** Clears the {@link #waiters} list and returns the most recently added value. */
794  private Waiter clearWaiters() {
795    Waiter head;
796    do {
797      head = waiters;
798    } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
799    return head;
800  }
801
802  /** Clears the {@link #listeners} list and returns the most recently added value. */
803  private Listener clearListeners() {
804    Listener head;
805    do {
806      head = listeners;
807    } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
808    return head;
809  }
810
811  /**
812   * Submits the given runnable to the given {@link Executor} catching and logging all
813   * {@linkplain RuntimeException runtime exceptions} thrown by the executor.
814   */
815  private static void executeListener(Runnable runnable, Executor executor) {
816    try {
817      executor.execute(runnable);
818    } catch (RuntimeException e) {
819      // Log it and keep going, bad runnable and/or executor.  Don't
820      // punish the other runnables if we're given a bad one.  We only
821      // catch RuntimeException because we want Errors to propagate up.
822      log.log(Level.SEVERE, "RuntimeException while executing runnable "
823          + runnable + " with executor " + executor, e);
824    }
825  }
826
827  static final CancellationException cancellationExceptionWithCause(
828      @Nullable String message, @Nullable Throwable cause) {
829    CancellationException exception = new CancellationException(message);
830    exception.initCause(cause);
831    return exception;
832  }
833
  /**
   * Abstraction over the field-level write and compare-and-set primitives that this class needs
   * for its {@code waiters}, {@code listeners} and {@code value} fields and for the
   * {@code thread} and {@code next} fields of {@link Waiter}.
   *
   * <p>Implemented in this file by {@link UnsafeAtomicHelper}, {@link SafeAtomicHelper} and
   * {@link SynchronizedHelper}.
   */
  private abstract static class AtomicHelper {
    /** Non volatile write of the thread to the {@link Waiter#thread} field. */
    abstract void putThread(Waiter waiter, Thread newValue);

    /** Non volatile write of the waiter to the {@link Waiter#next} field. */
    abstract void putNext(Waiter waiter, Waiter newValue);

    /**
     * Performs a CAS operation on the {@link #waiters} field.
     *
     * @return {@code true} if the field held {@code expect} and was replaced by {@code update}
     */
    abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update);

    /**
     * Performs a CAS operation on the {@link #listeners} field.
     *
     * @return {@code true} if the field held {@code expect} and was replaced by {@code update}
     */
    abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update);

    /**
     * Performs a CAS operation on the {@link #value} field.
     *
     * @return {@code true} if the field held {@code expect} and was replaced by {@code update}
     */
    abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update);
  }
850
851  /**
852   * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
853   *
854   * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot
855   * be accessed.
856   */
857  private static final class UnsafeAtomicHelper extends AtomicHelper {
858    static final sun.misc.Unsafe UNSAFE;
859    static final long LISTENERS_OFFSET;
860    static final long WAITERS_OFFSET;
861    static final long VALUE_OFFSET;
862    static final long WAITER_THREAD_OFFSET;
863    static final long WAITER_NEXT_OFFSET;
864
865    static {
866      sun.misc.Unsafe unsafe = null;
867      try {
868        unsafe = sun.misc.Unsafe.getUnsafe();
869      } catch (SecurityException tryReflectionInstead) {
870        try {
871          unsafe = AccessController.doPrivileged(
872              new PrivilegedExceptionAction<sun.misc.Unsafe>() {
873                @Override public sun.misc.Unsafe run() throws Exception {
874                  Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
875                  for (java.lang.reflect.Field f : k.getDeclaredFields()) {
876                    f.setAccessible(true);
877                    Object x = f.get(null);
878                    if (k.isInstance(x)) {
879                      return k.cast(x);
880                    }
881                  }
882                  throw new NoSuchFieldError("the Unsafe");
883                }
884              });
885        } catch (PrivilegedActionException e) {
886          throw new RuntimeException("Could not initialize intrinsics", e.getCause());
887        }
888      }
889      try {
890        Class<?> abstractFuture = AbstractFuture.class;
891        WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
892        LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
893        VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
894        WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
895        WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
896        UNSAFE = unsafe;
897      } catch (Exception e) {
898        throw Throwables.propagate(e);
899      }
900    }
901
902    @Override
903    void putThread(Waiter waiter, Thread newValue) {
904      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
905    }
906
907    @Override
908    void putNext(Waiter waiter, Waiter newValue) {
909      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
910    }
911
912    /** Performs a CAS operation on the {@link #waiters} field. */
913    @Override
914    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
915      return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
916    }
917
918    /** Performs a CAS operation on the {@link #listeners} field. */
919    @Override
920    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
921      return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
922    }
923
924    /** Performs a CAS operation on the {@link #value} field. */
925    @Override
926    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
927      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
928    }
929  }
930
931  /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */
932  private static final class SafeAtomicHelper extends AtomicHelper {
933    final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
934    final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
935    final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
936    final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater;
937    final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
938
939    SafeAtomicHelper(
940        AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
941        AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
942        AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
943        AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
944        AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
945      this.waiterThreadUpdater = waiterThreadUpdater;
946      this.waiterNextUpdater = waiterNextUpdater;
947      this.waitersUpdater = waitersUpdater;
948      this.listenersUpdater = listenersUpdater;
949      this.valueUpdater = valueUpdater;
950    }
951
952    @Override
953    void putThread(Waiter waiter, Thread newValue) {
954      waiterThreadUpdater.lazySet(waiter, newValue);
955    }
956
957    @Override
958    void putNext(Waiter waiter, Waiter newValue) {
959      waiterNextUpdater.lazySet(waiter, newValue);
960    }
961
962    @Override
963    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
964      return waitersUpdater.compareAndSet(future, expect, update);
965    }
966
967    @Override
968    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
969      return listenersUpdater.compareAndSet(future, expect, update);
970    }
971
972    @Override
973    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
974      return valueUpdater.compareAndSet(future, expect, update);
975    }
976  }
977
978  /**
979   * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
980   *
981   * <p>This is an implementation of last resort for when certain basic VM features are broken
982   * (like AtomicReferenceFieldUpdater).
983   */
984  private static final class SynchronizedHelper extends AtomicHelper {
985    @Override
986    void putThread(Waiter waiter, Thread newValue) {
987      waiter.thread = newValue;
988    }
989
990    @Override
991    void putNext(Waiter waiter, Waiter newValue) {
992      waiter.next = newValue;
993    }
994
995    @Override
996    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
997      synchronized (future) {
998        if (future.waiters == expect) {
999          future.waiters = update;
1000          return true;
1001        }
1002        return false;
1003      }
1004    }
1005
1006    @Override
1007    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1008      synchronized (future) {
1009        if (future.listeners == expect) {
1010          future.listeners = update;
1011          return true;
1012        }
1013        return false;
1014      }
1015    }
1016
1017    @Override
1018    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1019      synchronized (future) {
1020        if (future.value == expect) {
1021          future.value = update;
1022          return true;
1023        }
1024        return false;
1025      }
1026    }
1027  }
1028}