001/*
002 * Copyright (C) 2012 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package com.google.common.util.concurrent;
017
018import static com.google.common.base.Preconditions.checkArgument;
019import static com.google.common.base.Preconditions.checkNotNull;
020import static com.google.common.base.Preconditions.checkState;
021import static com.google.common.base.Predicates.equalTo;
022import static com.google.common.base.Predicates.in;
023import static com.google.common.base.Predicates.instanceOf;
024import static com.google.common.base.Predicates.not;
025import static com.google.common.util.concurrent.Service.State.FAILED;
026import static com.google.common.util.concurrent.Service.State.NEW;
027import static com.google.common.util.concurrent.Service.State.RUNNING;
028import static com.google.common.util.concurrent.Service.State.STARTING;
029import static com.google.common.util.concurrent.Service.State.STOPPING;
030import static com.google.common.util.concurrent.Service.State.TERMINATED;
031import static java.util.concurrent.TimeUnit.MILLISECONDS;
032
033import com.google.common.annotations.Beta;
034import com.google.common.base.Function;
035import com.google.common.base.Objects;
036import com.google.common.base.Stopwatch;
037import com.google.common.base.Supplier;
038import com.google.common.collect.Collections2;
039import com.google.common.collect.ImmutableCollection;
040import com.google.common.collect.ImmutableList;
041import com.google.common.collect.ImmutableMap;
042import com.google.common.collect.ImmutableMultimap;
043import com.google.common.collect.ImmutableSet;
044import com.google.common.collect.ImmutableSetMultimap;
045import com.google.common.collect.Lists;
046import com.google.common.collect.Maps;
047import com.google.common.collect.Multimaps;
048import com.google.common.collect.Multiset;
049import com.google.common.collect.Ordering;
050import com.google.common.collect.SetMultimap;
051import com.google.common.collect.Sets;
052import com.google.common.util.concurrent.ListenerCallQueue.Callback;
053import com.google.common.util.concurrent.Service.State;
054
055import java.lang.ref.WeakReference;
056import java.util.ArrayList;
057import java.util.Collection;
058import java.util.Collections;
059import java.util.EnumMap;
060import java.util.List;
061import java.util.Map;
062import java.util.Map.Entry;
063import java.util.Set;
064import java.util.concurrent.Executor;
065import java.util.concurrent.TimeUnit;
066import java.util.concurrent.TimeoutException;
067import java.util.logging.Level;
068import java.util.logging.Logger;
069
070import javax.annotation.concurrent.GuardedBy;
071
072/**
073 * A manager for monitoring and controlling a set of {@linkplain Service services}. This class
074 * provides methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and
075 * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}.
076 * Additionally, users can monitor state transitions with the {@linkplain Listener listener}
077 * mechanism.
078 *
079 * <p>While it is recommended that service lifecycles be managed via this class, state transitions
080 * initiated via other mechanisms do not impact the correctness of its methods. For example, if the
081 * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked
082 * when appropriate and {@link #awaitHealthy} will still work as expected.
083 *
084 * <p>Here is a simple example of how to use a {@code ServiceManager} to start a server.
085 * <pre>   {@code
086 * class Server {
087 *   public static void main(String[] args) {
088 *     Set<Service> services = ...;
089 *     ServiceManager manager = new ServiceManager(services);
090 *     manager.addListener(new Listener() {
091 *         public void stopped() {}
092 *         public void healthy() {
093 *           // Services have been initialized and are healthy, start accepting requests...
094 *         }
095 *         public void failure(Service service) {
096 *           // Something failed, at this point we could log it, notify a load balancer, or take
097 *           // some other action.  For now we will just exit.
098 *           System.exit(1);
099 *         }
100 *       },
101 *       MoreExecutors.sameThreadExecutor());
102 *
103 *     Runtime.getRuntime().addShutdownHook(new Thread() {
104 *       public void run() {
105 *         // Give the services 5 seconds to stop to ensure that we are responsive to shutdown 
106 *         // requests.
107 *         try {
108 *           manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
109 *         } catch (TimeoutException timeout) {
110 *           // stopping timed out
111 *         }
112 *       }
113 *     });
114 *     manager.startAsync();  // start all the services asynchronously
115 *   }
116 * }}</pre>
117 *
118 * <p>This class uses the ServiceManager's methods to start all of its services, to respond to
119 * service failure and to ensure that when the JVM is shutting down all the services are stopped.
120 *
121 * @author Luke Sandberg
122 * @since 14.0
123 */
124@Beta
125public final class ServiceManager {
  /** Logger shared by the manager, its internal state tracker and the per-service listeners. */
  private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());

  /** Reusable callback that invokes {@link Listener#healthy()} on a manager listener. */
  private static final Callback<Listener> HEALTHY_CALLBACK = new Callback<Listener>("healthy()") {
    @Override void call(Listener listener) {
      listener.healthy();
    }
  };

  /** Reusable callback that invokes {@link Listener#stopped()} on a manager listener. */
  private static final Callback<Listener> STOPPED_CALLBACK = new Callback<Listener>("stopped()") {
    @Override void call(Listener listener) {
      listener.stopped();
    }
  };
