001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkNotNull; 018import static com.google.common.base.Throwables.throwIfUnchecked; 019import static com.google.common.util.concurrent.Futures.getDone; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 022 023import com.google.common.annotations.Beta; 024import com.google.common.annotations.GwtCompatible; 025import com.google.errorprone.annotations.CanIgnoreReturnValue; 026import com.google.j2objc.annotations.ReflectionSupport; 027import java.security.AccessController; 028import java.security.PrivilegedActionException; 029import java.security.PrivilegedExceptionAction; 030import java.util.concurrent.CancellationException; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.Executor; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 037import java.util.concurrent.locks.LockSupport; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040import javax.annotation.Nullable; 041 042/** 043 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 044 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 045 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 046 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, Function) 047 * Futures.transform} and {@link Futures#catching(ListenableFuture, Class, Function) 048 * Futures.catching}. 049 * 050 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 051 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 052 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 053 * {@link #interruptTask()}, which will be invoked automatically if a call to {@link 054 * #cancel(boolean) cancel(true)} succeeds in canceling the future. Subclasses should rarely 055 * override other methods. 056 * 057 * @author Sven Mawson 058 * @author Luke Sandberg 059 * @since 1.0 060 */ 061@GwtCompatible(emulated = true) 062@ReflectionSupport(value = ReflectionSupport.Level.FULL) 063public abstract class AbstractFuture<V> implements ListenableFuture<V> { 064 // NOTE: Whenever both tests are cheap and functional, it's faster to use &, | instead of &&, || 065 066 private static final boolean GENERATE_CANCELLATION_CAUSES = 067 Boolean.parseBoolean( 068 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 069 070 /** 071 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 072 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 073 */ 074 abstract static class TrustedFuture<V> extends AbstractFuture<V> { 075 @CanIgnoreReturnValue 076 @Override 077 public final V get() throws InterruptedException, ExecutionException { 078 return super.get(); 079 } 080 081 @CanIgnoreReturnValue 082 @Override 083 public final V get(long timeout, TimeUnit unit) 084 throws InterruptedException, ExecutionException, TimeoutException { 085 return super.get(timeout, unit); 086 } 087 088 @Override 089 public final boolean isDone() { 090 return super.isDone(); 091 } 092 093 @Override 094 public final boolean isCancelled() { 095 return super.isCancelled(); 096 } 097 098 @Override 099 public final void addListener(Runnable listener, Executor executor) { 100 super.addListener(listener, executor); 101 } 102 103 @CanIgnoreReturnValue 104 @Override 105 public final boolean cancel(boolean mayInterruptIfRunning) { 106 return super.cancel(mayInterruptIfRunning); 107 } 108 } 109 110 // Logger to log exceptions caught when running listeners. 111 private static final Logger log = Logger.getLogger(AbstractFuture.class.getName()); 112 113 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 114 // blocking. This value is what AbstractQueuedSynchronizer uses. 115 private static final long SPIN_THRESHOLD_NANOS = 1000L; 116 117 private static final AtomicHelper ATOMIC_HELPER; 118 119 static { 120 AtomicHelper helper; 121 122 try { 123 helper = new UnsafeAtomicHelper(); 124 } catch (Throwable unsafeFailure) { 125 // catch absolutely everything and fall through to our 'SafeAtomicHelper' 126 // The access control checks that ARFU does means the caller class has to be AbstractFuture 127 // instead of SafeAtomicHelper, so we annoyingly define these here 128 try { 129 helper = 130 new SafeAtomicHelper( 131 newUpdater(Waiter.class, Thread.class, "thread"), 132 newUpdater(Waiter.class, Waiter.class, "next"), 133 newUpdater(AbstractFuture.class, Waiter.class, "waiters"), 134 newUpdater(AbstractFuture.class, Listener.class, "listeners"), 135 newUpdater(AbstractFuture.class, Object.class, "value")); 136 } catch (Throwable atomicReferenceFieldUpdaterFailure) { 137 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 138 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 139 // For these users fallback to a suboptimal implementation, based on synchronized. This will 140 // be a definite performance hit to those users. 141 log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure); 142 log.log(Level.SEVERE, "SafeAtomicHelper is broken!", atomicReferenceFieldUpdaterFailure); 143 helper = new SynchronizedHelper(); 144 } 145 } 146 ATOMIC_HELPER = helper; 147 148 // Prevent rare disastrous classloading in first call to LockSupport.park. 149 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 150 @SuppressWarnings("unused") 151 Class<?> ensureLoaded = LockSupport.class; 152 } 153 154 /** 155 * Waiter links form a Treiber stack, in the {@link #waiters} field. 156 */ 157 private static final class Waiter { 158 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 159 160 @Nullable volatile Thread thread; 161 @Nullable volatile Waiter next; 162 163 /** 164 * Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 165 * before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 166 */ 167 Waiter(boolean unused) {} 168 169 Waiter() { 170 // avoid volatile write, write is made visible by subsequent CAS on waiters field 171 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 172 } 173 174 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 175 // field. 176 void setNext(Waiter next) { 177 ATOMIC_HELPER.putNext(this, next); 178 } 179 180 void unpark() { 181 // This is racy with removeWaiter. The consequence of the race is that we may spuriously call 182 // unpark even though the thread has already removed itself from the list. But even if we did 183 // use a CAS, that race would still exist (it would just be ever so slightly smaller). 184 Thread w = thread; 185 if (w != null) { 186 thread = null; 187 LockSupport.unpark(w); 188 } 189 } 190 } 191 192 /** 193 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 194 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are saved 195 * by two things. 196 * <ul> 197 * <li>This is only called when a waiting thread times out or is interrupted. Both of which should 198 * be rare. 199 * <li>The waiters list should be very short. 200 * </ul> 201 */ 202 private void removeWaiter(Waiter node) { 203 node.thread = null; // mark as 'deleted' 204 restart: 205 while (true) { 206 Waiter pred = null; 207 Waiter curr = waiters; 208 if (curr == Waiter.TOMBSTONE) { 209 return; // give up if someone is calling complete 210 } 211 Waiter succ; 212 while (curr != null) { 213 succ = curr.next; 214 if (curr.thread != null) { // we aren't unlinking this node, update pred. 215 pred = curr; 216 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 217 pred.next = succ; 218 if (pred.thread == null) { // We raced with another node that unlinked pred. Restart. 219 continue restart; 220 } 221 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 222 continue restart; // We raced with an add or complete 223 } 224 curr = succ; 225 } 226 break; 227 } 228 } 229 230 /** Listeners also form a stack through the {@link #listeners} field. */ 231 private static final class Listener { 232 static final Listener TOMBSTONE = new Listener(null, null); 233 final Runnable task; 234 final Executor executor; 235 236 // writes to next are made visible by subsequent CAS's on the listeners field 237 @Nullable Listener next; 238 239 Listener(Runnable task, Executor executor) { 240 this.task = task; 241 this.executor = executor; 242 } 243 } 244 245 /** A special value to represent {@code null}. */ 246 private static final Object NULL = new Object(); 247 248 /** A special value to represent failure, when {@link #setException} is called successfully. */ 249 private static final class Failure { 250 static final Failure FALLBACK_INSTANCE = 251 new Failure( 252 new Throwable("Failure occurred while trying to finish a future.") { 253 @Override 254 public synchronized Throwable fillInStackTrace() { 255 return this; // no stack trace 256 } 257 }); 258 final Throwable exception; 259 260 Failure(Throwable exception) { 261 this.exception = checkNotNull(exception); 262 } 263 } 264 265 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 266 private static final class Cancellation { 267 final boolean wasInterrupted; 268 @Nullable final Throwable cause; 269 270 Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { 271 this.wasInterrupted = wasInterrupted; 272 this.cause = cause; 273 } 274 } 275 276 /** A special value that encodes the 'setFuture' state. */ 277 private static final class SetFuture<V> implements Runnable { 278 final AbstractFuture<V> owner; 279 final ListenableFuture<? extends V> future; 280 281 SetFuture(AbstractFuture<V> owner, ListenableFuture<? extends V> future) { 282 this.owner = owner; 283 this.future = future; 284 } 285 286 @Override 287 public void run() { 288 if (owner.value != this) { 289 // nothing to do, we must have been cancelled, don't bother inspecting the future. 290 return; 291 } 292 Object valueToSet = getFutureValue(future); 293 if (ATOMIC_HELPER.casValue(owner, this, valueToSet)) { 294 complete(owner); 295 } 296 } 297 } 298 299 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 300 // available. 301 /** 302 * This field encodes the current state of the future. 303 * 304 * <p>The valid values are: 305 * <ul> 306 * <li>{@code null} initial state, nothing has happened. 307 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 308 * <li>{@link Failure} terminal state, {@code setException} was called. 309 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 310 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 311 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null argument. 312 * </ul> 313 */ 314 private volatile Object value; 315 316 /** All listeners. */ 317 private volatile Listener listeners; 318 319 /** All waiting threads. */ 320 private volatile Waiter waiters; 321 322 /** 323 * Constructor for use by subclasses. 324 */ 325 protected AbstractFuture() {} 326 327 // Gets and Timed Gets 328 // 329 // * Be responsive to interruption 330 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 331 // waiters field. 332 // * Future completion is defined by when #value becomes non-null/non SetFuture 333 // * Future completion can be observed if the waiters field contains a TOMBSTONE 334 335 // Timed Get 336 // There are a few design constraints to consider 337 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 338 // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the 339 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 340 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 341 // similar purposes. 342 // * We want to behave reasonably for timeouts of 0 343 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 344 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 345 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 346 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 347 // is). If we wanted to be strict about it, we could store the unpark() time in the Waiter node 348 // and we could use that to make a decision about whether or not we timed out prior to being 349 // unparked. 350 351 /* 352 * Improve the documentation of when InterruptedException is thrown. Our behavior matches the 353 * JDK's, but the JDK's documentation is misleading. 354 */ 355 356 /** 357 * {@inheritDoc} 358 * 359 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 360 * current thread is interrupted before or during the call, even if the value is already 361 * available. 362 * 363 * @throws InterruptedException if the current thread was interrupted before or during the call 364 * (optional but recommended). 365 * @throws CancellationException {@inheritDoc} 366 */ 367 @CanIgnoreReturnValue 368 @Override 369 public V get(long timeout, TimeUnit unit) 370 throws InterruptedException, TimeoutException, ExecutionException { 371 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 372 // at the bottom and throw a timeoutexception. 373 long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 374 if (Thread.interrupted()) { 375 throw new InterruptedException(); 376 } 377 Object localValue = value; 378 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 379 return getDoneValue(localValue); 380 } 381 // we delay calling nanoTime until we know we will need to either park or spin 382 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 383 long_wait_loop: 384 if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 385 Waiter oldHead = waiters; 386 if (oldHead != Waiter.TOMBSTONE) { 387 Waiter node = new Waiter(); 388 do { 389 node.setNext(oldHead); 390 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 391 while (true) { 392 LockSupport.parkNanos(this, remainingNanos); 393 // Check interruption first, if we woke up due to interruption we need to honor that. 394 if (Thread.interrupted()) { 395 removeWaiter(node); 396 throw new InterruptedException(); 397 } 398 399 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 400 // wakeup 401 localValue = value; 402 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 403 return getDoneValue(localValue); 404 } 405 406 // timed out? 407 remainingNanos = endNanos - System.nanoTime(); 408 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 409 // Remove the waiter, one way or another we are done parking this thread. 410 removeWaiter(node); 411 break long_wait_loop; // jump down to the busy wait loop 412 } 413 } 414 } 415 oldHead = waiters; // re-read and loop. 416 } while (oldHead != Waiter.TOMBSTONE); 417 } 418 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 419 // waiter. 420 return getDoneValue(value); 421 } 422 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 423 // waiters list 424 while (remainingNanos > 0) { 425 localValue = value; 426 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 427 return getDoneValue(localValue); 428 } 429 if (Thread.interrupted()) { 430 throw new InterruptedException(); 431 } 432 remainingNanos = endNanos - System.nanoTime(); 433 } 434 throw new TimeoutException(); 435 } 436 437 /* 438 * Improve the documentation of when InterruptedException is thrown. Our behavior matches the 439 * JDK's, but the JDK's documentation is misleading. 440 */ 441 442 /** 443 * {@inheritDoc} 444 * 445 * <p>The default {@link AbstractFuture} implementation throws {@code InterruptedException} if the 446 * current thread is interrupted before or during the call, even if the value is already 447 * available. 448 * 449 * @throws InterruptedException if the current thread was interrupted before or during the call 450 * (optional but recommended). 451 * @throws CancellationException {@inheritDoc} 452 */ 453 @CanIgnoreReturnValue 454 @Override 455 public V get() throws InterruptedException, ExecutionException { 456 if (Thread.interrupted()) { 457 throw new InterruptedException(); 458 } 459 Object localValue = value; 460 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 461 return getDoneValue(localValue); 462 } 463 Waiter oldHead = waiters; 464 if (oldHead != Waiter.TOMBSTONE) { 465 Waiter node = new Waiter(); 466 do { 467 node.setNext(oldHead); 468 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 469 // we are on the stack, now wait for completion. 470 while (true) { 471 LockSupport.park(this); 472 // Check interruption first, if we woke up due to interruption we need to honor that. 473 if (Thread.interrupted()) { 474 removeWaiter(node); 475 throw new InterruptedException(); 476 } 477 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 478 // wakeup 479 localValue = value; 480 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 481 return getDoneValue(localValue); 482 } 483 } 484 } 485 oldHead = waiters; // re-read and loop. 486 } while (oldHead != Waiter.TOMBSTONE); 487 } 488 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 489 // waiter. 490 return getDoneValue(value); 491 } 492 493 /** 494 * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. 495 */ 496 private V getDoneValue(Object obj) throws ExecutionException { 497 // While this seems like it might be too branch-y, simple benchmarking proves it to be 498 // unmeasurable (comparing done AbstractFutures with immediateFuture) 499 if (obj instanceof Cancellation) { 500 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 501 } else if (obj instanceof Failure) { 502 throw new ExecutionException(((Failure) obj).exception); 503 } else if (obj == NULL) { 504 return null; 505 } else { 506 @SuppressWarnings("unchecked") // this is the only other option 507 V asV = (V) obj; 508 return asV; 509 } 510 } 511 512 @Override 513 public boolean isDone() { 514 final Object localValue = value; 515 return localValue != null & !(localValue instanceof AbstractFuture.SetFuture); 516 } 517 518 @Override 519 public boolean isCancelled() { 520 final Object localValue = value; 521 return localValue instanceof Cancellation; 522 } 523 524 /** 525 * {@inheritDoc} 526 * 527 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 528 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 529 * {@code Future} that was supplied in the {@code setFuture} call. 530 */ 531 @CanIgnoreReturnValue 532 @Override 533 public boolean cancel(boolean mayInterruptIfRunning) { 534 Object localValue = value; 535 boolean rValue = false; 536 if (localValue == null | localValue instanceof AbstractFuture.SetFuture) { 537 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 538 // certainly less likely. 539 Throwable cause = 540 GENERATE_CANCELLATION_CAUSES 541 ? new CancellationException("Future.cancel() was called.") 542 : null; 543 Object valueToSet = new Cancellation(mayInterruptIfRunning, cause); 544 AbstractFuture<?> abstractFuture = this; 545 while (true) { 546 if (ATOMIC_HELPER.casValue(abstractFuture, localValue, valueToSet)) { 547 rValue = true; 548 // We call interuptTask before calling complete(), which is consistent with 549 // FutureTask 550 if (mayInterruptIfRunning) { 551 abstractFuture.interruptTask(); 552 } 553 complete(abstractFuture); 554 if (localValue instanceof AbstractFuture.SetFuture) { 555 // propagate cancellation to the future set in setfuture, this is racy, and we don't 556 // care if we are successful or not. 557 ListenableFuture<?> futureToPropagateTo = 558 ((AbstractFuture.SetFuture) localValue).future; 559 if (futureToPropagateTo instanceof TrustedFuture) { 560 // If the future is a TrustedFuture then we specifically avoid calling cancel() 561 // this has 2 benefits 562 // 1. for long chains of futures strung together with setFuture we consume less stack 563 // 2. we avoid allocating Cancellation objects at every level of the cancellation 564 // chain 565 // We can only do this for TrustedFuture, because TrustedFuture.cancel is final and 566 // does nothing but delegate to this method. 567 AbstractFuture<?> trusted = (AbstractFuture<?>) futureToPropagateTo; 568 localValue = trusted.value; 569 if (localValue == null | localValue instanceof AbstractFuture.SetFuture) { 570 abstractFuture = trusted; 571 continue; // loop back up and try to complete the new future 572 } 573 } else { 574 // not a TrustedFuture, call cancel directly. 575 futureToPropagateTo.cancel(mayInterruptIfRunning); 576 } 577 } 578 break; 579 } 580 // obj changed, reread 581 localValue = abstractFuture.value; 582 if (!(localValue instanceof AbstractFuture.SetFuture)) { 583 // obj cannot be null at this point, because value can only change from null to non-null. 584 // So if value changed (and it did since we lost the CAS), then it cannot be null and 585 // since it isn't a SetFuture, then the future must be done and we should exit the loop 586 break; 587 } 588 } 589 } 590 return rValue; 591 } 592 593 /** 594 * Subclasses can override this method to implement interruption of the future's computation. The 595 * method is invoked automatically by a successful call to {@link #cancel(boolean) cancel(true)}. 596 * 597 * <p>The default implementation does nothing. 598 * 599 * @since 10.0 600 */ 601 protected void interruptTask() {} 602 603 /** 604 * Returns true if this future was cancelled with {@code mayInterruptIfRunning} set to {@code 605 * true}. 606 * 607 * @since 14.0 608 */ 609 protected final boolean wasInterrupted() { 610 final Object localValue = value; 611 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 612 } 613 614 /** 615 * {@inheritDoc} 616 * 617 * @since 10.0 618 */ 619 @Override 620 public void addListener(Runnable listener, Executor executor) { 621 checkNotNull(listener, "Runnable was null."); 622 checkNotNull(executor, "Executor was null."); 623 Listener oldHead = listeners; 624 if (oldHead != Listener.TOMBSTONE) { 625 Listener newNode = new Listener(listener, executor); 626 do { 627 newNode.next = oldHead; 628 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 629 return; 630 } 631 oldHead = listeners; // re-read 632 } while (oldHead != Listener.TOMBSTONE); 633 } 634 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 635 // the listener. 636 executeListener(listener, executor); 637 } 638 639 /** 640 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 641 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 642 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 643 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 644 * Future} may have previously been set asynchronously, in which case its result may not be known 645 * yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 646 * method, only by a call to {@link #cancel}. 647 * 648 * @param value the value to be used as the result 649 * @return true if the attempt was accepted, completing the {@code Future} 650 */ 651 @CanIgnoreReturnValue 652 protected boolean set(@Nullable V value) { 653 Object valueToSet = value == null ? NULL : value; 654 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 655 complete(this); 656 return true; 657 } 658 return false; 659 } 660 661 /** 662 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 663 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 664 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 665 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 666 * {@code Future} may have previously been set asynchronously, in which case its result may not be 667 * known yet. That result, though not yet known, cannot be overridden by a call to a {@code set*} 668 * method, only by a call to {@link #cancel}. 669 * 670 * @param throwable the exception to be used as the failed result 671 * @return true if the attempt was accepted, completing the {@code Future} 672 */ 673 @CanIgnoreReturnValue 674 protected boolean setException(Throwable throwable) { 675 Object valueToSet = new Failure(checkNotNull(throwable)); 676 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 677 complete(this); 678 return true; 679 } 680 return false; 681 } 682 683 /** 684 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 685 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 686 * (including "set asynchronously," defined below). 687 * 688 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 689 * is accepted, then this future is guaranteed to have been completed with the supplied future by 690 * the time this method returns. If the supplied future is not done and the call is accepted, then 691 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 692 * cannot be overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 693 * 694 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 695 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 696 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 697 * Future}. 698 * 699 * @param future the future to delegate to 700 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 701 * cancelled or set. 702 * @since 19.0 703 */ 704 @Beta 705 @CanIgnoreReturnValue 706 protected boolean setFuture(ListenableFuture<? extends V> future) { 707 checkNotNull(future); 708 Object localValue = value; 709 if (localValue == null) { 710 if (future.isDone()) { 711 Object value = getFutureValue(future); 712 if (ATOMIC_HELPER.casValue(this, null, value)) { 713 complete(this); 714 return true; 715 } 716 return false; 717 } 718 SetFuture valueToSet = new SetFuture<V>(this, future); 719 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 720 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 721 // since all we are doing is unpacking a completed future which should be fast. 722 try { 723 future.addListener(valueToSet, directExecutor()); 724 } catch (Throwable t) { 725 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 726 // must have been caused by addListener itself. The most likely explanation is a 727 // misconfigured mock. Try to switch to Failure. 728 Failure failure; 729 try { 730 failure = new Failure(t); 731 } catch (Throwable oomMostLikely) { 732 failure = Failure.FALLBACK_INSTANCE; 733 } 734 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 735 boolean unused = ATOMIC_HELPER.casValue(this, valueToSet, failure); 736 } 737 return true; 738 } 739 localValue = value; // we lost the cas, fall through and maybe cancel 740 } 741 // The future has already been set to something. If it is cancellation we should cancel the 742 // incoming future. 743 if (localValue instanceof Cancellation) { 744 // we don't care if it fails, this is best-effort. 745 future.cancel(((Cancellation) localValue).wasInterrupted); 746 } 747 return false; 748 } 749 750 /** 751 * Returns a value, suitable for storing in the {@link #value} field. From the given future, 752 * which is assumed to be done. 753 * 754 * <p>This is approximately the inverse of {@link #getDoneValue(Object)} 755 */ 756 private static Object getFutureValue(ListenableFuture<?> future) { 757 Object valueToSet; 758 if (future instanceof TrustedFuture) { 759 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 760 // override .get() (since it is final) and therefore this is equivalent to calling .get() 761 // and unpacking the exceptions like we do below (just much faster because it is a single 762 // field read instead of a read, several branches and possibly creating exceptions). 763 return ((AbstractFuture<?>) future).value; 764 } else { 765 // Otherwise calculate valueToSet by calling .get() 766 try { 767 Object v = getDone(future); 768 valueToSet = v == null ? NULL : v; 769 } catch (ExecutionException exception) { 770 valueToSet = new Failure(exception.getCause()); 771 } catch (CancellationException cancellation) { 772 valueToSet = new Cancellation(false, cancellation); 773 } catch (Throwable t) { 774 valueToSet = new Failure(t); 775 } 776 } 777 return valueToSet; 778 } 779 780 /** Unblocks all threads and runs all listeners. */ 781 private static void complete(AbstractFuture<?> future) { 782 Listener next = null; 783 outer: while (true) { 784 future.releaseWaiters(); 785 // We call this before the listeners in order to avoid needing to manage a separate stack data 786 // structure for them. 787 // afterDone() should be generally fast and only used for cleanup work... but in theory can 788 // also be recursive and create StackOverflowErrors 789 future.afterDone(); 790 // push the current set of listeners onto next 791 next = future.clearListeners(next); 792 future = null; 793 while (next != null) { 794 Listener curr = next; 795 next = next.next; 796 Runnable task = curr.task; 797 if (task instanceof AbstractFuture.SetFuture) { 798 AbstractFuture.SetFuture<?> setFuture = (AbstractFuture.SetFuture) task; 799 // We unwind setFuture specifically to avoid StackOverflowErrors in the case of long 800 // chains of SetFutures 801 // Handling this special case is important because there is no way to pass an executor to 802 // setFuture, so a user couldn't break the chain by doing this themselves. It is also 803 // potentially common if someone writes a recursive Futures.transformAsync transformer. 804 future = setFuture.owner; 805 if (future.value == setFuture) { 806 Object valueToSet = getFutureValue(setFuture.future); 807 if (ATOMIC_HELPER.casValue(future, setFuture, valueToSet)) { 808 continue outer; 809 } 810 } 811 // other wise the future we were trying to set is already done. 812 } else { 813 executeListener(task, curr.executor); 814 } 815 } 816 break; 817 } 818 } 819 820 /** 821 * Callback method that is called exactly once after the future is completed. 822 * 823 * <p>If {@link #interruptTask} is also run during completion, {@link #afterDone} runs after it. 824 * 825 * <p>The default implementation of this method in {@code AbstractFuture} does nothing. This is 826 * intended for very lightweight cleanup work, for example, timing statistics or clearing fields. 827 * If your task does anything heavier consider, just using a listener with an executor. 828 * 829 * @since 20.0 830 */ 831 // TODO(cpovirk): @ForOverride https://github.com/google/error-prone/issues/342 832 @Beta 833 protected void afterDone() {} 834 835 /** 836 * Returns the exception that this {@code Future} completed with. This includes completion through 837 * a call to {@link setException} or {@link setFuture}{@code (failedFuture)} but not cancellation. 838 * 839 * @throws RuntimeException if the {@code Future} has not failed 840 */ 841 final Throwable trustedGetException() { 842 return ((Failure) value).exception; 843 } 844 845 /** 846 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 847 * the given future (if available). 848 * 849 * <p>This method should be used only when this future is completed. It is designed to be called 850 * from {@code done}. 851 */ 852 final void maybePropagateCancellation(@Nullable Future<?> related) { 853 if (related != null & isCancelled()) { 854 related.cancel(wasInterrupted()); 855 } 856 } 857 858 /** Releases all threads in the {@link #waiters} list, and clears the list. */ 859 private void releaseWaiters() { 860 Waiter head; 861 do { 862 head = waiters; 863 } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); 864 for (Waiter currentWaiter = head; 865 currentWaiter != null; 866 currentWaiter = currentWaiter.next) { 867 currentWaiter.unpark(); 868 } 869 } 870 871 /** 872 * Clears the {@link #listeners} list and prepends its contents to {@code onto}, least recently 873 * added first. 874 */ 875 private Listener clearListeners(Listener onto) { 876 // We need to 877 // 1. atomically swap the listeners with TOMBSTONE, this is because addListener uses that to 878 // to synchronize with us 879 // 2. reverse the linked list, because despite our rather clear contract, people depend on us 880 // executing listeners in the order they were added 881 // 3. push all the items onto 'onto' and return the new head of the stack 882 Listener head; 883 do { 884 head = listeners; 885 } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); 886 Listener reversedList = onto; 887 while (head != null) { 888 Listener tmp = head; 889 head = head.next; 890 tmp.next = reversedList; 891 reversedList = tmp; 892 } 893 return reversedList; 894 } 895 896 /** 897 * Submits the given runnable to the given {@link Executor} catching and logging all 898 * {@linkplain RuntimeException runtime exceptions} thrown by the executor. 899 */ 900 private static void executeListener(Runnable runnable, Executor executor) { 901 try { 902 executor.execute(runnable); 903 } catch (RuntimeException e) { 904 // Log it and keep going -- bad runnable and/or executor. Don't punish the other runnables if 905 // we're given a bad one. We only catch RuntimeException because we want Errors to propagate 906 // up. 907 log.log( 908 Level.SEVERE, 909 "RuntimeException while executing runnable " + runnable + " with executor " + executor, 910 e); 911 } 912 } 913 914 private abstract static class AtomicHelper { 915 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ 916 abstract void putThread(Waiter waiter, Thread newValue); 917 918 /** Non volatile write of the waiter to the {@link Waiter#next} field. */ 919 abstract void putNext(Waiter waiter, Waiter newValue); 920 921 /** Performs a CAS operation on the {@link #waiters} field. */ 922 abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update); 923 924 /** Performs a CAS operation on the {@link #listeners} field. */ 925 abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update); 926 927 /** Performs a CAS operation on the {@link #value} field. */ 928 abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update); 929 } 930 931 /** 932 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 933 * 934 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 935 * be accessed. 936 */ 937 private static final class UnsafeAtomicHelper extends AtomicHelper { 938 static final sun.misc.Unsafe UNSAFE; 939 static final long LISTENERS_OFFSET; 940 static final long WAITERS_OFFSET; 941 static final long VALUE_OFFSET; 942 static final long WAITER_THREAD_OFFSET; 943 static final long WAITER_NEXT_OFFSET; 944 945 static { 946 sun.misc.Unsafe unsafe = null; 947 try { 948 unsafe = sun.misc.Unsafe.getUnsafe(); 949 } catch (SecurityException tryReflectionInstead) { 950 try { 951 unsafe = 952 AccessController.doPrivileged( 953 new PrivilegedExceptionAction<sun.misc.Unsafe>() { 954 @Override 955 public sun.misc.Unsafe run() throws Exception { 956 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; 957 for (java.lang.reflect.Field f : k.getDeclaredFields()) { 958 f.setAccessible(true); 959 Object x = f.get(null); 960 if (k.isInstance(x)) { 961 return k.cast(x); 962 } 963 } 964 throw new NoSuchFieldError("the Unsafe"); 965 } 966 }); 967 } catch (PrivilegedActionException e) { 968 throw new RuntimeException("Could not initialize intrinsics", e.getCause()); 969 } 970 } 971 try { 972 Class<?> abstractFuture = AbstractFuture.class; 973 WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters")); 974 LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners")); 975 VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value")); 976 WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread")); 977 WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next")); 978 UNSAFE = unsafe; 979 } catch (Exception e) { 980 throwIfUnchecked(e); 981 throw new RuntimeException(e); 982 } 983 } 984 985 @Override 986 void putThread(Waiter waiter, Thread newValue) { 987 UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); 988 } 989 990 @Override 991 void putNext(Waiter waiter, Waiter newValue) { 992 UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); 993 } 994 995 /** Performs a CAS operation on the {@link #waiters} field. */ 996 @Override 997 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 998 return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update); 999 } 1000 1001 /** Performs a CAS operation on the {@link #listeners} field. */ 1002 @Override 1003 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1004 return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); 1005 } 1006 1007 /** Performs a CAS operation on the {@link #value} field. */ 1008 @Override 1009 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1010 return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); 1011 } 1012 } 1013 1014 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ 1015 private static final class SafeAtomicHelper extends AtomicHelper { 1016 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 1017 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 1018 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 1019 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 1020 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 1021 1022 SafeAtomicHelper( 1023 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 1024 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 1025 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 1026 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 1027 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 1028 this.waiterThreadUpdater = waiterThreadUpdater; 1029 this.waiterNextUpdater = waiterNextUpdater; 1030 this.waitersUpdater = waitersUpdater; 1031 this.listenersUpdater = listenersUpdater; 1032 this.valueUpdater = valueUpdater; 1033 } 1034 1035 @Override 1036 void putThread(Waiter waiter, Thread newValue) { 1037 waiterThreadUpdater.lazySet(waiter, newValue); 1038 } 1039 1040 @Override 1041 void putNext(Waiter waiter, Waiter newValue) { 1042 waiterNextUpdater.lazySet(waiter, newValue); 1043 } 1044 1045 @Override 1046 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1047 return waitersUpdater.compareAndSet(future, expect, update); 1048 } 1049 1050 @Override 1051 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1052 return listenersUpdater.compareAndSet(future, expect, update); 1053 } 1054 1055 @Override 1056 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1057 return valueUpdater.compareAndSet(future, expect, update); 1058 } 1059 } 1060 1061 /** 1062 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 1063 * 1064 * <p>This is an implementation of last resort for when certain basic VM features are broken (like 1065 * AtomicReferenceFieldUpdater). 1066 */ 1067 private static final class SynchronizedHelper extends AtomicHelper { 1068 @Override 1069 void putThread(Waiter waiter, Thread newValue) { 1070 waiter.thread = newValue; 1071 } 1072 1073 @Override 1074 void putNext(Waiter waiter, Waiter newValue) { 1075 waiter.next = newValue; 1076 } 1077 1078 @Override 1079 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 1080 synchronized (future) { 1081 if (future.waiters == expect) { 1082 future.waiters = update; 1083 return true; 1084 } 1085 return false; 1086 } 1087 } 1088 1089 @Override 1090 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1091 synchronized (future) { 1092 if (future.listeners == expect) { 1093 future.listeners = update; 1094 return true; 1095 } 1096 return false; 1097 } 1098 } 1099 1100 @Override 1101 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1102 synchronized (future) { 1103 if (future.value == expect) { 1104 future.value = update; 1105 return true; 1106 } 1107 return false; 1108 } 1109 } 1110 } 1111 1112 private static CancellationException cancellationExceptionWithCause( 1113 @Nullable String message, @Nullable Throwable cause) { 1114 CancellationException exception = new CancellationException(message); 1115 exception.initCause(cause); 1116 return exception; 1117 } 1118}