001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.Service.State.FAILED;
023import static com.google.common.util.concurrent.Service.State.NEW;
024import static com.google.common.util.concurrent.Service.State.RUNNING;
025import static com.google.common.util.concurrent.Service.State.STARTING;
026import static com.google.common.util.concurrent.Service.State.STOPPING;
027import static com.google.common.util.concurrent.Service.State.TERMINATED;
028
029import com.google.common.annotations.Beta;
030import com.google.common.util.concurrent.ListenerCallQueue.Callback;
031import com.google.common.util.concurrent.Monitor.Guard;
032import com.google.common.util.concurrent.Service.State; // javadoc needs this
033
034import java.util.ArrayList;
035import java.util.Collections;
036import java.util.List;
037import java.util.concurrent.Executor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040
041import javax.annotation.Nullable;
042import javax.annotation.concurrent.GuardedBy;
043import javax.annotation.concurrent.Immutable;
044
045/**
046 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
047 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
048 * callbacks. Its subclasses must manage threads manually; consider
049 * {@link AbstractExecutionThreadService} if you need only a single execution thread.
050 *
051 * @author Jesse Wilson
052 * @author Luke Sandberg
053 * @since 1.0
054 */
055@Beta
056public abstract class AbstractService implements Service {
057  private static final Callback<Listener> STARTING_CALLBACK =
058      new Callback<Listener>("starting()") {
059        @Override void call(Listener listener) {
060          listener.starting();
061        }
062      };
063  private static final Callback<Listener> RUNNING_CALLBACK =
064      new Callback<Listener>("running()") {
065        @Override void call(Listener listener) {
066          listener.running();
067        }
068      };
069  private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK =
070      stoppingCallback(STARTING);
071  private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK =
072      stoppingCallback(RUNNING);
073
074  private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK =
075      terminatedCallback(NEW);
076  private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK =
077      terminatedCallback(RUNNING);
078  private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK =
079      terminatedCallback(STOPPING);
080
081  private static Callback<Listener> terminatedCallback(final State from) {
082    return new Callback<Listener>("terminated({from = " + from + "})") {
083      @Override void call(Listener listener) {
084        listener.terminated(from);
085      }
086    };
087  }
088
089  private static Callback<Listener> stoppingCallback(final State from) {
090    return new Callback<Listener>("stopping({from = " + from + "})") {
091      @Override void call(Listener listener) {
092        listener.stopping(from);
093      }
094    };
095  }
096
097  private final Monitor monitor = new Monitor();
098
099  private final Guard isStartable = new Guard(monitor) {
100    @Override public boolean isSatisfied() {
101      return state() == NEW;
102    }
103  };
104
105  private final Guard isStoppable = new Guard(monitor) {
106    @Override public boolean isSatisfied() {
107      return state().compareTo(RUNNING) <= 0;
108    }
109  };
110
111  private final Guard hasReachedRunning = new Guard(monitor) {
112    @Override public boolean isSatisfied() {
113      return state().compareTo(RUNNING) >= 0;
114    }
115  };
116
117  private final Guard isStopped = new Guard(monitor) {
118    @Override public boolean isSatisfied() {
119      return state().isTerminal();
120    }
121  };
122
123  /**
124   * The listeners to notify during a state transition.
125   */
126  @GuardedBy("monitor")
127  private final List<ListenerCallQueue<Listener>> listeners =
128      Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
129
130  /**
131   * The current state of the service.  This should be written with the lock held but can be read
132   * without it because it is an immutable object in a volatile field.  This is desirable so that
133   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
134   * without grabbing the lock.
135   *
136   * <p>To update this field correctly the lock must be held to guarantee that the state is
137   * consistent.
138   */
139  @GuardedBy("monitor")
140  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
141
142  /** Constructor for use by subclasses. */
143  protected AbstractService() {}
144
145  /**
146   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
147   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
148   * or after it has returned. If startup fails, the invocation should cause a call to
149   * {@link #notifyFailed(Throwable)} instead.
150   *
151   * <p>This method should return promptly; prefer to do work on a different thread where it is
152   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
153   * called multiple times.
154   */
155  protected abstract void doStart();
156
157  /**
158   * This method should be used to initiate service shutdown. The invocation of this method should
159   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
160   * returned. If shutdown fails, the invocation should cause a call to
161   * {@link #notifyFailed(Throwable)} instead.
162   *
163   * <p> This method should return promptly; prefer to do work on a different thread where it is
164   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
165   * called multiple times.
166   */
167  protected abstract void doStop();
168
169  @Override public final Service startAsync() {
170    if (monitor.enterIf(isStartable)) {
171      try {
172        snapshot = new StateSnapshot(STARTING);
173        starting();
174        doStart();
175       // TODO(user): justify why we are catching Throwable and not RuntimeException
176      } catch (Throwable startupFailure) {
177        notifyFailed(startupFailure);
178      } finally {
179        monitor.leave();
180        executeListeners();
181      }
182    } else {
183      throw new IllegalStateException("Service " + this + " has already been started");
184    }
185    return this;
186  }
187
188  @Override public final Service stopAsync() {
189    if (monitor.enterIf(isStoppable)) {
190      try {
191        State previous = state();
192        switch (previous) {
193          case NEW:
194            snapshot = new StateSnapshot(TERMINATED);
195            terminated(NEW);
196            break;
197          case STARTING:
198            snapshot = new StateSnapshot(STARTING, true, null);
199            stopping(STARTING);
200            break;
201          case RUNNING:
202            snapshot = new StateSnapshot(STOPPING);
203            stopping(RUNNING);
204            doStop();
205            break;
206          case STOPPING:
207          case TERMINATED:
208          case FAILED:
209            // These cases are impossible due to the if statement above.
210            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
211          default:
212            throw new AssertionError("Unexpected state: " + previous);
213        }
214        // TODO(user): justify why we are catching Throwable and not RuntimeException.  Also, we
215        // may inadvertently catch our AssertionErrors.
216      } catch (Throwable shutdownFailure) {
217        notifyFailed(shutdownFailure);
218      } finally {
219        monitor.leave();
220        executeListeners();
221      }
222    }
223    return this;
224  }
225
226  @Override public final void awaitRunning() {
227    monitor.enterWhenUninterruptibly(hasReachedRunning);
228    try {
229      checkCurrentState(RUNNING);
230    } finally {
231      monitor.leave();
232    }
233  }
234
235  @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
236    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
237      try {
238        checkCurrentState(RUNNING);
239      } finally {
240        monitor.leave();
241      }
242    } else {
243      // It is possible due to races the we are currently in the expected state even though we
244      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
245      // even check the guard.  I don't think we care too much about this use case but it could lead
246      // to a confusing error message.
247      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. "
248          + "Current state: " + state());
249    }
250  }
251
252  @Override public final void awaitTerminated() {
253    monitor.enterWhenUninterruptibly(isStopped);
254    try {
255      checkCurrentState(TERMINATED);
256    } finally {
257      monitor.leave();
258    }
259  }
260
261  @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
262    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
263      try {
264        checkCurrentState(TERMINATED);
265      } finally {
266        monitor.leave();
267      }
268    } else {
269      // It is possible due to races the we are currently in the expected state even though we
270      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
271      // even check the guard.  I don't think we care too much about this use case but it could lead
272      // to a confusing error message.
273      throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. "
274          + "Current state: " + state());
275    }
276  }
277
278  /** Checks that the current state is equal to the expected state. */
279  @GuardedBy("monitor")
280  private void checkCurrentState(State expected) {
281    State actual = state();
282    if (actual != expected) {
283      if (actual == FAILED) {
284        // Handle this specially so that we can include the failureCause, if there is one.
285        throw new IllegalStateException("Expected the service to be " + expected
286            + ", but the service has FAILED", failureCause());
287      }
288      throw new IllegalStateException("Expected the service to be " + expected + ", but was "
289          + actual);
290    }
291  }
292
293  /**
294   * Implementing classes should invoke this method once their service has started. It will cause
295   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
296   *
297   * @throws IllegalStateException if the service is not {@link State#STARTING}.
298   */
299  protected final void notifyStarted() {
300    monitor.enter();
301    try {
302      // We have to examine the internal state of the snapshot here to properly handle the stop
303      // while starting case.
304      if (snapshot.state != STARTING) {
305        IllegalStateException failure = new IllegalStateException(
306            "Cannot notifyStarted() when the service is " + snapshot.state);
307        notifyFailed(failure);
308        throw failure;
309      }
310
311      if (snapshot.shutdownWhenStartupFinishes) {
312        snapshot = new StateSnapshot(STOPPING);
313        // We don't call listeners here because we already did that when we set the
314        // shutdownWhenStartupFinishes flag.
315        doStop();
316      } else {
317        snapshot = new StateSnapshot(RUNNING);
318        running();
319      }
320    } finally {
321      monitor.leave();
322      executeListeners();
323    }
324  }
325
326  /**
327   * Implementing classes should invoke this method once their service has stopped. It will cause
328   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
329   *
330   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
331   *         {@link State#RUNNING}.
332   */
333  protected final void notifyStopped() {
334    monitor.enter();
335    try {
336      // We check the internal state of the snapshot instead of state() directly so we don't allow
337      // notifyStopped() to be called while STARTING, even if stop() has already been called.
338      State previous = snapshot.state;
339      if (previous != STOPPING && previous != RUNNING) {
340        IllegalStateException failure = new IllegalStateException(
341            "Cannot notifyStopped() when the service is " + previous);
342        notifyFailed(failure);
343        throw failure;
344      }
345      snapshot = new StateSnapshot(TERMINATED);
346      terminated(previous);
347    } finally {
348      monitor.leave();
349      executeListeners();
350    }
351  }
352
353  /**
354   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
355   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
356   * or otherwise cannot be started nor stopped.
357   */
358  protected final void notifyFailed(Throwable cause) {
359    checkNotNull(cause);
360
361    monitor.enter();
362    try {
363      State previous = state();
364      switch (previous) {
365        case NEW:
366        case TERMINATED:
367          throw new IllegalStateException("Failed while in state:" + previous, cause);
368        case RUNNING:
369        case STARTING:
370        case STOPPING:
371          snapshot = new StateSnapshot(FAILED, false, cause);
372          failed(previous, cause);
373          break;
374        case FAILED:
375          // Do nothing
376          break;
377        default:
378          throw new AssertionError("Unexpected state: " + previous);
379      }
380    } finally {
381      monitor.leave();
382      executeListeners();
383    }
384  }
385
386  @Override
387  public final boolean isRunning() {
388    return state() == RUNNING;
389  }
390
391  @Override
392  public final State state() {
393    return snapshot.externalState();
394  }
395
396  /**
397   * @since 14.0
398   */
399  @Override
400  public final Throwable failureCause() {
401    return snapshot.failureCause();
402  }
403
404  /**
405   * @since 13.0
406   */
407  @Override
408  public final void addListener(Listener listener, Executor executor) {
409    checkNotNull(listener, "listener");
410    checkNotNull(executor, "executor");
411    monitor.enter();
412    try {
413      if (!state().isTerminal()) {
414        listeners.add(new ListenerCallQueue<Listener>(listener, executor));
415      }
416    } finally {
417      monitor.leave();
418    }
419  }
420
421  @Override public String toString() {
422    return getClass().getSimpleName() + " [" + state() + "]";
423  }
424
425  /**
426   * Attempts to execute all the listeners in {@link #listeners} while not holding the
427   * {@link #monitor}.
428   */
429  private void executeListeners() {
430    if (!monitor.isOccupiedByCurrentThread()) {
431      // iterate by index to avoid concurrent modification exceptions
432      for (int i = 0; i < listeners.size(); i++) {
433        listeners.get(i).execute();
434      }
435    }
436  }
437
438  @GuardedBy("monitor")
439  private void starting() {
440    STARTING_CALLBACK.enqueueOn(listeners);
441  }
442
443  @GuardedBy("monitor")
444  private void running() {
445    RUNNING_CALLBACK.enqueueOn(listeners);
446  }
447
448  @GuardedBy("monitor")
449  private void stopping(final State from) {
450    if (from == State.STARTING) {
451      STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners);
452    } else if (from == State.RUNNING) {
453      STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
454    } else {
455      throw new AssertionError();
456    }
457  }
458
459  @GuardedBy("monitor")
460  private void terminated(final State from) {
461    switch(from) {
462      case NEW:
463        TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners);
464        break;
465      case RUNNING:
466        TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
467        break;
468      case STOPPING:
469        TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners);
470        break;
471      case STARTING:
472      case TERMINATED:
473      case FAILED:
474      default:
475        throw new AssertionError();
476    }
477  }
478
479  @GuardedBy("monitor")
480  private void failed(final State from, final Throwable cause) {
481    // can't memoize this one due to the exception
482    new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") {
483      @Override void call(Listener listener) {
484        listener.failed(from, cause);
485      }
486    }.enqueueOn(listeners);
487  }
488
489  /**
490   * An immutable snapshot of the current state of the service. This class represents a consistent
491   * snapshot of the state and therefore it can be used to answer simple queries without needing to
492   * grab a lock.
493   */
494  @Immutable
495  private static final class StateSnapshot {
496    /**
497     * The internal state, which equals external state unless
498     * shutdownWhenStartupFinishes is true.
499     */
500    final State state;
501
502    /**
503     * If true, the user requested a shutdown while the service was still starting
504     * up.
505     */
506    final boolean shutdownWhenStartupFinishes;
507
508    /**
509     * The exception that caused this service to fail.  This will be {@code null}
510     * unless the service has failed.
511     */
512    @Nullable
513    final Throwable failure;
514
515    StateSnapshot(State internalState) {
516      this(internalState, false, null);
517    }
518
519    StateSnapshot(
520        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
521      checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING,
522          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
523          internalState);
524      checkArgument(!(failure != null ^ internalState == FAILED),
525          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
526          + "instead.", internalState, failure);
527      this.state = internalState;
528      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
529      this.failure = failure;
530    }
531
532    /** @see Service#state() */
533    State externalState() {
534      if (shutdownWhenStartupFinishes && state == STARTING) {
535        return STOPPING;
536      } else {
537        return state;
538      }
539    }
540
541    /** @see Service#failureCause() */
542    Throwable failureCause() {
543      checkState(state == FAILED,
544          "failureCause() is only valid if the service has failed, service is %s", state);
545      return failure;
546    }
547  }
548}