001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.base.Strings.isNullOrEmpty;
019import static com.google.common.base.Throwables.throwIfUnchecked;
020import static com.google.common.util.concurrent.Futures.getDone;
021import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
022import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
023
024import com.google.common.annotations.Beta;
025import com.google.common.annotations.GwtCompatible;
026import com.google.common.base.Ascii;
027import com.google.errorprone.annotations.CanIgnoreReturnValue;
028import com.google.errorprone.annotations.DoNotMock;
029import com.google.j2objc.annotations.ReflectionSupport;
030import java.security.AccessController;
031import java.security.PrivilegedActionException;
032import java.security.PrivilegedExceptionAction;
033import java.util.concurrent.CancellationException;
034import java.util.concurrent.ExecutionException;
035import java.util.concurrent.Executor;
036import java.util.concurrent.Future;
037import java.util.concurrent.ScheduledFuture;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
041import java.util.concurrent.locks.LockSupport;
042import java.util.logging.Level;
043import java.util.logging.Logger;
044import javax.annotation.Nullable;
045
046/**
047 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More
048 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture},
049 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an
050 * existing one, typically using methods like {@link Futures#transform(ListenableFuture,
051 * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link
052 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function,
053 * java.util.concurrent.Executor) Futures.catching}.
054 *
055 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way
056 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link
057 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override
058 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses
059 * should rarely override other methods.
060 *
061 * @author Sven Mawson
062 * @author Luke Sandberg
063 * @since 1.0
064 */
065@DoNotMock("Use Futures.immediate*Future or SettableFuture")
066@GwtCompatible(emulated = true)
067@ReflectionSupport(value = ReflectionSupport.Level.FULL)
068public abstract class AbstractFuture<V> extends FluentFuture<V> {
069  // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, ||
070
  // Whether cancel() allocates a CancellationException("Future.cancel() was called.") to record
  // as the cause of the Cancellation it stores. Read once, at class initialization, from the
  // "guava.concurrent.generate_cancellation_cause" system property; defaults to false, in which
  // case the recorded cause is null.
  private static final boolean GENERATE_CANCELLATION_CAUSES =
      Boolean.parseBoolean(
          System.getProperty("guava.concurrent.generate_cancellation_cause", "false"));
074
075  /**
076   * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring
077   * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}.
078   */
079  abstract static class TrustedFuture<V> extends AbstractFuture<V> {
080    @CanIgnoreReturnValue
081    @Override
082    public final V get() throws InterruptedException, ExecutionException {
083      return super.get();
084    }
085
086    @CanIgnoreReturnValue
087    @Override
088    public final V get(long timeout, TimeUnit unit)
089        throws InterruptedException, ExecutionException, TimeoutException {
090      return super.get(timeout, unit);
091    }
092
093    @Override
094    public final boolean isDone() {
095      return super.isDone();
096    }
097
098    @Override
099    public final boolean isCancelled() {
100      return super.isCancelled();
101    }
102
103    @Override
104    public final void addListener(Runnable listener, Executor executor) {
105      super.addListener(listener, executor);
106    }
107
108    @CanIgnoreReturnValue
109    @Override
110    public final boolean cancel(boolean mayInterruptIfRunning) {
111      return super.cancel(mayInterruptIfRunning);
112    }
113  }
114
  // Logger for this class. Used to log exceptions caught when running listeners and, in the
  // static initializer of this class, to report at SEVERE level that the preferred AtomicHelper
  // implementations could not be created.
  private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());

  // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of
  // blocking. This value is what AbstractQueuedSynchronizer uses.
  private static final long SPIN_THRESHOLD_NANOS = 1000L;

  // The strategy object through which this class performs its compare-and-set operations on the
  // value, listeners and waiters fields, and its plain writes to Waiter.thread and Waiter.next.
  // Assigned exactly once, by the static initializer.
  private static final AtomicHelper ATOMIC_HELPER;
