/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException;
import static com.google.common.util.concurrent.Service.State.FAILED;
import static com.google.common.util.concurrent.Service.State.NEW;
import static com.google.common.util.concurrent.Service.State.RUNNING;
import static com.google.common.util.concurrent.Service.State.STARTING;
import static com.google.common.util.concurrent.Service.State.STOPPING;
import static com.google.common.util.concurrent.Service.State.TERMINATED;
import static java.util.Objects.requireNonNull;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.J2ktIncompatible;
import com.google.common.util.concurrent.Monitor.Guard;
import com.google.common.util.concurrent.Service.State; // javadoc needs this
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.ForOverride;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.j2objc.annotations.WeakOuter;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.CheckForNull;

/**
 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
 * callbacks. Its subclasses must manage threads manually; consider {@link
 * AbstractExecutionThreadService} if you need only a single execution thread.
 *
 * <p>All state transitions are performed while holding an internal {@link Monitor}; listener
 * callbacks are queued during the transition and dispatched only after the monitor has been
 * released.
 *
 * @author Jesse Wilson
 * @author Luke Sandberg
 * @since 1.0
 */
@GwtIncompatible
@J2ktIncompatible
@ElementTypesAreNonnullByDefault
public abstract class AbstractService implements Service {
  // The events below carry no per-call data, so a single shared instance of each is reused for
  // every service. (The "failed" event is the exception; see enqueueFailedEvent.)
  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
      new ListenerCallQueue.Event<Listener>() {
        @Override
        public void call(Listener listener) {
          listener.starting();
        }

        @Override
        public String toString() {
          return "starting()";
        }
      };
  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
      new ListenerCallQueue.Event<Listener>() {
        @Override
        public void call(Listener listener) {
          listener.running();
        }

        @Override
        public String toString() {
          return "running()";
        }
      };
  // One shared "stopping" event per state that stopAsync() can stop from.
  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
      stoppingEvent(STARTING);
  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
      stoppingEvent(RUNNING);

  // One shared "terminated" event per state from which TERMINATED can be reached.
  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
      terminatedEvent(NEW);
  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
      terminatedEvent(STARTING);
  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
      terminatedEvent(RUNNING);
  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
      terminatedEvent(STOPPING);

