001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.Service.State.FAILED; 023import static com.google.common.util.concurrent.Service.State.NEW; 024import static com.google.common.util.concurrent.Service.State.RUNNING; 025import static com.google.common.util.concurrent.Service.State.STARTING; 026import static com.google.common.util.concurrent.Service.State.STOPPING; 027import static com.google.common.util.concurrent.Service.State.TERMINATED; 028 029import com.google.common.annotations.Beta; 030import com.google.common.util.concurrent.ListenerCallQueue.Callback; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; // javadoc needs this 033 034import java.util.ArrayList; 035import java.util.Collections; 036import java.util.List; 037import java.util.concurrent.Executor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040 041import javax.annotation.Nullable; 042import javax.annotation.concurrent.GuardedBy; 043import javax.annotation.concurrent.Immutable; 044 045/** 046 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 047 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 048 * callbacks. Its subclasses must manage threads manually; consider 049 * {@link AbstractExecutionThreadService} if you need only a single execution thread. 050 * 051 * @author Jesse Wilson 052 * @author Luke Sandberg 053 * @since 1.0 054 */ 055@Beta 056public abstract class AbstractService implements Service { 057 private static final Callback<Listener> STARTING_CALLBACK = 058 new Callback<Listener>("starting()") { 059 @Override void call(Listener listener) { 060 listener.starting(); 061 } 062 }; 063 private static final Callback<Listener> RUNNING_CALLBACK = 064 new Callback<Listener>("running()") { 065 @Override void call(Listener listener) { 066 listener.running(); 067 } 068 }; 069 private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK = 070 stoppingCallback(STARTING); 071 private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK = 072 stoppingCallback(RUNNING); 073 074 private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK = 075 terminatedCallback(NEW); 076 private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK = 077 terminatedCallback(RUNNING); 078 private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK = 079 terminatedCallback(STOPPING); 080 081 private static Callback<Listener> terminatedCallback(final State from) { 082 return new Callback<Listener>("terminated({from = " + from + "})") { 083 @Override void call(Listener listener) { 084 listener.terminated(from); 085 } 086 }; 087 } 088 089 private static Callback<Listener> stoppingCallback(final State from) { 090 return new Callback<Listener>("stopping({from = " + from + "})") { 091 @Override void call(Listener listener) { 092 listener.stopping(from); 093 } 094 }; 095 } 096 097 private final Monitor monitor = new Monitor(); 098 099 private final Guard isStartable = new Guard(monitor) { 100 @Override public boolean isSatisfied() { 101 return state() == NEW; 102 } 103 }; 104 105 private final Guard isStoppable = new Guard(monitor) { 106 @Override public boolean isSatisfied() { 107 return state().compareTo(RUNNING) <= 0; 108 } 109 }; 110 111 private final Guard hasReachedRunning = new Guard(monitor) { 112 @Override public boolean isSatisfied() { 113 return state().compareTo(RUNNING) >= 0; 114 } 115 }; 116 117 private final Guard isStopped = new Guard(monitor) { 118 @Override public boolean isSatisfied() { 119 return state().isTerminal(); 120 } 121 }; 122 123 /** 124 * The listeners to notify during a state transition. 125 */ 126 @GuardedBy("monitor") 127 private final List<ListenerCallQueue<Listener>> listeners = 128 Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>()); 129 130 /** 131 * The current state of the service. This should be written with the lock held but can be read 132 * without it because it is an immutable object in a volatile field. This is desirable so that 133 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 134 * without grabbing the lock. 135 * 136 * <p>To update this field correctly the lock must be held to guarantee that the state is 137 * consistent. 138 */ 139 @GuardedBy("monitor") 140 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 141 142 /** Constructor for use by subclasses. */ 143 protected AbstractService() {} 144 145 /** 146 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 147 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 148 * or after it has returned. If startup fails, the invocation should cause a call to 149 * {@link #notifyFailed(Throwable)} instead. 150 * 151 * <p>This method should return promptly; prefer to do work on a different thread where it is 152 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 153 * called multiple times. 154 */ 155 protected abstract void doStart(); 156 157 /** 158 * This method should be used to initiate service shutdown. The invocation of this method should 159 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 160 * returned. If shutdown fails, the invocation should cause a call to 161 * {@link #notifyFailed(Throwable)} instead. 162 * 163 * <p> This method should return promptly; prefer to do work on a different thread where it is 164 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 165 * called multiple times. 166 */ 167 protected abstract void doStop(); 168 169 @Override public final Service startAsync() { 170 if (monitor.enterIf(isStartable)) { 171 try { 172 snapshot = new StateSnapshot(STARTING); 173 starting(); 174 doStart(); 175 // TODO(user): justify why we are catching Throwable and not RuntimeException 176 } catch (Throwable startupFailure) { 177 notifyFailed(startupFailure); 178 } finally { 179 monitor.leave(); 180 executeListeners(); 181 } 182 } else { 183 throw new IllegalStateException("Service " + this + " has already been started"); 184 } 185 return this; 186 } 187 188 @Override public final Service stopAsync() { 189 if (monitor.enterIf(isStoppable)) { 190 try { 191 State previous = state(); 192 switch (previous) { 193 case NEW: 194 snapshot = new StateSnapshot(TERMINATED); 195 terminated(NEW); 196 break; 197 case STARTING: 198 snapshot = new StateSnapshot(STARTING, true, null); 199 stopping(STARTING); 200 break; 201 case RUNNING: 202 snapshot = new StateSnapshot(STOPPING); 203 stopping(RUNNING); 204 doStop(); 205 break; 206 case STOPPING: 207 case TERMINATED: 208 case FAILED: 209 // These cases are impossible due to the if statement above. 210 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 211 default: 212 throw new AssertionError("Unexpected state: " + previous); 213 } 214 // TODO(user): justify why we are catching Throwable and not RuntimeException. Also, we 215 // may inadvertently catch our AssertionErrors. 216 } catch (Throwable shutdownFailure) { 217 notifyFailed(shutdownFailure); 218 } finally { 219 monitor.leave(); 220 executeListeners(); 221 } 222 } 223 return this; 224 } 225 226 @Override public final void awaitRunning() { 227 monitor.enterWhenUninterruptibly(hasReachedRunning); 228 try { 229 checkCurrentState(RUNNING); 230 } finally { 231 monitor.leave(); 232 } 233 } 234 235 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 236 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 237 try { 238 checkCurrentState(RUNNING); 239 } finally { 240 monitor.leave(); 241 } 242 } else { 243 // It is possible due to races the we are currently in the expected state even though we 244 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 245 // even check the guard. I don't think we care too much about this use case but it could lead 246 // to a confusing error message. 247 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. " 248 + "Current state: " + state()); 249 } 250 } 251 252 @Override public final void awaitTerminated() { 253 monitor.enterWhenUninterruptibly(isStopped); 254 try { 255 checkCurrentState(TERMINATED); 256 } finally { 257 monitor.leave(); 258 } 259 } 260 261 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 262 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 263 try { 264 checkCurrentState(TERMINATED); 265 } finally { 266 monitor.leave(); 267 } 268 } else { 269 // It is possible due to races the we are currently in the expected state even though we 270 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 271 // even check the guard. I don't think we care too much about this use case but it could lead 272 // to a confusing error message. 273 throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. " 274 + "Current state: " + state()); 275 } 276 } 277 278 /** Checks that the current state is equal to the expected state. */ 279 @GuardedBy("monitor") 280 private void checkCurrentState(State expected) { 281 State actual = state(); 282 if (actual != expected) { 283 if (actual == FAILED) { 284 // Handle this specially so that we can include the failureCause, if there is one. 285 throw new IllegalStateException("Expected the service to be " + expected 286 + ", but the service has FAILED", failureCause()); 287 } 288 throw new IllegalStateException("Expected the service to be " + expected + ", but was " 289 + actual); 290 } 291 } 292 293 /** 294 * Implementing classes should invoke this method once their service has started. It will cause 295 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 296 * 297 * @throws IllegalStateException if the service is not {@link State#STARTING}. 298 */ 299 protected final void notifyStarted() { 300 monitor.enter(); 301 try { 302 // We have to examine the internal state of the snapshot here to properly handle the stop 303 // while starting case. 304 if (snapshot.state != STARTING) { 305 IllegalStateException failure = new IllegalStateException( 306 "Cannot notifyStarted() when the service is " + snapshot.state); 307 notifyFailed(failure); 308 throw failure; 309 } 310 311 if (snapshot.shutdownWhenStartupFinishes) { 312 snapshot = new StateSnapshot(STOPPING); 313 // We don't call listeners here because we already did that when we set the 314 // shutdownWhenStartupFinishes flag. 315 doStop(); 316 } else { 317 snapshot = new StateSnapshot(RUNNING); 318 running(); 319 } 320 } finally { 321 monitor.leave(); 322 executeListeners(); 323 } 324 } 325 326 /** 327 * Implementing classes should invoke this method once their service has stopped. It will cause 328 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 329 * 330 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor 331 * {@link State#RUNNING}. 332 */ 333 protected final void notifyStopped() { 334 monitor.enter(); 335 try { 336 // We check the internal state of the snapshot instead of state() directly so we don't allow 337 // notifyStopped() to be called while STARTING, even if stop() has already been called. 338 State previous = snapshot.state; 339 if (previous != STOPPING && previous != RUNNING) { 340 IllegalStateException failure = new IllegalStateException( 341 "Cannot notifyStopped() when the service is " + previous); 342 notifyFailed(failure); 343 throw failure; 344 } 345 snapshot = new StateSnapshot(TERMINATED); 346 terminated(previous); 347 } finally { 348 monitor.leave(); 349 executeListeners(); 350 } 351 } 352 353 /** 354 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 355 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 356 * or otherwise cannot be started nor stopped. 357 */ 358 protected final void notifyFailed(Throwable cause) { 359 checkNotNull(cause); 360 361 monitor.enter(); 362 try { 363 State previous = state(); 364 switch (previous) { 365 case NEW: 366 case TERMINATED: 367 throw new IllegalStateException("Failed while in state:" + previous, cause); 368 case RUNNING: 369 case STARTING: 370 case STOPPING: 371 snapshot = new StateSnapshot(FAILED, false, cause); 372 failed(previous, cause); 373 break; 374 case FAILED: 375 // Do nothing 376 break; 377 default: 378 throw new AssertionError("Unexpected state: " + previous); 379 } 380 } finally { 381 monitor.leave(); 382 executeListeners(); 383 } 384 } 385 386 @Override 387 public final boolean isRunning() { 388 return state() == RUNNING; 389 } 390 391 @Override 392 public final State state() { 393 return snapshot.externalState(); 394 } 395 396 /** 397 * @since 14.0 398 */ 399 @Override 400 public final Throwable failureCause() { 401 return snapshot.failureCause(); 402 } 403 404 /** 405 * @since 13.0 406 */ 407 @Override 408 public final void addListener(Listener listener, Executor executor) { 409 checkNotNull(listener, "listener"); 410 checkNotNull(executor, "executor"); 411 monitor.enter(); 412 try { 413 if (!state().isTerminal()) { 414 listeners.add(new ListenerCallQueue<Listener>(listener, executor)); 415 } 416 } finally { 417 monitor.leave(); 418 } 419 } 420 421 @Override public String toString() { 422 return getClass().getSimpleName() + " [" + state() + "]"; 423 } 424 425 /** 426 * Attempts to execute all the listeners in {@link #listeners} while not holding the 427 * {@link #monitor}. 428 */ 429 private void executeListeners() { 430 if (!monitor.isOccupiedByCurrentThread()) { 431 // iterate by index to avoid concurrent modification exceptions 432 for (int i = 0; i < listeners.size(); i++) { 433 listeners.get(i).execute(); 434 } 435 } 436 } 437 438 @GuardedBy("monitor") 439 private void starting() { 440 STARTING_CALLBACK.enqueueOn(listeners); 441 } 442 443 @GuardedBy("monitor") 444 private void running() { 445 RUNNING_CALLBACK.enqueueOn(listeners); 446 } 447 448 @GuardedBy("monitor") 449 private void stopping(final State from) { 450 if (from == State.STARTING) { 451 STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners); 452 } else if (from == State.RUNNING) { 453 STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 454 } else { 455 throw new AssertionError(); 456 } 457 } 458 459 @GuardedBy("monitor") 460 private void terminated(final State from) { 461 switch(from) { 462 case NEW: 463 TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners); 464 break; 465 case RUNNING: 466 TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 467 break; 468 case STOPPING: 469 TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners); 470 break; 471 case STARTING: 472 case TERMINATED: 473 case FAILED: 474 default: 475 throw new AssertionError(); 476 } 477 } 478 479 @GuardedBy("monitor") 480 private void failed(final State from, final Throwable cause) { 481 // can't memoize this one due to the exception 482 new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") { 483 @Override void call(Listener listener) { 484 listener.failed(from, cause); 485 } 486 }.enqueueOn(listeners); 487 } 488 489 /** 490 * An immutable snapshot of the current state of the service. This class represents a consistent 491 * snapshot of the state and therefore it can be used to answer simple queries without needing to 492 * grab a lock. 493 */ 494 @Immutable 495 private static final class StateSnapshot { 496 /** 497 * The internal state, which equals external state unless 498 * shutdownWhenStartupFinishes is true. 499 */ 500 final State state; 501 502 /** 503 * If true, the user requested a shutdown while the service was still starting 504 * up. 505 */ 506 final boolean shutdownWhenStartupFinishes; 507 508 /** 509 * The exception that caused this service to fail. This will be {@code null} 510 * unless the service has failed. 511 */ 512 @Nullable 513 final Throwable failure; 514 515 StateSnapshot(State internalState) { 516 this(internalState, false, null); 517 } 518 519 StateSnapshot( 520 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 521 checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING, 522 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 523 internalState); 524 checkArgument(!(failure != null ^ internalState == FAILED), 525 "A failure cause should be set if and only if the state is failed. Got %s and %s " 526 + "instead.", internalState, failure); 527 this.state = internalState; 528 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 529 this.failure = failure; 530 } 531 532 /** @see Service#state() */ 533 State externalState() { 534 if (shutdownWhenStartupFinishes && state == STARTING) { 535 return STOPPING; 536 } else { 537 return state; 538 } 539 } 540 541 /** @see Service#failureCause() */ 542 Throwable failureCause() { 543 checkState(state == FAILED, 544 "failureCause() is only valid if the service has failed, service is %s", state); 545 return failure; 546 } 547 } 548}