001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 021import static com.google.common.util.concurrent.Service.State.FAILED; 022import static com.google.common.util.concurrent.Service.State.NEW; 023import static com.google.common.util.concurrent.Service.State.RUNNING; 024import static com.google.common.util.concurrent.Service.State.STARTING; 025import static com.google.common.util.concurrent.Service.State.STOPPING; 026import static com.google.common.util.concurrent.Service.State.TERMINATED; 027import static java.util.Objects.requireNonNull; 028 029import com.google.common.annotations.GwtIncompatible; 030import com.google.common.annotations.J2ktIncompatible; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; 033import com.google.errorprone.annotations.CanIgnoreReturnValue; 034import com.google.errorprone.annotations.ForOverride; 035import com.google.errorprone.annotations.concurrent.GuardedBy; 036import com.google.j2objc.annotations.WeakOuter; 037import java.time.Duration; 038import java.util.concurrent.Executor; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.TimeoutException; 041import org.jspecify.annotations.Nullable; 042 043/** 044 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 045 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 046 * callbacks. Its subclasses must manage threads manually; consider {@link 047 * AbstractExecutionThreadService} if you need only a single execution thread. 048 * 049 * @author Jesse Wilson 050 * @author Luke Sandberg 051 * @since 1.0 052 */ 053@GwtIncompatible 054@J2ktIncompatible 055public abstract class AbstractService implements Service { 056 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 057 new ListenerCallQueue.Event<Listener>() { 058 @Override 059 public void call(Listener listener) { 060 listener.starting(); 061 } 062 063 @Override 064 public String toString() { 065 return "starting()"; 066 } 067 }; 068 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 069 new ListenerCallQueue.Event<Listener>() { 070 @Override 071 public void call(Listener listener) { 072 listener.running(); 073 } 074 075 @Override 076 public String toString() { 077 return "running()"; 078 } 079 }; 080 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 081 stoppingEvent(STARTING); 082 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 083 stoppingEvent(RUNNING); 084 085 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 086 terminatedEvent(NEW); 087 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 088 terminatedEvent(STARTING); 089 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 090 terminatedEvent(RUNNING); 091 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 092 terminatedEvent(STOPPING); 093 094 private static ListenerCallQueue.Event<Listener> terminatedEvent(State from) { 095 return new ListenerCallQueue.Event<Listener>() { 096 @Override 097 public void call(Listener listener) { 098 listener.terminated(from); 099 } 100 101 @Override 102 public String toString() { 103 return "terminated({from = " + from + "})"; 104 } 105 }; 106 } 107 108 private static ListenerCallQueue.Event<Listener> stoppingEvent(State from) { 109 return new ListenerCallQueue.Event<Listener>() { 110 @Override 111 public void call(Listener listener) { 112 listener.stopping(from); 113 } 114 115 @Override 116 public String toString() { 117 return "stopping({from = " + from + "})"; 118 } 119 }; 120 } 121 122 private final Monitor monitor = new Monitor(); 123 124 private final Guard isStartable = new IsStartableGuard(); 125 126 @WeakOuter 127 private final class IsStartableGuard extends Guard { 128 IsStartableGuard() { 129 super(AbstractService.this.monitor); 130 } 131 132 @Override 133 public boolean isSatisfied() { 134 return state() == NEW; 135 } 136 } 137 138 private final Guard isStoppable = new IsStoppableGuard(); 139 140 @WeakOuter 141 private final class IsStoppableGuard extends Guard { 142 IsStoppableGuard() { 143 super(AbstractService.this.monitor); 144 } 145 146 @Override 147 public boolean isSatisfied() { 148 return state().compareTo(RUNNING) <= 0; 149 } 150 } 151 152 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 153 154 @WeakOuter 155 private final class HasReachedRunningGuard extends Guard { 156 HasReachedRunningGuard() { 157 super(AbstractService.this.monitor); 158 } 159 160 @Override 161 public boolean isSatisfied() { 162 return state().compareTo(RUNNING) >= 0; 163 } 164 } 165 166 private final Guard isStopped = new IsStoppedGuard(); 167 168 @WeakOuter 169 private final class IsStoppedGuard extends Guard { 170 IsStoppedGuard() { 171 super(AbstractService.this.monitor); 172 } 173 174 @Override 175 public boolean isSatisfied() { 176 return state().compareTo(TERMINATED) >= 0; 177 } 178 } 179 180 /** The listeners to notify during a state transition. */ 181 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 182 183 /** 184 * The current state of the service. This should be written with the lock held but can be read 185 * without it because it is an immutable object in a volatile field. This is desirable so that 186 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 187 * without grabbing the lock. 188 * 189 * <p>To update this field correctly the lock must be held to guarantee that the state is 190 * consistent. 191 */ 192 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 193 194 /** Constructor for use by subclasses. */ 195 protected AbstractService() {} 196 197 /** 198 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 199 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 200 * or after it has returned. If startup fails, the invocation should cause a call to {@link 201 * #notifyFailed(Throwable)} instead. 202 * 203 * <p>This method should return promptly; prefer to do work on a different thread where it is 204 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 205 * called multiple times. 206 */ 207 @ForOverride 208 protected abstract void doStart(); 209 210 /** 211 * This method should be used to initiate service shutdown. The invocation of this method should 212 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 213 * returned. If shutdown fails, the invocation should cause a call to {@link 214 * #notifyFailed(Throwable)} instead. 215 * 216 * <p>This method should return promptly; prefer to do work on a different thread where it is 217 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 218 * called multiple times. 219 * 220 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 221 * invoked immediately. Instead, it will be deferred until after the service is {@link 222 * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 223 */ 224 @ForOverride 225 protected abstract void doStop(); 226 227 /** 228 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 229 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 230 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 231 * 232 * <p>This method should return promptly; prefer to do work on a different thread where it is 233 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 234 * called multiple times. 235 * 236 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 237 * external state observable by the caller of {@link #stopAsync}. 238 * 239 * @since 27.0 240 */ 241 @ForOverride 242 protected void doCancelStart() {} 243 244 @CanIgnoreReturnValue 245 @Override 246 public final Service startAsync() { 247 if (monitor.enterIf(isStartable)) { 248 try { 249 snapshot = new StateSnapshot(STARTING); 250 enqueueStartingEvent(); 251 doStart(); 252 } catch (Throwable startupFailure) { 253 restoreInterruptIfIsInterruptedException(startupFailure); 254 notifyFailed(startupFailure); 255 } finally { 256 monitor.leave(); 257 dispatchListenerEvents(); 258 } 259 } else { 260 throw new IllegalStateException("Service " + this + " has already been started"); 261 } 262 return this; 263 } 264 265 @CanIgnoreReturnValue 266 @Override 267 public final Service stopAsync() { 268 if (monitor.enterIf(isStoppable)) { 269 try { 270 State previous = state(); 271 switch (previous) { 272 case NEW: 273 snapshot = new StateSnapshot(TERMINATED); 274 enqueueTerminatedEvent(NEW); 275 break; 276 case STARTING: 277 snapshot = new StateSnapshot(STARTING, true, null); 278 enqueueStoppingEvent(STARTING); 279 doCancelStart(); 280 break; 281 case RUNNING: 282 snapshot = new StateSnapshot(STOPPING); 283 enqueueStoppingEvent(RUNNING); 284 doStop(); 285 break; 286 case STOPPING: 287 case TERMINATED: 288 case FAILED: 289 // These cases are impossible due to the if statement above. 290 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 291 } 292 } catch (Throwable shutdownFailure) { 293 restoreInterruptIfIsInterruptedException(shutdownFailure); 294 notifyFailed(shutdownFailure); 295 } finally { 296 monitor.leave(); 297 dispatchListenerEvents(); 298 } 299 } 300 return this; 301 } 302 303 @Override 304 public final void awaitRunning() { 305 monitor.enterWhenUninterruptibly(hasReachedRunning); 306 try { 307 checkCurrentState(RUNNING); 308 } finally { 309 monitor.leave(); 310 } 311 } 312 313 /** 314 * @since 28.0 315 */ 316 @Override 317 public final void awaitRunning(Duration timeout) throws TimeoutException { 318 Service.super.awaitRunning(timeout); 319 } 320 321 @Override 322 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 323 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 324 try { 325 checkCurrentState(RUNNING); 326 } finally { 327 monitor.leave(); 328 } 329 } else { 330 // It is possible due to races that we are currently in the expected state even though we 331 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 332 // even check the guard. I don't think we care too much about this use case but it could lead 333 // to a confusing error message. 334 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 335 } 336 } 337 338 @Override 339 public final void awaitTerminated() { 340 monitor.enterWhenUninterruptibly(isStopped); 341 try { 342 checkCurrentState(TERMINATED); 343 } finally { 344 monitor.leave(); 345 } 346 } 347 348 /** 349 * @since 28.0 350 */ 351 @Override 352 public final void awaitTerminated(Duration timeout) throws TimeoutException { 353 Service.super.awaitTerminated(timeout); 354 } 355 356 @Override 357 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 358 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 359 try { 360 checkCurrentState(TERMINATED); 361 } finally { 362 monitor.leave(); 363 } 364 } else { 365 // It is possible due to races that we are currently in the expected state even though we 366 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 367 // even check the guard. I don't think we care too much about this use case but it could lead 368 // to a confusing error message. 369 throw new TimeoutException( 370 "Timed out waiting for " 371 + this 372 + " to reach a terminal state. " 373 + "Current state: " 374 + state()); 375 } 376 } 377 378 /** Checks that the current state is equal to the expected state. */ 379 @GuardedBy("monitor") 380 private void checkCurrentState(State expected) { 381 State actual = state(); 382 if (actual != expected) { 383 if (actual == FAILED) { 384 // Handle this specially so that we can include the failureCause, if there is one. 385 throw new IllegalStateException( 386 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 387 failureCause()); 388 } 389 throw new IllegalStateException( 390 "Expected the service " + this + " to be " + expected + ", but was " + actual); 391 } 392 } 393 394 /** 395 * Implementing classes should invoke this method once their service has started. It will cause 396 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 397 * 398 * @throws IllegalStateException if the service is not {@link State#STARTING}. 399 */ 400 protected final void notifyStarted() { 401 monitor.enter(); 402 try { 403 // We have to examine the internal state of the snapshot here to properly handle the stop 404 // while starting case. 405 if (snapshot.state != STARTING) { 406 IllegalStateException failure = 407 new IllegalStateException( 408 "Cannot notifyStarted() when the service is " + snapshot.state); 409 notifyFailed(failure); 410 throw failure; 411 } 412 413 if (snapshot.shutdownWhenStartupFinishes) { 414 snapshot = new StateSnapshot(STOPPING); 415 // We don't call listeners here because we already did that when we set the 416 // shutdownWhenStartupFinishes flag. 417 doStop(); 418 } else { 419 snapshot = new StateSnapshot(RUNNING); 420 enqueueRunningEvent(); 421 } 422 } finally { 423 monitor.leave(); 424 dispatchListenerEvents(); 425 } 426 } 427 428 /** 429 * Implementing classes should invoke this method once their service has stopped. It will cause 430 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 431 * State#TERMINATED}. 432 * 433 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 434 * State#STARTING}, or {@link State#RUNNING}. 435 */ 436 protected final void notifyStopped() { 437 monitor.enter(); 438 try { 439 State previous = state(); 440 switch (previous) { 441 case NEW: 442 case TERMINATED: 443 case FAILED: 444 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 445 case RUNNING: 446 case STARTING: 447 case STOPPING: 448 snapshot = new StateSnapshot(TERMINATED); 449 enqueueTerminatedEvent(previous); 450 break; 451 } 452 } finally { 453 monitor.leave(); 454 dispatchListenerEvents(); 455 } 456 } 457 458 /** 459 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 460 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 461 * or otherwise cannot be started nor stopped. 462 */ 463 protected final void notifyFailed(Throwable cause) { 464 checkNotNull(cause); 465 466 monitor.enter(); 467 try { 468 State previous = state(); 469 switch (previous) { 470 case NEW: 471 case TERMINATED: 472 throw new IllegalStateException("Failed while in state:" + previous, cause); 473 case RUNNING: 474 case STARTING: 475 case STOPPING: 476 snapshot = new StateSnapshot(FAILED, false, cause); 477 enqueueFailedEvent(previous, cause); 478 break; 479 case FAILED: 480 // Do nothing 481 break; 482 } 483 } finally { 484 monitor.leave(); 485 dispatchListenerEvents(); 486 } 487 } 488 489 @Override 490 public final boolean isRunning() { 491 return state() == RUNNING; 492 } 493 494 @Override 495 public final State state() { 496 return snapshot.externalState(); 497 } 498 499 /** 500 * @since 14.0 501 */ 502 @Override 503 public final Throwable failureCause() { 504 return snapshot.failureCause(); 505 } 506 507 /** 508 * @since 13.0 509 */ 510 @Override 511 public final void addListener(Listener listener, Executor executor) { 512 listeners.addListener(listener, executor); 513 } 514 515 @Override 516 public String toString() { 517 return getClass().getSimpleName() + " [" + state() + "]"; 518 } 519 520 /** 521 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 522 * #monitor}. 523 */ 524 private void dispatchListenerEvents() { 525 if (!monitor.isOccupiedByCurrentThread()) { 526 listeners.dispatch(); 527 } 528 } 529 530 private void enqueueStartingEvent() { 531 listeners.enqueue(STARTING_EVENT); 532 } 533 534 private void enqueueRunningEvent() { 535 listeners.enqueue(RUNNING_EVENT); 536 } 537 538 private void enqueueStoppingEvent(State from) { 539 if (from == State.STARTING) { 540 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 541 } else if (from == State.RUNNING) { 542 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 543 } else { 544 throw new AssertionError(); 545 } 546 } 547 548 private void enqueueTerminatedEvent(State from) { 549 switch (from) { 550 case NEW: 551 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 552 break; 553 case STARTING: 554 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 555 break; 556 case RUNNING: 557 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 558 break; 559 case STOPPING: 560 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 561 break; 562 case TERMINATED: 563 case FAILED: 564 throw new AssertionError(); 565 } 566 } 567 568 private void enqueueFailedEvent(State from, Throwable cause) { 569 // can't memoize this one due to the exception 570 listeners.enqueue( 571 new ListenerCallQueue.Event<Listener>() { 572 @Override 573 public void call(Listener listener) { 574 listener.failed(from, cause); 575 } 576 577 @Override 578 public String toString() { 579 return "failed({from = " + from + ", cause = " + cause + "})"; 580 } 581 }); 582 } 583 584 /** 585 * An immutable snapshot of the current state of the service. This class represents a consistent 586 * snapshot of the state and therefore it can be used to answer simple queries without needing to 587 * grab a lock. 588 */ 589 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 590 private static final class StateSnapshot { 591 /** 592 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 593 */ 594 final State state; 595 596 /** If true, the user requested a shutdown while the service was still starting up. */ 597 final boolean shutdownWhenStartupFinishes; 598 599 /** 600 * The exception that caused this service to fail. This will be {@code null} unless the service 601 * has failed. 602 */ 603 final @Nullable Throwable failure; 604 605 StateSnapshot(State internalState) { 606 this(internalState, false, null); 607 } 608 609 StateSnapshot( 610 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 611 checkArgument( 612 !shutdownWhenStartupFinishes || internalState == STARTING, 613 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 614 internalState); 615 checkArgument( 616 (failure != null) == (internalState == FAILED), 617 "A failure cause should be set if and only if the state is failed. Got %s and %s " 618 + "instead.", 619 internalState, 620 failure); 621 this.state = internalState; 622 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 623 this.failure = failure; 624 } 625 626 /** 627 * @see Service#state() 628 */ 629 State externalState() { 630 if (shutdownWhenStartupFinishes && state == STARTING) { 631 return STOPPING; 632 } else { 633 return state; 634 } 635 } 636 637 /** 638 * @see Service#failureCause() 639 */ 640 Throwable failureCause() { 641 checkState( 642 state == FAILED, 643 "failureCause() is only valid if the service has failed, service is %s", 644 state); 645 // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED. 646 return requireNonNull(failure); 647 } 648 } 649}