001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.base.Throwables.throwIfUnchecked;
019import static com.google.common.util.concurrent.Futures.getDone;
020import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
022
023import com.google.common.annotations.Beta;
024import com.google.common.annotations.GwtCompatible;
025import com.google.errorprone.annotations.CanIgnoreReturnValue;
026import com.google.j2objc.annotations.ReflectionSupport;
027import java.security.AccessController;
028import java.security.PrivilegedActionException;
029import java.security.PrivilegedExceptionAction;
030import java.util.concurrent.CancellationException;
031import java.util.concurrent.ExecutionException;
032import java.util.concurrent.Executor;
033import java.util.concurrent.Future;
034import java.util.concurrent.TimeUnit;
035import java.util.concurrent.TimeoutException;
036import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
037import java.util.concurrent.locks.LockSupport;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040import javax.annotation.Nullable;
041
042/**
043 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More
044 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture},
045 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an
046 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, Function)
047 * Futures.transform} and {@link Futures#catching(ListenableFuture, Class, Function)
048 * Futures.catching}.
049 *
050 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way
051 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link
052 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override
053 * {@link #interruptTask()}, which will be invoked automatically if a call to {@link
054 * #cancel(boolean) cancel(true)} succeeds in canceling the future. Subclasses should rarely
055 * override other methods.
056 *
057 * @author Sven Mawson
058 * @author Luke Sandberg
059 * @since 1.0
060 */
061@GwtCompatible(emulated = true)
062@ReflectionSupport(value = ReflectionSupport.Level.FULL)
063public abstract class AbstractFuture<V> implements ListenableFuture<V> {
064  // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, ||
065
066  private static final boolean GENERATE_CANCELLATION_CAUSES =
067      Boolean.parseBoolean(
068          System.getProperty("guava.concurrent.generate_cancellation_cause", "false"));
069
070  /**
071   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
072   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
073   */
074  abstract static class TrustedFuture<V> extends AbstractFuture<V> {
075    @CanIgnoreReturnValue
076    @Override
077    public final V get() throws InterruptedException, ExecutionException {
078      return super.get();
079    }
080
081    @CanIgnoreReturnValue
082    @Override
083    public final V get(long timeout, TimeUnit unit)
084        throws InterruptedException, ExecutionException, TimeoutException {
085      return super.get(timeout, unit);
086    }
087
088    @Override
089    public final boolean isDone() {
090      return super.isDone();
091    }
092
093    @Override
094    public final boolean isCancelled() {
095      return super.isCancelled();
096    }
097
098    @Override
099    public final void addListener(Runnable listener, Executor executor) {
100      super.addListener(listener, executor);
101    }
102
103    @CanIgnoreReturnValue
104    @Override
105    public final boolean cancel(boolean mayInterruptIfRunning) {
106      return super.cancel(mayInterruptIfRunning);
107    }
108  }
109
110  // Logger to log exceptions caught when running listeners.
111  private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());
112
113  // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of
114  // blocking. This value is what AbstractQueuedSynchronizer uses.
115  private static final long SPIN_THRESHOLD_NANOS = 1000L;
116
117  private static final AtomicHelper ATOMIC_HELPER;
118
119  static {
120    AtomicHelper helper;
121
122    try {
123      helper = new UnsafeAtomicHelper();
124    } catch (Throwable unsafeFailure) {
125      // catch absolutely everything and fall through to our 'SafeAtomicHelper'
126      // The access control checks that ARFU does means the caller class has to be AbstractFuture
127      // instead of SafeAtomicHelper, so we annoyingly define these here
128      try {
129        helper =
130            new SafeAtomicHelper(
131                newUpdater(Waiter.class, Thread.class, "thread"),
132                newUpdater(Waiter.class, Waiter.class, "next"),
133                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
134                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
135                newUpdater(AbstractFuture.class, Object.class, "value"));
136      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
137        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
138        // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
139        // For these users fallback to a suboptimal implementation, based on synchronized. This will
140        // be a definite performance hit to those users.
141        log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
142        log.log(Level.SEVERE, "SafeAtomicHelper is broken!", atomicReferenceFieldUpdaterFailure);
143        helper = new SynchronizedHelper();
144      }
145    }
146    ATOMIC_HELPER = helper;
147
148    // Prevent rare disastrous classloading in first call to LockSupport.park.
149    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
150    @SuppressWarnings("unused")
151    Class<?> ensureLoaded = LockSupport.class;
152  }
153
154  /**
155   * Waiter links form a Treiber stack, in the {@link #waiters} field.
156   */
157  private static final class Waiter {
158    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
159
160    @Nullable volatile Thread thread;
161    @Nullable volatile Waiter next;
162
163    /**
164     * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
165     * before the ATOMIC_HELPER. Apparently this is possible on some android platforms.
166     */
167    Waiter(boolean unused) {}
168
169    Waiter() {
170      // avoid volatile write, write is made visible by subsequent CAS on waiters field
171      ATOMIC_HELPER.putThread(this, Thread.currentThread());
172    }
173
174    // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
175    // field.
176    void setNext(Waiter next) {
177      ATOMIC_HELPER.putNext(this, next);
178    }
179
180    void unpark() {
181      // This is racy with removeWaiter. The consequence of the race is that we may spuriously call
182      // unpark even though the thread has already removed itself from the list. But even if we did
183      // use a CAS, that race would still exist (it would just be ever so slightly smaller).
184      Thread w = thread;
185      if (w != null) {
186        thread = null;
187        LockSupport.unpark(w);
188      }
189    }
190  }
191
192  /**
193   * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted
194   * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved
195   * by two things.
196   * <ul>
197   * <li>This is only called when a waiting thread times out or is interrupted. Both of which should
198   *     be rare.
199   * <li>The waiters list should be very short.
200   * </ul>
201   */
202  private void removeWaiter(Waiter node) {
203    node.thread = null; // mark as 'deleted'
204    restart:
205    while (true) {
206      Waiter pred = null;
207      Waiter curr = waiters;
208      if (curr == Waiter.TOMBSTONE) {
209        return; // give up if someone is calling complete
210      }
211      Waiter succ;
212      while (curr != null) {
213        succ = curr.next;
214        if (curr.thread != null) { // we aren't unlinking this node, update pred.
215          pred = curr;
216        } else if (pred != null) { // We are unlinking this node and it has a predecessor.
217          pred.next = succ;
218          if (pred.thread == null) { // We raced with another node that unlinked pred. Restart.
219            continue restart;
220          }
221        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head
222          continue restart; // We raced with an add or complete
223        }
224        curr = succ;
225      }
226      break;
227    }
228  }
229
230  /** Listeners also form a stack through the {@link #listeners} field. */
231  private static final class Listener {
232    static final Listener TOMBSTONE = new Listener(null, null);
233    final Runnable task;
234    final Executor executor;
235
236    // writes to next are made visible by subsequent CAS's on the listeners field
237    @Nullable Listener next;
238
239    Listener(Runnable task, Executor executor) {
240      this.task = task;
241      this.executor = executor;
242    }
243  }
244
245  /** A special value to represent {@code null}. */
246  private static final Object NULL = new Object();
247
248  /** A special value to represent failure, when {@link #setException} is called successfully. */
249  private static final class Failure {
250    static final Failure FALLBACK_INSTANCE =
251        new Failure(
252            new Throwable("Failure occurred while trying to finish a future.") {
253              @Override
254              public synchronized Throwable fillInStackTrace() {
255                return this; // no stack trace
256              }
257            });
258    final Throwable exception;
259
260    Failure(Throwable exception) {
261      this.exception = checkNotNull(exception);
262    }
263  }
264
265  /** A special value to represent cancellation and the 'wasInterrupted' bit. */
266  private static final class Cancellation {
267    final boolean wasInterrupted;
268    @Nullable final Throwable cause;
269
270    Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
271      this.wasInterrupted = wasInterrupted;
272      this.cause = cause;
273    }
274  }
275
276  /** A special value that encodes the 'setFuture' state. */
277  private static final class SetFuture<V> implements Runnable {
278    final AbstractFuture<V> owner;
279    final ListenableFuture<? extends V> future;
280
281    SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
282      this.owner = owner;
283      this.future = future;
284    }
285
286    @Override
287    public void run() {
288      if (owner.value != this) {
289        // nothing to do, we must have been cancelled, don't bother inspecting the future.
290        return;
291      }
292      Object valueToSet = getFutureValue(future);
293      if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
294        complete(owner);
295      }
296    }
297  }
298
299  // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is
300  // available.
301  /**
302   * This field encodes the current state of the future.
303   *
304   * <p>The valid values are:
305   * <ul>
306   * <li>{@code null} initial state, nothing has happened.
307   * <li>{@link Cancellation} terminal state, {@code cancel} was called.
308   * <li>{@link Failure} terminal state, {@code setException} was called.
309   * <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
310   * <li>{@link #NULL} terminal state, {@code set(null)} was called.
311   * <li>Any other non-null value, terminal state, {@code set} was called with a non-null argument.
312   * </ul>
313   */
314  private volatile Object value;
315
316  /** All listeners. */
317  private volatile Listener listeners;
318
319  /** All waiting threads. */
320  private volatile Waiter waiters;
321
/** Creates a future in its initial, not-yet-completed state. For use by subclasses. */
protected AbstractFuture() {}
326
327  // Gets and Timed Gets
328  //
329  // * Be responsive to interruption
330  // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the
331  //   waiters field.
332  // * Future completion is defined by when #value becomes non-null/non SetFuture
333  // * Future completion can be observed if the waiters field contains a TOMBSTONE
334
335  // Timed Get
336  // There are a few design constraints to consider
337  // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I
338  //   have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the
339  //   timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of
340  //   spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
341  //   similar purposes.
342  // * We want to behave reasonably for timeouts of 0
343  // * We are more responsive to completion than timeouts. This is because parkNanos depends on
344  //   system scheduling and as such we could either miss our deadline, or unpark() could be delayed
345  //   so that it looks like we timed out even though we didn't. For comparison FutureTask respects
346  //   completion preferably and AQS is non-deterministic (depends on where in the queue the waiter
347  //   is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node
348  //   and we could use that to make a decision about whether or not we timed out prior to being
349  //   unparked.
350
351  /*
352   * Improve the documentation of when InterruptedException is thrown. Our behavior matches the
353   * JDK's, but the JDK's documentation is misleading.
354   */
355
356  /**
357   * {@inheritDoc}
358   *
359   * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
360   * current thread is interrupted before or during the call, even if the value is already
361   * available.
362   *
363   * @throws InterruptedException if the current thread was interrupted before or during the call
364   *     (optional but recommended).
365   * @throws CancellationException {@inheritDoc}
366   */
367  @CanIgnoreReturnValue
368  @Override
369  public V get(long timeout, TimeUnit unit)
370      throws InterruptedException, TimeoutException, ExecutionException {
371    // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop
372    // at the bottom and throw a timeoutexception.
373    long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit.
374    if (Thread.interrupted()) {
375      throw new InterruptedException();
376    }
377    Object localValue = value;
378    if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
379      return getDoneValue(localValue);
380    }
381    // we delay calling nanoTime until we know we will need to either park or spin
382    final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
383    long_wait_loop:
384    if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
385      Waiter oldHead = waiters;
386      if (oldHead != Waiter.TOMBSTONE) {
387        Waiter node = new Waiter();
388        do {
389          node.setNext(oldHead);
390          if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
391            while (true) {
392              LockSupport.parkNanos(this, remainingNanos);
393              // Check interruption first, if we woke up due to interruption we need to honor that.
394              if (Thread.interrupted()) {
395                removeWaiter(node);
396                throw new InterruptedException();
397              }
398
399              // Otherwise re-read and check doneness. If we loop then it must have been a spurious
400              // wakeup
401              localValue = value;
402              if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
403                return getDoneValue(localValue);
404              }
405
406              // timed out?
407              remainingNanos = endNanos - System.nanoTime();
408              if (remainingNanos < SPIN_THRESHOLD_NANOS) {
409                // Remove the waiter, one way or another we are done parking this thread.
410                removeWaiter(node);
411                break long_wait_loop; // jump down to the busy wait loop
412              }
413            }
414          }
415          oldHead = waiters; // re-read and loop.
416        } while (oldHead != Waiter.TOMBSTONE);
417      }
418      // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
419      // waiter.
420      return getDoneValue(value);
421    }
422    // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the
423    // waiters list
424    while (remainingNanos > 0) {
425      localValue = value;
426      if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
427        return getDoneValue(localValue);
428      }
429      if (Thread.interrupted()) {
430        throw new InterruptedException();
431      }
432      remainingNanos = endNanos - System.nanoTime();
433    }
434    throw new TimeoutException();
435  }
436
437  /*
438   * Improve the documentation of when InterruptedException is thrown. Our behavior matches the
439   * JDK's, but the JDK's documentation is misleading.
440   */
441
442  /**
443   * {@inheritDoc}
444   *
445   * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
446   * current thread is interrupted before or during the call, even if the value is already
447   * available.
448   *
449   * @throws InterruptedException if the current thread was interrupted before or during the call
450   *     (optional but recommended).
451   * @throws CancellationException {@inheritDoc}
452   */
453  @CanIgnoreReturnValue
454  @Override
455  public V get() throws InterruptedException, ExecutionException {
456    if (Thread.interrupted()) {
457      throw new InterruptedException();
458    }
459    Object localValue = value;
460    if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
461      return getDoneValue(localValue);
462    }
463    Waiter oldHead = waiters;
464    if (oldHead != Waiter.TOMBSTONE) {
465      Waiter node = new Waiter();
466      do {
467        node.setNext(oldHead);
468        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
469          // we are on the stack, now wait for completion.
470          while (true) {
471            LockSupport.park(this);
472            // Check interruption first, if we woke up due to interruption we need to honor that.
473            if (Thread.interrupted()) {
474              removeWaiter(node);
475              throw new InterruptedException();
476            }
477            // Otherwise re-read and check doneness. If we loop then it must have been a spurious
478            // wakeup
479            localValue = value;
480            if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) {
481              return getDoneValue(localValue);
482            }
483          }
484        }
485        oldHead = waiters; // re-read and loop.
486      } while (oldHead != Waiter.TOMBSTONE);
487    }
488    // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
489    // waiter.
490    return getDoneValue(value);
491  }
492
493  /**
494   * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}.
495   */
496  private V getDoneValue(Object obj) throws ExecutionException {
497    // While this seems like it might be too branch-y, simple benchmarking proves it to be
498    // unmeasurable (comparing done AbstractFutures with immediateFuture)
499    if (obj instanceof Cancellation) {
500      throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause);
501    } else if (obj instanceof Failure) {
502      throw new ExecutionException(((Failure) obj).exception);
503    } else if (obj == NULL) {
504      return null;
505    } else {
506      @SuppressWarnings("unchecked") // this is the only other option
507      V asV = (V) obj;
508      return asV;
509    }
510  }
511
512  @Override
513  public boolean isDone() {
514    final Object localValue = value;
515    return localValue != null & !(localValue instanceof AbstractFuture.SetFuture);
516  }
517
518  @Override
519  public boolean isCancelled() {
520    final Object localValue = value;
521    return localValue instanceof Cancellation;
522  }
523
524  /**
525   * {@inheritDoc}
526   *
527   * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain
528   * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate
529   * {@code Future} that was supplied in the {@code setFuture} call.
530   */
531  @CanIgnoreReturnValue
532  @Override
533  public boolean cancel(boolean mayInterruptIfRunning) {
534    Object localValue = value;
535    boolean rValue = false;
536    if (localValue == null | localValue instanceof AbstractFuture.SetFuture) {
537      // Try to delay allocating the exception. At this point we may still lose the CAS, but it is
538      // certainly less likely.
539      Throwable cause =
540          GENERATE_CANCELLATION_CAUSES
541              ? new CancellationException("Future.cancel() was called.")
542              : null;
543      Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
544      AbstractFuture<?> abstractFuture = this;
545      while (true) {
546        if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
547          rValue = true;
548          // We call interuptTask before calling complete(), which is consistent with
549          // FutureTask
550          if (mayInterruptIfRunning) {
551            abstractFuture.interruptTask();
552          }
553          complete(abstractFuture);
554          if (localValue instanceof AbstractFuture.SetFuture) {
555            // propagate cancellation to the future set in setfuture, this is racy, and we don't
556            // care if we are successful or not.
557            ListenableFuture<?> futureToPropagateTo =
558                ((AbstractFuture.SetFuture) localValue).future;
559            if (futureToPropagateTo instanceof TrustedFuture) {
560              // If the future is a TrustedFuture then we specifically avoid calling cancel()
561              // this has 2 benefits
562              // 1. for long chains of futures strung together with setFuture we consume less stack
563              // 2. we avoid allocating Cancellation objects at every level of the cancellation
564              //    chain
565              // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and
566              // does nothing but delegate to this method.
567              AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo;
568              localValue = trusted.value;
569              if (localValue == null | localValue instanceof AbstractFuture.SetFuture) {
570                abstractFuture = trusted;
571                continue;  // loop back up and try to complete the new future
572              }
573            } else {
574              // not a TrustedFuture, call cancel directly.
575              futureToPropagateTo.cancel(mayInterruptIfRunning);
576            }
577          }
578          break;
579        }
580        // obj changed, reread
581        localValue = abstractFuture.value;
582        if (!(localValue instanceof AbstractFuture.SetFuture)) {
583          // obj cannot be null at this point, because value can only change from null to non-null.
584          // So if value changed (and it did since we lost the CAS), then it cannot be null and
585          // since it isn't a SetFuture, then the future must be done and we should exit the loop
586          break;
587        }
588      }
589    }
590    return rValue;
591  }
592
593  /**
594   * Subclasses can override this method to implement interruption of the future's computation. The
595   * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}.
596   *
597   * <p>The default implementation does nothing.
598   *
599   * @since 10.0
600   */
601  protected void interruptTask() {}
602
603  /**
604   * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code
605   * true}.
606   *
607   * @since 14.0
608   */
609  protected final boolean wasInterrupted() {
610    final Object localValue = value;
611    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
612  }
613
614  /**
615   * {@inheritDoc}
616   *
617   * @since 10.0
618   */
619  @Override
620  public void addListener(Runnable listener, Executor executor) {
621    checkNotNull(listener, "Runnable was null.");
622    checkNotNull(executor, "Executor was null.");
623    Listener oldHead = listeners;
624    if (oldHead != Listener.TOMBSTONE) {
625      Listener newNode = new Listener(listener, executor);
626      do {
627        newNode.next = oldHead;
628        if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
629          return;
630        }
631        oldHead = listeners; // re-read
632      } while (oldHead != Listener.TOMBSTONE);
633    }
634    // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
635    // the listener.
636    executeListener(listener, executor);
637  }
638
639  /**
640   * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or
641   * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns,
642   * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was
643   * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code
644   * Future} may have previously been set asynchronously, in which case its result may not be known
645   * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*}
646   * method, only by a call to {@link #cancel}.
647   *
648   * @param value the value to be used as the result
649   * @return true if the attempt was accepted, completing the {@code Future}
650   */
651  @CanIgnoreReturnValue
652  protected boolean set(@Nullable V value) {
653    Object valueToSet = value == null ? NULL : value;
654    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
655      complete(this);
656      return true;
657    }
658    return false;
659  }
660
661  /**
662   * Sets the failed result of this {@code Future} unless this {@code Future} has already been
663   * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this
664   * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b>
665   * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the
666   * {@code Future} may have previously been set asynchronously, in which case its result may not be
667   * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*}
668   * method, only by a call to {@link #cancel}.
669   *
670   * @param throwable the exception to be used as the failed result
671   * @return true if the attempt was accepted, completing the {@code Future}
672   */
673  @CanIgnoreReturnValue
674  protected boolean setException(Throwable throwable) {
675    Object valueToSet = new Failure(checkNotNull(throwable));
676    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
677      complete(this);
678      return true;
679    }
680    return false;
681  }
682
683  /**
684   * Sets the result of this {@code Future} to match the supplied input {@code Future} once the
685   * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set
686   * (including "set asynchronously," defined below).
687   *
688   * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call
689   * is accepted, then this future is guaranteed to have been completed with the supplied future by
690   * the time this method returns. If the supplied future is not done and the call is accepted, then
691   * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known,
692   * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}.
693   *
694   * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later
695   * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to
696   * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code
697   * Future}.
698   *
699   * @param future the future to delegate to
700   * @return true if the attempt was accepted, indicating that the {@code Future} was not previously
701   *     cancelled or set.
702   * @since 19.0
703   */
704  @Beta
705  @CanIgnoreReturnValue
706  protected boolean setFuture(ListenableFuture<? extends V> future) {
707    checkNotNull(future);
708    Object localValue = value;
709    if (localValue == null) {
710      if (future.isDone()) {
711        Object value = getFutureValue(future);
712        if (ATOMIC_HELPER.casValue(this, null, value)) {
713          complete(this);
714          return true;
715        }
716        return false;
717      }
718      SetFuture valueToSet = new SetFuture<V>(this, future);
719      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
720        // the listener is responsible for calling completeWithFuture, directExecutor is appropriate
721        // since all we are doing is unpacking a completed future which should be fast.
722        try {
723          future.addListener(valueToSet, directExecutor());
724        } catch (Throwable t) {
725          // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this
726          // must have been caused by addListener itself. The most likely explanation is a
727          // misconfigured mock. Try to switch to Failure.
728          Failure failure;
729          try {
730            failure = new Failure(t);
731          } catch (Throwable oomMostLikely) {
732            failure = Failure.FALLBACK_INSTANCE;
733          }
734          // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok.
735          boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
736        }
737        return true;
738      }
739      localValue = value; // we lost the cas, fall through and maybe cancel
740    }
741    // The future has already been set to something. If it is cancellation we should cancel the
742    // incoming future.
743    if (localValue instanceof Cancellation) {
744      // we don't care if it fails, this is best-effort.
745      future.cancel(((Cancellation) localValue).wasInterrupted);
746    }
747    return false;
748  }
749
750  /**
751   * Returns a value, suitable for storing in the {@link #value} field. From the given future,
752   * which is assumed to be done.
753   *
754   * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
755   */
756  private static Object getFutureValue(ListenableFuture<?> future) {
757    Object valueToSet;
758    if (future instanceof TrustedFuture) {
759      // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
760      // override .get() (since it is final) and therefore this is equivalent to calling .get()
761      // and unpacking the exceptions like we do below (just much faster because it is a single
762      // field read instead of a read, several branches and possibly creating exceptions).
763      return ((AbstractFuture<?>) future).value;
764    } else {
765      // Otherwise calculate valueToSet by calling .get()
766      try {
767        Object v = getDone(future);
768        valueToSet = v == null ? NULL : v;
769      } catch (ExecutionException exception) {
770        valueToSet = new Failure(exception.getCause());
771      } catch (CancellationException cancellation) {
772        valueToSet = new Cancellation(false, cancellation);
773      } catch (Throwable t) {
774        valueToSet = new Failure(t);
775      }
776    }
777    return valueToSet;
778  }
779
  /**
   * Unblocks all threads and runs all listeners.
   *
   * <p>Implemented as a loop rather than recursion: when a pending listener is a {@code SetFuture}
   * whose owner is still waiting on it, the owner's value is set here and the outer loop restarts
   * for that owner, pushing the owner's listeners onto the same pending list.
   *
   * @param future the future whose waiters should be released and whose listeners should be run
   */
  private static void complete(AbstractFuture<?> future) {
    // Head of the list of listeners that still have to be processed by this call.
    Listener next = null;
    outer: while (true) {
      future.releaseWaiters();
      // We call this before the listeners in order to avoid needing to manage a separate stack data
      // structure for them.
      // afterDone() should be generally fast and only used for cleanup work... but in theory can
      // also be recursive and create StackOverflowErrors
      future.afterDone();
      // push the current set of listeners onto next
      next = future.clearListeners(next);
      // NOTE(review): presumably cleared so the finished future is not referenced while the
      // listeners run; it is reassigned below before any further use.
      future = null;
      while (next != null) {
        // Pop the head of the pending list.
        Listener curr = next;
        next = next.next;
        Runnable task = curr.task;
        if (task instanceof AbstractFuture.SetFuture) {
          AbstractFuture.SetFuture<?> setFuture = (AbstractFuture.SetFuture) task;
          // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
          // chains of SetFutures
          // Handling this special case is important because there is no way to pass an executor to
          // setFuture, so a user couldn't break the chain by doing this themselves.  It is also
          // potentially common if someone writes a recursive Futures.transformAsync transformer.
          future = setFuture.owner;
          // Only act if the owner is still parked on this exact SetFuture placeholder.
          if (future.value == setFuture) {
            Object valueToSet = getFutureValue(setFuture.future);
            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
              // The owner is now done: restart the outer loop to release its waiters and collect
              // its listeners.
              continue outer;
            }
          }
          // otherwise the future we were trying to set is already done.
        } else {
          executeListener(task, curr.executor);
        }
      }
      // No pending listeners remain.
      break;
    }
  }
