001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Service.State.FAILED; 021import static com.google.common.util.concurrent.Service.State.NEW; 022import static com.google.common.util.concurrent.Service.State.RUNNING; 023import static com.google.common.util.concurrent.Service.State.STARTING; 024import static com.google.common.util.concurrent.Service.State.STOPPING; 025import static com.google.common.util.concurrent.Service.State.TERMINATED; 026 027import com.google.common.annotations.Beta; 028import com.google.common.annotations.GwtIncompatible; 029import com.google.common.util.concurrent.Monitor.Guard; 030import com.google.common.util.concurrent.Service.State; // javadoc needs this 031import com.google.errorprone.annotations.CanIgnoreReturnValue; 032import com.google.errorprone.annotations.ForOverride; 033import com.google.errorprone.annotations.concurrent.GuardedBy; 034import com.google.j2objc.annotations.WeakOuter; 035import java.time.Duration; 036import java.util.concurrent.Executor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.TimeoutException; 039import org.checkerframework.checker.nullness.qual.Nullable; 040 041/** 042 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 043 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 044 * callbacks. Its subclasses must manage threads manually; consider {@link 045 * AbstractExecutionThreadService} if you need only a single execution thread. 046 * 047 * @author Jesse Wilson 048 * @author Luke Sandberg 049 * @since 1.0 050 */ 051@GwtIncompatible 052public abstract class AbstractService implements Service { 053 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 054 new ListenerCallQueue.Event<Listener>() { 055 @Override 056 public void call(Listener listener) { 057 listener.starting(); 058 } 059 060 @Override 061 public String toString() { 062 return "starting()"; 063 } 064 }; 065 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 066 new ListenerCallQueue.Event<Listener>() { 067 @Override 068 public void call(Listener listener) { 069 listener.running(); 070 } 071 072 @Override 073 public String toString() { 074 return "running()"; 075 } 076 }; 077 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 078 stoppingEvent(STARTING); 079 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 080 stoppingEvent(RUNNING); 081 082 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 083 terminatedEvent(NEW); 084 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 085 terminatedEvent(STARTING); 086 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 087 terminatedEvent(RUNNING); 088 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 089 terminatedEvent(STOPPING); 090 091 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 092 return new ListenerCallQueue.Event<Listener>() { 093 @Override 094 public void call(Listener listener) { 095 listener.terminated(from); 096 } 097 098 @Override 099 public String toString() { 100 return "terminated({from = " + from + "})"; 101 } 102 }; 103 } 104 105 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 106 return new ListenerCallQueue.Event<Listener>() { 107 @Override 108 public void call(Listener listener) { 109 listener.stopping(from); 110 } 111 112 @Override 113 public String toString() { 114 return "stopping({from = " + from + "})"; 115 } 116 }; 117 } 118 119 private final Monitor monitor = new Monitor(); 120 121 private final Guard isStartable = new IsStartableGuard(); 122 123 @WeakOuter 124 private final class IsStartableGuard extends Guard { 125 IsStartableGuard() { 126 super(AbstractService.this.monitor); 127 } 128 129 @Override 130 public boolean isSatisfied() { 131 return state() == NEW; 132 } 133 } 134 135 private final Guard isStoppable = new IsStoppableGuard(); 136 137 @WeakOuter 138 private final class IsStoppableGuard extends Guard { 139 IsStoppableGuard() { 140 super(AbstractService.this.monitor); 141 } 142 143 @Override 144 public boolean isSatisfied() { 145 return state().compareTo(RUNNING) <= 0; 146 } 147 } 148 149 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 150 151 @WeakOuter 152 private final class HasReachedRunningGuard extends Guard { 153 HasReachedRunningGuard() { 154 super(AbstractService.this.monitor); 155 } 156 157 @Override 158 public boolean isSatisfied() { 159 return state().compareTo(RUNNING) >= 0; 160 } 161 } 162 163 private final Guard isStopped = new IsStoppedGuard(); 164 165 @WeakOuter 166 private final class IsStoppedGuard extends Guard { 167 IsStoppedGuard() { 168 super(AbstractService.this.monitor); 169 } 170 171 @Override 172 public boolean isSatisfied() { 173 return state().isTerminal(); 174 } 175 } 176 177 /** The listeners to notify during a state transition. */ 178 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 179 180 /** 181 * The current state of the service. This should be written with the lock held but can be read 182 * without it because it is an immutable object in a volatile field. This is desirable so that 183 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 184 * without grabbing the lock. 185 * 186 * <p>To update this field correctly the lock must be held to guarantee that the state is 187 * consistent. 188 */ 189 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 190 191 /** Constructor for use by subclasses. */ 192 protected AbstractService() {} 193 194 /** 195 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 196 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 197 * or after it has returned. If startup fails, the invocation should cause a call to {@link 198 * #notifyFailed(Throwable)} instead. 199 * 200 * <p>This method should return promptly; prefer to do work on a different thread where it is 201 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 202 * called multiple times. 203 */ 204 @ForOverride 205 protected abstract void doStart(); 206 207 /** 208 * This method should be used to initiate service shutdown. The invocation of this method should 209 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 210 * returned. If shutdown fails, the invocation should cause a call to {@link 211 * #notifyFailed(Throwable)} instead. 212 * 213 * <p>This method should return promptly; prefer to do work on a different thread where it is 214 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 215 * called multiple times. 216 * 217 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 218 * invoked immediately. Instead, it will be deferred until after the service is {@link 219 * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 220 */ 221 @ForOverride 222 protected abstract void doStop(); 223 224 /** 225 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 226 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 227 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 228 * 229 * <p>This method should return promptly; prefer to do work on a different thread where it is 230 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 231 * called multiple times. 232 * 233 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 234 * external state observable by the caller of {@link #stopAsync}. 235 * 236 * @since 27.0 237 */ 238 @Beta 239 @ForOverride 240 protected void doCancelStart() {} 241 242 @CanIgnoreReturnValue 243 @Override 244 public final Service startAsync() { 245 if (monitor.enterIf(isStartable)) { 246 try { 247 snapshot = new StateSnapshot(STARTING); 248 enqueueStartingEvent(); 249 doStart(); 250 } catch (Throwable startupFailure) { 251 notifyFailed(startupFailure); 252 } finally { 253 monitor.leave(); 254 dispatchListenerEvents(); 255 } 256 } else { 257 throw new IllegalStateException("Service " + this + " has already been started"); 258 } 259 return this; 260 } 261 262 @CanIgnoreReturnValue 263 @Override 264 public final Service stopAsync() { 265 if (monitor.enterIf(isStoppable)) { 266 try { 267 State previous = state(); 268 switch (previous) { 269 case NEW: 270 snapshot = new StateSnapshot(TERMINATED); 271 enqueueTerminatedEvent(NEW); 272 break; 273 case STARTING: 274 snapshot = new StateSnapshot(STARTING, true, null); 275 enqueueStoppingEvent(STARTING); 276 doCancelStart(); 277 break; 278 case RUNNING: 279 snapshot = new StateSnapshot(STOPPING); 280 enqueueStoppingEvent(RUNNING); 281 doStop(); 282 break; 283 case STOPPING: 284 case TERMINATED: 285 case FAILED: 286 // These cases are impossible due to the if statement above. 287 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 288 } 289 } catch (Throwable shutdownFailure) { 290 notifyFailed(shutdownFailure); 291 } finally { 292 monitor.leave(); 293 dispatchListenerEvents(); 294 } 295 } 296 return this; 297 } 298 299 @Override 300 public final void awaitRunning() { 301 monitor.enterWhenUninterruptibly(hasReachedRunning); 302 try { 303 checkCurrentState(RUNNING); 304 } finally { 305 monitor.leave(); 306 } 307 } 308 309 /** @since 28.0 */ 310 @Override 311 public final void awaitRunning(Duration timeout) throws TimeoutException { 312 Service.super.awaitRunning(timeout); 313 } 314 315 @Override 316 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 317 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 318 try { 319 checkCurrentState(RUNNING); 320 } finally { 321 monitor.leave(); 322 } 323 } else { 324 // It is possible due to races the we are currently in the expected state even though we 325 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 326 // even check the guard. I don't think we care too much about this use case but it could lead 327 // to a confusing error message. 328 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 329 } 330 } 331 332 @Override 333 public final void awaitTerminated() { 334 monitor.enterWhenUninterruptibly(isStopped); 335 try { 336 checkCurrentState(TERMINATED); 337 } finally { 338 monitor.leave(); 339 } 340 } 341 342 /** @since 28.0 */ 343 @Override 344 public final void awaitTerminated(Duration timeout) throws TimeoutException { 345 Service.super.awaitTerminated(timeout); 346 } 347 348 @Override 349 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 350 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 351 try { 352 checkCurrentState(TERMINATED); 353 } finally { 354 monitor.leave(); 355 } 356 } else { 357 // It is possible due to races the we are currently in the expected state even though we 358 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 359 // even check the guard. I don't think we care too much about this use case but it could lead 360 // to a confusing error message. 361 throw new TimeoutException( 362 "Timed out waiting for " 363 + this 364 + " to reach a terminal state. " 365 + "Current state: " 366 + state()); 367 } 368 } 369 370 /** Checks that the current state is equal to the expected state. */ 371 @GuardedBy("monitor") 372 private void checkCurrentState(State expected) { 373 State actual = state(); 374 if (actual != expected) { 375 if (actual == FAILED) { 376 // Handle this specially so that we can include the failureCause, if there is one. 377 throw new IllegalStateException( 378 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 379 failureCause()); 380 } 381 throw new IllegalStateException( 382 "Expected the service " + this + " to be " + expected + ", but was " + actual); 383 } 384 } 385 386 /** 387 * Implementing classes should invoke this method once their service has started. It will cause 388 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 389 * 390 * @throws IllegalStateException if the service is not {@link State#STARTING}. 391 */ 392 protected final void notifyStarted() { 393 monitor.enter(); 394 try { 395 // We have to examine the internal state of the snapshot here to properly handle the stop 396 // while starting case. 397 if (snapshot.state != STARTING) { 398 IllegalStateException failure = 399 new IllegalStateException( 400 "Cannot notifyStarted() when the service is " + snapshot.state); 401 notifyFailed(failure); 402 throw failure; 403 } 404 405 if (snapshot.shutdownWhenStartupFinishes) { 406 snapshot = new StateSnapshot(STOPPING); 407 // We don't call listeners here because we already did that when we set the 408 // shutdownWhenStartupFinishes flag. 409 doStop(); 410 } else { 411 snapshot = new StateSnapshot(RUNNING); 412 enqueueRunningEvent(); 413 } 414 } finally { 415 monitor.leave(); 416 dispatchListenerEvents(); 417 } 418 } 419 420 /** 421 * Implementing classes should invoke this method once their service has stopped. It will cause 422 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 423 * State#TERMINATED}. 424 * 425 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 426 * State#STARTING}, or {@link State#RUNNING}. 427 */ 428 protected final void notifyStopped() { 429 monitor.enter(); 430 try { 431 State previous = state(); 432 switch (previous) { 433 case NEW: 434 case TERMINATED: 435 case FAILED: 436 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 437 case RUNNING: 438 case STARTING: 439 case STOPPING: 440 snapshot = new StateSnapshot(TERMINATED); 441 enqueueTerminatedEvent(previous); 442 break; 443 } 444 } finally { 445 monitor.leave(); 446 dispatchListenerEvents(); 447 } 448 } 449 450 /** 451 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 452 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 453 * or otherwise cannot be started nor stopped. 454 */ 455 protected final void notifyFailed(Throwable cause) { 456 checkNotNull(cause); 457 458 monitor.enter(); 459 try { 460 State previous = state(); 461 switch (previous) { 462 case NEW: 463 case TERMINATED: 464 throw new IllegalStateException("Failed while in state:" + previous, cause); 465 case RUNNING: 466 case STARTING: 467 case STOPPING: 468 snapshot = new StateSnapshot(FAILED, false, cause); 469 enqueueFailedEvent(previous, cause); 470 break; 471 case FAILED: 472 // Do nothing 473 break; 474 } 475 } finally { 476 monitor.leave(); 477 dispatchListenerEvents(); 478 } 479 } 480 481 @Override 482 public final boolean isRunning() { 483 return state() == RUNNING; 484 } 485 486 @Override 487 public final State state() { 488 return snapshot.externalState(); 489 } 490 491 /** @since 14.0 */ 492 @Override 493 public final Throwable failureCause() { 494 return snapshot.failureCause(); 495 } 496 497 /** @since 13.0 */ 498 @Override 499 public final void addListener(Listener listener, Executor executor) { 500 listeners.addListener(listener, executor); 501 } 502 503 @Override 504 public String toString() { 505 return getClass().getSimpleName() + " [" + state() + "]"; 506 } 507 508 /** 509 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 510 * #monitor}. 511 */ 512 private void dispatchListenerEvents() { 513 if (!monitor.isOccupiedByCurrentThread()) { 514 listeners.dispatch(); 515 } 516 } 517 518 private void enqueueStartingEvent() { 519 listeners.enqueue(STARTING_EVENT); 520 } 521 522 private void enqueueRunningEvent() { 523 listeners.enqueue(RUNNING_EVENT); 524 } 525 526 private void enqueueStoppingEvent(final State from) { 527 if (from == State.STARTING) { 528 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 529 } else if (from == State.RUNNING) { 530 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 531 } else { 532 throw new AssertionError(); 533 } 534 } 535 536 private void enqueueTerminatedEvent(final State from) { 537 switch (from) { 538 case NEW: 539 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 540 break; 541 case STARTING: 542 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 543 break; 544 case RUNNING: 545 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 546 break; 547 case STOPPING: 548 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 549 break; 550 case TERMINATED: 551 case FAILED: 552 throw new AssertionError(); 553 } 554 } 555 556 private void enqueueFailedEvent(final State from, final Throwable cause) { 557 // can't memoize this one due to the exception 558 listeners.enqueue( 559 new ListenerCallQueue.Event<Listener>() { 560 @Override 561 public void call(Listener listener) { 562 listener.failed(from, cause); 563 } 564 565 @Override 566 public String toString() { 567 return "failed({from = " + from + ", cause = " + cause + "})"; 568 } 569 }); 570 } 571 572 /** 573 * An immutable snapshot of the current state of the service. This class represents a consistent 574 * snapshot of the state and therefore it can be used to answer simple queries without needing to 575 * grab a lock. 576 */ 577 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 578 private static final class StateSnapshot { 579 /** 580 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 581 */ 582 final State state; 583 584 /** If true, the user requested a shutdown while the service was still starting up. */ 585 final boolean shutdownWhenStartupFinishes; 586 587 /** 588 * The exception that caused this service to fail. This will be {@code null} unless the service 589 * has failed. 590 */ 591 final @Nullable Throwable failure; 592 593 StateSnapshot(State internalState) { 594 this(internalState, false, null); 595 } 596 597 StateSnapshot( 598 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 599 checkArgument( 600 !shutdownWhenStartupFinishes || internalState == STARTING, 601 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 602 internalState); 603 checkArgument( 604 !(failure != null ^ internalState == FAILED), 605 "A failure cause should be set if and only if the state is failed. Got %s and %s " 606 + "instead.", 607 internalState, 608 failure); 609 this.state = internalState; 610 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 611 this.failure = failure; 612 } 613 614 /** @see Service#state() */ 615 State externalState() { 616 if (shutdownWhenStartupFinishes && state == STARTING) { 617 return STOPPING; 618 } else { 619 return state; 620 } 621 } 622 623 /** @see Service#failureCause() */ 624 Throwable failureCause() { 625 checkState( 626 state == FAILED, 627 "failureCause() is only valid if the service has failed, service is %s", 628 state); 629 return failure; 630 } 631 } 632}