001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
022
023import com.google.common.annotations.Beta;
024import com.google.common.annotations.GwtCompatible;
025import com.google.common.base.Throwables;
026
027import java.security.AccessController;
028import java.security.PrivilegedActionException;
029import java.security.PrivilegedExceptionAction;
030import java.util.concurrent.CancellationException;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.Executor;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
037import java.util.concurrent.locks.LockSupport;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040
041import javax.annotation.Nullable;
042
043/**
044 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More
045 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture},
046 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an
047 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, Function)
048 * Futures.transform} and {@link Futures#catching(ListenableFuture, Class, Function)
049 * Futures.catching}.
050 *
051 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way
052 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link
053 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override
054 * {@link #interruptTask()}, which will be invoked automatically if a call to {@link
055 * #cancel(boolean) cancel(true)} succeeds in canceling the future. Subclasses should rarely
056 * override other methods.
057 *
058 * @author Sven Mawson
059 * @author Luke Sandberg
060 * @since 1.0
061 */
062@GwtCompatible(emulated = true)
063public abstract class AbstractFuture<V> implements ListenableFuture<V> {
064
065  /**
066   * A less abstract subclass of AbstractFuture.  This can be used to optimize setFuture by ensuring
067   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
068   */
069  abstract static class TrustedFuture<V> extends AbstractFuture<V> {
070    // N.B. cancel is not overridden to be final, because many future utilities need to override
071    // cancel in order to propagate cancellation to other futures.
072
073    @Override public final V get() throws InterruptedException, ExecutionException {
074      return super.get();
075    }
076
077    @Override public final V get(long timeout, TimeUnit unit)
078        throws InterruptedException, ExecutionException, TimeoutException {
079      return super.get(timeout, unit);
080    }
081
082    @Override public final boolean isDone() {
083      return super.isDone();
084    }
085
086    @Override public final boolean isCancelled() {
087      return super.isCancelled();
088    }
089
090    @Override public final void addListener(Runnable listener, Executor executor) {
091      super.addListener(listener, executor);
092    }
093  }
094
  // Logger to log exceptions caught when running listeners.
  private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());

  // A heuristic for timed gets.  If the remaining timeout is less than this, spin instead of
  // blocking.  This value is what AbstractQueuedSynchronizer uses.
  private static final long SPIN_THRESHOLD_NANOS = 1000L;

  // Strategy object through which all CAS / lazy-write operations on the waiters, listeners and
  // value fields are performed.  It is chosen once, in the static initializer.
  private static final AtomicHelper ATOMIC_HELPER;

  // Field updaters backing the SafeAtomicHelper fallback.  The static initializer leaves all five
  // null when an Unsafe-based helper could be created, and populates them otherwise.
  private static final AtomicReferenceFieldUpdater<Waiter, Thread> WAITER_THREAD_UPDATER;
  private static final AtomicReferenceFieldUpdater<Waiter, Waiter> WAITER_NEXT_UPDATER;
  private static final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> WAITERS_UPDATER;
  private static final AtomicReferenceFieldUpdater<AbstractFuture, Listener> LISTENERS_UPDATER;
  private static final AtomicReferenceFieldUpdater<AbstractFuture, Object> VALUE_UPDATER;
