001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.Service.State.FAILED; 023import static com.google.common.util.concurrent.Service.State.NEW; 024import static com.google.common.util.concurrent.Service.State.RUNNING; 025import static com.google.common.util.concurrent.Service.State.STARTING; 026import static com.google.common.util.concurrent.Service.State.STOPPING; 027import static com.google.common.util.concurrent.Service.State.TERMINATED; 028 029import com.google.common.annotations.Beta; 030import com.google.common.util.concurrent.ListenerCallQueue.Callback; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; // javadoc needs this 033 034import java.util.ArrayList; 035import java.util.Collections; 036import java.util.List; 037import java.util.concurrent.Executor; 038import java.util.concurrent.TimeUnit; 039import java.util.concurrent.TimeoutException; 040 041import javax.annotation.Nullable; 042import javax.annotation.concurrent.GuardedBy; 043import javax.annotation.concurrent.Immutable; 044 045/** 046 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 047 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 048 * callbacks. Its subclasses must manage threads manually; consider 049 * {@link AbstractExecutionThreadService} if you need only a single execution thread. 050 * 051 * @author Jesse Wilson 052 * @author Luke Sandberg 053 * @since 1.0 054 */ 055@Beta 056public abstract class AbstractService implements Service { 057 private static final Callback<Listener> STARTING_CALLBACK = 058 new Callback<Listener>("starting()") { 059 @Override void call(Listener listener) { 060 listener.starting(); 061 } 062 }; 063 private static final Callback<Listener> RUNNING_CALLBACK = 064 new Callback<Listener>("running()") { 065 @Override void call(Listener listener) { 066 listener.running(); 067 } 068 }; 069 private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK = 070 stoppingCallback(STARTING); 071 private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK = 072 stoppingCallback(RUNNING); 073 074 private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK = 075 terminatedCallback(NEW); 076 private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK = 077 terminatedCallback(RUNNING); 078 private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK = 079 terminatedCallback(STOPPING); 080 081 private static Callback<Listener> terminatedCallback(final State from) { 082 return new Callback<Listener>("terminated({from = " + from + "})") { 083 @Override void call(Listener listener) { 084 listener.terminated(from); 085 } 086 }; 087 } 088 089 private static Callback<Listener> stoppingCallback(final State from) { 090 return new Callback<Listener>("stopping({from = " + from + "})") { 091 @Override void call(Listener listener) { 092 listener.stopping(from); 093 } 094 }; 095 } 096 097 private final Monitor monitor = new Monitor(); 098 099 private final Guard isStartable = new Guard(monitor) { 100 @Override public boolean isSatisfied() { 101 return state() == NEW; 102 } 103 }; 104 105 private final Guard isStoppable = new Guard(monitor) { 106 @Override public boolean isSatisfied() { 107 return state().compareTo(RUNNING) <= 0; 108 } 109 }; 110 111 private final Guard hasReachedRunning = new Guard(monitor) { 112 @Override public boolean isSatisfied() { 113 return state().compareTo(RUNNING) >= 0; 114 } 115 }; 116 117 private final Guard isStopped = new Guard(monitor) { 118 @Override public boolean isSatisfied() { 119 return state().isTerminal(); 120 } 121 }; 122 123 /** 124 * The listeners to notify during a state transition. 125 */ 126 @GuardedBy("monitor") 127 private final List<ListenerCallQueue<Listener>> listeners = 128 Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>()); 129 130 /** 131 * The current state of the service. This should be written with the lock held but can be read 132 * without it because it is an immutable object in a volatile field. This is desirable so that 133 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 134 * without grabbing the lock. 135 * 136 * <p>To update this field correctly the lock must be held to guarantee that the state is 137 * consistent. 138 */ 139 @GuardedBy("monitor") 140 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 141 142 /** Constructor for use by subclasses. */ 143 protected AbstractService() {} 144 145 /** 146 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 147 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 148 * or after it has returned. If startup fails, the invocation should cause a call to 149 * {@link #notifyFailed(Throwable)} instead. 150 * 151 * <p>This method should return promptly; prefer to do work on a different thread where it is 152 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 153 * called multiple times. 154 */ 155 protected abstract void doStart(); 156 157 /** 158 * This method should be used to initiate service shutdown. The invocation of this method should 159 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 160 * returned. If shutdown fails, the invocation should cause a call to 161 * {@link #notifyFailed(Throwable)} instead. 162 * 163 * <p> This method should return promptly; prefer to do work on a different thread where it is 164 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 165 * called multiple times. 166 */ 167 protected abstract void doStop(); 168 169 @Override public final Service startAsync() { 170 if (monitor.enterIf(isStartable)) { 171 try { 172 snapshot = new StateSnapshot(STARTING); 173 starting(); 174 doStart(); 175 } catch (Throwable startupFailure) { 176 notifyFailed(startupFailure); 177 } finally { 178 monitor.leave(); 179 executeListeners(); 180 } 181 } else { 182 throw new IllegalStateException("Service " + this + " has already been started"); 183 } 184 return this; 185 } 186 187 @Override public final Service stopAsync() { 188 if (monitor.enterIf(isStoppable)) { 189 try { 190 State previous = state(); 191 switch (previous) { 192 case NEW: 193 snapshot = new StateSnapshot(TERMINATED); 194 terminated(NEW); 195 break; 196 case STARTING: 197 snapshot = new StateSnapshot(STARTING, true, null); 198 stopping(STARTING); 199 break; 200 case RUNNING: 201 snapshot = new StateSnapshot(STOPPING); 202 stopping(RUNNING); 203 doStop(); 204 break; 205 case STOPPING: 206 case TERMINATED: 207 case FAILED: 208 // These cases are impossible due to the if statement above. 209 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 210 default: 211 throw new AssertionError("Unexpected state: " + previous); 212 } 213 } catch (Throwable shutdownFailure) { 214 notifyFailed(shutdownFailure); 215 } finally { 216 monitor.leave(); 217 executeListeners(); 218 } 219 } 220 return this; 221 } 222 223 @Override public final void awaitRunning() { 224 monitor.enterWhenUninterruptibly(hasReachedRunning); 225 try { 226 checkCurrentState(RUNNING); 227 } finally { 228 monitor.leave(); 229 } 230 } 231 232 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 233 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 234 try { 235 checkCurrentState(RUNNING); 236 } finally { 237 monitor.leave(); 238 } 239 } else { 240 // It is possible due to races the we are currently in the expected state even though we 241 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 242 // even check the guard. I don't think we care too much about this use case but it could lead 243 // to a confusing error message. 244 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. " 245 + "Current state: " + state()); 246 } 247 } 248 249 @Override public final void awaitTerminated() { 250 monitor.enterWhenUninterruptibly(isStopped); 251 try { 252 checkCurrentState(TERMINATED); 253 } finally { 254 monitor.leave(); 255 } 256 } 257 258 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 259 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 260 try { 261 checkCurrentState(TERMINATED); 262 } finally { 263 monitor.leave(); 264 } 265 } else { 266 // It is possible due to races the we are currently in the expected state even though we 267 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 268 // even check the guard. I don't think we care too much about this use case but it could lead 269 // to a confusing error message. 270 throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. " 271 + "Current state: " + state()); 272 } 273 } 274 275 /** Checks that the current state is equal to the expected state. */ 276 @GuardedBy("monitor") 277 private void checkCurrentState(State expected) { 278 State actual = state(); 279 if (actual != expected) { 280 if (actual == FAILED) { 281 // Handle this specially so that we can include the failureCause, if there is one. 282 throw new IllegalStateException("Expected the service to be " + expected 283 + ", but the service has FAILED", failureCause()); 284 } 285 throw new IllegalStateException("Expected the service to be " + expected + ", but was " 286 + actual); 287 } 288 } 289 290 /** 291 * Implementing classes should invoke this method once their service has started. It will cause 292 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 293 * 294 * @throws IllegalStateException if the service is not {@link State#STARTING}. 295 */ 296 protected final void notifyStarted() { 297 monitor.enter(); 298 try { 299 // We have to examine the internal state of the snapshot here to properly handle the stop 300 // while starting case. 301 if (snapshot.state != STARTING) { 302 IllegalStateException failure = new IllegalStateException( 303 "Cannot notifyStarted() when the service is " + snapshot.state); 304 notifyFailed(failure); 305 throw failure; 306 } 307 308 if (snapshot.shutdownWhenStartupFinishes) { 309 snapshot = new StateSnapshot(STOPPING); 310 // We don't call listeners here because we already did that when we set the 311 // shutdownWhenStartupFinishes flag. 312 doStop(); 313 } else { 314 snapshot = new StateSnapshot(RUNNING); 315 running(); 316 } 317 } finally { 318 monitor.leave(); 319 executeListeners(); 320 } 321 } 322 323 /** 324 * Implementing classes should invoke this method once their service has stopped. It will cause 325 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 326 * 327 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor 328 * {@link State#RUNNING}. 329 */ 330 protected final void notifyStopped() { 331 monitor.enter(); 332 try { 333 // We check the internal state of the snapshot instead of state() directly so we don't allow 334 // notifyStopped() to be called while STARTING, even if stop() has already been called. 335 State previous = snapshot.state; 336 if (previous != STOPPING && previous != RUNNING) { 337 IllegalStateException failure = new IllegalStateException( 338 "Cannot notifyStopped() when the service is " + previous); 339 notifyFailed(failure); 340 throw failure; 341 } 342 snapshot = new StateSnapshot(TERMINATED); 343 terminated(previous); 344 } finally { 345 monitor.leave(); 346 executeListeners(); 347 } 348 } 349 350 /** 351 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 352 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 353 * or otherwise cannot be started nor stopped. 354 */ 355 protected final void notifyFailed(Throwable cause) { 356 checkNotNull(cause); 357 358 monitor.enter(); 359 try { 360 State previous = state(); 361 switch (previous) { 362 case NEW: 363 case TERMINATED: 364 throw new IllegalStateException("Failed while in state:" + previous, cause); 365 case RUNNING: 366 case STARTING: 367 case STOPPING: 368 snapshot = new StateSnapshot(FAILED, false, cause); 369 failed(previous, cause); 370 break; 371 case FAILED: 372 // Do nothing 373 break; 374 default: 375 throw new AssertionError("Unexpected state: " + previous); 376 } 377 } finally { 378 monitor.leave(); 379 executeListeners(); 380 } 381 } 382 383 @Override 384 public final boolean isRunning() { 385 return state() == RUNNING; 386 } 387 388 @Override 389 public final State state() { 390 return snapshot.externalState(); 391 } 392 393 /** 394 * @since 14.0 395 */ 396 @Override 397 public final Throwable failureCause() { 398 return snapshot.failureCause(); 399 } 400 401 /** 402 * @since 13.0 403 */ 404 @Override 405 public final void addListener(Listener listener, Executor executor) { 406 checkNotNull(listener, "listener"); 407 checkNotNull(executor, "executor"); 408 monitor.enter(); 409 try { 410 if (!state().isTerminal()) { 411 listeners.add(new ListenerCallQueue<Listener>(listener, executor)); 412 } 413 } finally { 414 monitor.leave(); 415 } 416 } 417 418 @Override public String toString() { 419 return getClass().getSimpleName() + " [" + state() + "]"; 420 } 421 422 /** 423 * Attempts to execute all the listeners in {@link #listeners} while not holding the 424 * {@link #monitor}. 425 */ 426 private void executeListeners() { 427 if (!monitor.isOccupiedByCurrentThread()) { 428 // iterate by index to avoid concurrent modification exceptions 429 for (int i = 0; i < listeners.size(); i++) { 430 listeners.get(i).execute(); 431 } 432 } 433 } 434 435 @GuardedBy("monitor") 436 private void starting() { 437 STARTING_CALLBACK.enqueueOn(listeners); 438 } 439 440 @GuardedBy("monitor") 441 private void running() { 442 RUNNING_CALLBACK.enqueueOn(listeners); 443 } 444 445 @GuardedBy("monitor") 446 private void stopping(final State from) { 447 if (from == State.STARTING) { 448 STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners); 449 } else if (from == State.RUNNING) { 450 STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 451 } else { 452 throw new AssertionError(); 453 } 454 } 455 456 @GuardedBy("monitor") 457 private void terminated(final State from) { 458 switch(from) { 459 case NEW: 460 TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners); 461 break; 462 case RUNNING: 463 TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 464 break; 465 case STOPPING: 466 TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners); 467 break; 468 case STARTING: 469 case TERMINATED: 470 case FAILED: 471 default: 472 throw new AssertionError(); 473 } 474 } 475 476 @GuardedBy("monitor") 477 private void failed(final State from, final Throwable cause) { 478 // can't memoize this one due to the exception 479 new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") { 480 @Override void call(Listener listener) { 481 listener.failed(from, cause); 482 } 483 }.enqueueOn(listeners); 484 } 485 486 /** 487 * An immutable snapshot of the current state of the service. This class represents a consistent 488 * snapshot of the state and therefore it can be used to answer simple queries without needing to 489 * grab a lock. 490 */ 491 @Immutable 492 private static final class StateSnapshot { 493 /** 494 * The internal state, which equals external state unless 495 * shutdownWhenStartupFinishes is true. 496 */ 497 final State state; 498 499 /** 500 * If true, the user requested a shutdown while the service was still starting 501 * up. 502 */ 503 final boolean shutdownWhenStartupFinishes; 504 505 /** 506 * The exception that caused this service to fail. This will be {@code null} 507 * unless the service has failed. 508 */ 509 @Nullable 510 final Throwable failure; 511 512 StateSnapshot(State internalState) { 513 this(internalState, false, null); 514 } 515 516 StateSnapshot( 517 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 518 checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING, 519 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 520 internalState); 521 checkArgument(!(failure != null ^ internalState == FAILED), 522 "A failure cause should be set if and only if the state is failed. Got %s and %s " 523 + "instead.", internalState, failure); 524 this.state = internalState; 525 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 526 this.failure = failure; 527 } 528 529 /** @see Service#state() */ 530 State externalState() { 531 if (shutdownWhenStartupFinishes && state == STARTING) { 532 return STOPPING; 533 } else { 534 return state; 535 } 536 } 537 538 /** @see Service#failureCause() */ 539 Throwable failureCause() { 540 checkState(state == FAILED, 541 "failureCause() is only valid if the service has failed, service is %s", state); 542 return failure; 543 } 544 } 545}