123
124  static {
125    AtomicHelper helper;
126
127    try {
128      helper = new UnsafeAtomicHelper();
129    } catch (Throwable unsafeFailure) {
130      // catch absolutely everything and fall through to our 'SafeAtomicHelper'
131      // The access control checks that ARFU does means the caller class has to be AbstractFuture
132      // instead of SafeAtomicHelper, so we annoyingly define these here
133      try {
134        helper =
135            new SafeAtomicHelper(
136                newUpdater(Waiter.class, Thread.class, "thread"),
137                newUpdater(Waiter.class, Waiter.class, "next"),
138                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
139                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
140                newUpdater(AbstractFuture.class, Object.class, "value"));
141      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
142        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
143        // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
144        // For these users fallback to a suboptimal implementation, based on synchronized. This will
145        // be a definite performance hit to those users.
146        log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
147        log.log(Level.SEVERE, "SafeAtomicHelper is broken!", atomicReferenceFieldUpdaterFailure);
148        helper = new SynchronizedHelper();
149      }
150    }
151    ATOMIC_HELPER = helper;
152
153    // Prevent rare disastrous classloading in first call to LockSupport.park.
154    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
155    @SuppressWarnings("unused")
156    Class<?> ensureLoaded = LockSupport.class;
157  }
158
159  /**
160   * Waiter links form a Treiber stack, in the {@link #waiters} field.
161   */
162  private static final class Waiter {
163    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);
164
165    @Nullable volatile Thread thread;
166    @Nullable volatile Waiter next;
167
168    /**
169     * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
170     * before the ATOMIC_HELPER. Apparently this is possible on some android platforms.
171     */
172    Waiter(boolean unused) {}
173
174    Waiter() {
175      // avoid volatile write, write is made visible by subsequent CAS on waiters field
176      ATOMIC_HELPER.putThread(this, Thread.currentThread());
177    }
178
179    // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
180    // field.
181    void setNext(Waiter next) {
182      ATOMIC_HELPER.putNext(this, next);
183    }
184
185    void unpark() {
186      // This is racy with removeWaiter. The consequence of the race is that we may spuriously call
187      // unpark even though the thread has already removed itself from the list. But even if we did
188      // use a CAS, that race would still exist (it would just be ever so slightly smaller).
189      Thread w = thread;
190      if (w != null) {
191        thread = null;
192        LockSupport.unpark(w);
193      }
194    }
195  }
196
  /**
   * Marks the given node as 'deleted' (by nulling its {@code thread}) and then scans the waiters
   * list to unlink every deleted node it encounters. This is an O(n) operation in the common case
   * (and O(n^2) in the worst), but we are saved by two things.
   *
   * <ul>
   *   <li>This is only called when a waiting thread times out or is interrupted. Both of which
   *       should be rare.
   *   <li>The waiters list should be very short.
   * </ul>
   *
   * <p>The scan returns without doing anything further as soon as it observes {@code
   * Waiter.TOMBSTONE} at the head of the list.
   *
   * @param node the waiter node to mark as deleted
   */
  private void removeWaiter(Waiter node) {
    node.thread = null; // mark as 'deleted'
    restart:
    while (true) {
      // Each pass walks the list from the current head. pred is the last node seen on this pass
      // that is still live (non-null thread), or null if no live node has been seen yet.
      Waiter pred = null;
      Waiter curr = waiters;
      if (curr == Waiter.TOMBSTONE) {
        return; // give up if someone is calling complete
      }
      Waiter succ;
      while (curr != null) {
        succ = curr.next;
        if (curr.thread != null) { // we aren't unlinking this node, update pred.
          pred = curr;
        } else if (pred != null) { // We are unlinking this node and it has a predecessor.
          // Splice curr out by pointing its live predecessor at its successor.
          pred.next = succ;
          if (pred.thread == null) { // We raced with another node that unlinked pred. Restart.
            continue restart;
          }
        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head
          // No live predecessor, so curr is unlinked by swinging the waiters field itself.
          continue restart; // We raced with an add or complete
        }
        curr = succ;
      }
      break;
    }
  }
234
235  /** Listeners also form a stack through the {@link #listeners} field. */
236  private static final class Listener {
237    static final Listener TOMBSTONE = new Listener(null, null);
238    final Runnable task;
239    final Executor executor;
240
241    // writes to next are made visible by subsequent CAS's on the listeners field
242    @Nullable Listener next;
243
244    Listener(Runnable task, Executor executor) {
245      this.task = task;
246      this.executor = executor;
247    }
248  }
249
  /**
   * A special value to represent {@code null}. A {@code null} in the {@link #value} field means
   * that nothing has happened yet, so {@code set(null)} stores this sentinel instead and {@code
   * getDoneValue} translates it back to {@code null}.
   */
  private static final Object NULL = new Object();
252
253  /** A special value to represent failure, when {@link #setException} is called successfully. */
254  private static final class Failure {
255    static final Failure FALLBACK_INSTANCE =
256        new Failure(
257            new Throwable("Failure occurred while trying to finish a future.") {
258              @Override
259              public synchronized Throwable fillInStackTrace() {
260                return this; // no stack trace
261              }
262            });
263    final Throwable exception;
264
265    Failure(Throwable exception) {
266      this.exception = checkNotNull(exception);
267    }
268  }
269
270  /** A special value to represent cancellation and the 'wasInterrupted' bit. */
271  private static final class Cancellation {
272    final boolean wasInterrupted;
273    @Nullable final Throwable cause;
274
275    Cancellation(boolean wasInterrupted, @Nullable Throwable cause) {
276      this.wasInterrupted = wasInterrupted;
277      this.cause = cause;
278    }
279  }
280
281  /** A special value that encodes the 'setFuture' state. */
282  private static final class SetFuture<V> implements Runnable {
283    final AbstractFuture<V> owner;
284    final ListenableFuture<? extends V> future;
285
286    SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) {
287      this.owner = owner;
288      this.future = future;
289    }
290
291    @Override
292    public void run() {
293      if (owner.value != this) {
294        // nothing to do, we must have been cancelled, don't bother inspecting the future.
295        return;
296      }
297      Object valueToSet = getFutureValue(future);
298      if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) {
299        complete(owner);
300      }
301    }
302  }
303
  // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is
  // available.
  /**
   * This field encodes the current state of the future.
   *
   * <p>The valid values are:
   *
   * <ul>
   *   <li>{@code null} initial state, nothing has happened.
   *   <li>{@link Cancellation} terminal state, {@code cancel} was called.
   *   <li>{@link Failure} terminal state, {@code setException} was called.
   *   <li>{@link SetFuture} intermediate state, {@code setFuture} was called.
   *   <li>{@link #NULL} terminal state, {@code set(null)} was called.
   *   <li>Any other non-null value, terminal state, {@code set} was called with a non-null
   *       argument.
   * </ul>
   *
   * <p>Consequently the future is done exactly when this field is neither {@code null} nor a
   * {@link SetFuture}; that is the test {@code isDone()} and both {@code get} methods apply.
   */
  private volatile Object value;

  /**
   * All listeners: the head of the stack of {@link Listener} nodes pushed by {@code addListener}.
   * {@code addListener} treats {@code Listener.TOMBSTONE} here as meaning the future is done and
   * then runs the new listener immediately instead of pushing it.
   */
  private volatile Listener listeners;

  /**
   * All waiting threads: the head of the stack of {@link Waiter} nodes pushed by the {@code get}
   * methods before they park. The {@code get} methods treat {@code Waiter.TOMBSTONE} here as
   * meaning the future is done.
   */
  private volatile Waiter waiters;
