001/* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.base.Preconditions.checkState; 022 023import com.google.common.annotations.Beta; 024import com.google.common.collect.Lists; 025import com.google.common.collect.Queues; 026import com.google.common.util.concurrent.Service.State; // javadoc needs this 027 028import java.util.List; 029import java.util.Queue; 030import java.util.concurrent.ExecutionException; 031import java.util.concurrent.Executor; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.TimeoutException; 034import java.util.concurrent.locks.ReentrantLock; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037 038import javax.annotation.Nullable; 039import javax.annotation.concurrent.GuardedBy; 040import javax.annotation.concurrent.Immutable; 041 042/** 043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 045 * callbacks. Its subclasses must manage threads manually; consider 046 * {@link AbstractExecutionThreadService} if you need only a single execution thread. 047 * 048 * @author Jesse Wilson 049 * @author Luke Sandberg 050 * @since 1.0 051 */ 052@Beta 053public abstract class AbstractService implements Service { 054 private static final Logger logger = Logger.getLogger(AbstractService.class.getName()); 055 private final ReentrantLock lock = new ReentrantLock(); 056 057 private final Transition startup = new Transition(); 058 private final Transition shutdown = new Transition(); 059 060 /** 061 * The listeners to notify during a state transition. 062 */ 063 @GuardedBy("lock") 064 private final List<ListenerExecutorPair> listeners = Lists.newArrayList(); 065 066 /** 067 * The queue of listeners that are waiting to be executed. 068 * 069 * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be 070 * protected by the implicit lock on this object. Dequeue operations should be executed atomically 071 * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be 072 * held when the listeners are being executed. Use {@link #executeListeners} for this operation. 073 * This is necessary to ensure that elements on the queue are executed in the correct order. 074 * Enqueue operations should be protected so that listeners are added in the correct order. We use 075 * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues. 076 */ 077 @GuardedBy("queuedListeners") 078 private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue(); 079 080 /** 081 * The current state of the service. This should be written with the lock held but can be read 082 * without it because it is an immutable object in a volatile field. This is desirable so that 083 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 084 * without grabbing the lock. 085 * 086 * <p>To update this field correctly the lock must be held to guarantee that the state is 087 * consistent. 088 */ 089 @GuardedBy("lock") 090 private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW); 091 092 /** Constructor for use by subclasses. */ 093 protected AbstractService() { 094 // Add a listener to update the futures. This needs to be added first so that it is executed 095 // before the other listeners. This way the other listeners can access the completed futures. 096 addListener( 097 new Listener() { 098 @Override public void starting() {} 099 100 @Override public void running() { 101 startup.set(State.RUNNING); 102 } 103 104 @Override public void stopping(State from) { 105 if (from == State.STARTING) { 106 startup.set(State.STOPPING); 107 } 108 } 109 110 @Override public void terminated(State from) { 111 if (from == State.NEW) { 112 startup.set(State.TERMINATED); 113 } 114 shutdown.set(State.TERMINATED); 115 } 116 117 @Override public void failed(State from, Throwable failure) { 118 switch (from) { 119 case STARTING: 120 startup.setException(failure); 121 shutdown.setException(new Exception("Service failed to start.", failure)); 122 break; 123 case RUNNING: 124 shutdown.setException(new Exception("Service failed while running", failure)); 125 break; 126 case STOPPING: 127 shutdown.setException(failure); 128 break; 129 case TERMINATED: /* fall-through */ 130 case FAILED: /* fall-through */ 131 case NEW: /* fall-through */ 132 default: 133 throw new AssertionError("Unexpected from state: " + from); 134 } 135 } 136 }, 137 MoreExecutors.sameThreadExecutor()); 138 } 139 140 /** 141 * This method is called by {@link #start} to initiate service startup. The invocation of this 142 * method should cause a call to {@link #notifyStarted()}, either during this method's run, or 143 * after it has returned. If startup fails, the invocation should cause a call to 144 * {@link #notifyFailed(Throwable)} instead. 145 * 146 * <p>This method should return promptly; prefer to do work on a different thread where it is 147 * convenient. It is invoked exactly once on service startup, even when {@link #start} is called 148 * multiple times. 149 */ 150 protected abstract void doStart(); 151 152 /** 153 * This method should be used to initiate service shutdown. The invocation of this method should 154 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 155 * returned. If shutdown fails, the invocation should cause a call to 156 * {@link #notifyFailed(Throwable)} instead. 157 * 158 * <p> This method should return promptly; prefer to do work on a different thread where it is 159 * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called 160 * multiple times. 161 */ 162 protected abstract void doStop(); 163 164 @Override 165 public final ListenableFuture<State> start() { 166 lock.lock(); 167 try { 168 if (snapshot.state == State.NEW) { 169 snapshot = new StateSnapshot(State.STARTING); 170 starting(); 171 doStart(); 172 } 173 } catch (Throwable startupFailure) { 174 notifyFailed(startupFailure); 175 } finally { 176 lock.unlock(); 177 executeListeners(); 178 } 179 180 return startup; 181 } 182 183 @Override 184 public final ListenableFuture<State> stop() { 185 lock.lock(); 186 try { 187 switch (snapshot.state) { 188 case NEW: 189 snapshot = new StateSnapshot(State.TERMINATED); 190 terminated(State.NEW); 191 break; 192 case STARTING: 193 snapshot = new StateSnapshot(State.STARTING, true, null); 194 stopping(State.STARTING); 195 break; 196 case RUNNING: 197 snapshot = new StateSnapshot(State.STOPPING); 198 stopping(State.RUNNING); 199 doStop(); 200 break; 201 case STOPPING: 202 case TERMINATED: 203 case FAILED: 204 // do nothing 205 break; 206 default: 207 throw new AssertionError("Unexpected state: " + snapshot.state); 208 } 209 } catch (Throwable shutdownFailure) { 210 notifyFailed(shutdownFailure); 211 } finally { 212 lock.unlock(); 213 executeListeners(); 214 } 215 216 return shutdown; 217 } 218 219 @Override 220 public State startAndWait() { 221 return Futures.getUnchecked(start()); 222 } 223 224 @Override 225 public State stopAndWait() { 226 return Futures.getUnchecked(stop()); 227 } 228 229 /** 230 * Implementing classes should invoke this method once their service has started. It will cause 231 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 232 * 233 * @throws IllegalStateException if the service is not {@link State#STARTING}. 234 */ 235 protected final void notifyStarted() { 236 lock.lock(); 237 try { 238 if (snapshot.state != State.STARTING) { 239 IllegalStateException failure = new IllegalStateException( 240 "Cannot notifyStarted() when the service is " + snapshot.state); 241 notifyFailed(failure); 242 throw failure; 243 } 244 245 if (snapshot.shutdownWhenStartupFinishes) { 246 snapshot = new StateSnapshot(State.STOPPING); 247 // We don't call listeners here because we already did that when we set the 248 // shutdownWhenStartupFinishes flag. 249 doStop(); 250 } else { 251 snapshot = new StateSnapshot(State.RUNNING); 252 running(); 253 } 254 } finally { 255 lock.unlock(); 256 executeListeners(); 257 } 258 } 259 260 /** 261 * Implementing classes should invoke this method once their service has stopped. It will cause 262 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 263 * 264 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor 265 * {@link State#RUNNING}. 266 */ 267 protected final void notifyStopped() { 268 lock.lock(); 269 try { 270 if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) { 271 IllegalStateException failure = new IllegalStateException( 272 "Cannot notifyStopped() when the service is " + snapshot.state); 273 notifyFailed(failure); 274 throw failure; 275 } 276 State previous = snapshot.state; 277 snapshot = new StateSnapshot(State.TERMINATED); 278 terminated(previous); 279 } finally { 280 lock.unlock(); 281 executeListeners(); 282 } 283 } 284 285 /** 286 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 287 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 288 * or otherwise cannot be started nor stopped. 289 */ 290 protected final void notifyFailed(Throwable cause) { 291 checkNotNull(cause); 292 293 lock.lock(); 294 try { 295 switch (snapshot.state) { 296 case NEW: 297 case TERMINATED: 298 throw new IllegalStateException("Failed while in state:" + snapshot.state, cause); 299 case RUNNING: 300 case STARTING: 301 case STOPPING: 302 State previous = snapshot.state; 303 snapshot = new StateSnapshot(State.FAILED, false, cause); 304 failed(previous, cause); 305 break; 306 case FAILED: 307 // Do nothing 308 break; 309 default: 310 throw new AssertionError("Unexpected state: " + snapshot.state); 311 } 312 } finally { 313 lock.unlock(); 314 executeListeners(); 315 } 316 } 317 318 @Override 319 public final boolean isRunning() { 320 return state() == State.RUNNING; 321 } 322 323 @Override 324 public final State state() { 325 return snapshot.externalState(); 326 } 327 328 /** 329 * @since 14.0 330 */ 331 @Override 332 public final Throwable failureCause() { 333 return snapshot.failureCause(); 334 } 335 336 /** 337 * @since 13.0 338 */ 339 @Override 340 public final void addListener(Listener listener, Executor executor) { 341 checkNotNull(listener, "listener"); 342 checkNotNull(executor, "executor"); 343 lock.lock(); 344 try { 345 if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) { 346 listeners.add(new ListenerExecutorPair(listener, executor)); 347 } 348 } finally { 349 lock.unlock(); 350 } 351 } 352 353 @Override public String toString() { 354 return getClass().getSimpleName() + " [" + state() + "]"; 355 } 356 357 /** 358 * A change from one service state to another, plus the result of the change. 359 */ 360 private class Transition extends AbstractFuture<State> { 361 @Override 362 public State get(long timeout, TimeUnit unit) 363 throws InterruptedException, TimeoutException, ExecutionException { 364 try { 365 return super.get(timeout, unit); 366 } catch (TimeoutException e) { 367 throw new TimeoutException(AbstractService.this.toString()); 368 } 369 } 370 } 371 372 /** 373 * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the 374 * {@link #lock}. 375 */ 376 private void executeListeners() { 377 if (!lock.isHeldByCurrentThread()) { 378 synchronized (queuedListeners) { 379 Runnable listener; 380 while ((listener = queuedListeners.poll()) != null) { 381 listener.run(); 382 } 383 } 384 } 385 } 386 387 @GuardedBy("lock") 388 private void starting() { 389 for (final ListenerExecutorPair pair : listeners) { 390 queuedListeners.add(new Runnable() { 391 @Override public void run() { 392 pair.execute(new Runnable() { 393 @Override public void run() { 394 pair.listener.starting(); 395 } 396 }); 397 } 398 }); 399 } 400 } 401 402 @GuardedBy("lock") 403 private void running() { 404 for (final ListenerExecutorPair pair : listeners) { 405 queuedListeners.add(new Runnable() { 406 @Override public void run() { 407 pair.execute(new Runnable() { 408 @Override public void run() { 409 pair.listener.running(); 410 } 411 }); 412 } 413 }); 414 } 415 } 416 417 @GuardedBy("lock") 418 private void stopping(final State from) { 419 for (final ListenerExecutorPair pair : listeners) { 420 queuedListeners.add(new Runnable() { 421 @Override public void run() { 422 pair.execute(new Runnable() { 423 @Override public void run() { 424 pair.listener.stopping(from); 425 } 426 }); 427 } 428 }); 429 } 430 } 431 432 @GuardedBy("lock") 433 private void terminated(final State from) { 434 for (final ListenerExecutorPair pair : listeners) { 435 queuedListeners.add(new Runnable() { 436 @Override public void run() { 437 pair.execute(new Runnable() { 438 @Override public void run() { 439 pair.listener.terminated(from); 440 } 441 }); 442 } 443 }); 444 } 445 // There are no more state transitions so we can clear this out. 446 listeners.clear(); 447 } 448 449 @GuardedBy("lock") 450 private void failed(final State from, final Throwable cause) { 451 for (final ListenerExecutorPair pair : listeners) { 452 queuedListeners.add(new Runnable() { 453 @Override public void run() { 454 pair.execute(new Runnable() { 455 @Override public void run() { 456 pair.listener.failed(from, cause); 457 } 458 }); 459 } 460 }); 461 } 462 // There are no more state transitions so we can clear this out. 463 listeners.clear(); 464 } 465 466 /** A simple holder for a listener and its executor. */ 467 private static class ListenerExecutorPair { 468 final Listener listener; 469 final Executor executor; 470 471 ListenerExecutorPair(Listener listener, Executor executor) { 472 this.listener = listener; 473 this.executor = executor; 474 } 475 476 /** 477 * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all 478 * exceptions 479 */ 480 void execute(Runnable runnable) { 481 try { 482 executor.execute(runnable); 483 } catch (Exception e) { 484 logger.log(Level.SEVERE, "Exception while executing listener " + listener 485 + " with executor " + executor, e); 486 } 487 } 488 } 489 490 /** 491 * An immutable snapshot of the current state of the service. This class represents a consistent 492 * snapshot of the state and therefore it can be used to answer simple queries without needing to 493 * grab a lock. 494 */ 495 @Immutable 496 private static final class StateSnapshot { 497 /** 498 * The internal state, which equals external state unless 499 * shutdownWhenStartupFinishes is true. 500 */ 501 final State state; 502 503 /** 504 * If true, the user requested a shutdown while the service was still starting 505 * up. 506 */ 507 final boolean shutdownWhenStartupFinishes; 508 509 /** 510 * The exception that caused this service to fail. This will be {@code null} 511 * unless the service has failed. 512 */ 513 @Nullable 514 final Throwable failure; 515 516 StateSnapshot(State internalState) { 517 this(internalState, false, null); 518 } 519 520 StateSnapshot( 521 State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) { 522 checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING, 523 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 524 internalState); 525 checkArgument(!(failure != null ^ internalState == State.FAILED), 526 "A failure cause should be set if and only if the state is failed. Got %s and %s " 527 + "instead.", internalState, failure); 528 this.state = internalState; 529 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 530 this.failure = failure; 531 } 532 533 /** @see Service#state() */ 534 State externalState() { 535 if (shutdownWhenStartupFinishes && state == State.STARTING) { 536 return State.STOPPING; 537 } else { 538 return state; 539 } 540 } 541 542 /** @see Service#failureCause() */ 543 Throwable failureCause() { 544 checkState(state == State.FAILED, 545 "failureCause() is only valid if the service has failed, service is %s", state); 546 return failure; 547 } 548 } 549}