137
  /**
   * A listener for the aggregate state changes of the services that are under management. Users
   * that need to listen to more fine-grained events (such as when each particular {@linkplain
   * Service service} starts, or terminates), should attach {@linkplain Service.Listener service
   * listeners} to each individual service.
   *
   * <p>Every callback has an empty default implementation, so subclasses only need to override
   * the events they care about.
   *
   * @author Luke Sandberg
   * @since 15.0 (present as an interface in 14.0)
   */
  @Beta  // Should come out of Beta when ServiceManager does
  public abstract static class Listener {
    /**
     * Called when the manager initially becomes healthy.
     *
     * <p>This will be called at most once after all the services have entered the
     * {@linkplain State#RUNNING running} state. If any services fail during start up or
     * {@linkplain State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other
     * services have started {@linkplain State#RUNNING running} then this method will not be called.
     */
    public void healthy() {}

    /**
     * Called when all of the component services have reached a terminal state, either
     * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}.
     */
    public void stopped() {}

    /**
     * Called when a component service has {@linkplain State#FAILED failed}.
     *
     * @param service The service that failed.
     */
    public void failure(Service service) {}
  }
172  
  /**
   * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener
   * service listeners}.  This is extracted into its own object so that {@link ServiceListener}
   * could be made {@code static} and its instances can be safely constructed and added in the
   * {@link ServiceManager} constructor without having to close over the partially constructed
   * {@link ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}).
   */
  private final ServiceManagerState state;

  /**
   * Immutable copy of the services under management, taken in the constructor.  When the caller
   * supplied no services this holds a single placeholder {@link NoOpService} instead.
   */
  private final ImmutableList<Service> services;
