/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Service.State.FAILED;
import static com.google.common.util.concurrent.Service.State.NEW;
import static com.google.common.util.concurrent.Service.State.RUNNING;
import static com.google.common.util.concurrent.Service.State.STARTING;
import static com.google.common.util.concurrent.Service.State.STOPPING;
import static com.google.common.util.concurrent.Service.State.TERMINATED;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.util.concurrent.ListenerCallQueue.Callback;
import com.google.common.util.concurrent.Monitor.Guard;
import com.google.common.util.concurrent.Service.State; // javadoc needs this
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.j2objc.annotations.WeakOuter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
040import javax.annotation.Nullable; 041import javax.annotation.concurrent.GuardedBy; 042import javax.annotation.concurrent.Immutable; 043 044/** 045 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 046 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 047 * callbacks. Its subclasses must manage threads manually; consider {@link 048 * AbstractExecutionThreadService} if you need only a single execution thread. 049 * 050 * @author Jesse Wilson 051 * @author Luke Sandberg 052 * @since 1.0 053 */ 054@Beta 055@GwtIncompatible 056public abstract class AbstractService implements Service { 057 private static final Callback<Listener> STARTING_CALLBACK = 058 new Callback<Listener>("starting()") { 059 @Override 060 void call(Listener listener) { 061 listener.starting(); 062 } 063 }; 064 private static final Callback<Listener> RUNNING_CALLBACK = 065 new Callback<Listener>("running()") { 066 @Override 067 void call(Listener listener) { 068 listener.running(); 069 } 070 }; 071 private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK = 072 stoppingCallback(STARTING); 073 private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK = 074 stoppingCallback(RUNNING); 075 076 private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK = terminatedCallback(NEW); 077 private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK = 078 terminatedCallback(RUNNING); 079 private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK = 080 terminatedCallback(STOPPING); 081 082 private static Callback<Listener> terminatedCallback(final State from) { 083 return new Callback<Listener>("terminated({from = " + from + "})") { 084 @Override 085 void call(Listener listener) { 086 listener.terminated(from); 087 } 088 }; 089 } 090 091 private static Callback<Listener> stoppingCallback(final State from) { 092 return new Callback<Listener>("stopping({from = " + from + 
"})") { 093 @Override 094 void call(Listener listener) { 095 listener.stopping(from); 096 } 097 }; 098 } 099 100 private final Monitor monitor = new Monitor(); 101 102 private final Guard isStartable = new IsStartableGuard(); 103 104 @WeakOuter 105 private final class IsStartableGuard extends Guard { 106 IsStartableGuard() { 107 super(AbstractService.this.monitor); 108 } 109 110 @Override 111 public boolean isSatisfied() { 112 return state() == NEW; 113 } 114 } 115 116 private final Guard isStoppable = new IsStoppableGuard(); 117 118 @WeakOuter 119 private final class IsStoppableGuard extends Guard { 120 IsStoppableGuard() { 121 super(AbstractService.this.monitor); 122 } 123 124 @Override 125 public boolean isSatisfied() { 126 return state().compareTo(RUNNING) <= 0; 127 } 128 } 129 130 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 131 132 @WeakOuter 133 private final class HasReachedRunningGuard extends Guard { 134 HasReachedRunningGuard() { 135 super(AbstractService.this.monitor); 136 } 137 138 @Override 139 public boolean isSatisfied() { 140 return state().compareTo(RUNNING) >= 0; 141 } 142 } 143 144 private final Guard isStopped = new IsStoppedGuard(); 145 146 @WeakOuter 147 private final class IsStoppedGuard extends Guard { 148 IsStoppedGuard() { 149 super(AbstractService.this.monitor); 150 } 151 152 @Override 153 public boolean isSatisfied() { 154 return state().isTerminal(); 155 } 156 } 157 158 /** 159 * The listeners to notify during a state transition. 160 */ 161 @GuardedBy("monitor") 162 private final List<ListenerCallQueue<Listener>> listeners = 163 Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>()); 164 165 /** 166 * The current state of the service. This should be written with the lock held but can be read 167 * without it because it is an immutable object in a volatile field. 
This is desirable so that 168 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 169 * without grabbing the lock. 170 * 171 * <p>To update this field correctly the lock must be held to guarantee that the state is 172 * consistent. 173 */ 174 @GuardedBy("monitor") 175 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 176 177 /** Constructor for use by subclasses. */ 178 protected AbstractService() {} 179 180 /** 181 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 182 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 183 * or after it has returned. If startup fails, the invocation should cause a call to {@link 184 * #notifyFailed(Throwable)} instead. 185 * 186 * <p>This method should return promptly; prefer to do work on a different thread where it is 187 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 188 * called multiple times. 189 */ 190 protected abstract void doStart(); 191 192 /** 193 * This method should be used to initiate service shutdown. The invocation of this method should 194 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 195 * returned. If shutdown fails, the invocation should cause a call to {@link 196 * #notifyFailed(Throwable)} instead. 197 * 198 * <p>This method should return promptly; prefer to do work on a different thread where it is 199 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 200 * called multiple times. 
201 */ 202 protected abstract void doStop(); 203 204 @CanIgnoreReturnValue 205 @Override 206 public final Service startAsync() { 207 if (monitor.enterIf(isStartable)) { 208 try { 209 snapshot = new StateSnapshot(STARTING); 210 starting(); 211 doStart(); 212 } catch (Throwable startupFailure) { 213 notifyFailed(startupFailure); 214 } finally { 215 monitor.leave(); 216 executeListeners(); 217 } 218 } else { 219 throw new IllegalStateException("Service " + this + " has already been started"); 220 } 221 return this; 222 } 223 224 @CanIgnoreReturnValue 225 @Override 226 public final Service stopAsync() { 227 if (monitor.enterIf(isStoppable)) { 228 try { 229 State previous = state(); 230 switch (previous) { 231 case NEW: 232 snapshot = new StateSnapshot(TERMINATED); 233 terminated(NEW); 234 break; 235 case STARTING: 236 snapshot = new StateSnapshot(STARTING, true, null); 237 stopping(STARTING); 238 break; 239 case RUNNING: 240 snapshot = new StateSnapshot(STOPPING); 241 stopping(RUNNING); 242 doStop(); 243 break; 244 case STOPPING: 245 case TERMINATED: 246 case FAILED: 247 // These cases are impossible due to the if statement above. 
248 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 249 default: 250 throw new AssertionError("Unexpected state: " + previous); 251 } 252 } catch (Throwable shutdownFailure) { 253 notifyFailed(shutdownFailure); 254 } finally { 255 monitor.leave(); 256 executeListeners(); 257 } 258 } 259 return this; 260 } 261 262 @Override 263 public final void awaitRunning() { 264 monitor.enterWhenUninterruptibly(hasReachedRunning); 265 try { 266 checkCurrentState(RUNNING); 267 } finally { 268 monitor.leave(); 269 } 270 } 271 272 @Override 273 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 274 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 275 try { 276 checkCurrentState(RUNNING); 277 } finally { 278 monitor.leave(); 279 } 280 } else { 281 // It is possible due to races the we are currently in the expected state even though we 282 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 283 // even check the guard. I don't think we care too much about this use case but it could lead 284 // to a confusing error message. 285 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 286 } 287 } 288 289 @Override 290 public final void awaitTerminated() { 291 monitor.enterWhenUninterruptibly(isStopped); 292 try { 293 checkCurrentState(TERMINATED); 294 } finally { 295 monitor.leave(); 296 } 297 } 298 299 @Override 300 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 301 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 302 try { 303 checkCurrentState(TERMINATED); 304 } finally { 305 monitor.leave(); 306 } 307 } else { 308 // It is possible due to races the we are currently in the expected state even though we 309 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 310 // even check the guard. 
I don't think we care too much about this use case but it could lead 311 // to a confusing error message. 312 throw new TimeoutException( 313 "Timed out waiting for " 314 + this 315 + " to reach a terminal state. " 316 + "Current state: " 317 + state()); 318 } 319 } 320 321 /** Checks that the current state is equal to the expected state. */ 322 @GuardedBy("monitor") 323 private void checkCurrentState(State expected) { 324 State actual = state(); 325 if (actual != expected) { 326 if (actual == FAILED) { 327 // Handle this specially so that we can include the failureCause, if there is one. 328 throw new IllegalStateException( 329 "Expected the service " + this + " to be " + expected + ", but the service has FAILED", 330 failureCause()); 331 } 332 throw new IllegalStateException( 333 "Expected the service " + this + " to be " + expected + ", but was " + actual); 334 } 335 } 336 337 /** 338 * Implementing classes should invoke this method once their service has started. It will cause 339 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 340 * 341 * @throws IllegalStateException if the service is not {@link State#STARTING}. 342 */ 343 protected final void notifyStarted() { 344 monitor.enter(); 345 try { 346 // We have to examine the internal state of the snapshot here to properly handle the stop 347 // while starting case. 348 if (snapshot.state != STARTING) { 349 IllegalStateException failure = 350 new IllegalStateException( 351 "Cannot notifyStarted() when the service is " + snapshot.state); 352 notifyFailed(failure); 353 throw failure; 354 } 355 356 if (snapshot.shutdownWhenStartupFinishes) { 357 snapshot = new StateSnapshot(STOPPING); 358 // We don't call listeners here because we already did that when we set the 359 // shutdownWhenStartupFinishes flag. 
360 doStop(); 361 } else { 362 snapshot = new StateSnapshot(RUNNING); 363 running(); 364 } 365 } finally { 366 monitor.leave(); 367 executeListeners(); 368 } 369 } 370 371 /** 372 * Implementing classes should invoke this method once their service has stopped. It will cause 373 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 374 * 375 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link 376 * State#RUNNING}. 377 */ 378 protected final void notifyStopped() { 379 monitor.enter(); 380 try { 381 // We check the internal state of the snapshot instead of state() directly so we don't allow 382 // notifyStopped() to be called while STARTING, even if stop() has already been called. 383 State previous = snapshot.state; 384 if (previous != STOPPING && previous != RUNNING) { 385 IllegalStateException failure = 386 new IllegalStateException("Cannot notifyStopped() when the service is " + previous); 387 notifyFailed(failure); 388 throw failure; 389 } 390 snapshot = new StateSnapshot(TERMINATED); 391 terminated(previous); 392 } finally { 393 monitor.leave(); 394 executeListeners(); 395 } 396 } 397 398 /** 399 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 400 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 401 * or otherwise cannot be started nor stopped. 
402 */ 403 protected final void notifyFailed(Throwable cause) { 404 checkNotNull(cause); 405 406 monitor.enter(); 407 try { 408 State previous = state(); 409 switch (previous) { 410 case NEW: 411 case TERMINATED: 412 throw new IllegalStateException("Failed while in state:" + previous, cause); 413 case RUNNING: 414 case STARTING: 415 case STOPPING: 416 snapshot = new StateSnapshot(FAILED, false, cause); 417 failed(previous, cause); 418 break; 419 case FAILED: 420 // Do nothing 421 break; 422 default: 423 throw new AssertionError("Unexpected state: " + previous); 424 } 425 } finally { 426 monitor.leave(); 427 executeListeners(); 428 } 429 } 430 431 @Override 432 public final boolean isRunning() { 433 return state() == RUNNING; 434 } 435 436 @Override 437 public final State state() { 438 return snapshot.externalState(); 439 } 440 441 /** 442 * @since 14.0 443 */ 444 @Override 445 public final Throwable failureCause() { 446 return snapshot.failureCause(); 447 } 448 449 /** 450 * @since 13.0 451 */ 452 @Override 453 public final void addListener(Listener listener, Executor executor) { 454 checkNotNull(listener, "listener"); 455 checkNotNull(executor, "executor"); 456 monitor.enter(); 457 try { 458 if (!state().isTerminal()) { 459 listeners.add(new ListenerCallQueue<Listener>(listener, executor)); 460 } 461 } finally { 462 monitor.leave(); 463 } 464 } 465 466 @Override 467 public String toString() { 468 return getClass().getSimpleName() + " [" + state() + "]"; 469 } 470 471 /** 472 * Attempts to execute all the listeners in {@link #listeners} while not holding the 473 * {@link #monitor}. 
474 */ 475 private void executeListeners() { 476 if (!monitor.isOccupiedByCurrentThread()) { 477 // iterate by index to avoid concurrent modification exceptions 478 for (int i = 0; i < listeners.size(); i++) { 479 listeners.get(i).execute(); 480 } 481 } 482 } 483 484 @GuardedBy("monitor") 485 private void starting() { 486 STARTING_CALLBACK.enqueueOn(listeners); 487 } 488 489 @GuardedBy("monitor") 490 private void running() { 491 RUNNING_CALLBACK.enqueueOn(listeners); 492 } 493 494 @GuardedBy("monitor") 495 private void stopping(final State from) { 496 if (from == State.STARTING) { 497 STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners); 498 } else if (from == State.RUNNING) { 499 STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 500 } else { 501 throw new AssertionError(); 502 } 503 } 504 505 @GuardedBy("monitor") 506 private void terminated(final State from) { 507 switch (from) { 508 case NEW: 509 TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners); 510 break; 511 case RUNNING: 512 TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 513 break; 514 case STOPPING: 515 TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners); 516 break; 517 case STARTING: 518 case TERMINATED: 519 case FAILED: 520 default: 521 throw new AssertionError(); 522 } 523 } 524 525 @GuardedBy("monitor") 526 private void failed(final State from, final Throwable cause) { 527 // can't memoize this one due to the exception 528 new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") { 529 @Override 530 void call(Listener listener) { 531 listener.failed(from, cause); 532 } 533 }.enqueueOn(listeners); 534 } 535 536 /** 537 * An immutable snapshot of the current state of the service. This class represents a consistent 538 * snapshot of the state and therefore it can be used to answer simple queries without needing to 539 * grab a lock. 
540 */ 541 @Immutable 542 private static final class StateSnapshot { 543 /** 544 * The internal state, which equals external state unless shutdownWhenStartupFinishes is true. 545 */ 546 final State state; 547 548 /** 549 * If true, the user requested a shutdown while the service was still starting up. 550 */ 551 final boolean shutdownWhenStartupFinishes; 552 553 /** 554 * The exception that caused this service to fail. This will be {@code null} unless the service 555 * has failed. 556 */ 557 @Nullable final Throwable failure; 558 559 StateSnapshot(State internalState) { 560 this(internalState, false, null); 561 } 562 563 StateSnapshot( 564 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 565 checkArgument( 566 !shutdownWhenStartupFinishes || internalState == STARTING, 567 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 568 internalState); 569 checkArgument( 570 !(failure != null ^ internalState == FAILED), 571 "A failure cause should be set if and only if the state is failed. Got %s and %s " 572 + "instead.", 573 internalState, 574 failure); 575 this.state = internalState; 576 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 577 this.failure = failure; 578 } 579 580 /** @see Service#state() */ 581 State externalState() { 582 if (shutdownWhenStartupFinishes && state == STARTING) { 583 return STOPPING; 584 } else { 585 return state; 586 } 587 } 588 589 /** @see Service#failureCause() */ 590 Throwable failureCause() { 591 checkState( 592 state == FAILED, 593 "failureCause() is only valid if the service has failed, service is %s", 594 state); 595 return failure; 596 } 597 } 598}