001/*
002 * Copyright (C) 2012 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016package com.google.common.util.concurrent;
017
018import static com.google.common.base.Preconditions.checkArgument;
019import static com.google.common.base.Preconditions.checkNotNull;
020import static com.google.common.base.Preconditions.checkState;
021import static java.util.concurrent.TimeUnit.MILLISECONDS;
022
023import com.google.common.annotations.Beta;
024import com.google.common.base.Function;
025import com.google.common.base.Objects;
026import com.google.common.base.Stopwatch;
027import com.google.common.collect.ImmutableList;
028import com.google.common.collect.ImmutableMap;
029import com.google.common.collect.ImmutableMultimap;
030import com.google.common.collect.Lists;
031import com.google.common.collect.Maps;
032import com.google.common.collect.Ordering;
033import com.google.common.collect.Queues;
034import com.google.common.util.concurrent.Service.State;
035
036import java.util.List;
037import java.util.Map;
038import java.util.Map.Entry;
039import java.util.Queue;
040import java.util.Set;
041import java.util.concurrent.Executor;
042import java.util.concurrent.TimeUnit;
043import java.util.concurrent.TimeoutException;
044import java.util.logging.Level;
045import java.util.logging.Logger;
046
047import javax.annotation.concurrent.GuardedBy;
048import javax.annotation.concurrent.Immutable;
049import javax.inject.Inject;
050import javax.inject.Singleton;
051
052/**
053 * A manager for monitoring and controlling a set of {@link Service services}. This class provides
054 * methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and
055 * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}.
056 * Additionally, users can monitor state transitions with the {@link Listener listener} mechanism.
057 *
058 * <p>While it is recommended that service lifecycles be managed via this class, state transitions
059 * initiated via other mechanisms do not impact the correctness of its methods. For example, if the
060 * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked
061 * when appropriate and {@link #awaitHealthy} will still work as expected.
062 *
063 * <p>Here is a simple example of how to use a {@link ServiceManager} to start a server.
064 * <pre>   {@code
065 * class Server {
066 *   public static void main(String[] args) {
067 *     Set<Service> services = ...;
068 *     ServiceManager manager = new ServiceManager(services);
069 *     manager.addListener(new Listener() {
070 *         public void stopped() {}
071 *         public void healthy() {
072 *           // Services have been initialized and are healthy, start accepting requests...
073 *         }
074 *         public void failure(Service service) {
075 *           // Something failed, at this point we could log it, notify a load balancer, or take
076 *           // some other action.  For now we will just exit.
077 *           System.exit(1);
078 *         }
079 *       },
080 *       MoreExecutors.sameThreadExecutor());
081 *
082 *     Runtime.getRuntime().addShutdownHook(new Thread() {
083 *       public void run() {
084 *         // Give the services 5 seconds to stop to ensure that we are responsive to shutdown 
085 *         // requests.
086 *         try {
087 *           manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
088 *         } catch (TimeoutException timeout) {
089 *           // stopping timed out
090 *         }
091 *       }
092 *     });
093 *     manager.startAsync();  // start all the services asynchronously
094 *   }
095 * }}</pre>
096 *
097 * This class uses the ServiceManager's methods to start all of its services, to respond to service
098 * failure and to ensure that when the JVM is shutting down all the services are stopped.
099 *
100 * @author Luke Sandberg
101 * @since 14.0
102 */
103@Beta
104@Singleton
105public final class ServiceManager {
106  private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
107  
108  /**
109   * A listener for the aggregate state changes of the services that are under management. Users
110   * that need to listen to more fine-grained events (such as when each particular
111   * {@link Service service} starts, or terminates), should attach {@link Service.Listener service
112   * listeners} to each individual service.
113   * 
114   * @author Luke Sandberg
115   * @since 14.0
116   */
117  @Beta  // Should come out of Beta when ServiceManager does
118  public static interface Listener {
119    /** 
120     * Called when the service initially becomes healthy.
121     * 
122     * <p>This will be called at most once after all the services have entered the 
123     * {@linkplain State#RUNNING running} state. If any services fail during start up or 
124     * {@linkplain State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other 
125     * services have started {@linkplain State#RUNNING running} then this method will not be called.
126     */
127    void healthy();
128    
129    /** 
130     * Called when the all of the component services have reached a terminal state, either 
131     * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}.
132     */
133    void stopped();
134    
135    /** 
136     * Called when a component service has {@linkplain State#FAILED failed}.
137     * 
138     * @param service The service that failed.
139     */
140    void failure(Service service);
141  }
142  
143  /**
144   * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener 
145   * service listeners}.  This is extracted into its own object so that {@link ServiceListener} 
146   * could be made {@code static} and its instances can be safely constructed and added in the 
147   * {@link ServiceManager} constructor without having to close over the partially constructed 
148   * {@link ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}).
149   */
150  private final ServiceManagerState state;
151  private final ImmutableMap<Service, ServiceListener> services;
152  
153  /**
154   * Constructs a new instance for managing the given services.
155   * 
156   * @param services The services to manage
157   * 
158   * @throws IllegalArgumentException if not all services are {@link State#NEW new} or if there are
159   *     any duplicate services.
160   */
161  public ServiceManager(Iterable<? extends Service> services) {
162    ImmutableList<Service> copy = ImmutableList.copyOf(services);
163    this.state = new ServiceManagerState(copy.size());
164    ImmutableMap.Builder<Service, ServiceListener> builder = ImmutableMap.builder();
165    Executor executor = MoreExecutors.sameThreadExecutor();
166    for (Service service : copy) {
167      ServiceListener listener = new ServiceListener(service, state);
168      service.addListener(listener, executor);
169      // We check the state after adding the listener as a way to ensure that our listener was added
170      // to a NEW service.
171      checkArgument(service.state() == State.NEW, "Can only manage NEW services, %s", service);
172      builder.put(service, listener);
173    }
174    this.services = builder.build();
175  }
176  
177  /**
178   * Constructs a new instance for managing the given services. This constructor is provided so that
179   * dependency injection frameworks can inject instances of {@link ServiceManager}.
180   * 
181   * @param services The services to manage
182   * 
183   * @throws IllegalStateException if not all services are {@link State#NEW new}.
184   */
185  @Inject ServiceManager(Set<Service> services) {
186    this((Iterable<Service>) services);
187  }
188  
189  /**
190   * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given 
191   * executor. The listener will not have previous state changes replayed, so it is 
192   * suggested that listeners are added before any of the managed services are 
193   * {@linkplain Service#start started}.
194   *
195   * <p>There is no guaranteed ordering of execution of listeners, but any listener added through 
196   * this method is guaranteed to be called whenever there is a state change.
197   *
198   * <p>Exceptions thrown by a listener will be propagated up to the executor. Any exception thrown 
199   * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException} or an exception 
200   * thrown by {@linkplain MoreExecutors#sameThreadExecutor inline execution}) will be caught and
201   * logged.
202   * 
203   * @param listener the listener to run when the manager changes state
204   * @param executor the executor in which the the listeners callback methods will be run. For fast,
205   *     lightweight listeners that would be safe to execute in any thread, consider 
206   *     {@link MoreExecutors#sameThreadExecutor}.
207   */
208  public void addListener(Listener listener, Executor executor) {
209    state.addListener(listener, executor);
210  }
211
212  /**
213   * Initiates service {@linkplain Service#start startup} on all the services being managed.  It is
214   * only valid to call this method if all of the services are {@linkplain State#NEW new}.
215   * 
216   * @return this
217   * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the 
218   *     method is called, {@link State#TERMINATED terminated} or {@link State#FAILED failed}.
219   */
220  public ServiceManager startAsync() {
221    for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) {
222      Service service = entry.getKey();
223      State state = service.state();
224      checkState(state == State.NEW, "Service %s is %s, cannot start it.", service, 
225          state);
226    }
227    for (ServiceListener service : services.values()) {
228      service.start();
229    }
230    return this;
231  }
232  
233  /**
234   * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}.  The manager
235   * will become healthy after all the component services have reached the {@linkplain State#RUNNING
236   * running} state.  
237   * 
238   * @throws IllegalStateException if the service manager reaches a state from which it cannot 
239   *     become {@linkplain #isHealthy() healthy}.
240   */
241  public void awaitHealthy() {
242    state.awaitHealthy();
243    checkState(isHealthy(), "Expected to be healthy after starting");
244  }
245  
246  /**
247   * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more 
248   * than the given time.  The manager will become healthy after all the component services have 
249   * reached the {@linkplain State#RUNNING running} state. 
250   *
251   * @param timeout the maximum time to wait
252   * @param unit the time unit of the timeout argument
253   * @throws TimeoutException if not all of the services have finished starting within the deadline
254   * @throws IllegalStateException if the service manager reaches a state from which it cannot 
255   *     become {@linkplain #isHealthy() healthy}.
256   */
257  public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
258    if (!state.awaitHealthy(timeout, unit)) {
259      // It would be nice to tell the caller who we are still waiting on, and this information is 
260      // likely to be in servicesByState(), however due to race conditions we can't actually tell 
261      // which services are holding up healthiness. The current set of NEW or STARTING services is
262      // likely to point out the culprit, but may not.  If we really wanted to solve this we could
263      // change state to track exactly which services have started and then we could accurately 
264      // report on this. But it is only for logging so we likely don't care.
265      throw new TimeoutException("Timeout waiting for the services to become healthy.");
266    }
267    checkState(isHealthy(), "Expected to be healthy after starting");
268  }
269
270  /**
271   * Initiates service {@linkplain Service#stop shutdown} if necessary on all the services being 
272   * managed. 
273   *    
274   * @return this
275   */
276  public ServiceManager stopAsync() {
277    for (Service service : services.keySet()) {
278      service.stop();
279    }
280    return this;
281  }
282 
283  /**
284   * Waits for the all the services to reach a terminal state. After this method returns all
285   * services will either be {@link Service.State#TERMINATED terminated} or 
286   * {@link Service.State#FAILED failed}
287   */
288  public void awaitStopped() {
289    state.awaitStopped();
290  }
291  
292  /**
293   * Waits for the all the services to reach a terminal state for no more than the given time. After
294   * this method returns all services will either be {@link Service.State#TERMINATED terminated} or 
295   * {@link Service.State#FAILED failed}
296   *
297   * @param timeout the maximum time to wait
298   * @param unit the time unit of the timeout argument
299   * @throws TimeoutException if not all of the services have stopped within the deadline
300   */
301  public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
302    if (!state.awaitStopped(timeout, unit)) {
303      throw new TimeoutException("Timeout waiting for the services to stop.");
304    }
305  }
306  
307  /**
308   * Returns true if all services are currently in the {@linkplain State#RUNNING running} state.  
309   * 
310   * <p>Users who want more detailed information should use the {@link #servicesByState} method to 
311   * get detailed information about which services are not running.
312   */
313  public boolean isHealthy() {
314    for (Service service : services.keySet()) {
315      if (!service.isRunning()) {
316        return false;
317      }
318    }
319    return true;
320  }
321  
322  /**
323   * Provides a snapshot of the current state of all the services under management.
324   * 
325   * <p>N.B. This snapshot it not guaranteed to be consistent, i.e. the set of states returned may
326   * not correspond to any particular point in time view of the services. 
327   */
328  public ImmutableMultimap<State, Service> servicesByState() {
329    ImmutableMultimap.Builder<State, Service> builder = ImmutableMultimap.builder();
330    for (Service service : services.keySet()) {
331      builder.put(service.state(), service);
332    }
333    return builder.build();
334  }
335  
336  /**
337   * Returns the service load times. This value will only return startup times for services that
338   * have finished starting.
339   *
340   * @return Map of services and their corresponding startup time in millis, the map entries will be
341   *     ordered by startup time.
342   */
343  public ImmutableMap<Service, Long> startupTimes() { 
344    Map<Service, Long> loadTimeMap = Maps.newHashMapWithExpectedSize(services.size());
345    for (Map.Entry<Service, ServiceListener> entry : services.entrySet()) {
346      State state = entry.getKey().state();
347      if (state != State.NEW && state != State.STARTING) {
348        loadTimeMap.put(entry.getKey(), entry.getValue().startupTimeMillis());
349      }
350    }
351    List<Entry<Service, Long>> servicesByStartTime = Ordering.<Long>natural()
352        .onResultOf(new Function<Map.Entry<Service, Long>, Long>() {
353          @Override public Long apply(Map.Entry<Service, Long> input) {
354            return input.getValue();
355          }
356        })
357        .sortedCopy(loadTimeMap.entrySet());
358    ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder();
359    for (Map.Entry<Service, Long> entry : servicesByStartTime) {
360      builder.put(entry);
361    }
362    return builder.build();
363  }
364  
365  @Override public String toString() {
366    return Objects.toStringHelper(ServiceManager.class)
367        .add("services", services.keySet())
368        .toString();
369  }
370  
371  /**
372   * An encapsulation of all the mutable state of the {@link ServiceManager} that needs to be 
373   * accessed by instances of {@link ServiceListener}.
374   */
375  private static final class ServiceManagerState {
376    final Monitor monitor = new Monitor();
377    final int numberOfServices;
378    /** The number of services that have not finished starting up. */
379    @GuardedBy("monitor")
380    int unstartedServices;
381    /** The number of services that have not reached a terminal state. */
382    @GuardedBy("monitor")
383    int unstoppedServices;
384    /** 
385     * Controls how long to wait for all the service manager to either become healthy or reach a 
386     * state where it is guaranteed that it can never become healthy.
387     */
388    final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
389      @Override public boolean isSatisfied() {
390        // All services have started or some service has terminated/failed.
391        return unstartedServices == 0 || unstoppedServices != numberOfServices;
392      }
393    };
394    /**
395     * Controls how long to wait for all services to reach a terminal state.
396     */
397    final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
398      @Override public boolean isSatisfied() {
399        return unstoppedServices == 0;
400      }
401    };
402    /** The listeners to notify during a state transition. */
403    @GuardedBy("monitor")
404    final List<ListenerExecutorPair> listeners = Lists.newArrayList();
405    /**
406     * The queue of listeners that are waiting to be executed.
407     *
408     * <p>Enqueue operations should be protected by {@link #monitor} while dequeue operations should
409     * be protected by the implicit lock on this object. This is to ensure that listeners are
410     * executed in the correct order and also so that a listener can not hold the {@link #monitor} 
411     * for an arbitrary amount of time (listeners can only block other listeners, not internal state
412     * transitions). We use a concurrent queue implementation so that enqueues can be executed 
413     * concurrently with dequeues.
414     */
415    @GuardedBy("queuedListeners")
416    final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();
417    
418    ServiceManagerState(int numberOfServices) {
419      this.numberOfServices = numberOfServices;
420      this.unstoppedServices = numberOfServices;
421      this.unstartedServices = numberOfServices;
422    }
423    
424    void addListener(Listener listener, Executor executor) {
425      checkNotNull(listener, "listener");
426      checkNotNull(executor, "executor");
427      monitor.enter();
428      try {
429        // no point in adding a listener that will never be called
430        if (unstartedServices > 0 || unstoppedServices > 0) {
431          listeners.add(new ListenerExecutorPair(listener, executor));
432        }
433      } finally {
434        monitor.leave();
435      }
436    }
437    
438    void awaitHealthy() {
439      monitor.enter();
440      try {
441        monitor.waitForUninterruptibly(awaitHealthGuard);
442      } finally {
443        monitor.leave();
444      }
445    }
446    
447    boolean awaitHealthy(long timeout, TimeUnit unit) {
448      monitor.enter();
449      try {
450        if (monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) {
451          return true;
452        }
453        return false;
454      } finally {
455        monitor.leave();
456      }
457    }
458    
459    void awaitStopped() {
460      monitor.enter();
461      try {
462        monitor.waitForUninterruptibly(stoppedGuard);
463      } finally {
464        monitor.leave();
465      }
466    }
467    
468    boolean awaitStopped(long timeout, TimeUnit unit) {
469      monitor.enter();
470      try {
471        return monitor.waitForUninterruptibly(stoppedGuard, timeout, unit);
472      } finally {
473        monitor.leave();
474      }
475    }
476    
477    /**
478     * This should be called when a service finishes starting up.
479     * 
480     * @param currentlyHealthy whether or not the service that finished starting was healthy at the 
481     *        time that it finished starting. 
482     */
483    @GuardedBy("monitor")
484    private void serviceFinishedStarting(Service service, boolean currentlyHealthy) {
485      checkState(unstartedServices > 0, 
486          "All services should have already finished starting but %s just finished.", service);
487      unstartedServices--;
488      if (currentlyHealthy && unstartedServices == 0 && unstoppedServices == numberOfServices) {
489        // This means that the manager is currently healthy, or at least it should have been
490        // healthy at some point from some perspective. Calling isHealthy is not currently
491        // guaranteed to return true because any service could fail right now. However, the
492        // happens-before relationship enforced by the monitor ensures that this method was called
493        // before either serviceTerminated or serviceFailed, so we know that the manager was at 
494        // least healthy for some period of time. Furthermore we are guaranteed that this call to
495        // healthy() will be before any call to terminated() or failure(Service) on the listener.
496        // So it is correct to execute the listener's health() callback.
497        for (final ListenerExecutorPair pair : listeners) {
498          queuedListeners.add(new Runnable() {
499            @Override public void run() {
500              pair.execute(new Runnable() {
501                @Override public void run() {
502                  pair.listener.healthy();
503                }
504              });
505            }
506          });
507        }
508      }
509    }
510    
511    /**
512     * This should be called when a service is {@linkplain State#TERMINATED terminated}.
513     */
514    @GuardedBy("monitor")
515    private void serviceTerminated(Service service) {
516      serviceStopped(service);
517    }
518    
519    /**
520     * This should be called when a service is {@linkplain State#FAILED failed}.
521     */
522    @GuardedBy("monitor")
523    private void serviceFailed(final Service service) {
524      for (final ListenerExecutorPair pair : listeners) {
525        queuedListeners.add(new Runnable() {
526          @Override public void run() {
527            pair.execute(new Runnable() {
528              @Override public void run() {
529                pair.listener.failure(service);
530              }
531            });
532          }
533        });
534      }
535      serviceStopped(service);
536    }
537    
538    /**
539     * Should be called whenever a service reaches a terminal state (
540     * {@linkplain State#TERMINATED terminated} or 
541     * {@linkplain State#FAILED failed}).
542     */
543    @GuardedBy("monitor")
544    private void serviceStopped(Service service) {
545      checkState(unstoppedServices > 0, 
546          "All services should have already stopped but %s just stopped.", service);
547      unstoppedServices--;
548      if (unstoppedServices == 0) {
549        checkState(unstartedServices == 0, 
550            "All services are stopped but %d services haven't finished starting", 
551            unstartedServices);
552        for (final ListenerExecutorPair pair : listeners) {
553          queuedListeners.add(new Runnable() {
554            @Override public void run() {
555              pair.execute(new Runnable() {
556                @Override public void run() {
557                  pair.listener.stopped();
558                }
559              });
560            }
561          });
562        }
563        // no more listeners could possibly be called, so clear them out
564        listeners.clear();
565      }
566    }
567    
568    /** 
569     * Attempts to execute all the listeners in {@link #queuedListeners}.
570     */
571    private void executeListeners() {
572      checkState(!monitor.isOccupiedByCurrentThread(), 
573          "It is incorrect to execute listeners with the monitor held.");
574      synchronized (queuedListeners) {
575        Runnable listener;
576        while ((listener = queuedListeners.poll()) != null) {
577          listener.run();
578        }
579      }
580    }
581  }
582
583  /**
584   * A {@link Service} that wraps another service and times how long it takes for it to start and 
585   * also calls the {@link ServiceManagerState#serviceFinishedStarting}, 
586   * {@link ServiceManagerState#serviceTerminated} and {@link ServiceManagerState#serviceFailed}
587   * according to its current state.
588   */
589  private static final class ServiceListener implements Service.Listener {
590    @GuardedBy("watch")  // AFAICT Stopwatch is not thread safe so we need to protect accesses
591    final Stopwatch watch = new Stopwatch();
592    final Service service;
593    final ServiceManagerState state;
594    
595    /**
596     * @param service the service that 
597     */
598    ServiceListener(Service service, ServiceManagerState state) {
599      this.service = service;
600      this.state = state;
601    }
602    
603    @Override public void starting() {
604      // This can happen if someone besides the ServiceManager starts the service, in this case
605      // our timings may be inaccurate.
606      startTimer();
607    }
608    
609    @Override public void running() {
610      state.monitor.enter();
611      try {
612        finishedStarting(true);
613      } finally {
614        state.monitor.leave();
615        state.executeListeners();
616      }
617    }
618    
619    @Override public void stopping(State from) {
620      if (from == State.STARTING) {
621        state.monitor.enter();
622        try {
623          finishedStarting(false);
624        } finally {
625          state.monitor.leave();
626          state.executeListeners();
627        }
628      }
629    }
630    
631    @Override public void terminated(State from) {
632      logger.info("Service " + service + " has terminated. Previous state was " + from + " state.");
633      state.monitor.enter();
634      try {
635        if (from == State.NEW) {
636          // startTimer is idempotent, so this is safe to call and it may be necessary if no one has
637          // started the timer yet.
638          startTimer(); 
639          finishedStarting(false);
640        }
641        state.serviceTerminated(service);
642      } finally {
643        state.monitor.leave();
644        state.executeListeners();
645      }
646    }
647    
648    @Override public void failed(State from, Throwable failure) {
649      logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.", 
650          failure);
651      state.monitor.enter();
652      try {
653        if (from == State.STARTING) {
654          finishedStarting(false);
655        }
656        state.serviceFailed(service);
657      } finally {
658        state.monitor.leave();
659        state.executeListeners();
660      }
661    }
662    
663    /** 
664     * Stop the stopwatch, log the startup time and decrement the startup latch
665     *  
666     * @param currentlyHealthy whether or not the service that finished starting is currently 
667     *        healthy 
668     */
669    @GuardedBy("monitor")
670    void finishedStarting(boolean currentlyHealthy) {
671      synchronized (watch) {
672        watch.stop();
673        logger.log(Level.INFO, "Started " + service + " in " + startupTimeMillis() + " ms.");
674      }
675      state.serviceFinishedStarting(service, currentlyHealthy);
676    }
677    
678    void start() {
679      startTimer();
680      service.start();
681    }
682  
683    /** Start the timer if it hasn't been started. */
684    void startTimer() {
685      synchronized (watch) {
686        if (!watch.isRunning()) { // only start the watch once.
687          watch.start();
688          logger.log(Level.INFO, "Starting {0}", service);
689        }
690      }
691    }
692    
693    /** Returns the amount of time it took for the service to finish starting in milliseconds. */
694    synchronized long startupTimeMillis() {
695      synchronized (watch) {
696        return watch.elapsed(MILLISECONDS);
697      }
698    }
699  }
700  
701  /** Simple value object binding a listener to its executor. */
702  @Immutable private static final class ListenerExecutorPair {
703    final Listener listener;
704    final Executor executor;
705    
706    ListenerExecutorPair(Listener listener, Executor executor) {
707      this.listener = listener;
708      this.executor = executor;
709    }
710    
711    /**
712     * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all 
713     * exceptions
714     */
715    void execute(Runnable runnable) {
716      try {
717        executor.execute(runnable);
718      } catch (Exception e) {
719        logger.log(Level.SEVERE, "Exception while executing listener " + listener 
720            + " with executor " + executor, e);
721      }
722    }
723  }
724}