182  
183  /**
184   * Constructs a new instance for managing the given services.
185   * 
186   * @param services The services to manage
187   * 
188   * @throws IllegalArgumentException if not all services are {@linkplain State#NEW new} or if there
189   * are any duplicate services.
190   */
191  public ServiceManager(Iterable<? extends Service> services) {
192    ImmutableList<Service> copy = ImmutableList.copyOf(services);
193    if (copy.isEmpty()) {
194      // Having no services causes the manager to behave strangely. Notably, listeners are never 
195      // fired.  To avoid this we substitute a placeholder service.
196      logger.log(Level.WARNING, 
197          "ServiceManager configured with no services.  Is your application configured properly?", 
198          new EmptyServiceManagerWarning());
199      copy = ImmutableList.<Service>of(new NoOpService());
200    }
201    this.state = new ServiceManagerState(copy);
202    this.services = copy;
203    WeakReference<ServiceManagerState> stateReference = 
204        new WeakReference<ServiceManagerState>(state);
205    Executor sameThreadExecutor = MoreExecutors.sameThreadExecutor();
206    for (Service service : copy) {
207      // We give each listener its own SynchronizedExecutor to ensure that the state transitions
208      // are run in the same order that they occur.  The Service.Listener api guarantees us only
209      // that the transitions are submitted to the executor in the same order that they occur, so by
210      // synchronizing the executions of each listeners callbacks we can ensure that the entire
211      // execution of the listener occurs in the same order as the transitions themselves.
212      //
213      // This is necessary to prevent transitions being played back in the wrong order due to thread
214      // races to acquire the monitor in ServiceManagerState.
215      service.addListener(new ServiceListener(service, stateReference), sameThreadExecutor);
216      // We check the state after adding the listener as a way to ensure that our listener was added
217      // to a NEW service.
218      checkArgument(service.state() == NEW, "Can only manage NEW services, %s", service);
219    }
220    // We have installed all of our listeners and after this point any state transition should be
221    // correct.
222    this.state.markReady();
223  }
224
  /**
   * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given
   * executor. The listener will not have previous state changes replayed, so it is
   * suggested that listeners are added before any of the managed services are
   * {@linkplain Service#startAsync started}.  A listener that is registered after every managed
   * service has already reached a terminal state is not retained, because no further callbacks
   * would be delivered to it.
   *
   * <p>{@code addListener} guarantees execution ordering across calls to a given listener but not
   * across calls to multiple listeners. Specifically, a given listener will have its callbacks
   * invoked in the same order as the manager reaches the corresponding states. Additionally, at
   * most one of the listener's callbacks will execute at once. However, multiple listeners'
   * callbacks may execute concurrently, and listeners may execute in an order different from the
   * one in which they were registered.
   *
   * <p>RuntimeExceptions thrown by a listener will be caught and logged. Any exception thrown
   * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException}) will be caught and
   * logged.
   *
   * <p>For fast, lightweight listeners that would be safe to execute in any thread, consider
   * calling {@link #addListener(Listener)}.
   *
   * @param listener the listener to run when the manager changes state
   * @param executor the executor in which the listener's callback methods will be run.
   * @throws NullPointerException if {@code listener} or {@code executor} is null
   */
  public void addListener(Listener listener, Executor executor) {
    state.addListener(listener, executor);
  }
251
  /**
   * Registers a {@link Listener} to be run when this {@link ServiceManager} changes state. The
   * listener will not have previous state changes replayed, so it is suggested that listeners are
   * added before any of the managed services are {@linkplain Service#startAsync started}.
   *
   * <p>The callbacks are run on {@link MoreExecutors#sameThreadExecutor()}, so this overload is
   * intended for fast, lightweight listeners that are safe to execute in any thread.
   *
   * <p>{@code addListener} guarantees execution ordering across calls to a given listener but not
   * across calls to multiple listeners. Specifically, a given listener will have its callbacks
   * invoked in the same order as the manager reaches the corresponding states. Additionally, at
   * most one of the listener's callbacks will execute at once. However, multiple listeners'
   * callbacks may execute concurrently, and listeners may execute in an order different from the
   * one in which they were registered.
   *
   * <p>RuntimeExceptions thrown by a listener will be caught and logged.
   *
   * @param listener the listener to run when the manager changes state
   * @throws NullPointerException if {@code listener} is null
   */
  public void addListener(Listener listener) {
    state.addListener(listener, MoreExecutors.sameThreadExecutor());
  }