819
  /**
   * Callback method that is called exactly once after the future is completed.
   *
   * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it.
   *
   * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is
   * intended for very lightweight cleanup work, for example, timing statistics or clearing fields.
   * If your task does anything heavier, consider just using a listener with an executor.
   *
   * @since 20.0
   */
  // TODO(cpovirk): @ForOverride https://github.com/google/error-prone/issues/342
  @Beta
  protected void afterDone() {}
834
835  /**
836   * Returns the exception that this {@code Future} completed with. This includes completion through
837   * a call to {@link setException} or {@link setFuture}{@code (failedFuture)} but not cancellation.
838   *
839   * @throws RuntimeException if the {@code Future} has not failed
840   */
841  final Throwable trustedGetException() {
842    return ((Failure) value).exception;
843  }
844
845  /**
846   * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts)
847   * the given future (if available).
848   *
849   * <p>This method should be used only when this future is completed. It is designed to be called
850   * from {@code done}.
851   */
852  final void maybePropagateCancellation(@Nullable Future<?> related) {
853    if (related != null & isCancelled()) {
854      related.cancel(wasInterrupted());
855    }
856  }
857
858  /** Releases all threads in the {@link #waiters} list, and clears the list. */
859  private void releaseWaiters() {
860    Waiter head;
861    do {
862      head = waiters;
863    } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
864    for (Waiter currentWaiter = head;
865        currentWaiter != null;
866        currentWaiter = currentWaiter.next) {
867      currentWaiter.unpark();
868    }
869  }
870
871  /**
872   * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently
873   * added first.
874   */
875  private Listener clearListeners(Listener onto) {
876    // We need to
877    // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to
878    //    to synchronize with us
879    // 2. reverse the linked list, because despite our rather clear contract, people depend on us
880    //    executing listeners in the order they were added
881    // 3. push all the items onto 'onto' and return the new head of the stack
882    Listener head;
883    do {
884      head = listeners;
885    } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
886    Listener reversedList = onto;
887    while (head != null) {
888      Listener tmp = head;
889      head = head.next;
890      tmp.next = reversedList;
891      reversedList = tmp;
892    }
893    return reversedList;
894  }
895
896  /**
897   * Submits the given runnable to the given {@link Executor} catching and logging all
898   * {@linkplain RuntimeException runtime exceptions} thrown by the executor.
899   */
900  private static void executeListener(Runnable runnable, Executor executor) {
901    try {
902      executor.execute(runnable);
903    } catch (RuntimeException e) {
904      // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if
905      // we're given a bad one. We only catch RuntimeException because we want Errors to propagate
906      // up.
907      log.log(
908          Level.SEVERE,
909          "RuntimeException while executing runnable " + runnable + " with executor " + executor,
910          e);
911    }
912  }
913
  /**
   * Abstraction over the low-level field operations performed on {@link AbstractFuture} and {@link
   * Waiter} instances. The concrete strategies in this file are {@link UnsafeAtomicHelper}, {@link
   * SafeAtomicHelper} and {@link SynchronizedHelper}.
   */
  private abstract static class AtomicHelper {
    /** Non volatile write of the thread to the {@link Waiter#thread} field. */
    abstract void putThread(Waiter waiter, Thread newValue);

    /** Non volatile write of the waiter to the {@link Waiter#next} field. */
    abstract void putNext(Waiter waiter, Waiter newValue);

    /**
     * Performs a CAS operation on the {@link #waiters} field.
     *
     * @return true if the field held {@code expect} and was replaced by {@code update}
     */
    abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update);

    /**
     * Performs a CAS operation on the {@link #listeners} field.
     *
     * @return true if the field held {@code expect} and was replaced by {@code update}
     */
    abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update);

    /**
     * Performs a CAS operation on the {@link #value} field.
     *
     * @return true if the field held {@code expect} and was replaced by {@code update}
     */
    abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update);
  }