  /** Creates an event that invokes {@link Listener#terminated} with the given previous state. */
  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
    return new ListenerCallQueue.Event<Listener>() {
      @Override
      public void call(Listener listener) {
        listener.terminated(from);
      }

      @Override
      public String toString() {
        return "terminated({from = " + from + "})";
      }
    };
  }

  /** Creates an event that invokes {@link Listener#stopping} with the given previous state. */
  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
    return new ListenerCallQueue.Event<Listener>() {
      @Override
      public void call(Listener listener) {
        listener.stopping(from);
      }

      @Override
      public String toString() {
        return "stopping({from = " + from + "})";
      }
    };
  }

  /** The lock under which every write to {@link #snapshot} is performed. */
  private final Monitor monitor = new Monitor();

  // NOTE(review): the three compareTo-based guards below rely on the declaration order of
  // Service.State, which is not visible in this file; presumably it is
  // NEW < STARTING < RUNNING < STOPPING < TERMINATED < FAILED -- confirm against Service.State.
  // All guards evaluate state(), i.e. the *external* state, so a service that is starting with a
  // pending stop request is seen as STOPPING.

  /** Satisfied only while the service has never been started or stopped ({@code NEW}). */
  private final Guard isStartable = new IsStartableGuard();

  @WeakOuter
  private final class IsStartableGuard extends Guard {
    IsStartableGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state() == NEW;
    }
  }

  /**
   * Satisfied while the external state is at or before {@code RUNNING}. {@link #stopAsync} treats
   * {@code STOPPING}, {@code TERMINATED} and {@code FAILED} as impossible once this guard holds.
   */
  private final Guard isStoppable = new IsStoppableGuard();

  @WeakOuter
  private final class IsStoppableGuard extends Guard {
    IsStoppableGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state().compareTo(RUNNING) <= 0;
    }
  }

  /**
   * Satisfied once the external state is {@code RUNNING} or any state ordered after it. Callers
   * must therefore still verify that the state is actually {@code RUNNING}.
   */
  private final Guard hasReachedRunning = new HasReachedRunningGuard();

  @WeakOuter
  private final class HasReachedRunningGuard extends Guard {
    HasReachedRunningGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state().compareTo(RUNNING) >= 0;
    }
  }

  /**
   * Satisfied once the external state is {@code TERMINATED} or any state ordered after it. Callers
   * must therefore still verify that the state is actually {@code TERMINATED}.
   */
  private final Guard isStopped = new IsStoppedGuard();

  @WeakOuter
  private final class IsStoppedGuard extends Guard {
    IsStoppedGuard() {
      super(AbstractService.this.monitor);
    }

    @Override
    public boolean isSatisfied() {
      return state().compareTo(TERMINATED) >= 0;
    }
  }

  /** The listeners to notify during a state transition. */
  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();

  /**
   * The current state of the service. This should be written with the lock held but can be read
   * without it because it is an immutable object in a volatile field. This is desirable so that
   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
   * without grabbing the lock.
   *
   * <p>To update this field correctly the lock must be held to guarantee that the state is
   * consistent.
   */
  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);

  /** Constructor for use by subclasses. */
  protected AbstractService() {}

  /**
   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
   * or after it has returned. If startup fails, the invocation should cause a call to {@link
   * #notifyFailed(Throwable)} instead.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where it is
   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
   * called multiple times.
   */
  @ForOverride
  protected abstract void doStart();

  /**
   * This method should be used to initiate service shutdown. The invocation of this method should
   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
   * returned. If shutdown fails, the invocation should cause a call to {@link
   * #notifyFailed(Throwable)} instead.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where it is
   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
   * called multiple times.
   *
   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
   * invoked immediately. Instead, it will be deferred until after the service is {@link
   * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}.
   */
  @ForOverride
  protected abstract void doStop();

  /**
   * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
   * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
   * method to cancel pending work and then call {@link #notifyStopped} to stop the service.
   *
   * <p>This method should return promptly; prefer to do work on a different thread where it is
   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
   * called multiple times.
   *
   * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the
   * external state observable by the caller of {@link #stopAsync}.
   *
   * <p>The default implementation does nothing.
   *
   * @since 27.0
   */
  @ForOverride
  protected void doCancelStart() {}

  /**
   * Moves the service from {@link State#NEW} to {@link State#STARTING}, queues the "starting"
   * listener event and calls {@link #doStart}. Anything thrown by {@code doStart} is routed to
   * {@link #notifyFailed(Throwable)} rather than propagated.
   *
   * @throws IllegalStateException if the service is not {@link State#NEW}
   */
  @CanIgnoreReturnValue
  @Override
  public final Service startAsync() {
    if (monitor.enterIf(isStartable)) {
      try {
        snapshot = new StateSnapshot(STARTING);
        enqueueStartingEvent();
        // Subclass code runs while the monitor is held.
        doStart();
      } catch (Throwable startupFailure) {
        restoreInterruptIfIsInterruptedException(startupFailure);
        notifyFailed(startupFailure);
      } finally {
        // Release the monitor first so that listeners are never run with it held.
        monitor.leave();
        dispatchListenerEvents();
      }
    } else {
      throw new IllegalStateException("Service " + this + " has already been started");
    }
    return this;
  }

  /**
   * Initiates shutdown according to the current state: a {@code NEW} service goes straight to
   * {@code TERMINATED}; a {@code STARTING} service is flagged to stop once startup finishes and
   * {@link #doCancelStart} is called; a {@code RUNNING} service becomes {@code STOPPING} and
   * {@link #doStop} is called. In any other state this method does nothing. Anything thrown while
   * handling the request is routed to {@link #notifyFailed(Throwable)}.
   */
  @CanIgnoreReturnValue
  @Override
  public final Service stopAsync() {
    if (monitor.enterIf(isStoppable)) {
      try {
        State previous = state();
        switch (previous) {
          case NEW:
            snapshot = new StateSnapshot(TERMINATED);
            enqueueTerminatedEvent(NEW);
            break;
          case STARTING:
            // The internal state stays STARTING; the flag makes state() report STOPPING (see
            // StateSnapshot.externalState) and tells notifyStarted() to call doStop().
            snapshot = new StateSnapshot(STARTING, true, null);
            enqueueStoppingEvent(STARTING);
            doCancelStart();
            break;
          case RUNNING:
            snapshot = new StateSnapshot(STOPPING);
            enqueueStoppingEvent(RUNNING);
            doStop();
            break;
          case STOPPING:
          case TERMINATED:
          case FAILED:
            // These cases are impossible due to the if statement above.
            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
        }
      } catch (Throwable shutdownFailure) {
        restoreInterruptIfIsInterruptedException(shutdownFailure);
        notifyFailed(shutdownFailure);
      } finally {
        // Release the monitor first so that listeners are never run with it held.
        monitor.leave();
        dispatchListenerEvents();
      }
    }
    return this;
  }

  @Override
  public final void awaitRunning() {
    monitor.enterWhenUninterruptibly(hasReachedRunning);
    try {
      // The guard is also satisfied by states past RUNNING, so verify the actual state.
      checkCurrentState(RUNNING);
    } finally {
      monitor.leave();
    }
  }

  @Override
  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
      try {
        // The guard is also satisfied by states past RUNNING, so verify the actual state.
        checkCurrentState(RUNNING);
      } finally {
        monitor.leave();
      }
    } else {
      // It is possible due to races that we are currently in the expected state even though we
      // timed out. e.g. if we weren't even able to grab the lock within the timeout we would never
      // even check the guard. I don't think we care too much about this use case but it could lead
      // to a confusing error message.
      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
    }
  }

  @Override
  public final void awaitTerminated() {
    monitor.enterWhenUninterruptibly(isStopped);
    try {
      // The guard is also satisfied by states past TERMINATED, so verify the actual state.
      checkCurrentState(TERMINATED);
    } finally {
      monitor.leave();
    }
  }

  @Override
  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
      try {
        // The guard is also satisfied by states past TERMINATED, so verify the actual state.
        checkCurrentState(TERMINATED);
      } finally {
        monitor.leave();
      }
    } else {
      // It is possible due to races that we are currently in the expected state even though we
      // timed out. e.g. if we weren't even able to grab the lock within the timeout we would never
      // even check the guard. I don't think we care too much about this use case but it could lead
      // to a confusing error message.
      throw new TimeoutException(
          "Timed out waiting for "
              + this
              + " to reach a terminal state. "
              + "Current state: "
              + state());
    }
  }

  /**
   * Checks that the current state is equal to the expected state.
   *
   * @throws IllegalStateException if the current external state differs from {@code expected};
   *     when the service has {@code FAILED}, the failure cause is attached as the exception's cause
   */
  @GuardedBy("monitor")
  private void checkCurrentState(State expected) {
    State actual = state();
    if (actual != expected) {
      if (actual == FAILED) {
        // Handle this specially so that we can include the failureCause, if there is one.
        throw new IllegalStateException(
            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
            failureCause());
      }
      throw new IllegalStateException(
          "Expected the service " + this + " to be " + expected + ", but was " + actual);
    }
  }

  /**
   * Implementing classes should invoke this method once their service has started. It will cause
   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
   *
   * <p>If {@link #stopAsync} was called while the service was starting, the service instead
   * transitions to {@link State#STOPPING} and {@link #doStop} is invoked.
   *
   * @throws IllegalStateException if the service is not {@link State#STARTING}.
   */
  protected final void notifyStarted() {
    monitor.enter();
    try {
      // We have to examine the internal state of the snapshot here to properly handle the stop
      // while starting case.
      if (snapshot.state != STARTING) {
        IllegalStateException failure =
            new IllegalStateException(
                "Cannot notifyStarted() when the service is " + snapshot.state);
        // Note: notifyFailed itself throws (with `failure` as the cause) when the service is NEW
        // or TERMINATED; in that case its exception propagates instead of the throw below.
        notifyFailed(failure);
        throw failure;
      }

      if (snapshot.shutdownWhenStartupFinishes) {
        snapshot = new StateSnapshot(STOPPING);
        // We don't call listeners here because we already did that when we set the
        // shutdownWhenStartupFinishes flag.
        doStop();
      } else {
        snapshot = new StateSnapshot(RUNNING);
        enqueueRunningEvent();
      }
    } finally {
      monitor.leave();
      dispatchListenerEvents();
    }
  }

  /**
   * Implementing classes should invoke this method once their service has stopped. It will cause
   * the service to transition from {@link State#STARTING}, {@link State#RUNNING} or {@link
   * State#STOPPING} to {@link State#TERMINATED}.
   *
   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
   *     State#STARTING}, or {@link State#RUNNING}.
   */
  protected final void notifyStopped() {
    monitor.enter();
    try {
      // The external state is used, so a stop requested during startup is reported to listeners
      // as terminated-from-STOPPING.
      State previous = state();
      switch (previous) {
        case NEW:
        case TERMINATED:
        case FAILED:
          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
        case RUNNING:
        case STARTING:
        case STOPPING:
          snapshot = new StateSnapshot(TERMINATED);
          enqueueTerminatedEvent(previous);
          break;
      }
    } finally {
      monitor.leave();
      dispatchListenerEvents();
    }
  }

  /**
   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
   * or otherwise cannot be started nor stopped.
   *
   * <p>If the service has already failed, this method does nothing: the original failure cause is
   * kept and no further listener event is queued.
   *
   * @param cause the reason for the failure; checked with {@code checkNotNull}
   * @throws IllegalStateException if the service is {@link State#NEW} or {@link
   *     State#TERMINATED}; {@code cause} is attached as the exception's cause
   */
  protected final void notifyFailed(Throwable cause) {
    checkNotNull(cause);

    monitor.enter();
    try {
      State previous = state();
      switch (previous) {
        case NEW:
        case TERMINATED:
          throw new IllegalStateException("Failed while in state:" + previous, cause);
        case RUNNING:
        case STARTING:
        case STOPPING:
          snapshot = new StateSnapshot(FAILED, false, cause);
          enqueueFailedEvent(previous, cause);
          break;
        case FAILED:
          // Do nothing
          break;
      }
    } finally {
      monitor.leave();
      dispatchListenerEvents();
    }
  }

  @Override
  public final boolean isRunning() {
    return state() == RUNNING;
  }

  /** Returns the externally visible state, read from the current snapshot without locking. */
  @Override
  public final State state() {
    return snapshot.externalState();
  }

  /** @since 14.0 */
  @Override
  public final Throwable failureCause() {
    return snapshot.failureCause();
  }

  /** @since 13.0 */
  @Override
  public final void addListener(Listener listener, Executor executor) {
    listeners.addListener(listener, executor);
  }

  @Override
  public String toString() {
    return getClass().getSimpleName() + " [" + state() + "]";
  }

  /**
   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
   * #monitor}.
   *
   * <p>If the current thread still occupies the monitor (for example because {@link
   * #notifyStarted} was invoked from inside {@link #doStart}, which runs with the monitor held),
   * nothing is dispatched here; the queued events are dispatched when the outer caller runs this
   * method after its own {@code monitor.leave()}.
   */
  private void dispatchListenerEvents() {
    if (!monitor.isOccupiedByCurrentThread()) {
      listeners.dispatch();
    }
  }

  private void enqueueStartingEvent() {
    listeners.enqueue(STARTING_EVENT);
  }

  private void enqueueRunningEvent() {
    listeners.enqueue(RUNNING_EVENT);
  }

  /**
   * Queues the shared "stopping" event for the given previous state. Only {@code STARTING} and
   * {@code RUNNING} are valid; any other value is a programming error.
   */
  private void enqueueStoppingEvent(final State from) {
    if (from == State.STARTING) {
      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
    } else if (from == State.RUNNING) {
      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
    } else {
      throw new AssertionError();
    }
  }

  /**
   * Queues the shared "terminated" event for the given previous state. {@code TERMINATED} and
   * {@code FAILED} are not valid previous states and are treated as a programming error.
   */
  private void enqueueTerminatedEvent(final State from) {
    switch (from) {
      case NEW:
        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
        break;
      case STARTING:
        listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
        break;
      case RUNNING:
        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
        break;
      case STOPPING:
        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
        break;
      case TERMINATED:
      case FAILED:
        throw new AssertionError();
    }
  }

  /** Queues a "failed" event carrying the previous state and the failure cause. */
  private void enqueueFailedEvent(final State from, final Throwable cause) {
    // can't memoize this one due to the exception
    listeners.enqueue(
        new ListenerCallQueue.Event<Listener>() {
          @Override
          public void call(Listener listener) {
            listener.failed(from, cause);
          }

          @Override
          public String toString() {
            return "failed({from = " + from + ", cause = " + cause + "})";
          }
        });
  }

  /**
   * An immutable snapshot of the current state of the service. This class represents a consistent
   * snapshot of the state and therefore it can be used to answer simple queries without needing to
   * grab a lock.
   */
  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
  private static final class StateSnapshot {
    /**
     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
     */
    final State state;

    /** If true, the user requested a shutdown while the service was still starting up. */
    final boolean shutdownWhenStartupFinishes;

    /**
     * The exception that caused this service to fail. This will be {@code null} unless the service
     * has failed.
     */
    @CheckForNull final Throwable failure;

    /** Creates a snapshot with no pending shutdown request and no failure cause. */
    StateSnapshot(State internalState) {
      this(internalState, false, null);
    }

    /**
     * Creates a snapshot, enforcing two invariants: the shutdown-pending flag may only be set
     * together with {@code STARTING}, and a failure cause is present if and only if the state is
     * {@code FAILED}.
     */
    StateSnapshot(
        State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) {
      checkArgument(
          !shutdownWhenStartupFinishes || internalState == STARTING,
          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
          internalState);
      checkArgument(
          (failure != null) == (internalState == FAILED),
          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
              + "instead.",
          internalState,
          failure);
      this.state = internalState;
      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
      this.failure = failure;
    }

    /** @see Service#state() */
    State externalState() {
      // A service that is starting with a pending stop request is presented as STOPPING.
      if (shutdownWhenStartupFinishes && state == STARTING) {
        return STOPPING;
      } else {
        return state;
      }
    }

    /** @see Service#failureCause() */
    Throwable failureCause() {
      checkState(
          state == FAILED,
          "failureCause() is only valid if the service has failed, service is %s",
          state);
      // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED.
      return requireNonNull(failure);
    }
  }
}