001 /*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.util.concurrent;
018
019 import static com.google.common.base.Preconditions.checkArgument;
020 import static com.google.common.base.Preconditions.checkNotNull;
021 import static com.google.common.base.Preconditions.checkState;
022
023 import com.google.common.annotations.Beta;
024 import com.google.common.collect.Lists;
025 import com.google.common.collect.Queues;
026 import com.google.common.util.concurrent.Service.State; // javadoc needs this
027
028 import java.util.List;
029 import java.util.Queue;
030 import java.util.concurrent.ExecutionException;
031 import java.util.concurrent.Executor;
032 import java.util.concurrent.TimeUnit;
033 import java.util.concurrent.TimeoutException;
034 import java.util.concurrent.locks.ReentrantLock;
035 import java.util.logging.Level;
036 import java.util.logging.Logger;
037
038 import javax.annotation.Nullable;
039 import javax.annotation.concurrent.GuardedBy;
040 import javax.annotation.concurrent.Immutable;
041
042 /**
043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
045 * callbacks. Its subclasses must manage threads manually; consider
046 * {@link AbstractExecutionThreadService} if you need only a single execution thread.
047 *
048 * @author Jesse Wilson
049 * @author Luke Sandberg
050 * @since 1.0
051 */
052 @Beta
053 public abstract class AbstractService implements Service {
/** Logger used to report exceptions thrown while handing a listener callback to its executor. */
private static final Logger logger = Logger.getLogger(AbstractService.class.getName());

/** Guards {@link #listeners} and every write to {@link #snapshot}. */
private final ReentrantLock lock = new ReentrantLock();

/** The future returned by {@link #start}; completed by the listener added in the constructor. */
private final Transition startup = new Transition();

/** The future returned by {@link #stop}; completed by the listener added in the constructor. */
private final Transition shutdown = new Transition();

/**
 * The listeners to notify during a state transition.
 */
@GuardedBy("lock")
private final List<ListenerExecutorPair> listeners = Lists.newArrayList();

/**
 * The queue of listeners that are waiting to be executed.
 *
 * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be
 * protected by the implicit lock on this object. Dequeue operations should be executed atomically
 * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be
 * held when the listeners are being executed. Use {@link #executeListeners} for this operation.
 * This is necessary to ensure that elements on the queue are executed in the correct order.
 * Enqueue operations should be protected so that listeners are added in the correct order. We use
 * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues.
 */
@GuardedBy("queuedListeners")
private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();

/**
 * The current state of the service. This should be written with the lock held but can be read
 * without it because it is an immutable object in a volatile field. This is desirable so that
 * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
 * without grabbing the lock.
 *
 * <p>To update this field correctly the lock must be held to guarantee that the state is
 * consistent.
 */
@GuardedBy("lock")
private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW);
091
/**
 * Creates a service whose initial snapshot is {@link State#NEW} and registers the internal
 * listener that completes the {@code startup} and {@code shutdown} futures.
 */
