001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Service.State.FAILED; 021import static com.google.common.util.concurrent.Service.State.NEW; 022import static com.google.common.util.concurrent.Service.State.RUNNING; 023import static com.google.common.util.concurrent.Service.State.STARTING; 024import static com.google.common.util.concurrent.Service.State.STOPPING; 025import static com.google.common.util.concurrent.Service.State.TERMINATED; 026 027import com.google.common.annotations.Beta; 028import com.google.common.annotations.GwtIncompatible; 029import com.google.common.util.concurrent.Monitor.Guard; 030import com.google.common.util.concurrent.Service.State; // javadoc needs this 031import com.google.errorprone.annotations.CanIgnoreReturnValue; 032import com.google.errorprone.annotations.ForOverride; 033import com.google.j2objc.annotations.WeakOuter; 034import java.util.concurrent.Executor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import javax.annotation.Nullable; 038import javax.annotation.concurrent.GuardedBy; 039 040/** 041 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 042 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 043 * callbacks. Its subclasses must manage threads manually; consider {@link 044 * AbstractExecutionThreadService} if you need only a single execution thread. 045 * 046 * @author Jesse Wilson 047 * @author Luke Sandberg 048 * @since 1.0 049 */ 050@Beta 051@GwtIncompatible 052public abstract class AbstractService implements Service { 053 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 054 new ListenerCallQueue.Event<Listener>() { 055 @Override 056 public void call(Listener listener) { 057 listener.starting(); 058 } 059 060 @Override 061 public String toString() { 062 return "starting()"; 063 } 064 }; 065 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 066 new ListenerCallQueue.Event<Listener>() { 067 @Override 068 public void call(Listener listener) { 069 listener.running(); 070 } 071 072 @Override 073 public String toString() { 074 return "running()"; 075 } 076 }; 077 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 078 stoppingEvent(STARTING); 079 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 080 stoppingEvent(RUNNING); 081 082 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 083 terminatedEvent(NEW); 084 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 085 terminatedEvent(RUNNING); 086 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 087 terminatedEvent(STOPPING); 088 089 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 090 return new ListenerCallQueue.Event<Listener>() { 091 @Override 092 public void call(Listener listener) { 093 listener.terminated(from); 094 } 095 096 @Override 097 public String toString() { 098 return "terminated({from = " + from + "})"; 099 } 100 }; 101 } 102 103 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 104 return new ListenerCallQueue.Event<Listener>() { 105 @Override 106 public void call(Listener listener) { 107 listener.stopping(from); 108 } 109 110 @Override 111 public String toString() { 112 return "stopping({from = " + from + "})"; 113 } 114 }; 115 } 116 117 private final Monitor monitor = new Monitor(); 118 119 private final Guard isStartable = new IsStartableGuard(); 120 121 @WeakOuter 122 private final class IsStartableGuard extends Guard { 123 IsStartableGuard() { 124 super(AbstractService.this.monitor); 125 } 126 127 @Override 128 public boolean isSatisfied() { 129 return state() == NEW; 130 } 131 } 132 133 private final Guard isStoppable = new IsStoppableGuard(); 134 135 @WeakOuter 136 private final class IsStoppableGuard extends Guard { 137 IsStoppableGuard() { 138 super(AbstractService.this.monitor); 139 } 140 141 @Override 142 public boolean isSatisfied() { 143 return state().compareTo(RUNNING) <= 0; 144 } 145 } 146 147 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 148 149 @WeakOuter 150 private final class HasReachedRunningGuard extends Guard { 151 HasReachedRunningGuard() { 152 super(AbstractService.this.monitor); 153 } 154 155 @Override 156 public boolean isSatisfied() { 157 return state().compareTo(RUNNING) >= 0; 158 } 159 } 160 161 private final Guard isStopped = new IsStoppedGuard(); 162 163 @WeakOuter 164 private final class IsStoppedGuard extends Guard { 165 IsStoppedGuard() { 166 super(AbstractService.this.monitor); 167 } 168 169 @Override 170 public boolean isSatisfied() { 171 return state().isTerminal(); 172 } 173 } 174 175 /** The listeners to notify during a state transition. */ 176 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 177 178 /** 179 * The current state of the service. This should be written with the lock held but can be read 180 * without it because it is an immutable object in a volatile field. This is desirable so that 181 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 182 * without grabbing the lock. 183 * 184 * <p>To update this field correctly the lock must be held to guarantee that the state is 185 * consistent. 186 */ 187 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 188 189 /** Constructor for use by subclasses. */ 190 protected AbstractService() {} 191 192 /** 193 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 194 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 195 * or after it has returned. If startup fails, the invocation should cause a call to {@link 196 * #notifyFailed(Throwable)} instead. 197 * 198 * <p>This method should return promptly; prefer to do work on a different thread where it is 199 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 200 * called multiple times. 201 */ 202 @ForOverride 203 protected abstract void doStart(); 204 205 /** 206 * This method should be used to initiate service shutdown. The invocation of this method should 207 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 208 * returned. If shutdown fails, the invocation should cause a call to {@link 209 * #notifyFailed(Throwable)} instead. 210 * 211 * <p>This method should return promptly; prefer to do work on a different thread where it is 212 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 213 * called multiple times. 214 */ 215 @ForOverride 216 protected abstract void doStop(); 217 218 @CanIgnoreReturnValue 219 @Override 220 public final Service startAsync() { 221 if (monitor.enterIf(isStartable)) { 222 try { 223 snapshot = new StateSnapshot(STARTING); 224 enqueueStartingEvent(); 225 doStart(); 226 } catch (Throwable startupFailure) { 227 notifyFailed(startupFailure); 228 } finally { 229 monitor.leave(); 230 dispatchListenerEvents(); 231 } 232 } else { 233 throw new IllegalStateException("Service " + this + " has already been started"); 234 } 235 return this; 236 } 237 238 @CanIgnoreReturnValue 239 @Override 240 public final Service stopAsync() { 241 if (monitor.enterIf(isStoppable)) { 242 try { 243 State previous = state(); 244 switch (previous) { 245 case NEW: 246 snapshot = new StateSnapshot(TERMINATED); 247 enqueueTerminatedEvent(NEW); 248 break; 249 case STARTING: 250 snapshot = new StateSnapshot(STARTING, true, null); 251 enqueueStoppingEvent(STARTING); 252 break; 253 case RUNNING: 254 snapshot = new StateSnapshot(STOPPING); 255 enqueueStoppingEvent(RUNNING); 256 doStop(); 257 break; 258 case STOPPING: 259 case TERMINATED: 260 case FAILED: 261 // These cases are impossible due to the if statement above. 262 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 263 default: 264 throw new AssertionError("Unexpected state: " + previous); 265 } 266 } catch (Throwable shutdownFailure) { 267 notifyFailed(shutdownFailure); 268 } finally { 269 monitor.leave(); 270 dispatchListenerEvents(); 271 } 272 } 273 return this; 274 } 275 276 @Override 277 public final void awaitRunning() { 278 monitor.enterWhenUninterruptibly(hasReachedRunning); 279 try { 280 checkCurrentState(RUNNING); 281 } finally { 282 monitor.leave(); 283 } 284 } 285 286 @Override 287 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 288 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 289 try { 290 checkCurrentState(RUNNING); 291 } finally { 292 monitor.leave(); 293 } 294 } else { 295 // It is possible due to races the we are currently in the expected state even though we 296 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 297 // even check the guard. I don't think we care too much about this use case but it could lead 298 // to a confusing error message. 299 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 300 } 301 } 302 303 @Override 304 public final void awaitTerminated() { 305 monitor.enterWhenUninterruptibly(isStopped); 306 try { 307 checkCurrentState(TERMINATED); 308 } finally { 309 monitor.leave(); 310 } 311 } 312 313 @Override 314 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 315 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 316 try { 317 checkCurrentState(TERMINATED); 318 } finally { 319 monitor.leave(); 320 } 321 } else { 322 // It is possible due to races the we are currently in the expected state even though we 323 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 324 // even check the guard. I don't think we care too much about this use case but it could lead 325 // to a confusing error message. 326 throw new TimeoutException( 327 "Timed out waiting for " 328 + this 329 + " to reach a terminal state. " 330 + "Current state: " 331 + state()); 332 } 333 } 334 335 /** Checks that the current state is equal to the expected state. */ 336 @GuardedBy("monitor") 337 private void checkCurrentState(State expected) { 338 State actual = state(); 339 if (actual != expected) { 340 if (actual == FAILED) { 341 // Handle this specially so that we can include the failureCause, if there is one. 342 throw new IllegalStateException( 343 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 344 failureCause()); 345 } 346 throw new IllegalStateException( 347 "Expected the service " + this + " to be " + expected + ", but was " + actual); 348 } 349 } 350 351 /** 352 * Implementing classes should invoke this method once their service has started. It will cause 353 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 354 * 355 * @throws IllegalStateException if the service is not {@link State#STARTING}. 356 */ 357 protected final void notifyStarted() { 358 monitor.enter(); 359 try { 360 // We have to examine the internal state of the snapshot here to properly handle the stop 361 // while starting case. 362 if (snapshot.state != STARTING) { 363 IllegalStateException failure = 364 new IllegalStateException( 365 "Cannot notifyStarted() when the service is " + snapshot.state); 366 notifyFailed(failure); 367 throw failure; 368 } 369 370 if (snapshot.shutdownWhenStartupFinishes) { 371 snapshot = new StateSnapshot(STOPPING); 372 // We don't call listeners here because we already did that when we set the 373 // shutdownWhenStartupFinishes flag. 374 doStop(); 375 } else { 376 snapshot = new StateSnapshot(RUNNING); 377 enqueueRunningEvent(); 378 } 379 } finally { 380 monitor.leave(); 381 dispatchListenerEvents(); 382 } 383 } 384 385 /** 386 * Implementing classes should invoke this method once their service has stopped. It will cause 387 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 388 * 389 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link 390 * State#RUNNING}. 391 */ 392 protected final void notifyStopped() { 393 monitor.enter(); 394 try { 395 // We check the internal state of the snapshot instead of state() directly so we don't allow 396 // notifyStopped() to be called while STARTING, even if stop() has already been called. 397 State previous = snapshot.state; 398 if (previous != STOPPING && previous != RUNNING) { 399 IllegalStateException failure = 400 new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 401 notifyFailed(failure); 402 throw failure; 403 } 404 snapshot = new StateSnapshot(TERMINATED); 405 enqueueTerminatedEvent(previous); 406 } finally { 407 monitor.leave(); 408 dispatchListenerEvents(); 409 } 410 } 411 412 /** 413 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 414 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 415 * or otherwise cannot be started nor stopped. 416 */ 417 protected final void notifyFailed(Throwable cause) { 418 checkNotNull(cause); 419 420 monitor.enter(); 421 try { 422 State previous = state(); 423 switch (previous) { 424 case NEW: 425 case TERMINATED: 426 throw new IllegalStateException("Failed while in state:" + previous, cause); 427 case RUNNING: 428 case STARTING: 429 case STOPPING: 430 snapshot = new StateSnapshot(FAILED, false, cause); 431 enqueueFailedEvent(previous, cause); 432 break; 433 case FAILED: 434 // Do nothing 435 break; 436 default: 437 throw new AssertionError("Unexpected state: " + previous); 438 } 439 } finally { 440 monitor.leave(); 441 dispatchListenerEvents(); 442 } 443 } 444 445 @Override 446 public final boolean isRunning() { 447 return state() == RUNNING; 448 } 449 450 @Override 451 public final State state() { 452 return snapshot.externalState(); 453 } 454 455 /** 456 * @since 14.0 457 */ 458 @Override 459 public final Throwable failureCause() { 460 return snapshot.failureCause(); 461 } 462 463 /** 464 * @since 13.0 465 */ 466 @Override 467 public final void addListener(Listener listener, Executor executor) { 468 listeners.addListener(listener, executor); 469 } 470 471 @Override 472 public String toString() { 473 return getClass().getSimpleName() + " [" + state() + "]"; 474 } 475 476 /** 477 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 478 * #monitor}. 479 */ 480 private void dispatchListenerEvents() { 481 if (!monitor.isOccupiedByCurrentThread()) { 482 listeners.dispatch(); 483 } 484 } 485 486 private void enqueueStartingEvent() { 487 listeners.enqueue(STARTING_EVENT); 488 } 489 490 private void enqueueRunningEvent() { 491 listeners.enqueue(RUNNING_EVENT); 492 } 493 494 private void enqueueStoppingEvent(final State from) { 495 if (from == State.STARTING) { 496 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 497 } else if (from == State.RUNNING) { 498 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 499 } else { 500 throw new AssertionError(); 501 } 502 } 503 504 private void enqueueTerminatedEvent(final State from) { 505 switch (from) { 506 case NEW: 507 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 508 break; 509 case RUNNING: 510 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 511 break; 512 case STOPPING: 513 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 514 break; 515 case STARTING: 516 case TERMINATED: 517 case FAILED: 518 default: 519 throw new AssertionError(); 520 } 521 } 522 523 private void enqueueFailedEvent(final State from, final Throwable cause) { 524 // can't memoize this one due to the exception 525 listeners.enqueue( 526 new ListenerCallQueue.Event<Listener>() { 527 @Override 528 public void call(Listener listener) { 529 listener.failed(from, cause); 530 } 531 532 @Override 533 public String toString() { 534 return "failed({from = " + from + ", cause = " + cause + "})"; 535 } 536 }); 537 } 538 539 /** 540 * An immutable snapshot of the current state of the service. This class represents a consistent 541 * snapshot of the state and therefore it can be used to answer simple queries without needing to 542 * grab a lock. 543 */ 544 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 545 private static final class StateSnapshot { 546 /** 547 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 548 */ 549 final State state; 550 551 /** 552 * If true, the user requested a shutdown while the service was still starting up. 553 */ 554 final boolean shutdownWhenStartupFinishes; 555 556 /** 557 * The exception that caused this service to fail. This will be {@code null} unless the service 558 * has failed. 559 */ 560 @Nullable final Throwable failure; 561 562 StateSnapshot(State internalState) { 563 this(internalState, false, null); 564 } 565 566 StateSnapshot( 567 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 568 checkArgument( 569 !shutdownWhenStartupFinishes || internalState == STARTING, 570 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 571 internalState); 572 checkArgument( 573 !(failure != null ^ internalState == FAILED), 574 "A failure cause should be set if and only if the state is failed. Got %s and %s " 575 + "instead.", 576 internalState, 577 failure); 578 this.state = internalState; 579 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 580 this.failure = failure; 581 } 582 583 /** @see Service#state() */ 584 State externalState() { 585 if (shutdownWhenStartupFinishes && state == STARTING) { 586 return STOPPING; 587 } else { 588 return state; 589 } 590 } 591 592 /** @see Service#failureCause() */ 593 Throwable failureCause() { 594 checkState( 595 state == FAILED, 596 "failureCause() is only valid if the service has failed, service is %s", 597 state); 598 return failure; 599 } 600 } 601}