271
272  /**
273   * Initiates service {@linkplain Service#startAsync startup} on all the services being managed.  
274   * It is only valid to call this method if all of the services are {@linkplain State#NEW new}.
275   * 
276   * @return this
277   * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the 
278   *     method is called.
279   */
280  public ServiceManager startAsync() {
281    for (Service service : services) {
282      State state = service.state();
283      checkState(state == NEW, "Service %s is %s, cannot start it.", service, state);
284    }
285    for (Service service : services) {
286      try {
287        service.startAsync();
288      } catch (IllegalStateException e) {
289        // This can happen if the service has already been started or stopped (e.g. by another 
290        // service or listener). Our contract says it is safe to call this method if
291        // all services were NEW when it was called, and this has already been verified above, so we
292        // don't propagate the exception.
293        logger.log(Level.WARNING, "Unable to start Service " + service, e);
294      }
295    }
296    return this;
297  }
298  
  /**
   * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}.  The manager
   * will become healthy after all the component services have reached the {@linkplain State#RUNNING
   * running} state.  The wait is not interruptible.
   *
   * @throws IllegalStateException if the service manager reaches a state from which it cannot
   *     become {@linkplain #isHealthy() healthy}, i.e. some service is stopping, terminated or
   *     failed.
   */
  public void awaitHealthy() {
    state.awaitHealthy();
  }
310  
  /**
   * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more
   * than the given time.  The manager will become healthy after all the component services have
   * reached the {@linkplain State#RUNNING running} state.  The wait is not interruptible.
   *
   * @param timeout the maximum time to wait
   * @param unit the time unit of the timeout argument
   * @throws TimeoutException if not all of the services have finished starting within the deadline
   * @throws IllegalStateException if the service manager reaches a state from which it cannot
   *     become {@linkplain #isHealthy() healthy}, i.e. some service is stopping, terminated or
   *     failed.
   */
  public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
    state.awaitHealthy(timeout, unit);
  }
325
326  /**
327   * Initiates service {@linkplain Service#stopAsync shutdown} if necessary on all the services
328   * being managed. 
329   *    
330   * @return this
331   */
332  public ServiceManager stopAsync() {
333    for (Service service : services) {
334      service.stopAsync();
335    }
336    return this;
337  }
338 
  /**
   * Waits for all the services to reach a terminal state. After this method returns all
   * services will either be {@linkplain Service.State#TERMINATED terminated} or {@linkplain
   * Service.State#FAILED failed}.  The wait is not interruptible.
   */
  public void awaitStopped() {
    state.awaitStopped();
  }
347  
  /**
   * Waits for all the services to reach a terminal state for no more than the given time. If
   * this method returns normally all services will either be {@linkplain
   * Service.State#TERMINATED terminated} or {@linkplain Service.State#FAILED failed}.  The wait is
   * not interruptible.
   *
   * @param timeout the maximum time to wait
   * @param unit the time unit of the timeout argument
   * @throws TimeoutException if not all of the services have stopped within the deadline
   */
  public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
    state.awaitStopped(timeout, unit);
  }
360  
361  /**
362   * Returns true if all services are currently in the {@linkplain State#RUNNING running} state.  
363   * 
364   * <p>Users who want more detailed information should use the {@link #servicesByState} method to 
365   * get detailed information about which services are not running.
366   */
367  public boolean isHealthy() {
368    for (Service service : services) {
369      if (!service.isRunning()) {
370        return false;
371      }
372    }
373    return true;
374  }
375  
  /**
   * Provides a snapshot of the current state of all the services under management.  The internal
   * placeholder service that stands in for an empty service list is not included.
   *
   * <p>N.B. This snapshot is guaranteed to be consistent, i.e. the set of states returned will
   * correspond to a point in time view of the services.
   */
  public ImmutableMultimap<State, Service> servicesByState() {
    return state.servicesByState();
  }
385  
  /**
   * Returns the service load times. This method will only return startup times for services that
   * have finished starting.  The internal placeholder service that stands in for an empty service
   * list is not included.
   *
   * @return Map of services and their corresponding startup time in millis, the map entries will be
   *     ordered by startup time.
   */
  public ImmutableMap<Service, Long> startupTimes() {
    return state.startupTimes();
  }