protected AbstractService() {
  // This listener has to be registered ahead of every other listener so that it runs first;
  // listeners added later can then observe futures that have already been completed.
  final Listener futureUpdater = new Listener() {
    @Override public void starting() {
      // Neither future completes when startup merely begins.
    }

    @Override public void running() {
      startup.set(State.RUNNING);
    }

    @Override public void stopping(State from) {
      // A stop requested in the middle of startup resolves the startup future right away.
      if (State.STARTING == from) {
        startup.set(State.STOPPING);
      }
    }

    @Override public void terminated(State from) {
      // A service stopped before it was ever started completes both futures together.
      if (State.NEW == from) {
        startup.set(State.TERMINATED);
      }
      shutdown.set(State.TERMINATED);
    }

    @Override public void failed(State from, Throwable failure) {
      switch (from) {
        case STARTING: {
          startup.setException(failure);
          shutdown.setException(new Exception("Service failed to start.", failure));
          break;
        }
        case RUNNING: {
          shutdown.setException(new Exception("Service failed while running", failure));
          break;
        }
        case STOPPING: {
          shutdown.setException(failure);
          break;
        }
        default:
          // TERMINATED, FAILED and NEW are not valid origins of a failure transition.
          throw new AssertionError("Unexpected from state: " + from);
      }
    }
  };

  addListener(futureUpdater, MoreExecutors.sameThreadExecutor());
}
138
/**
 * This method is called by {@link #start} to initiate service startup. The invocation of this
 * method should cause a call to {@link #notifyStarted()}, either during this method's run, or
 * after it has returned. If startup fails, the invocation should cause a call to
 * {@link #notifyFailed(Throwable)} instead.
 *
 * <p>This method should return promptly; prefer to do work on a different thread where it is
 * convenient. It is invoked exactly once on service startup, even when {@link #start} is called
 * multiple times.
 *
 * <p>{@link #start} invokes this method while holding the service's internal lock, and passes
 * any {@link Throwable} thrown from it to {@link #notifyFailed(Throwable)}.
 */
protected abstract void doStart();
150
/**
 * This method is called to initiate service shutdown: by {@link #stop} when the service is
 * running, or by {@link #notifyStarted()} when a stop was requested while the service was still
 * starting. The invocation of this method should cause a call to {@link #notifyStopped()},
 * either during this method's run, or after it has returned. If shutdown fails, the invocation
 * should cause a call to {@link #notifyFailed(Throwable)} instead.
 *
 * <p>This method should return promptly; prefer to do work on a different thread where it is
 * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called
 * multiple times. It is not invoked at all when the service is stopped before it was started.
 *
 * <p>Both callers invoke this method while holding the service's internal lock. A
 * {@link Throwable} thrown from it during {@link #stop} is passed to
 * {@link #notifyFailed(Throwable)}.
 */