326
  /**
   * Constructor for use by subclasses. It performs no work of its own: the state, listener and
   * waiter fields keep their default {@code null} values, and a {@code null} state means that
   * nothing has happened yet.
   */
  protected AbstractFuture() {}
331
332  // Gets and Timed Gets
333  //
334  // * Be responsive to interruption
335  // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the
336  //   waiters field.
337  // * Future completion is defined by when #value becomes non-null/non SetFuture
338  // * Future completion can be observed if the waiters field contains a TOMBSTONE
339
340  // Timed Get
341  // There are a few design constraints to consider
342  // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I
343  //   have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the
344  //   timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of
345  //   spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
346  //   similar purposes.
347  // * We want to behave reasonably for timeouts of 0
348  // * We are more responsive to completion than timeouts. This is because parkNanos depends on
349  //   system scheduling and as such we could either miss our deadline, or unpark() could be delayed
350  //   so that it looks like we timed out even though we didn't. For comparison FutureTask respects
351  //   completion preferably and AQS is non-deterministic (depends on where in the queue the waiter
352  //   is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node
353  //   and we could use that to make a decision about whether or not we timed out prior to being
354  //   unparked.
355
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
   * current thread is interrupted during the call, even if the value is already available.
   *
   * <p>While at least {@code SPIN_THRESHOLD_NANOS} of the timeout remain, the calling thread
   * registers itself on the waiters stack and parks; once less than that remains it busy-waits
   * instead, re-checking for completion, interruption and the deadline on every iteration.
   *
   * @param timeout the maximum time to wait; a value that is zero or negative never waits
   * @param unit the time unit of {@code timeout}; must not be null
   * @throws CancellationException {@inheritDoc}
   * @throws TimeoutException if the future is not observed to be done before the timeout expires
   */
  @CanIgnoreReturnValue
  @Override
  public V get(long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException, ExecutionException {
    // NOTE: if timeout <= 0, remainingNanos will be <= 0, so neither the parking branch nor the
    // busy-wait loop below is entered and, unless the future is already done, we fall straight
    // through to the TimeoutException at the bottom.
    long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    Object localValue = value;
    if (localValue != null & !(localValue instanceof SetFuture)) {
      return getDoneValue(localValue);
    }
    // we delay calling nanoTime until we know we will need to either park or spin
    final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
    // The label is attached to the if statement so that 'break long_wait_loop' can leave the
    // parking phase from inside the nested loops and continue with the busy-wait loop below.
    long_wait_loop:
    if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
      Waiter oldHead = waiters;
      if (oldHead != Waiter.TOMBSTONE) {
        Waiter node = new Waiter();
        // Push node onto the waiters stack, retrying until the CAS succeeds or a TOMBSTONE is
        // observed.
        do {
          node.setNext(oldHead);
          if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
            // We are on the stack. This loop is only left by returning, throwing, or breaking
            // out to the busy-wait loop.
            while (true) {
              LockSupport.parkNanos(this, remainingNanos);
              // Check interruption first, if we woke up due to interruption we need to honor that.
              if (Thread.interrupted()) {
                removeWaiter(node);
                throw new InterruptedException();
              }

              // Otherwise re-read and check doneness. If we loop then it must have been a spurious
              // wakeup
              localValue = value;
              if (localValue != null & !(localValue instanceof SetFuture)) {
                return getDoneValue(localValue);
              }

              // timed out?
              remainingNanos = endNanos - System.nanoTime();
              if (remainingNanos < SPIN_THRESHOLD_NANOS) {
                // Remove the waiter, one way or another we are done parking this thread.
                removeWaiter(node);
                break long_wait_loop; // jump down to the busy wait loop
              }
            }
          }
          oldHead = waiters; // re-read and loop.
        } while (oldHead != Waiter.TOMBSTONE);
      }
      // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
      // waiter.
      return getDoneValue(value);
    }
    // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the
    // waiters list
    while (remainingNanos > 0) {
      localValue = value;
      if (localValue != null & !(localValue instanceof SetFuture)) {
        return getDoneValue(localValue);
      }
      if (Thread.interrupted()) {
        throw new InterruptedException();
      }
      remainingNanos = endNanos - System.nanoTime();
    }

    // Capture the description before the isDone() check below, so that it is only used in the
    // message when the future was still pending after it was taken.
    String futureToString = toString();
    // It's confusing to see a completed future in a timeout message; if isDone() returns false,
    // then we know it must have given a pending toString value earlier. If not, then the future
    // completed after the timeout expired, and the message might be success.
    if (isDone()) {
      throw new TimeoutException(
          "Waited " + timeout + " " + Ascii.toLowerCase(unit.toString())
              + " but future completed as timeout expired");
    }
    throw new TimeoutException(
        "Waited " + timeout + " " + Ascii.toLowerCase(unit.toString()) + " for " + futureToString);
  }