396  
397  @Override public String toString() {
398    return Objects.toStringHelper(ServiceManager.class)
399        .add("services", Collections2.filter(services, not(instanceOf(NoOpService.class))))
400        .toString();
401  }
402  
403  /**
404   * An encapsulation of all the mutable state of the {@link ServiceManager} that needs to be 
405   * accessed by instances of {@link ServiceListener}.
406   */
407  private static final class ServiceManagerState {
    /** The lock that guards all of the mutable state below and backs the two wait guards. */
    final Monitor monitor = new Monitor();

    /**
     * The managed services bucketed by the state they were last observed in.  Keys live in an
     * {@link EnumMap}; the services of each state are kept in a linked hash set.
     */
    @GuardedBy("monitor")
    final SetMultimap<State, Service> servicesByState =
        Multimaps.newSetMultimap(new EnumMap<State, Collection<Service>>(State.class),
            new Supplier<Set<Service>>() {
              @Override public Set<Service> get() {
                return Sets.newLinkedHashSet();
              }
            });

    /** The key multiset of {@link #servicesByState}, used to count how many services are in each state. */
    @GuardedBy("monitor")
    final Multiset<State> states = servicesByState.keys();

    /** One startup stopwatch per managed service, keyed by service identity. */
    @GuardedBy("monitor")
    final Map<Service, Stopwatch> startupTimers = Maps.newIdentityHashMap();

    /**
     * These two booleans are used to mark the state as ready to start.
     * <ul>
     *   <li>{@link #ready} is set by {@link #markReady} to indicate that all listeners have been
     *       correctly installed.
     *   <li>{@link #transitioned} is set by {@link #transitionService} to indicate that some
     *       transition has been performed.
     * </ul>
     *
     * <p>Together, they allow us to enforce that all services have their listeners installed prior
     * to any service performing a transition, so that we can fail in the ServiceManager
     * constructor rather than in a Service.Listener callback.
     */
    @GuardedBy("monitor")
    boolean ready;

    @GuardedBy("monitor")
    boolean transitioned;

    /** The number of services handed to the constructor (including any placeholder service). */
    final int numberOfServices;

    /**
     * Controls how long to wait for all the services to either become healthy or reach a
     * state from which it is guaranteed that it can never become healthy.
     */
    final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
      @Override public boolean isSatisfied() {
        // All services have started or some service is stopping/has terminated/has failed.
        return states.count(RUNNING) == numberOfServices
            || states.contains(STOPPING)
            || states.contains(TERMINATED)
            || states.contains(FAILED);
      }
    };

    /**
     * Controls how long to wait for all services to reach a terminal state.
     */
    final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
      @Override public boolean isSatisfied() {
        return states.count(TERMINATED) + states.count(FAILED) == numberOfServices;
      }
    };

    /**
     * The listeners to notify during a state transition.
     *
     * <p>Listeners are added and have callbacks enqueued while the monitor is held, but
     * {@link #executeListeners} reads this list by index without holding the monitor; the list is
     * wrapped with {@link Collections#synchronizedList} so those individual reads are safe.
     */
    @GuardedBy("monitor")
    final List<ListenerCallQueue<Listener>> listeners =
        Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
