001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Service.State.FAILED; 021import static com.google.common.util.concurrent.Service.State.NEW; 022import static com.google.common.util.concurrent.Service.State.RUNNING; 023import static com.google.common.util.concurrent.Service.State.STARTING; 024import static com.google.common.util.concurrent.Service.State.STOPPING; 025import static com.google.common.util.concurrent.Service.State.TERMINATED; 026 027import com.google.common.annotations.Beta; 028import com.google.common.annotations.GwtIncompatible; 029import com.google.common.util.concurrent.Monitor.Guard; 030import com.google.common.util.concurrent.Service.State; // javadoc needs this 031import com.google.errorprone.annotations.CanIgnoreReturnValue; 032import com.google.errorprone.annotations.ForOverride; 033import com.google.errorprone.annotations.concurrent.GuardedBy; 034import com.google.j2objc.annotations.WeakOuter; 035import java.util.concurrent.Executor; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import org.checkerframework.checker.nullness.compatqual.NullableDecl; 039 040/** 041 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 042 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 043 * callbacks. Its subclasses must manage threads manually; consider {@link 044 * AbstractExecutionThreadService} if you need only a single execution thread. 045 * 046 * @author Jesse Wilson 047 * @author Luke Sandberg 048 * @since 1.0 049 */ 050@Beta 051@GwtIncompatible 052public abstract class AbstractService implements Service { 053 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 054 new ListenerCallQueue.Event<Listener>() { 055 @Override 056 public void call(Listener listener) { 057 listener.starting(); 058 } 059 060 @Override 061 public String toString() { 062 return "starting()"; 063 } 064 }; 065 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 066 new ListenerCallQueue.Event<Listener>() { 067 @Override 068 public void call(Listener listener) { 069 listener.running(); 070 } 071 072 @Override 073 public String toString() { 074 return "running()"; 075 } 076 }; 077 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 078 stoppingEvent(STARTING); 079 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 080 stoppingEvent(RUNNING); 081 082 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 083 terminatedEvent(NEW); 084 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 085 terminatedEvent(RUNNING); 086 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 087 terminatedEvent(STOPPING); 088 089 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 090 return new ListenerCallQueue.Event<Listener>() { 091 @Override 092 public void call(Listener listener) { 093 listener.terminated(from); 094 } 095 096 @Override 097 public String toString() { 098 return "terminated({from = " + from + "})"; 099 } 100 }; 101 } 102 103 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 104 return new ListenerCallQueue.Event<Listener>() { 105 @Override 106 public void call(Listener listener) { 107 listener.stopping(from); 108 } 109 110 @Override 111 public String toString() { 112 return "stopping({from = " + from + "})"; 113 } 114 }; 115 } 116 117 private final Monitor monitor = new Monitor(); 118 119 private final Guard isStartable = new IsStartableGuard(); 120 121 @WeakOuter 122 private final class IsStartableGuard extends Guard { 123 IsStartableGuard() { 124 super(AbstractService.this.monitor); 125 } 126 127 @Override 128 public boolean isSatisfied() { 129 return state() == NEW; 130 } 131 } 132 133 private final Guard isStoppable = new IsStoppableGuard(); 134 135 @WeakOuter 136 private final class IsStoppableGuard extends Guard { 137 IsStoppableGuard() { 138 super(AbstractService.this.monitor); 139 } 140 141 @Override 142 public boolean isSatisfied() { 143 return state().compareTo(RUNNING) <= 0; 144 } 145 } 146 147 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 148 149 @WeakOuter 150 private final class HasReachedRunningGuard extends Guard { 151 HasReachedRunningGuard() { 152 super(AbstractService.this.monitor); 153 } 154 155 @Override 156 public boolean isSatisfied() { 157 return state().compareTo(RUNNING) >= 0; 158 } 159 } 160 161 private final Guard isStopped = new IsStoppedGuard(); 162 163 @WeakOuter 164 private final class IsStoppedGuard extends Guard { 165 IsStoppedGuard() { 166 super(AbstractService.this.monitor); 167 } 168 169 @Override 170 public boolean isSatisfied() { 171 return state().isTerminal(); 172 } 173 } 174 175 /** The listeners to notify during a state transition. */ 176 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 177 178 /** 179 * The current state of the service. This should be written with the lock held but can be read 180 * without it because it is an immutable object in a volatile field. This is desirable so that 181 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 182 * without grabbing the lock. 183 * 184 * <p>To update this field correctly the lock must be held to guarantee that the state is 185 * consistent. 186 */ 187 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 188 189 /** Constructor for use by subclasses. */ 190 protected AbstractService() {} 191 192 /** 193 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 194 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 195 * or after it has returned. If startup fails, the invocation should cause a call to {@link 196 * #notifyFailed(Throwable)} instead. 197 * 198 * <p>This method should return promptly; prefer to do work on a different thread where it is 199 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 200 * called multiple times. 201 */ 202 @ForOverride 203 protected abstract void doStart(); 204 205 /** 206 * This method should be used to initiate service shutdown. The invocation of this method should 207 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 208 * returned. If shutdown fails, the invocation should cause a call to {@link 209 * #notifyFailed(Throwable)} instead. 210 * 211 * <p>This method should return promptly; prefer to do work on a different thread where it is 212 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 213 * called multiple times. 214 */ 215 @ForOverride 216 protected abstract void doStop(); 217 218 @CanIgnoreReturnValue 219 @Override 220 public final Service startAsync() { 221 if (monitor.enterIf(isStartable)) { 222 try { 223 snapshot = new StateSnapshot(STARTING); 224 enqueueStartingEvent(); 225 doStart(); 226 } catch (Throwable startupFailure) { 227 notifyFailed(startupFailure); 228 } finally { 229 monitor.leave(); 230 dispatchListenerEvents(); 231 } 232 } else { 233 throw new IllegalStateException("Service " + this + " has already been started"); 234 } 235 return this; 236 } 237 238 @CanIgnoreReturnValue 239 @Override 240 public final Service stopAsync() { 241 if (monitor.enterIf(isStoppable)) { 242 try { 243 State previous = state(); 244 switch (previous) { 245 case NEW: 246 snapshot = new StateSnapshot(TERMINATED); 247 enqueueTerminatedEvent(NEW); 248 break; 249 case STARTING: 250 snapshot = new StateSnapshot(STARTING, true, null); 251 enqueueStoppingEvent(STARTING); 252 break; 253 case RUNNING: 254 snapshot = new StateSnapshot(STOPPING); 255 enqueueStoppingEvent(RUNNING); 256 doStop(); 257 break; 258 case STOPPING: 259 case TERMINATED: 260 case FAILED: 261 // These cases are impossible due to the if statement above. 262 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 263 default: 264 throw new AssertionError("Unexpected state: " + previous); 265 } 266 } catch (Throwable shutdownFailure) { 267 notifyFailed(shutdownFailure); 268 } finally { 269 monitor.leave(); 270 dispatchListenerEvents(); 271 } 272 } 273 return this; 274 } 275 276 @Override 277 public final void awaitRunning() { 278 monitor.enterWhenUninterruptibly(hasReachedRunning); 279 try { 280 checkCurrentState(RUNNING); 281 } finally { 282 monitor.leave(); 283 } 284 } 285 286 @Override 287 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 288 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 289 try { 290 checkCurrentState(RUNNING); 291 } finally { 292 monitor.leave(); 293 } 294 } else { 295 // It is possible due to races the we are currently in the expected state even though we 296 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 297 // even check the guard. I don't think we care too much about this use case but it could lead 298 // to a confusing error message. 299 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 300 } 301 } 302 303 @Override 304 public final void awaitTerminated() { 305 monitor.enterWhenUninterruptibly(isStopped); 306 try { 307 checkCurrentState(TERMINATED); 308 } finally { 309 monitor.leave(); 310 } 311 } 312 313 @Override 314 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 315 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 316 try { 317 checkCurrentState(TERMINATED); 318 } finally { 319 monitor.leave(); 320 } 321 } else { 322 // It is possible due to races the we are currently in the expected state even though we 323 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 324 // even check the guard. I don't think we care too much about this use case but it could lead 325 // to a confusing error message. 326 throw new TimeoutException( 327 "Timed out waiting for " 328 + this 329 + " to reach a terminal state. " 330 + "Current state: " 331 + state()); 332 } 333 } 334 335 /** Checks that the current state is equal to the expected state. */ 336 @GuardedBy("monitor") 337 private void checkCurrentState(State expected) { 338 State actual = state(); 339 if (actual != expected) { 340 if (actual == FAILED) { 341 // Handle this specially so that we can include the failureCause, if there is one. 342 throw new IllegalStateException( 343 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 344 failureCause()); 345 } 346 throw new IllegalStateException( 347 "Expected the service " + this + " to be " + expected + ", but was " + actual); 348 } 349 } 350 351 /** 352 * Implementing classes should invoke this method once their service has started. It will cause 353 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 354 * 355 * @throws IllegalStateException if the service is not {@link State#STARTING}. 356 */ 357 protected final void notifyStarted() { 358 monitor.enter(); 359 try { 360 // We have to examine the internal state of the snapshot here to properly handle the stop 361 // while starting case. 362 if (snapshot.state != STARTING) { 363 IllegalStateException failure = 364 new IllegalStateException( 365 "Cannot notifyStarted() when the service is " + snapshot.state); 366 notifyFailed(failure); 367 throw failure; 368 } 369 370 if (snapshot.shutdownWhenStartupFinishes) { 371 snapshot = new StateSnapshot(STOPPING); 372 // We don't call listeners here because we already did that when we set the 373 // shutdownWhenStartupFinishes flag. 374 doStop(); 375 } else { 376 snapshot = new StateSnapshot(RUNNING); 377 enqueueRunningEvent(); 378 } 379 } finally { 380 monitor.leave(); 381 dispatchListenerEvents(); 382 } 383 } 384 385 /** 386 * Implementing classes should invoke this method once their service has stopped. It will cause 387 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 388 * 389 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link 390 * State#RUNNING}. 391 */ 392 protected final void notifyStopped() { 393 monitor.enter(); 394 try { 395 // We check the internal state of the snapshot instead of state() directly so we don't allow 396 // notifyStopped() to be called while STARTING, even if stop() has already been called. 397 State previous = snapshot.state; 398 if (previous != STOPPING && previous != RUNNING) { 399 IllegalStateException failure = 400 new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 401 notifyFailed(failure); 402 throw failure; 403 } 404 snapshot = new StateSnapshot(TERMINATED); 405 enqueueTerminatedEvent(previous); 406 } finally { 407 monitor.leave(); 408 dispatchListenerEvents(); 409 } 410 } 411 412 /** 413 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 414 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 415 * or otherwise cannot be started nor stopped. 416 */ 417 protected final void notifyFailed(Throwable cause) { 418 checkNotNull(cause); 419 420 monitor.enter(); 421 try { 422 State previous = state(); 423 switch (previous) { 424 case NEW: 425 case TERMINATED: 426 throw new IllegalStateException("Failed while in state:" + previous, cause); 427 case RUNNING: 428 case STARTING: 429 case STOPPING: 430 snapshot = new StateSnapshot(FAILED, false, cause); 431 enqueueFailedEvent(previous, cause); 432 break; 433 case FAILED: 434 // Do nothing 435 break; 436 default: 437 throw new AssertionError("Unexpected state: " + previous); 438 } 439 } finally { 440 monitor.leave(); 441 dispatchListenerEvents(); 442 } 443 } 444 445 @Override 446 public final boolean isRunning() { 447 return state() == RUNNING; 448 } 449 450 @Override 451 public final State state() { 452 return snapshot.externalState(); 453 } 454 455 /** @since 14.0 */ 456 @Override 457 public final Throwable failureCause() { 458 return snapshot.failureCause(); 459 } 460 461 /** @since 13.0 */ 462 @Override 463 public final void addListener(Listener listener, Executor executor) { 464 listeners.addListener(listener, executor); 465 } 466 467 @Override 468 public String toString() { 469 return getClass().getSimpleName() + " [" + state() + "]"; 470 } 471 472 /** 473 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 474 * #monitor}. 475 */ 476 private void dispatchListenerEvents() { 477 if (!monitor.isOccupiedByCurrentThread()) { 478 listeners.dispatch(); 479 } 480 } 481 482 private void enqueueStartingEvent() { 483 listeners.enqueue(STARTING_EVENT); 484 } 485 486 private void enqueueRunningEvent() { 487 listeners.enqueue(RUNNING_EVENT); 488 } 489 490 private void enqueueStoppingEvent(final State from) { 491 if (from == State.STARTING) { 492 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 493 } else if (from == State.RUNNING) { 494 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 495 } else { 496 throw new AssertionError(); 497 } 498 } 499 500 private void enqueueTerminatedEvent(final State from) { 501 switch (from) { 502 case NEW: 503 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 504 break; 505 case RUNNING: 506 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 507 break; 508 case STOPPING: 509 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 510 break; 511 case STARTING: 512 case TERMINATED: 513 case FAILED: 514 default: 515 throw new AssertionError(); 516 } 517 } 518 519 private void enqueueFailedEvent(final State from, final Throwable cause) { 520 // can't memoize this one due to the exception 521 listeners.enqueue( 522 new ListenerCallQueue.Event<Listener>() { 523 @Override 524 public void call(Listener listener) { 525 listener.failed(from, cause); 526 } 527 528 @Override 529 public String toString() { 530 return "failed({from = " + from + ", cause = " + cause + "})"; 531 } 532 }); 533 } 534 535 /** 536 * An immutable snapshot of the current state of the service. This class represents a consistent 537 * snapshot of the state and therefore it can be used to answer simple queries without needing to 538 * grab a lock. 539 */ 540 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 541 private static final class StateSnapshot { 542 /** 543 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 544 */ 545 final State state; 546 547 /** If true, the user requested a shutdown while the service was still starting up. */ 548 final boolean shutdownWhenStartupFinishes; 549 550 /** 551 * The exception that caused this service to fail. This will be {@code null} unless the service 552 * has failed. 553 */ 554 @NullableDecl final Throwable failure; 555 556 StateSnapshot(State internalState) { 557 this(internalState, false, null); 558 } 559 560 StateSnapshot( 561 State internalState, boolean shutdownWhenStartupFinishes, @NullableDecl Throwable failure) { 562 checkArgument( 563 !shutdownWhenStartupFinishes || internalState == STARTING, 564 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 565 internalState); 566 checkArgument( 567 !(failure != null ^ internalState == FAILED), 568 "A failure cause should be set if and only if the state is failed. Got %s and %s " 569 + "instead.", 570 internalState, 571 failure); 572 this.state = internalState; 573 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 574 this.failure = failure; 575 } 576 577 /** @see Service#state() */ 578 State externalState() { 579 if (shutdownWhenStartupFinishes && state == STARTING) { 580 return STOPPING; 581 } else { 582 return state; 583 } 584 } 585 586 /** @see Service#failureCause() */ 587 Throwable failureCause() { 588 checkState( 589 state == FAILED, 590 "failureCause() is only valid if the service has failed, service is %s", 591 state); 592 return failure; 593 } 594 } 595}