444
  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
   * current thread is interrupted during the call, even if the value is already available.
   *
   * <p>If the future is not yet done, the calling thread registers itself on the waiters stack
   * and parks until it observes completion or is interrupted.
   *
   * @throws CancellationException {@inheritDoc}
   */
  @CanIgnoreReturnValue
  @Override
  public V get() throws InterruptedException, ExecutionException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    Object localValue = value;
    if (localValue != null & !(localValue instanceof SetFuture)) {
      return getDoneValue(localValue);
    }
    Waiter oldHead = waiters;
    if (oldHead != Waiter.TOMBSTONE) {
      Waiter node = new Waiter();
      // Push node onto the waiters stack, retrying until the CAS succeeds or a TOMBSTONE is
      // observed.
      do {
        node.setNext(oldHead);
        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
          // we are on the stack, now wait for completion.
          while (true) {
            LockSupport.park(this);
            // Check interruption first, if we woke up due to interruption we need to honor that.
            if (Thread.interrupted()) {
              removeWaiter(node);
              throw new InterruptedException();
            }
            // Otherwise re-read and check doneness. If we loop then it must have been a spurious
            // wakeup
            localValue = value;
            if (localValue != null & !(localValue instanceof SetFuture)) {
              return getDoneValue(localValue);
            }
          }
        }
        oldHead = waiters; // re-read and loop.
      } while (oldHead != Waiter.TOMBSTONE);
    }
    // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
    // waiter.
    return getDoneValue(value);
  }
492
493  /**
494   * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}.
495   */
496  private V getDoneValue(Object obj) throws ExecutionException {
497    // While this seems like it might be too branch-y, simple benchmarking proves it to be
498    // unmeasurable (comparing done AbstractFutures with immediateFuture)
499    if (obj instanceof Cancellation) {
500      throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause);
501    } else if (obj instanceof Failure) {
502      throw new ExecutionException(((Failure) obj).exception);
503    } else if (obj == NULL) {
504      return null;
505    } else {
506      @SuppressWarnings("unchecked") // this is the only other option
507      V asV = (V) obj;
508      return asV;
509    }
510  }
511
512  @Override
513  public boolean isDone() {
514    final Object localValue = value;
515    return localValue != null & !(localValue instanceof SetFuture);
516  }
517
518  @Override
519  public boolean isCancelled() {
520    final Object localValue = value;
521    return localValue instanceof Cancellation;
522  }
523
  /**
   * {@inheritDoc}
   *
   * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain
   * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate
   * {@code Future} that was supplied in the {@code setFuture} call.
   *
   * <p>Rather than override this method to perform additional cancellation work or cleanup,
   * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link
   * #wasInterrupted} as necessary. This ensures that the work is done even if the future is
   * cancelled without a call to {@code cancel}, such as by calling {@code
   * setFuture(cancelledFuture)}.
   *
   * @param mayInterruptIfRunning recorded as the 'wasInterrupted' bit of the cancellation; if
   *     {@code true}, {@link #interruptTask} is also invoked on each future this call cancels
   * @return {@code true} if this call moved this future into the cancelled state, {@code false}
   *     if the future was already done or another thread completed it first
   */
  @CanIgnoreReturnValue
  @Override
  public boolean cancel(boolean mayInterruptIfRunning) {
    Object localValue = value;
    boolean rValue = false;
    if (localValue == null | localValue instanceof SetFuture) {
      // Try to delay allocating the exception. At this point we may still lose the CAS, but it is
      // certainly less likely.
      Throwable cause =
          GENERATE_CANCELLATION_CAUSES
              ? new CancellationException("Future.cancel() was called.")
              : null;
      Object valueToSet = new Cancellation(mayInterruptIfRunning, cause);
      // The future currently being cancelled. It starts out as this future and moves down the
      // setFuture chain for as long as the delegates are TrustedFutures that are not yet done.
      AbstractFuture<?> abstractFuture = this;
      while (true) {
        if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) {
          // rValue is set for a successful CAS at any level of the chain, but the first CAS to
          // succeed, if any, is always the one on this future: the loop only moves on to a
          // delegate after a successful CAS.
          rValue = true;
          // We call interruptTask before calling complete(), which is consistent with
          // FutureTask
          if (mayInterruptIfRunning) {
            abstractFuture.interruptTask();
          }
          complete(abstractFuture);
          if (localValue instanceof SetFuture) {
            // propagate cancellation to the future set in setfuture, this is racy, and we don't
            // care if we are successful or not.
            ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future;
            if (futureToPropagateTo instanceof TrustedFuture) {
              // If the future is a TrustedFuture then we specifically avoid calling cancel()
              // this has 2 benefits
              // 1. for long chains of futures strung together with setFuture we consume less stack
              // 2. we avoid allocating Cancellation objects at every level of the cancellation
              //    chain
              // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and
              // does nothing but delegate to this method.
              AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo;
              localValue = trusted.value;
              if (localValue == null | localValue instanceof SetFuture) {
                abstractFuture = trusted;
                continue;  // loop back up and try to complete the new future
              }
            } else {
              // not a TrustedFuture, call cancel directly.
              futureToPropagateTo.cancel(mayInterruptIfRunning);
            }
          }
          break;
        }
        // the value changed under us (we lost the CAS), reread
        localValue = abstractFuture.value;
        if (!(localValue instanceof SetFuture)) {
          // obj cannot be null at this point, because value can only change from null to non-null.
          // So if value changed (and it did since we lost the CAS), then it cannot be null and
          // since it isn't a SetFuture, then the future must be done and we should exit the loop
          break;
        }
      }
    }
    return rValue;
  }