protected abstract void doStop();
162
163 @Override
164 public final ListenableFuture<State> start() {
165 lock.lock();
166 try {
167 if (snapshot.state == State.NEW) {
168 snapshot = new StateSnapshot(State.STARTING);
169 starting();
170 doStart();
171 }
172 } catch (Throwable startupFailure) {
173 notifyFailed(startupFailure);
174 } finally {
175 lock.unlock();
176 executeListeners();
177 }
178
179 return startup;
180 }
181
182 @Override
183 public final ListenableFuture<State> stop() {
184 lock.lock();
185 try {
186 switch (snapshot.state) {
187 case NEW:
188 snapshot = new StateSnapshot(State.TERMINATED);
189 terminated(State.NEW);
190 break;
191 case STARTING:
192 snapshot = new StateSnapshot(State.STARTING, true, null);
193 stopping(State.STARTING);
194 break;
195 case RUNNING:
196 snapshot = new StateSnapshot(State.STOPPING);
197 stopping(State.RUNNING);
198 doStop();
199 break;
200 case STOPPING:
201 case TERMINATED:
202 case FAILED:
203 // do nothing
204 break;
205 default:
206 throw new AssertionError("Unexpected state: " + snapshot.state);
207 }
208 } catch (Throwable shutdownFailure) {
209 notifyFailed(shutdownFailure);
210 } finally {
211 lock.unlock();
212 executeListeners();
213 }
214
215 return shutdown;
216 }
217
218 @Override
219 public State startAndWait() {
220 return Futures.getUnchecked(start());
221 }
222
223 @Override
224 public State stopAndWait() {
225 return Futures.getUnchecked(stop());
226 }
227
228 /**
229 * Implementing classes should invoke this method once their service has started. It will cause
230 * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
231 *
232 * @throws IllegalStateException if the service is not {@link State#STARTING}.
233 */
234 protected final void notifyStarted() {
235 lock.lock();
236 try {
237 if (snapshot.state != State.STARTING) {
238 IllegalStateException failure = new IllegalStateException(
239 "Cannot notifyStarted() when the service is " + snapshot.state);
240 notifyFailed(failure);
241 throw failure;
242 }
243
244 if (snapshot.shutdownWhenStartupFinishes) {
245 snapshot = new StateSnapshot(State.STOPPING);
246 // We don't call listeners here because we already did that when we set the
247 // shutdownWhenStartupFinishes flag.
248 doStop();
249 } else {
250 snapshot = new StateSnapshot(State.RUNNING);
251 running();
252 }
253 } finally {
254 lock.unlock();
255 executeListeners();
256 }
257 }
258
259 /**
260 * Implementing classes should invoke this method once their service has stopped. It will cause
261 * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
262 *
263 * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
264 * {@link State#RUNNING}.
265 */
266 protected final void notifyStopped() {
267 lock.lock();
268 try {
269 if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) {
270 IllegalStateException failure = new IllegalStateException(
271 "Cannot notifyStopped() when the service is " + snapshot.state);
272 notifyFailed(failure);
273 throw failure;
274 }
275 State previous = snapshot.state;
276 snapshot = new StateSnapshot(State.TERMINATED);
277 terminated(previous);
278 } finally {
279 lock.unlock();
280 executeListeners();
281 }
282 }
283
284 /**
285 * Invoke this method to transition the service to the {@link State#FAILED}. The service will
286 * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
287 * or otherwise cannot be started nor stopped.
288 */
289 protected final void notifyFailed(Throwable cause) {
290 checkNotNull(cause);
291
292 lock.lock();
293 try {
294 switch (snapshot.state) {
295 case NEW:
296 case TERMINATED:
297 throw new IllegalStateException("Failed while in state:" + snapshot.state, cause);
298 case RUNNING:
299 case STARTING:
300 case STOPPING:
301 State previous = snapshot.state;
302 snapshot = new StateSnapshot(State.FAILED, false, cause);
303 failed(previous, cause);
304 break;
305 case FAILED:
306 // Do nothing
307 break;
308 default:
309 throw new AssertionError("Unexpected state: " + snapshot.state);
310 }
311 } finally {
312 lock.unlock();
313 executeListeners();
314 }
315 }
316
317 @Override
318 public final boolean isRunning() {
319 return state() == State.RUNNING;
320 }
321
322 @Override
323 public final State state() {
324 return snapshot.externalState();
325 }
326
327 @Override
328 public final void addListener(Listener listener, Executor executor) {
329 checkNotNull(listener, "listener");
330 checkNotNull(executor, "executor");
331 lock.lock();
332 try {
333 if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) {
334 listeners.add(new ListenerExecutorPair(listener, executor));
335 }
336 } finally {
337 lock.unlock();
338 }
339 }
340
341 @Override public String toString() {
342 return getClass().getSimpleName() + " [" + state() + "]";
343 }
344
345 /**
346 * A change from one service state to another, plus the result of the change.
347 */
348 private class Transition extends AbstractFuture<State> {
349 @Override
350 public State get(long timeout, TimeUnit unit)
351 throws InterruptedException, TimeoutException, ExecutionException {
352 try {
353 return super.get(timeout, unit);
354 } catch (TimeoutException e) {
355 throw new TimeoutException(AbstractService.this.toString());
356 }
357 }
358 }
359
360 /**
361 * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the
362 * {@link #lock}.
363 */
364 private void executeListeners() {
365 if (!lock.isHeldByCurrentThread()) {
366 synchronized (queuedListeners) {
367 Runnable listener;
368 while ((listener = queuedListeners.poll()) != null) {
369 listener.run();
370 }
371 }
372 }
373 }
374
375 @GuardedBy("lock")
376 private void starting() {
377 for (final ListenerExecutorPair pair : listeners) {
378 queuedListeners.add(new Runnable() {
379 @Override public void run() {
380 pair.execute(new Runnable() {
381 @Override public void run() {
382 pair.listener.starting();
383 }
384 });
385 }
386 });
387 }
388 }
389
390 @GuardedBy("lock")
391 private void running() {
392 for (final ListenerExecutorPair pair : listeners) {
393 queuedListeners.add(new Runnable() {
394 @Override public void run() {
395 pair.execute(new Runnable() {
396 @Override public void run() {
397 pair.listener.running();
398 }
399 });
400 }
401 });
402 }
403 }
404
405 @GuardedBy("lock")
406 private void stopping(final State from) {
407 for (final ListenerExecutorPair pair : listeners) {
408 queuedListeners.add(new Runnable() {
409 @Override public void run() {
410 pair.execute(new Runnable() {
411 @Override public void run() {
412 pair.listener.stopping(from);
413 }
414 });
415 }
416 });
417 }
418 }
419
420 @GuardedBy("lock")
421 private void terminated(final State from) {
422 for (final ListenerExecutorPair pair : listeners) {
423 queuedListeners.add(new Runnable() {
424 @Override public void run() {
425 pair.execute(new Runnable() {
426 @Override public void run() {
427 pair.listener.terminated(from);
428 }
429 });
430 }
431 });
432 }
433 // There are no more state transitions so we can clear this out.
434 listeners.clear();
435 }
436
437 @GuardedBy("lock")
438 private void failed(final State from, final Throwable cause) {
439 for (final ListenerExecutorPair pair : listeners) {
440 queuedListeners.add(new Runnable() {
441 @Override public void run() {
442 pair.execute(new Runnable() {
443 @Override public void run() {
444 pair.listener.failed(from, cause);
445 }
446 });
447 }
448 });
449 }
450 // There are no more state transitions so we can clear this out.
451 listeners.clear();
452 }
453
454 /** A simple holder for a listener and its executor. */
455 private static class ListenerExecutorPair {
456 final Listener listener;
457 final Executor executor;
458
459 ListenerExecutorPair(Listener listener, Executor executor) {
460 this.listener = listener;
461 this.executor = executor;
462 }
463
464 /**
465 * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all
466 * exceptions
467 */
468 void execute(Runnable runnable) {
469 try {
470 executor.execute(runnable);
471 } catch (Exception e) {
472 logger.log(Level.SEVERE, "Exception while executing listener " + listener
473 + " with executor " + executor, e);
474 }
475 }
476 }
477
478 /**
479 * An immutable snapshot of the current state of the service. This class represents a consistent
480 * snapshot of the state and therefore it can be used to answer simple queries without needing to
481 * grab a lock.
482 */
483 @Immutable
484 private static final class StateSnapshot {
485 /**
486 * The internal state, which equals external state unless
487 * shutdownWhenStartupFinishes is true.
488 */
489 final State state;
490
491 /**
492 * If true, the user requested a shutdown while the service was still starting
493 * up.
494 */
495 final boolean shutdownWhenStartupFinishes;
496
497 /**
498 * The exception that caused this service to fail. This will be {@code null}
499 * unless the service has failed.
500 */
501 @Nullable
502 final Throwable failure;
503
504 StateSnapshot(State internalState) {
505 this(internalState, false, null);
506 }
507
508 StateSnapshot(State internalState, boolean shutdownWhenStartupFinishes, Throwable failure) {
509 checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING,
510 "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
511 internalState);
512 checkArgument(!(failure != null ^ internalState == State.FAILED),
513 "A failure cause should be set if and only if the state is failed. Got %s and %s "
514 + "instead.", internalState, failure);
515 this.state = internalState;
516 this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
517 this.failure = failure;
518 }
519
520 /** @see Service#state() */
521 State externalState() {
522 if (shutdownWhenStartupFinishes && state == State.STARTING) {
523 return State.STOPPING;
524 } else {
525 return state;
526 }
527 }
528
529 /** @see Service#failureCause() */
530 Throwable failureCause() {
531 checkState(state == State.FAILED,
532 "failureCause() is only valid if the service has failed, service is %s", state);
533 return failure;
534 }
535 }
536 }