001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Throwables.throwIfUnchecked; 019import static com.google.common.util.concurrent.NullnessCasts.uncheckedNull; 020import static java.lang.Integer.toHexString; 021import static java.lang.System.identityHashCode; 022import static java.util.Objects.requireNonNull; 023import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 024 025import com.google.common.annotations.Beta; 026import com.google.common.annotations.GwtCompatible; 027import com.google.common.base.Strings; 028import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 029import com.google.common.util.concurrent.internal.InternalFutures; 030import com.google.errorprone.annotations.CanIgnoreReturnValue; 031import com.google.errorprone.annotations.ForOverride; 032import com.google.j2objc.annotations.ReflectionSupport; 033import java.security.AccessController; 034import java.security.PrivilegedActionException; 035import java.security.PrivilegedExceptionAction; 036import java.util.Locale; 037import java.util.concurrent.CancellationException; 038import java.util.concurrent.ExecutionException; 039import java.util.concurrent.Executor; 040import java.util.concurrent.Future; 041import 
java.util.concurrent.ScheduledFuture; 042import java.util.concurrent.TimeUnit; 043import java.util.concurrent.TimeoutException; 044import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 045import java.util.concurrent.locks.LockSupport; 046import java.util.logging.Level; 047import java.util.logging.Logger; 048import javax.annotation.CheckForNull; 049import org.checkerframework.checker.nullness.qual.Nullable; 050 051/** 052 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 053 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 054 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 055 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 056 * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link 057 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function, 058 * java.util.concurrent.Executor) Futures.catching}. 059 * 060 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 061 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 062 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 063 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 064 * should rarely override other methods. 065 * 066 * @author Sven Mawson 067 * @author Luke Sandberg 068 * @since 1.0 069 */ 070@SuppressWarnings({ 071 "ShortCircuitBoolean", // we use non-short circuiting comparisons intentionally 072 "nullness", // TODO(b/147136275): Remove once our checker understands & and |. 
073}) 074@GwtCompatible(emulated = true) 075@ReflectionSupport(value = ReflectionSupport.Level.FULL) 076@ElementTypesAreNonnullByDefault 077public abstract class AbstractFuture<V extends @Nullable Object> extends InternalFutureFailureAccess 078 implements ListenableFuture<V> { 079 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 080 081 static final boolean GENERATE_CANCELLATION_CAUSES; 082 083 static { 084 // System.getProperty may throw if the security policy does not permit access. 085 boolean generateCancellationCauses; 086 try { 087 generateCancellationCauses = 088 Boolean.parseBoolean( 089 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 090 } catch (SecurityException e) { 091 generateCancellationCauses = false; 092 } 093 GENERATE_CANCELLATION_CAUSES = generateCancellationCauses; 094 } 095 096 /** 097 * Tag interface marking trusted subclasses. This enables some optimizations. The implementation 098 * of this interface must also be an AbstractFuture and must not override or expose for overriding 099 * any of the public methods of ListenableFuture. 100 */ 101 interface Trusted<V extends @Nullable Object> extends ListenableFuture<V> {} 102 103 /** 104 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 105 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 
106 */ 107 abstract static class TrustedFuture<V extends @Nullable Object> extends AbstractFuture<V> 108 implements Trusted<V> { 109 @CanIgnoreReturnValue 110 @Override 111 @ParametricNullness 112 public final V get() throws InterruptedException, ExecutionException { 113 return super.get(); 114 } 115 116 @CanIgnoreReturnValue 117 @Override 118 @ParametricNullness 119 public final V get(long timeout, TimeUnit unit) 120 throws InterruptedException, ExecutionException, TimeoutException { 121 return super.get(timeout, unit); 122 } 123 124 @Override 125 public final boolean isDone() { 126 return super.isDone(); 127 } 128 129 @Override 130 public final boolean isCancelled() { 131 return super.isCancelled(); 132 } 133 134 @Override 135 public final void addListener(Runnable listener, Executor executor) { 136 super.addListener(listener, executor); 137 } 138 139 @CanIgnoreReturnValue 140 @Override 141 public final boolean cancel(boolean mayInterruptIfRunning) { 142 return super.cancel(mayInterruptIfRunning); 143 } 144 } 145 146 // Logger to log exceptions caught when running listeners. 147 private static final Logger log = Logger.getLogger(AbstractFuture.class.getName()); 148 149 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 150 // blocking. This value is what AbstractQueuedSynchronizer uses. 
private static final long SPIN_THRESHOLD_NANOS = 1000L;

  /**
   * Strategy object through which this class performs its compare-and-set operations on the
   * {@code waiters}, {@code listeners} and {@code value} fields and its plain writes to {@link
   * Waiter} fields. It is chosen exactly once, by the static initializer below.
   */
  private static final AtomicHelper ATOMIC_HELPER;

  // Picks the AtomicHelper implementation. Candidates are tried in this order, moving on to the
  // next one only when construction of the previous one throws:
  //   1. UnsafeAtomicHelper
  //   2. SafeAtomicHelper (built from AtomicReferenceFieldUpdaters created here)
  //   3. SynchronizedHelper
  static {
    AtomicHelper helper;
    // Failures are remembered rather than logged on the spot; see the logging note at the end.
    Throwable thrownUnsafeFailure = null;
    Throwable thrownAtomicReferenceFieldUpdaterFailure = null;

    try {
      helper = new UnsafeAtomicHelper();
    } catch (Throwable unsafeFailure) {
      thrownUnsafeFailure = unsafeFailure;
      // catch absolutely everything and fall through to our 'SafeAtomicHelper'
      // The access control checks that ARFU does means the caller class has to be AbstractFuture
      // instead of SafeAtomicHelper, so we annoyingly define these here
      try {
        // The string arguments are the literal field names of Waiter and AbstractFuture; they
        // must stay in sync with the field declarations.
        helper =
            new SafeAtomicHelper(
                newUpdater(Waiter.class, Thread.class, "thread"),
                newUpdater(Waiter.class, Waiter.class, "next"),
                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
                newUpdater(AbstractFuture.class, Object.class, "value"));
      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
        // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
        // For these users fallback to a suboptimal implementation, based on synchronized. This will
        // be a definite performance hit to those users.
        thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure;
        helper = new SynchronizedHelper();
      }
    }
    ATOMIC_HELPER = helper;

    // Prevent rare disastrous classloading in first call to LockSupport.park.
    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
    @SuppressWarnings("unused")
    Class<?> ensureLoaded = LockSupport.class;

    // Log after all static init is finished; if an installed logger uses any Futures methods, it
    // shouldn't break in cases where reflection is missing/broken.
    // Nothing is logged unless BOTH of the first two helpers failed (the second failure can only
    // be recorded after the first one has been).
    if (thrownAtomicReferenceFieldUpdaterFailure != null) {
      log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", thrownUnsafeFailure);
      log.log(
          Level.SEVERE, "SafeAtomicHelper is broken!", thrownAtomicReferenceFieldUpdaterFailure);
    }
  }

  /** Waiter links form a Treiber stack, in the {@link #waiters} field. */
  private static final class Waiter {
    /** Sentinel instance; it is created without touching ATOMIC_HELPER and has a null thread. */
    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);

    // The waiting thread. Reset to null by unpark() and by removeWaiter(), so a null thread marks
    // a node that is no longer waiting.
    @CheckForNull volatile Thread thread;
    // The next node further down the stack, or null at the bottom.
    @CheckForNull volatile Waiter next;

    /**
     * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
     * before the ATOMIC_HELPER. Apparently this is possible on some android platforms.
     */
    Waiter(boolean unused) {}

    /** Creates a waiter node that records the calling thread as the waiter. */
    Waiter() {
      // avoid volatile write, write is made visible by subsequent CAS on waiters field
      ATOMIC_HELPER.putThread(this, Thread.currentThread());
    }

    // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
    // field.
    void setNext(@CheckForNull Waiter next) {
      ATOMIC_HELPER.putNext(this, next);
    }

    /** Wakes the recorded thread, if there still is one, and clears the {@code thread} field. */
    void unpark() {
      // This is racy with removeWaiter. The consequence of the race is that we may spuriously call
      // unpark even though the thread has already removed itself from the list. But even if we did
      // use a CAS, that race would still exist (it would just be ever so slightly smaller).
      Thread w = thread;
      if (w != null) {
        thread = null;
        LockSupport.unpark(w);
      }
    }
  }

  /**
   * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted
   * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved
   * by two things.
   *
   * <ul>
   *   <li>This is only called when a waiting thread times out or is interrupted. Both of which
   *       should be rare.
   *   <li>The waiters list should be very short.
* </ul>
   *
   * @param node the waiter to mark as deleted; its {@code thread} field is cleared first
   */
  private void removeWaiter(Waiter node) {
    node.thread = null; // mark as 'deleted'
    restart:
    while (true) {
      // pred is the most recent node seen in this pass whose thread was still non-null; it stays
      // null until such a node has been found.
      Waiter pred = null;
      Waiter curr = waiters;
      if (curr == Waiter.TOMBSTONE) {
        return; // give up if someone is calling complete
      }
      Waiter succ;
      while (curr != null) {
        succ = curr.next;
        if (curr.thread != null) { // we aren't unlinking this node, update pred.
          pred = curr;
        } else if (pred != null) { // We are unlinking this node and it has a predecessor.
          pred.next = succ;
          if (pred.thread == null) { // We raced with another node that unlinked pred. Restart.
            continue restart;
          }
        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head
          continue restart; // We raced with an add or complete
        }
        curr = succ;
      }
      // Reached the end of the list without having to restart: the pass is complete.
      break;
    }
  }

  /** Listeners also form a stack through the {@link #listeners} field. */
  private static final class Listener {
    /** Sentinel instance created by the no-arg constructor; its task and executor are null. */
    static final Listener TOMBSTONE = new Listener();
    @CheckForNull // null only for TOMBSTONE
    final Runnable task;
    @CheckForNull // null only for TOMBSTONE
    final Executor executor;

    // writes to next are made visible by subsequent CAS's on the listeners field
    @CheckForNull Listener next;

    /** Creates a node holding a listener task together with the executor it was added with. */
    Listener(Runnable task, Executor executor) {
      this.task = task;
      this.executor = executor;
    }

    /** Creates the TOMBSTONE node, which carries neither a task nor an executor. */
    Listener() {
      this.task = null;
      this.executor = null;
    }
  }

  /** A special value to represent {@code null}. */
  private static final Object NULL = new Object();

  /** A special value to represent failure, when {@link #setException} is called successfully.
*/ 301 private static final class Failure { 302 static final Failure FALLBACK_INSTANCE = 303 new Failure( 304 new Throwable("Failure occurred while trying to finish a future.") { 305 @Override 306 public synchronized Throwable fillInStackTrace() { 307 return this; // no stack trace 308 } 309 }); 310 final Throwable exception; 311 312 Failure(Throwable exception) { 313 this.exception = checkNotNull(exception); 314 } 315 } 316 317 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 318 private static final class Cancellation { 319 // constants to use when GENERATE_CANCELLATION_CAUSES = false 320 @CheckForNull static final Cancellation CAUSELESS_INTERRUPTED; 321 @CheckForNull static final Cancellation CAUSELESS_CANCELLED; 322 323 static { 324 if (GENERATE_CANCELLATION_CAUSES) { 325 CAUSELESS_CANCELLED = null; 326 CAUSELESS_INTERRUPTED = null; 327 } else { 328 CAUSELESS_CANCELLED = new Cancellation(false, null); 329 CAUSELESS_INTERRUPTED = new Cancellation(true, null); 330 } 331 } 332 333 final boolean wasInterrupted; 334 @CheckForNull final Throwable cause; 335 336 Cancellation(boolean wasInterrupted, @CheckForNull Throwable cause) { 337 this.wasInterrupted = wasInterrupted; 338 this.cause = cause; 339 } 340 } 341 342 /** A special value that encodes the 'setFuture' state. */ 343 private static final class SetFuture<V extends @Nullable Object> implements Runnable { 344 final AbstractFuture<V> owner; 345 final ListenableFuture<? extends V> future; 346 347 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 348 this.owner = owner; 349 this.future = future; 350 } 351 352 @Override 353 public void run() { 354 if (owner.value != this) { 355 // nothing to do, we must have been cancelled, don't bother inspecting the future. 
356 return; 357 } 358 Object valueToSet = getFutureValue(future); 359 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 360 complete(owner); 361 } 362 } 363 } 364 365 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 366 // available. 367 /** 368 * This field encodes the current state of the future. 369 * 370 * <p>The valid values are: 371 * 372 * <ul> 373 * <li>{@code null} initial state, nothing has happened. 374 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 375 * <li>{@link Failure} terminal state, {@code setException} was called. 376 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 377 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 378 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 379 * argument. 380 * </ul> 381 */ 382 @CheckForNull private volatile Object value; 383 384 /** All listeners. */ 385 @CheckForNull private volatile Listener listeners; 386 387 /** All waiting threads. */ 388 @CheckForNull private volatile Waiter waiters; 389 390 /** Constructor for use by subclasses. */ 391 protected AbstractFuture() {} 392 393 // Gets and Timed Gets 394 // 395 // * Be responsive to interruption 396 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 397 // waiters field. 398 // * Future completion is defined by when #value becomes non-null/non SetFuture 399 // * Future completion can be observed if the waiters field contains a TOMBSTONE 400 401 // Timed Get 402 // There are a few design constraints to consider 403 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 404 // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the 405 // timeout is small we shouldn't park(). 
This needs to be traded off with the cpu overhead of 406 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 407 // similar purposes. 408 // * We want to behave reasonably for timeouts of 0 409 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 410 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 411 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 412 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 413 // is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node 414 // and we could use that to make a decision about whether or not we timed out prior to being 415 // unparked. 416 417 /** 418 * {@inheritDoc} 419 * 420 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 421 * current thread is interrupted during the call, even if the value is already available. 422 * 423 * @throws CancellationException {@inheritDoc} 424 */ 425 @CanIgnoreReturnValue 426 @Override 427 @ParametricNullness 428 public V get(long timeout, TimeUnit unit) 429 throws InterruptedException, TimeoutException, ExecutionException { 430 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 431 // at the bottom and throw a timeoutexception. 432 final long timeoutNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 433 long remainingNanos = timeoutNanos; 434 if (Thread.interrupted()) { 435 throw new InterruptedException(); 436 } 437 Object localValue = value; 438 if (localValue != null & !(localValue instanceof SetFuture)) { 439 return getDoneValue(localValue); 440 } 441 // we delay calling nanoTime until we know we will need to either park or spin 442 final long endNanos = remainingNanos > 0 ? 
System.nanoTime() + remainingNanos : 0; 443 long_wait_loop: 444 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 445 Waiter oldHead = waiters; 446 if (oldHead != Waiter.TOMBSTONE) { 447 Waiter node = new Waiter(); 448 do { 449 node.setNext(oldHead); 450 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 451 while (true) { 452 OverflowAvoidingLockSupport.parkNanos(this, remainingNanos); 453 // Check interruption first, if we woke up due to interruption we need to honor that. 454 if (Thread.interrupted()) { 455 removeWaiter(node); 456 throw new InterruptedException(); 457 } 458 459 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 460 // wakeup 461 localValue = value; 462 if (localValue != null & !(localValue instanceof SetFuture)) { 463 return getDoneValue(localValue); 464 } 465 466 // timed out? 467 remainingNanos = endNanos - System.nanoTime(); 468 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 469 // Remove the waiter, one way or another we are done parking this thread. 470 removeWaiter(node); 471 break long_wait_loop; // jump down to the busy wait loop 472 } 473 } 474 } 475 oldHead = waiters; // re-read and loop. 476 } while (oldHead != Waiter.TOMBSTONE); 477 } 478 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 479 // waiter. 480 // requireNonNull is safe because value is always set before TOMBSTONE. 
481 return getDoneValue(requireNonNull(value)); 482 } 483 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 484 // waiters list 485 while (remainingNanos > 0) { 486 localValue = value; 487 if (localValue != null & !(localValue instanceof SetFuture)) { 488 return getDoneValue(localValue); 489 } 490 if (Thread.interrupted()) { 491 throw new InterruptedException(); 492 } 493 remainingNanos = endNanos - System.nanoTime(); 494 } 495 496 String futureToString = toString(); 497 final String unitString = unit.toString().toLowerCase(Locale.ROOT); 498 String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); 499 // Only report scheduling delay if larger than our spin threshold - otherwise it's just noise 500 if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { 501 // We over-waited for our timeout. 502 message += " (plus "; 503 long overWaitNanos = -remainingNanos; 504 long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); 505 long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); 506 boolean shouldShowExtraNanos = 507 overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; 508 if (overWaitUnits > 0) { 509 message += overWaitUnits + " " + unitString; 510 if (shouldShowExtraNanos) { 511 message += ","; 512 } 513 message += " "; 514 } 515 if (shouldShowExtraNanos) { 516 message += overWaitLeftoverNanos + " nanoseconds "; 517 } 518 519 message += "delay)"; 520 } 521 // It's confusing to see a completed future in a timeout message; if isDone() returns false, 522 // then we know it must have given a pending toString value earlier. If not, then the future 523 // completed after the timeout expired, and the message might be success. 
524 if (isDone()) { 525 throw new TimeoutException(message + " but future completed as timeout expired"); 526 } 527 throw new TimeoutException(message + " for " + futureToString); 528 } 529 530 /** 531 * {@inheritDoc} 532 * 533 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 534 * current thread is interrupted during the call, even if the value is already available. 535 * 536 * @throws CancellationException {@inheritDoc} 537 */ 538 @CanIgnoreReturnValue 539 @Override 540 @ParametricNullness 541 public V get() throws InterruptedException, ExecutionException { 542 if (Thread.interrupted()) { 543 throw new InterruptedException(); 544 } 545 Object localValue = value; 546 if (localValue != null & !(localValue instanceof SetFuture)) { 547 return getDoneValue(localValue); 548 } 549 Waiter oldHead = waiters; 550 if (oldHead != Waiter.TOMBSTONE) { 551 Waiter node = new Waiter(); 552 do { 553 node.setNext(oldHead); 554 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 555 // we are on the stack, now wait for completion. 556 while (true) { 557 LockSupport.park(this); 558 // Check interruption first, if we woke up due to interruption we need to honor that. 559 if (Thread.interrupted()) { 560 removeWaiter(node); 561 throw new InterruptedException(); 562 } 563 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 564 // wakeup 565 localValue = value; 566 if (localValue != null & !(localValue instanceof SetFuture)) { 567 return getDoneValue(localValue); 568 } 569 } 570 } 571 oldHead = waiters; // re-read and loop. 572 } while (oldHead != Waiter.TOMBSTONE); 573 } 574 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 575 // waiter. 576 // requireNonNull is safe because value is always set before TOMBSTONE. 577 return getDoneValue(requireNonNull(value)); 578 } 579 580 /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. 
*/ 581 @ParametricNullness 582 private V getDoneValue(Object obj) throws ExecutionException { 583 // While this seems like it might be too branch-y, simple benchmarking proves it to be 584 // unmeasurable (comparing done AbstractFutures with immediateFuture) 585 if (obj instanceof Cancellation) { 586 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 587 } else if (obj instanceof Failure) { 588 throw new ExecutionException(((Failure) obj).exception); 589 } else if (obj == NULL) { 590 /* 591 * It's safe to return null because we would only have stored it in the first place if it were 592 * a valid value for V. 593 */ 594 return uncheckedNull(); 595 } else { 596 @SuppressWarnings("unchecked") // this is the only other option 597 V asV = (V) obj; 598 return asV; 599 } 600 } 601 602 @Override 603 public boolean isDone() { 604 final Object localValue = value; 605 return localValue != null & !(localValue instanceof SetFuture); 606 } 607 608 @Override 609 public boolean isCancelled() { 610 final Object localValue = value; 611 return localValue instanceof Cancellation; 612 } 613 614 /** 615 * {@inheritDoc} 616 * 617 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 618 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 619 * {@code Future} that was supplied in the {@code setFuture} call. 620 * 621 * <p>Rather than override this method to perform additional cancellation work or cleanup, 622 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 623 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 624 * cancelled without a call to {@code cancel}, such as by calling {@code 625 * setFuture(cancelledFuture)}. 626 * 627 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 628 * acquire other locks, risking deadlocks. 
629 */ 630 @CanIgnoreReturnValue 631 @Override 632 public boolean cancel(boolean mayInterruptIfRunning) { 633 Object localValue = value; 634 boolean rValue = false; 635 if (localValue == null | localValue instanceof SetFuture) { 636 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 637 // certainly less likely. 638 Object valueToSet = 639 GENERATE_CANCELLATION_CAUSES 640 ? new Cancellation( 641 mayInterruptIfRunning, new CancellationException("Future.cancel() was called.")) 642 /* 643 * requireNonNull is safe because we've initialized these if 644 * !GENERATE_CANCELLATION_CAUSES. 645 * 646 * TODO(cpovirk): Maybe it would be cleaner to define a CancellationSupplier interface 647 * with two implementations, one that contains causeless Cancellation instances and 648 * the other of which creates new Cancellation instances each time it's called? Yet 649 * another alternative is to fill in a non-null value for each of the fields no matter 650 * what and to just not use it if !GENERATE_CANCELLATION_CAUSES. 651 */ 652 : requireNonNull( 653 mayInterruptIfRunning 654 ? Cancellation.CAUSELESS_INTERRUPTED 655 : Cancellation.CAUSELESS_CANCELLED); 656 AbstractFuture<?> abstractFuture = this; 657 while (true) { 658 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 659 rValue = true; 660 // We call interruptTask before calling complete(), which is consistent with 661 // FutureTask 662 if (mayInterruptIfRunning) { 663 abstractFuture.interruptTask(); 664 } 665 complete(abstractFuture); 666 if (localValue instanceof SetFuture) { 667 // propagate cancellation to the future set in setfuture, this is racy, and we don't 668 // care if we are successful or not. 669 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 670 if (futureToPropagateTo instanceof Trusted) { 671 // If the future is a TrustedFuture then we specifically avoid calling cancel() 672 // this has 2 benefits 673 // 1. 
for long chains of futures strung together with setFuture we consume less stack 674 // 2. we avoid allocating Cancellation objects at every level of the cancellation 675 // chain 676 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 677 // does nothing but delegate to this method. 678 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 679 localValue = trusted.value; 680 if (localValue == null | localValue instanceof SetFuture) { 681 abstractFuture = trusted; 682 continue; // loop back up and try to complete the new future 683 } 684 } else { 685 // not a TrustedFuture, call cancel directly. 686 futureToPropagateTo.cancel(mayInterruptIfRunning); 687 } 688 } 689 break; 690 } 691 // obj changed, reread 692 localValue = abstractFuture.value; 693 if (!(localValue instanceof SetFuture)) { 694 // obj cannot be null at this point, because value can only change from null to non-null. 695 // So if value changed (and it did since we lost the CAS), then it cannot be null and 696 // since it isn't a SetFuture, then the future must be done and we should exit the loop 697 break; 698 } 699 } 700 } 701 return rValue; 702 } 703 704 /** 705 * Subclasses can override this method to implement interruption of the future's computation. The 706 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 707 * 708 * <p>The default implementation does nothing. 709 * 710 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, consulting 711 * {@link #wasInterrupted} to decide whether to interrupt your task. 712 * 713 * @since 10.0 714 */ 715 protected void interruptTask() {} 716 717 /** 718 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 719 * true}. 
720 * 721 * @since 14.0 722 */ 723 protected final boolean wasInterrupted() { 724 final Object localValue = value; 725 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 726 } 727 728 /** 729 * {@inheritDoc} 730 * 731 * @since 10.0 732 */ 733 @Override 734 public void addListener(Runnable listener, Executor executor) { 735 checkNotNull(listener, "Runnable was null."); 736 checkNotNull(executor, "Executor was null."); 737 // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for 738 // addListener says that listeners execute 'immediate' if the future isDone(). However, our 739 // protocol for completing a future is to assign the value field (which sets isDone to true) and 740 // then to release waiters, followed by executing afterDone(), followed by releasing listeners. 741 // That means that it is possible to observe that the future isDone and that your listeners 742 // don't execute 'immediately'. By checking isDone here we avoid that. 743 // A corollary to all that is that we don't need to check isDone inside the loop because if we 744 // get into the loop we know that we weren't done when we entered and therefore we aren't under 745 // an obligation to execute 'immediately'. 746 if (!isDone()) { 747 Listener oldHead = listeners; 748 if (oldHead != Listener.TOMBSTONE) { 749 Listener newNode = new Listener(listener, executor); 750 do { 751 newNode.next = oldHead; 752 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 753 return; 754 } 755 oldHead = listeners; // re-read 756 } while (oldHead != Listener.TOMBSTONE); 757 } 758 } 759 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 760 // the listener. 761 executeListener(listener, executor); 762 } 763 764 /** 765 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 766 * set (including {@linkplain #setFuture set asynchronously}). 
When a call to this method returns, 767 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 768 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 769 * Future} may have previously been set asynchronously, in which case its result may not be known 770 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 771 * method, only by a call to {@link #cancel}. 772 * 773 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 774 * acquire other locks, risking deadlocks. 775 * 776 * @param value the value to be used as the result 777 * @return true if the attempt was accepted, completing the {@code Future} 778 */ 779 @CanIgnoreReturnValue 780 protected boolean set(@ParametricNullness V value) { 781 Object valueToSet = value == null ? NULL : value; 782 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 783 complete(this); 784 return true; 785 } 786 return false; 787 } 788 789 /** 790 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 791 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 792 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 793 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 794 * {@code Future} may have previously been set asynchronously, in which case its result may not be 795 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 796 * method, only by a call to {@link #cancel}. 797 * 798 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 799 * acquire other locks, risking deadlocks. 
800 * 801 * @param throwable the exception to be used as the failed result 802 * @return true if the attempt was accepted, completing the {@code Future} 803 */ 804 @CanIgnoreReturnValue 805 protected boolean setException(Throwable throwable) { 806 Object valueToSet = new Failure(checkNotNull(throwable)); 807 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 808 complete(this); 809 return true; 810 } 811 return false; 812 } 813 814 /** 815 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 816 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 817 * (including "set asynchronously," defined below). 818 * 819 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 820 * is accepted, then this future is guaranteed to have been completed with the supplied future by 821 * the time this method returns. If the supplied future is not done and the call is accepted, then 822 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 823 * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 824 * 825 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 826 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 827 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 828 * Future}. 829 * 830 * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, 831 * it will never trigger interruption behavior. In particular, it will not cause this future to 832 * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not 833 * return {@code true}. 834 * 835 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 836 * acquire other locks, risking deadlocks. 
   *
   * @param future the future to delegate to
   * @return true if the attempt was accepted, indicating that the {@code Future} was not previously
   *     cancelled or set.
   * @since 19.0
   */
  @CanIgnoreReturnValue
  protected boolean setFuture(ListenableFuture<? extends V> future) {
    checkNotNull(future);
    Object localValue = value;
    if (localValue == null) {
      if (future.isDone()) {
        // Fast path: the input is already complete, so copy its result over synchronously.
        // Note that this local deliberately shadows the 'value' field within this branch.
        Object value = getFutureValue(future);
        if (ATOMIC_HELPER.casValue(this, null, value)) {
          complete(this);
          return true;
        }
        return false;
      }
      SetFuture<V> valueToSet = new SetFuture<V>(this, future);
      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
        // the listener is responsible for calling completeWithFuture, directExecutor is appropriate
        // since all we are doing is unpacking a completed future which should be fast.
        try {
          future.addListener(valueToSet, DirectExecutor.INSTANCE);
        } catch (Throwable t) {
          // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this
          // must have been caused by addListener itself. The most likely explanation is a
          // misconfigured mock. Try to switch to Failure.
          Failure failure;
          try {
            failure = new Failure(t);
          } catch (Throwable oomMostLikely) {
            failure = Failure.FALLBACK_INSTANCE;
          }
          // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok.
          boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
        }
        // The call was accepted even if addListener threw: the SetFuture marker was installed.
        return true;
      }
      localValue = value; // we lost the cas, fall through and maybe cancel
    }
    // The future has already been set to something. If it is cancellation we should cancel the
    // incoming future.
    if (localValue instanceof Cancellation) {
      // we don't care if it fails, this is best-effort.
      future.cancel(((Cancellation) localValue).wasInterrupted);
    }
    return false;
  }

  /**
   * Returns a value that satisfies the contract of the {@link #value} field based on the state of
   * the given future.
   *
   * <p>This is approximately the inverse of {@link #getDoneValue(Object)}.
   *
   * <p>Three strategies are tried in order: reading the {@code value} field directly for {@code
   * Trusted} futures, asking {@link InternalFutures#tryInternalFastPathGetFailure} for a failure
   * cause, and finally calling {@code get()} and translating its outcome.
   */
  private static Object getFutureValue(ListenableFuture<?> future) {
    if (future instanceof Trusted) {
      // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
      // override .get() (since it is final) and therefore this is equivalent to calling .get()
      // and unpacking the exceptions like we do below (just much faster because it is a single
      // field read instead of a read, several branches and possibly creating exceptions).
      Object v = ((AbstractFuture<?>) future).value;
      if (v instanceof Cancellation) {
        // If the other future was interrupted, clear the interrupted bit while preserving the
        // cause. This will make it consistent with how non-trusted futures work, which cannot
        // propagate the wasInterrupted bit.
        Cancellation c = (Cancellation) v;
        if (c.wasInterrupted) {
          v =
              c.cause != null
                  ? new Cancellation(/* wasInterrupted= */ false, c.cause)
                  : Cancellation.CAUSELESS_CANCELLED;
        }
      }
      // requireNonNull is safe as long as we call this method only on completed futures.
      return requireNonNull(v);
    }
    if (future instanceof InternalFutureFailureAccess) {
      Throwable throwable =
          InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future);
      if (throwable != null) {
        return new Failure(throwable);
      }
    }
    // Sample isCancelled() before calling get() so the two reports can be cross-checked below.
    boolean wasCancelled = future.isCancelled();
    // Don't allocate a CancellationException if it's not necessary
    if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) {
      /*
       * requireNonNull is safe because we've initialized CAUSELESS_CANCELLED if
       * !GENERATE_CANCELLATION_CAUSES.
       */
      return requireNonNull(Cancellation.CAUSELESS_CANCELLED);
    }
    // Otherwise calculate the value by calling .get()
    try {
      Object v = getUninterruptibly(future);
      if (wasCancelled) {
        // The future claimed to be cancelled but get() returned normally: report the
        // inconsistency as the cancellation cause.
        return new Cancellation(
            false,
            new IllegalArgumentException(
                "get() did not throw CancellationException, despite reporting "
                    + "isCancelled() == true: "
                    + future));
      }
      return v == null ? NULL : v;
    } catch (ExecutionException exception) {
      if (wasCancelled) {
        return new Cancellation(
            false,
            new IllegalArgumentException(
                "get() did not throw CancellationException, despite reporting "
                    + "isCancelled() == true: "
                    + future,
                exception));
      }
      return new Failure(exception.getCause());
    } catch (CancellationException cancellation) {
      if (!wasCancelled) {
        // get() reported cancellation although isCancelled() was false: treat it as a failure.
        return new Failure(
            new IllegalArgumentException(
                "get() threw CancellationException, despite reporting isCancelled() == false: "
                    + future,
                cancellation));
      }
      return new Cancellation(false, cancellation);
    } catch (Throwable t) {
      return new Failure(t);
    }
  }

  /**
   * An inlined private copy of {@link Uninterruptibles#getUninterruptibly} used to break an
   * internal dependency on other /util/concurrent classes.
   *
   * <p>Retries {@code future.get()} until it stops throwing {@link InterruptedException}, and
   * re-asserts the calling thread's interrupt status before returning if any attempt was
   * interrupted.
   *
   * @throws ExecutionException if {@code future.get()} throws it
   */
  @ParametricNullness
  private static <V extends @Nullable Object> V getUninterruptibly(Future<V> future)
      throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          // Remember the interrupt and retry; the status is restored in the finally block.
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }

  /**
   * Unblocks all threads and runs all listeners.
   *
   * <p>Futures that were waiting on the completed future through a {@code SetFuture} listener are
   * completed here iteratively (via the labelled outer loop) rather than recursively.
   */
  private static void complete(AbstractFuture<?> param) {
    // Declare a "true" local variable so that the Checker Framework will infer nullness.
    AbstractFuture<?> future = param;

    // Stack of listeners still to run, accumulated across every future completed by this call.
    Listener next = null;
    outer:
    while (true) {
      future.releaseWaiters();
      // We call this before the listeners in order to avoid needing to manage a separate stack data
      // structure for them. Also, some implementations rely on this running prior to listeners
      // so that the cleanup work is visible to listeners.
      // afterDone() should be generally fast and only used for cleanup work... but in theory can
      // also be recursive and create StackOverflowErrors
      future.afterDone();
      // push the current set of listeners onto next
      next = future.clearListeners(next);
      future = null;
      while (next != null) {
        Listener curr = next;
        next = next.next;
        /*
         * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
         * clearListeners.
         */
        Runnable task = requireNonNull(curr.task);
        if (task instanceof SetFuture) {
          SetFuture<?> setFuture = (SetFuture<?>) task;
          // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
          // chains of SetFutures
          // Handling this special case is important because there is no way to pass an executor to
          // setFuture, so a user couldn't break the chain by doing this themselves. It is also
          // potentially common if someone writes a recursive Futures.transformAsync transformer.
          future = setFuture.owner;
          if (future.value == setFuture) {
            Object valueToSet = getFutureValue(setFuture.future);
            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
              // The owner is now complete; restart the outer loop to release its waiters and
              // listeners as well.
              continue outer;
            }
          }
          // otherwise the future we were trying to set is already done.
        } else {
          /*
           * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
           * clearListeners.
           */
          executeListener(task, requireNonNull(curr.executor));
        }
      }
      break;
    }
  }

  /**
   * Callback method that is called exactly once after the future is completed.
   *
   * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it.
   *
   * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is
   * intended for very lightweight cleanup work, for example, timing statistics or clearing fields.
   * If your task does anything heavier, consider just using a listener with an executor.
   *
   * @since 20.0
   */
  @Beta
  @ForOverride
  protected void afterDone() {}

  // TODO(b/114236866): Inherit doc from InternalFutureFailureAccess. Also, -link to its URL.
  /**
   * Usually returns {@code null} but, if this {@code Future} has failed, may <i>optionally</i>
   * return the cause of the failure. "Failure" means specifically "completed with an exception"; it
   * does not include "was cancelled." To be explicit: If this method returns a non-null value,
   * then:
   *
   * <ul>
   *   <li>{@code isDone()} must return {@code true}
   *   <li>{@code isCancelled()} must return {@code false}
   *   <li>{@code get()} must not block, and it must throw an {@code ExecutionException} with the
   *       return value of this method as its cause
   * </ul>
   *
   * <p>This method is {@code protected} so that classes like {@code
   * com.google.common.util.concurrent.SettableFuture} do not expose it to their users as an
   * instance method. In the unlikely event that you need to call this method, call {@link
   * InternalFutures#tryInternalFastPathGetFailure(InternalFutureFailureAccess)}.
   *
   * @since 27.0
   */
  @Override
  /*
   * We should annotate the superclass, InternalFutureFailureAccess, to say that its copy of this
   * method returns @Nullable, too.
However, we're not sure if we want to make any changes to that 1085 * class, since it's in a separate artifact that we planned to release only a single version of. 1086 */ 1087 @CheckForNull 1088 protected final Throwable tryInternalFastPathGetFailure() { 1089 if (this instanceof Trusted) { 1090 Object obj = value; 1091 if (obj instanceof Failure) { 1092 return ((Failure) obj).exception; 1093 } 1094 } 1095 return null; 1096 } 1097 1098 /** 1099 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 1100 * the given future (if available). 1101 */ 1102 final void maybePropagateCancellationTo(@CheckForNull Future<?> related) { 1103 if (related != null & isCancelled()) { 1104 related.cancel(wasInterrupted()); 1105 } 1106 } 1107 1108 /** Releases all threads in the {@link #waiters} list, and clears the list. */ 1109 private void releaseWaiters() { 1110 Waiter head = ATOMIC_HELPER.gasWaiters(this, Waiter.TOMBSTONE); 1111 for (Waiter currentWaiter = head; currentWaiter != null; currentWaiter = currentWaiter.next) { 1112 currentWaiter.unpark(); 1113 } 1114 } 1115 1116 /** 1117 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 1118 * added first. 1119 */ 1120 @CheckForNull 1121 private Listener clearListeners(@CheckForNull Listener onto) { 1122 // We need to 1123 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to 1124 // to synchronize with us 1125 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 1126 // executing listeners in the order they were added 1127 // 3. 
push all the items onto 'onto' and return the new head of the stack 1128 Listener head = ATOMIC_HELPER.gasListeners(this, Listener.TOMBSTONE); 1129 Listener reversedList = onto; 1130 while (head != null) { 1131 Listener tmp = head; 1132 head = head.next; 1133 tmp.next = reversedList; 1134 reversedList = tmp; 1135 } 1136 return reversedList; 1137 } 1138 1139 // TODO(user): move parts into a default method on ListenableFuture? 1140 @Override 1141 public String toString() { 1142 // TODO(cpovirk): Presize to something plausible? 1143 StringBuilder builder = new StringBuilder(); 1144 if (getClass().getName().startsWith("com.google.common.util.concurrent.")) { 1145 builder.append(getClass().getSimpleName()); 1146 } else { 1147 builder.append(getClass().getName()); 1148 } 1149 builder.append('@').append(toHexString(identityHashCode(this))).append("[status="); 1150 if (isCancelled()) { 1151 builder.append("CANCELLED"); 1152 } else if (isDone()) { 1153 addDoneString(builder); 1154 } else { 1155 addPendingString(builder); // delegates to addDoneString if future completes mid-way 1156 } 1157 return builder.append("]").toString(); 1158 } 1159 1160 /** 1161 * Provide a human-readable explanation of why this future has not yet completed. 1162 * 1163 * @return null if an explanation cannot be provided (e.g. because the future is done). 
1164 * @since 23.0 1165 */ 1166 @CheckForNull 1167 protected String pendingToString() { 1168 // TODO(diamondm) consider moving this into addPendingString so it's always in the output 1169 if (this instanceof ScheduledFuture) { 1170 return "remaining delay=[" 1171 + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) 1172 + " ms]"; 1173 } 1174 return null; 1175 } 1176 1177 private void addPendingString(StringBuilder builder) { 1178 // Capture current builder length so it can be truncated if this future ends up completing while 1179 // the toString is being calculated 1180 int truncateLength = builder.length(); 1181 1182 builder.append("PENDING"); 1183 1184 Object localValue = value; 1185 if (localValue instanceof SetFuture) { 1186 builder.append(", setFuture=["); 1187 appendUserObject(builder, ((SetFuture) localValue).future); 1188 builder.append("]"); 1189 } else { 1190 String pendingDescription; 1191 try { 1192 pendingDescription = Strings.emptyToNull(pendingToString()); 1193 } catch (RuntimeException | StackOverflowError e) { 1194 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1195 // subclass is implemented with bugs similar to the subclass. 
1196 pendingDescription = "Exception thrown from implementation: " + e.getClass(); 1197 } 1198 if (pendingDescription != null) { 1199 builder.append(", info=[").append(pendingDescription).append("]"); 1200 } 1201 } 1202 1203 // The future may complete while calculating the toString, so we check once more to see if the 1204 // future is done 1205 if (isDone()) { 1206 // Truncate anything that was appended before realizing this future is done 1207 builder.delete(truncateLength, builder.length()); 1208 addDoneString(builder); 1209 } 1210 } 1211 1212 private void addDoneString(StringBuilder builder) { 1213 try { 1214 V value = getUninterruptibly(this); 1215 builder.append("SUCCESS, result=["); 1216 appendResultObject(builder, value); 1217 builder.append("]"); 1218 } catch (ExecutionException e) { 1219 builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); 1220 } catch (CancellationException e) { 1221 builder.append("CANCELLED"); // shouldn't be reachable 1222 } catch (RuntimeException e) { 1223 builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); 1224 } 1225 } 1226 1227 /** 1228 * Any object can be the result of a Future, and not every object has a reasonable toString() 1229 * implementation. Using a reconstruction of the default Object.toString() prevents OOMs and stack 1230 * overflows, and helps avoid sensitive data inadvertently ending up in exception messages. 1231 */ 1232 private void appendResultObject(StringBuilder builder, @CheckForNull Object o) { 1233 if (o == null) { 1234 builder.append("null"); 1235 } else if (o == this) { 1236 builder.append("this future"); 1237 } else { 1238 builder 1239 .append(o.getClass().getName()) 1240 .append("@") 1241 .append(Integer.toHexString(System.identityHashCode(o))); 1242 } 1243 } 1244 1245 /** Helper for printing user supplied objects into our toString method. 
*/ 1246 private void appendUserObject(StringBuilder builder, @CheckForNull Object o) { 1247 // This is some basic recursion detection for when people create cycles via set/setFuture or 1248 // when deep chains of futures exist resulting in a StackOverflowException. We could detect 1249 // arbitrary cycles using a thread local but this should be a good enough solution (it is also 1250 // what jdk collections do in these cases) 1251 try { 1252 if (o == this) { 1253 builder.append("this future"); 1254 } else { 1255 builder.append(o); 1256 } 1257 } catch (RuntimeException | StackOverflowError e) { 1258 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1259 // user object is implemented with bugs similar to the user object. 1260 builder.append("Exception thrown from implementation: ").append(e.getClass()); 1261 } 1262 } 1263 1264 /** 1265 * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain 1266 * RuntimeException runtime exceptions} thrown by the executor. 1267 */ 1268 private static void executeListener(Runnable runnable, Executor executor) { 1269 try { 1270 executor.execute(runnable); 1271 } catch (RuntimeException e) { 1272 // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if 1273 // we're given a bad one. We only catch RuntimeException because we want Errors to propagate 1274 // up. 1275 log.log( 1276 Level.SEVERE, 1277 "RuntimeException while executing runnable " + runnable + " with executor " + executor, 1278 e); 1279 } 1280 } 1281 1282 private abstract static class AtomicHelper { 1283 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ 1284 abstract void putThread(Waiter waiter, Thread newValue); 1285 1286 /** Non volatile write of the waiter to the {@link Waiter#next} field. 
*/ 1287 abstract void putNext(Waiter waiter, @CheckForNull Waiter newValue); 1288 1289 /** Performs a CAS operation on the {@link #waiters} field. */ 1290 abstract boolean casWaiters( 1291 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update); 1292 1293 /** Performs a CAS operation on the {@link #listeners} field. */ 1294 abstract boolean casListeners( 1295 AbstractFuture<?> future, @CheckForNull Listener expect, Listener update); 1296 1297 /** Performs a GAS operation on the {@link #waiters} field. */ 1298 abstract Waiter gasWaiters(AbstractFuture<?> future, Waiter update); 1299 1300 /** Performs a GAS operation on the {@link #listeners} field. */ 1301 abstract Listener gasListeners(AbstractFuture<?> future, Listener update); 1302 1303 /** Performs a CAS operation on the {@link #value} field. */ 1304 abstract boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update); 1305 } 1306 1307 /** 1308 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 1309 * 1310 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 1311 * be accessed. 
   */
  @SuppressWarnings("sunapi")
  private static final class UnsafeAtomicHelper extends AtomicHelper {
    // The Unsafe instance and the field offsets it resolved, all assigned by the static
    // initializer below.
    static final sun.misc.Unsafe UNSAFE;
    static final long LISTENERS_OFFSET;
    static final long WAITERS_OFFSET;
    static final long VALUE_OFFSET;
    static final long WAITER_THREAD_OFFSET;
    static final long WAITER_NEXT_OFFSET;

    static {
      sun.misc.Unsafe unsafe = null;
      try {
        unsafe = sun.misc.Unsafe.getUnsafe();
      } catch (SecurityException tryReflectionInstead) {
        // Fall back to scanning Unsafe's own declared fields for one that holds an Unsafe
        // instance, inside a privileged action.
        try {
          unsafe =
              AccessController.doPrivileged(
                  new PrivilegedExceptionAction<sun.misc.Unsafe>() {
                    @Override
                    public sun.misc.Unsafe run() throws Exception {
                      Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
                      for (java.lang.reflect.Field f : k.getDeclaredFields()) {
                        f.setAccessible(true);
                        Object x = f.get(null);
                        if (k.isInstance(x)) {
                          return k.cast(x);
                        }
                      }
                      throw new NoSuchFieldError("the Unsafe");
                    }
                  });
        } catch (PrivilegedActionException e) {
          throw new RuntimeException("Could not initialize intrinsics", e.getCause());
        }
      }
      try {
        // Resolve the offset of every field this helper touches. UNSAFE is assigned last, after
        // all lookups have succeeded.
        Class<?> abstractFuture = AbstractFuture.class;
        WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
        LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
        VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
        WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
        WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
        UNSAFE = unsafe;
      } catch (Exception e) {
        throwIfUnchecked(e);
        throw new RuntimeException(e);
      }
    }

    /** Writes {@code newValue} to the waiter's {@code thread} field using {@code putObject}. */
    @Override
    void putThread(Waiter waiter, Thread newValue) {
      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
    }

    /** Writes {@code newValue} to the waiter's {@code next} field using {@code putObject}. */
    @Override
    void putNext(Waiter waiter, @CheckForNull Waiter newValue) {
      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
    }

    /** Performs a CAS operation on the {@link #waiters} field. */
    @Override
    boolean casWaiters(
        AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) {
      return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
    }

    /** Performs a CAS operation on the {@link #listeners} field. */
    @Override
    boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) {
      return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
    }

    /**
     * Performs a GAS operation on the {@link #listeners} field.
     *
     * <p>Implemented as a CAS retry loop. If the field already holds {@code update}, it is
     * returned without writing.
     */
    @Override
    Listener gasListeners(AbstractFuture<?> future, Listener update) {
      while (true) {
        Listener listener = future.listeners;
        if (update == listener) {
          return listener;
        }
        if (casListeners(future, listener, update)) {
          return listener;
        }
      }
    }

    /**
     * Performs a GAS operation on the {@link #waiters} field.
     *
     * <p>Implemented as a CAS retry loop. If the field already holds {@code update}, it is
     * returned without writing.
     */
    @Override
    Waiter gasWaiters(AbstractFuture<?> future, Waiter update) {
      while (true) {
        Waiter waiter = future.waiters;
        if (update == waiter) {
          return waiter;
        }
        if (casWaiters(future, waiter, update)) {
          return waiter;
        }
      }
    }

    /** Performs a CAS operation on the {@link #value} field. */
    @Override
    boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) {
      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
    }
  }

  /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
