001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.util.concurrent.NullnessCasts.uncheckedNull; 019import static java.lang.Integer.toHexString; 020import static java.lang.System.identityHashCode; 021import static java.util.Objects.requireNonNull; 022import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 023 024import com.google.common.annotations.Beta; 025import com.google.common.annotations.GwtCompatible; 026import com.google.common.base.Strings; 027import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 028import com.google.common.util.concurrent.internal.InternalFutures; 029import com.google.errorprone.annotations.CanIgnoreReturnValue; 030import com.google.errorprone.annotations.ForOverride; 031import com.google.j2objc.annotations.ReflectionSupport; 032import java.security.AccessController; 033import java.security.PrivilegedActionException; 034import java.security.PrivilegedExceptionAction; 035import java.util.Locale; 036import java.util.concurrent.CancellationException; 037import java.util.concurrent.ExecutionException; 038import java.util.concurrent.Executor; 039import java.util.concurrent.Future; 040import java.util.concurrent.ScheduledFuture; 041import java.util.concurrent.TimeUnit; 042import java.util.concurrent.TimeoutException; 043import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 044import java.util.concurrent.locks.LockSupport; 045import java.util.logging.Level; 046import java.util.logging.Logger; 047import javax.annotation.CheckForNull; 048import org.checkerframework.checker.nullness.qual.Nullable; 049 050/** 051 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 052 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 053 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 054 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 055 * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link 056 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function, 057 * java.util.concurrent.Executor) Futures.catching}. 058 * 059 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 060 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 061 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 062 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 063 * should rarely override other methods. 064 * 065 * @author Sven Mawson 066 * @author Luke Sandberg 067 * @since 1.0 068 */ 069@SuppressWarnings({ 070 "ShortCircuitBoolean", // we use non-short circuiting comparisons intentionally 071 "nullness", // TODO(b/147136275): Remove once our checker understands & and |. 072}) 073@GwtCompatible(emulated = true) 074@ReflectionSupport(value = ReflectionSupport.Level.FULL) 075@ElementTypesAreNonnullByDefault 076public abstract class AbstractFuture<V extends @Nullable Object> extends InternalFutureFailureAccess 077 implements ListenableFuture<V> { 078 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 079 080 static final boolean GENERATE_CANCELLATION_CAUSES; 081 082 static { 083 // System.getProperty may throw if the security policy does not permit access. 084 boolean generateCancellationCauses; 085 try { 086 generateCancellationCauses = 087 Boolean.parseBoolean( 088 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 089 } catch (SecurityException e) { 090 generateCancellationCauses = false; 091 } 092 GENERATE_CANCELLATION_CAUSES = generateCancellationCauses; 093 } 094 095 /** 096 * Tag interface marking trusted subclasses. This enables some optimizations. The implementation 097 * of this interface must also be an AbstractFuture and must not override or expose for overriding 098 * any of the public methods of ListenableFuture. 099 */ 100 interface Trusted<V extends @Nullable Object> extends ListenableFuture<V> {} 101 102 /** 103 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 104 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 105 */ 106 abstract static class TrustedFuture<V extends @Nullable Object> extends AbstractFuture<V> 107 implements Trusted<V> { 108 @CanIgnoreReturnValue 109 @Override 110 @ParametricNullness 111 public final V get() throws InterruptedException, ExecutionException { 112 return super.get(); 113 } 114 115 @CanIgnoreReturnValue 116 @Override 117 @ParametricNullness 118 public final V get(long timeout, TimeUnit unit) 119 throws InterruptedException, ExecutionException, TimeoutException { 120 return super.get(timeout, unit); 121 } 122 123 @Override 124 public final boolean isDone() { 125 return super.isDone(); 126 } 127 128 @Override 129 public final boolean isCancelled() { 130 return super.isCancelled(); 131 } 132 133 @Override 134 public final void addListener(Runnable listener, Executor executor) { 135 super.addListener(listener, executor); 136 } 137 138 @CanIgnoreReturnValue 139 @Override 140 public final boolean cancel(boolean mayInterruptIfRunning) { 141 return super.cancel(mayInterruptIfRunning); 142 } 143 } 144 145 // Logger to log exceptions caught when running listeners. 146 private static final Logger log = Logger.getLogger(AbstractFuture.class.getName()); 147 148 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 149 // blocking. This value is what AbstractQueuedSynchronizer uses. 150 private static final long SPIN_THRESHOLD_NANOS = 1000L; 151 152 private static final AtomicHelper ATOMIC_HELPER; 153 154 static { 155 AtomicHelper helper; 156 Throwable thrownUnsafeFailure = null; 157 Throwable thrownAtomicReferenceFieldUpdaterFailure = null; 158 159 try { 160 helper = new UnsafeAtomicHelper(); 161 } catch (RuntimeException | Error unsafeFailure) { 162 thrownUnsafeFailure = unsafeFailure; 163 // catch absolutely everything and fall through to our 'SafeAtomicHelper' 164 // The access control checks that ARFU does means the caller class has to be AbstractFuture 165 // instead of SafeAtomicHelper, so we annoyingly define these here 166 try { 167 helper = 168 new SafeAtomicHelper( 169 newUpdater(Waiter.class, Thread.class, "thread"), 170 newUpdater(Waiter.class, Waiter.class, "next"), 171 newUpdater(AbstractFuture.class, Waiter.class, "waiters"), 172 newUpdater(AbstractFuture.class, Listener.class, "listeners"), 173 newUpdater(AbstractFuture.class, Object.class, "value")); 174 } catch (RuntimeException | Error atomicReferenceFieldUpdaterFailure) { 175 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 176 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 177 // For these users fallback to a suboptimal implementation, based on synchronized. This will 178 // be a definite performance hit to those users. 179 thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure; 180 helper = new SynchronizedHelper(); 181 } 182 } 183 ATOMIC_HELPER = helper; 184 185 // Prevent rare disastrous classloading in first call to LockSupport.park. 186 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 187 @SuppressWarnings("unused") 188 Class<?> ensureLoaded = LockSupport.class; 189 190 // Log after all static init is finished; if an installed logger uses any Futures methods, it 191 // shouldn't break in cases where reflection is missing/broken. 192 if (thrownAtomicReferenceFieldUpdaterFailure != null) { 193 log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", thrownUnsafeFailure); 194 log.log( 195 Level.SEVERE, "SafeAtomicHelper is broken!", thrownAtomicReferenceFieldUpdaterFailure); 196 } 197 } 198 199 /** Waiter links form a Treiber stack, in the {@link #waiters} field. */ 200 private static final class Waiter { 201 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 202 203 @CheckForNull volatile Thread thread; 204 @CheckForNull volatile Waiter next; 205 206 /** 207 * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 208 * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 209 */ 210 Waiter(boolean unused) {} 211 212 Waiter() { 213 // avoid volatile write, write is made visible by subsequent CAS on waiters field 214 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 215 } 216 217 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 218 // field. 219 void setNext(@CheckForNull Waiter next) { 220 ATOMIC_HELPER.putNext(this, next); 221 } 222 223 void unpark() { 224 // This is racy with removeWaiter. The consequence of the race is that we may spuriously call 225 // unpark even though the thread has already removed itself from the list. But even if we did 226 // use a CAS, that race would still exist (it would just be ever so slightly smaller). 227 Thread w = thread; 228 if (w != null) { 229 thread = null; 230 LockSupport.unpark(w); 231 } 232 } 233 } 234 235 /** 236 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 237 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved 238 * by two things. 239 * 240 * <ul> 241 * <li>This is only called when a waiting thread times out or is interrupted. Both of which 242 * should be rare. 243 * <li>The waiters list should be very short. 244 * </ul> 245 */ 246 private void removeWaiter(Waiter node) { 247 node.thread = null; // mark as 'deleted' 248 restart: 249 while (true) { 250 Waiter pred = null; 251 Waiter curr = waiters; 252 if (curr == Waiter.TOMBSTONE) { 253 return; // give up if someone is calling complete 254 } 255 Waiter succ; 256 while (curr != null) { 257 succ = curr.next; 258 if (curr.thread != null) { // we aren't unlinking this node, update pred. 259 pred = curr; 260 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 261 pred.next = succ; 262 if (pred.thread == null) { // We raced with another node that unlinked pred. Restart. 263 continue restart; 264 } 265 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 266 continue restart; // We raced with an add or complete 267 } 268 curr = succ; 269 } 270 break; 271 } 272 } 273 274 /** Listeners also form a stack through the {@link #listeners} field. */ 275 private static final class Listener { 276 static final Listener TOMBSTONE = new Listener(); 277 @CheckForNull // null only for TOMBSTONE 278 final Runnable task; 279 @CheckForNull // null only for TOMBSTONE 280 final Executor executor; 281 282 // writes to next are made visible by subsequent CAS's on the listeners field 283 @CheckForNull Listener next; 284 285 Listener(Runnable task, Executor executor) { 286 this.task = task; 287 this.executor = executor; 288 } 289 290 Listener() { 291 this.task = null; 292 this.executor = null; 293 } 294 } 295 296 /** A special value to represent {@code null}. */ 297 private static final Object NULL = new Object(); 298 299 /** A special value to represent failure, when {@link #setException} is called successfully. */ 300 private static final class Failure { 301 static final Failure FALLBACK_INSTANCE = 302 new Failure( 303 new Throwable("Failure occurred while trying to finish a future.") { 304 @Override 305 public synchronized Throwable fillInStackTrace() { 306 return this; // no stack trace 307 } 308 }); 309 final Throwable exception; 310 311 Failure(Throwable exception) { 312 this.exception = checkNotNull(exception); 313 } 314 } 315 316 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 317 private static final class Cancellation { 318 // constants to use when GENERATE_CANCELLATION_CAUSES = false 319 @CheckForNull static final Cancellation CAUSELESS_INTERRUPTED; 320 @CheckForNull static final Cancellation CAUSELESS_CANCELLED; 321 322 static { 323 if (GENERATE_CANCELLATION_CAUSES) { 324 CAUSELESS_CANCELLED = null; 325 CAUSELESS_INTERRUPTED = null; 326 } else { 327 CAUSELESS_CANCELLED = new Cancellation(false, null); 328 CAUSELESS_INTERRUPTED = new Cancellation(true, null); 329 } 330 } 331 332 final boolean wasInterrupted; 333 @CheckForNull final Throwable cause; 334 335 Cancellation(boolean wasInterrupted, @CheckForNull Throwable cause) { 336 this.wasInterrupted = wasInterrupted; 337 this.cause = cause; 338 } 339 } 340 341 /** A special value that encodes the 'setFuture' state. */ 342 private static final class SetFuture<V extends @Nullable Object> implements Runnable { 343 final AbstractFuture<V> owner; 344 final ListenableFuture<? extends V> future; 345 346 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 347 this.owner = owner; 348 this.future = future; 349 } 350 351 @Override 352 public void run() { 353 if (owner.value != this) { 354 // nothing to do, we must have been cancelled, don't bother inspecting the future. 355 return; 356 } 357 Object valueToSet = getFutureValue(future); 358 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 359 complete( 360 owner, 361 /* 362 * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so 363 * don't invoke interruptTask. 364 */ 365 false); 366 } 367 } 368 } 369 370 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 371 // available. 372 /** 373 * This field encodes the current state of the future. 374 * 375 * <p>The valid values are: 376 * 377 * <ul> 378 * <li>{@code null} initial state, nothing has happened. 379 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 380 * <li>{@link Failure} terminal state, {@code setException} was called. 381 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 382 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 383 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 384 * argument. 385 * </ul> 386 */ 387 @CheckForNull private volatile Object value; 388 389 /** All listeners. */ 390 @CheckForNull private volatile Listener listeners; 391 392 /** All waiting threads. */ 393 @CheckForNull private volatile Waiter waiters; 394 395 /** Constructor for use by subclasses. */ 396 protected AbstractFuture() {} 397 398 // Gets and Timed Gets 399 // 400 // * Be responsive to interruption 401 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 402 // waiters field. 403 // * Future completion is defined by when #value becomes non-null/non SetFuture 404 // * Future completion can be observed if the waiters field contains a TOMBSTONE 405 406 // Timed Get 407 // There are a few design constraints to consider 408 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 409 // have observed 12 micros on 64-bit linux systems to wake up a parked thread). So if the 410 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 411 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 412 // similar purposes. 413 // * We want to behave reasonably for timeouts of 0 414 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 415 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 416 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 417 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 418 // is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node 419 // and we could use that to make a decision about whether or not we timed out prior to being 420 // unparked. 421 422 /** 423 * {@inheritDoc} 424 * 425 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 426 * current thread is interrupted during the call, even if the value is already available. 427 * 428 * @throws CancellationException {@inheritDoc} 429 */ 430 @CanIgnoreReturnValue 431 @Override 432 @ParametricNullness 433 public V get(long timeout, TimeUnit unit) 434 throws InterruptedException, TimeoutException, ExecutionException { 435 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 436 // at the bottom and throw a timeoutexception. 437 final long timeoutNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 438 long remainingNanos = timeoutNanos; 439 if (Thread.interrupted()) { 440 throw new InterruptedException(); 441 } 442 Object localValue = value; 443 if (localValue != null & !(localValue instanceof SetFuture)) { 444 return getDoneValue(localValue); 445 } 446 // we delay calling nanoTime until we know we will need to either park or spin 447 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 448 long_wait_loop: 449 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 450 Waiter oldHead = waiters; 451 if (oldHead != Waiter.TOMBSTONE) { 452 Waiter node = new Waiter(); 453 do { 454 node.setNext(oldHead); 455 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 456 while (true) { 457 OverflowAvoidingLockSupport.parkNanos(this, remainingNanos); 458 // Check interruption first, if we woke up due to interruption we need to honor that. 459 if (Thread.interrupted()) { 460 removeWaiter(node); 461 throw new InterruptedException(); 462 } 463 464 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 465 // wakeup 466 localValue = value; 467 if (localValue != null & !(localValue instanceof SetFuture)) { 468 return getDoneValue(localValue); 469 } 470 471 // timed out? 472 remainingNanos = endNanos - System.nanoTime(); 473 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 474 // Remove the waiter, one way or another we are done parking this thread. 475 removeWaiter(node); 476 break long_wait_loop; // jump down to the busy wait loop 477 } 478 } 479 } 480 oldHead = waiters; // re-read and loop. 481 } while (oldHead != Waiter.TOMBSTONE); 482 } 483 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 484 // waiter. 485 // requireNonNull is safe because value is always set before TOMBSTONE. 486 return getDoneValue(requireNonNull(value)); 487 } 488 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 489 // waiters list 490 while (remainingNanos > 0) { 491 localValue = value; 492 if (localValue != null & !(localValue instanceof SetFuture)) { 493 return getDoneValue(localValue); 494 } 495 if (Thread.interrupted()) { 496 throw new InterruptedException(); 497 } 498 remainingNanos = endNanos - System.nanoTime(); 499 } 500 501 String futureToString = toString(); 502 final String unitString = unit.toString().toLowerCase(Locale.ROOT); 503 String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); 504 // Only report scheduling delay if larger than our spin threshold - otherwise it's just noise 505 if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { 506 // We over-waited for our timeout. 507 message += " (plus "; 508 long overWaitNanos = -remainingNanos; 509 long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); 510 long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); 511 boolean shouldShowExtraNanos = 512 overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; 513 if (overWaitUnits > 0) { 514 message += overWaitUnits + " " + unitString; 515 if (shouldShowExtraNanos) { 516 message += ","; 517 } 518 message += " "; 519 } 520 if (shouldShowExtraNanos) { 521 message += overWaitLeftoverNanos + " nanoseconds "; 522 } 523 524 message += "delay)"; 525 } 526 // It's confusing to see a completed future in a timeout message; if isDone() returns false, 527 // then we know it must have given a pending toString value earlier. If not, then the future 528 // completed after the timeout expired, and the message might be success. 529 if (isDone()) { 530 throw new TimeoutException(message + " but future completed as timeout expired"); 531 } 532 throw new TimeoutException(message + " for " + futureToString); 533 } 534 535 /** 536 * {@inheritDoc} 537 * 538 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 539 * current thread is interrupted during the call, even if the value is already available. 540 * 541 * @throws CancellationException {@inheritDoc} 542 */ 543 @CanIgnoreReturnValue 544 @Override 545 @ParametricNullness 546 public V get() throws InterruptedException, ExecutionException { 547 if (Thread.interrupted()) { 548 throw new InterruptedException(); 549 } 550 Object localValue = value; 551 if (localValue != null & !(localValue instanceof SetFuture)) { 552 return getDoneValue(localValue); 553 } 554 Waiter oldHead = waiters; 555 if (oldHead != Waiter.TOMBSTONE) { 556 Waiter node = new Waiter(); 557 do { 558 node.setNext(oldHead); 559 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 560 // we are on the stack, now wait for completion. 561 while (true) { 562 LockSupport.park(this); 563 // Check interruption first, if we woke up due to interruption we need to honor that. 564 if (Thread.interrupted()) { 565 removeWaiter(node); 566 throw new InterruptedException(); 567 } 568 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 569 // wakeup 570 localValue = value; 571 if (localValue != null & !(localValue instanceof SetFuture)) { 572 return getDoneValue(localValue); 573 } 574 } 575 } 576 oldHead = waiters; // re-read and loop. 577 } while (oldHead != Waiter.TOMBSTONE); 578 } 579 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 580 // waiter. 581 // requireNonNull is safe because value is always set before TOMBSTONE. 582 return getDoneValue(requireNonNull(value)); 583 } 584 585 /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. */ 586 @ParametricNullness 587 private V getDoneValue(Object obj) throws ExecutionException { 588 // While this seems like it might be too branch-y, simple benchmarking proves it to be 589 // unmeasurable (comparing done AbstractFutures with immediateFuture) 590 if (obj instanceof Cancellation) { 591 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 592 } else if (obj instanceof Failure) { 593 throw new ExecutionException(((Failure) obj).exception); 594 } else if (obj == NULL) { 595 /* 596 * It's safe to return null because we would only have stored it in the first place if it were 597 * a valid value for V. 598 */ 599 return uncheckedNull(); 600 } else { 601 @SuppressWarnings("unchecked") // this is the only other option 602 V asV = (V) obj; 603 return asV; 604 } 605 } 606 607 @Override 608 public boolean isDone() { 609 final Object localValue = value; 610 return localValue != null & !(localValue instanceof SetFuture); 611 } 612 613 @Override 614 public boolean isCancelled() { 615 final Object localValue = value; 616 return localValue instanceof Cancellation; 617 } 618 619 /** 620 * {@inheritDoc} 621 * 622 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 623 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 624 * {@code Future} that was supplied in the {@code setFuture} call. 625 * 626 * <p>Rather than override this method to perform additional cancellation work or cleanup, 627 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 628 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 629 * cancelled without a call to {@code cancel}, such as by calling {@code 630 * setFuture(cancelledFuture)}. 631 * 632 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 633 * acquire other locks, risking deadlocks. 634 */ 635 @CanIgnoreReturnValue 636 @Override 637 public boolean cancel(boolean mayInterruptIfRunning) { 638 Object localValue = value; 639 boolean rValue = false; 640 if (localValue == null | localValue instanceof SetFuture) { 641 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 642 // certainly less likely. 643 Object valueToSet = 644 GENERATE_CANCELLATION_CAUSES 645 ? new Cancellation( 646 mayInterruptIfRunning, new CancellationException("Future.cancel() was called.")) 647 /* 648 * requireNonNull is safe because we've initialized these if 649 * !GENERATE_CANCELLATION_CAUSES. 650 * 651 * TODO(cpovirk): Maybe it would be cleaner to define a CancellationSupplier interface 652 * with two implementations, one that contains causeless Cancellation instances and 653 * the other of which creates new Cancellation instances each time it's called? Yet 654 * another alternative is to fill in a non-null value for each of the fields no matter 655 * what and to just not use it if !GENERATE_CANCELLATION_CAUSES. 656 */ 657 : requireNonNull( 658 mayInterruptIfRunning 659 ? Cancellation.CAUSELESS_INTERRUPTED 660 : Cancellation.CAUSELESS_CANCELLED); 661 AbstractFuture<?> abstractFuture = this; 662 while (true) { 663 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 664 rValue = true; 665 complete(abstractFuture, mayInterruptIfRunning); 666 if (localValue instanceof SetFuture) { 667 // propagate cancellation to the future set in setfuture, this is racy, and we don't 668 // care if we are successful or not. 669 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 670 if (futureToPropagateTo instanceof Trusted) { 671 // If the future is a TrustedFuture then we specifically avoid calling cancel() 672 // this has 2 benefits 673 // 1. for long chains of futures strung together with setFuture we consume less stack 674 // 2. we avoid allocating Cancellation objects at every level of the cancellation 675 // chain 676 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 677 // does nothing but delegate to this method. 678 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 679 localValue = trusted.value; 680 if (localValue == null | localValue instanceof SetFuture) { 681 abstractFuture = trusted; 682 continue; // loop back up and try to complete the new future 683 } 684 } else { 685 // not a TrustedFuture, call cancel directly. 686 futureToPropagateTo.cancel(mayInterruptIfRunning); 687 } 688 } 689 break; 690 } 691 // obj changed, reread 692 localValue = abstractFuture.value; 693 if (!(localValue instanceof SetFuture)) { 694 // obj cannot be null at this point, because value can only change from null to non-null. 695 // So if value changed (and it did since we lost the CAS), then it cannot be null and 696 // since it isn't a SetFuture, then the future must be done and we should exit the loop 697 break; 698 } 699 } 700 } 701 return rValue; 702 } 703 704 /** 705 * Subclasses can override this method to implement interruption of the future's computation. The 706 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 707 * 708 * <p>The default implementation does nothing. 709 * 710 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, consulting 711 * {@link #wasInterrupted} to decide whether to interrupt your task. 712 * 713 * @since 10.0 714 */ 715 protected void interruptTask() {} 716 717 /** 718 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 719 * true}. 720 * 721 * @since 14.0 722 */ 723 protected final boolean wasInterrupted() { 724 final Object localValue = value; 725 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 726 } 727 728 /** 729 * {@inheritDoc} 730 * 731 * @since 10.0 732 */ 733 @Override 734 public void addListener(Runnable listener, Executor executor) { 735 checkNotNull(listener, "Runnable was null."); 736 checkNotNull(executor, "Executor was null."); 737 // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for 738 // addListener says that listeners execute 'immediate' if the future isDone(). However, our 739 // protocol for completing a future is to assign the value field (which sets isDone to true) and 740 // then to release waiters, followed by executing afterDone(), followed by releasing listeners. 741 // That means that it is possible to observe that the future isDone and that your listeners 742 // don't execute 'immediately'. By checking isDone here we avoid that. 743 // A corollary to all that is that we don't need to check isDone inside the loop because if we 744 // get into the loop we know that we weren't done when we entered and therefore we aren't under 745 // an obligation to execute 'immediately'. 746 if (!isDone()) { 747 Listener oldHead = listeners; 748 if (oldHead != Listener.TOMBSTONE) { 749 Listener newNode = new Listener(listener, executor); 750 do { 751 newNode.next = oldHead; 752 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 753 return; 754 } 755 oldHead = listeners; // re-read 756 } while (oldHead != Listener.TOMBSTONE); 757 } 758 } 759 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 760 // the listener. 761 executeListener(listener, executor); 762 } 763 764 /** 765 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 766 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 767 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 768 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 769 * Future} may have previously been set asynchronously, in which case its result may not be known 770 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 771 * method, only by a call to {@link #cancel}. 772 * 773 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 774 * acquire other locks, risking deadlocks. 775 * 776 * @param value the value to be used as the result 777 * @return true if the attempt was accepted, completing the {@code Future} 778 */ 779 @CanIgnoreReturnValue 780 protected boolean set(@ParametricNullness V value) { 781 Object valueToSet = value == null ? NULL : value; 782 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 783 complete(this, /*callInterruptTask=*/ false); 784 return true; 785 } 786 return false; 787 } 788 789 /** 790 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 791 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 792 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 793 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 794 * {@code Future} may have previously been set asynchronously, in which case its result may not be 795 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 796 * method, only by a call to {@link #cancel}. 797 * 798 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 799 * acquire other locks, risking deadlocks. 800 * 801 * @param throwable the exception to be used as the failed result 802 * @return true if the attempt was accepted, completing the {@code Future} 803 */ 804 @CanIgnoreReturnValue 805 protected boolean setException(Throwable throwable) { 806 Object valueToSet = new Failure(checkNotNull(throwable)); 807 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 808 complete(this, /*callInterruptTask=*/ false); 809 return true; 810 } 811 return false; 812 } 813 814 /** 815 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 816 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 817 * (including "set asynchronously," defined below). 818 * 819 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 820 * is accepted, then this future is guaranteed to have been completed with the supplied future by 821 * the time this method returns. If the supplied future is not done and the call is accepted, then 822 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 823 * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 824 * 825 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 826 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 827 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 828 * Future}. 829 * 830 * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, 831 * it will never trigger interruption behavior. In particular, it will not cause this future to 832 * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not 833 * return {@code true}. 834 * 835 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 836 * acquire other locks, risking deadlocks. 837 * 838 * @param future the future to delegate to 839 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 840 * cancelled or set. 841 * @since 19.0 842 */ 843 @CanIgnoreReturnValue 844 protected boolean setFuture(ListenableFuture<? extends V> future) { 845 checkNotNull(future); 846 Object localValue = value; 847 if (localValue == null) { 848 if (future.isDone()) { 849 Object value = getFutureValue(future); 850 if (ATOMIC_HELPER.casValue(this, null, value)) { 851 complete( 852 this, 853 /* 854 * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so 855 * don't invoke interruptTask. 856 */ 857 false); 858 return true; 859 } 860 return false; 861 } 862 SetFuture<V> valueToSet = new SetFuture<V>(this, future); 863 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 864 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 865 // since all we are doing is unpacking a completed future which should be fast. 866 try { 867 future.addListener(valueToSet, DirectExecutor.INSTANCE); 868 } catch (RuntimeException | Error t) { 869 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 870 // must have been caused by addListener itself. The most likely explanation is a 871 // misconfigured mock. Try to switch to Failure. 872 Failure failure; 873 try { 874 failure = new Failure(t); 875 } catch (RuntimeException | Error oomMostLikely) { 876 failure = Failure.FALLBACK_INSTANCE; 877 } 878 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 879 boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); 880 } 881 return true; 882 } 883 localValue = value; // we lost the cas, fall through and maybe cancel 884 } 885 // The future has already been set to something. If it is cancellation we should cancel the 886 // incoming future. 887 if (localValue instanceof Cancellation) { 888 // we don't care if it fails, this is best-effort. 889 future.cancel(((Cancellation) localValue).wasInterrupted); 890 } 891 return false; 892 } 893 894 /** 895 * Returns a value that satisfies the contract of the {@link #value} field based on the state of 896 * given future. 897 * 898 * <p>This is approximately the inverse of {@link #getDoneValue(Object)} 899 */ 900 private static Object getFutureValue(ListenableFuture<?> future) { 901 if (future instanceof Trusted) { 902 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 903 // override .get() (since it is final) and therefore this is equivalent to calling .get() 904 // and unpacking the exceptions like we do below (just much faster because it is a single 905 // field read instead of a read, several branches and possibly creating exceptions). 906 Object v = ((AbstractFuture<?>) future).value; 907 if (v instanceof Cancellation) { 908 // If the other future was interrupted, clear the interrupted bit while preserving the cause 909 // this will make it consistent with how non-trustedfutures work which cannot propagate the 910 // wasInterrupted bit 911 Cancellation c = (Cancellation) v; 912 if (c.wasInterrupted) { 913 v = 914 c.cause != null 915 ? new Cancellation(/* wasInterrupted= */ false, c.cause) 916 : Cancellation.CAUSELESS_CANCELLED; 917 } 918 } 919 // requireNonNull is safe as long as we call this method only on completed futures. 920 return requireNonNull(v); 921 } 922 if (future instanceof InternalFutureFailureAccess) { 923 Throwable throwable = 924 InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future); 925 if (throwable != null) { 926 return new Failure(throwable); 927 } 928 } 929 boolean wasCancelled = future.isCancelled(); 930 // Don't allocate a CancellationException if it's not necessary 931 if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { 932 /* 933 * requireNonNull is safe because we've initialized CAUSELESS_CANCELLED if 934 * !GENERATE_CANCELLATION_CAUSES. 935 */ 936 return requireNonNull(Cancellation.CAUSELESS_CANCELLED); 937 } 938 // Otherwise calculate the value by calling .get() 939 try { 940 Object v = getUninterruptibly(future); 941 if (wasCancelled) { 942 return new Cancellation( 943 false, 944 new IllegalArgumentException( 945 "get() did not throw CancellationException, despite reporting " 946 + "isCancelled() == true: " 947 + future)); 948 } 949 return v == null ? NULL : v; 950 } catch (ExecutionException exception) { 951 if (wasCancelled) { 952 return new Cancellation( 953 false, 954 new IllegalArgumentException( 955 "get() did not throw CancellationException, despite reporting " 956 + "isCancelled() == true: " 957 + future, 958 exception)); 959 } 960 return new Failure(exception.getCause()); 961 } catch (CancellationException cancellation) { 962 if (!wasCancelled) { 963 return new Failure( 964 new IllegalArgumentException( 965 "get() threw CancellationException, despite reporting isCancelled() == false: " 966 + future, 967 cancellation)); 968 } 969 return new Cancellation(false, cancellation); 970 } catch (RuntimeException | Error t) { 971 return new Failure(t); 972 } 973 } 974 975 /** 976 * An inlined private copy of {@link Uninterruptibles#getUninterruptibly} used to break an 977 * internal dependency on other /util/concurrent classes. 978 */ 979 @ParametricNullness 980 private static <V extends @Nullable Object> V getUninterruptibly(Future<V> future) 981 throws ExecutionException { 982 boolean interrupted = false; 983 try { 984 while (true) { 985 try { 986 return future.get(); 987 } catch (InterruptedException e) { 988 interrupted = true; 989 } 990 } 991 } finally { 992 if (interrupted) { 993 Thread.currentThread().interrupt(); 994 } 995 } 996 } 997 998 /** Unblocks all threads and runs all listeners. */ 999 private static void complete(AbstractFuture<?> param, boolean callInterruptTask) { 1000 // Declare a "true" local variable so that the Checker Framework will infer nullness. 1001 AbstractFuture<?> future = param; 1002 1003 Listener next = null; 1004 outer: 1005 while (true) { 1006 future.releaseWaiters(); 1007 /* 1008 * We call interruptTask() immediately before afterDone() so that migrating between the two 1009 * can be a no-op. 1010 */ 1011 if (callInterruptTask) { 1012 future.interruptTask(); 1013 /* 1014 * Interruption doesn't propagate through a SetFuture chain (see getFutureValue), so don't 1015 * invoke interruptTask on any subsequent futures. 1016 */ 1017 callInterruptTask = false; 1018 } 1019 // We call this before the listeners in order to avoid needing to manage a separate stack data 1020 // structure for them. Also, some implementations rely on this running prior to listeners 1021 // so that the cleanup work is visible to listeners. 1022 // afterDone() should be generally fast and only used for cleanup work... but in theory can 1023 // also be recursive and create StackOverflowErrors 1024 future.afterDone(); 1025 // push the current set of listeners onto next 1026 next = future.clearListeners(next); 1027 future = null; 1028 while (next != null) { 1029 Listener curr = next; 1030 next = next.next; 1031 /* 1032 * requireNonNull is safe because the listener stack never contains TOMBSTONE until after 1033 * clearListeners. 1034 */ 1035 Runnable task = requireNonNull(curr.task); 1036 if (task instanceof SetFuture) { 1037 SetFuture<?> setFuture = (SetFuture<?>) task; 1038 // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long 1039 // chains of SetFutures 1040 // Handling this special case is important because there is no way to pass an executor to 1041 // setFuture, so a user couldn't break the chain by doing this themselves. It is also 1042 // potentially common if someone writes a recursive Futures.transformAsync transformer. 1043 future = setFuture.owner; 1044 if (future.value == setFuture) { 1045 Object valueToSet = getFutureValue(setFuture.future); 1046 if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { 1047 continue outer; 1048 } 1049 } 1050 // otherwise the future we were trying to set is already done. 1051 } else { 1052 /* 1053 * requireNonNull is safe because the listener stack never contains TOMBSTONE until after 1054 * clearListeners. 1055 */ 1056 executeListener(task, requireNonNull(curr.executor)); 1057 } 1058 } 1059 break; 1060 } 1061 } 1062 1063 /** 1064 * Callback method that is called exactly once after the future is completed. 1065 * 1066 * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. 1067 * 1068 * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is 1069 * intended for very lightweight cleanup work, for example, timing statistics or clearing fields. 1070 * If your task does anything heavier consider, just using a listener with an executor. 1071 * 1072 * @since 20.0 1073 */ 1074 @Beta 1075 @ForOverride 1076 protected void afterDone() {} 1077 1078 // TODO(b/114236866): Inherit doc from InternalFutureFailureAccess. Also, -link to its URL. 1079 /** 1080 * Usually returns {@code null} but, if this {@code Future} has failed, may <i>optionally</i> 1081 * return the cause of the failure. "Failure" means specifically "completed with an exception"; it 1082 * does not include "was cancelled." To be explicit: If this method returns a non-null value, 1083 * then: 1084 * 1085 * <ul> 1086 * <li>{@code isDone()} must return {@code true} 1087 * <li>{@code isCancelled()} must return {@code false} 1088 * <li>{@code get()} must not block, and it must throw an {@code ExecutionException} with the 1089 * return value of this method as its cause 1090 * </ul> 1091 * 1092 * <p>This method is {@code protected} so that classes like {@code 1093 * com.google.common.util.concurrent.SettableFuture} do not expose it to their users as an 1094 * instance method. In the unlikely event that you need to call this method, call {@link 1095 * InternalFutures#tryInternalFastPathGetFailure(InternalFutureFailureAccess)}. 1096 * 1097 * @since 27.0 1098 */ 1099 @Override 1100 /* 1101 * We should annotate the superclass, InternalFutureFailureAccess, to say that its copy of this 1102 * method returns @Nullable, too. However, we're not sure if we want to make any changes to that 1103 * class, since it's in a separate artifact that we planned to release only a single version of. 1104 */ 1105 @CheckForNull 1106 protected final Throwable tryInternalFastPathGetFailure() { 1107 if (this instanceof Trusted) { 1108 Object obj = value; 1109 if (obj instanceof Failure) { 1110 return ((Failure) obj).exception; 1111 } 1112 } 1113 return null; 1114 } 1115 1116 /** 1117 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 1118 * the given future (if available). 1119 */ 1120 final void maybePropagateCancellationTo(@CheckForNull Future<?> related) { 1121 if (related != null & isCancelled()) { 1122 related.cancel(wasInterrupted()); 1123 } 1124 } 1125 1126 /** Releases all threads in the {@link #waiters} list, and clears the list. */ 1127 private void releaseWaiters() { 1128 Waiter head = ATOMIC_HELPER.gasWaiters(this, Waiter.TOMBSTONE); 1129 for (Waiter currentWaiter = head; currentWaiter != null; currentWaiter = currentWaiter.next) { 1130 currentWaiter.unpark(); 1131 } 1132 } 1133 1134 /** 1135 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 1136 * added first. 1137 */ 1138 @CheckForNull 1139 private Listener clearListeners(@CheckForNull Listener onto) { 1140 // We need to 1141 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that 1142 // to synchronize with us 1143 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 1144 // executing listeners in the order they were added 1145 // 3. push all the items onto 'onto' and return the new head of the stack 1146 Listener head = ATOMIC_HELPER.gasListeners(this, Listener.TOMBSTONE); 1147 Listener reversedList = onto; 1148 while (head != null) { 1149 Listener tmp = head; 1150 head = head.next; 1151 tmp.next = reversedList; 1152 reversedList = tmp; 1153 } 1154 return reversedList; 1155 } 1156 1157 // TODO(user): move parts into a default method on ListenableFuture? 1158 @Override 1159 public String toString() { 1160 // TODO(cpovirk): Presize to something plausible? 1161 StringBuilder builder = new StringBuilder(); 1162 if (getClass().getName().startsWith("com.google.common.util.concurrent.")) { 1163 builder.append(getClass().getSimpleName()); 1164 } else { 1165 builder.append(getClass().getName()); 1166 } 1167 builder.append('@').append(toHexString(identityHashCode(this))).append("[status="); 1168 if (isCancelled()) { 1169 builder.append("CANCELLED"); 1170 } else if (isDone()) { 1171 addDoneString(builder); 1172 } else { 1173 addPendingString(builder); // delegates to addDoneString if future completes midway 1174 } 1175 return builder.append("]").toString(); 1176 } 1177 1178 /** 1179 * Provide a human-readable explanation of why this future has not yet completed. 1180 * 1181 * @return null if an explanation cannot be provided (e.g. because the future is done). 1182 * @since 23.0 1183 */ 1184 @CheckForNull 1185 protected String pendingToString() { 1186 // TODO(diamondm) consider moving this into addPendingString so it's always in the output 1187 if (this instanceof ScheduledFuture) { 1188 return "remaining delay=[" 1189 + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) 1190 + " ms]"; 1191 } 1192 return null; 1193 } 1194 1195 private void addPendingString(StringBuilder builder) { 1196 // Capture current builder length so it can be truncated if this future ends up completing while 1197 // the toString is being calculated 1198 int truncateLength = builder.length(); 1199 1200 builder.append("PENDING"); 1201 1202 Object localValue = value; 1203 if (localValue instanceof SetFuture) { 1204 builder.append(", setFuture=["); 1205 appendUserObject(builder, ((SetFuture) localValue).future); 1206 builder.append("]"); 1207 } else { 1208 String pendingDescription; 1209 try { 1210 pendingDescription = Strings.emptyToNull(pendingToString()); 1211 } catch (RuntimeException | StackOverflowError e) { 1212 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1213 // subclass is implemented with bugs similar to the subclass. 1214 pendingDescription = "Exception thrown from implementation: " + e.getClass(); 1215 } 1216 if (pendingDescription != null) { 1217 builder.append(", info=[").append(pendingDescription).append("]"); 1218 } 1219 } 1220 1221 // The future may complete while calculating the toString, so we check once more to see if the 1222 // future is done 1223 if (isDone()) { 1224 // Truncate anything that was appended before realizing this future is done 1225 builder.delete(truncateLength, builder.length()); 1226 addDoneString(builder); 1227 } 1228 } 1229 1230 private void addDoneString(StringBuilder builder) { 1231 try { 1232 V value = getUninterruptibly(this); 1233 builder.append("SUCCESS, result=["); 1234 appendResultObject(builder, value); 1235 builder.append("]"); 1236 } catch (ExecutionException e) { 1237 builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); 1238 } catch (CancellationException e) { 1239 builder.append("CANCELLED"); // shouldn't be reachable 1240 } catch (RuntimeException e) { 1241 builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); 1242 } 1243 } 1244 1245 /** 1246 * Any object can be the result of a Future, and not every object has a reasonable toString() 1247 * implementation. Using a reconstruction of the default Object.toString() prevents OOMs and stack 1248 * overflows, and helps avoid sensitive data inadvertently ending up in exception messages. 1249 */ 1250 private void appendResultObject(StringBuilder builder, @CheckForNull Object o) { 1251 if (o == null) { 1252 builder.append("null"); 1253 } else if (o == this) { 1254 builder.append("this future"); 1255 } else { 1256 builder 1257 .append(o.getClass().getName()) 1258 .append("@") 1259 .append(Integer.toHexString(System.identityHashCode(o))); 1260 } 1261 } 1262 1263 /** Helper for printing user supplied objects into our toString method. */ 1264 private void appendUserObject(StringBuilder builder, @CheckForNull Object o) { 1265 // This is some basic recursion detection for when people create cycles via set/setFuture or 1266 // when deep chains of futures exist resulting in a StackOverflowException. We could detect 1267 // arbitrary cycles using a thread local but this should be a good enough solution (it is also 1268 // what jdk collections do in these cases) 1269 try { 1270 if (o == this) { 1271 builder.append("this future"); 1272 } else { 1273 builder.append(o); 1274 } 1275 } catch (RuntimeException | StackOverflowError e) { 1276 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1277 // user object is implemented with bugs similar to the user object. 1278 builder.append("Exception thrown from implementation: ").append(e.getClass()); 1279 } 1280 } 1281 1282 /** 1283 * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain 1284 * RuntimeException runtime exceptions} thrown by the executor. 1285 */ 1286 private static void executeListener(Runnable runnable, Executor executor) { 1287 try { 1288 executor.execute(runnable); 1289 } catch (RuntimeException e) { 1290 // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if 1291 // we're given a bad one. We only catch RuntimeException because we want Errors to propagate 1292 // up. 1293 log.log( 1294 Level.SEVERE, 1295 "RuntimeException while executing runnable " + runnable + " with executor " + executor, 1296 e); 1297 } 1298 } 1299 1300 private abstract static class AtomicHelper { 1301 /** Non-volatile write of the thread to the {@link Waiter#thread} field. */ 1302 abstract void putThread(Waiter waiter, Thread newValue); 1303 1304 /** Non-volatile write of the waiter to the {@link Waiter#next} field. */ 1305 abstract void putNext(Waiter waiter, @CheckForNull Waiter newValue); 1306 1307 /** Performs a CAS operation on the {@link #waiters} field. */ 1308 abstract boolean casWaiters( 1309 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update); 1310 1311 /** Performs a CAS operation on the {@link #listeners} field. */ 1312 abstract boolean casListeners( 1313 AbstractFuture<?> future, @CheckForNull Listener expect, Listener update); 1314 1315 /** Performs a GAS operation on the {@link #waiters} field. */ 1316 abstract Waiter gasWaiters(AbstractFuture<?> future, Waiter update); 1317 1318 /** Performs a GAS operation on the {@link #listeners} field. */ 1319 abstract Listener gasListeners(AbstractFuture<?> future, Listener update); 1320 1321 /** Performs a CAS operation on the {@link #value} field. */ 1322 abstract boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update); 1323 } 1324 1325 /** 1326 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 1327 * 1328 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 1329 * be accessed. 1330 */ 1331 @SuppressWarnings("sunapi") 1332 private static final class UnsafeAtomicHelper extends AtomicHelper { 1333 static final sun.misc.Unsafe UNSAFE; 1334 static final long LISTENERS_OFFSET; 1335 static final long WAITERS_OFFSET; 1336 static final long VALUE_OFFSET; 1337 static final long WAITER_THREAD_OFFSET; 1338 static final long WAITER_NEXT_OFFSET; 1339 1340 static { 1341 sun.misc.Unsafe unsafe = null; 1342 try { 1343 unsafe = sun.misc.Unsafe.getUnsafe(); 1344 } catch (SecurityException tryReflectionInstead) { 1345 try { 1346 unsafe = 1347 AccessController.doPrivileged( 1348 new PrivilegedExceptionAction<sun.misc.Unsafe>() { 1349 @Override 1350 public sun.misc.Unsafe run() throws Exception { 1351 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; 1352 for (java.lang.reflect.Field f : k.getDeclaredFields()) { 1353 f.setAccessible(true); 1354 Object x = f.get(null); 1355 if (k.isInstance(x)) { 1356 return k.cast(x); 1357 } 1358 } 1359 throw new NoSuchFieldError("the Unsafe"); 1360 } 1361 }); 1362 } catch (PrivilegedActionException e) { 1363 throw new RuntimeException("Could not initialize intrinsics", e.getCause()); 1364 } 1365 } 1366 try { 1367 Class<?> abstractFuture = AbstractFuture.class; 1368 WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters")); 1369 LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners")); 1370 VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value")); 1371 WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread")); 1372 WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next")); 1373 UNSAFE = unsafe; 1374 } catch (NoSuchFieldException e) { 1375 throw new RuntimeException(e); 1376 } catch (RuntimeException e) { 1377 throw e; 1378 } 1379 } 1380 1381 @Override 1382 void putThread(Waiter waiter, Thread newValue) { 1383 UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); 1384 } 1385 1386 @Override 1387 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1388 UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); 1389 } 1390 1391 /** Performs a CAS operation on the {@link #waiters} field. */ 1392 @Override 1393 boolean casWaiters( 1394 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1395 return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update); 1396 } 1397 1398 /** Performs a CAS operation on the {@link #listeners} field. */ 1399 @Override 1400 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1401 return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); 1402 } 1403 1404 /** Performs a GAS operation on the {@link #listeners} field. */ 1405 @Override 1406 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1407 while (true) { 1408 Listener listener = future.listeners; 1409 if (update == listener) { 1410 return listener; 1411 } 1412 if (casListeners(future, listener, update)) { 1413 return listener; 1414 } 1415 } 1416 } 1417 1418 /** Performs a GAS operation on the {@link #waiters} field. */ 1419 @Override 1420 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1421 while (true) { 1422 Waiter waiter = future.waiters; 1423 if (update == waiter) { 1424 return waiter; 1425 } 1426 if (casWaiters(future, waiter, update)) { 1427 return waiter; 1428 } 1429 } 1430 } 1431 1432 /** Performs a CAS operation on the {@link #value} field. */ 1433 @Override 1434 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1435 return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); 1436 } 1437 } 1438 1439 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ 1440 @SuppressWarnings("rawtypes") 1441 private static final class SafeAtomicHelper extends AtomicHelper { 1442 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1443 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1444 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1445 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1446 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1447 1448 SafeAtomicHelper( 1449 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1450 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1451 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1452 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1453 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1454 this.waiterThreadUpdater = waiterThreadUpdater; 1455 this.waiterNextUpdater = waiterNextUpdater; 1456 this.waitersUpdater = waitersUpdater; 1457 this.listenersUpdater = listenersUpdater; 1458 this.valueUpdater = valueUpdater; 1459 } 1460 1461 @Override 1462 void putThread(Waiter waiter, Thread newValue) { 1463 waiterThreadUpdater.lazySet(waiter, newValue); 1464 } 1465 1466 @Override 1467 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1468 waiterNextUpdater.lazySet(waiter, newValue); 1469 } 1470 1471 @Override 1472 boolean casWaiters( 1473 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1474 return waitersUpdater.compareAndSet(future, expect, update); 1475 } 1476 1477 @Override 1478 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1479 return listenersUpdater.compareAndSet(future, expect, update); 1480 } 1481 1482 /** Performs a GAS operation on the {@link #listeners} field. */ 1483 @Override 1484 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1485 return listenersUpdater.getAndSet(future, update); 1486 } 1487 1488 /** Performs a GAS operation on the {@link #waiters} field. */ 1489 @Override 1490 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1491 return waitersUpdater.getAndSet(future, update); 1492 } 1493 1494 @Override 1495 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1496 return valueUpdater.compareAndSet(future, expect, update); 1497 } 1498 } 1499 1500 /** 1501 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 1502 * 1503 * <p>This is an implementation of last resort for when certain basic VM features are broken (like 1504 * AtomicReferenceFieldUpdater). 1505 */ 1506 private static final class SynchronizedHelper extends AtomicHelper { 1507 @Override 1508 void putThread(Waiter waiter, Thread newValue) { 1509 waiter.thread = newValue; 1510 } 1511 1512 @Override 1513 void putNext(Waiter waiter, @CheckForNull Waiter newValue) { 1514 waiter.next = newValue; 1515 } 1516 1517 @Override 1518 boolean casWaiters( 1519 AbstractFuture<?> future, @CheckForNull Waiter expect, @CheckForNull Waiter update) { 1520 synchronized (future) { 1521 if (future.waiters == expect) { 1522 future.waiters = update; 1523 return true; 1524 } 1525 return false; 1526 } 1527 } 1528 1529 @Override 1530 boolean casListeners(AbstractFuture<?> future, @CheckForNull Listener expect, Listener update) { 1531 synchronized (future) { 1532 if (future.listeners == expect) { 1533 future.listeners = update; 1534 return true; 1535 } 1536 return false; 1537 } 1538 } 1539 1540 /** Performs a GAS operation on the {@link #listeners} field. */ 1541 @Override 1542 Listener gasListeners(AbstractFuture<?> future, Listener update) { 1543 synchronized (future) { 1544 Listener old = future.listeners; 1545 if (old != update) { 1546 future.listeners = update; 1547 } 1548 return old; 1549 } 1550 } 1551 1552 /** Performs a GAS operation on the {@link #waiters} field. */ 1553 @Override 1554 Waiter gasWaiters(AbstractFuture<?> future, Waiter update) { 1555 synchronized (future) { 1556 Waiter old = future.waiters; 1557 if (old != update) { 1558 future.waiters = update; 1559 } 1560 return old; 1561 } 1562 } 1563 1564 @Override 1565 boolean casValue(AbstractFuture<?> future, @CheckForNull Object expect, Object update) { 1566 synchronized (future) { 1567 if (future.value == expect) { 1568 future.value = update; 1569 return true; 1570 } 1571 return false; 1572 } 1573 } 1574 } 1575 1576 private static CancellationException cancellationExceptionWithCause( 1577 String message, @CheckForNull Throwable cause) { 1578 CancellationException exception = new CancellationException(message); 1579 exception.initCause(cause); 1580 return exception; 1581 } 1582}