001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Throwables.throwIfUnchecked; 019import static com.google.common.util.concurrent.Futures.getDone; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 022 023import com.google.common.annotations.Beta; 024import com.google.common.annotations.GwtCompatible; 025import com.google.errorprone.annotations.CanIgnoreReturnValue; 026import com.google.errorprone.annotations.DoNotMock; 027import com.google.j2objc.annotations.ReflectionSupport; 028import java.security.AccessController; 029import java.security.PrivilegedActionException; 030import java.security.PrivilegedExceptionAction; 031import java.util.concurrent.CancellationException; 032import java.util.concurrent.ExecutionException; 033import java.util.concurrent.Executor; 034import java.util.concurrent.Future; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 038import java.util.concurrent.locks.LockSupport; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041import javax.annotation.Nullable; 042 043/** 
044 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 045 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 046 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 047 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, 048 * com.google.common.base.Function) Futures.transform} and {@link Futures#catching(ListenableFuture, 049 * Class, com.google.common.base.Function, java.util.concurrent.Executor) Futures.catching}. 050 * 051 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 052 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 053 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 054 * {@link #afterDone()}, which will be invoked automatically when the future completes. Subclasses 055 * should rarely override other methods. 056 * 057 * @author Sven Mawson 058 * @author Luke Sandberg 059 * @since 1.0 060 */ 061@DoNotMock("Use Futures.immediate*Future or SettableFuture") 062@GwtCompatible(emulated = true) 063@ReflectionSupport(value = ReflectionSupport.Level.FULL) 064public abstract class AbstractFuture<V> implements ListenableFuture<V> { 065 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 066 067 private static final boolean GENERATE_CANCELLATION_CAUSES = 068 Boolean.parseBoolean( 069 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 070 071 /** 072 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 073 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 
   */
  abstract static class TrustedFuture<V> extends AbstractFuture<V> {
    // Every method below is re-declared final and does nothing but delegate to AbstractFuture.
    // Because subclasses cannot override them, code in this file (see getFutureValue and cancel)
    // may read a TrustedFuture's value field directly instead of calling these methods.

    @CanIgnoreReturnValue
    @Override
    public final V get() throws InterruptedException, ExecutionException {
      return super.get();
    }

    @CanIgnoreReturnValue
    @Override
    public final V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
      return super.get(timeout, unit);
    }

    @Override
    public final boolean isDone() {
      return super.isDone();
    }

    @Override
    public final boolean isCancelled() {
      return super.isCancelled();
    }

    @Override
    public final void addListener(Runnable listener, Executor executor) {
      super.addListener(listener, executor);
    }

    @CanIgnoreReturnValue
    @Override
    public final boolean cancel(boolean mayInterruptIfRunning) {
      return super.cancel(mayInterruptIfRunning);
    }
  }

  // Logger to log exceptions caught when running listeners. It is also used by the static
  // initializer to report broken AtomicHelper implementations.
  private static final Logger log = Logger.getLogger(AbstractFuture.class.getName());

  // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of
  // blocking. This value is what AbstractQueuedSynchronizer uses.
  private static final long SPIN_THRESHOLD_NANOS = 1000L; // 1000 ns = 1 microsecond

  /**
   * Strategy object through which every CAS and non-volatile write on {@link Waiter} and
   * {@code AbstractFuture} fields is performed. It is selected exactly once by the static
   * initializer below.
   */
  private static final AtomicHelper ATOMIC_HELPER;

  // Selects the AtomicHelper implementation, trying in order: UnsafeAtomicHelper, then
  // SafeAtomicHelper (AtomicReferenceFieldUpdater based), then SynchronizedHelper.
  static {
    AtomicHelper helper;

    try {
      helper = new UnsafeAtomicHelper();
    } catch (Throwable unsafeFailure) {
      // catch absolutely everything and fall through to our 'SafeAtomicHelper'
      // The access control checks that ARFU does means the caller class has to be AbstractFuture
      // instead of SafeAtomicHelper, so we annoyingly define these here
      try {
        helper =
            new SafeAtomicHelper(
                newUpdater(Waiter.class, Thread.class, "thread"),
                newUpdater(Waiter.class, Waiter.class, "next"),
                newUpdater(AbstractFuture.class, Waiter.class, "waiters"),
                newUpdater(AbstractFuture.class, Listener.class, "listeners"),
                newUpdater(AbstractFuture.class, Object.class, "value"));
      } catch (Throwable atomicReferenceFieldUpdaterFailure) {
        // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause
        // getDeclaredField to throw a NoSuchFieldException when the field is definitely there.
        // For these users fallback to a suboptimal implementation, based on synchronized. This will
        // be a definite performance hit to those users.
        // Both earlier failures are logged only here, i.e. only when the last-resort helper is used.
        log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure);
        log.log(Level.SEVERE, "SafeAtomicHelper is broken!", atomicReferenceFieldUpdaterFailure);
        helper = new SynchronizedHelper();
      }
    }
    ATOMIC_HELPER = helper;

    // Prevent rare disastrous classloading in first call to LockSupport.park.
    // See: https://bugs.openjdk.java.net/browse/JDK-8074773
    @SuppressWarnings("unused")
    Class<?> ensureLoaded = LockSupport.class;
  }

  /**
   * Waiter links form a Treiber stack, in the {@link #waiters} field.
   */
  private static final class Waiter {
    // Sentinel that releaseWaiters() installs in the waiters field when the future completes.
    static final Waiter TOMBSTONE = new Waiter(false /* ignored param */);

    // The thread to unpark. Set to null by removeWaiter() to mark the node as deleted, and by
    // unpark() once the thread has been woken.
    @Nullable volatile Thread thread;
    // The next (older) node on the stack, or null at the bottom.
    @Nullable volatile Waiter next;

    /**
     * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded
     * before the ATOMIC_HELPER. Apparently this is possible on some android platforms.
     */
    Waiter(boolean unused) {}

    /** Creates a node for the calling thread; the node is not yet linked into any list. */
    Waiter() {
      // avoid volatile write, write is made visible by subsequent CAS on waiters field
      ATOMIC_HELPER.putThread(this, Thread.currentThread());
    }

    // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters
    // field.
    void setNext(Waiter next) {
      ATOMIC_HELPER.putNext(this, next);
    }

    /** Wakes the thread recorded in this node, if any, and clears the {@code thread} field. */
    void unpark() {
      // This is racy with removeWaiter. The consequence of the race is that we may spuriously call
      // unpark even though the thread has already removed itself from the list. But even if we did
      // use a CAS, that race would still exist (it would just be ever so slightly smaller).
      Thread w = thread;
      if (w != null) {
        thread = null;
        LockSupport.unpark(w);
      }
    }
  }

  /**
   * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted
   * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved
   * by two things.
   * <ul>
   * <li>This is only called when a waiting thread times out or is interrupted. Both of which should
   *     be rare.
   * <li>The waiters list should be very short.
   * </ul>
   */
  private void removeWaiter(Waiter node) {
    node.thread = null; // mark as 'deleted'
    // Each pass walks the stack from the head; any lost race restarts the walk from scratch.
    restart:
    while (true) {
      Waiter pred = null; // last node seen on this pass that is still live (thread != null)
      Waiter curr = waiters;
      if (curr == Waiter.TOMBSTONE) {
        return; // give up if someone is calling complete
      }
      Waiter succ;
      while (curr != null) {
        succ = curr.next;
        if (curr.thread != null) { // we aren't unlinking this node, update pred.
          pred = curr;
        } else if (pred != null) { // We are unlinking this node and it has a predecessor.
          pred.next = succ;
          if (pred.thread == null) { // We raced with another node that unlinked pred. Restart.
            continue restart;
          }
        } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head
          continue restart; // We raced with an add or complete
        }
        curr = succ;
      }
      break;
    }
  }

  /** Listeners also form a stack through the {@link #listeners} field. */
  private static final class Listener {
    // Sentinel that clearListeners() installs in the listeners field; addListener() treats it as
    // "the future is done" and runs the new listener immediately instead of queueing it.
    static final Listener TOMBSTONE = new Listener(null, null);
    // The callback and the executor it is handed to (both null only for TOMBSTONE).
    final Runnable task;
    final Executor executor;

    // writes to next are made visible by subsequent CAS's on the listeners field
    @Nullable Listener next;

    Listener(Runnable task, Executor executor) {
      this.task = task;
      this.executor = executor;
    }
  }

  /** A special value to represent {@code null}. */
  private static final Object NULL = new Object();

  /** A special value to represent failure, when {@link #setException} is called successfully.
*/ 250 private static final class Failure { 251 static final Failure FALLBACK_INSTANCE = 252 new Failure( 253 new Throwable("Failure occurred while trying to finish a future.") { 254 @Override 255 public synchronized Throwable fillInStackTrace() { 256 return this; // no stack trace 257 } 258 }); 259 final Throwable exception; 260 261 Failure(Throwable exception) { 262 this.exception = checkNotNull(exception); 263 } 264 } 265 266 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 267 private static final class Cancellation { 268 final boolean wasInterrupted; 269 @Nullable final Throwable cause; 270 271 Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { 272 this.wasInterrupted = wasInterrupted; 273 this.cause = cause; 274 } 275 } 276 277 /** A special value that encodes the 'setFuture' state. */ 278 private static final class SetFuture<V> implements Runnable { 279 final AbstractFuture<V> owner; 280 final ListenableFuture<? extends V> future; 281 282 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 283 this.owner = owner; 284 this.future = future; 285 } 286 287 @Override 288 public void run() { 289 if (owner.value != this) { 290 // nothing to do, we must have been cancelled, don't bother inspecting the future. 291 return; 292 } 293 Object valueToSet = getFutureValue(future); 294 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 295 complete(owner); 296 } 297 } 298 } 299 300 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 301 // available. 302 /** 303 * This field encodes the current state of the future. 304 * 305 * <p>The valid values are: 306 * <ul> 307 * <li>{@code null} initial state, nothing has happened. 308 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 309 * <li>{@link Failure} terminal state, {@code setException} was called. 310 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 
   * <li>{@link #NULL} terminal state, {@code set(null)} was called.
   * <li>Any other non-null value, terminal state, {@code set} was called with a non-null argument.
   * </ul>
   */
  private volatile Object value;

  /**
   * All listeners. This is the head of a stack of {@link Listener} nodes; it is replaced with
   * {@link Listener#TOMBSTONE} by {@code clearListeners} when the future completes.
   */
  private volatile Listener listeners;

  /**
   * All waiting threads. This is the head of a stack of {@link Waiter} nodes; it is replaced with
   * {@link Waiter#TOMBSTONE} by {@code releaseWaiters} when the future completes.
   */
  private volatile Waiter waiters;

  /**
   * Constructor for use by subclasses.
   */
  protected AbstractFuture() {}

  // Gets and Timed Gets
  //
  // * Be responsive to interruption
  // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the
  //   waiters field.
  // * Future completion is defined by when #value becomes non-null/non SetFuture
  // * Future completion can be observed if the waiters field contains a TOMBSTONE

  // Timed Get
  // There are a few design constraints to consider
  // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I
  //   have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the
  //   timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of
  //   spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for
  //   similar purposes.
  // * We want to behave reasonably for timeouts of 0
  // * We are more responsive to completion than timeouts. This is because parkNanos depends on
  //   system scheduling and as such we could either miss our deadline, or unpark() could be delayed
  //   so that it looks like we timed out even though we didn't. For comparison FutureTask respects
  //   completion preferably and AQS is non-deterministic (depends on where in the queue the waiter
  //   is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node
  //   and we could use that to make a decision about whether or not we timed out prior to being
  //   unparked.

  /*
   * Improve the documentation of when InterruptedException is thrown. Our behavior matches the
   * JDK's, but the JDK's documentation is misleading.
   */

  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
   * current thread is interrupted before or during the call, even if the value is already
   * available.
   *
   * @throws InterruptedException if the current thread was interrupted before or during the call
   *     (optional but recommended).
   * @throws CancellationException {@inheritDoc}
   */
  @CanIgnoreReturnValue
  @Override
  public V get(long timeout, TimeUnit unit)
      throws InterruptedException, TimeoutException, ExecutionException {
    // NOTE: if timeout < 0, remainingNanos will be < 0; unless the future is already done, both
    // the parking branch and the spin loop below are then skipped and we throw a
    // TimeoutException.
    long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit.
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    Object localValue = value;
    // Done means: non-null and not the intermediate SetFuture marker.
    if (localValue != null & !(localValue instanceof SetFuture)) {
      return getDoneValue(localValue);
    }
    // we delay calling nanoTime until we know we will need to either park or spin
    final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0;
    // The label is on the 'if' statement so that the parking loop can break out of all of it and
    // continue with the spin loop that follows.
    long_wait_loop:
    if (remainingNanos >= SPIN_THRESHOLD_NANOS) {
      Waiter oldHead = waiters;
      if (oldHead != Waiter.TOMBSTONE) {
        Waiter node = new Waiter();
        do {
          node.setNext(oldHead);
          if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
            // we are on the stack, now park until completion, interruption or (near) timeout.
            while (true) {
              LockSupport.parkNanos(this, remainingNanos);
              // Check interruption first, if we woke up due to interruption we need to honor that.
              if (Thread.interrupted()) {
                removeWaiter(node);
                throw new InterruptedException();
              }

              // Otherwise re-read and check doneness. If we loop then it must have been a spurious
              // wakeup
              localValue = value;
              if (localValue != null & !(localValue instanceof SetFuture)) {
                return getDoneValue(localValue);
              }

              // timed out?
              remainingNanos = endNanos - System.nanoTime();
              if (remainingNanos < SPIN_THRESHOLD_NANOS) {
                // Remove the waiter, one way or another we are done parking this thread.
                removeWaiter(node);
                break long_wait_loop; // jump down to the busy wait loop
              }
            }
          }
          oldHead = waiters; // re-read and loop.
        } while (oldHead != Waiter.TOMBSTONE);
      }
      // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
      // waiter.
      return getDoneValue(value);
    }
    // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the
    // waiters list
    while (remainingNanos > 0) {
      localValue = value;
      if (localValue != null & !(localValue instanceof SetFuture)) {
        return getDoneValue(localValue);
      }
      if (Thread.interrupted()) {
        throw new InterruptedException();
      }
      remainingNanos = endNanos - System.nanoTime();
    }
    throw new TimeoutException();
  }

  /*
   * Improve the documentation of when InterruptedException is thrown. Our behavior matches the
   * JDK's, but the JDK's documentation is misleading.
   */

  /**
   * {@inheritDoc}
   *
   * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the
   * current thread is interrupted before or during the call, even if the value is already
   * available.
   *
   * @throws InterruptedException if the current thread was interrupted before or during the call
   *     (optional but recommended).
   * @throws CancellationException {@inheritDoc}
   */
  @CanIgnoreReturnValue
  @Override
  public V get() throws InterruptedException, ExecutionException {
    if (Thread.interrupted()) {
      throw new InterruptedException();
    }
    Object localValue = value;
    // Done means: non-null and not the intermediate SetFuture marker.
    if (localValue != null & !(localValue instanceof SetFuture)) {
      return getDoneValue(localValue);
    }
    Waiter oldHead = waiters;
    if (oldHead != Waiter.TOMBSTONE) {
      // Allocate the node once; the do/while retries the push until the CAS succeeds or the
      // waiters field turns into TOMBSTONE.
      Waiter node = new Waiter();
      do {
        node.setNext(oldHead);
        if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) {
          // we are on the stack, now wait for completion.
          while (true) {
            LockSupport.park(this);
            // Check interruption first, if we woke up due to interruption we need to honor that.
            if (Thread.interrupted()) {
              removeWaiter(node);
              throw new InterruptedException();
            }
            // Otherwise re-read and check doneness. If we loop then it must have been a spurious
            // wakeup
            localValue = value;
            if (localValue != null & !(localValue instanceof SetFuture)) {
              return getDoneValue(localValue);
            }
          }
        }
        oldHead = waiters; // re-read and loop.
      } while (oldHead != Waiter.TOMBSTONE);
    }
    // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a
    // waiter.
    return getDoneValue(value);
  }

  /**
   * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}.
496 */ 497 private V getDoneValue(Object obj) throws ExecutionException { 498 // While this seems like it might be too branch-y, simple benchmarking proves it to be 499 // unmeasurable (comparing done AbstractFutures with immediateFuture) 500 if (obj instanceof Cancellation) { 501 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 502 } else if (obj instanceof Failure) { 503 throw new ExecutionException(((Failure) obj).exception); 504 } else if (obj == NULL) { 505 return null; 506 } else { 507 @SuppressWarnings("unchecked") // this is the only other option 508 V asV = (V) obj; 509 return asV; 510 } 511 } 512 513 @Override 514 public boolean isDone() { 515 final Object localValue = value; 516 return localValue != null & !(localValue instanceof SetFuture); 517 } 518 519 @Override 520 public boolean isCancelled() { 521 final Object localValue = value; 522 return localValue instanceof Cancellation; 523 } 524 525 /** 526 * {@inheritDoc} 527 * 528 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 529 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 530 * {@code Future} that was supplied in the {@code setFuture} call. 531 * 532 * <p>Rather than override this method to perform additional cancellation work or cleanup, 533 * subclasses should override {@link #afterDone}, consulting {@link #isCancelled} and {@link 534 * #wasInterrupted} as necessary. This ensures that the work is done even if the future is 535 * cancelled without a call to {@code cancel}, such as by calling {@code 536 * setFuture(cancelledFuture)}. 537 */ 538 @CanIgnoreReturnValue 539 @Override 540 public boolean cancel(boolean mayInterruptIfRunning) { 541 Object localValue = value; 542 boolean rValue = false; 543 if (localValue == null | localValue instanceof SetFuture) { 544 // Try to delay allocating the exception. 
At this point we may still lose the CAS, but it is 545 // certainly less likely. 546 Throwable cause = 547 GENERATE_CANCELLATION_CAUSES 548 ? new CancellationException("Future.cancel() was called.") 549 : null; 550 Object valueToSet = new Cancellation(mayInterruptIfRunning, cause); 551 AbstractFuture<?> abstractFuture = this; 552 while (true) { 553 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 554 rValue = true; 555 // We call interuptTask before calling complete(), which is consistent with 556 // FutureTask 557 if (mayInterruptIfRunning) { 558 abstractFuture.interruptTask(); 559 } 560 complete(abstractFuture); 561 if (localValue instanceof SetFuture) { 562 // propagate cancellation to the future set in setfuture, this is racy, and we don't 563 // care if we are successful or not. 564 ListenableFuture<?> futureToPropagateTo = ((SetFuture) localValue).future; 565 if (futureToPropagateTo instanceof TrustedFuture) { 566 // If the future is a TrustedFuture then we specifically avoid calling cancel() 567 // this has 2 benefits 568 // 1. for long chains of futures strung together with setFuture we consume less stack 569 // 2. we avoid allocating Cancellation objects at every level of the cancellation 570 // chain 571 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 572 // does nothing but delegate to this method. 573 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 574 localValue = trusted.value; 575 if (localValue == null | localValue instanceof SetFuture) { 576 abstractFuture = trusted; 577 continue; // loop back up and try to complete the new future 578 } 579 } else { 580 // not a TrustedFuture, call cancel directly. 
581 futureToPropagateTo.cancel(mayInterruptIfRunning); 582 } 583 } 584 break; 585 } 586 // obj changed, reread 587 localValue = abstractFuture.value; 588 if (!(localValue instanceof SetFuture)) { 589 // obj cannot be null at this point, because value can only change from null to non-null. 590 // So if value changed (and it did since we lost the CAS), then it cannot be null and 591 // since it isn't a SetFuture, then the future must be done and we should exit the loop 592 break; 593 } 594 } 595 } 596 return rValue; 597 } 598 599 /** 600 * Subclasses can override this method to implement interruption of the future's computation. The 601 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 602 * 603 * <p>The default implementation does nothing. 604 * 605 * <p>This method is likely to be deprecated. Prefer to override {@link #afterDone}, checking 606 * {@link #wasInterrupted} to decide whether to interrupt your task. 607 * 608 * @since 10.0 609 */ 610 protected void interruptTask() {} 611 612 /** 613 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 614 * true}. 
615 * 616 * @since 14.0 617 */ 618 protected final boolean wasInterrupted() { 619 final Object localValue = value; 620 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 621 } 622 623 /** 624 * {@inheritDoc} 625 * 626 * @since 10.0 627 */ 628 @Override 629 public void addListener(Runnable listener, Executor executor) { 630 checkNotNull(listener, "Runnable was null."); 631 checkNotNull(executor, "Executor was null."); 632 Listener oldHead = listeners; 633 if (oldHead != Listener.TOMBSTONE) { 634 Listener newNode = new Listener(listener, executor); 635 do { 636 newNode.next = oldHead; 637 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 638 return; 639 } 640 oldHead = listeners; // re-read 641 } while (oldHead != Listener.TOMBSTONE); 642 } 643 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 644 // the listener. 645 executeListener(listener, executor); 646 } 647 648 /** 649 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 650 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 651 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 652 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 653 * Future} may have previously been set asynchronously, in which case its result may not be known 654 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 655 * method, only by a call to {@link #cancel}. 656 * 657 * @param value the value to be used as the result 658 * @return true if the attempt was accepted, completing the {@code Future} 659 */ 660 @CanIgnoreReturnValue 661 protected boolean set(@Nullable V value) { 662 Object valueToSet = value == null ? 
NULL : value; 663 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 664 complete(this); 665 return true; 666 } 667 return false; 668 } 669 670 /** 671 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 672 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 673 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 674 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 675 * {@code Future} may have previously been set asynchronously, in which case its result may not be 676 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 677 * method, only by a call to {@link #cancel}. 678 * 679 * @param throwable the exception to be used as the failed result 680 * @return true if the attempt was accepted, completing the {@code Future} 681 */ 682 @CanIgnoreReturnValue 683 protected boolean setException(Throwable throwable) { 684 Object valueToSet = new Failure(checkNotNull(throwable)); 685 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 686 complete(this); 687 return true; 688 } 689 return false; 690 } 691 692 /** 693 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 694 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 695 * (including "set asynchronously," defined below). 696 * 697 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 698 * is accepted, then this future is guaranteed to have been completed with the supplied future by 699 * the time this method returns. If the supplied future is not done and the call is accepted, then 700 * the future will be <i>set asynchronously</i>. 
Note that such a result, though not yet known, 701 * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 702 * 703 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 704 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 705 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 706 * Future}. 707 * 708 * @param future the future to delegate to 709 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 710 * cancelled or set. 711 * @since 19.0 712 */ 713 @Beta 714 @CanIgnoreReturnValue 715 protected boolean setFuture(ListenableFuture<? extends V> future) { 716 checkNotNull(future); 717 Object localValue = value; 718 if (localValue == null) { 719 if (future.isDone()) { 720 Object value = getFutureValue(future); 721 if (ATOMIC_HELPER.casValue(this, null, value)) { 722 complete(this); 723 return true; 724 } 725 return false; 726 } 727 SetFuture valueToSet = new SetFuture<V>(this, future); 728 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 729 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 730 // since all we are doing is unpacking a completed future which should be fast. 731 try { 732 future.addListener(valueToSet, directExecutor()); 733 } catch (Throwable t) { 734 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 735 // must have been caused by addListener itself. The most likely explanation is a 736 // misconfigured mock. Try to switch to Failure. 737 Failure failure; 738 try { 739 failure = new Failure(t); 740 } catch (Throwable oomMostLikely) { 741 failure = Failure.FALLBACK_INSTANCE; 742 } 743 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 
744 boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); 745 } 746 return true; 747 } 748 localValue = value; // we lost the cas, fall through and maybe cancel 749 } 750 // The future has already been set to something. If it is cancellation we should cancel the 751 // incoming future. 752 if (localValue instanceof Cancellation) { 753 // we don't care if it fails, this is best-effort. 754 future.cancel(((Cancellation) localValue).wasInterrupted); 755 } 756 return false; 757 } 758 759 /** 760 * Returns a value, suitable for storing in the {@link #value} field. From the given future, 761 * which is assumed to be done. 762 * 763 * <p>This is approximately the inverse of {@link #getDoneValue(Object)} 764 */ 765 private static Object getFutureValue(ListenableFuture<?> future) { 766 Object valueToSet; 767 if (future instanceof TrustedFuture) { 768 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 769 // override .get() (since it is final) and therefore this is equivalent to calling .get() 770 // and unpacking the exceptions like we do below (just much faster because it is a single 771 // field read instead of a read, several branches and possibly creating exceptions). 772 return ((AbstractFuture<?>) future).value; 773 } else { 774 // Otherwise calculate valueToSet by calling .get() 775 try { 776 Object v = getDone(future); 777 valueToSet = v == null ? NULL : v; 778 } catch (ExecutionException exception) { 779 valueToSet = new Failure(exception.getCause()); 780 } catch (CancellationException cancellation) { 781 valueToSet = new Cancellation(false, cancellation); 782 } catch (Throwable t) { 783 valueToSet = new Failure(t); 784 } 785 } 786 return valueToSet; 787 } 788 789 /** Unblocks all threads and runs all listeners. 
   *
   * <p>The work is done iteratively: when a listener turns out to be a {@link SetFuture} whose
   * owner is still waiting on it, the owner's value is set here and the owner is completed by the
   * next iteration of the outer loop.
   */
  private static void complete(AbstractFuture<?> future) {
    // Listeners still to be run, accumulated across every future completed by this call.
    Listener next = null;
    outer: while (true) {
      future.releaseWaiters();
      // We call this before the listeners in order to avoid needing to manage a separate stack data
      // structure for them.
      // afterDone() should be generally fast and only used for cleanup work... but in theory can
      // also be recursive and create StackOverflowErrors
      future.afterDone();
      // push the current set of listeners onto next
      next = future.clearListeners(next);
      future = null;
      while (next != null) {
        Listener curr = next;
        next = next.next;
        Runnable task = curr.task;
        if (task instanceof SetFuture) {
          SetFuture<?> setFuture = (SetFuture<?>) task;
          // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long
          // chains of SetFutures
          // Handling this special case is important because there is no way to pass an executor to
          // setFuture, so a user couldn't break the chain by doing this themselves. It is also
          // potentially common if someone writes a recursive Futures.transformAsync transformer.
          future = setFuture.owner;
          if (future.value == setFuture) {
            Object valueToSet = getFutureValue(setFuture.future);
            if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) {
              continue outer;
            }
          }
          // otherwise the future we were trying to set is already done.
        } else {
          executeListener(task, curr.executor);
        }
      }
      break;
    }
  }

  /**
   * Callback method that is called exactly once after the future is completed.
   *
   * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it.
   *
   * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is
   * intended for very lightweight cleanup work, for example, timing statistics or clearing fields.
836 * If your task does anything heavier consider, just using a listener with an executor. 837 * 838 * @since 20.0 839 */ 840 // TODO(cpovirk): @ForOverride https://github.com/google/error-prone/issues/342 841 @Beta 842 protected void afterDone() {} 843 844 /** 845 * Returns the exception that this {@code Future} completed with. This includes completion through 846 * a call to {@link setException} or {@link setFuture}{@code (failedFuture)} but not cancellation. 847 * 848 * @throws RuntimeException if the {@code Future} has not failed 849 */ 850 final Throwable trustedGetException() { 851 return ((Failure) value).exception; 852 } 853 854 /** 855 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 856 * the given future (if available). 857 * 858 * <p>This method should be used only when this future is completed. It is designed to be called 859 * from {@code done}. 860 */ 861 final void maybePropagateCancellation(@Nullable Future<?> related) { 862 if (related != null & isCancelled()) { 863 related.cancel(wasInterrupted()); 864 } 865 } 866 867 /** Releases all threads in the {@link #waiters} list, and clears the list. */ 868 private void releaseWaiters() { 869 Waiter head; 870 do { 871 head = waiters; 872 } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); 873 for (Waiter currentWaiter = head; 874 currentWaiter != null; 875 currentWaiter = currentWaiter.next) { 876 currentWaiter.unpark(); 877 } 878 } 879 880 /** 881 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 882 * added first. 883 */ 884 private Listener clearListeners(Listener onto) { 885 // We need to 886 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to 887 // to synchronize with us 888 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 889 // executing listeners in the order they were added 890 // 3. 
push all the items onto 'onto' and return the new head of the stack 891 Listener head; 892 do { 893 head = listeners; 894 } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); 895 Listener reversedList = onto; 896 while (head != null) { 897 Listener tmp = head; 898 head = head.next; 899 tmp.next = reversedList; 900 reversedList = tmp; 901 } 902 return reversedList; 903 } 904 905 /** 906 * Submits the given runnable to the given {@link Executor} catching and logging all 907 * {@linkplain RuntimeException runtime exceptions} thrown by the executor. 908 */ 909 private static void executeListener(Runnable runnable, Executor executor) { 910 try { 911 executor.execute(runnable); 912 } catch (RuntimeException e) { 913 // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if 914 // we're given a bad one. We only catch RuntimeException because we want Errors to propagate 915 // up. 916 log.log( 917 Level.SEVERE, 918 "RuntimeException while executing runnable " + runnable + " with executor " + executor, 919 e); 920 } 921 } 922 923 private abstract static class AtomicHelper { 924 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ 925 abstract void putThread(Waiter waiter, Thread newValue); 926 927 /** Non volatile write of the waiter to the {@link Waiter#next} field. */ 928 abstract void putNext(Waiter waiter, Waiter newValue); 929 930 /** Performs a CAS operation on the {@link #waiters} field. */ 931 abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update); 932 933 /** Performs a CAS operation on the {@link #listeners} field. */ 934 abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update); 935 936 /** Performs a CAS operation on the {@link #value} field. */ 937 abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update); 938 } 939 940 /** 941 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 
942 * 943 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 944 * be accessed. 945 */ 946 private static final class UnsafeAtomicHelper extends AtomicHelper { 947 static final sun.misc.Unsafe UNSAFE; 948 static final long LISTENERS_OFFSET; 949 static final long WAITERS_OFFSET; 950 static final long VALUE_OFFSET; 951 static final long WAITER_THREAD_OFFSET; 952 static final long WAITER_NEXT_OFFSET; 953 954 static { 955 sun.misc.Unsafe unsafe = null; 956 try { 957 unsafe = sun.misc.Unsafe.getUnsafe(); 958 } catch (SecurityException tryReflectionInstead) { 959 try { 960 unsafe = 961 AccessController.doPrivileged( 962 new PrivilegedExceptionAction<sun.misc.Unsafe>() { 963 @Override 964 public sun.misc.Unsafe run() throws Exception { 965 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; 966 for (java.lang.reflect.Field f : k.getDeclaredFields()) { 967 f.setAccessible(true); 968 Object x = f.get(null); 969 if (k.isInstance(x)) { 970 return k.cast(x); 971 } 972 } 973 throw new NoSuchFieldError("the Unsafe"); 974 } 975 }); 976 } catch (PrivilegedActionException e) { 977 throw new RuntimeException("Could not initialize intrinsics", e.getCause()); 978 } 979 } 980 try { 981 Class<?> abstractFuture = AbstractFuture.class; 982 WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters")); 983 LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners")); 984 VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value")); 985 WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread")); 986 WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next")); 987 UNSAFE = unsafe; 988 } catch (Exception e) { 989 throwIfUnchecked(e); 990 throw new RuntimeException(e); 991 } 992 } 993 994 @Override 995 void putThread(Waiter waiter, Thread newValue) { 996 UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); 997 } 998 999 
@Override 1000 void putNext(Waiter waiter, Waiter newValue) { 1001 UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); 1002 } 1003 1004 /** Performs a CAS operation on the {@link #waiters} field. */ 1005 @Override 1006 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1007 return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update); 1008 } 1009 1010 /** Performs a CAS operation on the {@link #listeners} field. */ 1011 @Override 1012 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1013 return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); 1014 } 1015 1016 /** Performs a CAS operation on the {@link #value} field. */ 1017 @Override 1018 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1019 return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); 1020 } 1021 } 1022 1023 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ 1024 private static final class SafeAtomicHelper extends AtomicHelper { 1025 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1026 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1027 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1028 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1029 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1030 1031 SafeAtomicHelper( 1032 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1033 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1034 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1035 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1036 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1037 this.waiterThreadUpdater = waiterThreadUpdater; 1038 this.waiterNextUpdater = waiterNextUpdater; 1039 this.waitersUpdater = waitersUpdater; 1040 
this.listenersUpdater = listenersUpdater; 1041 this.valueUpdater = valueUpdater; 1042 } 1043 1044 @Override 1045 void putThread(Waiter waiter, Thread newValue) { 1046 waiterThreadUpdater.lazySet(waiter, newValue); 1047 } 1048 1049 @Override 1050 void putNext(Waiter waiter, Waiter newValue) { 1051 waiterNextUpdater.lazySet(waiter, newValue); 1052 } 1053 1054 @Override 1055 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1056 return waitersUpdater.compareAndSet(future, expect, update); 1057 } 1058 1059 @Override 1060 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1061 return listenersUpdater.compareAndSet(future, expect, update); 1062 } 1063 1064 @Override 1065 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1066 return valueUpdater.compareAndSet(future, expect, update); 1067 } 1068 } 1069 1070 /** 1071 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 1072 * 1073 * <p>This is an implementation of last resort for when certain basic VM features are broken (like 1074 * AtomicReferenceFieldUpdater). 
1075 */ 1076 private static final class SynchronizedHelper extends AtomicHelper { 1077 @Override 1078 void putThread(Waiter waiter, Thread newValue) { 1079 waiter.thread = newValue; 1080 } 1081 1082 @Override 1083 void putNext(Waiter waiter, Waiter newValue) { 1084 waiter.next = newValue; 1085 } 1086 1087 @Override 1088 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1089 synchronized (future) { 1090 if (future.waiters == expect) { 1091 future.waiters = update; 1092 return true; 1093 } 1094 return false; 1095 } 1096 } 1097 1098 @Override 1099 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1100 synchronized (future) { 1101 if (future.listeners == expect) { 1102 future.listeners = update; 1103 return true; 1104 } 1105 return false; 1106 } 1107 } 1108 1109 @Override 1110 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1111 synchronized (future) { 1112 if (future.value == expect) { 1113 future.value = update; 1114 return true; 1115 } 1116 return false; 1117 } 1118 } 1119 } 1120 1121 private static CancellationException cancellationExceptionWithCause( 1122 @Nullable String message, @Nullable Throwable cause) { 1123 CancellationException exception = new CancellationException(message); 1124 exception.initCause(cause); 1125 return exception; 1126 } 1127}