597
  /**
   * Subclasses can override this method to implement interruption of the future's computation. The
   * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)},
   * after the future has entered the cancelled state but before its completion processing runs.
   *
   * <p>The default implementation does nothing.
   *
   * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, consulting
   * {@link #wasInterrupted} to decide whether to interrupt your task.
   *
   * @since 10.0
   */
  protected void interruptTask() {}
610
611  /**
612   * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code
613   * true}.
614   *
615   * @since 14.0
616   */
617  protected final boolean wasInterrupted() {
618    final Object localValue = value;
619    return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted;
620  }
621
622  /**
623   * {@inheritDoc}
624   *
625   * @since 10.0
626   */
627  @Override
628  public void addListener(Runnable listener, Executor executor) {
629    checkNotNull(listener, "Runnable was null.");
630    checkNotNull(executor, "Executor was null.");
631    Listener oldHead = listeners;
632    if (oldHead != Listener.TOMBSTONE) {
633      Listener newNode = new Listener(listener, executor);
634      do {
635        newNode.next = oldHead;
636        if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) {
637          return;
638        }
639        oldHead = listeners; // re-read
640      } while (oldHead != Listener.TOMBSTONE);
641    }
642    // If we get here then the Listener TOMBSTONE was set, which means the future is done, call
643    // the listener.
644    executeListener(listener, executor);
645  }
646
647  /**
648   * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or
649   * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns,
650   * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was
651   * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code
652   * Future} may have previously been set asynchronously, in which case its result may not be known
653   * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*}
654   * method, only by a call to {@link #cancel}.
655   *
656   * @param value the value to be used as the result
657   * @return true if the attempt was accepted, completing the {@code Future}
658   */
659  @CanIgnoreReturnValue
660  protected boolean set(@Nullable V value) {
661    Object valueToSet = value == null ? NULL : value;
662    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
663      complete(this);
664      return true;
665    }
666    return false;
667  }
668
669  /**
670   * Sets the failed result of this {@code Future} unless this {@code Future} has already been
671   * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this
672   * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b>
673   * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the
674   * {@code Future} may have previously been set asynchronously, in which case its result may not be
675   * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*}
676   * method, only by a call to {@link #cancel}.
677   *
678   * @param throwable the exception to be used as the failed result
679   * @return true if the attempt was accepted, completing the {@code Future}
680   */
681  @CanIgnoreReturnValue
682  protected boolean setException(Throwable throwable) {
683    Object valueToSet = new Failure(checkNotNull(throwable));
684    if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
685      complete(this);
686      return true;
687    }
688    return false;
689  }
690
691  /**
692   * Sets the result of this {@code Future} to match the supplied input {@code Future} once the
693   * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set
694   * (including "set asynchronously," defined below).
695   *
696   * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call
697   * is accepted, then this future is guaranteed to have been completed with the supplied future by
698   * the time this method returns. If the supplied future is not done and the call is accepted, then
699   * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known,
700   * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}.
701   *
702   * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later
703   * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to
704   * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code
705   * Future}.
706   *
707   * @param future the future to delegate to
708   * @return true if the attempt was accepted, indicating that the {@code Future} was not previously
709   *     cancelled or set.
710   * @since 19.0
711   */
712  @Beta
713  @CanIgnoreReturnValue
714  protected boolean setFuture(ListenableFuture<? extends V> future) {
715    checkNotNull(future);
716    Object localValue = value;
717    if (localValue == null) {
718      if (future.isDone()) {
719        Object value = getFutureValue(future);
720        if (ATOMIC_HELPER.casValue(this, null, value)) {
721          complete(this);
722          return true;
723        }
724        return false;
725      }
726      SetFuture valueToSet = new SetFuture<V>(this, future);
727      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
728        // the listener is responsible for calling completeWithFuture, directExecutor is appropriate
729        // since all we are doing is unpacking a completed future which should be fast.
730        try {
731          future.addListener(valueToSet, directExecutor());
732        } catch (Throwable t) {
733          // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this
734          // must have been caused by addListener itself. The most likely explanation is a
735          // misconfigured mock. Try to switch to Failure.
736          Failure failure;
737          try {
738            failure = new Failure(t);
739          } catch (Throwable oomMostLikely) {
740            failure = Failure.FALLBACK_INSTANCE;
741          }
742          // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok.
743          boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
744        }
745        return true;
746      }
747      localValue = value; // we lost the cas, fall through and maybe cancel
748    }
749    // The future has already been set to something. If it is cancellation we should cancel the
750    // incoming future.
751    if (localValue instanceof Cancellation) {
752      // we don't care if it fails, this is best-effort.
753      future.cancel(((Cancellation) localValue).wasInterrupted);
754    }
755    return false;
756  }
757
758  /**
759   * Returns a value, suitable for storing in the {@link #value} field. From the given future,
760   * which is assumed to be done.
761   *
762   * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
763   */
764  private static Object getFutureValue(ListenableFuture<?> future) {
765    Object valueToSet;
766    if (future instanceof TrustedFuture) {
767      // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
768      // override .get() (since it is final) and therefore this is equivalent to calling .get()
769      // and unpacking the exceptions like we do below (just much faster because it is a single
770      // field read instead of a read, several branches and possibly creating exceptions).
771      return ((AbstractFuture<?>) future).value;
772    } else {
773      // Otherwise calculate valueToSet by calling .get()
774      try {
775        Object v = getDone(future);
776        valueToSet = v == null ? NULL : v;
777      } catch (ExecutionException exception) {
778        valueToSet = new Failure(exception.getCause());
779      } catch (CancellationException cancellation) {
780        valueToSet = new Cancellation(false, cancellation);
781      } catch (Throwable t) {
782        valueToSet = new Failure(t);
783      }
784    }
785    return valueToSet;
786  }
787
  /**
   * Unblocks all threads and runs all listeners.
   *
   * <p>Listeners that are {@code SetFuture} instances are not handed to an executor. Instead, the
   * future that owns the {@code SetFuture} is completed in place and then processed by the same
   * loop (waiters released, {@code afterDone} invoked, listeners collected), so that a chain of
   * futures linked via {@code setFuture} is unwound iteratively rather than recursively.
   *
   * @param future the future that has just been completed
   */
  private static void complete(AbstractFuture<?> future) {
    // Stack of listeners still to be run; listeners of futures completed along the way are pushed
    // onto it by clearListeners.
    Listener next = null;
    outer: while (true) {
      future.releaseWaiters();
      // We call this before the listeners in order to avoid needing to manage a separate stack data
      // structure for them.
      // afterDone() should be generally fast and only used for cleanup work... but in theory can
      // also be recursive and create StackOverflowErrors
      future.afterDone();
      // push the current set of listeners onto next
      next = future.clearListeners(next);
      // Drop the reference; it is reassigned below only if a SetFuture listener is unwound.
      future = null;
      while (next != null) {
        Listener curr = next;
        next = next.next;
        Runnable task = curr.task;
        if (task instanceof SetFuture) {
          SetFuture<?> setFuture = (SetFuture<?>) task;
          // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
          // chains of SetFutures
          // Handling this special case is important because there is no way to pass an executor to
          // setFuture, so a user couldn't break the chain by doing this themselves.  It is also
          // potentially common if someone writes a recursive Futures.transformAsync transformer.
          future = setFuture.owner;
          // Only complete the owner if it is still waiting on this particular SetFuture.
          if (future.value == setFuture) {
            Object valueToSet = getFutureValue(setFuture.future);
            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
              // The owner is now done: restart the outer loop to release its waiters and collect
              // its listeners on top of the ones still pending in 'next'.
              continue outer;
            }
          }
          // otherwise the future we were trying to set is already done.
        } else {
          executeListener(task, curr.executor);
        }
      }
      // No listeners left to run.
      break;
    }
  }
