001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.Service.State.FAILED; 023import static com.google.common.util.concurrent.Service.State.NEW; 024import static com.google.common.util.concurrent.Service.State.RUNNING; 025import static com.google.common.util.concurrent.Service.State.STARTING; 026import static com.google.common.util.concurrent.Service.State.STOPPING; 027import static com.google.common.util.concurrent.Service.State.TERMINATED; 028 029import com.google.common.annotations.Beta; 030import com.google.common.collect.Lists; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; // javadoc needs this 033 034import java.util.List; 035import java.util.concurrent.ExecutionException; 036import java.util.concurrent.Executor; 037import java.util.concurrent.TimeUnit; 038import java.util.concurrent.TimeoutException; 039import javax.annotation.Nullable; 040import javax.annotation.concurrent.GuardedBy; 041import javax.annotation.concurrent.Immutable; 042 043/** 044 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 045 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 046 * callbacks. Its subclasses must manage threads manually; consider 047 * {@link AbstractExecutionThreadService} if you need only a single execution thread. 048 * 049 * @author Jesse Wilson 050 * @author Luke Sandberg 051 * @since 1.0 052 */ 053@Beta 054public abstract class AbstractService implements Service { 055 private final Monitor monitor = new Monitor(); 056 057 private final Transition startup = new Transition(); 058 private final Transition shutdown = new Transition(); 059 060 private final Guard isStartable = new Guard(monitor) { 061 @Override public boolean isSatisfied() { 062 return state() == NEW; 063 } 064 }; 065 066 private final Guard isStoppable = new Guard(monitor) { 067 @Override public boolean isSatisfied() { 068 return state().compareTo(RUNNING) <= 0; 069 } 070 }; 071 072 private final Guard hasReachedRunning = new Guard(monitor) { 073 @Override public boolean isSatisfied() { 074 return state().compareTo(RUNNING) >= 0; 075 } 076 }; 077 078 private final Guard isStopped = new Guard(monitor) { 079 @Override public boolean isSatisfied() { 080 return state().isTerminal(); 081 } 082 }; 083 084 /** 085 * The listeners to notify during a state transition. 086 */ 087 @GuardedBy("monitor") 088 private final List<ListenerExecutorPair> listeners = Lists.newArrayList(); 089 090 /** 091 * The queue of listeners that are waiting to be executed. 092 * 093 * <p>Enqueue operations should be protected by {@link #monitor} while calling 094 * {@link ExecutionQueue#execute()} should not be protected. 095 */ 096 private final ExecutionQueue queuedListeners = new ExecutionQueue(); 097 098 /** 099 * The current state of the service. This should be written with the lock held but can be read 100 * without it because it is an immutable object in a volatile field. This is desirable so that 101 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 102 * without grabbing the lock. 103 * 104 * <p>To update this field correctly the lock must be held to guarantee that the state is 105 * consistent. 106 */ 107 @GuardedBy("monitor") 108 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 109 110 /** Constructor for use by subclasses. */ 111 protected AbstractService() { 112 // Add a listener to update the futures. This needs to be added first so that it is executed 113 // before the other listeners. This way the other listeners can access the completed futures. 114 addListener( 115 new Listener() { 116 @Override public void running() { 117 startup.set(RUNNING); 118 } 119 120 @Override public void stopping(State from) { 121 if (from == STARTING) { 122 startup.set(STOPPING); 123 } 124 } 125 126 @Override public void terminated(State from) { 127 if (from == NEW) { 128 startup.set(TERMINATED); 129 } 130 shutdown.set(TERMINATED); 131 } 132 133 @Override public void failed(State from, Throwable failure) { 134 switch (from) { 135 case STARTING: 136 startup.setException(failure); 137 shutdown.setException(new Exception("Service failed to start.", failure)); 138 break; 139 case RUNNING: 140 shutdown.setException(new Exception("Service failed while running", failure)); 141 break; 142 case STOPPING: 143 shutdown.setException(failure); 144 break; 145 case TERMINATED: /* fall-through */ 146 case FAILED: /* fall-through */ 147 case NEW: /* fall-through */ 148 default: 149 throw new AssertionError("Unexpected from state: " + from); 150 } 151 } 152 }, 153 MoreExecutors.sameThreadExecutor()); 154 } 155 156 /** 157 * This method is called by {@link #start} to initiate service startup. The invocation of this 158 * method should cause a call to {@link #notifyStarted()}, either during this method's run, or 159 * after it has returned. If startup fails, the invocation should cause a call to 160 * {@link #notifyFailed(Throwable)} instead. 161 * 162 * <p>This method should return promptly; prefer to do work on a different thread where it is 163 * convenient. It is invoked exactly once on service startup, even when {@link #start} is called 164 * multiple times. 165 */ 166 protected abstract void doStart(); 167 168 /** 169 * This method should be used to initiate service shutdown. The invocation of this method should 170 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 171 * returned. If shutdown fails, the invocation should cause a call to 172 * {@link #notifyFailed(Throwable)} instead. 173 * 174 * <p> This method should return promptly; prefer to do work on a different thread where it is 175 * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called 176 * multiple times. 177 */ 178 protected abstract void doStop(); 179 180 @Override public final Service startAsync() { 181 if (monitor.enterIf(isStartable)) { 182 try { 183 snapshot = new StateSnapshot(STARTING); 184 starting(); 185 doStart(); 186 // TODO(user): justify why we are catching Throwable and not RuntimeException 187 } catch (Throwable startupFailure) { 188 notifyFailed(startupFailure); 189 } finally { 190 monitor.leave(); 191 executeListeners(); 192 } 193 } else { 194 throw new IllegalStateException("Service " + this + " has already been started"); 195 } 196 return this; 197 } 198 199 @Deprecated 200 @Override 201 public final ListenableFuture<State> start() { 202 if (monitor.enterIf(isStartable)) { 203 try { 204 snapshot = new StateSnapshot(STARTING); 205 starting(); 206 doStart(); 207 } catch (Throwable startupFailure) { 208 notifyFailed(startupFailure); 209 } finally { 210 monitor.leave(); 211 executeListeners(); 212 } 213 } 214 return startup; 215 } 216 217 @Override public final Service stopAsync() { 218 stop(); 219 return this; 220 } 221 222 @Deprecated 223 @Override 224 public final ListenableFuture<State> stop() { 225 if (monitor.enterIf(isStoppable)) { 226 try { 227 State previous = state(); 228 switch (previous) { 229 case NEW: 230 snapshot = new StateSnapshot(TERMINATED); 231 terminated(NEW); 232 break; 233 case STARTING: 234 snapshot = new StateSnapshot(STARTING, true, null); 235 stopping(STARTING); 236 break; 237 case RUNNING: 238 snapshot = new StateSnapshot(STOPPING); 239 stopping(RUNNING); 240 doStop(); 241 break; 242 case STOPPING: 243 case TERMINATED: 244 case FAILED: 245 // These cases are impossible due to the if statement above. 246 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 247 default: 248 throw new AssertionError("Unexpected state: " + previous); 249 } 250 // TODO(user): justify why we are catching Throwable and not RuntimeException. Also, we 251 // may inadvertently catch our AssertionErrors. 252 } catch (Throwable shutdownFailure) { 253 notifyFailed(shutdownFailure); 254 } finally { 255 monitor.leave(); 256 executeListeners(); 257 } 258 } 259 return shutdown; 260 } 261 262 @Deprecated 263 @Override 264 public State startAndWait() { 265 return Futures.getUnchecked(start()); 266 } 267 268 @Deprecated 269 @Override 270 public State stopAndWait() { 271 return Futures.getUnchecked(stop()); 272 } 273 274 @Override public final void awaitRunning() { 275 monitor.enterWhenUninterruptibly(hasReachedRunning); 276 try { 277 checkCurrentState(RUNNING); 278 } finally { 279 monitor.leave(); 280 } 281 } 282 283 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 284 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 285 try { 286 checkCurrentState(RUNNING); 287 } finally { 288 monitor.leave(); 289 } 290 } else { 291 // It is possible due to races the we are currently in the expected state even though we 292 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 293 // even check the guard. I don't think we care too much about this use case but it could lead 294 // to a confusing error message. 295 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. " 296 + "Current state: " + state()); 297 } 298 } 299 300 @Override public final void awaitTerminated() { 301 monitor.enterWhenUninterruptibly(isStopped); 302 try { 303 checkCurrentState(TERMINATED); 304 } finally { 305 monitor.leave(); 306 } 307 } 308 309 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 310 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 311 try { 312 State state = state(); 313 checkCurrentState(TERMINATED); 314 } finally { 315 monitor.leave(); 316 } 317 } else { 318 // It is possible due to races the we are currently in the expected state even though we 319 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 320 // even check the guard. I don't think we care too much about this use case but it could lead 321 // to a confusing error message. 322 throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. " 323 + "Current state: " + state()); 324 } 325 } 326 327 /** Checks that the current state is equal to the expected state. */ 328 @GuardedBy("monitor") 329 private void checkCurrentState(State expected) { 330 State actual = state(); 331 if (actual != expected) { 332 if (actual == FAILED) { 333 // Handle this specially so that we can include the failureCause, if there is one. 334 throw new IllegalStateException("Expected the service to be " + expected 335 + ", but the service has FAILED", failureCause()); 336 } 337 throw new IllegalStateException("Expected the service to be " + expected + ", but was " 338 + actual); 339 } 340 } 341 342 /** 343 * Implementing classes should invoke this method once their service has started. It will cause 344 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 345 * 346 * @throws IllegalStateException if the service is not {@link State#STARTING}. 347 */ 348 protected final void notifyStarted() { 349 monitor.enter(); 350 try { 351 // We have to examine the internal state of the snapshot here to properly handle the stop 352 // while starting case. 353 if (snapshot.state != STARTING) { 354 IllegalStateException failure = new IllegalStateException( 355 "Cannot notifyStarted() when the service is " + snapshot.state); 356 notifyFailed(failure); 357 throw failure; 358 } 359 360 if (snapshot.shutdownWhenStartupFinishes) { 361 snapshot = new StateSnapshot(STOPPING); 362 // We don't call listeners here because we already did that when we set the 363 // shutdownWhenStartupFinishes flag. 364 doStop(); 365 } else { 366 snapshot = new StateSnapshot(RUNNING); 367 running(); 368 } 369 } finally { 370 monitor.leave(); 371 executeListeners(); 372 } 373 } 374 375 /** 376 * Implementing classes should invoke this method once their service has stopped. It will cause 377 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 378 * 379 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor 380 * {@link State#RUNNING}. 381 */ 382 protected final void notifyStopped() { 383 monitor.enter(); 384 try { 385 // We check the internal state of the snapshot instead of state() directly so we don't allow 386 // notifyStopped() to be called while STARTING, even if stop() has already been called. 387 State previous = snapshot.state; 388 if (previous != STOPPING && previous != RUNNING) { 389 IllegalStateException failure = new IllegalStateException( 390 "Cannot notifyStopped() when the service is " + previous); 391 notifyFailed(failure); 392 throw failure; 393 } 394 snapshot = new StateSnapshot(TERMINATED); 395 terminated(previous); 396 } finally { 397 monitor.leave(); 398 executeListeners(); 399 } 400 } 401 402 /** 403 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 404 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 405 * or otherwise cannot be started nor stopped. 406 */ 407 protected final void notifyFailed(Throwable cause) { 408 checkNotNull(cause); 409 410 monitor.enter(); 411 try { 412 State previous = state(); 413 switch (previous) { 414 case NEW: 415 case TERMINATED: 416 throw new IllegalStateException("Failed while in state:" + previous, cause); 417 case RUNNING: 418 case STARTING: 419 case STOPPING: 420 snapshot = new StateSnapshot(FAILED, false, cause); 421 failed(previous, cause); 422 break; 423 case FAILED: 424 // Do nothing 425 break; 426 default: 427 throw new AssertionError("Unexpected state: " + previous); 428 } 429 } finally { 430 monitor.leave(); 431 executeListeners(); 432 } 433 } 434 435 @Override 436 public final boolean isRunning() { 437 return state() == RUNNING; 438 } 439 440 @Override 441 public final State state() { 442 return snapshot.externalState(); 443 } 444 445 /** 446 * @since 14.0 447 */ 448 @Override 449 public final Throwable failureCause() { 450 return snapshot.failureCause(); 451 } 452 453 /** 454 * @since 13.0 455 */ 456 @Override 457 public final void addListener(Listener listener, Executor executor) { 458 checkNotNull(listener, "listener"); 459 checkNotNull(executor, "executor"); 460 monitor.enter(); 461 try { 462 State currentState = state(); 463 if (currentState != TERMINATED && currentState != FAILED) { 464 listeners.add(new ListenerExecutorPair(listener, executor)); 465 } 466 } finally { 467 monitor.leave(); 468 } 469 } 470 471 @Override public String toString() { 472 return getClass().getSimpleName() + " [" + state() + "]"; 473 } 474 475 /** 476 * A change from one service state to another, plus the result of the change. 477 */ 478 private class Transition extends AbstractFuture<State> { 479 @Override 480 public State get(long timeout, TimeUnit unit) 481 throws InterruptedException, TimeoutException, ExecutionException { 482 try { 483 return super.get(timeout, unit); 484 } catch (TimeoutException e) { 485 throw new TimeoutException(AbstractService.this.toString()); 486 } 487 } 488 } 489 490 /** 491 * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the 492 * {@link #monitor}. 493 */ 494 private void executeListeners() { 495 if (!monitor.isOccupiedByCurrentThread()) { 496 queuedListeners.execute(); 497 } 498 } 499 500 @GuardedBy("monitor") 501 private void starting() { 502 for (final ListenerExecutorPair pair : listeners) { 503 queuedListeners.add(new Runnable() { 504 @Override public void run() { 505 pair.listener.starting(); 506 } 507 }, pair.executor); 508 } 509 } 510 511 @GuardedBy("monitor") 512 private void running() { 513 for (final ListenerExecutorPair pair : listeners) { 514 queuedListeners.add(new Runnable() { 515 @Override public void run() { 516 pair.listener.running(); 517 } 518 }, pair.executor); 519 } 520 } 521 522 @GuardedBy("monitor") 523 private void stopping(final State from) { 524 for (final ListenerExecutorPair pair : listeners) { 525 queuedListeners.add(new Runnable() { 526 @Override public void run() { 527 pair.listener.stopping(from); 528 } 529 }, pair.executor); 530 } 531 } 532 533 @GuardedBy("monitor") 534 private void terminated(final State from) { 535 for (final ListenerExecutorPair pair : listeners) { 536 queuedListeners.add(new Runnable() { 537 @Override public void run() { 538 pair.listener.terminated(from); 539 } 540 }, pair.executor); 541 } 542 // There are no more state transitions so we can clear this out. 543 listeners.clear(); 544 } 545 546 @GuardedBy("monitor") 547 private void failed(final State from, final Throwable cause) { 548 for (final ListenerExecutorPair pair : listeners) { 549 queuedListeners.add(new Runnable() { 550 @Override public void run() { 551 pair.listener.failed(from, cause); 552 } 553 }, pair.executor); 554 } 555 // There are no more state transitions so we can clear this out. 556 listeners.clear(); 557 } 558 559 /** A simple holder for a listener and its executor. */ 560 private static class ListenerExecutorPair { 561 final Listener listener; 562 final Executor executor; 563 564 ListenerExecutorPair(Listener listener, Executor executor) { 565 this.listener = listener; 566 this.executor = executor; 567 } 568 } 569 570 /** 571 * An immutable snapshot of the current state of the service. This class represents a consistent 572 * snapshot of the state and therefore it can be used to answer simple queries without needing to 573 * grab a lock. 574 */ 575 @Immutable 576 private static final class StateSnapshot { 577 /** 578 * The internal state, which equals external state unless 579 * shutdownWhenStartupFinishes is true. 580 */ 581 final State state; 582 583 /** 584 * If true, the user requested a shutdown while the service was still starting 585 * up. 586 */ 587 final boolean shutdownWhenStartupFinishes; 588 589 /** 590 * The exception that caused this service to fail. This will be {@code null} 591 * unless the service has failed. 592 */ 593 @Nullable 594 final Throwable failure; 595 596 StateSnapshot(State internalState) { 597 this(internalState, false, null); 598 } 599 600 StateSnapshot( 601 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 602 checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING, 603 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 604 internalState); 605 checkArgument(!(failure != null ^ internalState == FAILED), 606 "A failure cause should be set if and only if the state is failed. Got %s and %s " 607 + "instead.", internalState, failure); 608 this.state = internalState; 609 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 610 this.failure = failure; 611 } 612 613 /** @see Service#state() */ 614 State externalState() { 615 if (shutdownWhenStartupFinishes && state == STARTING) { 616 return STOPPING; 617 } else { 618 return state; 619 } 620 } 621 622 /** @see Service#failureCause() */ 623 Throwable failureCause() { 624 checkState(state == FAILED, 625 "failureCause() is only valid if the service has failed, service is %s", state); 626 return failure; 627 } 628 } 629}