001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Throwables.throwIfUnchecked; 019import static java.lang.Integer.toHexString; 020import static java.lang.System.identityHashCode; 021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 022 023import com.google.common.annotations.Beta; 024import com.google.common.annotations.GwtCompatible; 025import com.google.common.base.Strings; 026import com.google.common.util.concurrent.internal.InternalFutureFailureAccess; 027import com.google.common.util.concurrent.internal.InternalFutures; 028import com.google.errorprone.annotations.CanIgnoreReturnValue; 029import com.google.errorprone.annotations.ForOverride; 030import com.google.j2objc.annotations.ReflectionSupport; 031import java.security.AccessController; 032import java.security.PrivilegedActionException; 033import java.security.PrivilegedExceptionAction; 034import java.util.Locale; 035import java.util.concurrent.CancellationException; 036import java.util.concurrent.ExecutionException; 037import java.util.concurrent.Executor; 038import java.util.concurrent.Future; 039import java.util.concurrent.ScheduledFuture; 040import java.util.concurrent.TimeUnit; 041import java.util.concurrent.TimeoutException; 042import 
java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 043import java.util.concurrent.locks.LockSupport; 044import java.util.logging.Level; 045import java.util.logging.Logger; 046import org.checkerframework.checker.nullness.compatqual.NullableDecl; 047 048/** 049 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 050 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 051 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 052 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 053 * com.google.common.base.Function, java.util.concurrent.Executor) Futures.transform} and {@link 054 * Futures#catching(ListenableFuture, Class, com.google.common.base.Function, 055 * java.util.concurrent.Executor) Futures.catching}. 056 * 057 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 058 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 059 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 060 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 061 * should rarely override other methods. 
062 * 063 * @author Sven Mawson 064 * @author Luke Sandberg 065 * @since 1.0 066 */ 067@SuppressWarnings("ShortCircuitBoolean") // we use non-short circuiting comparisons intentionally 068@GwtCompatible(emulated = true) 069@ReflectionSupport(value = ReflectionSupport.Level.FULL) 070public abstract class AbstractFuture<V> extends InternalFutureFailureAccess 071 implements ListenableFuture<V> { 072 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 073 074 private static final boolean GENERATE_CANCELLATION_CAUSES; 075 076 static { 077 // System.getProperty may throw if the security policy does not permit access. 078 boolean generateCancellationCauses; 079 try { 080 generateCancellationCauses = 081 Boolean.parseBoolean( 082 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 083 } catch (SecurityException e) { 084 generateCancellationCauses = false; 085 } 086 GENERATE_CANCELLATION_CAUSES = generateCancellationCauses; 087 } 088 089 /** 090 * Tag interface marking trusted subclasses. This enables some optimizations. The implementation 091 * of this interface must also be an AbstractFuture and must not override or expose for overriding 092 * any of the public methods of ListenableFuture. 093 */ 094 interface Trusted<V> extends ListenableFuture<V> {} 095 096 /** 097 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 098 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 
*/
  abstract static class TrustedFuture<V> extends AbstractFuture<V> implements Trusted<V> {
    // Every override below is final and only forwards to the AbstractFuture implementation, so
    // subclasses of TrustedFuture cannot change the behavior of these public methods.

    /** Forwards to {@link AbstractFuture#get()}. */
    @CanIgnoreReturnValue
    @Override
    public final V get() throws InterruptedException, ExecutionException {
      return super.get();
    }

    /** Forwards to {@link AbstractFuture#get(long, TimeUnit)}. */
    @CanIgnoreReturnValue
    @Override
    public final V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
      return super.get(timeout, unit);
    }

    /** Forwards to {@link AbstractFuture#isDone()}. */
    @Override
    public final boolean isDone() {
      return super.isDone();
    }

    /** Forwards to {@link AbstractFuture#isCancelled()}. */
    @Override
    public final boolean isCancelled() {
      return super.isCancelled();
    }

    /** Forwards to {@link AbstractFuture#addListener(Runnable, Executor)}. */
    @Override
    public final void addListener(Runnable listener, Executor executor) {
      super.addListener(listener, executor);
    }

    /** Forwards to {@link AbstractFuture#cancel(boolean)}. */
    @CanIgnoreReturnValue
    @Override
    public final boolean cancel(boolean mayInterruptIfRunning) {
      return super.cancel(mayInterruptIfRunning);
    }
  }

  // Logger to log exceptions caught when running listeners.
  private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());

  // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of
  // blocking. This value is what AbstractQueuedSynchronizer uses.