108
109  static {
110    AtomicHelper helper = null;
111    try {
112      helper = UnsafeAtomicHelperFactory.values()[0].tryCreateUnsafeAtomicHelper();
113    } catch (Throwable e) {
114      // catch absolutely everything and fall through
115    }
116    if (helper == null) {
117      // The access control checks that ARFU does means the caller class has to be AbstractFuture
118      // instead of SafeAtomicHelper, so we annoyingly define these here
119      WAITER_THREAD_UPDATER = newUpdater(Waiter.class, Thread.class, "thread");
120      WAITER_NEXT_UPDATER = newUpdater(Waiter.class, Waiter.class, "next");
121      WAITERS_UPDATER = newUpdater(AbstractFuture.class, Waiter.class, "waiters");
122      LISTENERS_UPDATER = newUpdater(AbstractFuture.class, Listener.class, "listeners");
123      VALUE_UPDATER = newUpdater(AbstractFuture.class, Object.class, "value");
124      helper = new SafeAtomicHelper();
125    } else {
126      WAITER_THREAD_UPDATER = null;
127      WAITER_NEXT_UPDATER = null;
128      WAITERS_UPDATER = null;
129      LISTENERS_UPDATER = null;
130      VALUE_UPDATER = null;
131    }
132    ATOMIC_HELPER = helper;
133
134    // Prevent rare disastrous classloading in first call to LockSupport.park.
135    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
136    Class<?> ensureLoaded = LockSupport.class;
137  }
138
139  /**
140   * Waiter links form a Treiber stack, in the {@link #waiters} field.
141   */
142  private static final class Waiter {
143    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
144
145    @Nullable volatile Thread thread;
146    @Nullable volatile Waiter next;
147
148    // Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
149    // before the ATOMIC_HELPER.  Apparently this is possible on some android platforms.
150    Waiter(boolean unused) {}
151
152    Waiter() {
153      // avoid volatile write, write is made visible by subsequent CAS on waiters field
154      ATOMIC_HELPER.putThread(this, Thread.currentThread());
155    }
156
157    // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
158    // field.
159    void setNext(Waiter next) {
160      ATOMIC_HELPER.putNext(this, next);
161    }
162
163    void unpark() {
164      // This is racy with removeWaiter.  The consequence of the race is that we may spuriously
165      // call unpark even though the thread has already removed itself from the list.  But even if
166      // we did use a CAS, that race would still exist (it would just be ever so slightly smaller).
167      Thread w = thread;
168      if (w != null) {
169        thread = null;
170        LockSupport.unpark(w);
171      }
172    }
173  }
174
  /**
   * Marks the given node as 'deleted' (by nulling its {@code thread}) and then scans the list to
   * unlink all deleted nodes.  This is an O(n) operation in the common case (and O(n^2) in the
   * worst), but we are saved by two things.
   * <ul>
   *   <li>This is only called when a waiting thread times out or is interrupted.  Both of which
   *       should be rare.
   *   <li>The waiters list should be very short.
   * </ul>
   *
   * @param node the waiter that is no longer going to park
   */
  private void removeWaiter(Waiter node) {
    node.thread = null;  // mark as 'deleted'
    restart: while (true) {
      // pred is the most recent node seen that is still live (non-null thread), or null if none.
      Waiter pred = null;
      Waiter curr = waiters;
      if (curr == Waiter.TOMBSTONE) {
        return;  // give up if someone is calling complete
      }
      Waiter succ;
      while (curr != null) {
        succ = curr.next;
        if (curr.thread != null) {  // we aren't unlinking this node, update pred.
          pred = curr;
        } else if (pred != null) { // We are unlinking this node and it has a predecessor.
          pred.next = succ;
          if (pred.thread == null) {  // We raced with another node that unlinked pred. Restart.
            continue restart;
          }
        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) {  // We are unlinking head
          continue restart;  // We raced with an add or complete
        }
        curr = succ;
      }
      break;
    }
  }
211
212  /** Listeners also form a stack through the {@link #listeners} field. */
213  private static final class Listener {
214    static final Listener TOMBSTONE = new Listener(null, null);
215    final Runnable task;
216    final Executor executor;
217
218    // writes to next are made visible by subsequent CAS's on the listeners field
219    @Nullable Listener next;
220
221    Listener(Runnable task, Executor executor) {
222      this.task = task;
223      this.executor = executor;
224    }
225  }
226
  /**
   * A special value to represent {@code null}.  It is stored in place of a {@code null} result
   * and translated back to {@code null} when the result is read.
   */
  private static final Object NULL = new Object();
