001 /* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkArgument; 020 import static com.google.common.base.Preconditions.checkNotNull; 021 import static com.google.common.base.Preconditions.checkState; 022 023 import com.google.common.annotations.Beta; 024 import com.google.common.collect.Lists; 025 import com.google.common.collect.Queues; 026 import com.google.common.util.concurrent.Service.State; // javadoc needs this 027 028 import java.util.List; 029 import java.util.Queue; 030 import java.util.concurrent.ExecutionException; 031 import java.util.concurrent.Executor; 032 import java.util.concurrent.TimeUnit; 033 import java.util.concurrent.TimeoutException; 034 import java.util.concurrent.locks.ReentrantLock; 035 import java.util.logging.Level; 036 import java.util.logging.Logger; 037 038 import javax.annotation.Nullable; 039 import javax.annotation.concurrent.GuardedBy; 040 import javax.annotation.concurrent.Immutable; 041 042 /** 043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop} 044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()} 045 * callbacks. Its subclasses must manage threads manually; consider 046 * {@link AbstractExecutionThreadService} if you need only a single execution thread. 047 * 048 * @author Jesse Wilson 049 * @author Luke Sandberg 050 * @since 1.0 051 */ 052 @Beta 053 public abstract class AbstractService implements Service { 054 private static final Logger logger = Logger.getLogger(AbstractService.class.getName()); 055 private final ReentrantLock lock = new ReentrantLock(); 056 057 private final Transition startup = new Transition(); 058 private final Transition shutdown = new Transition(); 059 060 /** 061 * The listeners to notify during a state transition. 062 */ 063 @GuardedBy("lock") 064 private final List<ListenerExecutorPair> listeners = Lists.newArrayList(); 065 066 /** 067 * The queue of listeners that are waiting to be executed. 068 * 069 * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be 070 * protected by the implicit lock on this object. Dequeue operations should be executed atomically 071 * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be 072 * held when the listeners are being executed. Use {@link #executeListeners} for this operation. 073 * This is necessary to ensure that elements on the queue are executed in the correct order. 074 * Enqueue operations should be protected so that listeners are added in the correct order. We use 075 * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues. 076 */ 077 @GuardedBy("queuedListeners") 078 private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue(); 079 080 /** 081 * The current state of the service. This should be written with the lock held but can be read 082 * without it because it is an immutable object in a volatile field. This is desirable so that 083 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run 084 * without grabbing the lock. 085 * 086 * <p>To update this field correctly the lock must be held to guarantee that the state is 087 * consistent. 088 */ 089 @GuardedBy("lock") 090 private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW); 091 092 protected AbstractService() { 093 // Add a listener to update the futures. This needs to be added first so that it is executed 094 // before the other listeners. This way the other listeners can access the completed futures. 095 addListener( 096 new Listener() { 097 @Override public void starting() {} 098 099 @Override public void running() { 100 startup.set(State.RUNNING); 101 } 102 103 @Override public void stopping(State from) { 104 if (from == State.STARTING) { 105 startup.set(State.STOPPING); 106 } 107 } 108 109 @Override public void terminated(State from) { 110 if (from == State.NEW) { 111 startup.set(State.TERMINATED); 112 } 113 shutdown.set(State.TERMINATED); 114 } 115 116 @Override public void failed(State from, Throwable failure) { 117 switch (from) { 118 case STARTING: 119 startup.setException(failure); 120 shutdown.setException(new Exception("Service failed to start.", failure)); 121 break; 122 case RUNNING: 123 shutdown.setException(new Exception("Service failed while running", failure)); 124 break; 125 case STOPPING: 126 shutdown.setException(failure); 127 break; 128 case TERMINATED: /* fall-through */ 129 case FAILED: /* fall-through */ 130 case NEW: /* fall-through */ 131 default: 132 throw new AssertionError("Unexpected from state: " + from); 133 } 134 } 135 }, 136 MoreExecutors.sameThreadExecutor()); 137 } 138 139 /** 140 * This method is called by {@link #start} to initiate service startup. The invocation of this 141 * method should cause a call to {@link #notifyStarted()}, either during this method's run, or 142 * after it has returned. If startup fails, the invocation should cause a call to 143 * {@link #notifyFailed(Throwable)} instead. 144 * 145 * <p>This method should return promptly; prefer to do work on a different thread where it is 146 * convenient. It is invoked exactly once on service startup, even when {@link #start} is called 147 * multiple times. 148 */ 149 protected abstract void doStart(); 150 151 /** 152 * This method should be used to initiate service shutdown. The invocation of this method should 153 * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has 154 * returned. If shutdown fails, the invocation should cause a call to 155 * {@link #notifyFailed(Throwable)} instead. 156 * 157 * <p> This method should return promptly; prefer to do work on a different thread where it is 158 * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called 159 * multiple times. 160 */ 161 protected abstract void doStop(); 162 163 @Override 164 public final ListenableFuture<State> start() { 165 lock.lock(); 166 try { 167 if (snapshot.state == State.NEW) { 168 snapshot = new StateSnapshot(State.STARTING); 169 starting(); 170 doStart(); 171 } 172 } catch (Throwable startupFailure) { 173 notifyFailed(startupFailure); 174 } finally { 175 lock.unlock(); 176 executeListeners(); 177 } 178 179 return startup; 180 } 181 182 @Override 183 public final ListenableFuture<State> stop() { 184 lock.lock(); 185 try { 186 switch (snapshot.state) { 187 case NEW: 188 snapshot = new StateSnapshot(State.TERMINATED); 189 terminated(State.NEW); 190 break; 191 case STARTING: 192 snapshot = new StateSnapshot(State.STARTING, true, null); 193 stopping(State.STARTING); 194 break; 195 case RUNNING: 196 snapshot = new StateSnapshot(State.STOPPING); 197 stopping(State.RUNNING); 198 doStop(); 199 break; 200 case STOPPING: 201 case TERMINATED: 202 case FAILED: 203 // do nothing 204 break; 205 default: 206 throw new AssertionError("Unexpected state: " + snapshot.state); 207 } 208 } catch (Throwable shutdownFailure) { 209 notifyFailed(shutdownFailure); 210 } finally { 211 lock.unlock(); 212 executeListeners(); 213 } 214 215 return shutdown; 216 } 217 218 @Override 219 public State startAndWait() { 220 return Futures.getUnchecked(start()); 221 } 222 223 @Override 224 public State stopAndWait() { 225 return Futures.getUnchecked(stop()); 226 } 227 228 /** 229 * Implementing classes should invoke this method once their service has started. It will cause 230 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}. 231 * 232 * @throws IllegalStateException if the service is not {@link State#STARTING}. 233 */ 234 protected final void notifyStarted() { 235 lock.lock(); 236 try { 237 if (snapshot.state != State.STARTING) { 238 IllegalStateException failure = new IllegalStateException( 239 "Cannot notifyStarted() when the service is " + snapshot.state); 240 notifyFailed(failure); 241 throw failure; 242 } 243 244 if (snapshot.shutdownWhenStartupFinishes) { 245 snapshot = new StateSnapshot(State.STOPPING); 246 // We don't call listeners here because we already did that when we set the 247 // shutdownWhenStartupFinishes flag. 248 doStop(); 249 } else { 250 snapshot = new StateSnapshot(State.RUNNING); 251 running(); 252 } 253 } finally { 254 lock.unlock(); 255 executeListeners(); 256 } 257 } 258 259 /** 260 * Implementing classes should invoke this method once their service has stopped. It will cause 261 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}. 262 * 263 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor 264 * {@link State#RUNNING}. 265 */ 266 protected final void notifyStopped() { 267 lock.lock(); 268 try { 269 if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) { 270 IllegalStateException failure = new IllegalStateException( 271 "Cannot notifyStopped() when the service is " + snapshot.state); 272 notifyFailed(failure); 273 throw failure; 274 } 275 State previous = snapshot.state; 276 snapshot = new StateSnapshot(State.TERMINATED); 277 terminated(previous); 278 } finally { 279 lock.unlock(); 280 executeListeners(); 281 } 282 } 283 284 /** 285 * Invoke this method to transition the service to the {@link State#FAILED}. The service will 286 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically 287 * or otherwise cannot be started nor stopped. 288 */ 289 protected final void notifyFailed(Throwable cause) { 290 checkNotNull(cause); 291 292 lock.lock(); 293 try { 294 switch (snapshot.state) { 295 case NEW: 296 case TERMINATED: 297 throw new IllegalStateException("Failed while in state:" + snapshot.state, cause); 298 case RUNNING: 299 case STARTING: 300 case STOPPING: 301 State previous = snapshot.state; 302 snapshot = new StateSnapshot(State.FAILED, false, cause); 303 failed(previous, cause); 304 break; 305 case FAILED: 306 // Do nothing 307 break; 308 default: 309 throw new AssertionError("Unexpected state: " + snapshot.state); 310 } 311 } finally { 312 lock.unlock(); 313 executeListeners(); 314 } 315 } 316 317 @Override 318 public final boolean isRunning() { 319 return state() == State.RUNNING; 320 } 321 322 @Override 323 public final State state() { 324 return snapshot.externalState(); 325 } 326 327 @Override 328 public final void addListener(Listener listener, Executor executor) { 329 checkNotNull(listener, "listener"); 330 checkNotNull(executor, "executor"); 331 lock.lock(); 332 try { 333 if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) { 334 listeners.add(new ListenerExecutorPair(listener, executor)); 335 } 336 } finally { 337 lock.unlock(); 338 } 339 } 340 341 @Override public String toString() { 342 return getClass().getSimpleName() + " [" + state() + "]"; 343 } 344 345 /** 346 * A change from one service state to another, plus the result of the change. 347 */ 348 private class Transition extends AbstractFuture<State> { 349 @Override 350 public State get(long timeout, TimeUnit unit) 351 throws InterruptedException, TimeoutException, ExecutionException { 352 try { 353 return super.get(timeout, unit); 354 } catch (TimeoutException e) { 355 throw new TimeoutException(AbstractService.this.toString()); 356 } 357 } 358 } 359 360 /** 361 * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the 362 * {@link #lock}. 363 */ 364 private void executeListeners() { 365 if (!lock.isHeldByCurrentThread()) { 366 synchronized (queuedListeners) { 367 Runnable listener; 368 while ((listener = queuedListeners.poll()) != null) { 369 listener.run(); 370 } 371 } 372 } 373 } 374 375 @GuardedBy("lock") 376 private void starting() { 377 for (final ListenerExecutorPair pair : listeners) { 378 queuedListeners.add(new Runnable() { 379 @Override public void run() { 380 pair.execute(new Runnable() { 381 @Override public void run() { 382 pair.listener.starting(); 383 } 384 }); 385 } 386 }); 387 } 388 } 389 390 @GuardedBy("lock") 391 private void running() { 392 for (final ListenerExecutorPair pair : listeners) { 393 queuedListeners.add(new Runnable() { 394 @Override public void run() { 395 pair.execute(new Runnable() { 396 @Override public void run() { 397 pair.listener.running(); 398 } 399 }); 400 } 401 }); 402 } 403 } 404 405 @GuardedBy("lock") 406 private void stopping(final State from) { 407 for (final ListenerExecutorPair pair : listeners) { 408 queuedListeners.add(new Runnable() { 409 @Override public void run() { 410 pair.execute(new Runnable() { 411 @Override public void run() { 412 pair.listener.stopping(from); 413 } 414 }); 415 } 416 }); 417 } 418 } 419 420 @GuardedBy("lock") 421 private void terminated(final State from) { 422 for (final ListenerExecutorPair pair : listeners) { 423 queuedListeners.add(new Runnable() { 424 @Override public void run() { 425 pair.execute(new Runnable() { 426 @Override public void run() { 427 pair.listener.terminated(from); 428 } 429 }); 430 } 431 }); 432 } 433 // There are no more state transitions so we can clear this out. 434 listeners.clear(); 435 } 436 437 @GuardedBy("lock") 438 private void failed(final State from, final Throwable cause) { 439 for (final ListenerExecutorPair pair : listeners) { 440 queuedListeners.add(new Runnable() { 441 @Override public void run() { 442 pair.execute(new Runnable() { 443 @Override public void run() { 444 pair.listener.failed(from, cause); 445 } 446 }); 447 } 448 }); 449 } 450 // There are no more state transitions so we can clear this out. 451 listeners.clear(); 452 } 453 454 /** A simple holder for a listener and its executor. */ 455 private static class ListenerExecutorPair { 456 final Listener listener; 457 final Executor executor; 458 459 ListenerExecutorPair(Listener listener, Executor executor) { 460 this.listener = listener; 461 this.executor = executor; 462 } 463 464 /** 465 * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all 466 * exceptions 467 */ 468 void execute(Runnable runnable) { 469 try { 470 executor.execute(runnable); 471 } catch (Exception e) { 472 logger.log(Level.SEVERE, "Exception while executing listener " + listener 473 + " with executor " + executor, e); 474 } 475 } 476 } 477 478 /** 479 * An immutable snapshot of the current state of the service. This class represents a consistent 480 * snapshot of the state and therefore it can be used to answer simple queries without needing to 481 * grab a lock. 482 */ 483 @Immutable 484 private static final class StateSnapshot { 485 /** 486 * The internal state, which equals external state unless 487 * shutdownWhenStartupFinishes is true. 488 */ 489 final State state; 490 491 /** 492 * If true, the user requested a shutdown while the service was still starting 493 * up. 494 */ 495 final boolean shutdownWhenStartupFinishes; 496 497 /** 498 * The exception that caused this service to fail. This will be {@code null} 499 * unless the service has failed. 500 */ 501 @Nullable 502 final Throwable failure; 503 504 StateSnapshot(State internalState) { 505 this(internalState, false, null); 506 } 507 508 StateSnapshot(State internalState, boolean shutdownWhenStartupFinishes, Throwable failure) { 509 checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING, 510 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 511 internalState); 512 checkArgument(!(failure != null ^ internalState == State.FAILED), 513 "A failure cause should be set if and only if the state is failed. Got %s and %s " 514 + "instead.", internalState, failure); 515 this.state = internalState; 516 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes; 517 this.failure = failure; 518 } 519 520 /** @see Service#state() */ 521 State externalState() { 522 if (shutdownWhenStartupFinishes && state == State.STARTING) { 523 return State.STOPPING; 524 } else { 525 return state; 526 } 527 } 528 529 /** @see Service#failureCause() */ 530 Throwable failureCause() { 531 checkState(state == State.FAILED, 532 "failureCause() is only valid if the service has failed, service is %s", state); 533 return failure; 534 } 535 } 536 }