001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 021import static com.google.common.util.concurrent.Service.State.FAILED; 022import static com.google.common.util.concurrent.Service.State.NEW; 023import static com.google.common.util.concurrent.Service.State.RUNNING; 024import static com.google.common.util.concurrent.Service.State.STARTING; 025import static com.google.common.util.concurrent.Service.State.STOPPING; 026import static com.google.common.util.concurrent.Service.State.TERMINATED; 027import static java.util.Objects.requireNonNull; 028 029import com.google.common.annotations.GwtIncompatible; 030import com.google.common.annotations.J2ktIncompatible; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; 033import com.google.errorprone.annotations.CanIgnoreReturnValue; 034import com.google.errorprone.annotations.ForOverride; 035import com.google.errorprone.annotations.concurrent.GuardedBy; 036import com.google.j2objc.annotations.WeakOuter; 037import java.time.Duration; 038import java.util.concurrent.Executor; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.TimeoutException; 041import org.jspecify.annotations.Nullable; 042 043/** 044 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 045 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 046 * callbacks. Its subclasses must manage threads manually; consider {@link 047 * AbstractExecutionThreadService} if you need only a single execution thread. 048 * 049 * @author Jesse Wilson 050 * @author Luke Sandberg 051 * @since 1.0 052 */ 053@GwtIncompatible 054@J2ktIncompatible 055public abstract class AbstractService implements Service { 056 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 057 new ListenerCallQueue.Event<Listener>() { 058 @Override 059 public void call(Listener listener) { 060 listener.starting(); 061 } 062 063 @Override 064 public String toString() { 065 return "starting()"; 066 } 067 }; 068 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 069 new ListenerCallQueue.Event<Listener>() { 070 @Override 071 public void call(Listener listener) { 072 listener.running(); 073 } 074 075 @Override 076 public String toString() { 077 return "running()"; 078 } 079 }; 080 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 081 stoppingEvent(STARTING); 082 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 083 stoppingEvent(RUNNING); 084 085 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 086 terminatedEvent(NEW); 087 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 088 terminatedEvent(STARTING); 089 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 090 terminatedEvent(RUNNING); 091 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 092 terminatedEvent(STOPPING); 093 094 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 095 return new ListenerCallQueue.Event<Listener>() { 096 @Override 097 public void call(Listener listener) { 098 listener.terminated(from); 099 } 100 101 @Override 102 public String toString() { 103 return "terminated({from = " + from + "})"; 104 } 105 }; 106 } 107 108 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 109 return new ListenerCallQueue.Event<Listener>() { 110 @Override 111 public void call(Listener listener) { 112 listener.stopping(from); 113 } 114 115 @Override 116 public String toString() { 117 return "stopping({from = " + from + "})"; 118 } 119 }; 120 } 121 122 private final Monitor monitor = new Monitor(); 123 124 private final Guard isStartable = new IsStartableGuard(); 125 126 @WeakOuter 127 private final class IsStartableGuard extends Guard { 128 IsStartableGuard() { 129 super(AbstractService.this.monitor); 130 } 131 132 @Override 133 public boolean isSatisfied() { 134 return state() == NEW; 135 } 136 } 137 138 private final Guard isStoppable = new IsStoppableGuard(); 139 140 @WeakOuter 141 private final class IsStoppableGuard extends Guard { 142 IsStoppableGuard() { 143 super(AbstractService.this.monitor); 144 } 145 146 @Override 147 public boolean isSatisfied() { 148 return state().compareTo(RUNNING) <= 0; 149 } 150 } 151 152 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 153 154 @WeakOuter 155 private final class HasReachedRunningGuard extends Guard { 156 HasReachedRunningGuard() { 157 super(AbstractService.this.monitor); 158 } 159 160 @Override 161 public boolean isSatisfied() { 162 return state().compareTo(RUNNING) >= 0; 163 } 164 } 165 166 private final Guard isStopped = new IsStoppedGuard(); 167 168 @WeakOuter 169 private final class IsStoppedGuard extends Guard { 170 IsStoppedGuard() { 171 super(AbstractService.this.monitor); 172 } 173 174 @Override 175 public boolean isSatisfied() { 176 return state().compareTo(TERMINATED) >= 0; 177 } 178 } 179 180 /** The listeners to notify during a state transition. */ 181 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 182 183 /** 184 * The current state of the service. This should be written with the lock held but can be read 185 * without it because it is an immutable object in a volatile field. This is desirable so that 186 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 187 * without grabbing the lock. 188 * 189 * <p>To update this field correctly the lock must be held to guarantee that the state is 190 * consistent. 191 */ 192 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 193 194 /** Constructor for use by subclasses. */ 195 protected AbstractService() {} 196 197 /** 198 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 199 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 200 * or after it has returned. If startup fails, the invocation should cause a call to {@link 201 * #notifyFailed(Throwable)} instead. 202 * 203 * <p>This method should return promptly; prefer to do work on a different thread where it is 204 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 205 * called multiple times. 206 */ 207 @ForOverride 208 protected abstract void doStart(); 209 210 /** 211 * This method should be used to initiate service shutdown. The invocation of this method should 212 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 213 * returned. If shutdown fails, the invocation should cause a call to {@link 214 * #notifyFailed(Throwable)} instead. 215 * 216 * <p>This method should return promptly; prefer to do work on a different thread where it is 217 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 218 * called multiple times. 219 * 220 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 221 * invoked immediately. Instead, it will be deferred until after the service is {@link 222 * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 223 */ 224 @ForOverride 225 protected abstract void doStop(); 226 227 /** 228 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 229 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 230 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 231 * 232 * <p>This method should return promptly; prefer to do work on a different thread where it is 233 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 234 * called multiple times. 235 * 236 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 237 * external state observable by the caller of {@link #stopAsync}. 238 * 239 * @since 27.0 240 */ 241 @ForOverride 242 protected void doCancelStart() {} 243 244 @CanIgnoreReturnValue 245 @Override 246 public final Service startAsync() { 247 if (monitor.enterIf(isStartable)) { 248 try { 249 snapshot = new StateSnapshot(STARTING); 250 enqueueStartingEvent(); 251 doStart(); 252 } catch (Throwable startupFailure) { 253 restoreInterruptIfIsInterruptedException(startupFailure); 254 notifyFailed(startupFailure); 255 } finally { 256 monitor.leave(); 257 dispatchListenerEvents(); 258 } 259 } else { 260 throw new IllegalStateException("Service " + this + " has already been started"); 261 } 262 return this; 263 } 264 265 @CanIgnoreReturnValue 266 @Override 267 public final Service stopAsync() { 268 if (monitor.enterIf(isStoppable)) { 269 try { 270 State previous = state(); 271 switch (previous) { 272 case NEW: 273 snapshot = new StateSnapshot(TERMINATED); 274 enqueueTerminatedEvent(NEW); 275 break; 276 case STARTING: 277 snapshot = new StateSnapshot(STARTING, true, null); 278 enqueueStoppingEvent(STARTING); 279 doCancelStart(); 280 break; 281 case RUNNING: 282 snapshot = new StateSnapshot(STOPPING); 283 enqueueStoppingEvent(RUNNING); 284 doStop(); 285 break; 286 case STOPPING: 287 case TERMINATED: 288 case FAILED: 289 // These cases are impossible due to the if statement above. 290 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 291 } 292 } catch (Throwable shutdownFailure) { 293 restoreInterruptIfIsInterruptedException(shutdownFailure); 294 notifyFailed(shutdownFailure); 295 } finally { 296 monitor.leave(); 297 dispatchListenerEvents(); 298 } 299 } 300 return this; 301 } 302 303 @Override 304 public final void awaitRunning() { 305 monitor.enterWhenUninterruptibly(hasReachedRunning); 306 try { 307 checkCurrentState(RUNNING); 308 } finally { 309 monitor.leave(); 310 } 311 } 312 313 /** @since 28.0 */ 314 @Override 315 public final void awaitRunning(Duration timeout) throws TimeoutException { 316 Service.super.awaitRunning(timeout); 317 } 318 319 @Override 320 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 321 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 322 try { 323 checkCurrentState(RUNNING); 324 } finally { 325 monitor.leave(); 326 } 327 } else { 328 // It is possible due to races that we are currently in the expected state even though we 329 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 330 // even check the guard. I don't think we care too much about this use case but it could lead 331 // to a confusing error message. 332 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 333 } 334 } 335 336 @Override 337 public final void awaitTerminated() { 338 monitor.enterWhenUninterruptibly(isStopped); 339 try { 340 checkCurrentState(TERMINATED); 341 } finally { 342 monitor.leave(); 343 } 344 } 345 346 /** @since 28.0 */ 347 @Override 348 public final void awaitTerminated(Duration timeout) throws TimeoutException { 349 Service.super.awaitTerminated(timeout); 350 } 351 352 @Override 353 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 354 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 355 try { 356 checkCurrentState(TERMINATED); 357 } finally { 358 monitor.leave(); 359 } 360 } else { 361 // It is possible due to races that we are currently in the expected state even though we 362 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 363 // even check the guard. I don't think we care too much about this use case but it could lead 364 // to a confusing error message. 365 throw new TimeoutException( 366 "Timed out waiting for " 367 + this 368 + " to reach a terminal state. " 369 + "Current state: " 370 + state()); 371 } 372 } 373 374 /** Checks that the current state is equal to the expected state. */ 375 @GuardedBy("monitor") 376 private void checkCurrentState(State expected) { 377 State actual = state(); 378 if (actual != expected) { 379 if (actual == FAILED) { 380 // Handle this specially so that we can include the failureCause, if there is one. 381 throw new IllegalStateException( 382 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 383 failureCause()); 384 } 385 throw new IllegalStateException( 386 "Expected the service " + this + " to be " + expected + ", but was " + actual); 387 } 388 } 389 390 /** 391 * Implementing classes should invoke this method once their service has started. It will cause 392 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 393 * 394 * @throws IllegalStateException if the service is not {@link State#STARTING}. 395 */ 396 protected final void notifyStarted() { 397 monitor.enter(); 398 try { 399 // We have to examine the internal state of the snapshot here to properly handle the stop 400 // while starting case. 401 if (snapshot.state != STARTING) { 402 IllegalStateException failure = 403 new IllegalStateException( 404 "Cannot notifyStarted() when the service is " + snapshot.state); 405 notifyFailed(failure); 406 throw failure; 407 } 408 409 if (snapshot.shutdownWhenStartupFinishes) { 410 snapshot = new StateSnapshot(STOPPING); 411 // We don't call listeners here because we already did that when we set the 412 // shutdownWhenStartupFinishes flag. 413 doStop(); 414 } else { 415 snapshot = new StateSnapshot(RUNNING); 416 enqueueRunningEvent(); 417 } 418 } finally { 419 monitor.leave(); 420 dispatchListenerEvents(); 421 } 422 } 423 424 /** 425 * Implementing classes should invoke this method once their service has stopped. It will cause 426 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 427 * State#TERMINATED}. 428 * 429 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 430 * State#STARTING}, or {@link State#RUNNING}. 431 */ 432 protected final void notifyStopped() { 433 monitor.enter(); 434 try { 435 State previous = state(); 436 switch (previous) { 437 case NEW: 438 case TERMINATED: 439 case FAILED: 440 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 441 case RUNNING: 442 case STARTING: 443 case STOPPING: 444 snapshot = new StateSnapshot(TERMINATED); 445 enqueueTerminatedEvent(previous); 446 break; 447 } 448 } finally { 449 monitor.leave(); 450 dispatchListenerEvents(); 451 } 452 } 453 454 /** 455 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 456 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 457 * or otherwise cannot be started nor stopped. 458 */ 459 protected final void notifyFailed(Throwable cause) { 460 checkNotNull(cause); 461 462 monitor.enter(); 463 try { 464 State previous = state(); 465 switch (previous) { 466 case NEW: 467 case TERMINATED: 468 throw new IllegalStateException("Failed while in state:" + previous, cause); 469 case RUNNING: 470 case STARTING: 471 case STOPPING: 472 snapshot = new StateSnapshot(FAILED, false, cause); 473 enqueueFailedEvent(previous, cause); 474 break; 475 case FAILED: 476 // Do nothing 477 break; 478 } 479 } finally { 480 monitor.leave(); 481 dispatchListenerEvents(); 482 } 483 } 484 485 @Override 486 public final boolean isRunning() { 487 return state() == RUNNING; 488 } 489 490 @Override 491 public final State state() { 492 return snapshot.externalState(); 493 } 494 495 /** @since 14.0 */ 496 @Override 497 public final Throwable failureCause() { 498 return snapshot.failureCause(); 499 } 500 501 /** @since 13.0 */ 502 @Override 503 public final void addListener(Listener listener, Executor executor) { 504 listeners.addListener(listener, executor); 505 } 506 507 @Override 508 public String toString() { 509 return getClass().getSimpleName() + " [" + state() + "]"; 510 } 511 512 /** 513 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 514 * #monitor}. 515 */ 516 private void dispatchListenerEvents() { 517 if (!monitor.isOccupiedByCurrentThread()) { 518 listeners.dispatch(); 519 } 520 } 521 522 private void enqueueStartingEvent() { 523 listeners.enqueue(STARTING_EVENT); 524 } 525 526 private void enqueueRunningEvent() { 527 listeners.enqueue(RUNNING_EVENT); 528 } 529 530 private void enqueueStoppingEvent(final State from) { 531 if (from == State.STARTING) { 532 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 533 } else if (from == State.RUNNING) { 534 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 535 } else { 536 throw new AssertionError(); 537 } 538 } 539 540 private void enqueueTerminatedEvent(final State from) { 541 switch (from) { 542 case NEW: 543 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 544 break; 545 case STARTING: 546 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 547 break; 548 case RUNNING: 549 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 550 break; 551 case STOPPING: 552 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 553 break; 554 case TERMINATED: 555 case FAILED: 556 throw new AssertionError(); 557 } 558 } 559 560 private void enqueueFailedEvent(final State from, final Throwable cause) { 561 // can't memoize this one due to the exception 562 listeners.enqueue( 563 new ListenerCallQueue.Event<Listener>() { 564 @Override 565 public void call(Listener listener) { 566 listener.failed(from, cause); 567 } 568 569 @Override 570 public String toString() { 571 return "failed({from = " + from + ", cause = " + cause + "})"; 572 } 573 }); 574 } 575 576 /** 577 * An immutable snapshot of the current state of the service. This class represents a consistent 578 * snapshot of the state and therefore it can be used to answer simple queries without needing to 579 * grab a lock. 580 */ 581 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 582 private static final class StateSnapshot { 583 /** 584 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 585 */ 586 final State state; 587 588 /** If true, the user requested a shutdown while the service was still starting up. */ 589 final boolean shutdownWhenStartupFinishes; 590 591 /** 592 * The exception that caused this service to fail. This will be {@code null} unless the service 593 * has failed. 594 */ 595 final @Nullable Throwable failure; 596 597 StateSnapshot(State internalState) { 598 this(internalState, false, null); 599 } 600 601 StateSnapshot( 602 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 603 checkArgument( 604 !shutdownWhenStartupFinishes || internalState == STARTING, 605 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 606 internalState); 607 checkArgument( 608 (failure != null) == (internalState == FAILED), 609 "A failure cause should be set if and only if the state is failed. Got %s and %s " 610 + "instead.", 611 internalState, 612 failure); 613 this.state = internalState; 614 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 615 this.failure = failure; 616 } 617 618 /** @see Service#state() */ 619 State externalState() { 620 if (shutdownWhenStartupFinishes && state == STARTING) { 621 return STOPPING; 622 } else { 623 return state; 624 } 625 } 626 627 /** @see Service#failureCause() */ 628 Throwable failureCause() { 629 checkState( 630 state == FAILED, 631 "failureCause() is only valid if the service has failed, service is %s", 632 state); 633 // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED. 634 return requireNonNull(failure); 635 } 636 } 637}