229
230  /** A special value to represent failure, when {@link #setException} is called successfully. */
231  private static final class Failure {
232    static final Failure FALLBACK_INSTANCE = new Failure(
233        new Throwable("Failure occurred while trying to finish a future.") {
234          @Override public synchronized Throwable fillInStackTrace() {
235            return this;  // no stack trace
236          }
237        });
238    final Throwable exception;
239
240    Failure(Throwable exception) {
241      this.exception = checkNotNull(exception);
242    }
243  }
244
245  /** A special value to represent cancellation and the 'wasInterrupted' bit. */
246  private static final class Cancellation {
247    final boolean wasInterrupted;
248    final Throwable cause;
249
250    Cancellation(boolean wasInterrupted, Throwable cause) {
251      this.wasInterrupted = wasInterrupted;
252      this.cause = checkNotNull(cause);
253    }
254  }
255
256  /** A special value that encodes the 'setFuture' state. */
257  private final class SetFuture implements Runnable {
258    final ListenableFuture<? extends V> future;
259
260    SetFuture(ListenableFuture<? extends V> future) {
261      this.future = future;
262    }
263
264    @Override public void run() {
265      if (value != this) {
266        // nothing to do, we must have been cancelled
267        return;
268      }
269      completeWithFuture(future, this);
270    }
271  }
272
  // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is
  // available.
  /**
   * This field encodes the current state of the future.
   *
   * <p>The valid values are:
   * <ul>
   *   <li>{@code null} initial state, nothing has happened.
   *   <li>{@link Cancellation} terminal state, {@code cancel} was called.
   *   <li>{@link Failure} terminal state, {@code setException} was called.
   *   <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
   *   <li>{@link #NULL} terminal state, {@code set(null)} was called.
   *   <li>Any other non-null value, terminal state, {@code set} was called with a non-null
   *       argument.
   * </ul>
   *
   * <p>The future counts as done exactly when this field is non-null and not a {@link SetFuture}.
   */
  private volatile Object value;

  /**
   * All listeners, as a stack of {@link Listener} nodes.  {@code addListener} treats
   * {@link Listener#TOMBSTONE} here as meaning the future is done.
   */
  private volatile Listener listeners;

  /**
   * All waiting threads, as a stack of {@link Waiter} nodes.  {@code get} treats
   * {@link Waiter#TOMBSTONE} here as meaning the future is done.
   */
  private volatile Waiter waiters;
296
  /**
   * Constructor for use by subclasses.  It does no work of its own; the new future starts in the
   * initial state, in which nothing has happened yet.
   */
  protected AbstractFuture() {}
301
  /*
   * The Javadoc on the get methods below improves on the inherited documentation of when
   * InterruptedException is thrown. Our behavior matches the JDK's, but the JDK's documentation
   * is misleading.
   */
306
307  // Gets and Timed Gets
308  //
309  // * Be responsive to interruption
310  // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the
311  //   waiters field.
312  // * Future completion is defined by when #value becomes non-null/non SetFuture
313  // * Future completion can be observed if the waiters field contains a TOMBSTONE
314
315  // Timed Get
316  // There are a few design constraints to consider
317  // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I
318  //   have observed 12 micros on 64 bit linux systems to wake up a parked thread).  So if the
319  //   timeout is small we shouldn't park().  This needs to be traded off with the cpu overhead of
320  //   spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
321  //   similar purposes.
322  // * We want to behave reasonably for timeouts of 0
323  // * We are more responsive to completion than timeouts.  This is because parkNanos depends on
324  //   system scheduling and as such we could either miss our deadline, or unpark() could be delayed
325  //   so that it looks like we timed out even though we didn't.  For comparison FutureTask respects
326  //   completion preferably and AQS is non-deterministic (depends on where in the queue the waiter
327  //   is).  If we wanted to be strict about it, we could store the unpark() time in the Waiter
328  //   node and we could use that to make a decision about whether or not we timed out prior to
329  //   being unparked.
330
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code
   * InterruptedException} if the current thread is interrupted before or during
   * the call, even if the value is already available.
   *
   * <p>Waits of at least {@code SPIN_THRESHOLD_NANOS} park the calling thread; shorter remaining
   * waits busy-spin on the state instead.
   *
   * @param timeout the maximum time to wait
   * @param unit the time unit of {@code timeout}; must not be null
   * @throws InterruptedException if the current thread was interrupted before
   *     or during the call (optional but recommended).
   * @throws TimeoutException if the future was still not done when the wait ran out
   * @throws CancellationException {@inheritDoc}
   */
  @Override
  public V get(long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException, ExecutionException {
    // NOTE: if timeout <= 0 and the future is not already done, remainingNanos will be <= 0, so
    // both the parking branch and the spin loop at the bottom are skipped and we fall straight
    // through to throwing a TimeoutException.
    long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    // Fast path: already done (non-null and not the intermediate SetFuture state).
    Object localValue = value;
    if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
      return getDoneValue(localValue);
    }
    // we delay calling nanoTime until we know we will need to either park or spin
    final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
    // The label is on the 'if' so that 'break long_wait_loop' leaves the whole parking branch and
    // continues with the spin loop below.
    long_wait_loop: if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
      Waiter oldHead = waiters;
      if (oldHead != Waiter.TOMBSTONE) {
        Waiter node = new Waiter();
        do {
          // Try to push our node onto the waiters stack.
          node.setNext(oldHead);
          if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
            while (true) {
              LockSupport.parkNanos(this, remainingNanos);
              // Check interruption first, if we woke up due to interruption we need to honor that.
              if (Thread.interrupted()) {
                removeWaiter(node);
                throw new InterruptedException();
              }

              // Otherwise re-read and check doneness.  If we loop then it must have been a spurious
              // wakeup
              localValue = value;
              if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
                return getDoneValue(localValue);
              }

              // timed out?
              remainingNanos = endNanos - System.nanoTime();
              if (remainingNanos < SPIN_THRESHOLD_NANOS) {
                // Remove the waiter, one way or another we are done parking this thread.
                removeWaiter(node);
                break long_wait_loop;  // jump down to the busy wait loop
              }
            }
          }
          oldHead = waiters;  // re-read and loop.
        } while (oldHead != Waiter.TOMBSTONE);
      }
      // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
      // waiter.
      return getDoneValue(value);
    }
    // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the
    // waiters list
    while (remainingNanos > 0) {
      localValue = value;
      if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
        return getDoneValue(localValue);
      }
      if (Thread.interrupted()) {
        throw new InterruptedException();
      }
      remainingNanos = endNanos - System.nanoTime();
    }
    throw new TimeoutException();
  }