471
472    /**
473     * It is implicitly assumed that all the services are NEW and that they will all remain NEW 
474     * until all the Listeners are installed and {@link #markReady()} is called.  It is our caller's
475     * responsibility to only call {@link #markReady()} if all services were new at the time this
476     * method was called and when all the listeners were installed.
477     */
478    ServiceManagerState(ImmutableCollection<Service> services) {
479      this.numberOfServices = services.size();
480      servicesByState.putAll(NEW, services);
481      for (Service service : services) {
482        startupTimers.put(service, Stopwatch.createUnstarted());
483      }
484    }
485
486    /**
487     * Marks the {@link State} as ready to receive transitions. Returns true if no transitions have
488     * been observed yet.
489     */
490    void markReady() {
491      monitor.enter();
492      try {
493        if (!transitioned) {
494          // nothing has transitioned since construction, good.
495          ready = true;
496        } else {
497          // This should be an extremely rare race condition.
498          List<Service> servicesInBadStates = Lists.newArrayList();
499          for (Service service : servicesByState().values()) {
500            if (service.state() != NEW) {
501              servicesInBadStates.add(service);
502            }
503          }
504          throw new IllegalArgumentException("Services started transitioning asynchronously before "
505              + "the ServiceManager was constructed: " + servicesInBadStates);
506        }
507      } finally {
508        monitor.leave();
509      }
510    }
511
512    void addListener(Listener listener, Executor executor) {
513      checkNotNull(listener, "listener");
514      checkNotNull(executor, "executor");
515      monitor.enter();
516      try {
517        // no point in adding a listener that will never be called
518        if (!stoppedGuard.isSatisfied()) {
519          listeners.add(new ListenerCallQueue<Listener>(listener, executor));
520        }
521      } finally {
522        monitor.leave();
523      }
524    }
525    
    /**
     * Blocks uninterruptibly until {@link #awaitHealthGuard} is satisfied and then verifies that
     * the guard was satisfied because every service is running.
     *
     * @throws IllegalStateException if not every service is running once the guard is satisfied
     */
    void awaitHealthy() {
      // Enters the monitor only once the guard holds, so checkHealthy() runs with the lock held.
      monitor.enterWhenUninterruptibly(awaitHealthGuard);
      try {
        checkHealthy();
      } finally {
        monitor.leave();
      }
    }
534
535    void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
536      monitor.enter();
537      try {
538        if (!monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) {
539          throw new TimeoutException("Timeout waiting for the services to become healthy. The "
540              + "following services have not started: " 
541              + Multimaps.filterKeys(servicesByState, in(ImmutableSet.of(NEW, STARTING))));
542        }
543        checkHealthy();
544      } finally {
545        monitor.leave();
546      }
547    }
548    
    /** Blocks uninterruptibly until {@link #stoppedGuard} is satisfied. */
    void awaitStopped() {
      // The monitor is entered purely to wait for the guard, so it is released straight away.
      monitor.enterWhenUninterruptibly(stoppedGuard);
      monitor.leave();
    }
