001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Service.State.FAILED; 021import static com.google.common.util.concurrent.Service.State.NEW; 022import static com.google.common.util.concurrent.Service.State.RUNNING; 023import static com.google.common.util.concurrent.Service.State.STARTING; 024import static com.google.common.util.concurrent.Service.State.STOPPING; 025import static com.google.common.util.concurrent.Service.State.TERMINATED; 026 027import com.google.common.annotations.Beta; 028import com.google.common.annotations.GwtIncompatible; 029import com.google.common.util.concurrent.Monitor.Guard; 030import com.google.common.util.concurrent.Service.State; // javadoc needs this 031import com.google.errorprone.annotations.CanIgnoreReturnValue; 032import com.google.errorprone.annotations.ForOverride; 033import com.google.errorprone.annotations.concurrent.GuardedBy; 034import com.google.j2objc.annotations.WeakOuter; 035import java.util.concurrent.Executor; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import org.checkerframework.checker.nullness.qual.Nullable; 039 040/** 041 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 042 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 043 * callbacks. Its subclasses must manage threads manually; consider {@link 044 * AbstractExecutionThreadService} if you need only a single execution thread. 045 * 046 * @author Jesse Wilson 047 * @author Luke Sandberg 048 * @since 1.0 049 */ 050@Beta 051@GwtIncompatible 052public abstract class AbstractService implements Service { 053 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 054 new ListenerCallQueue.Event<Listener>() { 055 @Override 056 public void call(Listener listener) { 057 listener.starting(); 058 } 059 060 @Override 061 public String toString() { 062 return "starting()"; 063 } 064 }; 065 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 066 new ListenerCallQueue.Event<Listener>() { 067 @Override 068 public void call(Listener listener) { 069 listener.running(); 070 } 071 072 @Override 073 public String toString() { 074 return "running()"; 075 } 076 }; 077 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 078 stoppingEvent(STARTING); 079 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 080 stoppingEvent(RUNNING); 081 082 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 083 terminatedEvent(NEW); 084 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 085 terminatedEvent(STARTING); 086 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 087 terminatedEvent(RUNNING); 088 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 089 terminatedEvent(STOPPING); 090 091 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 092 return new ListenerCallQueue.Event<Listener>() { 093 @Override 094 public void call(Listener listener) { 095 listener.terminated(from); 096 } 097 098 @Override 099 public String toString() { 100 return "terminated({from = " + from + "})"; 101 } 102 }; 103 } 104 105 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 106 return new ListenerCallQueue.Event<Listener>() { 107 @Override 108 public void call(Listener listener) { 109 listener.stopping(from); 110 } 111 112 @Override 113 public String toString() { 114 return "stopping({from = " + from + "})"; 115 } 116 }; 117 } 118 119 private final Monitor monitor = new Monitor(); 120 121 private final Guard isStartable = new IsStartableGuard(); 122 123 @WeakOuter 124 private final class IsStartableGuard extends Guard { 125 IsStartableGuard() { 126 super(AbstractService.this.monitor); 127 } 128 129 @Override 130 public boolean isSatisfied() { 131 return state() == NEW; 132 } 133 } 134 135 private final Guard isStoppable = new IsStoppableGuard(); 136 137 @WeakOuter 138 private final class IsStoppableGuard extends Guard { 139 IsStoppableGuard() { 140 super(AbstractService.this.monitor); 141 } 142 143 @Override 144 public boolean isSatisfied() { 145 return state().compareTo(RUNNING) <= 0; 146 } 147 } 148 149 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 150 151 @WeakOuter 152 private final class HasReachedRunningGuard extends Guard { 153 HasReachedRunningGuard() { 154 super(AbstractService.this.monitor); 155 } 156 157 @Override 158 public boolean isSatisfied() { 159 return state().compareTo(RUNNING) >= 0; 160 } 161 } 162 163 private final Guard isStopped = new IsStoppedGuard(); 164 165 @WeakOuter 166 private final class IsStoppedGuard extends Guard { 167 IsStoppedGuard() { 168 super(AbstractService.this.monitor); 169 } 170 171 @Override 172 public boolean isSatisfied() { 173 return state().isTerminal(); 174 } 175 } 176 177 /** The listeners to notify during a state transition. */ 178 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 179 180 /** 181 * The current state of the service. This should be written with the lock held but can be read 182 * without it because it is an immutable object in a volatile field. This is desirable so that 183 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 184 * without grabbing the lock. 185 * 186 * <p>To update this field correctly the lock must be held to guarantee that the state is 187 * consistent. 188 */ 189 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 190 191 /** Constructor for use by subclasses. */ 192 protected AbstractService() {} 193 194 /** 195 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 196 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 197 * or after it has returned. If startup fails, the invocation should cause a call to {@link 198 * #notifyFailed(Throwable)} instead. 199 * 200 * <p>This method should return promptly; prefer to do work on a different thread where it is 201 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 202 * called multiple times. 203 */ 204 @ForOverride 205 protected abstract void doStart(); 206 207 /** 208 * This method should be used to initiate service shutdown. The invocation of this method should 209 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 210 * returned. If shutdown fails, the invocation should cause a call to {@link 211 * #notifyFailed(Throwable)} instead. 212 * 213 * <p>This method should return promptly; prefer to do work on a different thread where it is 214 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 215 * called multiple times. 216 * 217 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 218 * invoked immediately. Instead, it will be deferred until after the service is {@link 219 * State#RUNNING}. Services that need to cancel startup work can override {#link #doCancelStart}. 220 */ 221 @ForOverride 222 protected abstract void doStop(); 223 224 /** 225 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 226 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 227 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 228 * 229 * <p>This method should return promptly; prefer to do work on a different thread where it is 230 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 231 * called multiple times. 232 * 233 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 234 * external state observable by the caller of {@link #stopAsync}. 235 * 236 * @since 27.0 237 */ 238 @ForOverride 239 protected void doCancelStart() {} 240 241 @CanIgnoreReturnValue 242 @Override 243 public final Service startAsync() { 244 if (monitor.enterIf(isStartable)) { 245 try { 246 snapshot = new StateSnapshot(STARTING); 247 enqueueStartingEvent(); 248 doStart(); 249 } catch (Throwable startupFailure) { 250 notifyFailed(startupFailure); 251 } finally { 252 monitor.leave(); 253 dispatchListenerEvents(); 254 } 255 } else { 256 throw new IllegalStateException("Service " + this + " has already been started"); 257 } 258 return this; 259 } 260 261 @CanIgnoreReturnValue 262 @Override 263 public final Service stopAsync() { 264 if (monitor.enterIf(isStoppable)) { 265 try { 266 State previous = state(); 267 switch (previous) { 268 case NEW: 269 snapshot = new StateSnapshot(TERMINATED); 270 enqueueTerminatedEvent(NEW); 271 break; 272 case STARTING: 273 snapshot = new StateSnapshot(STARTING, true, null); 274 enqueueStoppingEvent(STARTING); 275 doCancelStart(); 276 break; 277 case RUNNING: 278 snapshot = new StateSnapshot(STOPPING); 279 enqueueStoppingEvent(RUNNING); 280 doStop(); 281 break; 282 case STOPPING: 283 case TERMINATED: 284 case FAILED: 285 // These cases are impossible due to the if statement above. 286 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 287 } 288 } catch (Throwable shutdownFailure) { 289 notifyFailed(shutdownFailure); 290 } finally { 291 monitor.leave(); 292 dispatchListenerEvents(); 293 } 294 } 295 return this; 296 } 297 298 @Override 299 public final void awaitRunning() { 300 monitor.enterWhenUninterruptibly(hasReachedRunning); 301 try { 302 checkCurrentState(RUNNING); 303 } finally { 304 monitor.leave(); 305 } 306 } 307 308 @Override 309 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 310 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 311 try { 312 checkCurrentState(RUNNING); 313 } finally { 314 monitor.leave(); 315 } 316 } else { 317 // It is possible due to races the we are currently in the expected state even though we 318 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 319 // even check the guard. I don't think we care too much about this use case but it could lead 320 // to a confusing error message. 321 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 322 } 323 } 324 325 @Override 326 public final void awaitTerminated() { 327 monitor.enterWhenUninterruptibly(isStopped); 328 try { 329 checkCurrentState(TERMINATED); 330 } finally { 331 monitor.leave(); 332 } 333 } 334 335 @Override 336 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 337 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 338 try { 339 checkCurrentState(TERMINATED); 340 } finally { 341 monitor.leave(); 342 } 343 } else { 344 // It is possible due to races the we are currently in the expected state even though we 345 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 346 // even check the guard. I don't think we care too much about this use case but it could lead 347 // to a confusing error message. 348 throw new TimeoutException( 349 "Timed out waiting for " 350 + this 351 + " to reach a terminal state. " 352 + "Current state: " 353 + state()); 354 } 355 } 356 357 /** Checks that the current state is equal to the expected state. */ 358 @GuardedBy("monitor") 359 private void checkCurrentState(State expected) { 360 State actual = state(); 361 if (actual != expected) { 362 if (actual == FAILED) { 363 // Handle this specially so that we can include the failureCause, if there is one. 364 throw new IllegalStateException( 365 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 366 failureCause()); 367 } 368 throw new IllegalStateException( 369 "Expected the service " + this + " to be " + expected + ", but was " + actual); 370 } 371 } 372 373 /** 374 * Implementing classes should invoke this method once their service has started. It will cause 375 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 376 * 377 * @throws IllegalStateException if the service is not {@link State#STARTING}. 378 */ 379 protected final void notifyStarted() { 380 monitor.enter(); 381 try { 382 // We have to examine the internal state of the snapshot here to properly handle the stop 383 // while starting case. 384 if (snapshot.state != STARTING) { 385 IllegalStateException failure = 386 new IllegalStateException( 387 "Cannot notifyStarted() when the service is " + snapshot.state); 388 notifyFailed(failure); 389 throw failure; 390 } 391 392 if (snapshot.shutdownWhenStartupFinishes) { 393 snapshot = new StateSnapshot(STOPPING); 394 // We don't call listeners here because we already did that when we set the 395 // shutdownWhenStartupFinishes flag. 396 doStop(); 397 } else { 398 snapshot = new StateSnapshot(RUNNING); 399 enqueueRunningEvent(); 400 } 401 } finally { 402 monitor.leave(); 403 dispatchListenerEvents(); 404 } 405 } 406 407 /** 408 * Implementing classes should invoke this method once their service has stopped. It will cause 409 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 410 * State#TERMINATED}. 411 * 412 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 413 * State#STARTING}, or {@link State#RUNNING}. 414 */ 415 protected final void notifyStopped() { 416 monitor.enter(); 417 try { 418 State previous = state(); 419 switch (previous) { 420 case NEW: 421 case TERMINATED: 422 case FAILED: 423 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 424 case RUNNING: 425 case STARTING: 426 case STOPPING: 427 snapshot = new StateSnapshot(TERMINATED); 428 enqueueTerminatedEvent(previous); 429 break; 430 } 431 } finally { 432 monitor.leave(); 433 dispatchListenerEvents(); 434 } 435 } 436 437 /** 438 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 439 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 440 * or otherwise cannot be started nor stopped. 441 */ 442 protected final void notifyFailed(Throwable cause) { 443 checkNotNull(cause); 444 445 monitor.enter(); 446 try { 447 State previous = state(); 448 switch (previous) { 449 case NEW: 450 case TERMINATED: 451 throw new IllegalStateException("Failed while in state:" + previous, cause); 452 case RUNNING: 453 case STARTING: 454 case STOPPING: 455 snapshot = new StateSnapshot(FAILED, false, cause); 456 enqueueFailedEvent(previous, cause); 457 break; 458 case FAILED: 459 // Do nothing 460 break; 461 } 462 } finally { 463 monitor.leave(); 464 dispatchListenerEvents(); 465 } 466 } 467 468 @Override 469 public final boolean isRunning() { 470 return state() == RUNNING; 471 } 472 473 @Override 474 public final State state() { 475 return snapshot.externalState(); 476 } 477 478 /** @since 14.0 */ 479 @Override 480 public final Throwable failureCause() { 481 return snapshot.failureCause(); 482 } 483 484 /** @since 13.0 */ 485 @Override 486 public final void addListener(Listener listener, Executor executor) { 487 listeners.addListener(listener, executor); 488 } 489 490 @Override 491 public String toString() { 492 return getClass().getSimpleName() + " [" + state() + "]"; 493 } 494 495 /** 496 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 497 * #monitor}. 498 */ 499 private void dispatchListenerEvents() { 500 if (!monitor.isOccupiedByCurrentThread()) { 501 listeners.dispatch(); 502 } 503 } 504 505 private void enqueueStartingEvent() { 506 listeners.enqueue(STARTING_EVENT); 507 } 508 509 private void enqueueRunningEvent() { 510 listeners.enqueue(RUNNING_EVENT); 511 } 512 513 private void enqueueStoppingEvent(final State from) { 514 if (from == State.STARTING) { 515 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 516 } else if (from == State.RUNNING) { 517 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 518 } else { 519 throw new AssertionError(); 520 } 521 } 522 523 private void enqueueTerminatedEvent(final State from) { 524 switch (from) { 525 case NEW: 526 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 527 break; 528 case STARTING: 529 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 530 break; 531 case RUNNING: 532 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 533 break; 534 case STOPPING: 535 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 536 break; 537 case TERMINATED: 538 case FAILED: 539 throw new AssertionError(); 540 } 541 } 542 543 private void enqueueFailedEvent(final State from, final Throwable cause) { 544 // can't memoize this one due to the exception 545 listeners.enqueue( 546 new ListenerCallQueue.Event<Listener>() { 547 @Override 548 public void call(Listener listener) { 549 listener.failed(from, cause); 550 } 551 552 @Override 553 public String toString() { 554 return "failed({from = " + from + ", cause = " + cause + "})"; 555 } 556 }); 557 } 558 559 /** 560 * An immutable snapshot of the current state of the service. This class represents a consistent 561 * snapshot of the state and therefore it can be used to answer simple queries without needing to 562 * grab a lock. 563 */ 564 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 565 private static final class StateSnapshot { 566 /** 567 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 568 */ 569 final State state; 570 571 /** If true, the user requested a shutdown while the service was still starting up. */ 572 final boolean shutdownWhenStartupFinishes; 573 574 /** 575 * The exception that caused this service to fail. This will be {@code null} unless the service 576 * has failed. 577 */ 578 final @Nullable Throwable failure; 579 580 StateSnapshot(State internalState) { 581 this(internalState, false, null); 582 } 583 584 StateSnapshot( 585 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 586 checkArgument( 587 !shutdownWhenStartupFinishes || internalState == STARTING, 588 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 589 internalState); 590 checkArgument( 591 !(failure != null ^ internalState == FAILED), 592 "A failure cause should be set if and only if the state is failed. Got %s and %s " 593 + "instead.", 594 internalState, 595 failure); 596 this.state = internalState; 597 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 598 this.failure = failure; 599 } 600 601 /** @see Service#state() */ 602 State externalState() { 603 if (shutdownWhenStartupFinishes && state == STARTING) { 604 return STOPPING; 605 } else { 606 return state; 607 } 608 } 609 610 /** @see Service#failureCause() */ 611 Throwable failureCause() { 612 checkState( 613 state == FAILED, 614 "failureCause() is only valid if the service has failed, service is %s", 615 state); 616 return failure; 617 } 618 } 619}