409
  /*
   * The Javadoc below improves on the inherited documentation of when InterruptedException is
   * thrown. Our behavior matches the JDK's, but the JDK's documentation is misleading.
   */
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code
   * InterruptedException} if the current thread is interrupted before or during
   * the call, even if the value is already available.
   *
   * @throws InterruptedException if the current thread was interrupted before
   *     or during the call (optional but recommended).
   * @throws CancellationException {@inheritDoc}
   */
  @Override public V get() throws InterruptedException, ExecutionException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    // Fast path: already done (non-null and not the intermediate SetFuture state), so no Waiter
    // node needs to be allocated.
    Object localValue = value;
    if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
      return getDoneValue(localValue);
    }
    Waiter oldHead = waiters;
    if (oldHead != Waiter.TOMBSTONE) {
      Waiter node = new Waiter();
      do {
        // Try to push our node onto the waiters stack; retry if the head moved under us.
        node.setNext(oldHead);
        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
          // we are on the stack, now wait for completion.
          while (true) {
            LockSupport.park(this);
            // Check interruption first, if we woke up due to interruption we need to honor that.
            if (Thread.interrupted()) {
              removeWaiter(node);
              throw new InterruptedException();
            }
            // Otherwise re-read and check doneness.  If we loop then it must have been a spurious
            // wakeup
            localValue = value;
            if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
              return getDoneValue(localValue);
            }
          }
        }
        oldHead = waiters;  // re-read and loop.
      } while (oldHead != Waiter.TOMBSTONE);
    }
    // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
    // waiter.
    return getDoneValue(value);
  }