827
828  /**
829   * Callback method that is called exactly once after the future is completed.
830   *
831   * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it.
832   *
833   * <p>The default implementation of this method in {@code AbstractFuture} does nothing.  This is
834   * intended for very lightweight cleanup work, for example, timing statistics or clearing fields.
835   * If your task does anything heavier consider, just using a listener with an executor.
836   *
837   * @since 20.0
838   */
839  // TODO(cpovirk): @ForOverride https://github.com/google/error-prone/issues/342
840  @Beta
841  protected void afterDone() {}
842
843  /**
844   * Returns the exception that this {@code Future} completed with. This includes completion through
845   * a call to {@link #setException} or {@link #setFuture setFuture}{@code (failedFuture)} but not
846   * cancellation.
847   *
848   * @throws RuntimeException if the {@code Future} has not failed
849   */
850  final Throwable trustedGetException() {
851    return ((Failure) value).exception;
852  }
853
854  /**
855   * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts)
856   * the given future (if available).
857   *
858   * <p>This method should be used only when this future is completed. It is designed to be called
859   * from {@code done}.
860   */
861  final void maybePropagateCancellation(@Nullable Future<?> related) {
862    if (related != null & isCancelled()) {
863      related.cancel(wasInterrupted());
864    }
865  }
866
867  /** Releases all threads in the {@link #waiters} list, and clears the list. */
868  private void releaseWaiters() {
869    Waiter head;
870    do {
871      head = waiters;
872    } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE));
873    for (Waiter currentWaiter = head;
874        currentWaiter != null;
875        currentWaiter = currentWaiter.next) {
876      currentWaiter.unpark();
877    }
878  }
879
880  /**
881   * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently
882   * added first.
883   */
884  private Listener clearListeners(Listener onto) {
885    // We need to
886    // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to
887    //    to synchronize with us
888    // 2. reverse the linked list, because despite our rather clear contract, people depend on us
889    //    executing listeners in the order they were added
890    // 3. push all the items onto 'onto' and return the new head of the stack
891    Listener head;
892    do {
893      head = listeners;
894    } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE));
895    Listener reversedList = onto;
896    while (head != null) {
897      Listener tmp = head;
898      head = head.next;
899      tmp.next = reversedList;
900      reversedList = tmp;
901    }
902    return reversedList;
903  }
904
905  // TODO(user) move this up into FluentFuture, or parts as a default method on ListenableFuture?
906  @Override
907  public String toString() {
908    StringBuilder builder = new StringBuilder().append(super.toString()).append("[status=");
909    if (isCancelled()) {
910      builder.append("CANCELLED");
911    } else if (isDone()) {
912      addDoneString(builder);
913    } else {
914      String pendingDescription;
915      try {
916        pendingDescription = pendingToString();
917      } catch (RuntimeException e) {
918        // Don't call getMessage or toString() on the exception, in case the exception thrown by the
919        // subclass is implemented with bugs similar to the subclass.
920        pendingDescription = "Exception thrown from implementation: " + e.getClass();
921      }
922      // The future may complete during or before the call to getPendingToString, so we use null
923      // as a signal that we should try checking if the future is done again.
924      if (!isNullOrEmpty(pendingDescription)) {
925        builder.append("PENDING, info=[").append(pendingDescription).append("]");
926      } else if (isDone()) {
927        addDoneString(builder);
928      } else {
929        builder.append("PENDING");
930      }
931    }
932    return builder.append("]").toString();
933  }
934
935  /**
936   * Provide a human-readable explanation of why this future has not yet completed.
937   *
938   * @return null if an explanation cannot be provided because the future is done.
939   * @since 23.0
940   */
941  @Nullable
942  protected String pendingToString() {
943    Object localValue = value;
944    if (localValue instanceof SetFuture) {
945      return "setFuture=[" + ((SetFuture) localValue).future + "]";
946    } else if (this instanceof ScheduledFuture) {
947      return "remaining delay=["
948          + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS)
949          + " ms]";
950    }
951    return null;
952  }
953
954  private void addDoneString(StringBuilder builder) {
955    try {
956      V value = getDone(this);
957      builder.append("SUCCESS, result=[").append(value).append("]");
958    } catch (ExecutionException e) {
959      builder.append("FAILURE, cause=[").append(e.getCause()).append("]");
960    } catch (CancellationException e) {
961      builder.append("CANCELLED"); // shouldn't be reachable
962    } catch (RuntimeException e) {
963      builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]");
964    }
965  }
966
967  /**
968   * Submits the given runnable to the given {@link Executor} catching and logging all
969   * {@linkplain RuntimeException runtime exceptions} thrown by the executor.
970   */
971  private static void executeListener(Runnable runnable, Executor executor) {
972    try {
973      executor.execute(runnable);
974    } catch (RuntimeException e) {
975      // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if
976      // we're given a bad one. We only catch RuntimeException because we want Errors to propagate
977      // up.
978      log.log(
979          Level.SEVERE,
980          "RuntimeException while executing runnable " + runnable + " with executor " + executor,
981          e);
982    }
983  }
984
  /**
   * Abstraction over the primitive field operations this class needs: plain writes to the fields
   * of a {@link Waiter} and compare-and-set on the {@link #waiters}, {@link #listeners} and {@link
   * #value} fields of a future. Concrete strategies are provided by the nested helper subclasses.
   */
  private abstract static class AtomicHelper {
    /** Non-volatile write of the thread to the {@link Waiter#thread} field. */
    abstract void putThread(Waiter waiter, Thread newValue);

    /** Non-volatile write of the waiter to the {@link Waiter#next} field. */
    abstract void putNext(Waiter waiter, Waiter newValue);

    /**
     * Performs a CAS operation on the {@link #waiters} field.
     *
     * @return true if the field held {@code expect} and was replaced with {@code update}
     */
    abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update);

    /**
     * Performs a CAS operation on the {@link #listeners} field.
     *
     * @return true if the field held {@code expect} and was replaced with {@code update}
     */
    abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update);

    /**
     * Performs a CAS operation on the {@link #value} field.
     *
     * @return true if the field held {@code expect} and was replaced with {@code update}
     */
    abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update);
  }
