001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Service.State.FAILED; 021import static com.google.common.util.concurrent.Service.State.NEW; 022import static com.google.common.util.concurrent.Service.State.RUNNING; 023import static com.google.common.util.concurrent.Service.State.STARTING; 024import static com.google.common.util.concurrent.Service.State.STOPPING; 025import static com.google.common.util.concurrent.Service.State.TERMINATED; 026import static java.util.Objects.requireNonNull; 027 028import com.google.common.annotations.Beta; 029import com.google.common.annotations.GwtIncompatible; 030import com.google.common.util.concurrent.Monitor.Guard; 031import com.google.common.util.concurrent.Service.State; // javadoc needs this 032import com.google.errorprone.annotations.CanIgnoreReturnValue; 033import com.google.errorprone.annotations.ForOverride; 034import com.google.errorprone.annotations.concurrent.GuardedBy; 035import com.google.j2objc.annotations.WeakOuter; 036import java.util.concurrent.Executor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.TimeoutException; 039import javax.annotation.CheckForNull; 040 041/** 042 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 043 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 044 * callbacks. Its subclasses must manage threads manually; consider {@link 045 * AbstractExecutionThreadService} if you need only a single execution thread. 046 * 047 * @author Jesse Wilson 048 * @author Luke Sandberg 049 * @since 1.0 050 */ 051@GwtIncompatible 052@ElementTypesAreNonnullByDefault 053public abstract class AbstractService implements Service { 054 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 055 new ListenerCallQueue.Event<Listener>() { 056 @Override 057 public void call(Listener listener) { 058 listener.starting(); 059 } 060 061 @Override 062 public String toString() { 063 return "starting()"; 064 } 065 }; 066 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 067 new ListenerCallQueue.Event<Listener>() { 068 @Override 069 public void call(Listener listener) { 070 listener.running(); 071 } 072 073 @Override 074 public String toString() { 075 return "running()"; 076 } 077 }; 078 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 079 stoppingEvent(STARTING); 080 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 081 stoppingEvent(RUNNING); 082 083 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 084 terminatedEvent(NEW); 085 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 086 terminatedEvent(STARTING); 087 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 088 terminatedEvent(RUNNING); 089 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 090 terminatedEvent(STOPPING); 091 092 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 093 return new ListenerCallQueue.Event<Listener>() { 094 @Override 095 public void call(Listener listener) { 096 listener.terminated(from); 097 } 098 099 @Override 100 public String toString() { 101 return "terminated({from = " + from + "})"; 102 } 103 }; 104 } 105 106 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 107 return new ListenerCallQueue.Event<Listener>() { 108 @Override 109 public void call(Listener listener) { 110 listener.stopping(from); 111 } 112 113 @Override 114 public String toString() { 115 return "stopping({from = " + from + "})"; 116 } 117 }; 118 } 119 120 private final Monitor monitor = new Monitor(); 121 122 private final Guard isStartable = new IsStartableGuard(); 123 124 @WeakOuter 125 private final class IsStartableGuard extends Guard { 126 IsStartableGuard() { 127 super(AbstractService.this.monitor); 128 } 129 130 @Override 131 public boolean isSatisfied() { 132 return state() == NEW; 133 } 134 } 135 136 private final Guard isStoppable = new IsStoppableGuard(); 137 138 @WeakOuter 139 private final class IsStoppableGuard extends Guard { 140 IsStoppableGuard() { 141 super(AbstractService.this.monitor); 142 } 143 144 @Override 145 public boolean isSatisfied() { 146 return state().compareTo(RUNNING) <= 0; 147 } 148 } 149 150 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 151 152 @WeakOuter 153 private final class HasReachedRunningGuard extends Guard { 154 HasReachedRunningGuard() { 155 super(AbstractService.this.monitor); 156 } 157 158 @Override 159 public boolean isSatisfied() { 160 return state().compareTo(RUNNING) >= 0; 161 } 162 } 163 164 private final Guard isStopped = new IsStoppedGuard(); 165 166 @WeakOuter 167 private final class IsStoppedGuard extends Guard { 168 IsStoppedGuard() { 169 super(AbstractService.this.monitor); 170 } 171 172 @Override 173 public boolean isSatisfied() { 174 return state().compareTo(TERMINATED) >= 0; 175 } 176 } 177 178 /** The listeners to notify during a state transition. */ 179 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 180 181 /** 182 * The current state of the service. This should be written with the lock held but can be read 183 * without it because it is an immutable object in a volatile field. This is desirable so that 184 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 185 * without grabbing the lock. 186 * 187 * <p>To update this field correctly the lock must be held to guarantee that the state is 188 * consistent. 189 */ 190 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 191 192 /** Constructor for use by subclasses. */ 193 protected AbstractService() {} 194 195 /** 196 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 197 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 198 * or after it has returned. If startup fails, the invocation should cause a call to {@link 199 * #notifyFailed(Throwable)} instead. 200 * 201 * <p>This method should return promptly; prefer to do work on a different thread where it is 202 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 203 * called multiple times. 204 */ 205 @ForOverride 206 protected abstract void doStart(); 207 208 /** 209 * This method should be used to initiate service shutdown. The invocation of this method should 210 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 211 * returned. If shutdown fails, the invocation should cause a call to {@link 212 * #notifyFailed(Throwable)} instead. 213 * 214 * <p>This method should return promptly; prefer to do work on a different thread where it is 215 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 216 * called multiple times. 217 * 218 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 219 * invoked immediately. Instead, it will be deferred until after the service is {@link 220 * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 221 */ 222 @ForOverride 223 protected abstract void doStop(); 224 225 /** 226 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 227 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 228 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 229 * 230 * <p>This method should return promptly; prefer to do work on a different thread where it is 231 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 232 * called multiple times. 233 * 234 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 235 * external state observable by the caller of {@link #stopAsync}. 236 * 237 * @since 27.0 238 */ 239 @Beta 240 @ForOverride 241 protected void doCancelStart() {} 242 243 @CanIgnoreReturnValue 244 @Override 245 public final Service startAsync() { 246 if (monitor.enterIf(isStartable)) { 247 try { 248 snapshot = new StateSnapshot(STARTING); 249 enqueueStartingEvent(); 250 doStart(); 251 } catch (Throwable startupFailure) { 252 notifyFailed(startupFailure); 253 } finally { 254 monitor.leave(); 255 dispatchListenerEvents(); 256 } 257 } else { 258 throw new IllegalStateException("Service " + this + " has already been started"); 259 } 260 return this; 261 } 262 263 @CanIgnoreReturnValue 264 @Override 265 public final Service stopAsync() { 266 if (monitor.enterIf(isStoppable)) { 267 try { 268 State previous = state(); 269 switch (previous) { 270 case NEW: 271 snapshot = new StateSnapshot(TERMINATED); 272 enqueueTerminatedEvent(NEW); 273 break; 274 case STARTING: 275 snapshot = new StateSnapshot(STARTING, true, null); 276 enqueueStoppingEvent(STARTING); 277 doCancelStart(); 278 break; 279 case RUNNING: 280 snapshot = new StateSnapshot(STOPPING); 281 enqueueStoppingEvent(RUNNING); 282 doStop(); 283 break; 284 case STOPPING: 285 case TERMINATED: 286 case FAILED: 287 // These cases are impossible due to the if statement above. 288 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 289 } 290 } catch (Throwable shutdownFailure) { 291 notifyFailed(shutdownFailure); 292 } finally { 293 monitor.leave(); 294 dispatchListenerEvents(); 295 } 296 } 297 return this; 298 } 299 300 @Override 301 public final void awaitRunning() { 302 monitor.enterWhenUninterruptibly(hasReachedRunning); 303 try { 304 checkCurrentState(RUNNING); 305 } finally { 306 monitor.leave(); 307 } 308 } 309 310 @Override 311 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 312 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 313 try { 314 checkCurrentState(RUNNING); 315 } finally { 316 monitor.leave(); 317 } 318 } else { 319 // It is possible due to races the we are currently in the expected state even though we 320 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 321 // even check the guard. I don't think we care too much about this use case but it could lead 322 // to a confusing error message. 323 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 324 } 325 } 326 327 @Override 328 public final void awaitTerminated() { 329 monitor.enterWhenUninterruptibly(isStopped); 330 try { 331 checkCurrentState(TERMINATED); 332 } finally { 333 monitor.leave(); 334 } 335 } 336 337 @Override 338 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 339 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 340 try { 341 checkCurrentState(TERMINATED); 342 } finally { 343 monitor.leave(); 344 } 345 } else { 346 // It is possible due to races the we are currently in the expected state even though we 347 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 348 // even check the guard. I don't think we care too much about this use case but it could lead 349 // to a confusing error message. 350 throw new TimeoutException( 351 "Timed out waiting for " 352 + this 353 + " to reach a terminal state. " 354 + "Current state: " 355 + state()); 356 } 357 } 358 359 /** Checks that the current state is equal to the expected state. */ 360 @GuardedBy("monitor") 361 private void checkCurrentState(State expected) { 362 State actual = state(); 363 if (actual != expected) { 364 if (actual == FAILED) { 365 // Handle this specially so that we can include the failureCause, if there is one. 366 throw new IllegalStateException( 367 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 368 failureCause()); 369 } 370 throw new IllegalStateException( 371 "Expected the service " + this + " to be " + expected + ", but was " + actual); 372 } 373 } 374 375 /** 376 * Implementing classes should invoke this method once their service has started. It will cause 377 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 378 * 379 * @throws IllegalStateException if the service is not {@link State#STARTING}. 380 */ 381 protected final void notifyStarted() { 382 monitor.enter(); 383 try { 384 // We have to examine the internal state of the snapshot here to properly handle the stop 385 // while starting case. 386 if (snapshot.state != STARTING) { 387 IllegalStateException failure = 388 new IllegalStateException( 389 "Cannot notifyStarted() when the service is " + snapshot.state); 390 notifyFailed(failure); 391 throw failure; 392 } 393 394 if (snapshot.shutdownWhenStartupFinishes) { 395 snapshot = new StateSnapshot(STOPPING); 396 // We don't call listeners here because we already did that when we set the 397 // shutdownWhenStartupFinishes flag. 398 doStop(); 399 } else { 400 snapshot = new StateSnapshot(RUNNING); 401 enqueueRunningEvent(); 402 } 403 } finally { 404 monitor.leave(); 405 dispatchListenerEvents(); 406 } 407 } 408 409 /** 410 * Implementing classes should invoke this method once their service has stopped. It will cause 411 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 412 * State#TERMINATED}. 413 * 414 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 415 * State#STARTING}, or {@link State#RUNNING}. 416 */ 417 protected final void notifyStopped() { 418 monitor.enter(); 419 try { 420 State previous = state(); 421 switch (previous) { 422 case NEW: 423 case TERMINATED: 424 case FAILED: 425 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 426 case RUNNING: 427 case STARTING: 428 case STOPPING: 429 snapshot = new StateSnapshot(TERMINATED); 430 enqueueTerminatedEvent(previous); 431 break; 432 } 433 } finally { 434 monitor.leave(); 435 dispatchListenerEvents(); 436 } 437 } 438 439 /** 440 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 441 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 442 * or otherwise cannot be started nor stopped. 443 */ 444 protected final void notifyFailed(Throwable cause) { 445 checkNotNull(cause); 446 447 monitor.enter(); 448 try { 449 State previous = state(); 450 switch (previous) { 451 case NEW: 452 case TERMINATED: 453 throw new IllegalStateException("Failed while in state:" + previous, cause); 454 case RUNNING: 455 case STARTING: 456 case STOPPING: 457 snapshot = new StateSnapshot(FAILED, false, cause); 458 enqueueFailedEvent(previous, cause); 459 break; 460 case FAILED: 461 // Do nothing 462 break; 463 } 464 } finally { 465 monitor.leave(); 466 dispatchListenerEvents(); 467 } 468 } 469 470 @Override 471 public final boolean isRunning() { 472 return state() == RUNNING; 473 } 474 475 @Override 476 public final State state() { 477 return snapshot.externalState(); 478 } 479 480 /** @since 14.0 */ 481 @Override 482 public final Throwable failureCause() { 483 return snapshot.failureCause(); 484 } 485 486 /** @since 13.0 */ 487 @Override 488 public final void addListener(Listener listener, Executor executor) { 489 listeners.addListener(listener, executor); 490 } 491 492 @Override 493 public String toString() { 494 return getClass().getSimpleName() + " [" + state() + "]"; 495 } 496 497 /** 498 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 499 * #monitor}. 500 */ 501 private void dispatchListenerEvents() { 502 if (!monitor.isOccupiedByCurrentThread()) { 503 listeners.dispatch(); 504 } 505 } 506 507 private void enqueueStartingEvent() { 508 listeners.enqueue(STARTING_EVENT); 509 } 510 511 private void enqueueRunningEvent() { 512 listeners.enqueue(RUNNING_EVENT); 513 } 514 515 private void enqueueStoppingEvent(final State from) { 516 if (from == State.STARTING) { 517 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 518 } else if (from == State.RUNNING) { 519 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 520 } else { 521 throw new AssertionError(); 522 } 523 } 524 525 private void enqueueTerminatedEvent(final State from) { 526 switch (from) { 527 case NEW: 528 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 529 break; 530 case STARTING: 531 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 532 break; 533 case RUNNING: 534 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 535 break; 536 case STOPPING: 537 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 538 break; 539 case TERMINATED: 540 case FAILED: 541 throw new AssertionError(); 542 } 543 } 544 545 private void enqueueFailedEvent(final State from, final Throwable cause) { 546 // can't memoize this one due to the exception 547 listeners.enqueue( 548 new ListenerCallQueue.Event<Listener>() { 549 @Override 550 public void call(Listener listener) { 551 listener.failed(from, cause); 552 } 553 554 @Override 555 public String toString() { 556 return "failed({from = " + from + ", cause = " + cause + "})"; 557 } 558 }); 559 } 560 561 /** 562 * An immutable snapshot of the current state of the service. This class represents a consistent 563 * snapshot of the state and therefore it can be used to answer simple queries without needing to 564 * grab a lock. 565 */ 566 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 567 private static final class StateSnapshot { 568 /** 569 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 570 */ 571 final State state; 572 573 /** If true, the user requested a shutdown while the service was still starting up. */ 574 final boolean shutdownWhenStartupFinishes; 575 576 /** 577 * The exception that caused this service to fail. This will be {@code null} unless the service 578 * has failed. 579 */ 580 @CheckForNull final Throwable failure; 581 582 StateSnapshot(State internalState) { 583 this(internalState, false, null); 584 } 585 586 StateSnapshot( 587 State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) { 588 checkArgument( 589 !shutdownWhenStartupFinishes || internalState == STARTING, 590 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 591 internalState); 592 checkArgument( 593 (failure != null) == (internalState == FAILED), 594 "A failure cause should be set if and only if the state is failed. Got %s and %s " 595 + "instead.", 596 internalState, 597 failure); 598 this.state = internalState; 599 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 600 this.failure = failure; 601 } 602 603 /** @see Service#state() */ 604 State externalState() { 605 if (shutdownWhenStartupFinishes && state == STARTING) { 606 return STOPPING; 607 } else { 608 return state; 609 } 610 } 611 612 /** @see Service#failureCause() */ 613 Throwable failureCause() { 614 checkState( 615 state == FAILED, 616 "failureCause() is only valid if the service has failed, service is %s", 617 state); 618 // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED. 619 return requireNonNull(failure); 620 } 621 } 622}