001    /*
002     * Copyright (C) 2009 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkArgument;
020    import static com.google.common.base.Preconditions.checkNotNull;
021    import static com.google.common.base.Preconditions.checkState;
022    
023    import com.google.common.annotations.Beta;
024    import com.google.common.collect.Lists;
025    import com.google.common.collect.Queues;
026    import com.google.common.util.concurrent.Service.State; // javadoc needs this
027    
028    import java.util.List;
029    import java.util.Queue;
030    import java.util.concurrent.ExecutionException;
031    import java.util.concurrent.Executor;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.TimeoutException;
034    import java.util.concurrent.locks.ReentrantLock;
035    import java.util.logging.Level;
036    import java.util.logging.Logger;
037    
038    import javax.annotation.Nullable;
039    import javax.annotation.concurrent.GuardedBy;
040    import javax.annotation.concurrent.Immutable;
041    
042    /**
043     * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
044     * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
045     * callbacks. Its subclasses must manage threads manually; consider
046     * {@link AbstractExecutionThreadService} if you need only a single execution thread.
047     *
048     * @author Jesse Wilson
049     * @author Luke Sandberg
050     * @since 1.0
051     */
052    @Beta
053    public abstract class AbstractService implements Service {
054      private static final Logger logger = Logger.getLogger(AbstractService.class.getName());
055      private final ReentrantLock lock = new ReentrantLock();
056    
057      private final Transition startup = new Transition();
058      private final Transition shutdown = new Transition();
059    
060      /**
061       * The listeners to notify during a state transition.
062       */
063      @GuardedBy("lock")
064      private final List<ListenerExecutorPair> listeners = Lists.newArrayList();
065    
066      /**
067       * The queue of listeners that are waiting to be executed.
068       *
069       * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be
070       * protected by the implicit lock on this object. Dequeue operations should be executed atomically
071       * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be
072       * held when the listeners are being executed. Use {@link #executeListeners} for this operation.
073       * This is necessary to ensure that elements on the queue are executed in the correct order.
074       * Enqueue operations should be protected so that listeners are added in the correct order. We use
075       * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues.
076       */
077      @GuardedBy("queuedListeners")
078      private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();
079    
080      /**
081       * The current state of the service.  This should be written with the lock held but can be read
082       * without it because it is an immutable object in a volatile field.  This is desirable so that
083       * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
084       * without grabbing the lock.
085       *
086       * <p>To update this field correctly the lock must be held to guarantee that the state is
087       * consistent.
088       */
089      @GuardedBy("lock")
090      private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW);
091    
092      protected AbstractService() {
093        // Add a listener to update the futures. This needs to be added first so that it is executed
094        // before the other listeners. This way the other listeners can access the completed futures.
095        addListener(
096            new Listener() {
097              @Override public void starting() {}
098    
099              @Override public void running() {
100                startup.set(State.RUNNING);
101              }
102    
103              @Override public void stopping(State from) {
104                if (from == State.STARTING) {
105                  startup.set(State.STOPPING);
106                }
107              }
108    
109              @Override public void terminated(State from) {
110                if (from == State.NEW) {
111                  startup.set(State.TERMINATED);
112                }
113                shutdown.set(State.TERMINATED);
114              }
115    
116              @Override public void failed(State from, Throwable failure) {
117                switch (from) {
118                  case STARTING:
119                    startup.setException(failure);
120                    shutdown.setException(new Exception("Service failed to start.", failure));
121                    break;
122                  case RUNNING:
123                    shutdown.setException(new Exception("Service failed while running", failure));
124                    break;
125                  case STOPPING:
126                    shutdown.setException(failure);
127                    break;
128                  case TERMINATED:  /* fall-through */
129                  case FAILED:  /* fall-through */
130                  case NEW:  /* fall-through */
131                  default:
132                    throw new AssertionError("Unexpected from state: " + from);
133                }
134              }
135            },
136            MoreExecutors.sameThreadExecutor());
137      }
138    
139      /**
140       * This method is called by {@link #start} to initiate service startup. The invocation of this
141       * method should cause a call to {@link #notifyStarted()}, either during this method's run, or
142       * after it has returned. If startup fails, the invocation should cause a call to
143       * {@link #notifyFailed(Throwable)} instead.
144       *
145       * <p>This method should return promptly; prefer to do work on a different thread where it is
146       * convenient. It is invoked exactly once on service startup, even when {@link #start} is called
147       * multiple times.
148       */
149      protected abstract void doStart();
150    
151      /**
152       * This method should be used to initiate service shutdown. The invocation of this method should
153       * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
154       * returned. If shutdown fails, the invocation should cause a call to
155       * {@link #notifyFailed(Throwable)} instead.
156       *
157       * <p> This method should return promptly; prefer to do work on a different thread where it is
158       * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called
159       * multiple times.
160       */
161      protected abstract void doStop();
162    
163      @Override
164      public final ListenableFuture<State> start() {
165        lock.lock();
166        try {
167          if (snapshot.state == State.NEW) {
168            snapshot = new StateSnapshot(State.STARTING);
169            starting();
170            doStart();
171          }
172        } catch (Throwable startupFailure) {
173          notifyFailed(startupFailure);
174        } finally {
175          lock.unlock();
176          executeListeners();
177        }
178    
179        return startup;
180      }
181    
182      @Override
183      public final ListenableFuture<State> stop() {
184        lock.lock();
185        try {
186          switch (snapshot.state) {
187            case NEW:
188              snapshot = new StateSnapshot(State.TERMINATED);
189              terminated(State.NEW);
190              break;
191            case STARTING:
192              snapshot = new StateSnapshot(State.STARTING, true, null);
193              stopping(State.STARTING);
194              break;
195            case RUNNING:
196              snapshot = new StateSnapshot(State.STOPPING);
197              stopping(State.RUNNING);
198              doStop();
199              break;
200            case STOPPING:
201            case TERMINATED:
202            case FAILED:
203              // do nothing
204              break;
205            default:
206              throw new AssertionError("Unexpected state: " + snapshot.state);
207          }
208        } catch (Throwable shutdownFailure) {
209          notifyFailed(shutdownFailure);
210        } finally {
211          lock.unlock();
212          executeListeners();
213        }
214    
215        return shutdown;
216      }
217    
218      @Override
219      public State startAndWait() {
220        return Futures.getUnchecked(start());
221      }
222    
223      @Override
224      public State stopAndWait() {
225        return Futures.getUnchecked(stop());
226      }
227    
228      /**
229       * Implementing classes should invoke this method once their service has started. It will cause
230       * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
231       *
232       * @throws IllegalStateException if the service is not {@link State#STARTING}.
233       */
234      protected final void notifyStarted() {
235        lock.lock();
236        try {
237          if (snapshot.state != State.STARTING) {
238            IllegalStateException failure = new IllegalStateException(
239                "Cannot notifyStarted() when the service is " + snapshot.state);
240            notifyFailed(failure);
241            throw failure;
242          }
243    
244          if (snapshot.shutdownWhenStartupFinishes) {
245            snapshot = new StateSnapshot(State.STOPPING);
246            // We don't call listeners here because we already did that when we set the
247            // shutdownWhenStartupFinishes flag.
248            doStop();
249          } else {
250            snapshot = new StateSnapshot(State.RUNNING);
251            running();
252          }
253        } finally {
254          lock.unlock();
255          executeListeners();
256        }
257      }
258    
259      /**
260       * Implementing classes should invoke this method once their service has stopped. It will cause
261       * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
262       *
263       * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
264       *         {@link State#RUNNING}.
265       */
266      protected final void notifyStopped() {
267        lock.lock();
268        try {
269          if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) {
270            IllegalStateException failure = new IllegalStateException(
271                "Cannot notifyStopped() when the service is " + snapshot.state);
272            notifyFailed(failure);
273            throw failure;
274          }
275          State previous = snapshot.state;
276          snapshot = new StateSnapshot(State.TERMINATED);
277          terminated(previous);
278        } finally {
279          lock.unlock();
280          executeListeners();
281        }
282      }
283    
284      /**
285       * Invoke this method to transition the service to the {@link State#FAILED}. The service will
286       * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
287       * or otherwise cannot be started nor stopped.
288       */
289      protected final void notifyFailed(Throwable cause) {
290        checkNotNull(cause);
291    
292        lock.lock();
293        try {
294          switch (snapshot.state) {
295            case NEW:
296            case TERMINATED:
297              throw new IllegalStateException("Failed while in state:" + snapshot.state, cause);
298            case RUNNING:
299            case STARTING:
300            case STOPPING:
301              State previous = snapshot.state;
302              snapshot = new StateSnapshot(State.FAILED, false, cause);
303              failed(previous, cause);
304              break;
305            case FAILED:
306              // Do nothing
307              break;
308            default:
309              throw new AssertionError("Unexpected state: " + snapshot.state);
310          }
311        } finally {
312          lock.unlock();
313          executeListeners();
314        }
315      }
316    
317      @Override
318      public final boolean isRunning() {
319        return state() == State.RUNNING;
320      }
321    
322      @Override
323      public final State state() {
324        return snapshot.externalState();
325      }
326    
327      @Override
328      public final void addListener(Listener listener, Executor executor) {
329        checkNotNull(listener, "listener");
330        checkNotNull(executor, "executor");
331        lock.lock();
332        try {
333          if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) {
334            listeners.add(new ListenerExecutorPair(listener, executor));
335          }
336        } finally {
337          lock.unlock();
338        }
339      }
340    
341      @Override public String toString() {
342        return getClass().getSimpleName() + " [" + state() + "]";
343      }
344    
345      /**
346       * A change from one service state to another, plus the result of the change.
347       */
348      private class Transition extends AbstractFuture<State> {
349        @Override
350        public State get(long timeout, TimeUnit unit)
351            throws InterruptedException, TimeoutException, ExecutionException {
352          try {
353            return super.get(timeout, unit);
354          } catch (TimeoutException e) {
355            throw new TimeoutException(AbstractService.this.toString());
356          }
357        }
358      }
359    
360      /**
361       * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the
362       * {@link #lock}.
363       */
364      private void executeListeners() {
365        if (!lock.isHeldByCurrentThread()) {
366          synchronized (queuedListeners) {
367            Runnable listener;
368            while ((listener = queuedListeners.poll()) != null) {
369              listener.run();
370            }
371          }
372        }
373      }
374    
375      @GuardedBy("lock")
376      private void starting() {
377        for (final ListenerExecutorPair pair : listeners) {
378          queuedListeners.add(new Runnable() {
379            @Override public void run() {
380              pair.execute(new Runnable() {
381                @Override public void run() {
382                  pair.listener.starting();
383                }
384              });
385            }
386          });
387        }
388      }
389    
390      @GuardedBy("lock")
391      private void running() {
392        for (final ListenerExecutorPair pair : listeners) {
393          queuedListeners.add(new Runnable() {
394            @Override public void run() {
395              pair.execute(new Runnable() {
396                @Override public void run() {
397                  pair.listener.running();
398                }
399              });
400            }
401          });
402        }
403      }
404    
405      @GuardedBy("lock")
406      private void stopping(final State from) {
407        for (final ListenerExecutorPair pair : listeners) {
408          queuedListeners.add(new Runnable() {
409            @Override public void run() {
410              pair.execute(new Runnable() {
411                @Override public void run() {
412                  pair.listener.stopping(from);
413                }
414              });
415            }
416          });
417        }
418      }
419    
420      @GuardedBy("lock")
421      private void terminated(final State from) {
422        for (final ListenerExecutorPair pair : listeners) {
423          queuedListeners.add(new Runnable() {
424            @Override public void run() {
425              pair.execute(new Runnable() {
426                @Override public void run() {
427                  pair.listener.terminated(from);
428                }
429              });
430            }
431          });
432        }
433        // There are no more state transitions so we can clear this out.
434        listeners.clear();
435      }
436    
437      @GuardedBy("lock")
438      private void failed(final State from, final Throwable cause) {
439        for (final ListenerExecutorPair pair : listeners) {
440          queuedListeners.add(new Runnable() {
441            @Override public void run() {
442              pair.execute(new Runnable() {
443                @Override public void run() {
444                  pair.listener.failed(from, cause);
445                }
446              });
447            }
448          });
449        }
450        // There are no more state transitions so we can clear this out.
451        listeners.clear();
452      }
453    
454      /** A simple holder for a listener and its executor. */
455      private static class ListenerExecutorPair {
456        final Listener listener;
457        final Executor executor;
458    
459        ListenerExecutorPair(Listener listener, Executor executor) {
460          this.listener = listener;
461          this.executor = executor;
462        }
463    
464        /**
465         * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all
466         * exceptions
467         */
468        void execute(Runnable runnable) {
469          try {
470            executor.execute(runnable);
471          } catch (Exception e) {
472            logger.log(Level.SEVERE, "Exception while executing listener " + listener
473                + " with executor " + executor, e);
474          }
475        }
476      }
477    
478      /**
479       * An immutable snapshot of the current state of the service. This class represents a consistent
480       * snapshot of the state and therefore it can be used to answer simple queries without needing to
481       * grab a lock.
482       */
483      @Immutable
484      private static final class StateSnapshot {
485        /**
486         * The internal state, which equals external state unless
487         * shutdownWhenStartupFinishes is true.
488         */
489        final State state;
490    
491        /**
492         * If true, the user requested a shutdown while the service was still starting
493         * up.
494         */
495        final boolean shutdownWhenStartupFinishes;
496    
497        /**
498         * The exception that caused this service to fail.  This will be {@code null}
499         * unless the service has failed.
500         */
501        @Nullable
502        final Throwable failure;
503    
504        StateSnapshot(State internalState) {
505          this(internalState, false, null);
506        }
507    
508        StateSnapshot(State internalState, boolean shutdownWhenStartupFinishes, Throwable failure) {
509          checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING,
510              "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
511              internalState);
512          checkArgument(!(failure != null ^ internalState == State.FAILED),
513              "A failure cause should be set if and only if the state is failed.  Got %s and %s "
514              + "instead.", internalState, failure);
515          this.state = internalState;
516          this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
517          this.failure = failure;
518        }
519    
520        /** @see Service#state() */
521        State externalState() {
522          if (shutdownWhenStartupFinishes && state == State.STARTING) {
523            return State.STOPPING;
524          } else {
525            return state;
526          }
527        }
528    
529        /** @see Service#failureCause() */
530        Throwable failureCause() {
531          checkState(state == State.FAILED,
532              "failureCause() is only valid if the service has failed, service is %s", state);
533          return failure;
534        }
535      }
536    }