001/* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkNotNull; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater; 022 023import com.google.common.annotations.Beta; 024import com.google.common.annotations.GwtCompatible; 025import com.google.common.base.Throwables; 026 027import java.security.AccessController; 028import java.security.PrivilegedActionException; 029import java.security.PrivilegedExceptionAction; 030import java.util.concurrent.CancellationException; 031import java.util.concurrent.ExecutionException; 032import java.util.concurrent.Executor; 033import java.util.concurrent.Future; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; 037import java.util.concurrent.locks.LockSupport; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040 041import javax.annotation.Nullable; 042 043/** 044 * An abstract implementation of {@link ListenableFuture}, intended for advanced users only. 
More 045 * common ways to create a {@code ListenableFuture} include instantiating a {@link SettableFuture}, 046 * submitting a task to a {@link ListeningExecutorService}, and deriving a {@code Future} from an 047 * existing one, typically using methods like {@link Futures#transform(ListenableFuture, Function) 048 * Futures.transform} and {@link Futures#catching(ListenableFuture, Class, Function) 049 * Futures.catching}. 050 * 051 * <p>This class implements all methods in {@code ListenableFuture}. Subclasses should provide a way 052 * to set the result of the computation through the protected methods {@link #set(Object)}, {@link 053 * #setFuture(ListenableFuture)} and {@link #setException(Throwable)}. Subclasses may also override 054 * {@link #interruptTask()}, which will be invoked automatically if a call to {@link 055 * #cancel(boolean) cancel(true)} succeeds in canceling the future. Subclasses should rarely 056 * override other methods. 057 * 058 * @author Sven Mawson 059 * @author Luke Sandberg 060 * @since 1.0 061 */ 062@GwtCompatible(emulated = true) 063public abstract class AbstractFuture<V> implements ListenableFuture<V> { 064 065 /** 066 * A less abstract subclass of AbstractFuture. This can be used to optimize setFuture by ensuring 067 * that {@link #get} calls exactly the implementation of {@link AbstractFuture#get}. 068 */ 069 abstract static class TrustedFuture<V> extends AbstractFuture<V> { 070 // N.B. cancel is not overridden to be final, because many future utilities need to override 071 // cancel in order to propagate cancellation to other futures. 
072 073 @Override public final V get() throws InterruptedException, ExecutionException { 074 return super.get(); 075 } 076 077 @Override public final V get(long timeout, TimeUnit unit) 078 throws InterruptedException, ExecutionException, TimeoutException { 079 return super.get(timeout, unit); 080 } 081 082 @Override public final boolean isDone() { 083 return super.isDone(); 084 } 085 086 @Override public final boolean isCancelled() { 087 return super.isCancelled(); 088 } 089 090 @Override public final void addListener(Runnable listener, Executor executor) { 091 super.addListener(listener, executor); 092 } 093 } 094 095 // Logger to log exceptions caught when running listeners. 096 private static final Logger log = Logger.getLogger(AbstractFuture.class.getName()); 097 098 // A heuristic for timed gets. If the remaining timeout is less than this, spin instead of 099 // blocking. This value is what AbstractQueuedSynchronizer uses. 100 private static final long SPIN_THRESHOLD_NANOS = 1000L; 101 102 private static final AtomicHelper ATOMIC_HELPER; 103 private static final AtomicReferenceFieldUpdater<Waiter, Thread> WAITER_THREAD_UPDATER; 104 private static final AtomicReferenceFieldUpdater<Waiter, Waiter> WAITER_NEXT_UPDATER; 105 private static final AtomicReferenceFieldUpdater<AbstractFuture, Waiter> WAITERS_UPDATER; 106 private static final AtomicReferenceFieldUpdater<AbstractFuture, Listener> LISTENERS_UPDATER; 107 private static final AtomicReferenceFieldUpdater<AbstractFuture, Object> VALUE_UPDATER; 108 109 static { 110 AtomicHelper helper = null; 111 try { 112 helper = UnsafeAtomicHelperFactory.values()[0].tryCreateUnsafeAtomicHelper(); 113 } catch (Throwable e) { 114 // catch absolutely everything and fall through 115 } 116 if (helper == null) { 117 // The access control checks that ARFU does means the caller class has to be AbstractFuture 118 // instead of SafeAtomicHelper, so we annoyingly define these here 119 WAITER_THREAD_UPDATER = newUpdater(Waiter.class, 
Thread.class, "thread"); 120 WAITER_NEXT_UPDATER = newUpdater(Waiter.class, Waiter.class, "next"); 121 WAITERS_UPDATER = newUpdater(AbstractFuture.class, Waiter.class, "waiters"); 122 LISTENERS_UPDATER = newUpdater(AbstractFuture.class, Listener.class, "listeners"); 123 VALUE_UPDATER = newUpdater(AbstractFuture.class, Object.class, "value"); 124 helper = new SafeAtomicHelper(); 125 } else { 126 WAITER_THREAD_UPDATER = null; 127 WAITER_NEXT_UPDATER = null; 128 WAITERS_UPDATER = null; 129 LISTENERS_UPDATER = null; 130 VALUE_UPDATER = null; 131 } 132 ATOMIC_HELPER = helper; 133 134 // Prevent rare disastrous classloading in first call to LockSupport.park. 135 // See: https://bugs.openjdk.java.net/browse/JDK-8074773 136 Class<?> ensureLoaded = LockSupport.class; 137 } 138 139 /** 140 * Waiter links form a Treiber stack, in the {@link #waiters} field. 141 */ 142 private static final class Waiter { 143 static final Waiter TOMBSTONE = new Waiter(false /* ignored param */); 144 145 @Nullable volatile Thread thread; 146 @Nullable volatile Waiter next; 147 148 // Constructor for the TOMBSTONE, avoids use of ATOMIC_HELPER in case this class is loaded 149 // before the ATOMIC_HELPER. Apparently this is possible on some android platforms. 150 Waiter(boolean unused) {} 151 152 Waiter() { 153 // avoid volatile write, write is made visible by subsequent CAS on waiters field 154 ATOMIC_HELPER.putThread(this, Thread.currentThread()); 155 } 156 157 // non-volatile write to the next field. Should be made visible by subsequent CAS on waiters 158 // field. 159 void setNext(Waiter next) { 160 ATOMIC_HELPER.putNext(this, next); 161 } 162 163 void unpark() { 164 // This is racy with removeWaiter. The consequence of the race is that we may spuriously 165 // call unpark even though the thread has already removed itself from the list. But even if 166 // we did use a CAS, that race would still exist (it would just be ever so slightly smaller). 
167 Thread w = thread; 168 if (w != null) { 169 thread = null; 170 LockSupport.unpark(w); 171 } 172 } 173 } 174 175 /** 176 * Marks the given node as 'deleted' (null waiter) and then scans the list to unlink all deleted 177 * nodes. This is an O(n) operation in the common case (and O(n^2) in the worst), but we are 178 * saved by two things. 179 * <ul> 180 * <li>This is only called when a waiting thread times out or is interrupted. Both of which 181 * should be rare. 182 * <li>The waiters list should be very short. 183 * </ul> 184 */ 185 private void removeWaiter(Waiter node) { 186 node.thread = null; // mark as 'deleted' 187 restart: while (true) { 188 Waiter pred = null; 189 Waiter curr = waiters; 190 if (curr == Waiter.TOMBSTONE) { 191 return; // give up if someone is calling complete 192 } 193 Waiter succ; 194 while (curr != null) { 195 succ = curr.next; 196 if (curr.thread != null) { // we aren't unlinking this node, update pred. 197 pred = curr; 198 } else if (pred != null) { // We are unlinking this node and it has a predecessor. 199 pred.next = succ; 200 if (pred.thread == null) { // We raced with another node that unlinked pred. Restart. 201 continue restart; 202 } 203 } else if (!ATOMIC_HELPER.casWaiters(this, curr, succ)) { // We are unlinking head 204 continue restart; // We raced with an add or complete 205 } 206 curr = succ; 207 } 208 break; 209 } 210 } 211 212 /** Listeners also form a stack through the {@link #listeners} field. */ 213 private static final class Listener { 214 static final Listener TOMBSTONE = new Listener(null, null); 215 final Runnable task; 216 final Executor executor; 217 218 // writes to next are made visible by subsequent CAS's on the listeners field 219 @Nullable Listener next; 220 221 Listener(Runnable task, Executor executor) { 222 this.task = task; 223 this.executor = executor; 224 } 225 } 226 227 /** A special value to represent {@code null}. 
*/ 228 private static final Object NULL = new Object(); 229 230 /** A special value to represent failure, when {@link #setException} is called successfully. */ 231 private static final class Failure { 232 static final Failure FALLBACK_INSTANCE = new Failure( 233 new Throwable("Failure occurred while trying to finish a future.") { 234 @Override public synchronized Throwable fillInStackTrace() { 235 return this; // no stack trace 236 } 237 }); 238 final Throwable exception; 239 240 Failure(Throwable exception) { 241 this.exception = checkNotNull(exception); 242 } 243 } 244 245 /** A special value to represent cancellation and the 'wasInterrupted' bit. */ 246 private static final class Cancellation { 247 final boolean wasInterrupted; 248 final Throwable cause; 249 250 Cancellation(boolean wasInterrupted, Throwable cause) { 251 this.wasInterrupted = wasInterrupted; 252 this.cause = checkNotNull(cause); 253 } 254 } 255 256 /** A special value that encodes the 'setFuture' state. */ 257 private final class SetFuture implements Runnable { 258 final ListenableFuture<? extends V> future; 259 260 SetFuture(ListenableFuture<? extends V> future) { 261 this.future = future; 262 } 263 264 @Override public void run() { 265 if (value != this) { 266 // nothing to do, we must have been cancelled 267 return; 268 } 269 completeWithFuture(future, this); 270 } 271 } 272 273 // TODO(lukes): investigate using the @Contended annotation on these fields when jdk8 is 274 // available. 275 /** 276 * This field encodes the current state of the future. 277 * 278 * <p>The valid values are: 279 * <ul> 280 * <li>{@code null} initial state, nothing has happened. 281 * <li>{@link Cancellation} terminal state, {@code cancel} was called. 282 * <li>{@link Failure} terminal state, {@code setException} was called. 283 * <li>{@link SetFuture} intermediate state, {@code setFuture} was called. 284 * <li>{@link #NULL} terminal state, {@code set(null)} was called. 
285 * <li>Any other non-null value, terminal state, {@code set} was called with a non-null 286 * argument. 287 * </ul> 288 */ 289 private volatile Object value; 290 291 /** All listeners. */ 292 private volatile Listener listeners; 293 294 /** All waiting threads. */ 295 private volatile Waiter waiters; 296 297 /** 298 * Constructor for use by subclasses. 299 */ 300 protected AbstractFuture() {} 301 302 /* 303 * Improve the documentation of when InterruptedException is thrown. Our 304 * behavior matches the JDK's, but the JDK's documentation is misleading. 305 */ 306 307 // Gets and Timed Gets 308 // 309 // * Be responsive to interruption 310 // * Don't create Waiter nodes if you aren't going to park, this helps reduce contention on the 311 // waiters field. 312 // * Future completion is defined by when #value becomes non-null/non SetFuture 313 // * Future completion can be observed if the waiters field contains a TOMBSTONE 314 315 // Timed Get 316 // There are a few design constraints to consider 317 // * We want to be responsive to small timeouts, unpark() has non trivial latency overheads (I 318 // have observed 12 micros on 64 bit linux systems to wake up a parked thread). So if the 319 // timeout is small we shouldn't park(). This needs to be traded off with the cpu overhead of 320 // spinning, so we use SPIN_THRESHOLD_NANOS which is what AbstractQueuedSynchronizer uses for 321 // similar purposes. 322 // * We want to behave reasonably for timeouts of 0 323 // * We are more responsive to completion than timeouts. This is because parkNanos depends on 324 // system scheduling and as such we could either miss our deadline, or unpark() could be delayed 325 // so that it looks like we timed out even though we didn't. For comparison FutureTask respects 326 // completion preferably and AQS is non-deterministic (depends on where in the queue the waiter 327 // is). 
If we wanted to be strict about it, we could store the unpark() time in the Waiter 328 // node and we could use that to make a decision about whether or not we timed out prior to 329 // being unparked. 330 331 /** 332 * {@inheritDoc} 333 * 334 * <p>The default {@link AbstractFuture} implementation throws {@code 335 * InterruptedException} if the current thread is interrupted before or during 336 * the call, even if the value is already available. 337 * 338 * @throws InterruptedException if the current thread was interrupted before 339 * or during the call (optional but recommended). 340 * @throws CancellationException {@inheritDoc} 341 */ 342 @Override 343 public V get(long timeout, TimeUnit unit) 344 throws InterruptedException, TimeoutException, ExecutionException { 345 // NOTE: if timeout < 0, remainingNanos will be < 0 and we will fall into the while(true) loop 346 // at the bottom and throw a timeoutexception. 347 long remainingNanos = unit.toNanos(timeout); // we rely on the implicit null check on unit. 348 if (Thread.interrupted()) { 349 throw new InterruptedException(); 350 } 351 Object localValue = value; 352 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 353 return getDoneValue(localValue); 354 } 355 // we delay calling nanoTime until we know we will need to either park or spin 356 final long endNanos = remainingNanos > 0 ? System.nanoTime() + remainingNanos : 0; 357 long_wait_loop: if (remainingNanos >= SPIN_THRESHOLD_NANOS) { 358 Waiter oldHead = waiters; 359 if (oldHead != Waiter.TOMBSTONE) { 360 Waiter node = new Waiter(); 361 do { 362 node.setNext(oldHead); 363 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 364 while (true) { 365 LockSupport.parkNanos(this, remainingNanos); 366 // Check interruption first, if we woke up due to interruption we need to honor that. 367 if (Thread.interrupted()) { 368 removeWaiter(node); 369 throw new InterruptedException(); 370 } 371 372 // Otherwise re-read and check doneness. 
If we loop then it must have been a spurious 373 // wakeup 374 localValue = value; 375 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 376 return getDoneValue(localValue); 377 } 378 379 // timed out? 380 remainingNanos = endNanos - System.nanoTime(); 381 if (remainingNanos < SPIN_THRESHOLD_NANOS) { 382 // Remove the waiter, one way or another we are done parking this thread. 383 removeWaiter(node); 384 break long_wait_loop; // jump down to the busy wait loop 385 } 386 } 387 } 388 oldHead = waiters; // re-read and loop. 389 } while (oldHead != Waiter.TOMBSTONE); 390 } 391 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 392 // waiter. 393 return getDoneValue(value); 394 } 395 // If we get here then we have remainingNanos < SPIN_THRESHOLD_NANOS and there is no node on the 396 // waiters list 397 while (remainingNanos > 0) { 398 localValue = value; 399 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 400 return getDoneValue(localValue); 401 } 402 if (Thread.interrupted()) { 403 throw new InterruptedException(); 404 } 405 remainingNanos = endNanos - System.nanoTime(); 406 } 407 throw new TimeoutException(); 408 } 409 410 /* 411 * Improve the documentation of when InterruptedException is thrown. Our 412 * behavior matches the JDK's, but the JDK's documentation is misleading. 413 */ 414 /** 415 * {@inheritDoc} 416 * 417 * <p>The default {@link AbstractFuture} implementation throws {@code 418 * InterruptedException} if the current thread is interrupted before or during 419 * the call, even if the value is already available. 420 * 421 * @throws InterruptedException if the current thread was interrupted before 422 * or during the call (optional but recommended). 
423 * @throws CancellationException {@inheritDoc} 424 */ 425 @Override public V get() throws InterruptedException, ExecutionException { 426 if (Thread.interrupted()) { 427 throw new InterruptedException(); 428 } 429 Object localValue = value; 430 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 431 return getDoneValue(localValue); 432 } 433 Waiter oldHead = waiters; 434 if (oldHead != Waiter.TOMBSTONE) { 435 Waiter node = new Waiter(); 436 do { 437 node.setNext(oldHead); 438 if (ATOMIC_HELPER.casWaiters(this, oldHead, node)) { 439 // we are on the stack, now wait for completion. 440 while (true) { 441 LockSupport.park(this); 442 // Check interruption first, if we woke up due to interruption we need to honor that. 443 if (Thread.interrupted()) { 444 removeWaiter(node); 445 throw new InterruptedException(); 446 } 447 // Otherwise re-read and check doneness. If we loop then it must have been a spurious 448 // wakeup 449 localValue = value; 450 if (localValue != null & !(localValue instanceof AbstractFuture.SetFuture)) { 451 return getDoneValue(localValue); 452 } 453 } 454 } 455 oldHead = waiters; // re-read and loop. 456 } while (oldHead != Waiter.TOMBSTONE); 457 } 458 // re-read value, if we get here then we must have observed a TOMBSTONE while trying to add a 459 // waiter. 460 return getDoneValue(value); 461 } 462 463 /** 464 * Unboxes {@code obj}. Assumes that obj is not {@code null} or a {@link SetFuture}. 
465 */ 466 private V getDoneValue(Object obj) throws ExecutionException { 467 // While this seems like it might be too branch-y, simple benchmarking proves it to be 468 // unmeasurable (comparing done AbstractFutures with immediateFuture) 469 if (obj instanceof Cancellation) { 470 throw cancellationExceptionWithCause("Task was cancelled.", ((Cancellation) obj).cause); 471 } else if (obj instanceof Failure) { 472 throw new ExecutionException(((Failure) obj).exception); 473 } else if (obj == NULL) { 474 return null; 475 } else { 476 @SuppressWarnings("unchecked") // this is the only other option 477 V asV = (V) obj; 478 return asV; 479 } 480 } 481 482 @Override 483 public boolean isDone() { 484 final Object localValue = value; 485 return localValue != null & !(localValue instanceof AbstractFuture.SetFuture); 486 } 487 488 @Override 489 public boolean isCancelled() { 490 final Object localValue = value; 491 return localValue instanceof Cancellation; 492 } 493 494 /** 495 * {@inheritDoc} 496 * 497 * <p>If a cancellation attempt succeeds on a {@code Future} that had previously been {@linkplain 498 * #setFuture set asynchronously}, then the cancellation will also be propagated to the delegate 499 * {@code Future} that was supplied in the {@code setFuture} call. 500 */ 501 @Override 502 public boolean cancel(boolean mayInterruptIfRunning) { 503 Object localValue = value; 504 if (localValue == null | localValue instanceof AbstractFuture.SetFuture) { 505 // Try to delay allocating the exception. At this point we may still lose the CAS, but it is 506 // certainly less likely. 507 // TODO(lukes): this exception actually makes cancellation significantly more expensive :( 508 // I wonder if we should consider removing it or providing a mechanism to not do it. 
509 Object valueToSet = new Cancellation(mayInterruptIfRunning, newCancellationCause()); 510 do { 511 if (ATOMIC_HELPER.casValue(this, localValue, valueToSet)) { 512 // We call interuptTask before calling complete(), first which is consistent with 513 // FutureTask 514 if (mayInterruptIfRunning) { 515 interruptTask(); 516 } 517 complete(); 518 if (localValue instanceof AbstractFuture.SetFuture) { 519 // propagate cancellation to the future set in setfuture, this is racy, and we don't 520 // care if we are successful or not. 521 ((AbstractFuture<?>.SetFuture) localValue).future.cancel(mayInterruptIfRunning); 522 } 523 return true; 524 } 525 // obj changed, reread 526 localValue = value; 527 // obj cannot be null at this point, because value can only change from null to non-null. So 528 // if value changed (and it did since we lost the CAS), then it cannot be null. 529 } while (localValue instanceof AbstractFuture.SetFuture); 530 } 531 return false; 532 } 533 534 /** 535 * Returns an exception to be used as the cause of the CancellationException thrown by 536 * {@link #get}. 537 * 538 * <p>Note: this method may be called speculatively. There is no guarantee that the future will 539 * be cancelled if this method is called. 540 * 541 * @since 19.0 542 */ 543 @Beta 544 protected Throwable newCancellationCause() { 545 return new CancellationException("Future.cancel() was called."); 546 } 547 548 /** 549 * Subclasses can override this method to implement interruption of the 550 * future's computation. The method is invoked automatically by a successful 551 * call to {@link #cancel(boolean) cancel(true)}. 552 * 553 * <p>The default implementation does nothing. 554 * 555 * @since 10.0 556 */ 557 protected void interruptTask() { 558 } 559 560 /** 561 * Returns true if this future was cancelled with {@code 562 * mayInterruptIfRunning} set to {@code true}. 
563 * 564 * @since 14.0 565 */ 566 protected final boolean wasInterrupted() { 567 final Object localValue = value; 568 return (localValue instanceof Cancellation) && ((Cancellation) localValue).wasInterrupted; 569 } 570 571 /** 572 * {@inheritDoc} 573 * 574 * @since 10.0 575 */ 576 @Override 577 public void addListener(Runnable listener, Executor executor) { 578 checkNotNull(listener, "Runnable was null."); 579 checkNotNull(executor, "Executor was null."); 580 Listener oldHead = listeners; 581 if (oldHead != Listener.TOMBSTONE) { 582 Listener newNode = new Listener(listener, executor); 583 do { 584 newNode.next = oldHead; 585 if (ATOMIC_HELPER.casListeners(this, oldHead, newNode)) { 586 return; 587 } 588 oldHead = listeners; // re-read 589 } while (oldHead != Listener.TOMBSTONE); 590 } 591 // If we get here then the Listener TOMBSTONE was set, which means the future is done, call 592 // the listener. 593 executeListener(listener, executor); 594 } 595 596 /** 597 * Sets the result of this {@code Future} unless this {@code Future} has already been cancelled or 598 * set (including {@linkplain #setFuture set asynchronously}). When a call to this method returns, 599 * the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> the call was 600 * accepted (in which case it returns {@code true}). If it returns {@code false}, the {@code 601 * Future} may have previously been set asynchronously, in which case its result may not be known 602 * yet. That result, though not yet known, cannot by overridden by a call to a {@code set*} 603 * method, only by a call to {@link #cancel}. 604 * 605 * @param value the value to be used as the result 606 * @return true if the attempt was accepted, completing the {@code Future} 607 */ 608 protected boolean set(@Nullable V value) { 609 Object valueToSet = value == null ? 
NULL : value; 610 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 611 complete(); 612 return true; 613 } 614 return false; 615 } 616 617 /** 618 * Sets the failed result of this {@code Future} unless this {@code Future} has already been 619 * cancelled or set (including {@linkplain #setFuture set asynchronously}). When a call to this 620 * method returns, the {@code Future} is guaranteed to be {@linkplain #isDone done} <b>only if</b> 621 * the call was accepted (in which case it returns {@code true}). If it returns {@code false}, the 622 * {@code Future} may have previously been set asynchronously, in which case its result may not be 623 * known yet. That result, though not yet known, cannot by overridden by a call to a {@code set*} 624 * method, only by a call to {@link #cancel}. 625 * 626 * @param throwable the exception to be used as the failed result 627 * @return true if the attempt was accepted, completing the {@code Future} 628 */ 629 protected boolean setException(Throwable throwable) { 630 Object valueToSet = new Failure(checkNotNull(throwable)); 631 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 632 complete(); 633 return true; 634 } 635 return false; 636 } 637 638 /** 639 * Sets the result of this {@code Future} to match the supplied input {@code Future} once the 640 * supplied {@code Future} is done, unless this {@code Future} has already been cancelled or set 641 * (including "set asynchronously," defined below). 642 * 643 * <p>If the supplied future is {@linkplain #isDone done} when this method is called and the call 644 * is accepted, then this future is guaranteed to have been completed with the supplied future by 645 * the time this method returns. If the supplied future is not done and the call is accepted, then 646 * the future will be <i>set asynchronously</i>. Note that such a result, though not yet known, 647 * cannot by overridden by a call to a {@code set*} method, only by a call to {@link #cancel}. 
648 * 649 * <p>If the call {@code setFuture(delegate)} is accepted and this {@code Future} is later 650 * cancelled, cancellation will be propagated to {@code delegate}. Additionally, any call to 651 * {@code setFuture} after any cancellation will propagate cancellation to the supplied {@code 652 * Future}. 653 * 654 * @param future the future to delegate to 655 * @return true if the attempt was accepted, indicating that the {@code Future} was not previously 656 * cancelled or set. 657 * @since 19.0 658 */ 659 @Beta protected boolean setFuture(ListenableFuture<? extends V> future) { 660 checkNotNull(future); 661 Object localValue = value; 662 if (localValue == null) { 663 if (future.isDone()) { 664 return completeWithFuture(future, null); 665 } 666 SetFuture valueToSet = new SetFuture(future); 667 if (ATOMIC_HELPER.casValue(this, null, valueToSet)) { 668 // the listener is responsible for calling completeWithFuture, directExecutor is appropriate 669 // since all we are doing is unpacking a completed future which should be fast. 670 try { 671 future.addListener(valueToSet, directExecutor()); 672 } catch (Throwable t) { 673 // addListener has thrown an exception! SetFuture.run can't throw any exceptions so this 674 // must have been caused by addListener itself. The most likely explanation is a 675 // misconfigured mock. Try to switch to Failure. 676 Failure failure; 677 try { 678 failure = new Failure(t); 679 } catch (Throwable oomMostLikely) { 680 failure = Failure.FALLBACK_INSTANCE; 681 } 682 // Note: The only way this CAS could fail is if cancel() has raced with us. That is ok. 683 ATOMIC_HELPER.casValue(this, valueToSet, failure); 684 } 685 return true; 686 } 687 localValue = value; // we lost the cas, fall through and maybe cancel 688 } 689 // The future has already been set to something. If it is cancellation we should cancel the 690 // incoming future. 691 if (localValue instanceof Cancellation) { 692 // we don't care if it fails, this is best-effort. 
693 future.cancel(((Cancellation) localValue).wasInterrupted); 694 } 695 return false; 696 } 697 698 /** 699 * Called when a future passed via setFuture has completed. 700 * 701 * @param future the done future to complete this future with. 702 * @param expected the expected value of the {@link #value} field. 703 */ 704 private boolean completeWithFuture(ListenableFuture<? extends V> future, Object expected) { 705 Object valueToSet; 706 if (future instanceof TrustedFuture) { 707 // Break encapsulation for TrustedFuture instances since we know that subclasses cannot 708 // override .get() (since it is final) and therefore this is equivalent to calling .get() 709 // and unpacking the exceptions like we do below (just much faster because it is a single 710 // field read instead of a read, several branches and possibly creating exceptions). 711 valueToSet = ((AbstractFuture<?>) future).value; 712 } else { 713 // Otherwise calculate valueToSet by calling .get() 714 try { 715 V v = Uninterruptibles.getUninterruptibly(future); 716 valueToSet = v == null ? NULL : v; 717 } catch (ExecutionException exception) { 718 valueToSet = new Failure(exception.getCause()); 719 } catch (CancellationException cancellation) { 720 valueToSet = new Cancellation(false, cancellation); 721 } catch (Throwable t) { 722 valueToSet = new Failure(t); 723 } 724 } 725 // The only way this can fail is if we raced with another thread calling cancel(). If we lost 726 // that race then there is nothing to do. 727 if (ATOMIC_HELPER.casValue(AbstractFuture.this, expected, valueToSet)) { 728 complete(); 729 return true; 730 } 731 return false; 732 } 733 734 /** Unblocks all threads and runs all listeners. */ 735 private void complete() { 736 for (Waiter currentWaiter = clearWaiters(); 737 currentWaiter != null; 738 currentWaiter = currentWaiter.next) { 739 currentWaiter.unpark(); 740 } 741 // We need to reverse the list to handle buggy listeners that depend on ordering. 
742 Listener currentListener = clearListeners(); 743 Listener reversedList = null; 744 while (currentListener != null) { 745 Listener tmp = currentListener; 746 currentListener = currentListener.next; 747 tmp.next = reversedList; 748 reversedList = tmp; 749 } 750 for (; reversedList != null; reversedList = reversedList.next) { 751 executeListener(reversedList.task, reversedList.executor); 752 } 753 // We call this after the listeners on the theory that done() will only be used for 'cleanup' 754 // oriented tasks (e.g. clearing fields) and so can wait behind listeners which may be executing 755 // more important work. A counter argument would be that done() is trusted code and therefore 756 // it would be safe to run before potentially slow or poorly behaved listeners. Reevaluate this 757 // once we have more examples of done() implementations. 758 done(); 759 } 760 761 /** 762 * Callback method that is called immediately after the future is completed. 763 * 764 * <p>This is called exactly once, after all listeners have executed. By default it does nothing. 765 */ 766 void done() {} 767 768 /** 769 * Returns the exception that this {@code Future} completed with. This includes completion through 770 * a call to {@link setException} or {@link setFuture}{@code (failedFuture)} but not cancellation. 771 * 772 * @throws RuntimeException if the {@code Future} has not failed 773 */ 774 final Throwable trustedGetException() { 775 return ((Failure) value).exception; 776 } 777 778 /** 779 * If this future has been cancelled (and possibly interrupted), cancels (and possibly interrupts) 780 * the given future (if available). 781 * 782 * <p>This method should be used only when this future is completed. It is designed to be called 783 * from {@code done}. 
784 */ 785 final void maybePropagateCancellation(@Nullable Future<?> related) { 786 if (related != null & isCancelled()) { 787 related.cancel(wasInterrupted()); 788 } 789 } 790 791 /** Clears the {@link #waiters} list and returns the most recently added value. */ 792 private Waiter clearWaiters() { 793 Waiter head; 794 do { 795 head = waiters; 796 } while (!ATOMIC_HELPER.casWaiters(this, head, Waiter.TOMBSTONE)); 797 return head; 798 } 799 800 /** Clears the {@link #listeners} list and returns the most recently added value. */ 801 private Listener clearListeners() { 802 Listener head; 803 do { 804 head = listeners; 805 } while (!ATOMIC_HELPER.casListeners(this, head, Listener.TOMBSTONE)); 806 return head; 807 } 808 809 /** 810 * Submits the given runnable to the given {@link Executor} catching and logging all 811 * {@linkplain RuntimeException runtime exceptions} thrown by the executor. 812 */ 813 private static void executeListener(Runnable runnable, Executor executor) { 814 try { 815 executor.execute(runnable); 816 } catch (RuntimeException e) { 817 // Log it and keep going, bad runnable and/or executor. Don't 818 // punish the other runnables if we're given a bad one. We only 819 // catch RuntimeException because we want Errors to propagate up. 820 log.log(Level.SEVERE, "RuntimeException while executing runnable " 821 + runnable + " with executor " + executor, e); 822 } 823 } 824 825 static final CancellationException cancellationExceptionWithCause( 826 @Nullable String message, @Nullable Throwable cause) { 827 CancellationException exception = new CancellationException(message); 828 exception.initCause(cause); 829 return exception; 830 } 831 832 private abstract static class AtomicHelper { 833 /** Non volatile write of the thread to the {@link Waiter#thread} field. */ 834 abstract void putThread(Waiter waiter, Thread thread); 835 836 /** Non volatile write of the waiter to the {@link Waiter#next} field. 
*/ 837 abstract void putNext(Waiter waiter, Waiter next); 838 839 /** Performs a CAS operation on the {@link #waiters} field. */ 840 abstract boolean casWaiters(AbstractFuture future, Waiter curr, Waiter next); 841 842 /** Performs a CAS operation on the {@link #listeners} field. */ 843 abstract boolean casListeners(AbstractFuture future, Listener curr, Listener next); 844 845 /** Performs a CAS operation on the {@link #value} field. */ 846 abstract boolean casValue(AbstractFuture future, Object expected, Object v); 847 } 848 849 /** 850 * Temporary hack to hide the reference to {@link UnsafeAtomicHelper} from Android. The caller of 851 * this code will execute {@link #tryCreateUnsafeAtomicHelper} on the <b>first</b> enum value 852 * present. On the server, this will try to create {@link UnsafeAtomicHelper}. On Android, it will 853 * just return {@code null}. 854 */ 855 private enum UnsafeAtomicHelperFactory { 856 @SuppressUnderAndroid // temporarily while we make Proguard tolerate Unsafe 857 REALLY_TRY_TO_CREATE { 858 @Override 859 AtomicHelper tryCreateUnsafeAtomicHelper() { 860 return new UnsafeAtomicHelper(); 861 } 862 }, 863 864 DONT_EVEN_TRY_TO_CREATE { 865 @Override 866 AtomicHelper tryCreateUnsafeAtomicHelper() { 867 return null; 868 } 869 }, 870 871 ; 872 873 abstract AtomicHelper tryCreateUnsafeAtomicHelper(); 874 } 875 876 /** 877 * {@link AtomicHelper} based on {@link sun.misc.Unsafe}. 878 * 879 * <p>Static initialization of this class will fail if the {@link sun.misc.Unsafe} object cannot 880 * be accessed. 
*/
  @SuppressUnderAndroid // temporarily while we make Proguard tolerate Unsafe
  private static final class UnsafeAtomicHelper extends AtomicHelper {
    // The Unsafe instance and the field offsets used by the methods below. All of them are
    // assigned exactly once, by the static initializer.
    static final sun.misc.Unsafe UNSAFE;
    static final long LISTENERS_OFFSET;
    static final long WAITERS_OFFSET;
    static final long VALUE_OFFSET;
    static final long WAITER_THREAD_OFFSET;
    static final long WAITER_NEXT_OFFSET;

    static {
      sun.misc.Unsafe unsafe = null;
      try {
        // First choice: the regular accessor.
        unsafe = sun.misc.Unsafe.getUnsafe();
      } catch (SecurityException tryReflectionInstead) {
        // The accessor refused this caller. Fall back to scanning the declared fields of Unsafe,
        // inside a privileged action, for one that holds an Unsafe instance.
        try {
          unsafe = AccessController.doPrivileged(
              new PrivilegedExceptionAction<sun.misc.Unsafe>() {
                @Override public sun.misc.Unsafe run() throws Exception {
                  Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
                  for (java.lang.reflect.Field f : k.getDeclaredFields()) {
                    f.setAccessible(true);
                    // Read each field with a null receiver, i.e. as a static field.
                    Object x = f.get(null);
                    if (k.isInstance(x)) {
                      return k.cast(x);
                    }
                  }
                  // No declared field held an Unsafe instance.
                  throw new NoSuchFieldError("the Unsafe");
                }
              });
        } catch (PrivilegedActionException e) {
          // Surface the exception thrown inside run() as the cause.
          throw new RuntimeException("Could not initialize intrinsics", e.getCause());
        }
      }
      try {
        // Offsets are looked up reflectively by field name, so the string literals below have to
        // match the names of the fields declared on AbstractFuture and Waiter.
        Class<?> abstractFuture = AbstractFuture.class;
        WAITERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("waiters"));
        LISTENERS_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("listeners"));
        VALUE_OFFSET = unsafe.objectFieldOffset(abstractFuture.getDeclaredField("value"));
        WAITER_THREAD_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("thread"));
        WAITER_NEXT_OFFSET = unsafe.objectFieldOffset(Waiter.class.getDeclaredField("next"));
        UNSAFE = unsafe;
      } catch (Exception e) {
        // Any failure here aborts static initialization of this class.
        throw Throwables.propagate(e);
      }
    }

    /** Writes {@code thread} into the waiter's {@code thread} field via {@code putObject}. */
    @Override
    void putThread(Waiter waiter, Thread thread) {
      UNSAFE.putObject(waiter, WAITER_THREAD_OFFSET, thread);
    }

    /** Writes {@code next} into the waiter's {@code next} field via {@code putObject}. */
    @Override
    void putNext(Waiter waiter, Waiter next) {
      UNSAFE.putObject(waiter, WAITER_NEXT_OFFSET, next);
    }

    /** Performs a CAS operation on the {@link #waiters} field. */
    @Override
    boolean casWaiters(AbstractFuture future, Waiter curr, Waiter next) {
      return UNSAFE.compareAndSwapObject(future, WAITERS_OFFSET, curr, next);
    }

    /** Performs a CAS operation on the {@link #listeners} field. */
    @Override
    boolean casListeners(AbstractFuture future, Listener curr, Listener next) {
      return UNSAFE.compareAndSwapObject(future, LISTENERS_OFFSET, curr, next);
    }

    /** Performs a CAS operation on the {@link #value} field. */
    @Override
    boolean casValue(AbstractFuture future, Object expected, Object v) {
      return UNSAFE.compareAndSwapObject(future, VALUE_OFFSET, expected, v);
    }
  }

  /**
   * {@link AtomicHelper} based on {@link AtomicReferenceFieldUpdater}.
   *
   * <p>Every operation delegates to one of the {@code *_UPDATER} constants, which are declared
   * outside this class.
   */
  private static final class SafeAtomicHelper extends AtomicHelper {
    /** Stores {@code thread} into the waiter's {@code thread} field using {@code lazySet}. */
    @Override void putThread(Waiter waiter, Thread thread) {
      WAITER_THREAD_UPDATER.lazySet(waiter, thread);
    }

    /** Stores {@code next} into the waiter's {@code next} field using {@code lazySet}. */
    @Override void putNext(Waiter waiter, Waiter next) {
      WAITER_NEXT_UPDATER.lazySet(waiter, next);
    }

    /** Performs a CAS operation on the {@link #waiters} field. */
    @Override boolean casWaiters(AbstractFuture future, Waiter curr, Waiter next) {
      return WAITERS_UPDATER.compareAndSet(future, curr, next);
    }

    /** Performs a CAS operation on the {@link #listeners} field. */
    @Override boolean casListeners(AbstractFuture future, Listener curr, Listener next) {
      return LISTENERS_UPDATER.compareAndSet(future, curr, next);
    }

    /** Performs a CAS operation on the {@link #value} field. */
    @Override boolean casValue(AbstractFuture future, Object expected, Object v) {
      return VALUE_UPDATER.compareAndSet(future, expected, v);
    }
  }
}