private static final long SPIN_THRESHOLD_NANOS = 1000L;

  /**
   * Helper through which this class performs its compare-and-set and plain field-write operations
   * on {@link Waiter} and {@code AbstractFuture} fields. Chosen once by the static initializer
   * below.
   */
  private static final AtomicHelper ATOMIC_HELPER;

  static {
    // Selection order: UnsafeAtomicHelper, then SafeAtomicHelper (built from
    // AtomicReferenceFieldUpdaters), then SynchronizedHelper. Each fallback is taken only when
    // constructing the previous choice threw.
    AtomicHelper helper;
    Throwable thrownUnsafeFailure = null;
    Throwable thrownAtomicReferenceFieldUpdaterFailure = null;

    try {
      helper = new UnsafeAtomicHelper();
    } catch (Throwable unsafeFailure) {
      thrownUnsafeFailure = unsafeFailure;
      // catch absolutely everything and fall through to our 'SafeAtomicHelper'
      // The access control checks that ARFU does means the caller class has to be AbstractFuture
      // instead of SafeAtomicHelper, so we annoyingly define these here
      try {
        helper =
            new SafeAtomicHelper(
                newUpdater(Waiter.class, Thread.class, "thread"),
                newUpdater(Waiter.class, Waiter.class, "next"),
                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
                newUpdater(AbstractFuture.class, Object.class, "value"));
      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
        // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
        // For these users fallback to a suboptimal implementation, based on synchronized. This will
        // be a definite performance hit to those users.
        thrownAtomicReferenceFieldUpdaterFailure = atomicReferenceFieldUpdaterFailure;
        helper = new SynchronizedHelper();
      }
    }
    ATOMIC_HELPER = helper;

    // Prevent rare disastrous classloading in first call to LockSupport.park.
    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
    @SuppressWarnings("unused")
    Class<?> ensureLoaded = LockSupport.class;

    // Log after all static init is finished; if an installed logger uses any Futures methods, it
    // shouldn't break in cases where reflection is missing/broken.
    // Note that both failures are logged only when the second fallback (SynchronizedHelper) was
    // needed; an UnsafeAtomicHelper failure on its own is not logged.
    if (thrownAtomicReferenceFieldUpdaterFailure != null) {
      log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", thrownUnsafeFailure);
      log.log(
          Level.SEVERE, "SafeAtomicHelper is broken!", thrownAtomicReferenceFieldUpdaterFailure);
    }
  }

  /** Waiter links form a Treiber stack, in the {@link #waiters} field. */
  private static final class Waiter {
    /** Sentinel node, created without touching ATOMIC_HELPER (see the constructor below). */
    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);

    // The thread recorded by the no-arg constructor. It is set to null by unpark() and by
    // removeWaiter(), where null marks the node as deleted.
    @NullableDecl volatile Thread thread;
    // The next node in the stack, or null.
    @NullableDecl volatile Waiter next;

    /**
     * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
     * before the ATOMIC_HELPER. Apparently this is possible on some android platforms.
     */
    Waiter(boolean unused) {}

    /** Creates a node that records the calling thread. */
    Waiter() {
      // avoid volatile write, write is made visible by subsequent CAS on waiters field
      ATOMIC_HELPER.putThread(this, Thread.currentThread());
    }

    // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
    // field.
    void setNext(Waiter next) {
      ATOMIC_HELPER.putNext(this, next);
    }

    /** Clears the recorded thread, if any, and unparks it. */
    void unpark() {
      // This is racy with removeWaiter. The consequence of the race is that we may spuriously call
      // unpark even though the thread has already removed itself from the list. But even if we did
      // use a CAS, that race would still exist (it would just be ever so slightly smaller).
      Thread w = thread;
      if (w != null) {
        thread = null;
        LockSupport.unpark(w);
      }
    }
  }

  /**
   * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted
   * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved
   * by two things.
   *
   * <ul>
   *   <li>This is only called when a waiting thread times out or is interrupted. Both of which
   *       should be rare.
   *   <li>The waiters list should be very short.
* </ul>
   *
   * @param node the waiter to remove; its {@code thread} field is cleared before the scan
   */
  private void removeWaiter(Waiter node) {
    node.thread = null; // mark as 'deleted'
    restart:
    while (true) {
      // pred is the most recent node seen in this pass whose thread is still non-null.
      Waiter pred = null;
      Waiter curr = waiters;
      if (curr == Waiter.TOMBSTONE) {
        return; // give up if someone is calling complete
      }
      Waiter succ;
      while (curr != null) {
        succ = curr.next;
        if (curr.thread != null) { // we aren't unlinking this node, update pred.
          pred = curr;
        } else if (pred != null) { // We are unlinking this node and it has a predecessor.
          pred.next = succ;
          if (pred.thread == null) { // We raced with another node that unlinked pred. Restart.
            continue restart;
          }
        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head
          continue restart; // We raced with an add or complete
        }
        curr = succ;
      }
      // A full pass finished without having to restart.
      break;
    }
  }

  /** Listeners also form a stack through the {@link #listeners} field. */
  private static final class Listener {
    /** Sentinel node; its task and executor are both null. */
    static final Listener TOMBSTONE = new Listener(null, null);
    final Runnable task;
    final Executor executor;

    // writes to next are made visible by subsequent CAS's on the listeners field
    @NullableDecl Listener next;

    Listener(Runnable task, Executor executor) {
      this.task = task;
      this.executor = executor;
    }
  }

  /** A special value to represent {@code null}. */
  private static final Object NULL = new Object();

  /** A special value to represent failure, when {@link #setException} is called successfully.
*/ 284 private static final class Failure { 285 static final Failure FALLBACK_INSTANCE = 286 new Failure( 287 new Throwable("Failure occurred while trying to finish a future.") { 288 @Override 289 public synchronized Throwable fillInStackTrace() { 290 return this; // no stack trace 291 } 292 }); 293 final Throwable exception; 294 295 Failure(Throwable exception) { 296 this.exception = checkNotNull(exception); 297 } 298 } 299 300 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 301 private static final class Cancellation { 302 // constants to use when GENERATE_CANCELLATION_CAUSES = false 303 static final Cancellation CAUSELESS_INTERRUPTED; 304 static final Cancellation CAUSELESS_CANCELLED; 305 306 static { 307 if (GENERATE_CANCELLATION_CAUSES) { 308 CAUSELESS_CANCELLED = null; 309 CAUSELESS_INTERRUPTED = null; 310 } else { 311 CAUSELESS_CANCELLED = new Cancellation(false, null); 312 CAUSELESS_INTERRUPTED = new Cancellation(true, null); 313 } 314 } 315 316 final boolean wasInterrupted; 317 @NullableDecl final Throwable cause; 318 319 Cancellation(boolean wasInterrupted, @NullableDecl Throwable cause) { 320 this.wasInterrupted = wasInterrupted; 321 this.cause = cause; 322 } 323 } 324 325 /** A special value that encodes the 'setFuture' state. */ 326 private static final class SetFuture<V> implements Runnable { 327 final AbstractFuture<V> owner; 328 final ListenableFuture<? extends V> future; 329 330 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 331 this.owner = owner; 332 this.future = future; 333 } 334 335 @Override 336 public void run() { 337 if (owner.value != this) { 338 // nothing to do, we must have been cancelled, don't bother inspecting the future. 
339 return; 340 } 341 Object valueToSet = getFutureValue(future); 342 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 343 complete(owner); 344 } 345 } 346 } 347 348 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 349 // available. 350 /** 351 * This field encodes the current state of the future. 352 * 353 * <p>The valid values are: 354 * 355 * <ul> 356 * <li>{@code null} initial state, nothing has happened. 357 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 358 * <li>{@link Failure} terminal state, {@code setException} was called. 359 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 360 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 361 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 362 * argument. 363 * </ul> 364 */ 365 @NullableDecl private volatile Object value; 366 367 /** All listeners. */ 368 @NullableDecl private volatile Listener listeners; 369 370 /** All waiting threads. */ 371 @NullableDecl private volatile Waiter waiters; 372 373 /** Constructor for use by subclasses. */ 374 protected AbstractFuture() {} 375 376 // Gets and Timed Gets 377 // 378 // * Be responsive to interruption 379 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 380 // waiters field. 381 // * Future completion is defined by when #value becomes non-null/non SetFuture 382 // * Future completion can be observed if the waiters field contains a TOMBSTONE 383 384 // Timed Get 385 // There are a few design constraints to consider 386 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 387 // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the 388 // timeout is small we shouldn't park(). 
This needs to be traded off with the cpu overhead of 389 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 390 // similar purposes. 391 // * We want to behave reasonably for timeouts of 0 392 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 393 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 394 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 395 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 396 // is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node 397 // and we could use that to make a decision about whether or not we timed out prior to being 398 // unparked. 399 400 /** 401 * {@inheritDoc} 402 * 403 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 404 * current thread is interrupted during the call, even if the value is already available. 405 * 406 * @throws CancellationException {@inheritDoc} 407 */ 408 @CanIgnoreReturnValue 409 @Override 410 public V get(long timeout, TimeUnit unit) 411 throws InterruptedException, TimeoutException, ExecutionException { 412 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 413 // at the bottom and throw a timeoutexception. 414 final long timeoutNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 415 long remainingNanos = timeoutNanos; 416 if (Thread.interrupted()) { 417 throw new InterruptedException(); 418 } 419 Object localValue = value; 420 if (localValue != null & !(localValue instanceof SetFuture)) { 421 return getDoneValue(localValue); 422 } 423 // we delay calling nanoTime until we know we will need to either park or spin 424 final long endNanos = remainingNanos > 0 ? 
System.nanoTime() + remainingNanos : 0; 425 long_wait_loop: 426 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 427 Waiter oldHead = waiters; 428 if (oldHead != Waiter.TOMBSTONE) { 429 Waiter node = new Waiter(); 430 do { 431 node.setNext(oldHead); 432 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 433 while (true) { 434 OverflowAvoidingLockSupport.parkNanos(this, remainingNanos); 435 // Check interruption first, if we woke up due to interruption we need to honor that. 436 if (Thread.interrupted()) { 437 removeWaiter(node); 438 throw new InterruptedException(); 439 } 440 441 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 442 // wakeup 443 localValue = value; 444 if (localValue != null & !(localValue instanceof SetFuture)) { 445 return getDoneValue(localValue); 446 } 447 448 // timed out? 449 remainingNanos = endNanos - System.nanoTime(); 450 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 451 // Remove the waiter, one way or another we are done parking this thread. 452 removeWaiter(node); 453 break long_wait_loop; // jump down to the busy wait loop 454 } 455 } 456 } 457 oldHead = waiters; // re-read and loop. 458 } while (oldHead != Waiter.TOMBSTONE); 459 } 460 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 461 // waiter. 
462 return getDoneValue(value); 463 } 464 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 465 // waiters list 466 while (remainingNanos > 0) { 467 localValue = value; 468 if (localValue != null & !(localValue instanceof SetFuture)) { 469 return getDoneValue(localValue); 470 } 471 if (Thread.interrupted()) { 472 throw new InterruptedException(); 473 } 474 remainingNanos = endNanos - System.nanoTime(); 475 } 476 477 String futureToString = toString(); 478 final String unitString = unit.toString().toLowerCase(Locale.ROOT); 479 String message = "Waited " + timeout + " " + unit.toString().toLowerCase(Locale.ROOT); 480 // Only report scheduling delay if larger than our spin threshold - otherwise it's just noise 481 if (remainingNanos + SPIN_THRESHOLD_NANOS < 0) { 482 // We over-waited for our timeout. 483 message += " (plus "; 484 long overWaitNanos = -remainingNanos; 485 long overWaitUnits = unit.convert(overWaitNanos, TimeUnit.NANOSECONDS); 486 long overWaitLeftoverNanos = overWaitNanos - unit.toNanos(overWaitUnits); 487 boolean shouldShowExtraNanos = 488 overWaitUnits == 0 || overWaitLeftoverNanos > SPIN_THRESHOLD_NANOS; 489 if (overWaitUnits > 0) { 490 message += overWaitUnits + " " + unitString; 491 if (shouldShowExtraNanos) { 492 message += ","; 493 } 494 message += " "; 495 } 496 if (shouldShowExtraNanos) { 497 message += overWaitLeftoverNanos + " nanoseconds "; 498 } 499 500 message += "delay)"; 501 } 502 // It's confusing to see a completed future in a timeout message; if isDone() returns false, 503 // then we know it must have given a pending toString value earlier. If not, then the future 504 // completed after the timeout expired, and the message might be success. 
505 if (isDone()) { 506 throw new TimeoutException(message + " but future completed as timeout expired"); 507 } 508 throw new TimeoutException(message + " for " + futureToString); 509 } 510 511 /** 512 * {@inheritDoc} 513 * 514 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 515 * current thread is interrupted during the call, even if the value is already available. 516 * 517 * @throws CancellationException {@inheritDoc} 518 */ 519 @CanIgnoreReturnValue 520 @Override 521 public V get() throws InterruptedException, ExecutionException { 522 if (Thread.interrupted()) { 523 throw new InterruptedException(); 524 } 525 Object localValue = value; 526 if (localValue != null & !(localValue instanceof SetFuture)) { 527 return getDoneValue(localValue); 528 } 529 Waiter oldHead = waiters; 530 if (oldHead != Waiter.TOMBSTONE) { 531 Waiter node = new Waiter(); 532 do { 533 node.setNext(oldHead); 534 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 535 // we are on the stack, now wait for completion. 536 while (true) { 537 LockSupport.park(this); 538 // Check interruption first, if we woke up due to interruption we need to honor that. 539 if (Thread.interrupted()) { 540 removeWaiter(node); 541 throw new InterruptedException(); 542 } 543 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 544 // wakeup 545 localValue = value; 546 if (localValue != null & !(localValue instanceof SetFuture)) { 547 return getDoneValue(localValue); 548 } 549 } 550 } 551 oldHead = waiters; // re-read and loop. 552 } while (oldHead != Waiter.TOMBSTONE); 553 } 554 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 555 // waiter. 556 return getDoneValue(value); 557 } 558 559 /** Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. 
*/ 560 private V getDoneValue(Object obj) throws ExecutionException { 561 // While this seems like it might be too branch-y, simple benchmarking proves it to be 562 // unmeasurable (comparing done AbstractFutures with immediateFuture) 563 if (obj instanceof Cancellation) { 564 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 565 } else if (obj instanceof Failure) { 566 throw new ExecutionException(((Failure) obj).exception); 567 } else if (obj == NULL) { 568 return null; 569 } else { 570 @SuppressWarnings("unchecked") // this is the only other option 571 V asV = (V) obj; 572 return asV; 573 } 574 } 575 576 @Override 577 public boolean isDone() { 578 final Object localValue = value; 579 return localValue != null & !(localValue instanceof SetFuture); 580 } 581 582 @Override 583 public boolean isCancelled() { 584 final Object localValue = value; 585 return localValue instanceof Cancellation; 586 } 587 588 /** 589 * {@inheritDoc} 590 * 591 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 592 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 593 * {@code Future} that was supplied in the {@code setFuture} call. 594 * 595 * <p>Rather than override this method to perform additional cancellation work or cleanup, 596 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 597 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 598 * cancelled without a call to {@code cancel}, such as by calling {@code 599 * setFuture(cancelledFuture)}. 600 * 601 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 602 * acquire other locks, risking deadlocks. 
603 */ 604 @CanIgnoreReturnValue 605 @Override 606 public boolean cancel(boolean mayInterruptIfRunning) { 607 Object localValue = value; 608 boolean rValue = false; 609 if (localValue == null | localValue instanceof SetFuture) { 610 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 611 // certainly less likely. 612 Object valueToSet = 613 GENERATE_CANCELLATION_CAUSES 614 ? new Cancellation( 615 mayInterruptIfRunning, new CancellationException("Future.cancel() was called.")) 616 : (mayInterruptIfRunning 617 ? Cancellation.CAUSELESS_INTERRUPTED 618 : Cancellation.CAUSELESS_CANCELLED); 619 AbstractFuture<?> abstractFuture = this; 620 while (true) { 621 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 622 rValue = true; 623 // We call interruptTask before calling complete(), which is consistent with 624 // FutureTask 625 if (mayInterruptIfRunning) { 626 abstractFuture.interruptTask(); 627 } 628 complete(abstractFuture); 629 if (localValue instanceof SetFuture) { 630 // propagate cancellation to the future set in setfuture, this is racy, and we don't 631 // care if we are successful or not. 632 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 633 if (futureToPropagateTo instanceof Trusted) { 634 // If the future is a TrustedFuture then we specifically avoid calling cancel() 635 // this has 2 benefits 636 // 1. for long chains of futures strung together with setFuture we consume less stack 637 // 2. we avoid allocating Cancellation objects at every level of the cancellation 638 // chain 639 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 640 // does nothing but delegate to this method. 
641 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 642 localValue = trusted.value; 643 if (localValue == null | localValue instanceof SetFuture) { 644 abstractFuture = trusted; 645 continue; // loop back up and try to complete the new future 646 } 647 } else { 648 // not a TrustedFuture, call cancel directly. 649 futureToPropagateTo.cancel(mayInterruptIfRunning); 650 } 651 } 652 break; 653 } 654 // obj changed, reread 655 localValue = abstractFuture.value; 656 if (!(localValue instanceof SetFuture)) { 657 // obj cannot be null at this point, because value can only change from null to non-null. 658 // So if value changed (and it did since we lost the CAS), then it cannot be null and 659 // since it isn't a SetFuture, then the future must be done and we should exit the loop 660 break; 661 } 662 } 663 } 664 return rValue; 665 } 666 667 /** 668 * Subclasses can override this method to implement interruption of the future's computation. The 669 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 670 * 671 * <p>The default implementation does nothing. 672 * 673 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, consulting 674 * {@link #wasInterrupted} to decide whether to interrupt your task. 675 * 676 * @since 10.0 677 */ 678 protected void interruptTask() {} 679 680 /** 681 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 682 * true}. 
683 * 684 * @since 14.0 685 */ 686 protected final boolean wasInterrupted() { 687 final Object localValue = value; 688 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 689 } 690 691 /** 692 * {@inheritDoc} 693 * 694 * @since 10.0 695 */ 696 @Override 697 public void addListener(Runnable listener, Executor executor) { 698 checkNotNull(listener, "Runnable was null."); 699 checkNotNull(executor, "Executor was null."); 700 // Checking isDone and listeners != TOMBSTONE may seem redundant, but our contract for 701 // addListener says that listeners execute 'immediate' if the future isDone(). However, our 702 // protocol for completing a future is to assign the value field (which sets isDone to true) and 703 // then to release waiters, followed by executing afterDone(), followed by releasing listeners. 704 // That means that it is possible to observe that the future isDone and that your listeners 705 // don't execute 'immediately'. By checking isDone here we avoid that. 706 // A corollary to all that is that we don't need to check isDone inside the loop because if we 707 // get into the loop we know that we weren't done when we entered and therefore we aren't under 708 // an obligation to execute 'immediately'. 709 if (!isDone()) { 710 Listener oldHead = listeners; 711 if (oldHead != Listener.TOMBSTONE) { 712 Listener newNode = new Listener(listener, executor); 713 do { 714 newNode.next = oldHead; 715 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 716 return; 717 } 718 oldHead = listeners; // re-read 719 } while (oldHead != Listener.TOMBSTONE); 720 } 721 } 722 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 723 // the listener. 724 executeListener(listener, executor); 725 } 726 727 /** 728 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 729 * set (including {@linkplain #setFuture set asynchronously}). 
When a call to this method returns, 730 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 731 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 732 * Future} may have previously been set asynchronously, in which case its result may not be known 733 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 734 * method, only by a call to {@link #cancel}. 735 * 736 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 737 * acquire other locks, risking deadlocks. 738 * 739 * @param value the value to be used as the result 740 * @return true if the attempt was accepted, completing the {@code Future} 741 */ 742 @CanIgnoreReturnValue 743 protected boolean set(@NullableDecl V value) { 744 Object valueToSet = value == null ? NULL : value; 745 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 746 complete(this); 747 return true; 748 } 749 return false; 750 } 751 752 /** 753 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 754 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 755 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 756 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 757 * {@code Future} may have previously been set asynchronously, in which case its result may not be 758 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 759 * method, only by a call to {@link #cancel}. 760 * 761 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 762 * acquire other locks, risking deadlocks. 
763 * 764 * @param throwable the exception to be used as the failed result 765 * @return true if the attempt was accepted, completing the {@code Future} 766 */ 767 @CanIgnoreReturnValue 768 protected boolean setException(Throwable throwable) { 769 Object valueToSet = new Failure(checkNotNull(throwable)); 770 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 771 complete(this); 772 return true; 773 } 774 return false; 775 } 776 777 /** 778 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 779 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 780 * (including "set asynchronously," defined below). 781 * 782 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 783 * is accepted, then this future is guaranteed to have been completed with the supplied future by 784 * the time this method returns. If the supplied future is not done and the call is accepted, then 785 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 786 * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 787 * 788 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 789 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 790 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 791 * Future}. 792 * 793 * <p>Note that, even if the supplied future is cancelled and it causes this future to complete, 794 * it will never trigger interruption behavior. In particular, it will not cause this future to 795 * invoke the {@link #interruptTask} method, and the {@link #wasInterrupted} method will not 796 * return {@code true}. 797 * 798 * <p>Beware of completing a future while holding a lock. Its listeners may do slow work or 799 * acquire other locks, risking deadlocks. 
800 * 801 * @param future the future to delegate to 802 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 803 * cancelled or set. 804 * @since 19.0 805 */ 806 @CanIgnoreReturnValue 807 protected boolean setFuture(ListenableFuture<? extends V> future) { 808 checkNotNull(future); 809 Object localValue = value; 810 if (localValue == null) { 811 if (future.isDone()) { 812 Object value = getFutureValue(future); 813 if (ATOMIC_HELPER.casValue(this, null, value)) { 814 complete(this); 815 return true; 816 } 817 return false; 818 } 819 SetFuture valueToSet = new SetFuture<V>(this, future); 820 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 821 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 822 // since all we are doing is unpacking a completed future which should be fast. 823 try { 824 future.addListener(valueToSet, DirectExecutor.INSTANCE); 825 } catch (Throwable t) { 826 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 827 // must have been caused by addListener itself. The most likely explanation is a 828 // misconfigured mock. Try to switch to Failure. 829 Failure failure; 830 try { 831 failure = new Failure(t); 832 } catch (Throwable oomMostLikely) { 833 failure = Failure.FALLBACK_INSTANCE; 834 } 835 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 836 boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); 837 } 838 return true; 839 } 840 localValue = value; // we lost the cas, fall through and maybe cancel 841 } 842 // The future has already been set to something. If it is cancellation we should cancel the 843 // incoming future. 844 if (localValue instanceof Cancellation) { 845 // we don't care if it fails, this is best-effort. 
846 future.cancel(((Cancellation) localValue).wasInterrupted); 847 } 848 return false; 849 } 850 851 /** 852 * Returns a value that satisfies the contract of the {@link #value} field based on the state of 853 * given future. 854 * 855 * <p>This is approximately the inverse of {@link #getDoneValue(Object)} 856 */ 857 private static Object getFutureValue(ListenableFuture<?> future) { 858 if (future instanceof Trusted) { 859 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 860 // override .get() (since it is final) and therefore this is equivalent to calling .get() 861 // and unpacking the exceptions like we do below (just much faster because it is a single 862 // field read instead of a read, several branches and possibly creating exceptions). 863 Object v = ((AbstractFuture<?>) future).value; 864 if (v instanceof Cancellation) { 865 // If the other future was interrupted, clear the interrupted bit while preserving the cause 866 // this will make it consistent with how non-trustedfutures work which cannot propagate the 867 // wasInterrupted bit 868 Cancellation c = (Cancellation) v; 869 if (c.wasInterrupted) { 870 v = 871 c.cause != null 872 ? 
new Cancellation(/* wasInterrupted= */ false, c.cause) 873 : Cancellation.CAUSELESS_CANCELLED; 874 } 875 } 876 return v; 877 } 878 if (future instanceof InternalFutureFailureAccess) { 879 Throwable throwable = 880 InternalFutures.tryInternalFastPathGetFailure((InternalFutureFailureAccess) future); 881 if (throwable != null) { 882 return new Failure(throwable); 883 } 884 } 885 boolean wasCancelled = future.isCancelled(); 886 // Don't allocate a CancellationException if it's not necessary 887 if (!GENERATE_CANCELLATION_CAUSES & wasCancelled) { 888 return Cancellation.CAUSELESS_CANCELLED; 889 } 890 // Otherwise calculate the value by calling .get() 891 try { 892 Object v = getUninterruptibly(future); 893 if (wasCancelled) { 894 return new Cancellation( 895 false, 896 new IllegalArgumentException( 897 "get() did not throw CancellationException, despite reporting " 898 + "isCancelled() == true: " 899 + future)); 900 } 901 return v == null ? NULL : v; 902 } catch (ExecutionException exception) { 903 if (wasCancelled) { 904 return new Cancellation( 905 false, 906 new IllegalArgumentException( 907 "get() did not throw CancellationException, despite reporting " 908 + "isCancelled() == true: " 909 + future, 910 exception)); 911 } 912 return new Failure(exception.getCause()); 913 } catch (CancellationException cancellation) { 914 if (!wasCancelled) { 915 return new Failure( 916 new IllegalArgumentException( 917 "get() threw CancellationException, despite reporting isCancelled() == false: " 918 + future, 919 cancellation)); 920 } 921 return new Cancellation(false, cancellation); 922 } catch (Throwable t) { 923 return new Failure(t); 924 } 925 } 926 927 /** 928 * An inlined private copy of {@link Uninterruptibles#getUninterruptibly} used to break an 929 * internal dependency on other /util/concurrent classes. 
930 */ 931 private static <V> V getUninterruptibly(Future<V> future) throws ExecutionException { 932 boolean interrupted = false; 933 try { 934 while (true) { 935 try { 936 return future.get(); 937 } catch (InterruptedException e) { 938 interrupted = true; 939 } 940 } 941 } finally { 942 if (interrupted) { 943 Thread.currentThread().interrupt(); 944 } 945 } 946 } 947 948 /** Unblocks all threads and runs all listeners. */ 949 private static void complete(AbstractFuture<?> future) { 950 Listener next = null; 951 outer: 952 while (true) { 953 future.releaseWaiters(); 954 // We call this before the listeners in order to avoid needing to manage a separate stack data 955 // structure for them. Also, some implementations rely on this running prior to listeners 956 // so that the cleanup work is visible to listeners. 957 // afterDone() should be generally fast and only used for cleanup work... but in theory can 958 // also be recursive and create StackOverflowErrors 959 future.afterDone(); 960 // push the current set of listeners onto next 961 next = future.clearListeners(next); 962 future = null; 963 while (next != null) { 964 Listener curr = next; 965 next = next.next; 966 Runnable task = curr.task; 967 if (task instanceof SetFuture) { 968 SetFuture<?> setFuture = (SetFuture<?>) task; 969 // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long 970 // chains of SetFutures 971 // Handling this special case is important because there is no way to pass an executor to 972 // setFuture, so a user couldn't break the chain by doing this themselves. It is also 973 // potentially common if someone writes a recursive Futures.transformAsync transformer. 974 future = setFuture.owner; 975 if (future.value == setFuture) { 976 Object valueToSet = getFutureValue(setFuture.future); 977 if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { 978 continue outer; 979 } 980 } 981 // other wise the future we were trying to set is already done. 
982 } else { 983 executeListener(task, curr.executor); 984 } 985 } 986 break; 987 } 988 } 989 990 /** 991 * Callback method that is called exactly once after the future is completed. 992 * 993 * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. 994 * 995 * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is 996 * intended for very lightweight cleanup work, for example, timing statistics or clearing fields. 997 * If your task does anything heavier consider, just using a listener with an executor. 998 * 999 * @since 20.0 1000 */ 1001 @Beta 1002 @ForOverride 1003 protected void afterDone() {} 1004 1005 // TODO(b/114236866): Inherit doc from InternalFutureFailureAccess. Also, -link to its URL. 1006 /** 1007 * Usually returns {@code null} but, if this {@code Future} has failed, may <i>optionally</i> 1008 * return the cause of the failure. "Failure" means specifically "completed with an exception"; it 1009 * does not include "was cancelled." To be explicit: If this method returns a non-null value, 1010 * then: 1011 * 1012 * <ul> 1013 * <li>{@code isDone()} must return {@code true} 1014 * <li>{@code isCancelled()} must return {@code false} 1015 * <li>{@code get()} must not block, and it must throw an {@code ExecutionException} with the 1016 * return value of this method as its cause 1017 * </ul> 1018 * 1019 * <p>This method is {@code protected} so that classes like {@code 1020 * com.google.common.util.concurrent.SettableFuture} do not expose it to their users as an 1021 * instance method. In the unlikely event that you need to call this method, call {@link 1022 * InternalFutures#tryInternalFastPathGetFailure(InternalFutureFailureAccess)}. 
1023 * 1024 * @since 27.0 1025 */ 1026 @Override 1027 @NullableDecl 1028 protected final Throwable tryInternalFastPathGetFailure() { 1029 if (this instanceof Trusted) { 1030 Object obj = value; 1031 if (obj instanceof Failure) { 1032 return ((Failure) obj).exception; 1033 } 1034 } 1035 return null; 1036 } 1037 1038 /** 1039 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 1040 * the given future (if available). 1041 */ 1042 final void maybePropagateCancellationTo(@NullableDecl Future<?> related) { 1043 if (related != null & isCancelled()) { 1044 related.cancel(wasInterrupted()); 1045 } 1046 } 1047 1048 /** Releases all threads in the {@link #waiters} list, and clears the list. */ 1049 private void releaseWaiters() { 1050 Waiter head; 1051 do { 1052 head = waiters; 1053 } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); 1054 for (Waiter currentWaiter = head; currentWaiter != null; currentWaiter = currentWaiter.next) { 1055 currentWaiter.unpark(); 1056 } 1057 } 1058 1059 /** 1060 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 1061 * added first. 1062 */ 1063 private Listener clearListeners(Listener onto) { 1064 // We need to 1065 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to 1066 // to synchronize with us 1067 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 1068 // executing listeners in the order they were added 1069 // 3. 
push all the items onto 'onto' and return the new head of the stack 1070 Listener head; 1071 do { 1072 head = listeners; 1073 } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); 1074 Listener reversedList = onto; 1075 while (head != null) { 1076 Listener tmp = head; 1077 head = head.next; 1078 tmp.next = reversedList; 1079 reversedList = tmp; 1080 } 1081 return reversedList; 1082 } 1083 1084 // TODO(user): move parts into a default method on ListenableFuture? 1085 @Override 1086 public String toString() { 1087 // TODO(cpovirk): Presize to something plausible? 1088 StringBuilder builder = new StringBuilder(); 1089 if (getClass().getName().startsWith("com.google.common.util.concurrent.")) { 1090 builder.append(getClass().getSimpleName()); 1091 } else { 1092 builder.append(getClass().getName()); 1093 } 1094 builder.append('@').append(toHexString(identityHashCode(this))).append("[status="); 1095 if (isCancelled()) { 1096 builder.append("CANCELLED"); 1097 } else if (isDone()) { 1098 addDoneString(builder); 1099 } else { 1100 addPendingString(builder); // delegates to addDoneString if future completes mid-way 1101 } 1102 return builder.append("]").toString(); 1103 } 1104 1105 /** 1106 * Provide a human-readable explanation of why this future has not yet completed. 1107 * 1108 * @return null if an explanation cannot be provided (e.g. because the future is done). 
1109 * @since 23.0 1110 */ 1111 @NullableDecl 1112 protected String pendingToString() { 1113 // TODO(diamondm) consider moving this into addPendingString so it's always in the output 1114 if (this instanceof ScheduledFuture) { 1115 return "remaining delay=[" 1116 + ((ScheduledFuture) this).getDelay(TimeUnit.MILLISECONDS) 1117 + " ms]"; 1118 } 1119 return null; 1120 } 1121 1122 private void addPendingString(StringBuilder builder) { 1123 // Capture current builder length so it can be truncated if this future ends up completing while 1124 // the toString is being calculated 1125 int truncateLength = builder.length(); 1126 1127 builder.append("PENDING"); 1128 1129 Object localValue = value; 1130 if (localValue instanceof SetFuture) { 1131 builder.append(", setFuture=["); 1132 appendUserObject(builder, ((SetFuture) localValue).future); 1133 builder.append("]"); 1134 } else { 1135 String pendingDescription; 1136 try { 1137 pendingDescription = Strings.emptyToNull(pendingToString()); 1138 } catch (RuntimeException | StackOverflowError e) { 1139 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1140 // subclass is implemented with bugs similar to the subclass. 
1141 pendingDescription = "Exception thrown from implementation: " + e.getClass(); 1142 } 1143 if (pendingDescription != null) { 1144 builder.append(", info=[").append(pendingDescription).append("]"); 1145 } 1146 } 1147 1148 // The future may complete while calculating the toString, so we check once more to see if the 1149 // future is done 1150 if (isDone()) { 1151 // Truncate anything that was appended before realizing this future is done 1152 builder.delete(truncateLength, builder.length()); 1153 addDoneString(builder); 1154 } 1155 } 1156 1157 private void addDoneString(StringBuilder builder) { 1158 try { 1159 V value = getUninterruptibly(this); 1160 builder.append("SUCCESS, result=["); 1161 appendResultObject(builder, value); 1162 builder.append("]"); 1163 } catch (ExecutionException e) { 1164 builder.append("FAILURE, cause=[").append(e.getCause()).append("]"); 1165 } catch (CancellationException e) { 1166 builder.append("CANCELLED"); // shouldn't be reachable 1167 } catch (RuntimeException e) { 1168 builder.append("UNKNOWN, cause=[").append(e.getClass()).append(" thrown from get()]"); 1169 } 1170 } 1171 1172 /** 1173 * Any object can be the result of a Future, and not every object has a reasonable toString() 1174 * implementation. Using a reconstruction of the default Object.toString() prevents OOMs and stack 1175 * overflows, and helps avoid sensitive data inadvertently ending up in exception messages. 1176 */ 1177 private void appendResultObject(StringBuilder builder, Object o) { 1178 if (o == null) { 1179 builder.append("null"); 1180 } else if (o == this) { 1181 builder.append("this future"); 1182 } else { 1183 builder 1184 .append(o.getClass().getName()) 1185 .append("@") 1186 .append(Integer.toHexString(System.identityHashCode(o))); 1187 } 1188 } 1189 1190 /** Helper for printing user supplied objects into our toString method. 
*/ 1191 private void appendUserObject(StringBuilder builder, Object o) { 1192 // This is some basic recursion detection for when people create cycles via set/setFuture or 1193 // when deep chains of futures exist resulting in a StackOverflowException. We could detect 1194 // arbitrary cycles using a thread local but this should be a good enough solution (it is also 1195 // what jdk collections do in these cases) 1196 try { 1197 if (o == this) { 1198 builder.append("this future"); 1199 } else { 1200 builder.append(o); 1201 } 1202 } catch (RuntimeException | StackOverflowError e) { 1203 // Don't call getMessage or toString() on the exception, in case the exception thrown by the 1204 // user object is implemented with bugs similar to the user object. 1205 builder.append("Exception thrown from implementation: ").append(e.getClass()); 1206 } 1207 } 1208 1209 /** 1210 * Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain 1211 * RuntimeException runtime exceptions} thrown by the executor. 1212 */ 1213 private static void executeListener(Runnable runnable, Executor executor) { 1214 try { 1215 executor.execute(runnable); 1216 } catch (RuntimeException e) { 1217 // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if 1218 // we're given a bad one. We only catch RuntimeException because we want Errors to propagate 1219 // up. 1220 log.log( 1221 Level.SEVERE, 1222 "RuntimeException while executing runnable " + runnable + " with executor " + executor, 1223 e); 1224 } 1225 } 1226 1227 private abstract static class AtomicHelper { 1228 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ 1229 abstract void putThread(Waiter waiter, Thread newValue); 1230 1231 /** Non volatile write of the waiter to the {@link Waiter#next} field. */ 1232 abstract void putNext(Waiter waiter, Waiter newValue); 1233 1234 /** Performs a CAS operation on the {@link #waiters} field. 
*/ 1235 abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update); 1236 1237 /** Performs a CAS operation on the {@link #listeners} field. */ 1238 abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update); 1239 1240 /** Performs a CAS operation on the {@link #value} field. */ 1241 abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update); 1242 } 1243 1244 /** 1245 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 1246 * 1247 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 1248 * be accessed. 1249 */ 1250 private static final class UnsafeAtomicHelper extends AtomicHelper { 1251 static final sun.misc.Unsafe UNSAFE; 1252 static final long LISTENERS_OFFSET; 1253 static final long WAITERS_OFFSET; 1254 static final long VALUE_OFFSET; 1255 static final long WAITER_THREAD_OFFSET; 1256 static final long WAITER_NEXT_OFFSET; 1257 1258 static { 1259 sun.misc.Unsafe unsafe = null; 1260 try { 1261 unsafe = sun.misc.Unsafe.getUnsafe(); 1262 } catch (SecurityException tryReflectionInstead) { 1263 try { 1264 unsafe = 1265 AccessController.doPrivileged( 1266 new PrivilegedExceptionAction<sun.misc.Unsafe>() { 1267 @Override 1268 public sun.misc.Unsafe run() throws Exception { 1269 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; 1270 for (java.lang.reflect.Field f : k.getDeclaredFields()) { 1271 f.setAccessible(true); 1272 Object x = f.get(null); 1273 if (k.isInstance(x)) { 1274 return k.cast(x); 1275 } 1276 } 1277 throw new NoSuchFieldError("the Unsafe"); 1278 } 1279 }); 1280 } catch (PrivilegedActionException e) { 1281 throw new RuntimeException("Could not initialize intrinsics", e.getCause()); 1282 } 1283 } 1284 try { 1285 Class<?> abstractFuture = AbstractFuture.class; 1286 WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters")); 1287 LISTENERS_OFFSET = 
unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners")); 1288 VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value")); 1289 WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread")); 1290 WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next")); 1291 UNSAFE = unsafe; 1292 } catch (Exception e) { 1293 throwIfUnchecked(e); 1294 throw new RuntimeException(e); 1295 } 1296 } 1297 1298 @Override 1299 void putThread(Waiter waiter, Thread newValue) { 1300 UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); 1301 } 1302 1303 @Override 1304 void putNext(Waiter waiter, Waiter newValue) { 1305 UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); 1306 } 1307 1308 /** Performs a CAS operation on the {@link #waiters} field. */ 1309 @Override 1310 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1311 return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update); 1312 } 1313 1314 /** Performs a CAS operation on the {@link #listeners} field. */ 1315 @Override 1316 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1317 return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); 1318 } 1319 1320 /** Performs a CAS operation on the {@link #value} field. */ 1321 @Override 1322 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1323 return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); 1324 } 1325 } 1326 1327 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. 
*/ 1328 private static final class SafeAtomicHelper extends AtomicHelper { 1329 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1330 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1331 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1332 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1333 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1334 1335 SafeAtomicHelper( 1336 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1337 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1338 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1339 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1340 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1341 this.waiterThreadUpdater = waiterThreadUpdater; 1342 this.waiterNextUpdater = waiterNextUpdater; 1343 this.waitersUpdater = waitersUpdater; 1344 this.listenersUpdater = listenersUpdater; 1345 this.valueUpdater = valueUpdater; 1346 } 1347 1348 @Override 1349 void putThread(Waiter waiter, Thread newValue) { 1350 waiterThreadUpdater.lazySet(waiter, newValue); 1351 } 1352 1353 @Override 1354 void putNext(Waiter waiter, Waiter newValue) { 1355 waiterNextUpdater.lazySet(waiter, newValue); 1356 } 1357 1358 @Override 1359 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1360 return waitersUpdater.compareAndSet(future, expect, update); 1361 } 1362 1363 @Override 1364 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1365 return listenersUpdater.compareAndSet(future, expect, update); 1366 } 1367 1368 @Override 1369 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1370 return valueUpdater.compareAndSet(future, expect, update); 1371 } 1372 } 1373 1374 /** 1375 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 
1376 * 1377 * <p>This is an implementation of last resort for when certain basic VM features are broken (like 1378 * AtomicReferenceFieldUpdater). 1379 */ 1380 private static final class SynchronizedHelper extends AtomicHelper { 1381 @Override 1382 void putThread(Waiter waiter, Thread newValue) { 1383 waiter.thread = newValue; 1384 } 1385 1386 @Override 1387 void putNext(Waiter waiter, Waiter newValue) { 1388 waiter.next = newValue; 1389 } 1390 1391 @Override 1392 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1393 synchronized (future) { 1394 if (future.waiters == expect) { 1395 future.waiters = update; 1396 return true; 1397 } 1398 return false; 1399 } 1400 } 1401 1402 @Override 1403 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1404 synchronized (future) { 1405 if (future.listeners == expect) { 1406 future.listeners = update; 1407 return true; 1408 } 1409 return false; 1410 } 1411 } 1412 1413 @Override 1414 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1415 synchronized (future) { 1416 if (future.value == expect) { 1417 future.value = update; 1418 return true; 1419 } 1420 return false; 1421 } 1422 } 1423 } 1424 1425 private static CancellationException cancellationExceptionWithCause( 1426 @NullableDecl String message, @NullableDecl Throwable cause) { 1427 CancellationException exception = new CancellationException(message); 1428 exception.initCause(cause); 1429 return exception; 1430 } 1431}