930
  /**
   * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
   *
   * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot
   * be accessed.
   */
  private static final class UnsafeAtomicHelper extends AtomicHelper {
    static final sun.misc.Unsafe UNSAFE;
    // Field offsets, resolved once in the static initializer below.
    static final long LISTENERS_OFFSET;
    static final long WAITERS_OFFSET;
    static final long VALUE_OFFSET;
    static final long WAITER_THREAD_OFFSET;
    static final long WAITER_NEXT_OFFSET;

    static {
      sun.misc.Unsafe unsafe = null;
      try {
        // First attempt: the regular accessor.
        unsafe = sun.misc.Unsafe.getUnsafe();
      } catch (SecurityException tryReflectionInstead) {
        try {
          // Fallback: inside a privileged action, scan the declared fields of Unsafe for one
          // that holds an Unsafe instance.
          unsafe =
              AccessController.doPrivileged(
                  new PrivilegedExceptionAction<sun.misc.Unsafe>() {
                    @Override
                    public sun.misc.Unsafe run() throws Exception {
                      Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
                      for (java.lang.reflect.Field f : k.getDeclaredFields()) {
                        f.setAccessible(true);
                        Object x = f.get(null);
                        if (k.isInstance(x)) {
                          return k.cast(x);
                        }
                      }
                      throw new NoSuchFieldError("the Unsafe");
                    }
                  });
        } catch (PrivilegedActionException e) {
          throw new RuntimeException("Could not initialize intrinsics", e.getCause());
        }
      }
      try {
        // Resolve the offsets of the fields this helper writes to or CASes.
        Class<?> abstractFuture = AbstractFuture.class;
        WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
        LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
        VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
        WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
        WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
        UNSAFE = unsafe;
      } catch (Exception e) {
        // Unchecked exceptions are rethrown as-is; anything else is wrapped.
        throwIfUnchecked(e);
        throw new RuntimeException(e);
      }
    }

    /** Writes the thread to the {@link Waiter#thread} field via {@code Unsafe.putObject}. */
    @Override
    void putThread(Waiter waiter, Thread newValue) {
      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
    }

    /** Writes the waiter to the {@link Waiter#next} field via {@code Unsafe.putObject}. */
    @Override
    void putNext(Waiter waiter, Waiter newValue) {
      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
    }

    /** Performs a CAS operation on the {@link #waiters} field. */
    @Override
    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
      return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
    }

    /** Performs a CAS operation on the {@link #listeners} field. */
    @Override
    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
      return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
    }

    /** Performs a CAS operation on the {@link #value} field. */
    @Override
    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
    }
  }
