001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022import static com.google.common.util.concurrent.Service.State.FAILED; 023import static com.google.common.util.concurrent.Service.State.NEW; 024import static com.google.common.util.concurrent.Service.State.RUNNING; 025import static com.google.common.util.concurrent.Service.State.STARTING; 026import static com.google.common.util.concurrent.Service.State.STOPPING; 027import static com.google.common.util.concurrent.Service.State.TERMINATED; 028 029import com.google.common.annotations.Beta; 030import com.google.common.util.concurrent.ListenerCallQueue.Callback; 031import com.google.common.util.concurrent.Monitor.Guard; 032import com.google.common.util.concurrent.Service.State; // javadoc needs this 033import com.google.j2objc.annotations.WeakOuter; 034 035import java.util.ArrayList; 036import java.util.Collections; 037import java.util.List; 038import java.util.concurrent.Executor; 039import java.util.concurrent.TimeUnit; 040import java.util.concurrent.TimeoutException; 041 042import javax.annotation.Nullable; 043import javax.annotation.concurrent.GuardedBy; 044import javax.annotation.concurrent.Immutable; 045 046/** 047 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 048 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 049 * callbacks. Its subclasses must manage threads manually; consider 050 * {@link AbstractExecutionThreadService} if you need only a single execution thread. 051 * 052 * @author Jesse Wilson 053 * @author Luke Sandberg 054 * @since 1.0 055 */ 056@Beta 057public abstract class AbstractService implements Service { 058 private static final Callback<Listener> STARTING_CALLBACK = 059 new Callback<Listener>("starting()") { 060 @Override void call(Listener listener) { 061 listener.starting(); 062 } 063 }; 064 private static final Callback<Listener> RUNNING_CALLBACK = 065 new Callback<Listener>("running()") { 066 @Override void call(Listener listener) { 067 listener.running(); 068 } 069 }; 070 private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK = 071 stoppingCallback(STARTING); 072 private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK = 073 stoppingCallback(RUNNING); 074 075 private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK = 076 terminatedCallback(NEW); 077 private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK = 078 terminatedCallback(RUNNING); 079 private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK = 080 terminatedCallback(STOPPING); 081 082 private static Callback<Listener> terminatedCallback(final State from) { 083 return new Callback<Listener>("terminated({from = " + from + "})") { 084 @Override void call(Listener listener) { 085 listener.terminated(from); 086 } 087 }; 088 } 089 090 private static Callback<Listener> stoppingCallback(final State from) { 091 return new Callback<Listener>("stopping({from = " + from + "})") { 092 @Override void call(Listener listener) { 093 listener.stopping(from); 094 } 095 }; 096 } 097 098 private final Monitor monitor = new Monitor(); 099 100 private final Guard isStartable = new IsStartableGuard(); 101 102 @WeakOuter 103 private final class IsStartableGuard extends Guard { 104 IsStartableGuard() { 105 super(AbstractService.this.monitor); 106 } 107 108 @Override public boolean isSatisfied() { 109 return state() == NEW; 110 } 111 } 112 113 private final Guard isStoppable = new IsStoppableGuard(); 114 115 @WeakOuter 116 private final class IsStoppableGuard extends Guard { 117 IsStoppableGuard() { 118 super(AbstractService.this.monitor); 119 } 120 121 @Override public boolean isSatisfied() { 122 return state().compareTo(RUNNING) <= 0; 123 } 124 } 125 126 private final Guard hasReachedRunning = new HasReachedRunningGuard(); 127 128 @WeakOuter 129 private final class HasReachedRunningGuard extends Guard { 130 HasReachedRunningGuard() { 131 super(AbstractService.this.monitor); 132 } 133 134 @Override public boolean isSatisfied() { 135 return state().compareTo(RUNNING) >= 0; 136 } 137 } 138 139 private final Guard isStopped = new IsStoppedGuard(); 140 141 @WeakOuter 142 private final class IsStoppedGuard extends Guard { 143 IsStoppedGuard() { 144 super(AbstractService.this.monitor); 145 } 146 147 @Override public boolean isSatisfied() { 148 return state().isTerminal(); 149 } 150 } 151 152 /** 153 * The listeners to notify during a state transition. 154 */ 155 @GuardedBy("monitor") 156 private final List<ListenerCallQueue<Listener>> listeners = 157 Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>()); 158 159 /** 160 * The current state of the service. This should be written with the lock held but can be read 161 * without it because it is an immutable object in a volatile field. This is desirable so that 162 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 163 * without grabbing the lock. 164 * 165 * <p>To update this field correctly the lock must be held to guarantee that the state is 166 * consistent. 167 */ 168 @GuardedBy("monitor") 169 private volatile StateSnapshot snapshot = new StateSnapshot(NEW); 170 171 /** Constructor for use by subclasses. */ 172 protected AbstractService() {} 173 174 /** 175 * This method is called by {@link #startAsync} to initiate service startup. The invocation of 176 * this method should cause a call to {@link #notifyStarted()}, either during this method's run, 177 * or after it has returned. If startup fails, the invocation should cause a call to 178 * {@link #notifyFailed(Throwable)} instead. 179 * 180 * <p>This method should return promptly; prefer to do work on a different thread where it is 181 * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 182 * called multiple times. 183 */ 184 protected abstract void doStart(); 185 186 /** 187 * This method should be used to initiate service shutdown. The invocation of this method should 188 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 189 * returned. If shutdown fails, the invocation should cause a call to 190 * {@link #notifyFailed(Throwable)} instead. 191 * 192 * <p> This method should return promptly; prefer to do work on a different thread where it is 193 * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 194 * called multiple times. 195 */ 196 protected abstract void doStop(); 197 198 @Override public final Service startAsync() { 199 if (monitor.enterIf(isStartable)) { 200 try { 201 snapshot = new StateSnapshot(STARTING); 202 starting(); 203 doStart(); 204 } catch (Throwable startupFailure) { 205 notifyFailed(startupFailure); 206 } finally { 207 monitor.leave(); 208 executeListeners(); 209 } 210 } else { 211 throw new IllegalStateException("Service " + this + " has already been started"); 212 } 213 return this; 214 } 215 216 @Override public final Service stopAsync() { 217 if (monitor.enterIf(isStoppable)) { 218 try { 219 State previous = state(); 220 switch (previous) { 221 case NEW: 222 snapshot = new StateSnapshot(TERMINATED); 223 terminated(NEW); 224 break; 225 case STARTING: 226 snapshot = new StateSnapshot(STARTING, true, null); 227 stopping(STARTING); 228 break; 229 case RUNNING: 230 snapshot = new StateSnapshot(STOPPING); 231 stopping(RUNNING); 232 doStop(); 233 break; 234 case STOPPING: 235 case TERMINATED: 236 case FAILED: 237 // These cases are impossible due to the if statement above. 238 throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous); 239 default: 240 throw new AssertionError("Unexpected state: " + previous); 241 } 242 } catch (Throwable shutdownFailure) { 243 notifyFailed(shutdownFailure); 244 } finally { 245 monitor.leave(); 246 executeListeners(); 247 } 248 } 249 return this; 250 } 251 252 @Override public final void awaitRunning() { 253 monitor.enterWhenUninterruptibly(hasReachedRunning); 254 try { 255 checkCurrentState(RUNNING); 256 } finally { 257 monitor.leave(); 258 } 259 } 260 261 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 262 if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) { 263 try { 264 checkCurrentState(RUNNING); 265 } finally { 266 monitor.leave(); 267 } 268 } else { 269 // It is possible due to races the we are currently in the expected state even though we 270 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 271 // even check the guard. I don't think we care too much about this use case but it could lead 272 // to a confusing error message. 273 throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state."); 274 } 275 } 276 277 @Override public final void awaitTerminated() { 278 monitor.enterWhenUninterruptibly(isStopped); 279 try { 280 checkCurrentState(TERMINATED); 281 } finally { 282 monitor.leave(); 283 } 284 } 285 286 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 287 if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) { 288 try { 289 checkCurrentState(TERMINATED); 290 } finally { 291 monitor.leave(); 292 } 293 } else { 294 // It is possible due to races the we are currently in the expected state even though we 295 // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never 296 // even check the guard. I don't think we care too much about this use case but it could lead 297 // to a confusing error message. 298 throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. " 299 + "Current state: " + state()); 300 } 301 } 302 303 /** Checks that the current state is equal to the expected state. */ 304 @GuardedBy("monitor") 305 private void checkCurrentState(State expected) { 306 State actual = state(); 307 if (actual != expected) { 308 if (actual == FAILED) { 309 // Handle this specially so that we can include the failureCause, if there is one. 310 throw new IllegalStateException("Expected the service to be " + expected 311 + ", but the service has FAILED", failureCause()); 312 } 313 throw new IllegalStateException("Expected the service to be " + expected + ", but was " 314 + actual); 315 } 316 } 317 318 /** 319 * Implementing classes should invoke this method once their service has started. It will cause 320 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 321 * 322 * @throws IllegalStateException if the service is not {@link State#STARTING}. 323 */ 324 protected final void notifyStarted() { 325 monitor.enter(); 326 try { 327 // We have to examine the internal state of the snapshot here to properly handle the stop 328 // while starting case. 329 if (snapshot.state != STARTING) { 330 IllegalStateException failure = new IllegalStateException( 331 "Cannot notifyStarted() when the service is " + snapshot.state); 332 notifyFailed(failure); 333 throw failure; 334 } 335 336 if (snapshot.shutdownWhenStartupFinishes) { 337 snapshot = new StateSnapshot(STOPPING); 338 // We don't call listeners here because we already did that when we set the 339 // shutdownWhenStartupFinishes flag. 340 doStop(); 341 } else { 342 snapshot = new StateSnapshot(RUNNING); 343 running(); 344 } 345 } finally { 346 monitor.leave(); 347 executeListeners(); 348 } 349 } 350 351 /** 352 * Implementing classes should invoke this method once their service has stopped. It will cause 353 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 354 * 355 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor 356 * {@link State#RUNNING}. 357 */ 358 protected final void notifyStopped() { 359 monitor.enter(); 360 try { 361 // We check the internal state of the snapshot instead of state() directly so we don't allow 362 // notifyStopped() to be called while STARTING, even if stop() has already been called. 363 State previous = snapshot.state; 364 if (previous != STOPPING && previous != RUNNING) { 365 IllegalStateException failure = new IllegalStateException( 366 "Cannot notifyStopped() when the service is " + previous); 367 notifyFailed(failure); 368 throw failure; 369 } 370 snapshot = new StateSnapshot(TERMINATED); 371 terminated(previous); 372 } finally { 373 monitor.leave(); 374 executeListeners(); 375 } 376 } 377 378 /** 379 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 380 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 381 * or otherwise cannot be started nor stopped. 382 */ 383 protected final void notifyFailed(Throwable cause) { 384 checkNotNull(cause); 385 386 monitor.enter(); 387 try { 388 State previous = state(); 389 switch (previous) { 390 case NEW: 391 case TERMINATED: 392 throw new IllegalStateException("Failed while in state:" + previous, cause); 393 case RUNNING: 394 case STARTING: 395 case STOPPING: 396 snapshot = new StateSnapshot(FAILED, false, cause); 397 failed(previous, cause); 398 break; 399 case FAILED: 400 // Do nothing 401 break; 402 default: 403 throw new AssertionError("Unexpected state: " + previous); 404 } 405 } finally { 406 monitor.leave(); 407 executeListeners(); 408 } 409 } 410 411 @Override 412 public final boolean isRunning() { 413 return state() == RUNNING; 414 } 415 416 @Override 417 public final State state() { 418 return snapshot.externalState(); 419 } 420 421 /** 422 * @since 14.0 423 */ 424 @Override 425 public final Throwable failureCause() { 426 return snapshot.failureCause(); 427 } 428 429 /** 430 * @since 13.0 431 */ 432 @Override 433 public final void addListener(Listener listener, Executor executor) { 434 checkNotNull(listener, "listener"); 435 checkNotNull(executor, "executor"); 436 monitor.enter(); 437 try { 438 if (!state().isTerminal()) { 439 listeners.add(new ListenerCallQueue<Listener>(listener, executor)); 440 } 441 } finally { 442 monitor.leave(); 443 } 444 } 445 446 @Override public String toString() { 447 return getClass().getSimpleName() + " [" + state() + "]"; 448 } 449 450 /** 451 * Attempts to execute all the listeners in {@link #listeners} while not holding the 452 * {@link #monitor}. 453 */ 454 private void executeListeners() { 455 if (!monitor.isOccupiedByCurrentThread()) { 456 // iterate by index to avoid concurrent modification exceptions 457 for (int i = 0; i < listeners.size(); i++) { 458 listeners.get(i).execute(); 459 } 460 } 461 } 462 463 @GuardedBy("monitor") 464 private void starting() { 465 STARTING_CALLBACK.enqueueOn(listeners); 466 } 467 468 @GuardedBy("monitor") 469 private void running() { 470 RUNNING_CALLBACK.enqueueOn(listeners); 471 } 472 473 @GuardedBy("monitor") 474 private void stopping(final State from) { 475 if (from == State.STARTING) { 476 STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners); 477 } else if (from == State.RUNNING) { 478 STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 479 } else { 480 throw new AssertionError(); 481 } 482 } 483 484 @GuardedBy("monitor") 485 private void terminated(final State from) { 486 switch(from) { 487 case NEW: 488 TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners); 489 break; 490 case RUNNING: 491 TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners); 492 break; 493 case STOPPING: 494 TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners); 495 break; 496 case STARTING: 497 case TERMINATED: 498 case FAILED: 499 default: 500 throw new AssertionError(); 501 } 502 } 503 504 @GuardedBy("monitor") 505 private void failed(final State from, final Throwable cause) { 506 // can't memoize this one due to the exception 507 new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") { 508 @Override void call(Listener listener) { 509 listener.failed(from, cause); 510 } 511 }.enqueueOn(listeners); 512 } 513 514 /** 515 * An immutable snapshot of the current state of the service. This class represents a consistent 516 * snapshot of the state and therefore it can be used to answer simple queries without needing to 517 * grab a lock. 518 */ 519 @Immutable 520 private static final class StateSnapshot { 521 /** 522 * The internal state, which equals external state unless 523 * shutdownWhenStartupFinishes is true. 524 */ 525 final State state; 526 527 /** 528 * If true, the user requested a shutdown while the service was still starting 529 * up. 530 */ 531 final boolean shutdownWhenStartupFinishes; 532 533 /** 534 * The exception that caused this service to fail. This will be {@code null} 535 * unless the service has failed. 536 */ 537 @Nullable 538 final Throwable failure; 539 540 StateSnapshot(State internalState) { 541 this(internalState, false, null); 542 } 543 544 StateSnapshot( 545 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 546 checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING, 547 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 548 internalState); 549 checkArgument(!(failure != null ^ internalState == FAILED), 550 "A failure cause should be set if and only if the state is failed. Got %s and %s " 551 + "instead.", internalState, failure); 552 this.state = internalState; 553 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 554 this.failure = failure; 555 } 556 557 /** @see Service#state() */ 558 State externalState() { 559 if (shutdownWhenStartupFinishes && state == STARTING) { 560 return STOPPING; 561 } else { 562 return state; 563 } 564 } 565 566 /** @see Service#failureCause() */ 567 Throwable failureCause() { 568 checkState(state == FAILED, 569 "failureCause() is only valid if the service has failed, service is %s", state); 570 return failure; 571 } 572 } 573}