1001
1002  /**
1003   * {@link AtomicHelper} based on {@link sun.misc.Unsafe}.
1004   *
1005   * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot
1006   * be accessed.
1007   */
1008  private static final class UnsafeAtomicHelper extends AtomicHelper {
1009    static final sun.misc.Unsafe UNSAFE;
1010    static final long LISTENERS_OFFSET;
1011    static final long WAITERS_OFFSET;
1012    static final long VALUE_OFFSET;
1013    static final long WAITER_THREAD_OFFSET;
1014    static final long WAITER_NEXT_OFFSET;
1015
1016    static {
1017      sun.misc.Unsafe unsafe = null;
1018      try {
1019        unsafe = sun.misc.Unsafe.getUnsafe();
1020      } catch (SecurityException tryReflectionInstead) {
1021        try {
1022          unsafe =
1023              AccessController.doPrivileged(
1024                  new PrivilegedExceptionAction<sun.misc.Unsafe>() {
1025                    @Override
1026                    public sun.misc.Unsafe run() throws Exception {
1027                      Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
1028                      for (java.lang.reflect.Field f : k.getDeclaredFields()) {
1029                        f.setAccessible(true);
1030                        Object x = f.get(null);
1031                        if (k.isInstance(x)) {
1032                          return k.cast(x);
1033                        }
1034                      }
1035                      throw new NoSuchFieldError("the Unsafe");
1036                    }
1037                  });
1038        } catch (PrivilegedActionException e) {
1039          throw new RuntimeException("Could not initialize intrinsics", e.getCause());
1040        }
1041      }
1042      try {
1043        Class<?> abstractFuture = AbstractFuture.class;
1044        WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
1045        LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
1046        VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
1047        WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
1048        WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
1049        UNSAFE = unsafe;
1050      } catch (Exception e) {
1051        throwIfUnchecked(e);
1052        throw new RuntimeException(e);
1053      }
1054    }
1055
1056    @Override
1057    void putThread(Waiter waiter, Thread newValue) {
1058      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
1059    }
1060
1061    @Override
1062    void putNext(Waiter waiter, Waiter newValue) {
1063      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
1064    }
1065
1066    /** Performs a CAS operation on the {@link #waiters} field. */
1067    @Override
1068    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1069      return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
1070    }
1071
1072    /** Performs a CAS operation on the {@link #listeners} field. */
1073    @Override
1074    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1075      return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
1076    }
1077
1078    /** Performs a CAS operation on the {@link #value} field. */
1079    @Override
1080    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1081      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
1082    }
1083  }
1084
  /**
   * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
   *
   * <p>Each operation is delegated to the updater for the corresponding field; the updaters are
   * supplied by the caller of the constructor.
   */
  private static final class SafeAtomicHelper extends AtomicHelper {
    // One updater per field that AtomicHelper operates on.
    final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
    final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
    final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater;
    final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater;
    final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater;

    /** Stores the given updaters; no validation is performed here. */
    SafeAtomicHelper(
        AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
        AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
        AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater,
        AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater,
        AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) {
      this.waiterThreadUpdater = waiterThreadUpdater;
      this.waiterNextUpdater = waiterNextUpdater;
      this.waitersUpdater = waitersUpdater;
      this.listenersUpdater = listenersUpdater;
      this.valueUpdater = valueUpdater;
    }

    /** Writes the waiter's thread field via {@code lazySet} rather than a full volatile set. */
    @Override
    void putThread(Waiter waiter, Thread newValue) {
      waiterThreadUpdater.lazySet(waiter, newValue);
    }

    /** Writes the waiter's next field via {@code lazySet} rather than a full volatile set. */
    @Override
    void putNext(Waiter waiter, Waiter newValue) {
      waiterNextUpdater.lazySet(waiter, newValue);
    }

    /** Performs a CAS operation on the {@link #waiters} field. */
    @Override
    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
      return waitersUpdater.compareAndSet(future, expect, update);
    }

    /** Performs a CAS operation on the {@link #listeners} field. */
    @Override
    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
      return listenersUpdater.compareAndSet(future, expect, update);
    }

    /** Performs a CAS operation on the {@link #value} field. */
    @Override
    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
      return valueUpdater.compareAndSet(future, expect, update);
    }
  }
