001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Service.State.FAILED; 021import static com.google.common.util.concurrent.Service.State.NEW; 022import static com.google.common.util.concurrent.Service.State.RUNNING; 023import static com.google.common.util.concurrent.Service.State.STARTING; 024import static com.google.common.util.concurrent.Service.State.STOPPING; 025import static com.google.common.util.concurrent.Service.State.TERMINATED; 026 027import com.google.common.annotations.Beta; 028import com.google.common.annotations.GwtIncompatible; 029import com.google.common.util.concurrent.Monitor.Guard; 030import com.google.common.util.concurrent.Service.State; // javadoc needs this 031import com.google.errorprone.annotations.CanIgnoreReturnValue; 032import com.google.errorprone.annotations.ForOverride; 033import com.google.j2objc.annotations.WeakOuter; 034import java.util.concurrent.Executor; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import javax.annotation.Nullable; 038import javax.annotation.concurrent.GuardedBy; 039import javax.annotation.concurrent.Immutable; 040 041/** 042 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 043 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 044 * callbacks. Its subclasses must manage threads manually; consider {@link 045 * AbstractExecutionThreadService} if you need only a single execution thread. 046 * 047 * @author Jesse Wilson 048 * @author Luke Sandberg 049 * @since 1.0 050 */ 051@Beta 052@GwtIncompatible 053public abstract class AbstractService implements Service { 054 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 055 new ListenerCallQueue.Event<Listener>() { 056 @Override 057 public void call(Listener listener) { 058 listener.starting(); 059 } 060 061 @Override 062 public String toString() { 063 return "starting()"; 064 } 065 }; 066 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 067 new ListenerCallQueue.Event<Listener>() { 068 @Override 069 public void call(Listener listener) { 070 listener.running(); 071 } 072 073 @Override 074 public String toString() { 075 return "running()"; 076 } 077 }; 078 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 079 stoppingEvent(STARTING); 080 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 081 stoppingEvent(RUNNING); 082 083 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 084 terminatedEvent(NEW); 085 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 086 terminatedEvent(RUNNING); 087 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 088 terminatedEvent(STOPPING); 089 090 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 091 return new ListenerCallQueue.Event<Listener>() { 092 @Override 093 public void call(Listener listener) { 094 listener.terminated(from); 095 } 096 097 @Override 098 public String toString() { 099 return "terminated({from = " + from + "})"; 100 } 101 }; 102 } 103 104 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 105 return new ListenerCallQueue.Event<Listener>() { 106 @Override 107 public void call(Listener listener) { 108 listener.stopping(from); 109 } 110 111 @Override 112 public String toString() { 113 return "stopping({from = " + from + "})"; 114 } 115 }; 116 } 117 118 private final Monitor monitor = new Monitor(); 119 120 private final Guard isStartable = new IsStartableGuard(); 121 122 @WeakOuter 123 private final class IsStartableGuard extends Guard { 124 IsStartableGuard() { 125 super(AbstractService.this.monitor); 126 } 127 128 @Override 129 public boolean isSatisfied() { 130 return state() == NEW; 131 } 132 } 133 134 private final Guard isStoppable = new IsStoppableGuard(); 135 136 @WeakOuter 137 private final class IsStoppableGuard extends Guard { 138 IsStoppableGuard() { 139 super(AbstractService.this.monitor); 140 } 141 142 @Override 143 public boolean isSatisfied() { 144 return state().compareTo(RUNNING) <= 0; 145 } 146 } 147 148 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 149 150 @WeakOuter 151 private final class HasReachedRunningGuard extends Guard { 152 HasReachedRunningGuard() { 153 super(AbstractService.this.monitor); 154 } 155 156 @Override 157 public boolean isSatisfied() { 158 return state().compareTo(RUNNING) >= 0; 159 } 160 } 161 162 private final Guard isStopped = new IsStoppedGuard(); 163 164 @WeakOuter 165 private final class IsStoppedGuard extends Guard { 166 IsStoppedGuard() { 167 super(AbstractService.this.monitor); 168 } 169 170 @Override 171 public boolean isSatisfied() { 172 return state().isTerminal(); 173 } 174 } 175 176 /** The listeners to notify during a state transition. */ 177 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 178 179 /** 180 * The current state of the service. This should be written with the lock held but can be read 181 * without it because it is an immutable object in a volatile field. This is desirable so that 182 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 183 * without grabbing the lock. 184 * 185 * <p>To update this field correctly the lock must be held to guarantee that the state is 186 * consistent. 187 */ 188 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 189 190 /** Constructor for use by subclasses. */ 191 protected AbstractService() {} 192 193 /** 194 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 195 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 196 * or after it has returned. If startup fails, the invocation should cause a call to {@link 197 * #notifyFailed(Throwable)} instead. 198 * 199 * <p>This method should return promptly; prefer to do work on a different thread where it is 200 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 201 * called multiple times. 202 */ 203 @ForOverride 204 protected abstract void doStart(); 205 206 /** 207 * This method should be used to initiate service shutdown. The invocation of this method should 208 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 209 * returned. If shutdown fails, the invocation should cause a call to {@link 210 * #notifyFailed(Throwable)} instead. 211 * 212 * <p>This method should return promptly; prefer to do work on a different thread where it is 213 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 214 * called multiple times. 215 */ 216 @ForOverride 217 protected abstract void doStop(); 218 219 @CanIgnoreReturnValue 220 @Override 221 public final Service startAsync() { 222 if (monitor.enterIf(isStartable)) { 223 try { 224 snapshot = new StateSnapshot(STARTING); 225 enqueueStartingEvent(); 226 doStart(); 227 } catch (Throwable startupFailure) { 228 notifyFailed(startupFailure); 229 } finally { 230 monitor.leave(); 231 dispatchListenerEvents(); 232 } 233 } else { 234 throw new IllegalStateException("Service " + this + " has already been started"); 235 } 236 return this; 237 } 238 239 @CanIgnoreReturnValue 240 @Override 241 public final Service stopAsync() { 242 if (monitor.enterIf(isStoppable)) { 243 try { 244 State previous = state(); 245 switch (previous) { 246 case NEW: 247 snapshot = new StateSnapshot(TERMINATED); 248 enqueueTerminatedEvent(NEW); 249 break; 250 case STARTING: 251 snapshot = new StateSnapshot(STARTING, true, null); 252 enqueueStoppingEvent(STARTING); 253 break; 254 case RUNNING: 255 snapshot = new StateSnapshot(STOPPING); 256 enqueueStoppingEvent(RUNNING); 257 doStop(); 258 break; 259 case STOPPING: 260 case TERMINATED: 261 case FAILED: 262 // These cases are impossible due to the if statement above. 263 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 264 default: 265 throw new AssertionError("Unexpected state: " + previous); 266 } 267 } catch (Throwable shutdownFailure) { 268 notifyFailed(shutdownFailure); 269 } finally { 270 monitor.leave(); 271 dispatchListenerEvents(); 272 } 273 } 274 return this; 275 } 276 277 @Override 278 public final void awaitRunning() { 279 monitor.enterWhenUninterruptibly(hasReachedRunning); 280 try { 281 checkCurrentState(RUNNING); 282 } finally { 283 monitor.leave(); 284 } 285 } 286 287 @Override 288 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 289 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 290 try { 291 checkCurrentState(RUNNING); 292 } finally { 293 monitor.leave(); 294 } 295 } else { 296 // It is possible due to races the we are currently in the expected state even though we 297 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 298 // even check the guard. I don't think we care too much about this use case but it could lead 299 // to a confusing error message. 300 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 301 } 302 } 303 304 @Override 305 public final void awaitTerminated() { 306 monitor.enterWhenUninterruptibly(isStopped); 307 try { 308 checkCurrentState(TERMINATED); 309 } finally { 310 monitor.leave(); 311 } 312 } 313 314 @Override 315 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 316 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 317 try { 318 checkCurrentState(TERMINATED); 319 } finally { 320 monitor.leave(); 321 } 322 } else { 323 // It is possible due to races the we are currently in the expected state even though we 324 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 325 // even check the guard. I don't think we care too much about this use case but it could lead 326 // to a confusing error message. 327 throw new TimeoutException( 328 "Timed out waiting for " 329 + this 330 + " to reach a terminal state. " 331 + "Current state: " 332 + state()); 333 } 334 } 335 336 /** Checks that the current state is equal to the expected state. */ 337 @GuardedBy("monitor") 338 private void checkCurrentState(State expected) { 339 State actual = state(); 340 if (actual != expected) { 341 if (actual == FAILED) { 342 // Handle this specially so that we can include the failureCause, if there is one. 343 throw new IllegalStateException( 344 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 345 failureCause()); 346 } 347 throw new IllegalStateException( 348 "Expected the service " + this + " to be " + expected + ", but was " + actual); 349 } 350 } 351 352 /** 353 * Implementing classes should invoke this method once their service has started. It will cause 354 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 355 * 356 * @throws IllegalStateException if the service is not {@link State#STARTING}. 357 */ 358 protected final void notifyStarted() { 359 monitor.enter(); 360 try { 361 // We have to examine the internal state of the snapshot here to properly handle the stop 362 // while starting case. 363 if (snapshot.state != STARTING) { 364 IllegalStateException failure = 365 new IllegalStateException( 366 "Cannot notifyStarted() when the service is " + snapshot.state); 367 notifyFailed(failure); 368 throw failure; 369 } 370 371 if (snapshot.shutdownWhenStartupFinishes) { 372 snapshot = new StateSnapshot(STOPPING); 373 // We don't call listeners here because we already did that when we set the 374 // shutdownWhenStartupFinishes flag. 375 doStop(); 376 } else { 377 snapshot = new StateSnapshot(RUNNING); 378 enqueueRunningEvent(); 379 } 380 } finally { 381 monitor.leave(); 382 dispatchListenerEvents(); 383 } 384 } 385 386 /** 387 * Implementing classes should invoke this method once their service has stopped. It will cause 388 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 389 * 390 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link 391 * State#RUNNING}. 392 */ 393 protected final void notifyStopped() { 394 monitor.enter(); 395 try { 396 // We check the internal state of the snapshot instead of state() directly so we don't allow 397 // notifyStopped() to be called while STARTING, even if stop() has already been called. 398 State previous = snapshot.state; 399 if (previous != STOPPING && previous != RUNNING) { 400 IllegalStateException failure = 401 new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 402 notifyFailed(failure); 403 throw failure; 404 } 405 snapshot = new StateSnapshot(TERMINATED); 406 enqueueTerminatedEvent(previous); 407 } finally { 408 monitor.leave(); 409 dispatchListenerEvents(); 410 } 411 } 412 413 /** 414 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 415 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 416 * or otherwise cannot be started nor stopped. 417 */ 418 protected final void notifyFailed(Throwable cause) { 419 checkNotNull(cause); 420 421 monitor.enter(); 422 try { 423 State previous = state(); 424 switch (previous) { 425 case NEW: 426 case TERMINATED: 427 throw new IllegalStateException("Failed while in state:" + previous, cause); 428 case RUNNING: 429 case STARTING: 430 case STOPPING: 431 snapshot = new StateSnapshot(FAILED, false, cause); 432 enqueueFailedEvent(previous, cause); 433 break; 434 case FAILED: 435 // Do nothing 436 break; 437 default: 438 throw new AssertionError("Unexpected state: " + previous); 439 } 440 } finally { 441 monitor.leave(); 442 dispatchListenerEvents(); 443 } 444 } 445 446 @Override 447 public final boolean isRunning() { 448 return state() == RUNNING; 449 } 450 451 @Override 452 public final State state() { 453 return snapshot.externalState(); 454 } 455 456 /** 457 * @since 14.0 458 */ 459 @Override 460 public final Throwable failureCause() { 461 return snapshot.failureCause(); 462 } 463 464 /** 465 * @since 13.0 466 */ 467 @Override 468 public final void addListener(Listener listener, Executor executor) { 469 listeners.addListener(listener, executor); 470 } 471 472 @Override 473 public String toString() { 474 return getClass().getSimpleName() + " [" + state() + "]"; 475 } 476 477 /** 478 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 479 * #monitor}. 480 */ 481 private void dispatchListenerEvents() { 482 if (!monitor.isOccupiedByCurrentThread()) { 483 listeners.dispatch(); 484 } 485 } 486 487 private void enqueueStartingEvent() { 488 listeners.enqueue(STARTING_EVENT); 489 } 490 491 private void enqueueRunningEvent() { 492 listeners.enqueue(RUNNING_EVENT); 493 } 494 495 private void enqueueStoppingEvent(final State from) { 496 if (from == State.STARTING) { 497 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 498 } else if (from == State.RUNNING) { 499 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 500 } else { 501 throw new AssertionError(); 502 } 503 } 504 505 private void enqueueTerminatedEvent(final State from) { 506 switch (from) { 507 case NEW: 508 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 509 break; 510 case RUNNING: 511 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 512 break; 513 case STOPPING: 514 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 515 break; 516 case STARTING: 517 case TERMINATED: 518 case FAILED: 519 default: 520 throw new AssertionError(); 521 } 522 } 523 524 private void enqueueFailedEvent(final State from, final Throwable cause) { 525 // can't memoize this one due to the exception 526 listeners.enqueue( 527 new ListenerCallQueue.Event<Listener>() { 528 @Override 529 public void call(Listener listener) { 530 listener.failed(from, cause); 531 } 532 533 @Override 534 public String toString() { 535 return "failed({from = " + from + ", cause = " + cause + "})"; 536 } 537 }); 538 } 539 540 /** 541 * An immutable snapshot of the current state of the service. This class represents a consistent 542 * snapshot of the state and therefore it can be used to answer simple queries without needing to 543 * grab a lock. 544 */ 545 @Immutable 546 private static final class StateSnapshot { 547 /** 548 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 549 */ 550 final State state; 551 552 /** 553 * If true, the user requested a shutdown while the service was still starting up. 554 */ 555 final boolean shutdownWhenStartupFinishes; 556 557 /** 558 * The exception that caused this service to fail. This will be {@code null} unless the service 559 * has failed. 560 */ 561 @Nullable final Throwable failure; 562 563 StateSnapshot(State internalState) { 564 this(internalState, false, null); 565 } 566 567 StateSnapshot( 568 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 569 checkArgument( 570 !shutdownWhenStartupFinishes || internalState == STARTING, 571 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 572 internalState); 573 checkArgument( 574 !(failure != null ^ internalState == FAILED), 575 "A failure cause should be set if and only if the state is failed. Got %s and %s " 576 + "instead.", 577 internalState, 578 failure); 579 this.state = internalState; 580 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 581 this.failure = failure; 582 } 583 584 /** @see Service#state() */ 585 State externalState() { 586 if (shutdownWhenStartupFinishes && state == STARTING) { 587 return STOPPING; 588 } else { 589 return state; 590 } 591 } 592 593 /** @see Service#failureCause() */ 594 Throwable failureCause() { 595 checkState( 596 state == FAILED, 597 "failureCause() is only valid if the service has failed, service is %s", 598 state); 599 return failure; 600 } 601 } 602}