001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.util.concurrent.NullnessCasts.uncheckedNull; 019import static java.lang.Integer.toHexString; 020import static java.lang.System.identityHashCode; 021import static java.util.Objects.requireNonNull; 022import static java.util.concurrent.TimeUnit.MILLISECONDS; 023import static java.util.concurrent.TimeUnit.NANOSECONDS; 024import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 025 026import com.google.common.annotations.GwtCompatible; 027import com.google.common.base.Strings; 028import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 029import com.google.common.util.concurrent.internal.InternalFutures; 030import com.google.errorprone.annotations.CanIgnoreReturnValue; 031import com.google.errorprone.annotations.ForOverride; 032import com.google.j2objc.annotations.ReflectionSupport; 033import com.google.j2objc.annotations.RetainedLocalRef; 034import java.lang.reflect.Field; 035import java.security.AccessController; 036import java.security.PrivilegedActionException; 037import java.security.PrivilegedExceptionAction; 038import java.util.Locale; 039import java.util.concurrent.CancellationException; 040import java.util.concurrent.ExecutionException; 
041import java.util.concurrent.Executor; 042import java.util.concurrent.Future; 043import java.util.concurrent.ScheduledFuture; 044import java.util.concurrent.TimeUnit; 045import java.util.concurrent.TimeoutException; 046import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 047import java.util.concurrent.locks.LockSupport; 048import java.util.logging.Level; 049import org.jspecify.annotations.Nullable; 050import sun.misc.Unsafe; 051 052/** 053 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 054 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 055 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 056 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 057 * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link 058 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function, 059 * java.util.concurrent.Executor) Futures.catching}. 060 * 061 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 062 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 063 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 064 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 065 * should rarely override other methods. 066 * 067 * @author Sven Mawson 068 * @author Luke Sandberg 069 * @since 1.0 070 */ 071@SuppressWarnings({ 072 // Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 073 "ShortCircuitBoolean", 074 "nullness", // TODO(b/147136275): Remove once our checker understands & and |. 
075}) 076@GwtCompatible(emulated = true) 077@ReflectionSupport(value = ReflectionSupport.Level.FULL) 078public abstract class AbstractFuture<V extends @Nullable Object> extends InternalFutureFailureAccess 079 implements ListenableFuture<V> { 080 static final boolean GENERATE_CANCELLATION_CAUSES; 081 082 static { 083 // System.getProperty may throw if the security policy does not permit access. 084 boolean generateCancellationCauses; 085 try { 086 generateCancellationCauses = 087 Boolean.parseBoolean( 088 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 089 } catch (SecurityException e) { 090 generateCancellationCauses = false; 091 } 092 GENERATE_CANCELLATION_CAUSES = generateCancellationCauses; 093 } 094 095 /** 096 * Tag interface marking trusted subclasses. This enables some optimizations. The implementation 097 * of this interface must also be an AbstractFuture and must not override or expose for overriding 098 * any of the public methods of ListenableFuture. 099 */ 100 interface Trusted<V extends @Nullable Object> extends ListenableFuture<V> {} 101 102 /** 103 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 104 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 
105 */ 106 abstract static class TrustedFuture<V extends @Nullable Object> extends AbstractFuture<V> 107 implements Trusted<V> { 108 @CanIgnoreReturnValue 109 @Override 110 @ParametricNullness 111 public final V get() throws InterruptedException, ExecutionException { 112 return super.get(); 113 } 114 115 @CanIgnoreReturnValue 116 @Override 117 @ParametricNullness 118 public final V get(long timeout, TimeUnit unit) 119 throws InterruptedException, ExecutionException, TimeoutException { 120 return super.get(timeout, unit); 121 } 122 123 @Override 124 public final boolean isDone() { 125 return super.isDone(); 126 } 127 128 @Override 129 public final boolean isCancelled() { 130 return super.isCancelled(); 131 } 132 133 @Override 134 public final void addListener(Runnable listener, Executor executor) { 135 super.addListener(listener, executor); 136 } 137 138 @CanIgnoreReturnValue 139 @Override 140 public final boolean cancel(boolean mayInterruptIfRunning) { 141 return super.cancel(mayInterruptIfRunning); 142 } 143 } 144 145 static final LazyLogger log = new LazyLogger(AbstractFuture.class); 146 147 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 148 // blocking. This value is what AbstractQueuedSynchronizer uses. 
149 private static final long SPIN_THRESHOLD_NANOS = 1000L; 150 151 private static final AtomicHelper ATOMIC_HELPER; 152 153 static { 154 AtomicHelper helper; 155 Throwable thrownUnsafeFailure = null; 156 Throwable thrownAtomicReferenceFieldUpdaterFailure = null; 157 158 try { 159 helper = new UnsafeAtomicHelper(); 160 } catch (Exception | Error unsafeFailure) { // sneaky checked exception 161 thrownUnsafeFailure = unsafeFailure; 162 // catch absolutely everything and fall through to our 163 // 'AtomicReferenceFieldUpdaterAtomicHelper' The access control checks that ARFU does means 164 // the caller class has to be AbstractFuture instead of 165 // AtomicReferenceFieldUpdaterAtomicHelper, so we annoyingly define these here 166 try { 167 helper = 168 new AtomicReferenceFieldUpdaterAtomicHelper( 169 newUpdater(Waiter.class, Thread.class, "thread"), 170 newUpdater(Waiter.class, Waiter.class, "next"), 171 newUpdater(AbstractFuture.class, Waiter.class, "waiters"), 172 newUpdater(AbstractFuture.class, Listener.class, "listeners"), 173 newUpdater(AbstractFuture.class, Object.class, "value")); 174 } catch (Exception // sneaky checked exception 175 | Error atomicReferenceFieldUpdaterFailure) { 176 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 177 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 178 // For these users fallback to a suboptimal implementation, based on synchronized. This will 179 // be a definite performance hit to those users. 180 thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; 181 helper = new SynchronizedHelper(); 182 } 183 } 184 ATOMIC_HELPER = helper; 185 186 // Prevent rare disastrous classloading in first call to LockSupport.park. 
187 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 188 @SuppressWarnings("unused") 189 Class<?> ensureLoaded = LockSupport.class; 190 191 // Log after all static init is finished; if an installed logger uses any Futures methods, it 192 // shouldn't break in cases where reflection is missing/broken. 193 if (thrownAtomicReferenceFieldUpdaterFailure != null) { 194 log.get().log(Level.SEVERE, "UnsafeAtomicHelper is broken!", thrownUnsafeFailure); 195 log.get() 196 .log( 197 Level.SEVERE, 198 "AtomicReferenceFieldUpdaterAtomicHelper is broken!", 199 thrownAtomicReferenceFieldUpdaterFailure); 200 } 201 } 202 203 /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ 204 private static final class Waiter { 205 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 206 207 volatile @Nullable Thread thread; 208 volatile @Nullable Waiter next; 209 210 /** 211 * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 212 * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 213 */ 214 Waiter(boolean unused) {} 215 216 Waiter() { 217 // avoid volatile write, write is made visible by subsequent CAS on waiters field 218 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 219 } 220 221 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 222 // field. 223 void setNext(@Nullable Waiter next) { 224 ATOMIC_HELPER.putNext(this, next); 225 } 226 227 void unpark() { 228 // This is racy with removeWaiter. The consequence of the race is that we may spuriously call 229 // unpark even though the thread has already removed itself from the list. But even if we did 230 // use a CAS, that race would still exist (it would just be ever so slightly smaller). 
231 Thread w = thread; 232 if (w != null) { 233 thread = null; 234 LockSupport.unpark(w); 235 } 236 } 237 } 238 239 /** 240 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 241 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved 242 * by two things. 243 * 244 * <ul> 245 * <li>This is only called when a waiting thread times out or is interrupted. Both of which 246 * should be rare. 247 * <li>The waiters list should be very short. 248 * </ul> 249 */ 250 private void removeWaiter(Waiter node) { 251 node.thread = null; // mark as 'deleted' 252 restart: 253 while (true) { 254 Waiter pred = null; 255 Waiter curr = waiters; 256 if (curr == Waiter.TOMBSTONE) { 257 return; // give up if someone is calling complete 258 } 259 Waiter succ; 260 while (curr != null) { 261 succ = curr.next; 262 if (curr.thread != null) { // we aren't unlinking this node, update pred. 263 pred = curr; 264 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 265 pred.next = succ; 266 if (pred.thread == null) { // We raced with another node that unlinked pred. Restart. 267 continue restart; 268 } 269 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 270 continue restart; // We raced with an add or complete 271 } 272 curr = succ; 273 } 274 break; 275 } 276 } 277 278 /** Listeners also form a stack through the {@link #listeners} field. 
*/ 279 private static final class Listener { 280 static final Listener TOMBSTONE = new Listener(); 281 // null only for TOMBSTONE 282 final @Nullable Runnable task; 283 // null only for TOMBSTONE 284 final @Nullable Executor executor; 285 286 // writes to next are made visible by subsequent CAS's on the listeners field 287 @Nullable Listener next; 288 289 Listener(Runnable task, Executor executor) { 290 this.task = task; 291 this.executor = executor; 292 } 293 294 Listener() { 295 this.task = null; 296 this.executor = null; 297 } 298 } 299 300 /** A special value to represent {@code null}. */ 301 private static final Object NULL = new Object(); 302 303 /** A special value to represent failure, when {@link #setException} is called successfully. */ 304 private static final class Failure { 305 static final Failure FALLBACK_INSTANCE = 306 new Failure( 307 new Throwable("Failure occurred while trying to finish a future.") { 308 @Override 309 public synchronized Throwable fillInStackTrace() { 310 return this; // no stack trace 311 } 312 }); 313 final Throwable exception; 314 315 Failure(Throwable exception) { 316 this.exception = checkNotNull(exception); 317 } 318 } 319 320 /** A special value to represent cancellation and the 'wasInterrupted' bit. 
*/ 321 private static final class Cancellation { 322 // constants to use when GENERATE_CANCELLATION_CAUSES = false 323 static final @Nullable Cancellation CAUSELESS_INTERRUPTED; 324 static final @Nullable Cancellation CAUSELESS_CANCELLED; 325 326 static { 327 if (GENERATE_CANCELLATION_CAUSES) { 328 CAUSELESS_CANCELLED = null; 329 CAUSELESS_INTERRUPTED = null; 330 } else { 331 CAUSELESS_CANCELLED = new Cancellation(false, null); 332 CAUSELESS_INTERRUPTED = new Cancellation(true, null); 333 } 334 } 335 336 final boolean wasInterrupted; 337 final @Nullable Throwable cause; 338 339 Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { 340 this.wasInterrupted = wasInterrupted; 341 this.cause = cause; 342 } 343 } 344 345 /** A special value that encodes the 'setFuture' state. */ 346 private static final class SetFuture<V extends @Nullable Object> implements Runnable { 347 final AbstractFuture<V> owner; 348 final ListenableFuture<? extends V> future; 349 350 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 351 this.owner = owner; 352 this.future = future; 353 } 354 355 @Override 356 public void run() { 357 if (owner.value != this) { 358 // nothing to do, we must have been cancelled, don't bother inspecting the future. 359 return; 360 } 361 Object valueToSet = getFutureValue(future); 362 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 363 complete( 364 owner, 365 /* 366 * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so 367 * don't invoke interruptTask. 368 */ 369 false); 370 } 371 } 372 } 373 374 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 375 // available. 376 /** 377 * This field encodes the current state of the future. 378 * 379 * <p>The valid values are: 380 * 381 * <ul> 382 * <li>{@code null} initial state, nothing has happened. 383 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 
384 * <li>{@link Failure} terminal state, {@code setException} was called. 385 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 386 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 387 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 388 * argument. 389 * </ul> 390 */ 391 private volatile @Nullable Object value; 392 393 /** All listeners. */ 394 private volatile @Nullable Listener listeners; 395 396 /** All waiting threads. */ 397 private volatile @Nullable Waiter waiters; 398 399 /** Constructor for use by subclasses. */ 400 protected AbstractFuture() {} 401 402 // Gets and Timed Gets 403 // 404 // * Be responsive to interruption 405 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 406 // waiters field. 407 // * Future completion is defined by when #value becomes non-null/non SetFuture 408 // * Future completion can be observed if the waiters field contains a TOMBSTONE 409 410 // Timed Get 411 // There are a few design constraints to consider 412 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 413 // have observed 12 micros on 64-bit linux systems to wake up a parked thread). So if the 414 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 415 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 416 // similar purposes. 417 // * We want to behave reasonably for timeouts of 0 418 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 419 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 420 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 421 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 422 // is). 
If we wanted to be strict about it, we could store the unpark() time in the Waiter node 423 // and we could use that to make a decision about whether or not we timed out prior to being 424 // unparked. 425 426 /** 427 * {@inheritDoc} 428 * 429 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 430 * current thread is interrupted during the call, even if the value is already available. 431 * 432 * @throws CancellationException {@inheritDoc} 433 */ 434 @SuppressWarnings("LabelledBreakTarget") // TODO(b/345814817): Maybe fix? 435 @CanIgnoreReturnValue 436 @Override 437 @ParametricNullness 438 public V get(long timeout, TimeUnit unit) 439 throws InterruptedException, TimeoutException, ExecutionException { 440 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 441 // at the bottom and throw a timeoutexception. 442 final long timeoutNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 443 long remainingNanos = timeoutNanos; 444 if (Thread.interrupted()) { 445 throw new InterruptedException(); 446 } 447 @RetainedLocalRef Object localValue = value; 448 if (localValue != null & !(localValue instanceof SetFuture)) { 449 return getDoneValue(localValue); 450 } 451 // we delay calling nanoTime until we know we will need to either park or spin 452 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 453 long_wait_loop: 454 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 455 Waiter oldHead = waiters; 456 if (oldHead != Waiter.TOMBSTONE) { 457 Waiter node = new Waiter(); 458 do { 459 node.setNext(oldHead); 460 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 461 while (true) { 462 OverflowAvoidingLockSupport.parkNanos(this, remainingNanos); 463 // Check interruption first, if we woke up due to interruption we need to honor that. 
464 if (Thread.interrupted()) { 465 removeWaiter(node); 466 throw new InterruptedException(); 467 } 468 469 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 470 // wakeup 471 localValue = value; 472 if (localValue != null & !(localValue instanceof SetFuture)) { 473 return getDoneValue(localValue); 474 } 475 476 // timed out? 477 remainingNanos = endNanos - System.nanoTime(); 478 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 479 // Remove the waiter, one way or another we are done parking this thread. 480 removeWaiter(node); 481 break long_wait_loop; // jump down to the busy wait loop 482 } 483 } 484 } 485 oldHead = waiters; // re-read and loop. 486 } while (oldHead != Waiter.TOMBSTONE); 487 } 488 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 489 // waiter. 490 // requireNonNull is safe because value is always set before TOMBSTONE. 491 return getDoneValue(requireNonNull(value)); 492 } 493 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 494 // waiters list 495 while (remainingNanos > 0) { 496 localValue = value; 497 if (localValue != null & !(localValue instanceof SetFuture)) { 498 return getDoneValue(localValue); 499 } 500 if (Thread.interrupted()) { 501 throw new InterruptedException(); 502 } 503 remainingNanos = endNanos - System.nanoTime(); 504 } 505 506 String futureToString = toString(); 507 final String unitString = unit.toString().toLowerCase(Locale.ROOT); 508 String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); 509 // Only report scheduling delay if larger than our spin threshold - otherwise it's just noise 510 if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { 511 // We over-waited for our timeout. 
512 message += " (plus "; 513 long overWaitNanos = -remainingNanos; 514 long overWaitUnits = unit.convert(overWaitNanos, NANOSECONDS); 515 long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); 516 boolean shouldShowExtraNanos = 517 overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; 518 if (overWaitUnits > 0) { 519 message += overWaitUnits + " " + unitString; 520 if (shouldShowExtraNanos) { 521 message += ","; 522 } 523 message += " "; 524 } 525 if (shouldShowExtraNanos) { 526 message += overWaitLeftoverNanos + " nanoseconds "; 527 } 528 529 message += "delay)"; 530 } 531 // It's confusing to see a completed future in a timeout message; if isDone() returns false, 532 // then we know it must have given a pending toString value earlier. If not, then the future 533 // completed after the timeout expired, and the message might be success. 534 if (isDone()) { 535 throw new TimeoutException(message + " but future completed as timeout expired"); 536 } 537 throw new TimeoutException(message + " for " + futureToString); 538 } 539 540 /** 541 * {@inheritDoc} 542 * 543 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 544 * current thread is interrupted during the call, even if the value is already available. 545 * 546 * @throws CancellationException {@inheritDoc} 547 */ 548 @CanIgnoreReturnValue 549 @Override 550 @ParametricNullness 551 public V get() throws InterruptedException, ExecutionException { 552 if (Thread.interrupted()) { 553 throw new InterruptedException(); 554 } 555 @RetainedLocalRef Object localValue = value; 556 if (localValue != null & !(localValue instanceof SetFuture)) { 557 return getDoneValue(localValue); 558 } 559 Waiter oldHead = waiters; 560 if (oldHead != Waiter.TOMBSTONE) { 561 Waiter node = new Waiter(); 562 do { 563 node.setNext(oldHead); 564 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 565 // we are on the stack, now wait for completion. 
566 while (true) { 567 LockSupport.park(this); 568 // Check interruption first, if we woke up due to interruption we need to honor that. 569 if (Thread.interrupted()) { 570 removeWaiter(node); 571 throw new InterruptedException(); 572 } 573 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 574 // wakeup 575 localValue = value; 576 if (localValue != null & !(localValue instanceof SetFuture)) { 577 return getDoneValue(localValue); 578 } 579 } 580 } 581 oldHead = waiters; // re-read and loop. 582 } while (oldHead != Waiter.TOMBSTONE); 583 } 584 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 585 // waiter. 586 // requireNonNull is safe because value is always set before TOMBSTONE. 587 return getDoneValue(requireNonNull(value)); 588 } 589 590 /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ 591 @ParametricNullness 592 private V getDoneValue(Object obj) throws ExecutionException { 593 // While this seems like it might be too branch-y, simple benchmarking proves it to be 594 // unmeasurable (comparing done AbstractFutures with immediateFuture) 595 if (obj instanceof Cancellation) { 596 Cancellation cancellation = (Cancellation) obj; 597 Throwable cause = cancellation.cause; 598 throw cancellationExceptionWithCause("Task was cancelled.", cause); 599 } else if (obj instanceof Failure) { 600 Failure failure = (Failure) obj; 601 Throwable exception = failure.exception; 602 throw new ExecutionException(exception); 603 } else if (obj == NULL) { 604 /* 605 * It's safe to return null because we would only have stored it in the first place if it were 606 * a valid value for V. 
607 */ 608 return uncheckedNull(); 609 } else { 610 @SuppressWarnings("unchecked") // this is the only other option 611 V asV = (V) obj; 612 return asV; 613 } 614 } 615 616 @Override 617 public boolean isDone() { 618 @RetainedLocalRef Object localValue = value; 619 return localValue != null & !(localValue instanceof SetFuture); 620 } 621 622 @Override 623 public boolean isCancelled() { 624 @RetainedLocalRef Object localValue = value; 625 return localValue instanceof Cancellation; 626 } 627 628 /** 629 * {@inheritDoc} 630 * 631 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 632 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 633 * {@code Future} that was supplied in the {@code setFuture} call. 634 * 635 * <p>Rather than override this method to perform additional cancellation work or cleanup, 636 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 637 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 638 * cancelled without a call to {@code cancel}, such as by calling {@code 639 * setFuture(cancelledFuture)}. 640 * 641 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 642 * acquire other locks, risking deadlocks. 643 */ 644 @CanIgnoreReturnValue 645 @Override 646 public boolean cancel(boolean mayInterruptIfRunning) { 647 @RetainedLocalRef Object localValue = value; 648 boolean rValue = false; 649 if (localValue == null | localValue instanceof SetFuture) { 650 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 651 // certainly less likely. 652 Object valueToSet = 653 GENERATE_CANCELLATION_CAUSES 654 ? new Cancellation( 655 mayInterruptIfRunning, new CancellationException("Future.cancel() was called.")) 656 /* 657 * requireNonNull is safe because we've initialized these if 658 * !GENERATE_CANCELLATION_CAUSES. 
659 * 660 * TODO(cpovirk): Maybe it would be cleaner to define a CancellationSupplier interface 661 * with two implementations, one that contains causeless Cancellation instances and 662 * the other of which creates new Cancellation instances each time it's called? Yet 663 * another alternative is to fill in a non-null value for each of the fields no matter 664 * what and to just not use it if !GENERATE_CANCELLATION_CAUSES. 665 */ 666 : requireNonNull( 667 mayInterruptIfRunning 668 ? Cancellation.CAUSELESS_INTERRUPTED 669 : Cancellation.CAUSELESS_CANCELLED); 670 AbstractFuture<?> abstractFuture = this; 671 while (true) { 672 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 673 rValue = true; 674 complete(abstractFuture, mayInterruptIfRunning); 675 if (localValue instanceof SetFuture) { 676 // propagate cancellation to the future set in setfuture, this is racy, and we don't 677 // care if we are successful or not. 678 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 679 if (futureToPropagateTo instanceof Trusted) { 680 // If the future is a TrustedFuture then we specifically avoid calling cancel() 681 // this has 2 benefits 682 // 1. for long chains of futures strung together with setFuture we consume less stack 683 // 2. we avoid allocating Cancellation objects at every level of the cancellation 684 // chain 685 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 686 // does nothing but delegate to this method. 687 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 688 localValue = trusted.value; 689 if (localValue == null | localValue instanceof SetFuture) { 690 abstractFuture = trusted; 691 continue; // loop back up and try to complete the new future 692 } 693 } else { 694 // not a TrustedFuture, call cancel directly. 
695 futureToPropagateTo.cancel(mayInterruptIfRunning); 696 } 697 } 698 break; 699 } 700 // obj changed, reread 701 localValue = abstractFuture.value; 702 if (!(localValue instanceof SetFuture)) { 703 // obj cannot be null at this point, because value can only change from null to non-null. 704 // So if value changed (and it did since we lost the CAS), then it cannot be null and 705 // since it isn't a SetFuture, then the future must be done and we should exit the loop 706 break; 707 } 708 } 709 } 710 return rValue; 711 } 712 713 /** 714 * Subclasses can override this method to implement interruption of the future's computation. The 715 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 716 * 717 * <p>The default implementation does nothing. 718 * 719 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking 720 * {@link #wasInterrupted} to decide whether to interrupt your task. 721 * 722 * @since 10.0 723 */ 724 protected void interruptTask() {} 725 726 /** 727 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 728 * true}. 729 * 730 * @since 14.0 731 */ 732 protected final boolean wasInterrupted() { 733 @RetainedLocalRef Object localValue = value; 734 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 735 } 736 737 /** 738 * {@inheritDoc} 739 * 740 * @since 10.0 741 */ 742 @Override 743 public void addListener(Runnable listener, Executor executor) { 744 checkNotNull(listener, "Runnable was null."); 745 checkNotNull(executor, "Executor was null."); 746 // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for 747 // addListener says that listeners execute 'immediate' if the future isDone(). 
However, our 748 // protocol for completing a future is to assign the value field (which sets isDone to true) and 749 // then to release waiters, followed by executing afterDone(), followed by releasing listeners. 750 // That means that it is possible to observe that the future isDone and that your listeners 751 // don't execute 'immediately'. By checking isDone here we avoid that. 752 // A corollary to all that is that we don't need to check isDone inside the loop because if we 753 // get into the loop we know that we weren't done when we entered and therefore we aren't under 754 // an obligation to execute 'immediately'. 755 if (!isDone()) { 756 Listener oldHead = listeners; 757 if (oldHead != Listener.TOMBSTONE) { 758 Listener newNode = new Listener(listener, executor); 759 do { 760 newNode.next = oldHead; 761 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 762 return; 763 } 764 oldHead = listeners; // re-read 765 } while (oldHead != Listener.TOMBSTONE); 766 } 767 } 768 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 769 // the listener. 770 executeListener(listener, executor); 771 } 772 773 /** 774 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 775 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 776 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 777 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 778 * Future} may have previously been set asynchronously, in which case its result may not be known 779 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 780 * method, only by a call to {@link #cancel}. 781 * 782 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 783 * acquire other locks, risking deadlocks. 
784 * 785 * @param value the value to be used as the result 786 * @return true if the attempt was accepted, completing the {@code Future} 787 */ 788 @CanIgnoreReturnValue 789 protected boolean set(@ParametricNullness V value) { 790 Object valueToSet = value == null ? NULL : value; 791 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 792 complete(this, /*callInterruptTask=*/ false); 793 return true; 794 } 795 return false; 796 } 797 798 /** 799 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 800 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 801 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 802 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 803 * {@code Future} may have previously been set asynchronously, in which case its result may not be 804 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 805 * method, only by a call to {@link #cancel}. 806 * 807 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 808 * acquire other locks, risking deadlocks. 809 * 810 * @param throwable the exception to be used as the failed result 811 * @return true if the attempt was accepted, completing the {@code Future} 812 */ 813 @CanIgnoreReturnValue 814 protected boolean setException(Throwable throwable) { 815 Object valueToSet = new Failure(checkNotNull(throwable)); 816 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 817 complete(this, /*callInterruptTask=*/ false); 818 return true; 819 } 820 return false; 821 } 822 823 /** 824 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 825 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 826 * (including "set asynchronously," defined below). 
*
   * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call
   * is accepted, then this future is guaranteed to have been completed with the supplied future by
   * the time this method returns. If the supplied future is not done and the call is accepted, then
   * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known,
   * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}.
   *
   * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later
   * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to
   * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code
   * Future}.
   *
   * <p>Note that, even if the supplied future is cancelled and it causes this future to complete,
   * it will never trigger interruption behavior. In particular, it will not cause this future to
   * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not
   * return {@code true}.
   *
   * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or
   * acquire other locks, risking deadlocks.
   *
   * @param future the future to delegate to
   * @return true if the attempt was accepted, indicating that the {@code Future} was not previously
   *     cancelled or set.
   * @since 19.0
   */
  @CanIgnoreReturnValue
  @SuppressWarnings("Interruption") // We are propagating an interrupt from a caller.
  protected boolean setFuture(ListenableFuture<? extends V> future) {
    checkNotNull(future);
    // Read the field once; the rest of the method reasons about this snapshot.
    @RetainedLocalRef Object localValue = value;
    if (localValue == null) {
      if (future.isDone()) {
        // Fast path: the input is already done, so copy its outcome directly instead of
        // registering a listener. Note that this local shadows the `value` field within this
        // branch only.
        Object value = getFutureValue(future);
        if (ATOMIC_HELPER.casValue(this, null, value)) {
          complete(
              this,
              /*
               * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so
               * don't invoke interruptTask.
               */
              false);
          return true;
        }
        return false;
      }
      // Slow path: park a SetFuture marker in the value field and let it finish the job when the
      // input completes.
      SetFuture<V> valueToSet = new SetFuture<>(this, future);
      if (ATOMIC_HELPER.casValue(this, null, valueToSet)) {
        // the listener is responsible for calling completeWithFuture, directExecutor is appropriate
        // since all we are doing is unpacking a completed future which should be fast.
        try {
          future.addListener(valueToSet, DirectExecutor.INSTANCE);
        } catch (Throwable t) {
          // Any Exception is either a RuntimeException or sneaky checked exception.
          //
          // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this
          // must have been caused by addListener itself. The most likely explanation is a
          // misconfigured mock. Try to switch to Failure.
          Failure failure;
          try {
            failure = new Failure(t);
          } catch (Exception | Error oomMostLikely) { // sneaky checked exception
            // Constructing the Failure itself failed; use the preallocated instance instead.
            failure = Failure.FALLBACK_INSTANCE;
          }
          // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok.
          boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure);
        }
        // The call was accepted even if addListener threw: the SetFuture CAS above succeeded.
        return true;
      }
      // Here `value` is the field again (the shadowing local above is out of scope).
      localValue = value; // we lost the cas, fall through and maybe cancel
    }
    // The future has already been set to something. If it is cancellation we should cancel the
    // incoming future.
    if (localValue instanceof Cancellation) {
      // we don't care if it fails, this is best-effort.
      future.cancel(((Cancellation) localValue).wasInterrupted);
    }
    return false;
  }

  /**
   * Returns a value that satisfies the contract of the {@link #value} field based on the state of
   * given future.
   *
   * <p>This is approximately the inverse of {@link #getDoneValue(Object)}
   *
   * <p>For futures that are not {@code Trusted}, the returned object is one of: the result of
   * {@code get()} (with {@code NULL} standing in for a {@code null} result), a {@code Failure}, or
   * a {@code Cancellation} whose {@code wasInterrupted} flag is {@code false}.
   *
   * @param future the future to read; the comments in the body note that it is expected to be
   *     done already
   * @return a non-null object suitable for storing in the {@link #value} field
   */
  private static Object getFutureValue(ListenableFuture<?> future) {
    if (future instanceof Trusted) {
      // Break encapsulation for TrustedFuture instances since we know that subclasses cannot
      // override .get() (since it is final) and therefore this is equivalent to calling .get()
      // and unpacking the exceptions like we do below (just much faster because it is a single
      // field read instead of a read, several branches and possibly creating exceptions).
      Object v = ((AbstractFuture<?>) future).value;
      if (v instanceof Cancellation) {
        // If the other future was interrupted, clear the interrupted bit while preserving the cause
        // this will make it consistent with how non-trustedfutures work which cannot propagate the
        // wasInterrupted bit
        Cancellation c = (Cancellation) v;
        if (c.wasInterrupted) {
          v =
              c.cause != null
                  ? new Cancellation(/* wasInterrupted= */ false, c.cause)
                  : Cancellation.CAUSELESS_CANCELLED;
        }
      }
      // requireNonNull is safe as long as we call this method only on completed futures.
      return requireNonNull(v);
    }
    if (future instanceof InternalFutureFailureAccess) {
      // Optional fast path: ask the future for its failure without going through get().
      Throwable throwable =
          InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future);
      if (throwable != null) {
        return new Failure(throwable);
      }
    }
    // Sampled once, before get(), so the consistency checks below compare get()'s behavior against
    // this single observation.
    boolean wasCancelled = future.isCancelled();
    // Don't allocate a CancellationException if it's not necessary
    if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) {
      /*
       * requireNonNull is safe because we've initialized CAUSELESS_CANCELLED if
       * !GENERATE_CANCELLATION_CAUSES.
       */
      return requireNonNull(Cancellation.CAUSELESS_CANCELLED);
    }
    // Otherwise calculate the value by calling .get()
    try {
      Object v = getUninterruptibly(future);
      if (wasCancelled) {
        // isCancelled() was true but get() returned normally: report the inconsistency as the
        // cause of a (non-interrupted) Cancellation.
        return new Cancellation(
            false,
            new IllegalArgumentException(
                "get() did not throw CancellationException, despite reporting "
                    + "isCancelled() == true: "
                    + future));
      }
      return v == null ? NULL : v;
    } catch (ExecutionException exception) {
      if (wasCancelled) {
        // isCancelled() was true but get() reported a failure: same inconsistency as above, with
        // the ExecutionException kept as the cause.
        return new Cancellation(
            false,
            new IllegalArgumentException(
                "get() did not throw CancellationException, despite reporting "
                    + "isCancelled() == true: "
                    + future,
                exception));
      }
      return new Failure(exception.getCause());
    } catch (CancellationException cancellation) {
      if (!wasCancelled) {
        // get() claims cancellation but isCancelled() was false: treat it as a failure.
        return new Failure(
            new IllegalArgumentException(
                "get() threw CancellationException, despite reporting isCancelled() == false: "
                    + future,
                cancellation));
      }
      return new Cancellation(false, cancellation);
    } catch (Exception | Error t) { // sneaky checked exception
      return new Failure(t);
    }
  }

  /**
   * An inlined private copy of {@link Uninterruptibles#getUninterruptibly} used to break an
   * internal dependency on other /util/concurrent classes.
   *
   * @param future the future whose {@code get()} is retried until it stops throwing {@link
   *     InterruptedException}
   * @return whatever {@code future.get()} eventually returns
   * @throws ExecutionException if {@code future.get()} throws it
   */
  @ParametricNullness
  private static <V extends @Nullable Object> V getUninterruptibly(Future<V> future)
      throws ExecutionException {
    boolean interrupted = false;
    try {
      while (true) {
        try {
          return future.get();
        } catch (InterruptedException e) {
          // Remember the interrupt and retry; it is re-asserted in the finally block.
          interrupted = true;
        }
      }
    } finally {
      if (interrupted) {
        // Restore the interrupt status that was consumed while retrying get().
        Thread.currentThread().interrupt();
      }
    }
  }

  /** Unblocks all threads and runs all listeners.
*/
  private static void complete(AbstractFuture<?> param, boolean callInterruptTask) {
    // Declare a "true" local variable so that the Checker Framework will infer nullness.
    AbstractFuture<?> future = param;

    // Stack of listeners still to run. It is shared across every future completed by this call, so
    // that chains of SetFutures are processed iteratively rather than recursively.
    Listener next = null;
    outer:
    while (true) {
      future.releaseWaiters();
      /*
       * We call interruptTask() immediately before afterDone() so that migrating between the two
       * can be a no-op.
       */
      if (callInterruptTask) {
        future.interruptTask();
        /*
         * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so don't
         * invoke interruptTask on any subsequent futures.
         */
        callInterruptTask = false;
      }
      // We call this before the listeners in order to avoid needing to manage a separate stack data
      // structure for them. Also, some implementations rely on this running prior to listeners
      // so that the cleanup work is visible to listeners.
      // afterDone() should be generally fast and only used for cleanup work... but in theory can
      // also be recursive and create StackOverflowErrors
      future.afterDone();
      // push the current set of listeners onto next
      next = future.clearListeners(next);
      // Drop the reference to the future we just finished; it is reassigned below only if a
      // SetFuture listener hands us another future to complete.
      future = null;
      while (next != null) {
        Listener curr = next;
        next = next.next;
        /*
         * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
         * clearListeners.
         */
        Runnable task = requireNonNull(curr.task);
        if (task instanceof SetFuture) {
          SetFuture<?> setFuture = (SetFuture<?>) task;
          // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
          // chains of SetFutures
          // Handling this special case is important because there is no way to pass an executor to
          // setFuture, so a user couldn't break the chain by doing this themselves. It is also
          // potentially common if someone writes a recursive Futures.transformAsync transformer.
          future = setFuture.owner;
          // Only complete the owner if it is still waiting on this exact SetFuture.
          if (future.value == setFuture) {
            Object valueToSet = getFutureValue(setFuture.future);
            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
              // The owner is now done: restart the outer loop to release its waiters and collect
              // its listeners onto the same stack.
              continue outer;
            }
          }
          // otherwise the future we were trying to set is already done.
        } else {
          /*
           * requireNonNull is safe because the listener stack never contains TOMBSTONE until after
           * clearListeners.
           */
          executeListener(task, requireNonNull(curr.executor));
        }
      }
      // The listener stack is empty and no further future was completed: we are finished.
      break;
    }
  }

  /**
   * Callback method that is called exactly once after the future is completed.
   *
   * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it.
   *
   * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is
   * intended for very lightweight cleanup work, for example, timing statistics or clearing fields.
   * If your task does anything heavier, consider just using a listener with an executor.
   *
   * @since 20.0
   */
  @ForOverride
  protected void afterDone() {}

  // TODO(b/114236866): Inherit doc from InternalFutureFailureAccess. Also, -link to its URL.
  /**
   * Usually returns {@code null} but, if this {@code Future} has failed, may <i>optionally</i>
   * return the cause of the failure. "Failure" means specifically "completed with an exception"; it
   * does not include "was cancelled."
To be explicit: If this method returns a non-null value, 1094 * then: 1095 * 1096 * <ul> 1097 * <li>{@code isDone()} must return {@code true} 1098 * <li>{@code isCancelled()} must return {@code false} 1099 * <li>{@code get()} must not block, and it must throw an {@code ExecutionException} with the 1100 * return value of this method as its cause 1101 * </ul> 1102 * 1103 * <p>This method is {@code protected} so that classes like {@code 1104 * com.google.common.util.concurrent.SettableFuture} do not expose it to their users as an 1105 * instance method. In the unlikely event that you need to call this method, call {@link 1106 * InternalFutures#tryInternalFastPathGetFailure(InternalFutureFailureAccess)}. 1107 * 1108 * @since 27.0 1109 */ 1110 @Override 1111 /* 1112 * We should annotate the superclass, InternalFutureFailureAccess, to say that its copy of this 1113 * method returns @Nullable, too. However, we're not sure if we want to make any changes to that 1114 * class, since it's in a separate artifact that we planned to release only a single version of. 1115 */ 1116 protected final @Nullable Throwable tryInternalFastPathGetFailure() { 1117 if (this instanceof Trusted) { 1118 @RetainedLocalRef Object localValue = value; 1119 if (localValue instanceof Failure) { 1120 return ((Failure) localValue).exception; 1121 } 1122 } 1123 return null; 1124 } 1125 1126 /** 1127 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 1128 * the given future (if available). 1129 */ 1130 final void maybePropagateCancellationTo(@Nullable Future<?> related) { 1131 if (related != null & isCancelled()) { 1132 related.cancel(wasInterrupted()); 1133 } 1134 } 1135 1136 /** Releases all threads in the {@link #waiters} list, and clears the list. 
*/ 1137 private void releaseWaiters() { 1138 Waiter head = ATOMIC_HELPER.gasWaiters(this, Waiter.TOMBSTONE); 1139 for (Waiter currentWaiter = head; currentWaiter != null; currentWaiter = currentWaiter.next) { 1140 currentWaiter.unpark(); 1141 } 1142 } 1143 1144 /** 1145 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 1146 * added first. 1147 */ 1148 private @Nullable Listener clearListeners(@Nullable Listener onto) { 1149 // We need to 1150 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that 1151 // to synchronize with us 1152 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 1153 // executing listeners in the order they were added 1154 // 3. push all the items onto 'onto' and return the new head of the stack 1155 Listener head = ATOMIC_HELPER.gasListeners(this, Listener.TOMBSTONE); 1156 Listener reversedList = onto; 1157 while (head != null) { 1158 Listener tmp = head; 1159 head = head.next; 1160 tmp.next = reversedList; 1161 reversedList = tmp; 1162 } 1163 return reversedList; 1164 } 1165 1166 // TODO(user): move parts into a default method on ListenableFuture? 1167 @Override 1168 public String toString() { 1169 // TODO(cpovirk): Presize to something plausible? 
1170 StringBuilder builder = new StringBuilder(); 1171 if (getClass().getName().startsWith("com.google.common.util.concurrent.")) { 1172 builder.append(getClass().getSimpleName()); 1173 } else { 1174 builder.append(getClass().getName()); 1175 } 1176 builder.append('@').append(toHexString(identityHashCode(this))).append("[status="); 1177 if (isCancelled()) { 1178 builder.append("CANCELLED"); 1179 } else if (isDone()) { 1180 addDoneString(builder); 1181 } else { 1182 addPendingString(builder); // delegates to addDoneString if future completes midway 1183 } 1184 return builder.append("]").toString(); 1185 } 1186 1187 /** 1188 * Provide a human-readable explanation of why this future has not yet completed. 1189 * 1190 * @return null if an explanation cannot be provided (e.g. because the future is done). 1191 * @since 23.0 1192 */ 1193 protected @Nullable String pendingToString() { 1194 // TODO(diamondm) consider moving this into addPendingString so it's always in the output 1195 if (this instanceof ScheduledFuture) { 1196 return "remaining delay=[" + ((ScheduledFuture) this).getDelay(MILLISECONDS) + " ms]"; 1197 } 1198 return null; 1199 } 1200 1201 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1202 private void addPendingString(StringBuilder builder) { 1203 // Capture current builder length so it can be truncated if this future ends up completing while 1204 // the toString is being calculated 1205 int truncateLength = builder.length(); 1206 1207 builder.append("PENDING"); 1208 1209 @RetainedLocalRef Object localValue = value; 1210 if (localValue instanceof SetFuture) { 1211 builder.append(", setFuture=["); 1212 appendUserObject(builder, ((SetFuture) localValue).future); 1213 builder.append("]"); 1214 } else { 1215 String pendingDescription; 1216 try { 1217 pendingDescription = Strings.emptyToNull(pendingToString()); 1218 } catch (Exception | StackOverflowError e) { 1219 // Any Exception is either a RuntimeException or sneaky checked exception. 
1220 // 1221 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1222 // subclass is implemented with bugs similar to the subclass. 1223 pendingDescription = "Exception thrown from implementation: " + e.getClass(); 1224 } 1225 if (pendingDescription != null) { 1226 builder.append(", info=[").append(pendingDescription).append("]"); 1227 } 1228 } 1229 1230 // The future may complete while calculating the toString, so we check once more to see if the 1231 // future is done 1232 if (isDone()) { 1233 // Truncate anything that was appended before realizing this future is done 1234 builder.delete(truncateLength, builder.length()); 1235 addDoneString(builder); 1236 } 1237 } 1238 1239 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1240 private void addDoneString(StringBuilder builder) { 1241 try { 1242 V value = getUninterruptibly(this); 1243 builder.append("SUCCESS, result=["); 1244 appendResultObject(builder, value); 1245 builder.append("]"); 1246 } catch (ExecutionException e) { 1247 builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); 1248 } catch (CancellationException e) { 1249 builder.append("CANCELLED"); // shouldn't be reachable 1250 } catch (Exception e) { // sneaky checked exception 1251 builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); 1252 } 1253 } 1254 1255 /** 1256 * Any object can be the result of a Future, and not every object has a reasonable toString() 1257 * implementation. Using a reconstruction of the default Object.toString() prevents OOMs and stack 1258 * overflows, and helps avoid sensitive data inadvertently ending up in exception messages. 
1259 */ 1260 private void appendResultObject(StringBuilder builder, @Nullable Object o) { 1261 if (o == null) { 1262 builder.append("null"); 1263 } else if (o == this) { 1264 builder.append("this future"); 1265 } else { 1266 builder 1267 .append(o.getClass().getName()) 1268 .append("@") 1269 .append(Integer.toHexString(System.identityHashCode(o))); 1270 } 1271 } 1272 1273 /** Helper for printing user supplied objects into our toString method. */ 1274 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1275 private void appendUserObject(StringBuilder builder, @Nullable Object o) { 1276 // This is some basic recursion detection for when people create cycles via set/setFuture or 1277 // when deep chains of futures exist resulting in a StackOverflowException. We could detect 1278 // arbitrary cycles using a thread local but this should be a good enough solution (it is also 1279 // what jdk collections do in these cases) 1280 try { 1281 if (o == this) { 1282 builder.append("this future"); 1283 } else { 1284 builder.append(o); 1285 } 1286 } catch (Exception | StackOverflowError e) { 1287 // Any Exception is either a RuntimeException or sneaky checked exception. 1288 // 1289 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1290 // user object is implemented with bugs similar to the user object. 1291 builder.append("Exception thrown from implementation: ").append(e.getClass()); 1292 } 1293 } 1294 1295 /** 1296 * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain 1297 * RuntimeException runtime exceptions} thrown by the executor. 1298 */ 1299 @SuppressWarnings("CatchingUnchecked") // sneaky checked exception 1300 private static void executeListener(Runnable runnable, Executor executor) { 1301 try { 1302 executor.execute(runnable); 1303 } catch (Exception e) { // sneaky checked exception 1304 // Log it and keep going -- bad runnable and/or executor. 
Don't punish the other runnables if 1305 // we're given a bad one. We only catch Exception because we want Errors to propagate up. 1306 log.get() 1307 .log( 1308 Level.SEVERE, 1309 "RuntimeException while executing runnable " 1310 + runnable 1311 + " with executor " 1312 + executor, 1313 e); 1314 } 1315 } 1316 1317 private abstract static class AtomicHelper { 1318 /** Non-volatile write of the thread to the {@link Waiter#thread} field. */ 1319 abstract void putThread(Waiter waiter, Thread newValue); 1320 1321 /** Non-volatile write of the waiter to the {@link Waiter#next} field. */ 1322 abstract void putNext(Waiter waiter, @Nullable Waiter newValue); 1323 1324 /** Performs a CAS operation on the {@link AbstractFuture#waiters} field. */ 1325 abstract boolean casWaiters( 1326 AbstractFuture<?> future, @Nullable Waiter expect, @Nullable Waiter update); 1327 1328 /** Performs a CAS operation on the {@link AbstractFuture#listeners} field. */ 1329 abstract boolean casListeners( 1330 AbstractFuture<?> future, @Nullable Listener expect, Listener update); 1331 1332 /** Performs a GAS operation on the {@link AbstractFuture#waiters} field. */ 1333 abstract Waiter gasWaiters(AbstractFuture<?> future, Waiter update); 1334 1335 /** Performs a GAS operation on the {@link AbstractFuture#listeners} field. */ 1336 abstract Listener gasListeners(AbstractFuture<?> future, Listener update); 1337 1338 /** Performs a CAS operation on the {@link AbstractFuture#value} field. */ 1339 abstract boolean casValue(AbstractFuture<?> future, @Nullable Object expect, Object update); 1340 } 1341 1342 /** 1343 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 1344 * 1345 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 1346 * be accessed. 
*/
  @SuppressWarnings({"SunApi", "removal"}) // b/345822163
  private static final class UnsafeAtomicHelper extends AtomicHelper {
    static final Unsafe UNSAFE;
    // Field offsets resolved once in the static initializer and passed to every UNSAFE call below.
    static final long LISTENERS_OFFSET;
    static final long WAITERS_OFFSET;
    static final long VALUE_OFFSET;
    static final long WAITER_THREAD_OFFSET;
    static final long WAITER_NEXT_OFFSET;

    static {
      Unsafe unsafe = null;
      try {
        // Preferred route: the direct accessor.
        unsafe = Unsafe.getUnsafe();
      } catch (SecurityException tryReflectionInstead) {
        try {
          // Fallback: inside a privileged action, scan Unsafe's declared fields (read statically,
          // via get(null)) for one that holds an Unsafe instance.
          unsafe =
              AccessController.doPrivileged(
                  new PrivilegedExceptionAction<Unsafe>() {
                    @Override
                    public Unsafe run() throws Exception {
                      Class<Unsafe> k = Unsafe.class;
                      for (Field f : k.getDeclaredFields()) {
                        f.setAccessible(true);
                        Object x = f.get(null);
                        if (k.isInstance(x)) {
                          return k.cast(x);
                        }
                      }
                      throw new NoSuchFieldError("the Unsafe");
                    }
                  });
        } catch (PrivilegedActionException e) {
          throw new RuntimeException("Could not initialize intrinsics", e.getCause());
        }
      }
      try {
        Class<?> abstractFuture = AbstractFuture.class;
        WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
        LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
        VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
        WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
        WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
        // Assigned last, once every offset lookup has succeeded.
        UNSAFE = unsafe;
      } catch (NoSuchFieldException e) {
        throw new RuntimeException(e);
      }
    }

    /** Writes {@code newValue} to the {@code thread} field of {@code waiter} via putObject. */
    @Override
    void putThread(Waiter waiter, Thread newValue) {
      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue);
    }

    /** Writes {@code newValue} to the {@code next} field of {@code waiter} via putObject. */
    @Override
    void putNext(Waiter waiter, @Nullable Waiter newValue) {
      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue);
    }

    /** Compare-and-swap on the {@code waiters} field of {@code future}. */
    @Override
    boolean casWaiters(AbstractFuture<?> future, @Nullable Waiter expect, @Nullable Waiter update) {
      return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update);
    }

    /** Compare-and-swap on the {@code listeners} field of {@code future}. */
    @Override
    boolean casListeners(AbstractFuture<?> future, @Nullable Listener expect, Listener update) {
      return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update);
    }

    /** Get-and-set on the {@code listeners} field of {@code future}; returns the old value. */
    @Override
    Listener gasListeners(AbstractFuture<?> future, Listener update) {
      return (Listener) UNSAFE.getAndSetObject(future, LISTENERS_OFFSET, update);
    }

    /** Get-and-set on the {@code waiters} field of {@code future}; returns the old value. */
    @Override
    Waiter gasWaiters(AbstractFuture<?> future, Waiter update) {
      return (Waiter) UNSAFE.getAndSetObject(future, WAITERS_OFFSET, update);
    }

    /** Compare-and-swap on the {@code value} field of {@code future}. */
    @Override
    boolean casValue(AbstractFuture<?> future, @Nullable Object expect, Object update) {
      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update);
    }
  }

  /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */
  private static final class AtomicReferenceFieldUpdaterAtomicHelper extends AtomicHelper {
    // One updater per field touched by AtomicHelper; all are supplied by whoever constructs this
    // helper.
    final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater;
    final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater;
    final AtomicReferenceFieldUpdater<? super AbstractFuture<?>, Waiter> waitersUpdater;
    final AtomicReferenceFieldUpdater<? super AbstractFuture<?>, Listener> listenersUpdater;
    final AtomicReferenceFieldUpdater<? super AbstractFuture<?>, Object> valueUpdater;

    /** Stores the given updaters; performs no other work. */
    AtomicReferenceFieldUpdaterAtomicHelper(
        AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater,
        AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater,
        AtomicReferenceFieldUpdater<? super AbstractFuture<?>, Waiter> waitersUpdater,
        AtomicReferenceFieldUpdater<? super AbstractFuture<?>, Listener> listenersUpdater,
        AtomicReferenceFieldUpdater<? super AbstractFuture<?>, Object> valueUpdater) {
      this.waiterThreadUpdater = waiterThreadUpdater;
      this.waiterNextUpdater = waiterNextUpdater;
      this.waitersUpdater = waitersUpdater;
      this.listenersUpdater = listenersUpdater;
      this.valueUpdater = valueUpdater;
    }

    /** Writes the waiter's thread via {@code lazySet}. */
    @Override
    void putThread(Waiter waiter, Thread newValue) {
      waiterThreadUpdater.lazySet(waiter, newValue);
    }

    /** Writes the waiter's successor via {@code lazySet}. */
    @Override
    void putNext(Waiter waiter, @Nullable Waiter newValue) {
      waiterNextUpdater.lazySet(waiter, newValue);
    }

    /** Compare-and-set on the waiters field through {@code waitersUpdater}. */
    @Override
    boolean casWaiters(AbstractFuture<?> future, @Nullable Waiter expect, @Nullable Waiter update) {
      return waitersUpdater.compareAndSet(future, expect, update);
    }

    /** Compare-and-set on the listeners field through {@code listenersUpdater}. */
    @Override
    boolean casListeners(AbstractFuture<?> future, @Nullable Listener expect, Listener update) {
      return listenersUpdater.compareAndSet(future, expect, update);
    }

    /** Get-and-set on the listeners field; returns the previous head. */
    @Override
    Listener gasListeners(AbstractFuture<?> future, Listener update) {
      return listenersUpdater.getAndSet(future, update);
    }

    /** Get-and-set on the waiters field; returns the previous head. */
    @Override
    Waiter gasWaiters(AbstractFuture<?> future, Waiter update) {
      return waitersUpdater.getAndSet(future, update);
    }

    /** Compare-and-set on the value field through {@code valueUpdater}. */
    @Override
    boolean casValue(AbstractFuture<?> future, @Nullable Object expect, Object update) {
      return valueUpdater.compareAndSet(future, expect, update);
    }
  }

  /**
   * {@link AtomicHelper} based on {@code synchronized} and volatile writes.
   *
   * <p>This is an implementation of last resort for when certain basic VM features are broken (like
   * AtomicReferenceFieldUpdater).
1494 */ 1495 private static final class SynchronizedHelper extends AtomicHelper { 1496 @Override 1497 void putThread(Waiter waiter, Thread newValue) { 1498 waiter.thread = newValue; 1499 } 1500 1501 @Override 1502 void putNext(Waiter waiter, @Nullable Waiter newValue) { 1503 waiter.next = newValue; 1504 } 1505 1506 @Override 1507 boolean casWaiters(AbstractFuture<?> future, @Nullable Waiter expect, @Nullable Waiter update) { 1508 synchronized (future) { 1509 if (future.waiters == expect) { 1510 future.waiters = update; 1511 return true; 1512 } 1513 return false; 1514 } 1515 } 1516 1517 @Override 1518 boolean casListeners(AbstractFuture<?> future, @Nullable Listener expect, Listener update) { 1519 synchronized (future) { 1520 if (future.listeners == expect) { 1521 future.listeners = update; 1522 return true; 1523 } 1524 return false; 1525 } 1526 } 1527 1528 @Override 1529 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1530 synchronized (future) { 1531 Listener old = future.listeners; 1532 if (old != update) { 1533 future.listeners = update; 1534 } 1535 return old; 1536 } 1537 } 1538 1539 @Override 1540 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1541 synchronized (future) { 1542 Waiter old = future.waiters; 1543 if (old != update) { 1544 future.waiters = update; 1545 } 1546 return old; 1547 } 1548 } 1549 1550 @Override 1551 boolean casValue(AbstractFuture<?> future, @Nullable Object expect, Object update) { 1552 synchronized (future) { 1553 if (future.value == expect) { 1554 future.value = update; 1555 return true; 1556 } 1557 return false; 1558 } 1559 } 1560 } 1561 1562 private static CancellationException cancellationExceptionWithCause( 1563 String message, @Nullable Throwable cause) { 1564 CancellationException exception = new CancellationException(message); 1565 exception.initCause(cause); 1566 return exception; 1567 } 1568}