1013
1014  /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */
1015  private static final class SafeAtomicHelper extends AtomicHelper {
1016    final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
1017    final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
1018    final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
1019    final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater;
1020    final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;
1021
1022    SafeAtomicHelper(
1023        AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
1024        AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
1025        AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
1026        AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
1027        AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
1028      this.waiterThreadUpdater = waiterThreadUpdater;
1029      this.waiterNextUpdater = waiterNextUpdater;
1030      this.waitersUpdater = waitersUpdater;
1031      this.listenersUpdater = listenersUpdater;
1032      this.valueUpdater = valueUpdater;
1033    }
1034
1035    @Override
1036    void putThread(Waiter waiter, Thread newValue) {
1037      waiterThreadUpdater.lazySet(waiter, newValue);
1038    }
1039
1040    @Override
1041    void putNext(Waiter waiter, Waiter newValue) {
1042      waiterNextUpdater.lazySet(waiter, newValue);
1043    }
1044
1045    @Override
1046    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1047      return waitersUpdater.compareAndSet(future, expect, update);
1048    }
1049
1050    @Override
1051    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1052      return listenersUpdater.compareAndSet(future, expect, update);
1053    }
1054
1055    @Override
1056    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1057      return valueUpdater.compareAndSet(future, expect, update);
1058    }
1059  }
1060
1061  /**
1062   * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
1063   *
1064   * <p>This is an implementation of last resort for when certain basic VM features are broken (like
1065   * AtomicReferenceFieldUpdater).
1066   */
1067  private static final class SynchronizedHelper extends AtomicHelper {
1068    @Override
1069    void putThread(Waiter waiter, Thread newValue) {
1070      waiter.thread = newValue;
1071    }
1072
1073    @Override
1074    void putNext(Waiter waiter, Waiter newValue) {
1075      waiter.next = newValue;
1076    }
1077
1078    @Override
1079    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1080      synchronized (future) {
1081        if (future.waiters == expect) {
1082          future.waiters = update;
1083          return true;
1084        }
1085        return false;
1086      }
1087    }
1088
1089    @Override
1090    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1091      synchronized (future) {
1092        if (future.listeners == expect) {
1093          future.listeners = update;
1094          return true;
1095        }
1096        return false;
1097      }
1098    }
1099
1100    @Override
1101    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1102      synchronized (future) {
1103        if (future.value == expect) {
1104          future.value = update;
1105          return true;
1106        }
1107        return false;
1108      }
1109    }
1110  }
1111
1112  private static CancellationException cancellationExceptionWithCause(
1113      @Nullable String message, @Nullable Throwable cause) {
1114    CancellationException exception = new CancellationException(message);
1115    exception.initCause(cause);
1116    return exception;
1117  }
1118}