001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.Service.State.FAILED;
023import static com.google.common.util.concurrent.Service.State.NEW;
024import static com.google.common.util.concurrent.Service.State.RUNNING;
025import static com.google.common.util.concurrent.Service.State.STARTING;
026import static com.google.common.util.concurrent.Service.State.STOPPING;
027import static com.google.common.util.concurrent.Service.State.TERMINATED;
028
029import com.google.common.annotations.Beta;
030import com.google.common.collect.Lists;
031import com.google.common.util.concurrent.Monitor.Guard;
032import com.google.common.util.concurrent.Service.State; // javadoc needs this
033
034import java.util.List;
035import java.util.concurrent.ExecutionException;
036import java.util.concurrent.Executor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.TimeoutException;
039import javax.annotation.Nullable;
040import javax.annotation.concurrent.GuardedBy;
041import javax.annotation.concurrent.Immutable;
042
043/**
044 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
045 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
046 * callbacks. Its subclasses must manage threads manually; consider
047 * {@link AbstractExecutionThreadService} if you need only a single execution thread.
048 *
049 * @author Jesse Wilson
050 * @author Luke Sandberg
051 * @since 1.0
052 */
053@Beta
054public abstract class AbstractService implements Service {
055  private final Monitor monitor = new Monitor();
056
057  private final Transition startup = new Transition();
058  private final Transition shutdown = new Transition();
059
060  private final Guard isStartable = new Guard(monitor) {
061    @Override public boolean isSatisfied() {
062      return state() == NEW;
063    }
064  };
065
066  private final Guard isStoppable = new Guard(monitor) {
067    @Override public boolean isSatisfied() {
068      return state().compareTo(RUNNING) <= 0;
069    }
070  };
071
072  private final Guard hasReachedRunning = new Guard(monitor) {
073    @Override public boolean isSatisfied() {
074      return state().compareTo(RUNNING) >= 0;
075    }
076  };
077
078  private final Guard isStopped = new Guard(monitor) {
079    @Override public boolean isSatisfied() {
080      return state().isTerminal();
081    }
082  };
083
084  /**
085   * The listeners to notify during a state transition.
086   */
087  @GuardedBy("monitor")
088  private final List<ListenerExecutorPair> listeners = Lists.newArrayList();
089
090  /**
091   * The queue of listeners that are waiting to be executed.
092   *
093   * <p>Enqueue operations should be protected by {@link #monitor} while calling
094   * {@link ExecutionQueue#execute()} should not be protected.
095   */
096  private final ExecutionQueue queuedListeners = new ExecutionQueue();
097
098  /**
099   * The current state of the service.  This should be written with the lock held but can be read
100   * without it because it is an immutable object in a volatile field.  This is desirable so that
101   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
102   * without grabbing the lock.
103   *
104   * <p>To update this field correctly the lock must be held to guarantee that the state is
105   * consistent.
106   */
107  @GuardedBy("monitor")
108  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
109
110  /** Constructor for use by subclasses. */
111  protected AbstractService() {
112    // Add a listener to update the futures. This needs to be added first so that it is executed
113    // before the other listeners. This way the other listeners can access the completed futures.
114    addListener(
115        new Listener() {
116          @Override public void running() {
117            startup.set(RUNNING);
118          }
119
120          @Override public void stopping(State from) {
121            if (from == STARTING) {
122              startup.set(STOPPING);
123            }
124          }
125
126          @Override public void terminated(State from) {
127            if (from == NEW) {
128              startup.set(TERMINATED);
129            }
130            shutdown.set(TERMINATED);
131          }
132
133          @Override public void failed(State from, Throwable failure) {
134            switch (from) {
135              case STARTING:
136                startup.setException(failure);
137                shutdown.setException(new Exception("Service failed to start.", failure));
138                break;
139              case RUNNING:
140                shutdown.setException(new Exception("Service failed while running", failure));
141                break;
142              case STOPPING:
143                shutdown.setException(failure);
144                break;
145              case TERMINATED:  /* fall-through */
146              case FAILED:  /* fall-through */
147              case NEW:  /* fall-through */
148              default:
149                throw new AssertionError("Unexpected from state: " + from);
150            }
151          }
152        },
153        MoreExecutors.sameThreadExecutor());
154  }
155
156  /**
157   * This method is called by {@link #start} to initiate service startup. The invocation of this
158   * method should cause a call to {@link #notifyStarted()}, either during this method's run, or
159   * after it has returned. If startup fails, the invocation should cause a call to
160   * {@link #notifyFailed(Throwable)} instead.
161   *
162   * <p>This method should return promptly; prefer to do work on a different thread where it is
163   * convenient. It is invoked exactly once on service startup, even when {@link #start} is called
164   * multiple times.
165   */
166  protected abstract void doStart();
167
168  /**
169   * This method should be used to initiate service shutdown. The invocation of this method should
170   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
171   * returned. If shutdown fails, the invocation should cause a call to
172   * {@link #notifyFailed(Throwable)} instead.
173   *
174   * <p> This method should return promptly; prefer to do work on a different thread where it is
175   * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called
176   * multiple times.
177   */
178  protected abstract void doStop();
179
180  @Override public final Service startAsync() {
181    if (monitor.enterIf(isStartable)) {
182      try {
183        snapshot = new StateSnapshot(STARTING);
184        starting();
185        doStart();
186       // TODO(user): justify why we are catching Throwable and not RuntimeException
187      } catch (Throwable startupFailure) {
188        notifyFailed(startupFailure);
189      } finally {
190        monitor.leave();
191        executeListeners();
192      }
193    } else {
194      throw new IllegalStateException("Service " + this + " has already been started");
195    }
196    return this;
197  }
198
199  @Deprecated
200  @Override
201  public final ListenableFuture<State> start() {
202    if (monitor.enterIf(isStartable)) {
203      try {
204        snapshot = new StateSnapshot(STARTING);
205        starting();
206        doStart();
207      } catch (Throwable startupFailure) {
208        notifyFailed(startupFailure);
209      } finally {
210        monitor.leave();
211        executeListeners();
212      }
213    }
214    return startup;
215  }
216
217  @Override public final Service stopAsync() {
218    stop();
219    return this;
220  }
221
222  @Deprecated
223  @Override
224  public final ListenableFuture<State> stop() {
225    if (monitor.enterIf(isStoppable)) {
226      try {
227        State previous = state();
228        switch (previous) {
229          case NEW:
230            snapshot = new StateSnapshot(TERMINATED);
231            terminated(NEW);
232            break;
233          case STARTING:
234            snapshot = new StateSnapshot(STARTING, true, null);
235            stopping(STARTING);
236            break;
237          case RUNNING:
238            snapshot = new StateSnapshot(STOPPING);
239            stopping(RUNNING);
240            doStop();
241            break;
242          case STOPPING:
243          case TERMINATED:
244          case FAILED:
245            // These cases are impossible due to the if statement above.
246            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
247          default:
248            throw new AssertionError("Unexpected state: " + previous);
249        }
250        // TODO(user): justify why we are catching Throwable and not RuntimeException.  Also, we
251        // may inadvertently catch our AssertionErrors.
252      } catch (Throwable shutdownFailure) {
253        notifyFailed(shutdownFailure);
254      } finally {
255        monitor.leave();
256        executeListeners();
257      }
258    }
259    return shutdown;
260  }
261
262  @Deprecated
263  @Override
264  public State startAndWait() {
265    return Futures.getUnchecked(start());
266  }
267
268  @Deprecated
269  @Override
270  public State stopAndWait() {
271    return Futures.getUnchecked(stop());
272  }
273
274  @Override public final void awaitRunning() {
275    monitor.enterWhenUninterruptibly(hasReachedRunning);
276    try {
277      checkCurrentState(RUNNING);
278    } finally {
279      monitor.leave();
280    }
281  }
282
283  @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
284    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
285      try {
286        checkCurrentState(RUNNING);
287      } finally {
288        monitor.leave();
289      }
290    } else {
291      // It is possible due to races the we are currently in the expected state even though we
292      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
293      // even check the guard.  I don't think we care too much about this use case but it could lead
294      // to a confusing error message.
295      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. "
296          + "Current state: " + state());
297    }
298  }
299
300  @Override public final void awaitTerminated() {
301    monitor.enterWhenUninterruptibly(isStopped);
302    try {
303      checkCurrentState(TERMINATED);
304    } finally {
305      monitor.leave();
306    }
307  }
308
309  @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
310    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
311      try {
312        State state = state();
313        checkCurrentState(TERMINATED);
314      } finally {
315        monitor.leave();
316      }
317    } else {
318      // It is possible due to races the we are currently in the expected state even though we
319      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
320      // even check the guard.  I don't think we care too much about this use case but it could lead
321      // to a confusing error message.
322      throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. "
323          + "Current state: " + state());
324    }
325  }
326
327  /** Checks that the current state is equal to the expected state. */
328  @GuardedBy("monitor")
329  private void checkCurrentState(State expected) {
330    State actual = state();
331    if (actual != expected) {
332      if (actual == FAILED) {
333        // Handle this specially so that we can include the failureCause, if there is one.
334        throw new IllegalStateException("Expected the service to be " + expected
335            + ", but the service has FAILED", failureCause());
336      }
337      throw new IllegalStateException("Expected the service to be " + expected + ", but was "
338          + actual);
339    }
340  }
341
342  /**
343   * Implementing classes should invoke this method once their service has started. It will cause
344   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
345   *
346   * @throws IllegalStateException if the service is not {@link State#STARTING}.
347   */
348  protected final void notifyStarted() {
349    monitor.enter();
350    try {
351      // We have to examine the internal state of the snapshot here to properly handle the stop
352      // while starting case.
353      if (snapshot.state != STARTING) {
354        IllegalStateException failure = new IllegalStateException(
355            "Cannot notifyStarted() when the service is " + snapshot.state);
356        notifyFailed(failure);
357        throw failure;
358      }
359
360      if (snapshot.shutdownWhenStartupFinishes) {
361        snapshot = new StateSnapshot(STOPPING);
362        // We don't call listeners here because we already did that when we set the
363        // shutdownWhenStartupFinishes flag.
364        doStop();
365      } else {
366        snapshot = new StateSnapshot(RUNNING);
367        running();
368      }
369    } finally {
370      monitor.leave();
371      executeListeners();
372    }
373  }
374
375  /**
376   * Implementing classes should invoke this method once their service has stopped. It will cause
377   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
378   *
379   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
380   *         {@link State#RUNNING}.
381   */
382  protected final void notifyStopped() {
383    monitor.enter();
384    try {
385      // We check the internal state of the snapshot instead of state() directly so we don't allow
386      // notifyStopped() to be called while STARTING, even if stop() has already been called.
387      State previous = snapshot.state;
388      if (previous != STOPPING && previous != RUNNING) {
389        IllegalStateException failure = new IllegalStateException(
390            "Cannot notifyStopped() when the service is " + previous);
391        notifyFailed(failure);
392        throw failure;
393      }
394      snapshot = new StateSnapshot(TERMINATED);
395      terminated(previous);
396    } finally {
397      monitor.leave();
398      executeListeners();
399    }
400  }
401
402  /**
403   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
404   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
405   * or otherwise cannot be started nor stopped.
406   */
407  protected final void notifyFailed(Throwable cause) {
408    checkNotNull(cause);
409
410    monitor.enter();
411    try {
412      State previous = state();
413      switch (previous) {
414        case NEW:
415        case TERMINATED:
416          throw new IllegalStateException("Failed while in state:" + previous, cause);
417        case RUNNING:
418        case STARTING:
419        case STOPPING:
420          snapshot = new StateSnapshot(FAILED, false, cause);
421          failed(previous, cause);
422          break;
423        case FAILED:
424          // Do nothing
425          break;
426        default:
427          throw new AssertionError("Unexpected state: " + previous);
428      }
429    } finally {
430      monitor.leave();
431      executeListeners();
432    }
433  }
434
435  @Override
436  public final boolean isRunning() {
437    return state() == RUNNING;
438  }
439
440  @Override
441  public final State state() {
442    return snapshot.externalState();
443  }
444
445  /**
446   * @since 14.0
447   */
448  @Override
449  public final Throwable failureCause() {
450    return snapshot.failureCause();
451  }
452
453  /**
454   * @since 13.0
455   */
456  @Override
457  public final void addListener(Listener listener, Executor executor) {
458    checkNotNull(listener, "listener");
459    checkNotNull(executor, "executor");
460    monitor.enter();
461    try {
462      State currentState = state();
463      if (currentState != TERMINATED && currentState != FAILED) {
464        listeners.add(new ListenerExecutorPair(listener, executor));
465      }
466    } finally {
467      monitor.leave();
468    }
469  }
470
471  @Override public String toString() {
472    return getClass().getSimpleName() + " [" + state() + "]";
473  }
474
475  /**
476   * A change from one service state to another, plus the result of the change.
477   */
478  private class Transition extends AbstractFuture<State> {
479    @Override
480    public State get(long timeout, TimeUnit unit)
481        throws InterruptedException, TimeoutException, ExecutionException {
482      try {
483        return super.get(timeout, unit);
484      } catch (TimeoutException e) {
485        throw new TimeoutException(AbstractService.this.toString());
486      }
487    }
488  }
489
490  /**
491   * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the
492   * {@link #monitor}.
493   */
494  private void executeListeners() {
495    if (!monitor.isOccupiedByCurrentThread()) {
496      queuedListeners.execute();
497    }
498  }
499
500  @GuardedBy("monitor")
501  private void starting() {
502    for (final ListenerExecutorPair pair : listeners) {
503      queuedListeners.add(new Runnable() {
504        @Override public void run() {
505          pair.listener.starting();
506        }
507      }, pair.executor);
508    }
509  }
510
511  @GuardedBy("monitor")
512  private void running() {
513    for (final ListenerExecutorPair pair : listeners) {
514      queuedListeners.add(new Runnable() {
515        @Override public void run() {
516          pair.listener.running();
517        }
518      }, pair.executor);
519    }
520  }
521
522  @GuardedBy("monitor")
523  private void stopping(final State from) {
524    for (final ListenerExecutorPair pair : listeners) {
525      queuedListeners.add(new Runnable() {
526        @Override public void run() {
527          pair.listener.stopping(from);
528        }
529      }, pair.executor);
530    }
531  }
532
533  @GuardedBy("monitor")
534  private void terminated(final State from) {
535    for (final ListenerExecutorPair pair : listeners) {
536      queuedListeners.add(new Runnable() {
537        @Override public void run() {
538          pair.listener.terminated(from);
539        }
540      }, pair.executor);
541    }
542    // There are no more state transitions so we can clear this out.
543    listeners.clear();
544  }
545
546  @GuardedBy("monitor")
547  private void failed(final State from, final Throwable cause) {
548    for (final ListenerExecutorPair pair : listeners) {
549      queuedListeners.add(new Runnable() {
550        @Override public void run() {
551          pair.listener.failed(from, cause);
552        }
553      }, pair.executor);
554    }
555    // There are no more state transitions so we can clear this out.
556    listeners.clear();
557  }
558
559  /** A simple holder for a listener and its executor. */
560  private static class ListenerExecutorPair {
561    final Listener listener;
562    final Executor executor;
563
564    ListenerExecutorPair(Listener listener, Executor executor) {
565      this.listener = listener;
566      this.executor = executor;
567    }
568  }
569
570  /**
571   * An immutable snapshot of the current state of the service. This class represents a consistent
572   * snapshot of the state and therefore it can be used to answer simple queries without needing to
573   * grab a lock.
574   */
575  @Immutable
576  private static final class StateSnapshot {
577    /**
578     * The internal state, which equals external state unless
579     * shutdownWhenStartupFinishes is true.
580     */
581    final State state;
582
583    /**
584     * If true, the user requested a shutdown while the service was still starting
585     * up.
586     */
587    final boolean shutdownWhenStartupFinishes;
588
589    /**
590     * The exception that caused this service to fail.  This will be {@code null}
591     * unless the service has failed.
592     */
593    @Nullable
594    final Throwable failure;
595
596    StateSnapshot(State internalState) {
597      this(internalState, false, null);
598    }
599
600    StateSnapshot(
601        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
602      checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING,
603          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
604          internalState);
605      checkArgument(!(failure != null ^ internalState == FAILED),
606          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
607          + "instead.", internalState, failure);
608      this.state = internalState;
609      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
610      this.failure = failure;
611    }
612
613    /** @see Service#state() */
614    State externalState() {
615      if (shutdownWhenStartupFinishes && state == STARTING) {
616        return STOPPING;
617      } else {
618        return state;
619      }
620    }
621
622    /** @see Service#failureCause() */
623    Throwable failureCause() {
624      checkState(state == FAILED,
625          "failureCause() is only valid if the service has failed, service is %s", state);
626      return failure;
627    }
628  }
629}