1131
1132  /**
1133   * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
1134   *
1135   * <p>This is an implementation of last resort for when certain basic VM features are broken (like
1136   * AtomicReferenceFieldUpdater).
1137   */
1138  private static final class SynchronizedHelper extends AtomicHelper {
1139    @Override
1140    void putThread(Waiter waiter, Thread newValue) {
1141      waiter.thread = newValue;
1142    }
1143
1144    @Override
1145    void putNext(Waiter waiter, Waiter newValue) {
1146      waiter.next = newValue;
1147    }
1148
1149    @Override
1150    boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) {
1151      synchronized (future) {
1152        if (future.waiters == expect) {
1153          future.waiters = update;
1154          return true;
1155        }
1156        return false;
1157      }
1158    }
1159
1160    @Override
1161    boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) {
1162      synchronized (future) {
1163        if (future.listeners == expect) {
1164          future.listeners = update;
1165          return true;
1166        }
1167        return false;
1168      }
1169    }
1170
1171    @Override
1172    boolean casValue(AbstractFuture<?> future, Object expect, Object update) {
1173      synchronized (future) {
1174        if (future.value == expect) {
1175          future.value = update;
1176          return true;
1177        }
1178        return false;
1179      }
1180    }
1181  }
1182
1183  private static CancellationException cancellationExceptionWithCause(
1184      @Nullable String message, @Nullable Throwable cause) {
1185    CancellationException exception = new CancellationException(message);
1186    exception.initCause(cause);
1187    return exception;
1188  }
1189}