553    
554    void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
555      monitor.enter();
556      try {
557        if (!monitor.waitForUninterruptibly(stoppedGuard, timeout, unit)) {
558          throw new TimeoutException("Timeout waiting for the services to stop. The following "
559              + "services have not stopped: " 
560              + Multimaps.filterKeys(servicesByState, 
561                  not(in(ImmutableSet.of(TERMINATED, FAILED))))); 
562        }
563      } finally {
564        monitor.leave();
565      }
566    }
567
568    ImmutableMultimap<State, Service> servicesByState() {
569      ImmutableSetMultimap.Builder<State, Service> builder = ImmutableSetMultimap.builder();
570      monitor.enter();
571      try {
572        for (Entry<State, Service> entry : servicesByState.entries()) {
573          if (!(entry.getValue() instanceof NoOpService)) {
574            builder.put(entry.getKey(), entry.getValue());
575          }
576        }
577      } finally {
578        monitor.leave();
579      }
580      return builder.build();
581    }
582    
583    ImmutableMap<Service, Long> startupTimes() {
584      List<Entry<Service, Long>> loadTimes;
585      monitor.enter();
586      try {
587        loadTimes = Lists.newArrayListWithCapacity(
588            states.size() - states.count(NEW) + states.count(STARTING));
589        for (Entry<Service, Stopwatch> entry : startupTimers.entrySet()) {
590          Service service = entry.getKey();
591          Stopwatch stopWatch = entry.getValue();
592          // N.B. we check the service state in the multimap rather than via Service.state() because
593          // the multimap is guaranteed to be in sync with our timers while the Service.state() is
594          // not.  Due to happens-before ness of the monitor this 'weirdness' will not be observable
595          // by our caller.
596          if (!stopWatch.isRunning() && !servicesByState.containsEntry(NEW, service) 
597              && !(service instanceof NoOpService)) {
598            loadTimes.add(Maps.immutableEntry(service, stopWatch.elapsed(MILLISECONDS)));
599          }
600        }
601      } finally {
602        monitor.leave();
603      }
604      Collections.sort(loadTimes, Ordering.<Long>natural()
605          .onResultOf(new Function<Entry<Service, Long>, Long>() {
606            @Override public Long apply(Map.Entry<Service, Long> input) {
607              return input.getValue();
608            }
609          }));
610      ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder();
611      for (Entry<Service, Long> entry : loadTimes) {
612        builder.put(entry);
613      }
614      return builder.build();
615    }
616    
    /**
     * Updates the state with the given service transition.
     *
     * <p>This method performs the main logic of ServiceManager in the following steps.
     * <ol>
     *      <li>Update the {@link #servicesByState()}
     *      <li>Update the {@link #startupTimers}
     *      <li>Based on the new state queue listeners to run
     *      <li>Run the listeners (outside of the lock)
     * </ol>
     *
     * <p>Transitions reported before {@link #markReady()} has succeeded are only remembered (via
     * {@link #transitioned}) and otherwise ignored.
     *
     * @param service the service that changed state
     * @param from the state the service is leaving
     * @param to the state the service is entering
     * @throws NullPointerException if {@code service} is null
     * @throws IllegalArgumentException if {@code from} and {@code to} are the same state
     * @throws IllegalStateException if the service is not currently filed under {@code from} or
     *     is already filed under {@code to}
     */
    void transitionService(final Service service, State from, State to) {
      checkNotNull(service);
      checkArgument(from != to);
      monitor.enter();
      try {
        // Record that something moved even when we are not ready yet; markReady() uses this flag
        // to detect services that transitioned before all listeners were installed.
        transitioned = true;
        if (!ready) {
          return;
        }
        // Update state.
        checkState(servicesByState.remove(from, service),
            "Service %s not at the expected location in the state map %s", service, from);
        checkState(servicesByState.put(to, service),
            "Service %s in the state map unexpectedly at %s", service, to);
        // Update the timer
        Stopwatch stopwatch = startupTimers.get(service);
        if (from == NEW) {
          stopwatch.start();
        }
        // NOTE(review): the comparison relies on the declaration order of the State enum,
        // presumably with RUNNING and every later state meaning "startup is over" - confirm.
        if (to.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) {
          // N.B. if we miss the STARTING event then we will never record a startup time.
          stopwatch.stop();
          if (!(service instanceof NoOpService)) {
            logger.log(Level.FINE, "Started {0} in {1}.", new Object[] {service, stopwatch});
          }
        }
        // Queue our listeners

        // Did a service fail?
        if (to == FAILED) {
          fireFailedListeners(service);
        }

        if (states.count(RUNNING) == numberOfServices) {
          // This means that the manager is currently healthy. N.B. If other threads call isHealthy
          // they are not guaranteed to get 'true', because any service could fail right now.
          fireHealthyListeners();
        } else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) {
          fireStoppedListeners();
        }
      } finally {
        monitor.leave();
        // Run our executors outside of the lock.  This also runs on the early return and when one
        // of the state checks above throws.
        executeListeners();
      }
    }
674
    /**
     * Enqueues the {@code stopped()} callback on every registered listener.  The callbacks are
     * only queued here; {@link #executeListeners} runs them.
     */
    @GuardedBy("monitor")
    void fireStoppedListeners() {
      STOPPED_CALLBACK.enqueueOn(listeners);
    }
679
    /**
     * Enqueues the {@code healthy()} callback on every registered listener.  The callbacks are
     * only queued here; {@link #executeListeners} runs them.
     */
    @GuardedBy("monitor")
    void fireHealthyListeners() {
      HEALTHY_CALLBACK.enqueueOn(listeners);
    }
