/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Service.State.FAILED;
import static com.google.common.util.concurrent.Service.State.NEW;
import static com.google.common.util.concurrent.Service.State.RUNNING;
import static com.google.common.util.concurrent.Service.State.STARTING;
import static com.google.common.util.concurrent.Service.State.STOPPING;
import static com.google.common.util.concurrent.Service.State.TERMINATED;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.util.concurrent.Monitor.Guard;
import com.google.common.util.concurrent.Service.State; // javadoc needs this
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.ForOverride;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.j2objc.annotations.WeakOuter;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.checkerframework.checker.nullness.compatqual.NullableDecl;

/**
 * Skeleton for services that react to {@link #doStart} and {@link #doStop} requests and answer
 * them through the {@link #notifyStarted()} and {@link #notifyStopped()} callbacks. Subclasses are
 * responsible for their own threading; {@link AbstractExecutionThreadService} is a simpler choice
 * when a single execution thread is enough.
 *
 * @author Jesse Wilson
 * @author Luke Sandberg
 * @since 1.0
 */
@GwtIncompatible
public abstract class AbstractService implements Service {
  // Listener events that carry no per-instance data are created once and shared.
  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
      new ListenerCallQueue.Event<Listener>() {
        @Override
        public void call(Listener listener) {
          listener.starting();
        }

        @Override
        public String toString() {
          return "starting()";
        }
      };
  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
      new ListenerCallQueue.Event<Listener>() {
        @Override
        public void call(Listener listener) {
          listener.running();
        }

        @Override
        public String toString() {
          return "running()";
        }
      };
  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
      stoppingEvent(STARTING);
  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
      stoppingEvent(RUNNING);

  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
      terminatedEvent(NEW);
  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
      terminatedEvent(STARTING);
  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
      terminatedEvent(RUNNING);
  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
      terminatedEvent(STOPPING);

