001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.util.concurrent.NullnessCasts.uncheckedNull; 019import static java.lang.Integer.toHexString; 020import static java.lang.System.identityHashCode; 021import static java.util.Objects.requireNonNull; 022import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 023 024import com.google.common.annotations.GwtCompatible; 025import com.google.common.base.Strings; 026import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 027import com.google.common.util.concurrent.internal.InternalFutures; 028import com.google.errorprone.annotations.CanIgnoreReturnValue; 029import com.google.errorprone.annotations.ForOverride; 030import com.google.j2objc.annotations.ReflectionSupport; 031import java.security.AccessController; 032import java.security.PrivilegedActionException; 033import java.security.PrivilegedExceptionAction; 034import java.util.Locale; 035import java.util.concurrent.CancellationException; 036import java.util.concurrent.ExecutionException; 037import java.util.concurrent.Executor; 038import java.util.concurrent.Future; 039import java.util.concurrent.ScheduledFuture; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.TimeoutException; 042import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 043import java.util.concurrent.locks.LockSupport; 044import java.util.logging.Level; 045import javax.annotation.CheckForNull; 046import org.checkerframework.checker.nullness.qual.Nullable; 047 048/** 049 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 050 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 051 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 052 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 053 * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link 054 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function, 055 * java.util.concurrent.Executor) Futures.catching}. 056 * 057 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 058 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 059 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 060 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 061 * should rarely override other methods. 062 * 063 * @author Sven Mawson 064 * @author Luke Sandberg 065 * @since 1.0 066 */ 067@SuppressWarnings({ 068 "ShortCircuitBoolean", // we use non-short circuiting comparisons intentionally 069 "nullness", // TODO(b/147136275): Remove once our checker understands & and |. 070}) 071@GwtCompatible(emulated = true) 072@ReflectionSupport(value = ReflectionSupport.Level.FULL) 073@ElementTypesAreNonnullByDefault 074public abstract class AbstractFuture<V extends @Nullable Object> extends InternalFutureFailureAccess 075 implements ListenableFuture<V> { 076 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 077 078 static final boolean GENERATE_CANCELLATION_CAUSES; 079 080 static { 081 // System.getProperty may throw if the security policy does not permit access. 082 boolean generateCancellationCauses; 083 try { 084 generateCancellationCauses = 085 Boolean.parseBoolean( 086 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 087 } catch (SecurityException e) { 088 generateCancellationCauses = false; 089 } 090 GENERATE_CANCELLATION_CAUSES = generateCancellationCauses; 091 } 092 093 /** 094 * Tag interface marking trusted subclasses. This enables some optimizations. The implementation 095 * of this interface must also be an AbstractFuture and must not override or expose for overriding 096 * any of the public methods of ListenableFuture. 097 */ 098 interface Trusted<V extends @Nullable Object> extends ListenableFuture<V> {} 099 100 /** 101 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 102 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 103 */ 104 abstract static class TrustedFuture<V extends @Nullable Object> extends AbstractFuture<V> 105 implements Trusted<V> { 106 @CanIgnoreReturnValue 107 @Override 108 @ParametricNullness 109 public final V get() throws InterruptedException, ExecutionException { 110 return super.get(); 111 } 112 113 @CanIgnoreReturnValue 114 @Override 115 @ParametricNullness 116 public final V get(long timeout, TimeUnit unit) 117 throws InterruptedException, ExecutionException, TimeoutException { 118 return super.get(timeout, unit); 119 } 120 121 @Override 122 public final boolean isDone() { 123 return super.isDone(); 124 } 125 126 @Override 127 public final boolean isCancelled() { 128 return super.isCancelled(); 129 } 130 131 @Override 132 public final void addListener(Runnable listener, Executor executor) { 133 super.addListener(listener, executor); 134 } 135 136 @CanIgnoreReturnValue 137 @Override 138 public final boolean cancel(boolean mayInterruptIfRunning) { 139 return super.cancel(mayInterruptIfRunning); 140 } 141 } 142 143 static final LazyLogger log = new LazyLogger(AbstractFuture.class); 144 145 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 146 // blocking. This value is what AbstractQueuedSynchronizer uses. 147 private static final long SPIN_THRESHOLD_NANOS = 1000L; 148 149 private static final AtomicHelper ATOMIC_HELPER; 150 151 static { 152 AtomicHelper helper; 153 Throwable thrownUnsafeFailure = null; 154 Throwable thrownAtomicReferenceFieldUpdaterFailure = null; 155 156 try { 157 helper = new UnsafeAtomicHelper(); 158 } catch (Exception | Error unsafeFailure) { // sneaky checked exception 159 thrownUnsafeFailure = unsafeFailure; 160 // catch absolutely everything and fall through to our 'SafeAtomicHelper' 161 // The access control checks that ARFU does means the caller class has to be AbstractFuture 162 // instead of SafeAtomicHelper, so we annoyingly define these here 163 try { 164 helper = 165 new SafeAtomicHelper( 166 newUpdater(Waiter.class, Thread.class, "thread"), 167 newUpdater(Waiter.class, Waiter.class, "next"), 168 newUpdater(AbstractFuture.class, Waiter.class, "waiters"), 169 newUpdater(AbstractFuture.class, Listener.class, "listeners"), 170 newUpdater(AbstractFuture.class, Object.class, "value")); 171 } catch (Exception // sneaky checked exception 172 | Error atomicReferenceFieldUpdaterFailure) { 173 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 174 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 175 // For these users fallback to a suboptimal implementation, based on synchronized. This will 176 // be a definite performance hit to those users. 177 thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; 178 helper = new SynchronizedHelper(); 179 } 180 } 181 ATOMIC_HELPER = helper; 182 183 // Prevent rare disastrous classloading in first call to LockSupport.park. 184 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 185 @SuppressWarnings("unused") 186 Class<?> ensureLoaded = LockSupport.class; 187 188 // Log after all static init is finished; if an installed logger uses any Futures methods, it 189 // shouldn't break in cases where reflection is missing/broken. 190 if (thrownAtomicReferenceFieldUpdaterFailure != null) { 191 log.get().log(Level.SEVERE, "UnsafeAtomicHelper is broken!", thrownUnsafeFailure); 192 log.get() 193 .log( 194 Level.SEVERE, 195 "SafeAtomicHelper is broken!", 196 thrownAtomicReferenceFieldUpdaterFailure); 197 } 198 } 199 200 /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ 201 private static final class Waiter { 202 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 203 204 @CheckForNull volatile Thread thread; 205 @CheckForNull volatile Waiter next; 206 207 /** 208 * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 209 * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 210 */ 211 Waiter(boolean unused) {} 212 213 Waiter() { 214 // avoid volatile write, write is made visible by subsequent CAS on waiters field 215 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 216 } 217 218 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 219 // field. 220 void setNext(@CheckForNull Waiter next) { 221 ATOMIC_HELPER.putNext(this, next); 222 } 223 224 void unpark() { 225 // This is racy with removeWaiter. The consequence of the race is that we may spuriously call 226 // unpark even though the thread has already removed itself from the list. But even if we did 227 // use a CAS, that race would still exist (it would just be ever so slightly smaller). 228 Thread w = thread; 229 if (w != null) { 230 thread = null; 231 LockSupport.unpark(w); 232 } 233 } 234 } 235 236 /** 237 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 238 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved 239 * by two things. 240 * 241 * <ul> 242 * <li>This is only called when a waiting thread times out or is interrupted. Both of which 243 * should be rare. 244 * <li>The waiters list should be very short. 245 * </ul> 246 */ 247 private void removeWaiter(Waiter node) { 248 node.thread = null; // mark as 'deleted' 249 restart: 250 while (true) { 251 Waiter pred = null; 252 Waiter curr = waiters; 253 if (curr == Waiter.TOMBSTONE) { 254 return; // give up if someone is calling complete 255 } 256 Waiter succ; 257 while (curr != null) { 258 succ = curr.next; 259 if (curr.thread != null) { // we aren't unlinking this node, update pred. 260 pred = curr; 261 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 262 pred.next = succ; 263 if (pred.thread == null) { // We raced with another node that unlinked pred. Restart. 264 continue restart; 265 } 266 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 267 continue restart; // We raced with an add or complete 268 } 269 curr = succ; 270 } 271 break; 272 } 273 } 274 275 /** Listeners also form a stack through the {@link #listeners} field. */ 276 private static final class Listener { 277 static final Listener TOMBSTONE = new Listener(); 278 @CheckForNull // null only for TOMBSTONE 279 final Runnable task; 280 @CheckForNull // null only for TOMBSTONE 281 final Executor executor; 282 283 // writes to next are made visible by subsequent CAS's on the listeners field 284 @CheckForNull Listener next; 285 286 Listener(Runnable task, Executor executor) { 287 this.task = task; 288 this.executor = executor; 289 } 290 291 Listener() { 292 this.task = null; 293 this.executor = null; 294 } 295 } 296 297 /** A special value to represent {@code null}. */ 298 private static final Object NULL = new Object(); 299 300 /** A special value to represent failure, when {@link #setException} is called successfully. */ 301 private static final class Failure { 302 static final Failure FALLBACK_INSTANCE = 303 new Failure( 304 new Throwable("Failure occurred while trying to finish a future.") { 305 @Override 306 public synchronized Throwable fillInStackTrace() { 307 return this; // no stack trace 308 } 309 }); 310 final Throwable exception; 311 312 Failure(Throwable exception) { 313 this.exception = checkNotNull(exception); 314 } 315 } 316 317 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 318 private static final class Cancellation { 319 // constants to use when GENERATE_CANCELLATION_CAUSES = false 320 @CheckForNull static final Cancellation CAUSELESS_INTERRUPTED; 321 @CheckForNull static final Cancellation CAUSELESS_CANCELLED; 322 323 static { 324 if (GENERATE_CANCELLATION_CAUSES) { 325 CAUSELESS_CANCELLED = null; 326 CAUSELESS_INTERRUPTED = null; 327 } else { 328 CAUSELESS_CANCELLED = new Cancellation(false, null); 329 CAUSELESS_INTERRUPTED = new Cancellation(true, null); 330 } 331 } 332 333 final boolean wasInterrupted; 334 @CheckForNull final Throwable cause; 335 336 Cancellation(boolean wasInterrupted, @CheckForNull Throwable cause) { 337 this.wasInterrupted = wasInterrupted; 338 this.cause = cause; 339 } 340 } 341 342 /** A special value that encodes the 'setFuture' state. */ 343 private static final class SetFuture<V extends @Nullable Object> implements Runnable { 344 final AbstractFuture<V> owner; 345 final ListenableFuture<? extends V> future; 346 347 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 348 this.owner = owner; 349 this.future = future; 350 } 351 352 @Override 353 public void run() { 354 if (owner.value != this) { 355 // nothing to do, we must have been cancelled, don't bother inspecting the future. 356 return; 357 } 358 Object valueToSet = getFutureValue(future); 359 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 360 complete( 361 owner, 362 /* 363 * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so 364 * don't invoke interruptTask. 365 */ 366 false); 367 } 368 } 369 } 370 371 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 372 // available. 373 /** 374 * This field encodes the current state of the future. 375 * 376 * <p>The valid values are: 377 * 378 * <ul> 379 * <li>{@code null} initial state, nothing has happened. 380 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 381 * <li>{@link Failure} terminal state, {@code setException} was called. 382 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 383 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 384 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 385 * argument. 386 * </ul> 387 */ 388 @CheckForNull private volatile Object value; 389 390 /** All listeners. */ 391 @CheckForNull private volatile Listener listeners; 392 393 /** All waiting threads. */ 394 @CheckForNull private volatile Waiter waiters; 395 396 /** Constructor for use by subclasses. */ 397 protected AbstractFuture() {} 398 399 // Gets and Timed Gets 400 // 401 // * Be responsive to interruption 402 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 403 // waiters field. 404 // * Future completion is defined by when #value becomes non-null/non SetFuture 405 // * Future completion can be observed if the waiters field contains a TOMBSTONE 406 407 // Timed Get 408 // There are a few design constraints to consider 409 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 410 // have observed 12 micros on 64-bit linux systems to wake up a parked thread). So if the 411 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 412 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 413 // similar purposes. 414 // * We want to behave reasonably for timeouts of 0 415 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 416 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 417 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 418 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 419 // is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node 420 // and we could use that to make a decision about whether or not we timed out prior to being 421 // unparked. 422 423 /** 424 * {@inheritDoc} 425 * 426 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 427 * current thread is interrupted during the call, even if the value is already available. 428 * 429 * @throws CancellationException {@inheritDoc} 430 */ 431 @CanIgnoreReturnValue 432 @Override 433 @ParametricNullness 434 public V get(long timeout, TimeUnit unit) 435 throws InterruptedException, TimeoutException, ExecutionException { 436 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 437 // at the bottom and throw a timeoutexception. 438 final long timeoutNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 439 long remainingNanos = timeoutNanos; 440 if (Thread.interrupted()) { 441 throw new InterruptedException(); 442 } 443 Object localValue = value; 444 if (localValue != null & !(localValue instanceof SetFuture)) { 445 return getDoneValue(localValue); 446 } 447 // we delay calling nanoTime until we know we will need to either park or spin 448 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 449 long_wait_loop: 450 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 451 Waiter oldHead = waiters; 452 if (oldHead != Waiter.TOMBSTONE) { 453 Waiter node = new Waiter(); 454 do { 455 node.setNext(oldHead); 456 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 457 while (true) { 458 OverflowAvoidingLockSupport.parkNanos(this, remainingNanos); 459 // Check interruption first, if we woke up due to interruption we need to honor that. 460 if (Thread.interrupted()) { 461 removeWaiter(node); 462 throw new InterruptedException(); 463 } 464 465 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 466 // wakeup 467 localValue = value; 468 if (localValue != null & !(localValue instanceof SetFuture)) { 469 return getDoneValue(localValue); 470 } 471 472 // timed out? 473 remainingNanos = endNanos - System.nanoTime(); 474 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 475 // Remove the waiter, one way or another we are done parking this thread. 476 removeWaiter(node); 477 break long_wait_loop; // jump down to the busy wait loop 478 } 479 } 480 } 481 oldHead = waiters; // re-read and loop. 482 } while (oldHead != Waiter.TOMBSTONE); 483 } 484 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 485 // waiter. 486 // requireNonNull is safe because value is always set before TOMBSTONE. 487 return getDoneValue(requireNonNull(value)); 488 } 489 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 490 // waiters list 491 while (remainingNanos > 0) { 492 localValue = value; 493 if (localValue != null & !(localValue instanceof SetFuture)) { 494 return getDoneValue(localValue); 495 } 496 if (Thread.interrupted()) { 497 throw new InterruptedException(); 498 } 499 remainingNanos = endNanos - System.nanoTime(); 500 } 501 502 String futureToString = toString(); 503 final String unitString = unit.toString().toLowerCase(Locale.ROOT); 504 String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); 505 // Only report scheduling delay if larger than our spin threshold - otherwise it's just noise 506 if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { 507 // We over-waited for our timeout. 508 message += " (plus "; 509 long overWaitNanos = -remainingNanos; 510 long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); 511 long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); 512 boolean shouldShowExtraNanos = 513 overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; 514 if (overWaitUnits > 0) { 515 message += overWaitUnits + " " + unitString; 516 if (shouldShowExtraNanos) { 517 message += ","; 518 } 519 message += " "; 520 } 521 if (shouldShowExtraNanos) { 522 message += overWaitLeftoverNanos + " nanoseconds "; 523 } 524 525 message += "delay)"; 526 } 527 // It's confusing to see a completed future in a timeout message; if isDone() returns false, 528 // then we know it must have given a pending toString value earlier. If not, then the future 529 // completed after the timeout expired, and the message might be success. 530 if (isDone()) { 531 throw new TimeoutException(message + " but future completed as timeout expired"); 532 } 533 throw new TimeoutException(message + " for " + futureToString); 534 } 535 536 /** 537 * {@inheritDoc} 538 * 539 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 540 * current thread is interrupted during the call, even if the value is already available. 541 * 542 * @throws CancellationException {@inheritDoc} 543 */ 544 @CanIgnoreReturnValue 545 @Override 546 @ParametricNullness 547 public V get() throws InterruptedException, ExecutionException { 548 if (Thread.interrupted()) { 549 throw new InterruptedException(); 550 } 551 Object localValue = value; 552 if (localValue != null & !(localValue instanceof SetFuture)) { 553 return getDoneValue(localValue); 554 } 555 Waiter oldHead = waiters; 556 if (oldHead != Waiter.TOMBSTONE) { 557 Waiter node = new Waiter(); 558 do { 559 node.setNext(oldHead); 560 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 561 // we are on the stack, now wait for completion. 562 while (true) { 563 LockSupport.park(this); 564 // Check interruption first, if we woke up due to interruption we need to honor that. 565 if (Thread.interrupted()) { 566 removeWaiter(node); 567 throw new InterruptedException(); 568 } 569 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 570 // wakeup 571 localValue = value; 572 if (localValue != null & !(localValue instanceof SetFuture)) { 573 return getDoneValue(localValue); 574 } 575 } 576 } 577 oldHead = waiters; // re-read and loop. 578 } while (oldHead != Waiter.TOMBSTONE); 579 } 580 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 581 // waiter. 582 // requireNonNull is safe because value is always set before TOMBSTONE. 583 return getDoneValue(requireNonNull(value)); 584 } 585 586 /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ 587 @ParametricNullness 588 private V getDoneValue(Object obj) throws ExecutionException { 589 // While this seems like it might be too branch-y, simple benchmarking proves it to be 590 // unmeasurable (comparing done AbstractFutures with immediateFuture) 591 if (obj instanceof Cancellation) { 592 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 593 } else if (obj instanceof Failure) { 594 throw new ExecutionException(((Failure) obj).exception); 595 } else if (obj == NULL) { 596 /* 597 * It's safe to return null because we would only have stored it in the first place if it were 598 * a valid value for V. 599 */ 600 return uncheckedNull(); 601 } else { 602 @SuppressWarnings("unchecked") // this is the only other option 603 V asV = (V) obj; 604 return asV; 605 } 606 } 607 608 @Override 609 public boolean isDone() { 610 final Object localValue = value; 611 return localValue != null & !(localValue instanceof SetFuture); 612 } 613 614 @Override 615 public boolean isCancelled() { 616 final Object localValue = value; 617 return localValue instanceof Cancellation; 618 } 619 620 /** 621 * {@inheritDoc} 622 * 623 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 624 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 625 * {@code Future} that was supplied in the {@code setFuture} call. 626 * 627 * <p>Rather than override this method to perform additional cancellation work or cleanup, 628 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 629 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 630 * cancelled without a call to {@code cancel}, such as by calling {@code 631 * setFuture(cancelledFuture)}. 632 * 633 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 634 * acquire other locks, risking deadlocks. 635 */ 636 @CanIgnoreReturnValue 637 @Override 638 public boolean cancel(boolean mayInterruptIfRunning) { 639 Object localValue = value; 640 boolean rValue = false; 641 if (localValue == null | localValue instanceof SetFuture) { 642 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 643 // certainly less likely. 644 Object valueToSet = 645 GENERATE_CANCELLATION_CAUSES 646 ? new Cancellation( 647 mayInterruptIfRunning, new CancellationException("Future.cancel() was called.")) 648 /* 649 * requireNonNull is safe because we've initialized these if 650 * !GENERATE_CANCELLATION_CAUSES. 651 * 652 * TODO(cpovirk): Maybe it would be cleaner to define a CancellationSupplier interface 653 * with two implementations, one that contains causeless Cancellation instances and 654 * the other of which creates new Cancellation instances each time it's called? Yet 655 * another alternative is to fill in a non-null value for each of the fields no matter 656 * what and to just not use it if !GENERATE_CANCELLATION_CAUSES. 657 */ 658 : requireNonNull( 659 mayInterruptIfRunning 660 ? Cancellation.CAUSELESS_INTERRUPTED 661 : Cancellation.CAUSELESS_CANCELLED); 662 AbstractFuture<?> abstractFuture = this; 663 while (true) { 664 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 665 rValue = true; 666 complete(abstractFuture, mayInterruptIfRunning); 667 if (localValue instanceof SetFuture) { 668 // propagate cancellation to the future set in setfuture, this is racy, and we don't 669 // care if we are successful or not. 670 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 671 if (futureToPropagateTo instanceof Trusted) { 672 // If the future is a TrustedFuture then we specifically avoid calling cancel() 673 // this has 2 benefits 674 // 1. for long chains of futures strung together with setFuture we consume less stack 675 // 2. we avoid allocating Cancellation objects at every level of the cancellation 676 // chain 677 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 678 // does nothing but delegate to this method. 679 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 680 localValue = trusted.value; 681 if (localValue == null | localValue instanceof SetFuture) { 682 abstractFuture = trusted; 683 continue; // loop back up and try to complete the new future 684 } 685 } else { 686 // not a TrustedFuture, call cancel directly. 687 futureToPropagateTo.cancel(mayInterruptIfRunning); 688 } 689 } 690 break; 691 } 692 // obj changed, reread 693 localValue = abstractFuture.value; 694 if (!(localValue instanceof SetFuture)) { 695 // obj cannot be null at this point, because value can only change from null to non-null. 696 // So if value changed (and it did since we lost the CAS), then it cannot be null and 697 // since it isn't a SetFuture, then the future must be done and we should exit the loop 698 break; 699 } 700 } 701 } 702 return rValue; 703 } 704 705 /** 706 * Subclasses can override this method to implement interruption of the future's computation. The 707 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 708 * 709 * <p>The default implementation does nothing. 710 * 711 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, consulting 712 * {@link #wasInterrupted} to decide whether to interrupt your task. 713 * 714 * @since 10.0 715 */ 716 protected void interruptTask() {} 717 718 /** 719 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 720 * true}. 721 * 722 * @since 14.0 723 */ 724 protected final boolean wasInterrupted() { 725 final Object localValue = value; 726 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 727 } 728 729 /** 730 * {@inheritDoc} 731 * 732 * @since 10.0 733 */ 734 @Override 735 public void addListener(Runnable listener, Executor executor) { 736 checkNotNull(listener, "Runnable was null."); 737 checkNotNull(executor, "Executor was null."); 738 // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for 739 // addListener says that listeners execute 'immediate' if the future isDone(). However, our 740 // protocol for completing a future is to assign the value field (which sets isDone to true) and 741 // then to release waiters, followed by executing afterDone(), followed by releasing listeners. 742 // That means that it is possible to observe that the future isDone and that your listeners 743 // don't execute 'immediately'. By checking isDone here we avoid that. 744 // A corollary to all that is that we don't need to check isDone inside the loop because if we 745 // get into the loop we know that we weren't done when we entered and therefore we aren't under 746 // an obligation to execute 'immediately'. 747 if (!isDone()) { 748 Listener oldHead = listeners; 749 if (oldHead != Listener.TOMBSTONE) { 750 Listener newNode = new Listener(listener, executor); 751 do { 752 newNode.next = oldHead; 753 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 754 return; 755 } 756 oldHead = listeners; // re-read 757 } while (oldHead != Listener.TOMBSTONE); 758 } 759 } 760 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 761 // the listener. 762 executeListener(listener, executor); 763 } 764 765 /** 766 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 767 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 768 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 769 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 770 * Future} may have previously been set asynchronously, in which case its result may not be known 771 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 772 * method, only by a call to {@link #cancel}. 773 * 774 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 775 * acquire other locks, risking deadlocks. 776 * 777 * @param value the value to be used as the result 778 * @return true if the attempt was accepted, completing the {@code Future} 779 */ 780 @CanIgnoreReturnValue 781 protected boolean set(@ParametricNullness V value) { 782 Object valueToSet = value == null ? NULL : value; 783 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 784 complete(this, /*callInterruptTask=*/ false); 785 return true; 786 } 787 return false; 788 } 789 790 /** 791 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 792 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 793 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 794 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 795 * {@code Future} may have previously been set asynchronously, in which case its result may not be 796 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 797 * method, only by a call to {@link #cancel}. 798 * 799 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 800 * acquire other locks, risking deadlocks. 801 * 802 * @param throwable the exception to be used as the failed result 803 * @return true if the attempt was accepted, completing the {@code Future} 804 */ 805 @CanIgnoreReturnValue 806 protected boolean setException(Throwable throwable) { 807 Object valueToSet = new Failure(checkNotNull(throwable)); 808 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 809 complete(this, /*callInterruptTask=*/ false); 810 return true; 811 } 812 return false; 813 } 814 815 /** 816 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 817 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 818 * (including "set asynchronously," defined below). 819 * 820 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 821 * is accepted, then this future is guaranteed to have been completed with the supplied future by 822 * the time this method returns. If the supplied future is not done and the call is accepted, then 823 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 824 * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 825 * 826 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 827 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 828 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 829 * Future}. 830 * 831 * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, 832 * it will never trigger interruption behavior. In particular, it will not cause this future to 833 * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not 834 * return {@code true}. 835 * 836 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 837 * acquire other locks, risking deadlocks. 838 * 839 * @param future the future to delegate to 840 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 841 * cancelled or set. 842 * @since 19.0 843 */ 844 @CanIgnoreReturnValue 845 protected boolean setFuture(ListenableFuture<? extends V> future) { 846 checkNotNull(future); 847 Object localValue = value; 848 if (localValue == null) { 849 if (future.isDone()) { 850 Object value = getFutureValue(future); 851 if (ATOMIC_HELPER.casValue(this, null, value)) { 852 complete( 853 this, 854 /* 855 * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so 856 * don't invoke interruptTask. 857 */ 858 false); 859 return true; 860 } 861 return false; 862 } 863 SetFuture<V> valueToSet = new SetFuture<V>(this, future); 864 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 865 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 866 // since all we are doing is unpacking a completed future which should be fast. 867 try { 868 future.addListener(valueToSet, DirectExecutor.INSTANCE); 869 } catch (Throwable t) { 870 // Any Exception is either a RuntimeException or sneaky checked exception. 871 // 872 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 873 // must have been caused by addListener itself. The most likely explanation is a 874 // misconfigured mock. Try to switch to Failure. 875 Failure failure; 876 try { 877 failure = new Failure(t); 878 } catch (Exception | Error oomMostLikely) { // sneaky checked exception 879 failure = Failure.FALLBACK_INSTANCE; 880 } 881 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 882 boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); 883 } 884 return true; 885 } 886 localValue = value; // we lost the cas, fall through and maybe cancel 887 } 888 // The future has already been set to something. If it is cancellation we should cancel the 889 // incoming future. 890 if (localValue instanceof Cancellation) { 891 // we don't care if it fails, this is best-effort. 892 future.cancel(((Cancellation) localValue).wasInterrupted); 893 } 894 return false; 895 } 896 897 /** 898 * Returns a value that satisfies the contract of the {@link #value} field based on the state of 899 * given future. 900 * 901 * <p>This is approximately the inverse of {@link #getDoneValue(Object)} 902 */ 903 private static Object getFutureValue(ListenableFuture<?> future) { 904 if (future instanceof Trusted) { 905 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 906 // override .get() (since it is final) and therefore this is equivalent to calling .get() 907 // and unpacking the exceptions like we do below (just much faster because it is a single 908 // field read instead of a read, several branches and possibly creating exceptions). 909 Object v = ((AbstractFuture<?>) future).value; 910 if (v instanceof Cancellation) { 911 // If the other future was interrupted, clear the interrupted bit while preserving the cause 912 // this will make it consistent with how non-trustedfutures work which cannot propagate the 913 // wasInterrupted bit 914 Cancellation c = (Cancellation) v; 915 if (c.wasInterrupted) { 916 v = 917 c.cause != null 918 ? new Cancellation(/* wasInterrupted= */ false, c.cause) 919 : Cancellation.CAUSELESS_CANCELLED; 920 } 921 } 922 // requireNonNull is safe as long as we call this method only on completed futures. 923 return requireNonNull(v); 924 } 925 if (future instanceof InternalFutureFailureAccess) { 926 Throwable throwable = 927 InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future); 928 if (throwable != null) { 929 return new Failure(throwable); 930 } 931 } 932 boolean wasCancelled = future.isCancelled(); 933 // Don't allocate a CancellationException if it's not necessary 934 if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { 935 /* 936 * requireNonNull is safe because we've initialized CAUSELESS_CANCELLED if 937 * !GENERATE_CANCELLATION_CAUSES. 938 */ 939 return requireNonNull(Cancellation.CAUSELESS_CANCELLED); 940 } 941 // Otherwise calculate the value by calling .get() 942 try { 943 Object v = getUninterruptibly(future); 944 if (wasCancelled) { 945 return new Cancellation( 946 false, 947 new IllegalArgumentException( 948 "get() did not throw CancellationException, despite reporting " 949 + "isCancelled() == true: " 950 + future)); 951 } 952 return v == null ? NULL : v; 953 } catch (ExecutionException exception) { 954 if (wasCancelled) { 955 return new Cancellation( 956 false, 957 new IllegalArgumentException( 958 "get() did not throw CancellationException, despite reporting " 959 + "isCancelled() == true: " 960 + future, 961 exception)); 962 } 963 return new Failure(exception.getCause()); 964 } catch (CancellationException cancellation) { 965 if (!wasCancelled) { 966 return new Failure( 967 new IllegalArgumentException( 968 "get() threw CancellationException, despite reporting isCancelled() == false: " 969 + future, 970 cancellation)); 971 } 972 return new Cancellation(false, cancellation); 973 } catch (Exception | Error t) { // sneaky checked exception 974 return new Failure(t); 975 } 976 } 977 978 /** 979 * An inlined private copy of {@link Uninterruptibles#getUninterruptibly} used to break an 980 * internal dependency on other /util/concurrent classes. 981 */ 982 @ParametricNullness 983 private static <V extends @Nullable Object> V getUninterruptibly(Future<V> future) 984 throws ExecutionException { 985 boolean interrupted = false; 986 try { 987 while (true) { 988 try { 989 return future.get(); 990 } catch (InterruptedException e) { 991 interrupted = true; 992 } 993 } 994 } finally { 995 if (interrupted) { 996 Thread.currentThread().interrupt(); 997 } 998 } 999 } 1000 1001 /** Unblocks all threads and runs all listeners. */ 1002 private static void complete(AbstractFuture<?> param, boolean callInterruptTask) { 1003 // Declare a "true" local variable so that the Checker Framework will infer nullness. 1004 AbstractFuture<?> future = param; 1005 1006 Listener next = null; 1007 outer: 1008 while (true) { 1009 future.releaseWaiters(); 1010 /* 1011 * We call interruptTask() immediately before afterDone() so that migrating between the two 1012 * can be a no-op. 1013 */ 1014 if (callInterruptTask) { 1015 future.interruptTask(); 1016 /* 1017 * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so don't 1018 * invoke interruptTask on any subsequent futures. 1019 */ 1020 callInterruptTask = false; 1021 } 1022 // We call this before the listeners in order to avoid needing to manage a separate stack data 1023 // structure for them. Also, some implementations rely on this running prior to listeners 1024 // so that the cleanup work is visible to listeners. 1025 // afterDone() should be generally fast and only used for cleanup work... but in theory can 1026 // also be recursive and create StackOverflowErrors 1027 future.afterDone(); 1028 // push the current set of listeners onto next 1029 next = future.clearListeners(next); 1030 future = null; 1031 while (next != null) { 1032 Listener curr = next; 1033 next = next.next; 1034 /* 1035 * requireNonNull is safe because the listener stack never contains TOMBSTONE until after 1036 * clearListeners. 1037 */ 1038 Runnable task = requireNonNull(curr.task); 1039 if (task instanceof SetFuture) { 1040 SetFuture<?> setFuture = (SetFuture<?>) task; 1041 // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long 1042 // chains of SetFutures 1043 // Handling this special case is important because there is no way to pass an executor to 1044 // setFuture, so a user couldn't break the chain by doing this themselves. It is also 1045 // potentially common if someone writes a recursive Futures.transformAsync transformer. 1046 future = setFuture.owner; 1047 if (future.value == setFuture) { 1048 Object valueToSet = getFutureValue(setFuture.future); 1049 if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { 1050 continue outer; 1051 } 1052 } 1053 // otherwise the future we were trying to set is already done. 1054 } else { 1055 /* 1056 * requireNonNull is safe because the listener stack never contains TOMBSTONE until after 1057 * clearListeners. 1058 */ 1059 executeListener(task, requireNonNull(curr.executor)); 1060 } 1061 } 1062 break; 1063 } 1064 } 1065 1066 /** 1067 * Callback method that is called exactly once after the future is completed. 1068 * 1069 * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. 1070 * 1071 * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is 1072 * intended for very lightweight cleanup work, for example, timing statistics or clearing fields. 1073 * If your task does anything heavier consider, just using a listener with an executor. 1074 * 1075 * @since 20.0 1076 */ 1077 @ForOverride 1078 protected void afterDone() {} 1079 1080 // TODO(b/114236866): Inherit doc from InternalFutureFailureAccess. Also, -link to its URL. 1081 /** 1082 * Usually returns {@code null} but, if this {@code Future} has failed, may <i>optionally</i> 1083 * return the cause of the failure. "Failure" means specifically "completed with an exception"; it 1084 * does not include "was cancelled." To be explicit: If this method returns a non-null value, 1085 * then: 1086 * 1087 * <ul> 1088 * <li>{@code isDone()} must return {@code true} 1089 * <li>{@code isCancelled()} must return {@code false} 1090 * <li>{@code get()} must not block, and it must throw an {@code ExecutionException} with the 1091 * return value of this method as its cause 1092 * </ul> 1093 * 1094 * <p>This method is {@code protected} so that classes like {@code 1095 * com.google.common.util.concurrent.SettableFuture} do not expose it to their users as an 1096 * instance method. In the unlikely event that you need to call this method, call {@link 1097 * InternalFutures#tryInternalFastPathGetFailure(InternalFutureFailureAccess)}. 1098 * 1099 * @since 27.0 1100 */ 1101 @Override 1102 /* 1103 * We should annotate the superclass, InternalFutureFailureAccess, to say that its copy of this 1104 * method returns @Nullable, too. However, we're not sure if we want to make any changes to that 1105 * class, since it's in a separate artifact that we planned to release only a single version of. 1106 */ 1107 @CheckForNull 1108 protected final Throwable tryInternalFastPathGetFailure() { 1109 if (this instanceof Trusted) { 1110 Object obj = value; 1111 if (obj instanceof Failure) { 1112 return ((Failure) obj).exception; 1113 } 1114 } 1115 return null; 1116 } 1117 1118 /** 1119 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 1120 * the given future (if available). 1121 */ 1122 final void maybePropagateCancellationTo(@CheckForNull Future<?> related) { 1123 if (related != null & isCancelled()) { 1124 related.cancel(wasInterrupted()); 1125 } 1126 } 1127 1128 /** Releases all threads in the {@link #waiters} list, and clears the list. */ 1129 private void releaseWaiters() { 1130 Waiter head = ATOMIC_HELPER.gasWaiters(this, Waiter.TOMBSTONE); 1131 for (Waiter currentWaiter = head; currentWaiter != null; currentWaiter = currentWaiter.next) { 1132 currentWaiter.unpark(); 1133 } 1134 } 1135 1136 /** 1137 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 1138 * added first. 1139 */ 1140 @CheckForNull 1141 private Listener clearListeners(@CheckForNull Listener onto) { 1142 // We need to 1143 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that 1144 // to synchronize with us 1145 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 1146 // executing listeners in the order they were added 1147 // 3. push all the items onto 'onto' and return the new head of the stack 1148 Listener head = ATOMIC_HELPER.gasListeners(this, Listener.TOMBSTONE); 1149 Listener reversedList = onto; 1150 while (head != null) { 1151 Listener tmp = head; 1152 head = head.next; 1153 tmp.next = reversedList; 1154 reversedList = tmp; 1155 } 1156 return reversedList; 1157 } 1158 1159 // TODO(user): move parts into a default method on ListenableFuture? 1160 @Override 1161 public String toString() { 1162 // TODO(cpovirk): Presize to something plausible? 1163 StringBuilder builder = new StringBuilder(); 1164 if (getClass().getName().startsWith("com.google.common.util.concurrent.")) { 1165 builder.append(getClass().getSimpleName()); 1166 } else { 1167 builder.append(getClass().getName()); 1168 } 1169 builder.append('@').append(toHexString(identityHashCode(this))).append("[status="); 1170 if (isCancelled()) { 1171 builder.append("CANCELLED"); 1172 } else if (isDone()) { 1173 addDoneString(builder); 1174 } else { 1175 addPendingString(builder); // delegates to addDoneString if future completes midway 1176 } 1177 return builder.append("]").toString(); 1178 } 1179 1180 /** 1181 * Provide a human-readable explanation of why this future has not yet completed. 1182 * 1183 * @return null if an explanation cannot be provided (e.g. because the future is done). 1184 * @since 23.0 1185 */ 1186 @CheckForNull 1187 protected String pendingToString() { 1188 // TODO(diamondm) consider moving this into addPendingString so it's always in the output 1189 if (this instanceof ScheduledFuture) { 1190 return "remaining delay=[" 1191 + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) 1192 + " ms]"; 1193 } 1194 return null; 1195 } 1196 1197 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1198 private void addPendingString(StringBuilder builder) { 1199 // Capture current builder length so it can be truncated if this future ends up completing while 1200 // the toString is being calculated 1201 int truncateLength = builder.length(); 1202 1203 builder.append("PENDING"); 1204 1205 Object localValue = value; 1206 if (localValue instanceof SetFuture) { 1207 builder.append(", setFuture=["); 1208 appendUserObject(builder, ((SetFuture) localValue).future); 1209 builder.append("]"); 1210 } else { 1211 String pendingDescription; 1212 try { 1213 pendingDescription = Strings.emptyToNull(pendingToString()); 1214 } catch (Exception | StackOverflowError e) { 1215 // Any Exception is either a RuntimeException or sneaky checked exception. 1216 // 1217 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1218 // subclass is implemented with bugs similar to the subclass. 1219 pendingDescription = "Exception thrown from implementation: " + e.getClass(); 1220 } 1221 if (pendingDescription != null) { 1222 builder.append(", info=[").append(pendingDescription).append("]"); 1223 } 1224 } 1225 1226 // The future may complete while calculating the toString, so we check once more to see if the 1227 // future is done 1228 if (isDone()) { 1229 // Truncate anything that was appended before realizing this future is done 1230 builder.delete(truncateLength, builder.length()); 1231 addDoneString(builder); 1232 } 1233 } 1234 1235 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1236 private void addDoneString(StringBuilder builder) { 1237 try { 1238 V value = getUninterruptibly(this); 1239 builder.append("SUCCESS, result=["); 1240 appendResultObject(builder, value); 1241 builder.append("]"); 1242 } catch (ExecutionException e) { 1243 builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); 1244 } catch (CancellationException e) { 1245 builder.append("CANCELLED"); // shouldn't be reachable 1246 } catch (Exception e) { // sneaky checked exception 1247 builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); 1248 } 1249 } 1250 1251 /** 1252 * Any object can be the result of a Future, and not every object has a reasonable toString() 1253 * implementation. Using a reconstruction of the default Object.toString() prevents OOMs and stack 1254 * overflows, and helps avoid sensitive data inadvertently ending up in exception messages. 1255 */ 1256 private void appendResultObject(StringBuilder builder, @CheckForNull Object o) { 1257 if (o == null) { 1258 builder.append("null"); 1259 } else if (o == this) { 1260 builder.append("this future"); 1261 } else { 1262 builder 1263 .append(o.getClass().getName()) 1264 .append("@") 1265 .append(Integer.toHexString(System.identityHashCode(o))); 1266 } 1267 } 1268 1269 /** Helper for printing user supplied objects into our toString method. */ 1270 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1271 private void appendUserObject(StringBuilder builder, @CheckForNull Object o) { 1272 // This is some basic recursion detection for when people create cycles via set/setFuture or 1273 // when deep chains of futures exist resulting in a StackOverflowException. We could detect 1274 // arbitrary cycles using a thread local but this should be a good enough solution (it is also 1275 // what jdk collections do in these cases) 1276 try { 1277 if (o == this) { 1278 builder.append("this future"); 1279 } else { 1280 builder.append(o); 1281 } 1282 } catch (Exception | StackOverflowError e) { 1283 // Any Exception is either a RuntimeException or sneaky checked exception. 1284 // 1285 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1286 // user object is implemented with bugs similar to the user object. 1287 builder.append("Exception thrown from implementation: ").append(e.getClass()); 1288 } 1289 } 1290 1291 /** 1292 * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain 1293 * RuntimeException runtime exceptions} thrown by the executor. 1294 */ 1295 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1296 private static void executeListener(Runnable runnable, Executor executor) { 1297 try { 1298 executor.execute(runnable); 1299 } catch (Exception e) { // sneaky checked exception 1300 // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if 1301 // we're given a bad one. We only catch RuntimeException because we want Errors to propagate 1302 // up. 1303 log.get() 1304 .log( 1305 Level.SEVERE, 1306 "RuntimeException while executing runnable " 1307 + runnable 1308 + " with executor " 1309 + executor, 1310 e); 1311 } 1312 } 1313 1314 private abstract static class AtomicHelper { 1315 /** Non-volatile write of the thread to the {@link Waiter#thread} field. */ 1316 abstract void putThread(Waiter waiter, Thread newValue); 1317 1318 /** Non-volatile write of the waiter to the {@link Waiter#next} field. */ 1319 abstract void putNext(Waiter waiter, @CheckForNull Waiter newValue); 1320 1321 /** Performs a CAS operation on the {@link #waiters} field. */ 1322 abstract boolean casWaiters( 1323 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update); 1324 1325 /** Performs a CAS operation on the {@link #listeners} field. */ 1326 abstract boolean casListeners( 1327 AbstractFuture<?> future, @CheckForNull Listener expect, Listener update); 1328 1329 /** Performs a GAS operation on the {@link #waiters} field. */ 1330 abstract Waiter gasWaiters(AbstractFuture<?> future, Waiter update); 1331 1332 /** Performs a GAS operation on the {@link #listeners} field. */ 1333 abstract Listener gasListeners(AbstractFuture<?> future, Listener update); 1334 1335 /** Performs a CAS operation on the {@link #value} field. */ 1336 abstract boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update); 1337 } 1338 1339 /** 1340 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 1341 * 1342 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 1343 * be accessed. 1344 */ 1345 @SuppressWarnings({"sunapi", "removal"}) // b/318391980 1346 private static final class UnsafeAtomicHelper extends AtomicHelper { 1347 static final sun.misc.Unsafe UNSAFE; 1348 static final long LISTENERS_OFFSET; 1349 static final long WAITERS_OFFSET; 1350 static final long VALUE_OFFSET; 1351 static final long WAITER_THREAD_OFFSET; 1352 static final long WAITER_NEXT_OFFSET; 1353 1354 static { 1355 sun.misc.Unsafe unsafe = null; 1356 try { 1357 unsafe = sun.misc.Unsafe.getUnsafe(); 1358 } catch (SecurityException tryReflectionInstead) { 1359 try { 1360 unsafe = 1361 AccessController.doPrivileged( 1362 new PrivilegedExceptionAction<sun.misc.Unsafe>() { 1363 @Override 1364 public sun.misc.Unsafe run() throws Exception { 1365 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; 1366 for (java.lang.reflect.Field f : k.getDeclaredFields()) { 1367 f.setAccessible(true); 1368 Object x = f.get(null); 1369 if (k.isInstance(x)) { 1370 return k.cast(x); 1371 } 1372 } 1373 throw new NoSuchFieldError("the Unsafe"); 1374 } 1375 }); 1376 } catch (PrivilegedActionException e) { 1377 throw new RuntimeException("Could not initialize intrinsics", e.getCause()); 1378 } 1379 } 1380 try { 1381 Class<?> abstractFuture = AbstractFuture.class; 1382 WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters")); 1383 LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners")); 1384 VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value")); 1385 WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread")); 1386 WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next")); 1387 UNSAFE = unsafe; 1388 } catch (NoSuchFieldException e) { 1389 throw new RuntimeException(e); 1390 } 1391 } 1392 1393 @Override 1394 void putThread(Waiter waiter, Thread newValue) { 1395 UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); 1396 } 1397 1398 @Override 1399 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1400 UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); 1401 } 1402 1403 /** Performs a CAS operation on the {@link #waiters} field. */ 1404 @Override 1405 boolean casWaiters( 1406 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1407 return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update); 1408 } 1409 1410 /** Performs a CAS operation on the {@link #listeners} field. */ 1411 @Override 1412 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1413 return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); 1414 } 1415 1416 /** Performs a GAS operation on the {@link #listeners} field. */ 1417 @Override 1418 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1419 while (true) { 1420 Listener listener = future.listeners; 1421 if (update == listener) { 1422 return listener; 1423 } 1424 if (casListeners(future, listener, update)) { 1425 return listener; 1426 } 1427 } 1428 } 1429 1430 /** Performs a GAS operation on the {@link #waiters} field. */ 1431 @Override 1432 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1433 while (true) { 1434 Waiter waiter = future.waiters; 1435 if (update == waiter) { 1436 return waiter; 1437 } 1438 if (casWaiters(future, waiter, update)) { 1439 return waiter; 1440 } 1441 } 1442 } 1443 1444 /** Performs a CAS operation on the {@link #value} field. */ 1445 @Override 1446 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1447 return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); 1448 } 1449 } 1450 1451 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ 1452 @SuppressWarnings("rawtypes") 1453 private static final class SafeAtomicHelper extends AtomicHelper { 1454 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1455 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1456 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1457 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1458 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1459 1460 SafeAtomicHelper( 1461 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1462 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1463 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1464 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1465 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1466 this.waiterThreadUpdater = waiterThreadUpdater; 1467 this.waiterNextUpdater = waiterNextUpdater; 1468 this.waitersUpdater = waitersUpdater; 1469 this.listenersUpdater = listenersUpdater; 1470 this.valueUpdater = valueUpdater; 1471 } 1472 1473 @Override 1474 void putThread(Waiter waiter, Thread newValue) { 1475 waiterThreadUpdater.lazySet(waiter, newValue); 1476 } 1477 1478 @Override 1479 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1480 waiterNextUpdater.lazySet(waiter, newValue); 1481 } 1482 1483 @Override 1484 boolean casWaiters( 1485 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1486 return waitersUpdater.compareAndSet(future, expect, update); 1487 } 1488 1489 @Override 1490 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1491 return listenersUpdater.compareAndSet(future, expect, update); 1492 } 1493 1494 /** Performs a GAS operation on the {@link #listeners} field. */ 1495 @Override 1496 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1497 return listenersUpdater.getAndSet(future, update); 1498 } 1499 1500 /** Performs a GAS operation on the {@link #waiters} field. */ 1501 @Override 1502 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1503 return waitersUpdater.getAndSet(future, update); 1504 } 1505 1506 @Override 1507 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1508 return valueUpdater.compareAndSet(future, expect, update); 1509 } 1510 } 1511 1512 /** 1513 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 1514 * 1515 * <p>This is an implementation of last resort for when certain basic VM features are broken (like 1516 * AtomicReferenceFieldUpdater). 1517 */ 1518 private static final class SynchronizedHelper extends AtomicHelper { 1519 @Override 1520 void putThread(Waiter waiter, Thread newValue) { 1521 waiter.thread = newValue; 1522 } 1523 1524 @Override 1525 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1526 waiter.next = newValue; 1527 } 1528 1529 @Override 1530 boolean casWaiters( 1531 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1532 synchronized (future) { 1533 if (future.waiters == expect) { 1534 future.waiters = update; 1535 return true; 1536 } 1537 return false; 1538 } 1539 } 1540 1541 @Override 1542 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1543 synchronized (future) { 1544 if (future.listeners == expect) { 1545 future.listeners = update; 1546 return true; 1547 } 1548 return false; 1549 } 1550 } 1551 1552 /** Performs a GAS operation on the {@link #listeners} field. */ 1553 @Override 1554 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1555 synchronized (future) { 1556 Listener old = future.listeners; 1557 if (old != update) { 1558 future.listeners = update; 1559 } 1560 return old; 1561 } 1562 } 1563 1564 /** Performs a GAS operation on the {@link #waiters} field. */ 1565 @Override 1566 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1567 synchronized (future) { 1568 Waiter old = future.waiters; 1569 if (old != update) { 1570 future.waiters = update; 1571 } 1572 return old; 1573 } 1574 } 1575 1576 @Override 1577 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1578 synchronized (future) { 1579 if (future.value == expect) { 1580 future.value = update; 1581 return true; 1582 } 1583 return false; 1584 } 1585 } 1586 } 1587 1588 private static CancellationException cancellationExceptionWithCause( 1589 String message, @CheckForNull Throwable cause) { 1590 CancellationException exception = new CancellationException(message); 1591 exception.initCause(cause); 1592 return exception; 1593 } 1594}