001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 021import static com.google.common.util.concurrent.Service.State.FAILED; 022import static com.google.common.util.concurrent.Service.State.NEW; 023import static com.google.common.util.concurrent.Service.State.RUNNING; 024import static com.google.common.util.concurrent.Service.State.STARTING; 025import static com.google.common.util.concurrent.Service.State.STOPPING; 026import static com.google.common.util.concurrent.Service.State.TERMINATED; 027import static java.util.Objects.requireNonNull; 028 029import com.google.common.annotations.GwtIncompatible; 030import com.google.common.annotations.J2ktIncompatible; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; // javadoc needs this 033import com.google.errorprone.annotations.CanIgnoreReturnValue; 034import com.google.errorprone.annotations.ForOverride; 035import com.google.errorprone.annotations.concurrent.GuardedBy; 036import com.google.j2objc.annotations.WeakOuter; 037import java.time.Duration; 038import java.util.concurrent.Executor; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.TimeoutException; 041import javax.annotation.CheckForNull; 042 043/** 044 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 045 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 046 * callbacks. Its subclasses must manage threads manually; consider {@link 047 * AbstractExecutionThreadService} if you need only a single execution thread. 048 * 049 * @author Jesse Wilson 050 * @author Luke Sandberg 051 * @since 1.0 052 */ 053@GwtIncompatible 054@J2ktIncompatible 055@ElementTypesAreNonnullByDefault 056public abstract class AbstractService implements Service { 057 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 058 new ListenerCallQueue.Event<Listener>() { 059 @Override 060 public void call(Listener listener) { 061 listener.starting(); 062 } 063 064 @Override 065 public String toString() { 066 return "starting()"; 067 } 068 }; 069 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 070 new ListenerCallQueue.Event<Listener>() { 071 @Override 072 public void call(Listener listener) { 073 listener.running(); 074 } 075 076 @Override 077 public String toString() { 078 return "running()"; 079 } 080 }; 081 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 082 stoppingEvent(STARTING); 083 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 084 stoppingEvent(RUNNING); 085 086 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 087 terminatedEvent(NEW); 088 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 089 terminatedEvent(STARTING); 090 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 091 terminatedEvent(RUNNING); 092 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 093 terminatedEvent(STOPPING); 094 095 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 096 return new ListenerCallQueue.Event<Listener>() { 097 @Override 098 public void call(Listener listener) { 099 listener.terminated(from); 100 } 101 102 @Override 103 public String toString() { 104 return "terminated({from = " + from + "})"; 105 } 106 }; 107 } 108 109 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 110 return new ListenerCallQueue.Event<Listener>() { 111 @Override 112 public void call(Listener listener) { 113 listener.stopping(from); 114 } 115 116 @Override 117 public String toString() { 118 return "stopping({from = " + from + "})"; 119 } 120 }; 121 } 122 123 private final Monitor monitor = new Monitor(); 124 125 private final Guard isStartable = new IsStartableGuard(); 126 127 @WeakOuter 128 private final class IsStartableGuard extends Guard { 129 IsStartableGuard() { 130 super(AbstractService.this.monitor); 131 } 132 133 @Override 134 public boolean isSatisfied() { 135 return state() == NEW; 136 } 137 } 138 139 private final Guard isStoppable = new IsStoppableGuard(); 140 141 @WeakOuter 142 private final class IsStoppableGuard extends Guard { 143 IsStoppableGuard() { 144 super(AbstractService.this.monitor); 145 } 146 147 @Override 148 public boolean isSatisfied() { 149 return state().compareTo(RUNNING) <= 0; 150 } 151 } 152 153 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 154 155 @WeakOuter 156 private final class HasReachedRunningGuard extends Guard { 157 HasReachedRunningGuard() { 158 super(AbstractService.this.monitor); 159 } 160 161 @Override 162 public boolean isSatisfied() { 163 return state().compareTo(RUNNING) >= 0; 164 } 165 } 166 167 private final Guard isStopped = new IsStoppedGuard(); 168 169 @WeakOuter 170 private final class IsStoppedGuard extends Guard { 171 IsStoppedGuard() { 172 super(AbstractService.this.monitor); 173 } 174 175 @Override 176 public boolean isSatisfied() { 177 return state().compareTo(TERMINATED) >= 0; 178 } 179 } 180 181 /** The listeners to notify during a state transition. */ 182 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 183 184 /** 185 * The current state of the service. This should be written with the lock held but can be read 186 * without it because it is an immutable object in a volatile field. This is desirable so that 187 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 188 * without grabbing the lock. 189 * 190 * <p>To update this field correctly the lock must be held to guarantee that the state is 191 * consistent. 192 */ 193 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 194 195 /** Constructor for use by subclasses. */ 196 protected AbstractService() {} 197 198 /** 199 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 200 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 201 * or after it has returned. If startup fails, the invocation should cause a call to {@link 202 * #notifyFailed(Throwable)} instead. 203 * 204 * <p>This method should return promptly; prefer to do work on a different thread where it is 205 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 206 * called multiple times. 207 */ 208 @ForOverride 209 protected abstract void doStart(); 210 211 /** 212 * This method should be used to initiate service shutdown. The invocation of this method should 213 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 214 * returned. If shutdown fails, the invocation should cause a call to {@link 215 * #notifyFailed(Throwable)} instead. 216 * 217 * <p>This method should return promptly; prefer to do work on a different thread where it is 218 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 219 * called multiple times. 220 * 221 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 222 * invoked immediately. Instead, it will be deferred until after the service is {@link 223 * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 224 */ 225 @ForOverride 226 protected abstract void doStop(); 227 228 /** 229 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 230 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 231 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 232 * 233 * <p>This method should return promptly; prefer to do work on a different thread where it is 234 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 235 * called multiple times. 236 * 237 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 238 * external state observable by the caller of {@link #stopAsync}. 239 * 240 * @since 27.0 241 */ 242 @ForOverride 243 protected void doCancelStart() {} 244 245 @CanIgnoreReturnValue 246 @Override 247 public final Service startAsync() { 248 if (monitor.enterIf(isStartable)) { 249 try { 250 snapshot = new StateSnapshot(STARTING); 251 enqueueStartingEvent(); 252 doStart(); 253 } catch (Throwable startupFailure) { 254 restoreInterruptIfIsInterruptedException(startupFailure); 255 notifyFailed(startupFailure); 256 } finally { 257 monitor.leave(); 258 dispatchListenerEvents(); 259 } 260 } else { 261 throw new IllegalStateException("Service " + this + " has already been started"); 262 } 263 return this; 264 } 265 266 @CanIgnoreReturnValue 267 @Override 268 public final Service stopAsync() { 269 if (monitor.enterIf(isStoppable)) { 270 try { 271 State previous = state(); 272 switch (previous) { 273 case NEW: 274 snapshot = new StateSnapshot(TERMINATED); 275 enqueueTerminatedEvent(NEW); 276 break; 277 case STARTING: 278 snapshot = new StateSnapshot(STARTING, true, null); 279 enqueueStoppingEvent(STARTING); 280 doCancelStart(); 281 break; 282 case RUNNING: 283 snapshot = new StateSnapshot(STOPPING); 284 enqueueStoppingEvent(RUNNING); 285 doStop(); 286 break; 287 case STOPPING: 288 case TERMINATED: 289 case FAILED: 290 // These cases are impossible due to the if statement above. 291 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 292 } 293 } catch (Throwable shutdownFailure) { 294 restoreInterruptIfIsInterruptedException(shutdownFailure); 295 notifyFailed(shutdownFailure); 296 } finally { 297 monitor.leave(); 298 dispatchListenerEvents(); 299 } 300 } 301 return this; 302 } 303 304 @Override 305 public final void awaitRunning() { 306 monitor.enterWhenUninterruptibly(hasReachedRunning); 307 try { 308 checkCurrentState(RUNNING); 309 } finally { 310 monitor.leave(); 311 } 312 } 313 314 /** @since 28.0 */ 315 @Override 316 public final void awaitRunning(Duration timeout) throws TimeoutException { 317 Service.super.awaitRunning(timeout); 318 } 319 320 @Override 321 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 322 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 323 try { 324 checkCurrentState(RUNNING); 325 } finally { 326 monitor.leave(); 327 } 328 } else { 329 // It is possible due to races that we are currently in the expected state even though we 330 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 331 // even check the guard. I don't think we care too much about this use case but it could lead 332 // to a confusing error message. 333 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 334 } 335 } 336 337 @Override 338 public final void awaitTerminated() { 339 monitor.enterWhenUninterruptibly(isStopped); 340 try { 341 checkCurrentState(TERMINATED); 342 } finally { 343 monitor.leave(); 344 } 345 } 346 347 /** @since 28.0 */ 348 @Override 349 public final void awaitTerminated(Duration timeout) throws TimeoutException { 350 Service.super.awaitTerminated(timeout); 351 } 352 353 @Override 354 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 355 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 356 try { 357 checkCurrentState(TERMINATED); 358 } finally { 359 monitor.leave(); 360 } 361 } else { 362 // It is possible due to races that we are currently in the expected state even though we 363 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 364 // even check the guard. I don't think we care too much about this use case but it could lead 365 // to a confusing error message. 366 throw new TimeoutException( 367 "Timed out waiting for " 368 + this 369 + " to reach a terminal state. " 370 + "Current state: " 371 + state()); 372 } 373 } 374 375 /** Checks that the current state is equal to the expected state. */ 376 @GuardedBy("monitor") 377 private void checkCurrentState(State expected) { 378 State actual = state(); 379 if (actual != expected) { 380 if (actual == FAILED) { 381 // Handle this specially so that we can include the failureCause, if there is one. 382 throw new IllegalStateException( 383 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 384 failureCause()); 385 } 386 throw new IllegalStateException( 387 "Expected the service " + this + " to be " + expected + ", but was " + actual); 388 } 389 } 390 391 /** 392 * Implementing classes should invoke this method once their service has started. It will cause 393 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 394 * 395 * @throws IllegalStateException if the service is not {@link State#STARTING}. 396 */ 397 protected final void notifyStarted() { 398 monitor.enter(); 399 try { 400 // We have to examine the internal state of the snapshot here to properly handle the stop 401 // while starting case. 402 if (snapshot.state != STARTING) { 403 IllegalStateException failure = 404 new IllegalStateException( 405 "Cannot notifyStarted() when the service is " + snapshot.state); 406 notifyFailed(failure); 407 throw failure; 408 } 409 410 if (snapshot.shutdownWhenStartupFinishes) { 411 snapshot = new StateSnapshot(STOPPING); 412 // We don't call listeners here because we already did that when we set the 413 // shutdownWhenStartupFinishes flag. 414 doStop(); 415 } else { 416 snapshot = new StateSnapshot(RUNNING); 417 enqueueRunningEvent(); 418 } 419 } finally { 420 monitor.leave(); 421 dispatchListenerEvents(); 422 } 423 } 424 425 /** 426 * Implementing classes should invoke this method once their service has stopped. It will cause 427 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 428 * State#TERMINATED}. 429 * 430 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 431 * State#STARTING}, or {@link State#RUNNING}. 432 */ 433 protected final void notifyStopped() { 434 monitor.enter(); 435 try { 436 State previous = state(); 437 switch (previous) { 438 case NEW: 439 case TERMINATED: 440 case FAILED: 441 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 442 case RUNNING: 443 case STARTING: 444 case STOPPING: 445 snapshot = new StateSnapshot(TERMINATED); 446 enqueueTerminatedEvent(previous); 447 break; 448 } 449 } finally { 450 monitor.leave(); 451 dispatchListenerEvents(); 452 } 453 } 454 455 /** 456 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 457 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 458 * or otherwise cannot be started nor stopped. 459 */ 460 protected final void notifyFailed(Throwable cause) { 461 checkNotNull(cause); 462 463 monitor.enter(); 464 try { 465 State previous = state(); 466 switch (previous) { 467 case NEW: 468 case TERMINATED: 469 throw new IllegalStateException("Failed while in state:" + previous, cause); 470 case RUNNING: 471 case STARTING: 472 case STOPPING: 473 snapshot = new StateSnapshot(FAILED, false, cause); 474 enqueueFailedEvent(previous, cause); 475 break; 476 case FAILED: 477 // Do nothing 478 break; 479 } 480 } finally { 481 monitor.leave(); 482 dispatchListenerEvents(); 483 } 484 } 485 486 @Override 487 public final boolean isRunning() { 488 return state() == RUNNING; 489 } 490 491 @Override 492 public final State state() { 493 return snapshot.externalState(); 494 } 495 496 /** @since 14.0 */ 497 @Override 498 public final Throwable failureCause() { 499 return snapshot.failureCause(); 500 } 501 502 /** @since 13.0 */ 503 @Override 504 public final void addListener(Listener listener, Executor executor) { 505 listeners.addListener(listener, executor); 506 } 507 508 @Override 509 public String toString() { 510 return getClass().getSimpleName() + " [" + state() + "]"; 511 } 512 513 /** 514 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 515 * #monitor}. 516 */ 517 private void dispatchListenerEvents() { 518 if (!monitor.isOccupiedByCurrentThread()) { 519 listeners.dispatch(); 520 } 521 } 522 523 private void enqueueStartingEvent() { 524 listeners.enqueue(STARTING_EVENT); 525 } 526 527 private void enqueueRunningEvent() { 528 listeners.enqueue(RUNNING_EVENT); 529 } 530 531 private void enqueueStoppingEvent(final State from) { 532 if (from == State.STARTING) { 533 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 534 } else if (from == State.RUNNING) { 535 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 536 } else { 537 throw new AssertionError(); 538 } 539 } 540 541 private void enqueueTerminatedEvent(final State from) { 542 switch (from) { 543 case NEW: 544 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 545 break; 546 case STARTING: 547 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 548 break; 549 case RUNNING: 550 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 551 break; 552 case STOPPING: 553 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 554 break; 555 case TERMINATED: 556 case FAILED: 557 throw new AssertionError(); 558 } 559 } 560 561 private void enqueueFailedEvent(final State from, final Throwable cause) { 562 // can't memoize this one due to the exception 563 listeners.enqueue( 564 new ListenerCallQueue.Event<Listener>() { 565 @Override 566 public void call(Listener listener) { 567 listener.failed(from, cause); 568 } 569 570 @Override 571 public String toString() { 572 return "failed({from = " + from + ", cause = " + cause + "})"; 573 } 574 }); 575 } 576 577 /** 578 * An immutable snapshot of the current state of the service. This class represents a consistent 579 * snapshot of the state and therefore it can be used to answer simple queries without needing to 580 * grab a lock. 581 */ 582 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 583 private static final class StateSnapshot { 584 /** 585 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 586 */ 587 final State state; 588 589 /** If true, the user requested a shutdown while the service was still starting up. */ 590 final boolean shutdownWhenStartupFinishes; 591 592 /** 593 * The exception that caused this service to fail. This will be {@code null} unless the service 594 * has failed. 595 */ 596 @CheckForNull final Throwable failure; 597 598 StateSnapshot(State internalState) { 599 this(internalState, false, null); 600 } 601 602 StateSnapshot( 603 State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) { 604 checkArgument( 605 !shutdownWhenStartupFinishes || internalState == STARTING, 606 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 607 internalState); 608 checkArgument( 609 (failure != null) == (internalState == FAILED), 610 "A failure cause should be set if and only if the state is failed. Got %s and %s " 611 + "instead.", 612 internalState, 613 failure); 614 this.state = internalState; 615 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 616 this.failure = failure; 617 } 618 619 /** @see Service#state() */ 620 State externalState() { 621 if (shutdownWhenStartupFinishes && state == STARTING) { 622 return STOPPING; 623 } else { 624 return state; 625 } 626 } 627 628 /** @see Service#failureCause() */ 629 Throwable failureCause() { 630 checkState( 631 state == FAILED, 632 "failureCause() is only valid if the service has failed, service is %s", 633 state); 634 // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED. 635 return requireNonNull(failure); 636 } 637 } 638}