462
463  /**
464   * Unboxes {@code obj}.  Assumes that obj is not {@code null} or a {@link SetFuture}.
465   */
466  private V getDoneValue(Object obj) throws ExecutionException {
467    // While this seems like it might be too branch-y, simple benchmarking proves it to be
468    // unmeasurable (comparing done AbstractFutures with immediateFuture)
469    if (obj instanceof Cancellation) {
470      throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause);
471    } else if (obj instanceof Failure) {
472      throw new ExecutionException(((Failure) obj).exception);
473    } else if (obj == NULL) {
474      return null;
475    } else {
476      @SuppressWarnings("unchecked")  // this is the only other option
477      V asV = (V) obj;
478      return asV;
479    }
480  }
481
482  @Override
483  public boolean isDone() {
484    final Object localValue = value;
485    return localValue != null & !(localValue instanceof AbstractFuture.SetFuture);
486  }
487
488  @Override
489  public boolean isCancelled() {
490    final Object localValue = value;
491    return localValue instanceof Cancellation;
492  }
493
  /**
   * {@inheritDoc}
   *
   * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain
   * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate
   * {@code Future} that was supplied in the {@code setFuture} call.
   *
   * @return {@code true} if this call moved the future into the cancelled state; {@code false} if
   *     the future had already reached a terminal state
   */
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    Object localValue = value;
    // Only the initial state (null) and the intermediate SetFuture state can be cancelled.
    if (localValue == null | localValue instanceof AbstractFuture.SetFuture) {
      // Try to delay allocating the exception.  At this point we may still lose the CAS, but it is
      // certainly less likely.
      // TODO(lukes): this exception actually makes cancellation significantly more expensive :(
      // I wonder if we should consider removing it or providing a mechanism to not do it.
      Object valueToSet = new Cancellation(mayInterruptIfRunning, newCancellationCause());
      do {
        if (ATOMIC_HELPER.casValue(this, localValue, valueToSet)) {
          // We call interruptTask() before calling complete(), which is consistent with
          // FutureTask
          if (mayInterruptIfRunning) {
            interruptTask();
          }
          complete();
          if (localValue instanceof AbstractFuture.SetFuture) {
            // propagate cancellation to the future set in setFuture, this is racy, and we don't
            // care if we are successful or not.
            ((AbstractFuture<?>.SetFuture) localValue).future.cancel(mayInterruptIfRunning);
          }
          return true;
        }
        // obj changed, reread
        localValue = value;
        // obj cannot be null at this point, because value can only change from null to non-null. So
        // if value changed (and it did since we lost the CAS), then it cannot be null.
      } while (localValue instanceof AbstractFuture.SetFuture);
    }
    return false;
  }
533
534  /**
535   * Returns an exception to be used as the cause of the CancellationException thrown by
536   * {@link #get}.
537   *
538   * <p>Note: this method may be called speculatively.  There is no guarantee that the future will
539   * be cancelled if this method is called.
540   *
541   * @since 19.0
542   */
543  @Beta
544  protected Throwable newCancellationCause() {
545    return new CancellationException("Future.cancel() was called.");
546  }
547
  /**
   * Subclasses can override this method to implement interruption of the
   * future's computation. The method is invoked automatically by a successful
   * call to {@link #cancel(boolean) cancel(true)}, after the cancelled state has
   * been recorded and before waiting threads and listeners are notified.
   *
   * <p>The default implementation does nothing.
   *
   * @since 10.0
   */
  protected void interruptTask() {
  }