684
685    @GuardedBy("monitor")
686    void fireFailedListeners(final Service service) {
687      new Callback<Listener>("failed({service=" + service + "})") {
688        @Override void call(Listener listener) {
689          listener.failure(service);
690        }
691      }.enqueueOn(listeners);
692    }
693
694    /** Attempts to execute all the listeners in {@link #listeners}. */
695    void executeListeners() {
696      checkState(!monitor.isOccupiedByCurrentThread(), 
697          "It is incorrect to execute listeners with the monitor held.");
698      // iterate by index to avoid concurrent modification exceptions
699      for (int i = 0; i < listeners.size(); i++) {
700        listeners.get(i).execute();
701      }
702    }
703
704    @GuardedBy("monitor")
705    void checkHealthy() {
706      if (states.count(RUNNING) != numberOfServices) {
707        throw new IllegalStateException("Expected to be healthy after starting. "
708            + "The following services are not running: " + 
709            Multimaps.filterKeys(servicesByState, not(equalTo(RUNNING))));
710      }
711    }
712  }
713
714  /**
715   * A {@link Service} that wraps another service and times how long it takes for it to start and
716   * also calls the {@link ServiceManagerState#transitionService(Service, State, State)},
717   * to record the state transitions.
718   */
719  private static final class ServiceListener extends Service.Listener {
720    final Service service;
721    // We store the state in a weak reference to ensure that if something went wrong while 
722    // constructing the ServiceManager we don't pointlessly keep updating the state.
723    final WeakReference<ServiceManagerState> state;
724    
725    ServiceListener(Service service, WeakReference<ServiceManagerState> state) {
726      this.service = service;
727      this.state = state;
728    }
729    
730    @Override public void starting() {
731      ServiceManagerState state = this.state.get();
732      if (state != null) {
733        state.transitionService(service, NEW, STARTING);
734        if (!(service instanceof NoOpService)) {
735          logger.log(Level.FINE, "Starting {0}.", service);
736        }
737      }
738    }
739
740    @Override public void running() {
741      ServiceManagerState state = this.state.get();
742      if (state != null) {
743        state.transitionService(service, STARTING, RUNNING);
744      }
745    }
746    
747    @Override public void stopping(State from) {
748      ServiceManagerState state = this.state.get();
749      if (state != null) {
750        state.transitionService(service, from, STOPPING);
751      }
752    }
753    
754    @Override public void terminated(State from) {
755      ServiceManagerState state = this.state.get();
756      if (state != null) {
757        if (!(service instanceof NoOpService)) {
758          logger.log(Level.FINE, "Service {0} has terminated. Previous state was: {1}", 
759              new Object[] {service, from});
760        }
761        state.transitionService(service, from, TERMINATED);
762      }
763    }
764    
765    @Override public void failed(State from, Throwable failure) {
766      ServiceManagerState state = this.state.get();
767      if (state != null) {
768        // Log before the transition, so that if the process exits in response to server failure,
769        // there is a higher likelihood that the cause will be in the logs.
770        if (!(service instanceof NoOpService)) {
771          logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.",
772              failure);
773        }
774        state.transitionService(service, from, FAILED);
775      }
776    }
777  }
778  
  /**
   * A {@link Service} instance that does nothing.  This is only useful as a placeholder to
   * ensure that the {@link ServiceManager} functions properly even when it is managing no services.
   *
   * <p>The use of this class is considered an implementation detail of ServiceManager and as such
   * it is excluded from {@link ServiceManager#servicesByState}, {@link ServiceManager#startupTimes},
   * {@link ServiceManager#toString} and all logging statements.
   */
  private static final class NoOpService extends AbstractService {
    // There is no work to do, so startup is reported as complete immediately.
    @Override protected void doStart() { notifyStarted(); }
    // Likewise, shutdown is reported as complete immediately.
    @Override protected void doStop() { notifyStopped(); }
  }
791  
  /**
   * This is never thrown but only used for logging: an instance is attached to the warning that
   * the {@link ServiceManager} constructor logs when it is given no services.
   */
  private static final class EmptyServiceManagerWarning extends Throwable {}
794}