001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Service.State.FAILED;
021import static com.google.common.util.concurrent.Service.State.NEW;
022import static com.google.common.util.concurrent.Service.State.RUNNING;
023import static com.google.common.util.concurrent.Service.State.STARTING;
024import static com.google.common.util.concurrent.Service.State.STOPPING;
025import static com.google.common.util.concurrent.Service.State.TERMINATED;
026
027import com.google.common.annotations.Beta;
028import com.google.common.annotations.GwtIncompatible;
029import com.google.common.util.concurrent.ListenerCallQueue.Callback;
030import com.google.common.util.concurrent.Monitor.Guard;
031import com.google.common.util.concurrent.Service.State; // javadoc needs this
032import com.google.errorprone.annotations.CanIgnoreReturnValue;
033import com.google.j2objc.annotations.WeakOuter;
034import java.util.ArrayList;
035import java.util.Collections;
036import java.util.List;
037import java.util.concurrent.Executor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040import javax.annotation.Nullable;
041import javax.annotation.concurrent.GuardedBy;
042import javax.annotation.concurrent.Immutable;
043
044/**
045 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
046 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
047 * callbacks. Its subclasses must manage threads manually; consider {@link
048 * AbstractExecutionThreadService} if you need only a single execution thread.
049 *
050 * @author Jesse Wilson
051 * @author Luke Sandberg
052 * @since 1.0
053 */
054@Beta
055@GwtIncompatible
056public abstract class AbstractService implements Service {
057  private static final Callback<Listener> STARTING_CALLBACK =
058      new Callback<Listener>("starting()") {
059        @Override
060        void call(Listener listener) {
061          listener.starting();
062        }
063      };
064  private static final Callback<Listener> RUNNING_CALLBACK =
065      new Callback<Listener>("running()") {
066        @Override
067        void call(Listener listener) {
068          listener.running();
069        }
070      };
071  private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK =
072      stoppingCallback(STARTING);
073  private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK =
074      stoppingCallback(RUNNING);
075
076  private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK = terminatedCallback(NEW);
077  private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK =
078      terminatedCallback(RUNNING);
079  private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK =
080      terminatedCallback(STOPPING);
081
082  private static Callback<Listener> terminatedCallback(final State from) {
083    return new Callback<Listener>("terminated({from = " + from + "})") {
084      @Override
085      void call(Listener listener) {
086        listener.terminated(from);
087      }
088    };
089  }
090
091  private static Callback<Listener> stoppingCallback(final State from) {
092    return new Callback<Listener>("stopping({from = " + from + "})") {
093      @Override
094      void call(Listener listener) {
095        listener.stopping(from);
096      }
097    };
098  }
099
100  private final Monitor monitor = new Monitor();
101
102  private final Guard isStartable = new IsStartableGuard();
103
104  @WeakOuter
105  private final class IsStartableGuard extends Guard {
106    IsStartableGuard() {
107      super(AbstractService.this.monitor);
108    }
109
110    @Override
111    public boolean isSatisfied() {
112      return state() == NEW;
113    }
114  }
115
116  private final Guard isStoppable = new IsStoppableGuard();
117
118  @WeakOuter
119  private final class IsStoppableGuard extends Guard {
120    IsStoppableGuard() {
121      super(AbstractService.this.monitor);
122    }
123
124    @Override
125    public boolean isSatisfied() {
126      return state().compareTo(RUNNING) <= 0;
127    }
128  }
129
130  private final Guard hasReachedRunning = new HasReachedRunningGuard();
131
132  @WeakOuter
133  private final class HasReachedRunningGuard extends Guard {
134    HasReachedRunningGuard() {
135      super(AbstractService.this.monitor);
136    }
137
138    @Override
139    public boolean isSatisfied() {
140      return state().compareTo(RUNNING) >= 0;
141    }
142  }
143
144  private final Guard isStopped = new IsStoppedGuard();
145
146  @WeakOuter
147  private final class IsStoppedGuard extends Guard {
148    IsStoppedGuard() {
149      super(AbstractService.this.monitor);
150    }
151
152    @Override
153    public boolean isSatisfied() {
154      return state().isTerminal();
155    }
156  }
157
158  /**
159   * The listeners to notify during a state transition.
160   */
161  @GuardedBy("monitor")
162  private final List<ListenerCallQueue<Listener>> listeners =
163      Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
164
165  /**
166   * The current state of the service. This should be written with the lock held but can be read
167   * without it because it is an immutable object in a volatile field. This is desirable so that
168   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
169   * without grabbing the lock.
170   *
171   * <p>To update this field correctly the lock must be held to guarantee that the state is
172   * consistent.
173   */
174  @GuardedBy("monitor")
175  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
176
177  /** Constructor for use by subclasses. */
178  protected AbstractService() {}
179
180  /**
181   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
182   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
183   * or after it has returned. If startup fails, the invocation should cause a call to {@link
184   * #notifyFailed(Throwable)} instead.
185   *
186   * <p>This method should return promptly; prefer to do work on a different thread where it is
187   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
188   * called multiple times.
189   */
190  protected abstract void doStart();
191
192  /**
193   * This method should be used to initiate service shutdown. The invocation of this method should
194   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
195   * returned. If shutdown fails, the invocation should cause a call to {@link
196   * #notifyFailed(Throwable)} instead.
197   *
198   * <p>This method should return promptly; prefer to do work on a different thread where it is
199   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
200   * called multiple times.
201   */
202  protected abstract void doStop();
203
204  @CanIgnoreReturnValue
205  @Override
206  public final Service startAsync() {
207    if (monitor.enterIf(isStartable)) {
208      try {
209        snapshot = new StateSnapshot(STARTING);
210        starting();
211        doStart();
212      } catch (Throwable startupFailure) {
213        notifyFailed(startupFailure);
214      } finally {
215        monitor.leave();
216        executeListeners();
217      }
218    } else {
219      throw new IllegalStateException("Service " + this + " has already been started");
220    }
221    return this;
222  }
223
224  @CanIgnoreReturnValue
225  @Override
226  public final Service stopAsync() {
227    if (monitor.enterIf(isStoppable)) {
228      try {
229        State previous = state();
230        switch (previous) {
231          case NEW:
232            snapshot = new StateSnapshot(TERMINATED);
233            terminated(NEW);
234            break;
235          case STARTING:
236            snapshot = new StateSnapshot(STARTING, true, null);
237            stopping(STARTING);
238            break;
239          case RUNNING:
240            snapshot = new StateSnapshot(STOPPING);
241            stopping(RUNNING);
242            doStop();
243            break;
244          case STOPPING:
245          case TERMINATED:
246          case FAILED:
247            // These cases are impossible due to the if statement above.
248            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
249          default:
250            throw new AssertionError("Unexpected state: " + previous);
251        }
252      } catch (Throwable shutdownFailure) {
253        notifyFailed(shutdownFailure);
254      } finally {
255        monitor.leave();
256        executeListeners();
257      }
258    }
259    return this;
260  }
261
262  @Override
263  public final void awaitRunning() {
264    monitor.enterWhenUninterruptibly(hasReachedRunning);
265    try {
266      checkCurrentState(RUNNING);
267    } finally {
268      monitor.leave();
269    }
270  }
271
272  @Override
273  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
274    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
275      try {
276        checkCurrentState(RUNNING);
277      } finally {
278        monitor.leave();
279      }
280    } else {
281      // It is possible due to races the we are currently in the expected state even though we
282      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
283      // even check the guard. I don't think we care too much about this use case but it could lead
284      // to a confusing error message.
285      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
286    }
287  }
288
289  @Override
290  public final void awaitTerminated() {
291    monitor.enterWhenUninterruptibly(isStopped);
292    try {
293      checkCurrentState(TERMINATED);
294    } finally {
295      monitor.leave();
296    }
297  }
298
299  @Override
300  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
301    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
302      try {
303        checkCurrentState(TERMINATED);
304      } finally {
305        monitor.leave();
306      }
307    } else {
308      // It is possible due to races the we are currently in the expected state even though we
309      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
310      // even check the guard. I don't think we care too much about this use case but it could lead
311      // to a confusing error message.
312      throw new TimeoutException(
313          "Timed out waiting for "
314              + this
315              + " to reach a terminal state. "
316              + "Current state: "
317              + state());
318    }
319  }
320
321  /** Checks that the current state is equal to the expected state. */
322  @GuardedBy("monitor")
323  private void checkCurrentState(State expected) {
324    State actual = state();
325    if (actual != expected) {
326      if (actual == FAILED) {
327        // Handle this specially so that we can include the failureCause, if there is one.
328        throw new IllegalStateException(
329            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
330            failureCause());
331      }
332      throw new IllegalStateException(
333          "Expected the service " + this + " to be " + expected + ", but was " + actual);
334    }
335  }
336
337  /**
338   * Implementing classes should invoke this method once their service has started. It will cause
339   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
340   *
341   * @throws IllegalStateException if the service is not {@link State#STARTING}.
342   */
343  protected final void notifyStarted() {
344    monitor.enter();
345    try {
346      // We have to examine the internal state of the snapshot here to properly handle the stop
347      // while starting case.
348      if (snapshot.state != STARTING) {
349        IllegalStateException failure =
350            new IllegalStateException(
351                "Cannot notifyStarted() when the service is " + snapshot.state);
352        notifyFailed(failure);
353        throw failure;
354      }
355
356      if (snapshot.shutdownWhenStartupFinishes) {
357        snapshot = new StateSnapshot(STOPPING);
358        // We don't call listeners here because we already did that when we set the
359        // shutdownWhenStartupFinishes flag.
360        doStop();
361      } else {
362        snapshot = new StateSnapshot(RUNNING);
363        running();
364      }
365    } finally {
366      monitor.leave();
367      executeListeners();
368    }
369  }
370
371  /**
372   * Implementing classes should invoke this method once their service has stopped. It will cause
373   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
374   *
375   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link
376   *     State#RUNNING}.
377   */
378  protected final void notifyStopped() {
379    monitor.enter();
380    try {
381      // We check the internal state of the snapshot instead of state() directly so we don't allow
382      // notifyStopped() to be called while STARTING, even if stop() has already been called.
383      State previous = snapshot.state;
384      if (previous != STOPPING && previous != RUNNING) {
385        IllegalStateException failure =
386            new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
387        notifyFailed(failure);
388        throw failure;
389      }
390      snapshot = new StateSnapshot(TERMINATED);
391      terminated(previous);
392    } finally {
393      monitor.leave();
394      executeListeners();
395    }
396  }
397
398  /**
399   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
400   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
401   * or otherwise cannot be started nor stopped.
402   */
403  protected final void notifyFailed(Throwable cause) {
404    checkNotNull(cause);
405
406    monitor.enter();
407    try {
408      State previous = state();
409      switch (previous) {
410        case NEW:
411        case TERMINATED:
412          throw new IllegalStateException("Failed while in state:" + previous, cause);
413        case RUNNING:
414        case STARTING:
415        case STOPPING:
416          snapshot = new StateSnapshot(FAILED, false, cause);
417          failed(previous, cause);
418          break;
419        case FAILED:
420          // Do nothing
421          break;
422        default:
423          throw new AssertionError("Unexpected state: " + previous);
424      }
425    } finally {
426      monitor.leave();
427      executeListeners();
428    }
429  }
430
431  @Override
432  public final boolean isRunning() {
433    return state() == RUNNING;
434  }
435
436  @Override
437  public final State state() {
438    return snapshot.externalState();
439  }
440
441  /**
442   * @since 14.0
443   */
444  @Override
445  public final Throwable failureCause() {
446    return snapshot.failureCause();
447  }
448
449  /**
450   * @since 13.0
451   */
452  @Override
453  public final void addListener(Listener listener, Executor executor) {
454    checkNotNull(listener, "listener");
455    checkNotNull(executor, "executor");
456    monitor.enter();
457    try {
458      if (!state().isTerminal()) {
459        listeners.add(new ListenerCallQueue<Listener>(listener, executor));
460      }
461    } finally {
462      monitor.leave();
463    }
464  }
465
466  @Override
467  public String toString() {
468    return getClass().getSimpleName() + " [" + state() + "]";
469  }
470
471  /**
472   * Attempts to execute all the listeners in {@link #listeners} while not holding the
473   * {@link #monitor}.
474   */
475  private void executeListeners() {
476    if (!monitor.isOccupiedByCurrentThread()) {
477      // iterate by index to avoid concurrent modification exceptions
478      for (int i = 0; i < listeners.size(); i++) {
479        listeners.get(i).execute();
480      }
481    }
482  }
483
484  @GuardedBy("monitor")
485  private void starting() {
486    STARTING_CALLBACK.enqueueOn(listeners);
487  }
488
489  @GuardedBy("monitor")
490  private void running() {
491    RUNNING_CALLBACK.enqueueOn(listeners);
492  }
493
494  @GuardedBy("monitor")
495  private void stopping(final State from) {
496    if (from == State.STARTING) {
497      STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners);
498    } else if (from == State.RUNNING) {
499      STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
500    } else {
501      throw new AssertionError();
502    }
503  }
504
505  @GuardedBy("monitor")
506  private void terminated(final State from) {
507    switch (from) {
508      case NEW:
509        TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners);
510        break;
511      case RUNNING:
512        TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
513        break;
514      case STOPPING:
515        TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners);
516        break;
517      case STARTING:
518      case TERMINATED:
519      case FAILED:
520      default:
521        throw new AssertionError();
522    }
523  }
524
525  @GuardedBy("monitor")
526  private void failed(final State from, final Throwable cause) {
527    // can't memoize this one due to the exception
528    new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") {
529      @Override
530      void call(Listener listener) {
531        listener.failed(from, cause);
532      }
533    }.enqueueOn(listeners);
534  }
535
536  /**
537   * An immutable snapshot of the current state of the service. This class represents a consistent
538   * snapshot of the state and therefore it can be used to answer simple queries without needing to
539   * grab a lock.
540   */
541  @Immutable
542  private static final class StateSnapshot {
543    /**
544     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
545     */
546    final State state;
547
548    /**
549     * If true, the user requested a shutdown while the service was still starting up.
550     */
551    final boolean shutdownWhenStartupFinishes;
552
553    /**
554     * The exception that caused this service to fail. This will be {@code null} unless the service
555     * has failed.
556     */
557    @Nullable final Throwable failure;
558
559    StateSnapshot(State internalState) {
560      this(internalState, false, null);
561    }
562
563    StateSnapshot(
564        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
565      checkArgument(
566          !shutdownWhenStartupFinishes || internalState == STARTING,
567          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
568          internalState);
569      checkArgument(
570          !(failure != null ^ internalState == FAILED),
571          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
572              + "instead.",
573          internalState,
574          failure);
575      this.state = internalState;
576      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
577      this.failure = failure;
578    }
579
580    /** @see Service#state() */
581    State externalState() {
582      if (shutdownWhenStartupFinishes && state == STARTING) {
583        return STOPPING;
584      } else {
585        return state;
586      }
587    }
588
589    /** @see Service#failureCause() */
590    Throwable failureCause() {
591      checkState(
592          state == FAILED,
593          "failureCause() is only valid if the service has failed, service is %s",
594          state);
595      return failure;
596    }
597  }
598}