559
560  /**
561   * Returns true if this future was cancelled with {@code
562   * mayInterruptIfRunning} set to {@code true}.
563   *
564   * @since 14.0
565   */
566  protected final boolean wasInterrupted() {
567    final Object localValue = value;
568    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
569  }
570
571  /**
572   * {@inheritDoc}
573   *
574   * @since 10.0
575   */
576  @Override
577  public void addListener(Runnable listener, Executor executor) {
578    checkNotNull(listener, "Runnable was null.");
579    checkNotNull(executor, "Executor was null.");
580    Listener oldHead = listeners;
581    if (oldHead != Listener.TOMBSTONE) {
582      Listener newNode = new Listener(listener, executor);
583      do {
584        newNode.next = oldHead;
585        if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
586          return;
587        }
588        oldHead = listeners;  // re-read
589      } while (oldHead != Listener.TOMBSTONE);
590    }
591    // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
592    // the listener.
593    executeListener(listener, executor);
594  }
595
596  /**
597   * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or
598   * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns,
599   * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was
600   * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code
601   * Future} may have previously been set asynchronously, in which case its result may not be known
602   * yet. That result, though not yet known, cannot by overridden by a call to a {@code set*}
603   * method, only by a call to {@link #cancel}.
604   *
605   * @param value the value to be used as the result
606   * @return true if the attempt was accepted, completing the {@code Future}
607   */
608  protected boolean set(@Nullable V value) {
609    Object valueToSet = value == null ? NULL : value;
610    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
611      complete();
612      return true;
613    }
614    return false;
615  }
616
617  /**
618   * Sets the failed result of this {@code Future} unless this {@code Future} has already been
619   * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this
620   * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b>
621   * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the
622   * {@code Future} may have previously been set asynchronously, in which case its result may not be
623   * known yet. That result, though not yet known, cannot by overridden by a call to a {@code set*}
624   * method, only by a call to {@link #cancel}.
625   *
626   * @param throwable the exception to be used as the failed result
627   * @return true if the attempt was accepted, completing the {@code Future}
628   */
629  protected boolean setException(Throwable throwable) {
630    Object valueToSet = new Failure(checkNotNull(throwable));
631    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
632      complete();
633      return true;
634    }
635    return false;
636  }
637
638  /**
639   * Sets the result of this {@code Future} to match the supplied input {@code Future} once the
640   * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set
641   * (including "set asynchronously," defined below).
642   *
643   * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call
644   * is accepted, then this future is guaranteed to have been completed with the supplied future by
645   * the time this method returns. If the supplied future is not done and the call is accepted, then
646   * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known,
647   * cannot by overridden by a call to a {@code set*} method, only by a call to {@link #cancel}.
648   *
649   * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later
650   * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to
651   * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code
652   * Future}.
653   *
654   * @param future the future to delegate to
655   * @return true if the attempt was accepted, indicating that the {@code Future} was not previously
656   *     cancelled or set.
657   * @since 19.0
658   */
659  @Beta protected boolean setFuture(ListenableFuture<? extends V> future) {
660    checkNotNull(future);
661    Object localValue = value;
662    if (localValue == null) {
663      if (future.isDone()) {
664        return completeWithFuture(future, null);
665      }
666      SetFuture valueToSet = new SetFuture(future);
667      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
668        // the listener is responsible for calling completeWithFuture, directExecutor is appropriate
669        // since all we are doing is unpacking a completed future which should be fast.
670        try {
671          future.addListener(valueToSet, directExecutor());
672        } catch (Throwable t) {
673          // addListener has thrown an exception!  SetFuture.run can't throw any exceptions so this
674          // must have been caused by addListener itself.  The most likely explanation is a
675          // misconfigured mock.  Try to switch to Failure.
676          Failure failure;
677          try {
678            failure = new Failure(t);
679          } catch (Throwable oomMostLikely) {
680            failure = Failure.FALLBACK_INSTANCE;
681          }
682          // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok.
683          ATOMIC_HELPER.casValue(this, valueToSet, failure);
684        }
685        return true;
686      }
687      localValue = value;  // we lost the cas, fall through and maybe cancel
688    }
689    // The future has already been set to something.  If it is cancellation we should cancel the
690    // incoming future.
691    if (localValue instanceof Cancellation) {
692      // we don't care if it fails, this is best-effort.
693      future.cancel(((Cancellation) localValue).wasInterrupted);
694    }
695    return false;
696  }
697
698  /**
699   * Called when a future passed via setFuture has completed.
700   *
701   * @param future the done future to complete this future with.
702   * @param expected the expected value of the {@link #value} field.
703   */
704  private boolean completeWithFuture(ListenableFuture<? extends V> future, Object expected) {
705    Object valueToSet;
706    if (future instanceof TrustedFuture) {
707      // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
708      // override .get() (since it is final) and therefore this is equivalent to calling .get()
709      // and unpacking the exceptions like we do below (just much faster because it is a single
710      // field read instead of a read, several branches and possibly creating exceptions).
711      valueToSet = ((AbstractFuture<?>) future).value;
712    } else {
713      // Otherwise calculate valueToSet by calling .get()
714      try {
715        V v = Uninterruptibles.getUninterruptibly(future);
716        valueToSet = v == null ? NULL : v;
717      } catch (ExecutionException exception) {
718        valueToSet = new Failure(exception.getCause());
719      } catch (CancellationException cancellation) {
720        valueToSet = new Cancellation(false, cancellation);
721      } catch (Throwable t) {
722        valueToSet = new Failure(t);
723      }
724    }
725    // The only way this can fail is if we raced with another thread calling cancel(). If we lost
726    // that race then there is nothing to do.
727    if (ATOMIC_HELPER.casValue(AbstractFuture.this, expected, valueToSet)) {
728      complete();
729      return true;
730    }
731    return false;
732  }
733
734  /** Unblocks all threads and runs all listeners. */
735  private void complete() {
736    for (Waiter currentWaiter = clearWaiters();
737        currentWaiter != null;
738        currentWaiter = currentWaiter.next) {
739      currentWaiter.unpark();
740    }
741    // We need to reverse the list to handle buggy listeners that depend on ordering.
742    Listener currentListener = clearListeners();
743    Listener reversedList = null;
744    while (currentListener != null) {
745      Listener tmp = currentListener;
746      currentListener = currentListener.next;
747      tmp.next = reversedList;
748      reversedList = tmp;
749    }
750    for (; reversedList != null; reversedList = reversedList.next) {
751      executeListener(reversedList.task, reversedList.executor);
752    }
753    // We call this after the listeners on the theory that done() will only be used for 'cleanup'
754    // oriented tasks (e.g. clearing fields) and so can wait behind listeners which may be executing
755    // more important work.  A counter argument would be that done() is trusted code and therefore
756    // it would be safe to run before potentially slow or poorly behaved listeners.  Reevaluate this
757    // once we have more examples of done() implementations.
758    done();
759  }
760
761  /**
762   * Callback method that is called immediately after the future is completed.
763   *
764   * <p>This is called exactly once, after all listeners have executed.  By default it does nothing.
765   */
766  void done() {}
767
768  /**
769   * Returns the exception that this {@code Future} completed with. This includes completion through
770   * a call to {@link setException} or {@link setFuture}{@code (failedFuture)} but not cancellation.
771   *
772   * @throws RuntimeException if the {@code Future} has not failed
773   */
774  final Throwable trustedGetException() {
775    return ((Failure) value).exception;
776  }
777
778  /**
779   * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts)
780   * the given future (if available).
781   *
782   * <p>This method should be used only when this future is completed. It is designed to be called
783   * from {@code done}.
784   */
785  final void maybePropagateCancellation(@Nullable Future<?> related) {
786    if (related != null & isCancelled()) {
787      related.cancel(wasInterrupted());
788    }
789  }
790
791  /** Clears the {@link #waiters} list and returns the most recently added value. */
792  private Waiter clearWaiters() {
793    Waiter head;
794    do {
795      head = waiters;
796    } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
797    return head;
798  }
799
800  /** Clears the {@link #listeners} list and returns the most recently added value. */
801  private Listener clearListeners() {
802    Listener head;
803    do {
804      head = listeners;
805    } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
806    return head;
807  }
808
809  /**
810   * Submits the given runnable to the given {@link Executor} catching and logging all
811   * {@linkplain RuntimeException runtime exceptions} thrown by the executor.
812   */
813  private static void executeListener(Runnable runnable, Executor executor) {
814    try {
815      executor.execute(runnable);
816    } catch (RuntimeException e) {
817      // Log it and keep going, bad runnable and/or executor.  Don't
818      // punish the other runnables if we're given a bad one.  We only
819      // catch RuntimeException because we want Errors to propagate up.
820      log.log(Level.SEVERE, "RuntimeException while executing runnable "
821          + runnable + " with executor " + executor, e);
822    }
823  }
824
825  static final CancellationException cancellationExceptionWithCause(
826      @Nullable String message, @Nullable Throwable cause) {
827    CancellationException exception = new CancellationException(message);
828    exception.initCause(cause);
829    return exception;
830  }
831
832  private abstract static class AtomicHelper {
833    /** Non volatile write of the thread to the {@link Waiter#thread} field. */
834    abstract void putThread(Waiter waiter, Thread thread);
835
836    /** Non volatile write of the waiter to the {@link Waiter#next} field. */
837    abstract void putNext(Waiter waiter, Waiter next);
838
839    /** Performs a CAS operation on the {@link #waiters} field. */
840    abstract boolean casWaiters(AbstractFuture future, Waiter curr, Waiter next);
841
842    /** Performs a CAS operation on the {@link #listeners} field. */
843    abstract boolean casListeners(AbstractFuture future, Listener curr, Listener next);
844
845    /** Performs a CAS operation on the {@link #value} field. */
846    abstract boolean casValue(AbstractFuture future, Object expected, Object v);
847  }
848
849  /**
   * Temporary hack to hide the reference to {@link UnsafeAtomicHelper} from Android. The caller of
   * this code will execute {@link #tryCreateUnsafeAtomicHelper} on the <b>first</b> enum value
   * present. On the server, this will try to create {@link UnsafeAtomicHelper}. On Android, it will
   * just return {@code null}.
   *
   * <p>Because only the first constant present is consulted, the declaration order of the
   * constants is significant.
   *
   * <p>NOTE(review): the trailing comma after the last constant and the standalone {@code ;}
   * presumably keep the enum syntactically valid when the annotated constant is stripped; confirm
   * before reformatting.
   */