  /** Builds the event that reports {@code terminated(origin)} to a listener. */
  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State origin) {
    return new ListenerCallQueue.Event<Listener>() {
      @Override
      public void call(Listener listener) {
        listener.terminated(origin);
      }

      @Override
      public String toString() {
        return "terminated({from = " + origin + "})";
      }
    };
  }

  /** Builds the event that reports {@code stopping(origin)} to a listener. */
  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State origin) {
    return new ListenerCallQueue.Event<Listener>() {
      @Override
      public void call(Listener listener) {
        listener.stopping(origin);
      }

      @Override
      public String toString() {
        return "stopping({from = " + origin + "})";
      }
    };
  }

  // The monitor is declared first: the guard fields below pass it to their constructors.
  private final Monitor monitor = new Monitor();

  private final Guard isStartable = new IsStartableGuard();
  private final Guard isStoppable = new IsStoppableGuard();
  private final Guard hasReachedRunning = new HasReachedRunningGuard();
  private final Guard isStopped = new IsStoppedGuard();

  /** Satisfied only while the externally visible state is {@code NEW}. */
  @WeakOuter
  private final class IsStartableGuard extends Guard {
    IsStartableGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state() == NEW;
    }
  }

  /** Satisfied while the externally visible state compares at or before {@code RUNNING}. */
  @WeakOuter
  private final class IsStoppableGuard extends Guard {
    IsStoppableGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state().compareTo(RUNNING) <= 0;
    }
  }

  /** Satisfied once the externally visible state compares at or after {@code RUNNING}. */
  @WeakOuter
  private final class HasReachedRunningGuard extends Guard {
    HasReachedRunningGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state().compareTo(RUNNING) >= 0;
    }
  }

  /** Satisfied once the externally visible state reports itself as terminal. */
  @WeakOuter
  private final class IsStoppedGuard extends Guard {
    IsStoppedGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state().isTerminal();
    }
  }

  /** Listeners to be told about each state transition. */
  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();

  /**
   * Current state of the service. Writes must happen with the lock held so the state stays
   * consistent; reads need no lock because the field is volatile and refers to an immutable
   * object. That lets {@link #state}, {@link #failureCause} and, notably, {@link #toString} run
   * without acquiring the lock.
   */
  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);

  /** Constructor for use by subclasses. */
  protected AbstractService() {}

  /**
   * Called by {@link #startAsync} to begin service startup. An invocation should lead to a call to
   * {@link #notifyStarted()}, either before this method returns or some time afterwards. If
   * startup fails, it should lead to a call to {@link #notifyFailed(Throwable)} instead.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where that is
   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
   * called multiple times.
   */
  @ForOverride
  protected abstract void doStart();

  /**
   * Called to begin service shutdown. An invocation should lead to a call to {@link
   * #notifyStopped()}, either before this method returns or some time afterwards. If shutdown
   * fails, it should lead to a call to {@link #notifyFailed(Throwable)} instead.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where that is
   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
   * called multiple times.
   *
   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
   * invoked immediately. Instead, it is deferred until {@link #notifyStarted} is called. Services
   * that need to cancel startup work can override {@link #doCancelStart}.
   */
  @ForOverride
  protected abstract void doStop();

  /**
   * Called by {@link #stopAsync} when the service is still starting (i.e. {@link #startAsync} has
   * been called but {@link #notifyStarted} has not). Subclasses can override this method to cancel
   * pending work and then call {@link #notifyStopped} to stop the service.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where that is
   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
   * called multiple times.
   *
   * <p>While this method runs, {@link #state()} returns {@link State#STOPPING}, which is the
   * external state observable by the caller of {@link #stopAsync}.
   *
   * @since 27.0
   */
  @Beta
  @ForOverride
  protected void doCancelStart() {}

  @CanIgnoreReturnValue
  @Override
  public final Service startAsync() {
    // Only a NEW service may be started; anything else is a caller error.
    if (!monitor.enterIf(isStartable)) {
      throw new IllegalStateException("Service " + this + " has already been started");
    }
    try {
      snapshot = new StateSnapshot(STARTING);
      enqueueStartingEvent();
      doStart();
    } catch (Throwable t) {
      notifyFailed(t);
    } finally {
      // Release the monitor before attempting to run listeners.
      monitor.leave();
      dispatchListenerEvents();
    }
    return this;
  }

  @CanIgnoreReturnValue
  @Override
  public final Service stopAsync() {
    // Nothing to do when the service is already stopping or in a terminal state.
    if (!monitor.enterIf(isStoppable)) {
      return this;
    }
    try {
      State current = state();
      switch (current) {
        case NEW:
          snapshot = new StateSnapshot(TERMINATED);
          enqueueTerminatedEvent(NEW);
          break;
        case STARTING:
          // Stay internally STARTING but remember the stop request; externally this is STOPPING.
          snapshot = new StateSnapshot(STARTING, true, null);
          enqueueStoppingEvent(STARTING);
          doCancelStart();
          break;
        case RUNNING:
          snapshot = new StateSnapshot(STOPPING);
          enqueueStoppingEvent(RUNNING);
          doStop();
          break;
        case STOPPING:
        case TERMINATED:
        case FAILED:
          // Unreachable: the isStoppable guard has already excluded these states.
          throw new AssertionError("isStoppable is incorrectly implemented, saw: " + current);
      }
    } catch (Throwable t) {
      notifyFailed(t);
    } finally {
      // Release the monitor before attempting to run listeners.
      monitor.leave();
      dispatchListenerEvents();
    }
    return this;
  }

  @Override
  public final void awaitRunning() {
    monitor.enterWhenUninterruptibly(hasReachedRunning);
    try {
      // The guard also lets later states through, so confirm the state really is RUNNING.
      checkCurrentState(RUNNING);
    } finally {
      monitor.leave();
    }
  }

  @Override
  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
    if (!monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
      // It is possible due to races that we are currently in the expected state even though we
      // timed out, e.g. if we weren't even able to grab the lock within the timeout we would never
      // even check the guard. We probably don't care much about this case, but it could lead to a
      // confusing error message.
      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
    }
    try {
      checkCurrentState(RUNNING);
    } finally {
      monitor.leave();
    }
  }

  @Override
  public final void awaitTerminated() {
    monitor.enterWhenUninterruptibly(isStopped);
    try {
      // The guard accepts any terminal state, so confirm the state really is TERMINATED.
      checkCurrentState(TERMINATED);
    } finally {
      monitor.leave();
    }
  }

  @Override
  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
    if (!monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
      // It is possible due to races that we are currently in the expected state even though we
      // timed out, e.g. if we weren't even able to grab the lock within the timeout we would never
      // even check the guard. We probably don't care much about this case, but it could lead to a
      // confusing error message.
      throw new TimeoutException(
          "Timed out waiting for "
              + this
              + " to reach a terminal state. "
              + "Current state: "
              + state());
    }
    try {
      checkCurrentState(TERMINATED);
    } finally {
      monitor.leave();
    }
  }

  /**
   * Verifies that the service is currently in the {@code expected} state.
   *
   * @throws IllegalStateException if the current state differs; when the service has FAILED, the
   *     failure cause is attached to the exception
   */
  @GuardedBy("monitor")
  private void checkCurrentState(State expected) {
    State actual = state();
    if (actual == expected) {
      return;
    }
    if (actual == FAILED) {
      // Reported separately so that the failure cause, if any, travels with the exception.
      throw new IllegalStateException(
          "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
          failureCause());
    }
    throw new IllegalStateException(
        "Expected the service " + this + " to be " + expected + ", but was " + actual);
  }

  /**
   * Implementing classes should invoke this method once their service has started. It moves the
   * service from {@link State#STARTING} to {@link State#RUNNING}, or, if a stop was requested
   * during startup, to {@link State#STOPPING} followed by a call to {@link #doStop}.
   *
   * @throws IllegalStateException if the service is not {@link State#STARTING}.
   */
  protected final void notifyStarted() {
    monitor.enter();
    try {
      // Inspect the internal state rather than state(): a service that was asked to stop while
      // starting is still internally STARTING.
      StateSnapshot current = snapshot;
      if (current.state != STARTING) {
        IllegalStateException failure =
            new IllegalStateException(
                "Cannot notifyStarted() when the service is " + current.state);
        notifyFailed(failure);
        throw failure;
      }

      if (!current.shutdownWhenStartupFinishes) {
        snapshot = new StateSnapshot(RUNNING);
        enqueueRunningEvent();
      } else {
        snapshot = new StateSnapshot(STOPPING);
        // No stopping event here: it was already enqueued when the stop request set the
        // shutdownWhenStartupFinishes flag.
        doStop();
      }
    } finally {
      monitor.leave();
      dispatchListenerEvents();
    }
  }

  /**
   * Implementing classes should invoke this method once their service has stopped. It moves the
   * service from {@link State#STARTING}, {@link State#RUNNING} or {@link State#STOPPING} to {@link
   * State#TERMINATED}.
   *
   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
   *     State#STARTING}, or {@link State#RUNNING}.
   */
  protected final void notifyStopped() {
    monitor.enter();
    try {
      State previous = state();
      switch (previous) {
        case STARTING:
        case RUNNING:
        case STOPPING:
          snapshot = new StateSnapshot(TERMINATED);
          enqueueTerminatedEvent(previous);
          break;
        case NEW:
        case TERMINATED:
        case FAILED:
          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
      }
    } finally {
      monitor.leave();
      dispatchListenerEvents();
    }
  }

  /**
   * Invoke this method to transition the service to the {@link State#FAILED} state. The service
   * will <b>not be stopped</b> if it is running. Invoke this method when a service has failed
   * critically or otherwise cannot be started nor stopped. Calling it on a service that has
   * already failed has no effect.
   *
   * @throws IllegalStateException if the service is {@link State#NEW} or {@link
   *     State#TERMINATED}; {@code cause} is attached as the cause of that exception
   */
  protected final void notifyFailed(Throwable cause) {
    checkNotNull(cause);

    monitor.enter();
    try {
      State previous = state();
      switch (previous) {
        case STARTING:
        case RUNNING:
        case STOPPING:
          snapshot = new StateSnapshot(FAILED, false, cause);
          enqueueFailedEvent(previous, cause);
          break;
        case FAILED:
          // Already failed: keep the first recorded cause.
          break;
        case NEW:
        case TERMINATED:
          throw new IllegalStateException("Failed while in state:" + previous, cause);
      }
    } finally {
      monitor.leave();
      dispatchListenerEvents();
    }
  }

  @Override
  public final boolean isRunning() {
    return state() == RUNNING;
  }

  @Override
  public final State state() {
    return snapshot.externalState();
  }

  /** @since 14.0 */
  @Override
  public final Throwable failureCause() {
    return snapshot.failureCause();
  }

  /** @since 13.0 */
  @Override
  public final void addListener(Listener listener, Executor executor) {
    listeners.addListener(listener, executor);
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + " [" + state() + "]";
  }

  /**
   * Attempts to run the queued events in {@link #listeners}, but only when the calling thread does
   * not hold the {@link #monitor}.
   */
  private void dispatchListenerEvents() {
    if (!monitor.isOccupiedByCurrentThread()) {
      listeners.dispatch();
    }
  }

  private void enqueueStartingEvent() {
    listeners.enqueue(STARTING_EVENT);
  }

  private void enqueueRunningEvent() {
    listeners.enqueue(RUNNING_EVENT);
  }

  /** Queues the shared "stopping" event for {@code from}, which must be STARTING or RUNNING. */
  private void enqueueStoppingEvent(final State from) {
    final ListenerCallQueue.Event<Listener> event;
    if (from == STARTING) {
      event = STOPPING_FROM_STARTING_EVENT;
    } else if (from == RUNNING) {
      event = STOPPING_FROM_RUNNING_EVENT;
    } else {
      throw new AssertionError();
    }
    listeners.enqueue(event);
  }

  /** Queues the shared "terminated" event for {@code from}, which must not be a terminal state. */
  private void enqueueTerminatedEvent(final State from) {
    final ListenerCallQueue.Event<Listener> event;
    switch (from) {
      case NEW:
        event = TERMINATED_FROM_NEW_EVENT;
        break;
      case STARTING:
        event = TERMINATED_FROM_STARTING_EVENT;
        break;
      case RUNNING:
        event = TERMINATED_FROM_RUNNING_EVENT;
        break;
      case STOPPING:
        event = TERMINATED_FROM_STOPPING_EVENT;
        break;
      case TERMINATED:
      case FAILED:
      default:
        throw new AssertionError();
    }
    listeners.enqueue(event);
  }

  /** Queues a "failed" event; built per call because it carries the specific cause. */
  private void enqueueFailedEvent(final State from, final Throwable cause) {
    ListenerCallQueue.Event<Listener> event =
        new ListenerCallQueue.Event<Listener>() {
          @Override
          public void call(Listener listener) {
            listener.failed(from, cause);
          }

          @Override
          public String toString() {
            return "failed({from = " + from + ", cause = " + cause + "})";
          }
        };
    listeners.enqueue(event);
  }

  /**
   * An immutable snapshot of the service's state. Because each instance is internally consistent,
   * simple queries can be answered from it without taking a lock.
   */
  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
  private static final class StateSnapshot {
    /**
     * The internal state, which equals the external state unless shutdownWhenStartupFinishes is
     * true.
     */
    final State state;

    /** If true, the user requested a shutdown while the service was still starting up. */
    final boolean shutdownWhenStartupFinishes;

    /**
     * The exception that caused this service to fail. This will be {@code null} unless the service
     * has failed.
     */
    @NullableDecl final Throwable failure;

    StateSnapshot(State internalState) {
      this(internalState, false, null);
    }

    StateSnapshot(
        State internalState, boolean shutdownWhenStartupFinishes, @NullableDecl Throwable failure) {
      checkArgument(
          !shutdownWhenStartupFinishes || internalState == STARTING,
          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
          internalState);
      // A cause must be present exactly when the state is FAILED.
      checkArgument(
          (failure != null) == (internalState == FAILED),
          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
              + "instead.",
          internalState,
          failure);
      this.state = internalState;
      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
      this.failure = failure;
    }

    /** @see Service#state() */
    State externalState() {
      // A service asked to stop during startup is reported as STOPPING to callers.
      return (shutdownWhenStartupFinishes && state == STARTING) ? STOPPING : state;
    }

    /** @see Service#failureCause() */
    Throwable failureCause() {
      checkState(
          state == FAILED,
          "failureCause() is only valid if the service has failed, service is %s",
          state);
      return failure;
    }
  }
}