*/ 1421 @SuppressWarnings("rawtypes") 1422 private static final class SafeAtomicHelper extends AtomicHelper { 1423 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1424 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1425 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1426 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1427 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1428 1429 SafeAtomicHelper( 1430 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1431 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1432 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1433 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1434 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1435 this.waiterThreadUpdater = waiterThreadUpdater; 1436 this.waiterNextUpdater = waiterNextUpdater; 1437 this.waitersUpdater = waitersUpdater; 1438 this.listenersUpdater = listenersUpdater; 1439 this.valueUpdater = valueUpdater; 1440 } 1441 1442 @Override 1443 void putThread(Waiter waiter, Thread newValue) { 1444 waiterThreadUpdater.lazySet(waiter, newValue); 1445 } 1446 1447 @Override 1448 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1449 waiterNextUpdater.lazySet(waiter, newValue); 1450 } 1451 1452 @Override 1453 boolean casWaiters( 1454 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1455 return waitersUpdater.compareAndSet(future, expect, update); 1456 } 1457 1458 @Override 1459 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1460 return listenersUpdater.compareAndSet(future, expect, update); 1461 } 1462 1463 /** Performs a GAS operation on the {@link #listeners} field. 
*/ 1464 @Override 1465 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1466 return listenersUpdater.getAndSet(future, update); 1467 } 1468 1469 /** Performs a GAS operation on the {@link #waiters} field. */ 1470 @Override 1471 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1472 return waitersUpdater.getAndSet(future, update); 1473 } 1474 1475 @Override 1476 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1477 return valueUpdater.compareAndSet(future, expect, update); 1478 } 1479 } 1480 1481 /** 1482 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 1483 * 1484 * <p>This is an implementation of last resort for when certain basic VM features are broken (like 1485 * AtomicReferenceFieldUpdater). 1486 */ 1487 private static final class SynchronizedHelper extends AtomicHelper { 1488 @Override 1489 void putThread(Waiter waiter, Thread newValue) { 1490 waiter.thread = newValue; 1491 } 1492 1493 @Override 1494 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1495 waiter.next = newValue; 1496 } 1497 1498 @Override 1499 boolean casWaiters( 1500 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1501 synchronized (future) { 1502 if (future.waiters == expect) { 1503 future.waiters = update; 1504 return true; 1505 } 1506 return false; 1507 } 1508 } 1509 1510 @Override 1511 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1512 synchronized (future) { 1513 if (future.listeners == expect) { 1514 future.listeners = update; 1515 return true; 1516 } 1517 return false; 1518 } 1519 } 1520 1521 /** Performs a GAS operation on the {@link #listeners} field. 
*/ 1522 @Override 1523 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1524 synchronized (future) { 1525 Listener old = future.listeners; 1526 if (old != update) { 1527 future.listeners = update; 1528 } 1529 return old; 1530 } 1531 } 1532 1533 /** Performs a GAS operation on the {@link #waiters} field. */ 1534 @Override 1535 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1536 synchronized (future) { 1537 Waiter old = future.waiters; 1538 if (old != update) { 1539 future.waiters = update; 1540 } 1541 return old; 1542 } 1543 } 1544 1545 @Override 1546 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1547 synchronized (future) { 1548 if (future.value == expect) { 1549 future.value = update; 1550 return true; 1551 } 1552 return false; 1553 } 1554 } 1555 } 1556 1557 private static CancellationException cancellationExceptionWithCause( 1558 String message, @CheckForNull Throwable cause) { 1559 CancellationException exception = new CancellationException(message); 1560 exception.initCause(cause); 1561 return exception; 1562 } 1563}