855  private enum UnsafeAtomicHelperFactory {
856    @SuppressUnderAndroid // temporarily while we make Proguard tolerate Unsafe
857    REALLY_TRY_TO_CREATE {
      // Instantiates the Unsafe-backed helper.
858      @Override
859      AtomicHelper tryCreateUnsafeAtomicHelper() {
860        return new UnsafeAtomicHelper();
861      }
862    },
863
864    DONT_EVEN_TRY_TO_CREATE {
      // Never attempts to create the Unsafe-backed helper.
865      @Override
866      AtomicHelper tryCreateUnsafeAtomicHelper() {
867        return null;
868      }
869    },
870
871  ;
872
    /**
     * Returns the helper produced by this constant: a new {@link UnsafeAtomicHelper} for
     * {@code REALLY_TRY_TO_CREATE}, {@code null} for {@code DONT_EVEN_TRY_TO_CREATE}.
     */
873    abstract AtomicHelper tryCreateUnsafeAtomicHelper();
874  }
875
876  /**
877   * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
878   *
879   * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot
880   * be accessed.
881   */
882  @SuppressUnderAndroid // temporarily while we make Proguard tolerate Unsafe
883  private static final class UnsafeAtomicHelper extends AtomicHelper {
884    static final sun.misc.Unsafe UNSAFE;
885    static final long LISTENERS_OFFSET;
886    static final long WAITERS_OFFSET;
887    static final long VALUE_OFFSET;
888    static final long WAITER_THREAD_OFFSET;
889    static final long WAITER_NEXT_OFFSET;
890
891    static {
892      sun.misc.Unsafe unsafe = null;
893      try {
894        unsafe = sun.misc.Unsafe.getUnsafe();
895      } catch (SecurityException tryReflectionInstead) {
896        try {
897          unsafe = AccessController.doPrivileged(
898              new PrivilegedExceptionAction<sun.misc.Unsafe>() {
899                @Override public sun.misc.Unsafe run() throws Exception {
900                  Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
901                  for (java.lang.reflect.Field f : k.getDeclaredFields()) {
902                    f.setAccessible(true);
903                    Object x = f.get(null);
904                    if (k.isInstance(x)) {
905                      return k.cast(x);
906                    }
907                  }
908                  throw new NoSuchFieldError("the Unsafe");
909                }
910              });
911        } catch (PrivilegedActionException e) {
912          throw new RuntimeException("Could not initialize intrinsics", e.getCause());
913        }
914      }
915      try {
916        Class<?> abstractFuture = AbstractFuture.class;
917        WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
918        LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
919        VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
920        WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
921        WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
922        UNSAFE = unsafe;
923      } catch (Exception e) {
924        throw Throwables.propagate(e);
925      }
926    }
927
928    @Override
929    void putThread(Waiter waiter, Thread thread) {
930      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, thread);
931    }
932
933    @Override
934    void putNext(Waiter waiter, Waiter next) {
935      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, next);
936    }
937
938    /** Performs a CAS operation on the {@link #waiters} field. */
939    @Override
940    boolean casWaiters(AbstractFuture future, Waiter curr, Waiter next) {
941      return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, curr, next);
942    }
943
944    /** Performs a CAS operation on the {@link #listeners} field. */
945    @Override
946    boolean casListeners(AbstractFuture future, Listener curr, Listener next) {
947      return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, curr, next);
948    }
949
950    /** Performs a CAS operation on the {@link #value} field. */
951    @Override
952    boolean casValue(AbstractFuture future, Object expected, Object v) {
953      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expected, v);
954    }
955  }
956
957  /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */
958  private static final class SafeAtomicHelper extends AtomicHelper {
959    @Override void putThread(Waiter waiter, Thread thread) {
960      WAITER_THREAD_UPDATER.lazySet(waiter, thread);
961    }
962
963    @Override void putNext(Waiter waiter, Waiter next) {
964      WAITER_NEXT_UPDATER.lazySet(waiter, next);
965    }
966
967    @Override boolean casWaiters(AbstractFuture future, Waiter curr, Waiter next) {
968      return WAITERS_UPDATER.compareAndSet(future, curr, next);
969    }
970
971    @Override boolean casListeners(AbstractFuture future, Listener curr, Listener next) {
972      return LISTENERS_UPDATER.compareAndSet(future, curr, next);
973    }
974
975    @Override boolean casValue(AbstractFuture future, Object expected, Object v) {
976      return VALUE_UPDATER.compareAndSet(future, expected, v);
977    }
978  }
979}