001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkNotNull; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 022 023import com.google.common.annotations.Beta; 024import com.google.common.annotations.GwtCompatible; 025import com.google.common.base.Throwables; 026 027import java.security.AccessController; 028import java.security.PrivilegedActionException; 029import java.security.PrivilegedExceptionAction; 030import java.util.concurrent.CancellationException; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.Executor; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 037import java.util.concurrent.locks.LockSupport; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041import javax.annotation.Nullable; 042 043/** 044 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. More 045 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 046 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 047 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, Function) 048 * Futures.transform} and {@link Futures#catching(ListenableFuture, Class, Function) 049 * Futures.catching}. 050 * 051 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 052 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 053 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 054 * {@link #interruptTask()}, which will be invoked automatically if a call to {@link 055 * #cancel(boolean) cancel(true)} succeeds in canceling the future. Subclasses should rarely 056 * override other methods. 057 * 058 * @author Sven Mawson 059 * @author Luke Sandberg 060 * @since 1.0 061 */ 062@GwtCompatible(emulated = true) 063public abstract class AbstractFuture<V> implements ListenableFuture<V> { 064 private static final boolean GENERATE_CANCELLATION_CAUSES = 065 Boolean.parseBoolean( 066 System.getProperty("guava.concurrent.generate_cancellation_cause", "false")); 067 068 /** 069 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 070 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 071 */ 072 abstract static class TrustedFuture<V> extends AbstractFuture<V> { 073 // N.B. cancel is not overridden to be final, because many future utilities need to override 074 // cancel in order to propagate cancellation to other futures. 075 // TODO(lukes): with maybePropagateCancellation this is no longer really true. Track down the 076 // final few cases and eliminate their overrides of cancel() 077 078 @Override public final V get() throws InterruptedException, ExecutionException { 079 return super.get(); 080 } 081 082 @Override public final V get(long timeout, TimeUnit unit) 083 throws InterruptedException, ExecutionException, TimeoutException { 084 return super.get(timeout, unit); 085 } 086 087 @Override public final boolean isDone() { 088 return super.isDone(); 089 } 090 091 @Override public final boolean isCancelled() { 092 return super.isCancelled(); 093 } 094 095 @Override public final void addListener(Runnable listener, Executor executor) { 096 super.addListener(listener, executor); 097 } 098 } 099 100 // Logger to log exceptions caught when running listeners. 101 private static final Logger log = Logger.getLogger(AbstractFuture.class.getName()); 102 103 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 104 // blocking. This value is what AbstractQueuedSynchronizer uses. 105 private static final long SPIN_THRESHOLD_NANOS = 1000L; 106 107 private static final AtomicHelper ATOMIC_HELPER; 108 109 static { 110 AtomicHelper helper; 111 112 try { 113 helper = new UnsafeAtomicHelper(); 114 } catch (Throwable unsafeFailure) { 115 // catch absolutely everything and fall through to our 'SafeAtomicHelper' 116 // The access control checks that ARFU does means the caller class has to be AbstractFuture 117 // instead of SafeAtomicHelper, so we annoyingly define these here 118 try { 119 helper = new SafeAtomicHelper( 120 newUpdater(Waiter.class, Thread.class, "thread"), 121 newUpdater(Waiter.class, Waiter.class, "next"), 122 newUpdater(AbstractFuture.class, Waiter.class, "waiters"), 123 newUpdater(AbstractFuture.class, Listener.class, "listeners"), 124 newUpdater(AbstractFuture.class, Object.class, "value")); 125 } catch (Throwable atomicReferenceFieldUpdaterFailure) { 126 // Some Android 5.0.x Samsung devices have bugs in JDK reflection APIs that cause 127 // getDeclaredField to throw a NoSuchFieldException when the field is definitely there. 128 // For these users fallback to a suboptimal implementation, based on synchronized. This 129 // will be a definite performance hit to those users. 130 log.log(Level.SEVERE, "UnsafeAtomicHelper is broken!", unsafeFailure); 131 log.log(Level.SEVERE, "SafeAtomicHelper is broken!", atomicReferenceFieldUpdaterFailure); 132 helper = new SynchronizedHelper(); 133 } 134 } 135 ATOMIC_HELPER = helper; 136 137 // Prevent rare disastrous classloading in first call to LockSupport.park. 138 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 139 @SuppressWarnings("unused") 140 Class<?> ensureLoaded = LockSupport.class; 141 } 142 143 /** 144 * Waiter links form a Treiber stack, in the {@link #waiters} field. 145 */ 146 private static final class Waiter { 147 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 148 149 @Nullable volatile Thread thread; 150 @Nullable volatile Waiter next; 151 152 // Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 153 // before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 154 Waiter(boolean unused) {} 155 156 Waiter() { 157 // avoid volatile write, write is made visible by subsequent CAS on waiters field 158 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 159 } 160 161 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 162 // field. 163 void setNext(Waiter next) { 164 ATOMIC_HELPER.putNext(this, next); 165 } 166 167 void unpark() { 168 // This is racy with removeWaiter. The consequence of the race is that we may spuriously 169 // call unpark even though the thread has already removed itself from the list. But even if 170 // we did use a CAS, that race would still exist (it would just be ever so slightly smaller). 171 Thread w = thread; 172 if (w != null) { 173 thread = null; 174 LockSupport.unpark(w); 175 } 176 } 177 } 178 179 /** 180 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 181 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are 182 * saved by two things. 183 * <ul> 184 * <li>This is only called when a waiting thread times out or is interrupted. Both of which 185 * should be rare. 186 * <li>The waiters list should be very short. 187 * </ul> 188 */ 189 private void removeWaiter(Waiter node) { 190 node.thread = null; // mark as 'deleted' 191 restart: while (true) { 192 Waiter pred = null; 193 Waiter curr = waiters; 194 if (curr == Waiter.TOMBSTONE) { 195 return; // give up if someone is calling complete 196 } 197 Waiter succ; 198 while (curr != null) { 199 succ = curr.next; 200 if (curr.thread != null) { // we aren't unlinking this node, update pred. 201 pred = curr; 202 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 203 pred.next = succ; 204 if (pred.thread == null) { // We raced with another node that unlinked pred. Restart. 205 continue restart; 206 } 207 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 208 continue restart; // We raced with an add or complete 209 } 210 curr = succ; 211 } 212 break; 213 } 214 } 215 216 /** Listeners also form a stack through the {@link #listeners} field. */ 217 private static final class Listener { 218 static final Listener TOMBSTONE = new Listener(null, null); 219 final Runnable task; 220 final Executor executor; 221 222 // writes to next are made visible by subsequent CAS's on the listeners field 223 @Nullable Listener next; 224 225 Listener(Runnable task, Executor executor) { 226 this.task = task; 227 this.executor = executor; 228 } 229 } 230 231 /** A special value to represent {@code null}. */ 232 private static final Object NULL = new Object(); 233 234 /** A special value to represent failure, when {@link #setException} is called successfully. */ 235 private static final class Failure { 236 static final Failure FALLBACK_INSTANCE = new Failure( 237 new Throwable("Failure occurred while trying to finish a future.") { 238 @Override public synchronized Throwable fillInStackTrace() { 239 return this; // no stack trace 240 } 241 }); 242 final Throwable exception; 243 244 Failure(Throwable exception) { 245 this.exception = checkNotNull(exception); 246 } 247 } 248 249 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 250 private static final class Cancellation { 251 final boolean wasInterrupted; 252 @Nullable final Throwable cause; 253 254 Cancellation(boolean wasInterrupted, @Nullable Throwable cause) { 255 this.wasInterrupted = wasInterrupted; 256 this.cause = cause; 257 } 258 } 259 260 /** A special value that encodes the 'setFuture' state. */ 261 private final class SetFuture implements Runnable { 262 final ListenableFuture<? extends V> future; 263 264 SetFuture(ListenableFuture<? extends V> future) { 265 this.future = future; 266 } 267 268 @Override public void run() { 269 if (value != this) { 270 // nothing to do, we must have been cancelled 271 return; 272 } 273 completeWithFuture(future, this); 274 } 275 } 276 277 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 278 // available. 279 /** 280 * This field encodes the current state of the future. 281 * 282 * <p>The valid values are: 283 * <ul> 284 * <li>{@code null} initial state, nothing has happened. 285 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 286 * <li>{@link Failure} terminal state, {@code setException} was called. 287 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 288 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 289 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 290 * argument. 291 * </ul> 292 */ 293 private volatile Object value; 294 295 /** All listeners. */ 296 private volatile Listener listeners; 297 298 /** All waiting threads. */ 299 private volatile Waiter waiters; 300 301 /** 302 * Constructor for use by subclasses. 303 */ 304 protected AbstractFuture() {} 305 306 /* 307 * Improve the documentation of when InterruptedException is thrown. Our 308 * behavior matches the JDK's, but the JDK's documentation is misleading. 309 */ 310 311 // Gets and Timed Gets 312 // 313 // * Be responsive to interruption 314 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 315 // waiters field. 316 // * Future completion is defined by when #value becomes non-null/non SetFuture 317 // * Future completion can be observed if the waiters field contains a TOMBSTONE 318 319 // Timed Get 320 // There are a few design constraints to consider 321 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 322 // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the 323 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 324 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 325 // similar purposes. 326 // * We want to behave reasonably for timeouts of 0 327 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 328 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 329 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 330 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 331 // is). If we wanted to be strict about it, we could store the unpark() time in the Waiter 332 // node and we could use that to make a decision about whether or not we timed out prior to 333 // being unparked. 334 335 /** 336 * {@inheritDoc} 337 * 338 * <p>The default {@link AbstractFuture} implementation throws {@code 339 * InterruptedException} if the current thread is interrupted before or during 340 * the call, even if the value is already available. 341 * 342 * @throws InterruptedException if the current thread was interrupted before 343 * or during the call (optional but recommended). 344 * @throws CancellationException {@inheritDoc} 345 */ 346 @Override 347 public V get(long timeout, TimeUnit unit) 348 throws InterruptedException, TimeoutException, ExecutionException { 349 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 350 // at the bottom and throw a timeoutexception. 351 long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 352 if (Thread.interrupted()) { 353 throw new InterruptedException(); 354 } 355 Object localValue = value; 356 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 357 return getDoneValue(localValue); 358 } 359 // we delay calling nanoTime until we know we will need to either park or spin 360 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 361 long_wait_loop: if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 362 Waiter oldHead = waiters; 363 if (oldHead != Waiter.TOMBSTONE) { 364 Waiter node = new Waiter(); 365 do { 366 node.setNext(oldHead); 367 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 368 while (true) { 369 LockSupport.parkNanos(this, remainingNanos); 370 // Check interruption first, if we woke up due to interruption we need to honor that. 371 if (Thread.interrupted()) { 372 removeWaiter(node); 373 throw new InterruptedException(); 374 } 375 376 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 377 // wakeup 378 localValue = value; 379 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 380 return getDoneValue(localValue); 381 } 382 383 // timed out? 384 remainingNanos = endNanos - System.nanoTime(); 385 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 386 // Remove the waiter, one way or another we are done parking this thread. 387 removeWaiter(node); 388 break long_wait_loop; // jump down to the busy wait loop 389 } 390 } 391 } 392 oldHead = waiters; // re-read and loop. 393 } while (oldHead != Waiter.TOMBSTONE); 394 } 395 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 396 // waiter. 397 return getDoneValue(value); 398 } 399 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 400 // waiters list 401 while (remainingNanos > 0) { 402 localValue = value; 403 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 404 return getDoneValue(localValue); 405 } 406 if (Thread.interrupted()) { 407 throw new InterruptedException(); 408 } 409 remainingNanos = endNanos - System.nanoTime(); 410 } 411 throw new TimeoutException(); 412 } 413 414 /* 415 * Improve the documentation of when InterruptedException is thrown. Our 416 * behavior matches the JDK's, but the JDK's documentation is misleading. 417 */ 418 /** 419 * {@inheritDoc} 420 * 421 * <p>The default {@link AbstractFuture} implementation throws {@code 422 * InterruptedException} if the current thread is interrupted before or during 423 * the call, even if the value is already available. 424 * 425 * @throws InterruptedException if the current thread was interrupted before 426 * or during the call (optional but recommended). 427 * @throws CancellationException {@inheritDoc} 428 */ 429 @Override public V get() throws InterruptedException, ExecutionException { 430 if (Thread.interrupted()) { 431 throw new InterruptedException(); 432 } 433 Object localValue = value; 434 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 435 return getDoneValue(localValue); 436 } 437 Waiter oldHead = waiters; 438 if (oldHead != Waiter.TOMBSTONE) { 439 Waiter node = new Waiter(); 440 do { 441 node.setNext(oldHead); 442 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 443 // we are on the stack, now wait for completion. 444 while (true) { 445 LockSupport.park(this); 446 // Check interruption first, if we woke up due to interruption we need to honor that. 447 if (Thread.interrupted()) { 448 removeWaiter(node); 449 throw new InterruptedException(); 450 } 451 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 452 // wakeup 453 localValue = value; 454 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 455 return getDoneValue(localValue); 456 } 457 } 458 } 459 oldHead = waiters; // re-read and loop. 460 } while (oldHead != Waiter.TOMBSTONE); 461 } 462 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 463 // waiter. 464 return getDoneValue(value); 465 } 466 467 /** 468 * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. 469 */ 470 private V getDoneValue(Object obj) throws ExecutionException { 471 // While this seems like it might be too branch-y, simple benchmarking proves it to be 472 // unmeasurable (comparing done AbstractFutures with immediateFuture) 473 if (obj instanceof Cancellation) { 474 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 475 } else if (obj instanceof Failure) { 476 throw new ExecutionException(((Failure) obj).exception); 477 } else if (obj == NULL) { 478 return null; 479 } else { 480 @SuppressWarnings("unchecked") // this is the only other option 481 V asV = (V) obj; 482 return asV; 483 } 484 } 485 486 @Override 487 public boolean isDone() { 488 final Object localValue = value; 489 return localValue != null & !(localValue instanceof AbstractFuture.SetFuture); 490 } 491 492 @Override 493 public boolean isCancelled() { 494 final Object localValue = value; 495 return localValue instanceof Cancellation; 496 } 497 498 /** 499 * {@inheritDoc} 500 * 501 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 502 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 503 * {@code Future} that was supplied in the {@code setFuture} call. 504 */ 505 @Override 506 public boolean cancel(boolean mayInterruptIfRunning) { 507 Object localValue = value; 508 if (localValue == null | localValue instanceof AbstractFuture.SetFuture) { 509 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 510 // certainly less likely. 511 // TODO(lukes): this exception actually makes cancellation significantly more expensive :( 512 // I wonder if we should consider removing it or providing a mechanism to not do it. 513 Throwable cause = GENERATE_CANCELLATION_CAUSES ? newCancellationCause() : null; 514 Object valueToSet = new Cancellation(mayInterruptIfRunning, cause); 515 do { 516 if (ATOMIC_HELPER.casValue(this, localValue, valueToSet)) { 517 // We call interuptTask before calling complete(), first which is consistent with 518 // FutureTask 519 if (mayInterruptIfRunning) { 520 interruptTask(); 521 } 522 complete(); 523 if (localValue instanceof AbstractFuture.SetFuture) { 524 // propagate cancellation to the future set in setfuture, this is racy, and we don't 525 // care if we are successful or not. 526 ((AbstractFuture<?>.SetFuture) localValue).future.cancel(mayInterruptIfRunning); 527 } 528 return true; 529 } 530 // obj changed, reread 531 localValue = value; 532 // obj cannot be null at this point, because value can only change from null to non-null. So 533 // if value changed (and it did since we lost the CAS), then it cannot be null. 534 } while (localValue instanceof AbstractFuture.SetFuture); 535 } 536 return false; 537 } 538 539 /** 540 * Returns an exception to be used as the cause of the CancellationException thrown by 541 * {@link #get}. 542 * 543 * <p>Note: this method may be called speculatively. There is no guarantee that the future will 544 * be cancelled if this method is called. 545 */ 546 private Throwable newCancellationCause() { 547 return new CancellationException("Future.cancel() was called."); 548 } 549 550 /** 551 * Subclasses can override this method to implement interruption of the 552 * future's computation. The method is invoked automatically by a successful 553 * call to {@link #cancel(boolean) cancel(true)}. 554 * 555 * <p>The default implementation does nothing. 556 * 557 * @since 10.0 558 */ 559 protected void interruptTask() { 560 } 561 562 /** 563 * Returns true if this future was cancelled with {@code 564 * mayInterruptIfRunning} set to {@code true}. 565 * 566 * @since 14.0 567 */ 568 protected final boolean wasInterrupted() { 569 final Object localValue = value; 570 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 571 } 572 573 /** 574 * {@inheritDoc} 575 * 576 * @since 10.0 577 */ 578 @Override 579 public void addListener(Runnable listener, Executor executor) { 580 checkNotNull(listener, "Runnable was null."); 581 checkNotNull(executor, "Executor was null."); 582 Listener oldHead = listeners; 583 if (oldHead != Listener.TOMBSTONE) { 584 Listener newNode = new Listener(listener, executor); 585 do { 586 newNode.next = oldHead; 587 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 588 return; 589 } 590 oldHead = listeners; // re-read 591 } while (oldHead != Listener.TOMBSTONE); 592 } 593 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 594 // the listener. 595 executeListener(listener, executor); 596 } 597 598 /** 599 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 600 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 601 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 602 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 603 * Future} may have previously been set asynchronously, in which case its result may not be known 604 * yet. That result, though not yet known, cannot by overridden by a call to a {@code set*} 605 * method, only by a call to {@link #cancel}. 606 * 607 * @param value the value to be used as the result 608 * @return true if the attempt was accepted, completing the {@code Future} 609 */ 610 protected boolean set(@Nullable V value) { 611 Object valueToSet = value == null ? NULL : value; 612 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 613 complete(); 614 return true; 615 } 616 return false; 617 } 618 619 /** 620 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 621 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 622 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 623 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 624 * {@code Future} may have previously been set asynchronously, in which case its result may not be 625 * known yet. That result, though not yet known, cannot by overridden by a call to a {@code set*} 626 * method, only by a call to {@link #cancel}. 627 * 628 * @param throwable the exception to be used as the failed result 629 * @return true if the attempt was accepted, completing the {@code Future} 630 */ 631 protected boolean setException(Throwable throwable) { 632 Object valueToSet = new Failure(checkNotNull(throwable)); 633 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 634 complete(); 635 return true; 636 } 637 return false; 638 } 639 640 /** 641 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 642 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 643 * (including "set asynchronously," defined below). 644 * 645 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 646 * is accepted, then this future is guaranteed to have been completed with the supplied future by 647 * the time this method returns. If the supplied future is not done and the call is accepted, then 648 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 649 * cannot by overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 650 * 651 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 652 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 653 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 654 * Future}. 655 * 656 * @param future the future to delegate to 657 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 658 * cancelled or set. 659 * @since 19.0 660 */ 661 @Beta protected boolean setFuture(ListenableFuture<? extends V> future) { 662 checkNotNull(future); 663 Object localValue = value; 664 if (localValue == null) { 665 if (future.isDone()) { 666 return completeWithFuture(future, null); 667 } 668 SetFuture valueToSet = new SetFuture(future); 669 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 670 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 671 // since all we are doing is unpacking a completed future which should be fast. 672 try { 673 future.addListener(valueToSet, directExecutor()); 674 } catch (Throwable t) { 675 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 676 // must have been caused by addListener itself. The most likely explanation is a 677 // misconfigured mock. Try to switch to Failure. 678 Failure failure; 679 try { 680 failure = new Failure(t); 681 } catch (Throwable oomMostLikely) { 682 failure = Failure.FALLBACK_INSTANCE; 683 } 684 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 685 ATOMIC_HELPER.casValue(this, valueToSet, failure); 686 } 687 return true; 688 } 689 localValue = value; // we lost the cas, fall through and maybe cancel 690 } 691 // The future has already been set to something. If it is cancellation we should cancel the 692 // incoming future. 693 if (localValue instanceof Cancellation) { 694 // we don't care if it fails, this is best-effort. 695 future.cancel(((Cancellation) localValue).wasInterrupted); 696 } 697 return false; 698 } 699 700 /** 701 * Called when a future passed via setFuture has completed. 702 * 703 * @param future the done future to complete this future with. 704 * @param expected the expected value of the {@link #value} field. 705 */ 706 private boolean completeWithFuture(ListenableFuture<? extends V> future, Object expected) { 707 Object valueToSet; 708 if (future instanceof TrustedFuture) { 709 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 710 // override .get() (since it is final) and therefore this is equivalent to calling .get() 711 // and unpacking the exceptions like we do below (just much faster because it is a single 712 // field read instead of a read, several branches and possibly creating exceptions). 713 valueToSet = ((AbstractFuture<?>) future).value; 714 } else { 715 // Otherwise calculate valueToSet by calling .get() 716 try { 717 V v = Uninterruptibles.getUninterruptibly(future); 718 valueToSet = v == null ? NULL : v; 719 } catch (ExecutionException exception) { 720 valueToSet = new Failure(exception.getCause()); 721 } catch (CancellationException cancellation) { 722 valueToSet = new Cancellation(false, cancellation); 723 } catch (Throwable t) { 724 valueToSet = new Failure(t); 725 } 726 } 727 // The only way this can fail is if we raced with another thread calling cancel(). If we lost 728 // that race then there is nothing to do. 729 if (ATOMIC_HELPER.casValue(AbstractFuture.this, expected, valueToSet)) { 730 complete(); 731 return true; 732 } 733 return false; 734 } 735 736 /** Unblocks all threads and runs all listeners. */ 737 private void complete() { 738 for (Waiter currentWaiter = clearWaiters(); 739 currentWaiter != null; 740 currentWaiter = currentWaiter.next) { 741 currentWaiter.unpark(); 742 } 743 // We need to reverse the list to handle buggy listeners that depend on ordering. 744 Listener currentListener = clearListeners(); 745 Listener reversedList = null; 746 while (currentListener != null) { 747 Listener tmp = currentListener; 748 currentListener = currentListener.next; 749 tmp.next = reversedList; 750 reversedList = tmp; 751 } 752 for (; reversedList != null; reversedList = reversedList.next) { 753 executeListener(reversedList.task, reversedList.executor); 754 } 755 // We call this after the listeners on the theory that done() will only be used for 'cleanup' 756 // oriented tasks (e.g. clearing fields) and so can wait behind listeners which may be executing 757 // more important work. A counter argument would be that done() is trusted code and therefore 758 // it would be safe to run before potentially slow or poorly behaved listeners. Reevaluate this 759 // once we have more examples of done() implementations. 760 done(); 761 } 762 763 /** 764 * Callback method that is called immediately after the future is completed. 765 * 766 * <p>This is called exactly once, after all listeners have executed. By default it does nothing. 767 */ 768 void done() {} 769 770 /** 771 * Returns the exception that this {@code Future} completed with. This includes completion through 772 * a call to {@link setException} or {@link setFuture}{@code (failedFuture)} but not cancellation. 773 * 774 * @throws RuntimeException if the {@code Future} has not failed 775 */ 776 final Throwable trustedGetException() { 777 return ((Failure) value).exception; 778 } 779 780 /** 781 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 782 * the given future (if available). 783 * 784 * <p>This method should be used only when this future is completed. It is designed to be called 785 * from {@code done}. 786 */ 787 final void maybePropagateCancellation(@Nullable Future<?> related) { 788 if (related != null & isCancelled()) { 789 related.cancel(wasInterrupted()); 790 } 791 } 792 793 /** Clears the {@link #waiters} list and returns the most recently added value. */ 794 private Waiter clearWaiters() { 795 Waiter head; 796 do { 797 head = waiters; 798 } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); 799 return head; 800 } 801 802 /** Clears the {@link #listeners} list and returns the most recently added value. */ 803 private Listener clearListeners() { 804 Listener head; 805 do { 806 head = listeners; 807 } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); 808 return head; 809 } 810 811 /** 812 * Submits the given runnable to the given {@link Executor} catching and logging all 813 * {@linkplain RuntimeException runtime exceptions} thrown by the executor. 814 */ 815 private static void executeListener(Runnable runnable, Executor executor) { 816 try { 817 executor.execute(runnable); 818 } catch (RuntimeException e) { 819 // Log it and keep going, bad runnable and/or executor. Don't 820 // punish the other runnables if we're given a bad one. We only 821 // catch RuntimeException because we want Errors to propagate up. 822 log.log(Level.SEVERE, "RuntimeException while executing runnable " 823 + runnable + " with executor " + executor, e); 824 } 825 } 826 827 static final CancellationException cancellationExceptionWithCause( 828 @Nullable String message, @Nullable Throwable cause) { 829 CancellationException exception = new CancellationException(message); 830 exception.initCause(cause); 831 return exception; 832 } 833 834 private abstract static class AtomicHelper { 835 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ 836 abstract void putThread(Waiter waiter, Thread newValue); 837 838 /** Non volatile write of the waiter to the {@link Waiter#next} field. */ 839 abstract void putNext(Waiter waiter, Waiter newValue); 840 841 /** Performs a CAS operation on the {@link #waiters} field. */ 842 abstract boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update); 843 844 /** Performs a CAS operation on the {@link #listeners} field. */ 845 abstract boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update); 846 847 /** Performs a CAS operation on the {@link #value} field. */ 848 abstract boolean casValue(AbstractFuture<?> future, Object expect, Object update); 849 } 850 851 /** 852 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 853 * 854 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 855 * be accessed. 856 */ 857 private static final class UnsafeAtomicHelper extends AtomicHelper { 858 static final sun.misc.Unsafe UNSAFE; 859 static final long LISTENERS_OFFSET; 860 static final long WAITERS_OFFSET; 861 static final long VALUE_OFFSET; 862 static final long WAITER_THREAD_OFFSET; 863 static final long WAITER_NEXT_OFFSET; 864 865 static { 866 sun.misc.Unsafe unsafe = null; 867 try { 868 unsafe = sun.misc.Unsafe.getUnsafe(); 869 } catch (SecurityException tryReflectionInstead) { 870 try { 871 unsafe = AccessController.doPrivileged( 872 new PrivilegedExceptionAction<sun.misc.Unsafe>() { 873 @Override public sun.misc.Unsafe run() throws Exception { 874 Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class; 875 for (java.lang.reflect.Field f : k.getDeclaredFields()) { 876 f.setAccessible(true); 877 Object x = f.get(null); 878 if (k.isInstance(x)) { 879 return k.cast(x); 880 } 881 } 882 throw new NoSuchFieldError("the Unsafe"); 883 } 884 }); 885 } catch (PrivilegedActionException e) { 886 throw new RuntimeException("Could not initialize intrinsics", e.getCause()); 887 } 888 } 889 try { 890 Class<?> abstractFuture = AbstractFuture.class; 891 WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters")); 892 LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners")); 893 VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value")); 894 WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread")); 895 WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next")); 896 UNSAFE = unsafe; 897 } catch (Exception e) { 898 throw Throwables.propagate(e); 899 } 900 } 901 902 @Override 903 void putThread(Waiter waiter, Thread newValue) { 904 UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, newValue); 905 } 906 907 @Override 908 void putNext(Waiter waiter, Waiter newValue) { 909 UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, newValue); 910 } 911 912 /** Performs a CAS operation on the {@link #waiters} field. */ 913 @Override 914 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 915 return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, expect, update); 916 } 917 918 /** Performs a CAS operation on the {@link #listeners} field. */ 919 @Override 920 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 921 return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, expect, update); 922 } 923 924 /** Performs a CAS operation on the {@link #value} field. */ 925 @Override 926 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 927 return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expect, update); 928 } 929 } 930 931 /** {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}. */ 932 private static final class SafeAtomicHelper extends AtomicHelper { 933 final AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater; 934 final AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater; 935 final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater; 936 final AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater; 937 final AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater; 938 939 SafeAtomicHelper( 940 AtomicReferenceFieldUpdater<Waiter, Thread> waiterThreadUpdater, 941 AtomicReferenceFieldUpdater<Waiter, Waiter> waiterNextUpdater, 942 AtomicReferenceFieldUpdater<AbstractFuture, Waiter> waitersUpdater, 943 AtomicReferenceFieldUpdater<AbstractFuture, Listener> listenersUpdater, 944 AtomicReferenceFieldUpdater<AbstractFuture, Object> valueUpdater) { 945 this.waiterThreadUpdater = waiterThreadUpdater; 946 this.waiterNextUpdater = waiterNextUpdater; 947 this.waitersUpdater = waitersUpdater; 948 this.listenersUpdater = listenersUpdater; 949 this.valueUpdater = valueUpdater; 950 } 951 952 @Override 953 void putThread(Waiter waiter, Thread newValue) { 954 waiterThreadUpdater.lazySet(waiter, newValue); 955 } 956 957 @Override 958 void putNext(Waiter waiter, Waiter newValue) { 959 waiterNextUpdater.lazySet(waiter, newValue); 960 } 961 962 @Override 963 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 964 return waitersUpdater.compareAndSet(future, expect, update); 965 } 966 967 @Override 968 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 969 return listenersUpdater.compareAndSet(future, expect, update); 970 } 971 972 @Override 973 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 974 return valueUpdater.compareAndSet(future, expect, update); 975 } 976 } 977 978 /** 979 * {@link AtomicHelper} based on {@code synchronized} and volatile writes. 980 * 981 * <p>This is an implementation of last resort for when certain basic VM features are broken 982 * (like AtomicReferenceFieldUpdater). 983 */ 984 private static final class SynchronizedHelper extends AtomicHelper { 985 @Override 986 void putThread(Waiter waiter, Thread newValue) { 987 waiter.thread = newValue; 988 } 989 990 @Override 991 void putNext(Waiter waiter, Waiter newValue) { 992 waiter.next = newValue; 993 } 994 995 @Override 996 boolean casWaiters(AbstractFuture<?> future, Waiter expect, Waiter update) { 997 synchronized (future) { 998 if (future.waiters == expect) { 999 future.waiters = update; 1000 return true; 1001 } 1002 return false; 1003 } 1004 } 1005 1006 @Override 1007 boolean casListeners(AbstractFuture<?> future, Listener expect, Listener update) { 1008 synchronized (future) { 1009 if (future.listeners == expect) { 1010 future.listeners = update; 1011 return true; 1012 } 1013 return false; 1014 } 1015 } 1016 1017 @Override 1018 boolean casValue(AbstractFuture<?> future, Object expect, Object update) { 1019 synchronized (future) { 1020 if (future.value == expect) { 1021 future.value = update; 1022 return true; 1023 } 1024 return false; 1025 } 1026 } 1027 } 1028}