001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.base.Preconditions.checkState; 020import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 021import static com.google.common.util.concurrent.Service.State.FAILED; 022import static com.google.common.util.concurrent.Service.State.NEW; 023import static com.google.common.util.concurrent.Service.State.RUNNING; 024import static com.google.common.util.concurrent.Service.State.STARTING; 025import static com.google.common.util.concurrent.Service.State.STOPPING; 026import static com.google.common.util.concurrent.Service.State.TERMINATED; 027import static java.util.Objects.requireNonNull; 028 029import com.google.common.annotations.GwtIncompatible; 030import com.google.common.annotations.J2ktIncompatible; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; 033import com.google.errorprone.annotations.CanIgnoreReturnValue; 034import com.google.errorprone.annotations.ForOverride; 035import com.google.errorprone.annotations.concurrent.GuardedBy; 036import com.google.j2objc.annotations.WeakOuter; 037import java.util.concurrent.Executor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040import org.checkerframework.checker.nullness.qual.Nullable; 041 042/** 043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 045 * callbacks. Its subclasses must manage threads manually; consider {@link 046 * AbstractExecutionThreadService} if you need only a single execution thread. 047 * 048 * @author Jesse Wilson 049 * @author Luke Sandberg 050 * @since 1.0 051 */ 052@GwtIncompatible 053@J2ktIncompatible 054public abstract class AbstractService implements Service { 055 private static final ListenerCallQueue.Event<Listener> STARTING_EVENT = 056 new ListenerCallQueue.Event<Listener>() { 057 @Override 058 public void call(Listener listener) { 059 listener.starting(); 060 } 061 062 @Override 063 public String toString() { 064 return "starting()"; 065 } 066 }; 067 private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT = 068 new ListenerCallQueue.Event<Listener>() { 069 @Override 070 public void call(Listener listener) { 071 listener.running(); 072 } 073 074 @Override 075 public String toString() { 076 return "running()"; 077 } 078 }; 079 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT = 080 stoppingEvent(STARTING); 081 private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT = 082 stoppingEvent(RUNNING); 083 084 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT = 085 terminatedEvent(NEW); 086 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT = 087 terminatedEvent(STARTING); 088 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT = 089 terminatedEvent(RUNNING); 090 private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT = 091 terminatedEvent(STOPPING); 092 093 private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) { 094 return new ListenerCallQueue.Event<Listener>() { 095 @Override 096 public void call(Listener listener) { 097 listener.terminated(from); 098 } 099 100 @Override 101 public String toString() { 102 return "terminated({from = " + from + "})"; 103 } 104 }; 105 } 106 107 private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) { 108 return new ListenerCallQueue.Event<Listener>() { 109 @Override 110 public void call(Listener listener) { 111 listener.stopping(from); 112 } 113 114 @Override 115 public String toString() { 116 return "stopping({from = " + from + "})"; 117 } 118 }; 119 } 120 121 private final Monitor monitor = new Monitor(); 122 123 private final Guard isStartable = new IsStartableGuard(); 124 125 @WeakOuter 126 private final class IsStartableGuard extends Guard { 127 IsStartableGuard() { 128 super(AbstractService.this.monitor); 129 } 130 131 @Override 132 public boolean isSatisfied() { 133 return state() == NEW; 134 } 135 } 136 137 private final Guard isStoppable = new IsStoppableGuard(); 138 139 @WeakOuter 140 private final class IsStoppableGuard extends Guard { 141 IsStoppableGuard() { 142 super(AbstractService.this.monitor); 143 } 144 145 @Override 146 public boolean isSatisfied() { 147 return state().compareTo(RUNNING) <= 0; 148 } 149 } 150 151 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 152 153 @WeakOuter 154 private final class HasReachedRunningGuard extends Guard { 155 HasReachedRunningGuard() { 156 super(AbstractService.this.monitor); 157 } 158 159 @Override 160 public boolean isSatisfied() { 161 return state().compareTo(RUNNING) >= 0; 162 } 163 } 164 165 private final Guard isStopped = new IsStoppedGuard(); 166 167 @WeakOuter 168 private final class IsStoppedGuard extends Guard { 169 IsStoppedGuard() { 170 super(AbstractService.this.monitor); 171 } 172 173 @Override 174 public boolean isSatisfied() { 175 return state().compareTo(TERMINATED) >= 0; 176 } 177 } 178 179 /** The listeners to notify during a state transition. */ 180 private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>(); 181 182 /** 183 * The current state of the service. This should be written with the lock held but can be read 184 * without it because it is an immutable object in a volatile field. This is desirable so that 185 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 186 * without grabbing the lock. 187 * 188 * <p>To update this field correctly the lock must be held to guarantee that the state is 189 * consistent. 190 */ 191 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 192 193 /** Constructor for use by subclasses. */ 194 protected AbstractService() {} 195 196 /** 197 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 198 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 199 * or after it has returned. If startup fails, the invocation should cause a call to {@link 200 * #notifyFailed(Throwable)} instead. 201 * 202 * <p>This method should return promptly; prefer to do work on a different thread where it is 203 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 204 * called multiple times. 205 */ 206 @ForOverride 207 protected abstract void doStart(); 208 209 /** 210 * This method should be used to initiate service shutdown. The invocation of this method should 211 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 212 * returned. If shutdown fails, the invocation should cause a call to {@link 213 * #notifyFailed(Throwable)} instead. 214 * 215 * <p>This method should return promptly; prefer to do work on a different thread where it is 216 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 217 * called multiple times. 218 * 219 * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not 220 * invoked immediately. Instead, it will be deferred until after the service is {@link 221 * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}. 222 */ 223 @ForOverride 224 protected abstract void doStop(); 225 226 /** 227 * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link 228 * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the 229 * method to cancel pending work and then call {@link #notifyStopped} to stop the service. 230 * 231 * <p>This method should return promptly; prefer to do work on a different thread where it is 232 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 233 * called multiple times. 234 * 235 * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the 236 * external state observable by the caller of {@link #stopAsync}. 237 * 238 * @since 27.0 239 */ 240 @ForOverride 241 protected void doCancelStart() {} 242 243 @CanIgnoreReturnValue 244 @Override 245 public final Service startAsync() { 246 if (monitor.enterIf(isStartable)) { 247 try { 248 snapshot = new StateSnapshot(STARTING); 249 enqueueStartingEvent(); 250 doStart(); 251 } catch (Throwable startupFailure) { 252 restoreInterruptIfIsInterruptedException(startupFailure); 253 notifyFailed(startupFailure); 254 } finally { 255 monitor.leave(); 256 dispatchListenerEvents(); 257 } 258 } else { 259 throw new IllegalStateException("Service " + this + " has already been started"); 260 } 261 return this; 262 } 263 264 @CanIgnoreReturnValue 265 @Override 266 public final Service stopAsync() { 267 if (monitor.enterIf(isStoppable)) { 268 try { 269 State previous = state(); 270 switch (previous) { 271 case NEW: 272 snapshot = new StateSnapshot(TERMINATED); 273 enqueueTerminatedEvent(NEW); 274 break; 275 case STARTING: 276 snapshot = new StateSnapshot(STARTING, true, null); 277 enqueueStoppingEvent(STARTING); 278 doCancelStart(); 279 break; 280 case RUNNING: 281 snapshot = new StateSnapshot(STOPPING); 282 enqueueStoppingEvent(RUNNING); 283 doStop(); 284 break; 285 case STOPPING: 286 case TERMINATED: 287 case FAILED: 288 // These cases are impossible due to the if statement above. 289 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 290 } 291 } catch (Throwable shutdownFailure) { 292 restoreInterruptIfIsInterruptedException(shutdownFailure); 293 notifyFailed(shutdownFailure); 294 } finally { 295 monitor.leave(); 296 dispatchListenerEvents(); 297 } 298 } 299 return this; 300 } 301 302 @Override 303 public final void awaitRunning() { 304 monitor.enterWhenUninterruptibly(hasReachedRunning); 305 try { 306 checkCurrentState(RUNNING); 307 } finally { 308 monitor.leave(); 309 } 310 } 311 312 @Override 313 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 314 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 315 try { 316 checkCurrentState(RUNNING); 317 } finally { 318 monitor.leave(); 319 } 320 } else { 321 // It is possible due to races that we are currently in the expected state even though we 322 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 323 // even check the guard. I don't think we care too much about this use case but it could lead 324 // to a confusing error message. 325 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 326 } 327 } 328 329 @Override 330 public final void awaitTerminated() { 331 monitor.enterWhenUninterruptibly(isStopped); 332 try { 333 checkCurrentState(TERMINATED); 334 } finally { 335 monitor.leave(); 336 } 337 } 338 339 @Override 340 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 341 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 342 try { 343 checkCurrentState(TERMINATED); 344 } finally { 345 monitor.leave(); 346 } 347 } else { 348 // It is possible due to races that we are currently in the expected state even though we 349 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 350 // even check the guard. I don't think we care too much about this use case but it could lead 351 // to a confusing error message. 352 throw new TimeoutException( 353 "Timed out waiting for " 354 + this 355 + " to reach a terminal state. " 356 + "Current state: " 357 + state()); 358 } 359 } 360 361 /** Checks that the current state is equal to the expected state. */ 362 @GuardedBy("monitor") 363 private void checkCurrentState(State expected) { 364 State actual = state(); 365 if (actual != expected) { 366 if (actual == FAILED) { 367 // Handle this specially so that we can include the failureCause, if there is one. 368 throw new IllegalStateException( 369 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 370 failureCause()); 371 } 372 throw new IllegalStateException( 373 "Expected the service " + this + " to be " + expected + ", but was " + actual); 374 } 375 } 376 377 /** 378 * Implementing classes should invoke this method once their service has started. It will cause 379 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 380 * 381 * @throws IllegalStateException if the service is not {@link State#STARTING}. 382 */ 383 protected final void notifyStarted() { 384 monitor.enter(); 385 try { 386 // We have to examine the internal state of the snapshot here to properly handle the stop 387 // while starting case. 388 if (snapshot.state != STARTING) { 389 IllegalStateException failure = 390 new IllegalStateException( 391 "Cannot notifyStarted() when the service is " + snapshot.state); 392 notifyFailed(failure); 393 throw failure; 394 } 395 396 if (snapshot.shutdownWhenStartupFinishes) { 397 snapshot = new StateSnapshot(STOPPING); 398 // We don't call listeners here because we already did that when we set the 399 // shutdownWhenStartupFinishes flag. 400 doStop(); 401 } else { 402 snapshot = new StateSnapshot(RUNNING); 403 enqueueRunningEvent(); 404 } 405 } finally { 406 monitor.leave(); 407 dispatchListenerEvents(); 408 } 409 } 410 411 /** 412 * Implementing classes should invoke this method once their service has stopped. It will cause 413 * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link 414 * State#TERMINATED}. 415 * 416 * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link 417 * State#STARTING}, or {@link State#RUNNING}. 418 */ 419 protected final void notifyStopped() { 420 monitor.enter(); 421 try { 422 State previous = state(); 423 switch (previous) { 424 case NEW: 425 case TERMINATED: 426 case FAILED: 427 throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 428 case RUNNING: 429 case STARTING: 430 case STOPPING: 431 snapshot = new StateSnapshot(TERMINATED); 432 enqueueTerminatedEvent(previous); 433 break; 434 } 435 } finally { 436 monitor.leave(); 437 dispatchListenerEvents(); 438 } 439 } 440 441 /** 442 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 443 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 444 * or otherwise cannot be started nor stopped. 445 */ 446 protected final void notifyFailed(Throwable cause) { 447 checkNotNull(cause); 448 449 monitor.enter(); 450 try { 451 State previous = state(); 452 switch (previous) { 453 case NEW: 454 case TERMINATED: 455 throw new IllegalStateException("Failed while in state:" + previous, cause); 456 case RUNNING: 457 case STARTING: 458 case STOPPING: 459 snapshot = new StateSnapshot(FAILED, false, cause); 460 enqueueFailedEvent(previous, cause); 461 break; 462 case FAILED: 463 // Do nothing 464 break; 465 } 466 } finally { 467 monitor.leave(); 468 dispatchListenerEvents(); 469 } 470 } 471 472 @Override 473 public final boolean isRunning() { 474 return state() == RUNNING; 475 } 476 477 @Override 478 public final State state() { 479 return snapshot.externalState(); 480 } 481 482 /** @since 14.0 */ 483 @Override 484 public final Throwable failureCause() { 485 return snapshot.failureCause(); 486 } 487 488 /** @since 13.0 */ 489 @Override 490 public final void addListener(Listener listener, Executor executor) { 491 listeners.addListener(listener, executor); 492 } 493 494 @Override 495 public String toString() { 496 return getClass().getSimpleName() + " [" + state() + "]"; 497 } 498 499 /** 500 * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link 501 * #monitor}. 502 */ 503 private void dispatchListenerEvents() { 504 if (!monitor.isOccupiedByCurrentThread()) { 505 listeners.dispatch(); 506 } 507 } 508 509 private void enqueueStartingEvent() { 510 listeners.enqueue(STARTING_EVENT); 511 } 512 513 private void enqueueRunningEvent() { 514 listeners.enqueue(RUNNING_EVENT); 515 } 516 517 private void enqueueStoppingEvent(final State from) { 518 if (from == State.STARTING) { 519 listeners.enqueue(STOPPING_FROM_STARTING_EVENT); 520 } else if (from == State.RUNNING) { 521 listeners.enqueue(STOPPING_FROM_RUNNING_EVENT); 522 } else { 523 throw new AssertionError(); 524 } 525 } 526 527 private void enqueueTerminatedEvent(final State from) { 528 switch (from) { 529 case NEW: 530 listeners.enqueue(TERMINATED_FROM_NEW_EVENT); 531 break; 532 case STARTING: 533 listeners.enqueue(TERMINATED_FROM_STARTING_EVENT); 534 break; 535 case RUNNING: 536 listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT); 537 break; 538 case STOPPING: 539 listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT); 540 break; 541 case TERMINATED: 542 case FAILED: 543 throw new AssertionError(); 544 } 545 } 546 547 private void enqueueFailedEvent(final State from, final Throwable cause) { 548 // can't memoize this one due to the exception 549 listeners.enqueue( 550 new ListenerCallQueue.Event<Listener>() { 551 @Override 552 public void call(Listener listener) { 553 listener.failed(from, cause); 554 } 555 556 @Override 557 public String toString() { 558 return "failed({from = " + from + ", cause = " + cause + "})"; 559 } 560 }); 561 } 562 563 /** 564 * An immutable snapshot of the current state of the service. This class represents a consistent 565 * snapshot of the state and therefore it can be used to answer simple queries without needing to 566 * grab a lock. 567 */ 568 // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses). 569 private static final class StateSnapshot { 570 /** 571 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 572 */ 573 final State state; 574 575 /** If true, the user requested a shutdown while the service was still starting up. */ 576 final boolean shutdownWhenStartupFinishes; 577 578 /** 579 * The exception that caused this service to fail. This will be {@code null} unless the service 580 * has failed. 581 */ 582 final @Nullable Throwable failure; 583 584 StateSnapshot(State internalState) { 585 this(internalState, false, null); 586 } 587 588 StateSnapshot( 589 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 590 checkArgument( 591 !shutdownWhenStartupFinishes || internalState == STARTING, 592 "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 593 internalState); 594 checkArgument( 595 (failure != null) == (internalState == FAILED), 596 "A failure cause should be set if and only if the state is failed. Got %s and %s " 597 + "instead.", 598 internalState, 599 failure); 600 this.state = internalState; 601 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 602 this.failure = failure; 603 } 604 605 /** @see Service#state() */ 606 State externalState() { 607 if (shutdownWhenStartupFinishes && state == STARTING) { 608 return STOPPING; 609 } else { 610 return state; 611 } 612 } 613 614 /** @see Service#failureCause() */ 615 Throwable failureCause() { 616 checkState( 617 state == FAILED, 618 "failureCause() is only valid if the service has failed, service is %s", 619 state); 620 // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED. 621 return requireNonNull(failure); 622 } 623 } 624}