001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Service.State.FAILED;
021import static com.google.common.util.concurrent.Service.State.NEW;
022import static com.google.common.util.concurrent.Service.State.RUNNING;
023import static com.google.common.util.concurrent.Service.State.STARTING;
024import static com.google.common.util.concurrent.Service.State.STOPPING;
025import static com.google.common.util.concurrent.Service.State.TERMINATED;
026import static java.util.Objects.requireNonNull;
027
028import com.google.common.annotations.Beta;
029import com.google.common.annotations.GwtIncompatible;
030import com.google.common.util.concurrent.Monitor.Guard;
031import com.google.common.util.concurrent.Service.State; // javadoc needs this
032import com.google.errorprone.annotations.CanIgnoreReturnValue;
033import com.google.errorprone.annotations.ForOverride;
034import com.google.errorprone.annotations.concurrent.GuardedBy;
035import com.google.j2objc.annotations.WeakOuter;
036import java.util.concurrent.Executor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.TimeoutException;
039import javax.annotation.CheckForNull;
040
041/**
042 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
043 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
044 * callbacks. Its subclasses must manage threads manually; consider {@link
045 * AbstractExecutionThreadService} if you need only a single execution thread.
046 *
047 * @author Jesse Wilson
048 * @author Luke Sandberg
049 * @since 1.0
050 */
051@GwtIncompatible
052@ElementTypesAreNonnullByDefault
053public abstract class AbstractService implements Service {
054  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
055      new ListenerCallQueue.Event<Listener>() {
056        @Override
057        public void call(Listener listener) {
058          listener.starting();
059        }
060
061        @Override
062        public String toString() {
063          return "starting()";
064        }
065      };
066  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
067      new ListenerCallQueue.Event<Listener>() {
068        @Override
069        public void call(Listener listener) {
070          listener.running();
071        }
072
073        @Override
074        public String toString() {
075          return "running()";
076        }
077      };
078  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
079      stoppingEvent(STARTING);
080  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
081      stoppingEvent(RUNNING);
082
083  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
084      terminatedEvent(NEW);
085  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
086      terminatedEvent(STARTING);
087  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
088      terminatedEvent(RUNNING);
089  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
090      terminatedEvent(STOPPING);
091
092  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
093    return new ListenerCallQueue.Event<Listener>() {
094      @Override
095      public void call(Listener listener) {
096        listener.terminated(from);
097      }
098
099      @Override
100      public String toString() {
101        return "terminated({from = " + from + "})";
102      }
103    };
104  }
105
106  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
107    return new ListenerCallQueue.Event<Listener>() {
108      @Override
109      public void call(Listener listener) {
110        listener.stopping(from);
111      }
112
113      @Override
114      public String toString() {
115        return "stopping({from = " + from + "})";
116      }
117    };
118  }
119
120  private final Monitor monitor = new Monitor();
121
122  private final Guard isStartable = new IsStartableGuard();
123
124  @WeakOuter
125  private final class IsStartableGuard extends Guard {
126    IsStartableGuard() {
127      super(AbstractService.this.monitor);
128    }
129
130    @Override
131    public boolean isSatisfied() {
132      return state() == NEW;
133    }
134  }
135
136  private final Guard isStoppable = new IsStoppableGuard();
137
138  @WeakOuter
139  private final class IsStoppableGuard extends Guard {
140    IsStoppableGuard() {
141      super(AbstractService.this.monitor);
142    }
143
144    @Override
145    public boolean isSatisfied() {
146      return state().compareTo(RUNNING) <= 0;
147    }
148  }
149
150  private final Guard hasReachedRunning = new HasReachedRunningGuard();
151
152  @WeakOuter
153  private final class HasReachedRunningGuard extends Guard {
154    HasReachedRunningGuard() {
155      super(AbstractService.this.monitor);
156    }
157
158    @Override
159    public boolean isSatisfied() {
160      return state().compareTo(RUNNING) >= 0;
161    }
162  }
163
164  private final Guard isStopped = new IsStoppedGuard();
165
166  @WeakOuter
167  private final class IsStoppedGuard extends Guard {
168    IsStoppedGuard() {
169      super(AbstractService.this.monitor);
170    }
171
172    @Override
173    public boolean isSatisfied() {
174      return state().compareTo(TERMINATED) >= 0;
175    }
176  }
177
178  /** The listeners to notify during a state transition. */
179  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
180
181  /**
182   * The current state of the service. This should be written with the lock held but can be read
183   * without it because it is an immutable object in a volatile field. This is desirable so that
184   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
185   * without grabbing the lock.
186   *
187   * <p>To update this field correctly the lock must be held to guarantee that the state is
188   * consistent.
189   */
190  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
191
192  /** Constructor for use by subclasses. */
193  protected AbstractService() {}
194
195  /**
196   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
197   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
198   * or after it has returned. If startup fails, the invocation should cause a call to {@link
199   * #notifyFailed(Throwable)} instead.
200   *
201   * <p>This method should return promptly; prefer to do work on a different thread where it is
202   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
203   * called multiple times.
204   */
205  @ForOverride
206  protected abstract void doStart();
207
208  /**
209   * This method should be used to initiate service shutdown. The invocation of this method should
210   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
211   * returned. If shutdown fails, the invocation should cause a call to {@link
212   * #notifyFailed(Throwable)} instead.
213   *
214   * <p>This method should return promptly; prefer to do work on a different thread where it is
215   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
216   * called multiple times.
217   *
218   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
219   * invoked immediately. Instead, it will be deferred until after the service is {@link
220   * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}.
221   */
222  @ForOverride
223  protected abstract void doStop();
224
225  /**
226   * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
227   * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
228   * method to cancel pending work and then call {@link #notifyStopped} to stop the service.
229   *
230   * <p>This method should return promptly; prefer to do work on a different thread where it is
231   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
232   * called multiple times.
233   *
234   * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the
235   * external state observable by the caller of {@link #stopAsync}.
236   *
237   * @since 27.0
238   */
239  @Beta
240  @ForOverride
241  protected void doCancelStart() {}
242
243  @CanIgnoreReturnValue
244  @Override
245  public final Service startAsync() {
246    if (monitor.enterIf(isStartable)) {
247      try {
248        snapshot = new StateSnapshot(STARTING);
249        enqueueStartingEvent();
250        doStart();
251      } catch (Throwable startupFailure) {
252        notifyFailed(startupFailure);
253      } finally {
254        monitor.leave();
255        dispatchListenerEvents();
256      }
257    } else {
258      throw new IllegalStateException("Service " + this + " has already been started");
259    }
260    return this;
261  }
262
263  @CanIgnoreReturnValue
264  @Override
265  public final Service stopAsync() {
266    if (monitor.enterIf(isStoppable)) {
267      try {
268        State previous = state();
269        switch (previous) {
270          case NEW:
271            snapshot = new StateSnapshot(TERMINATED);
272            enqueueTerminatedEvent(NEW);
273            break;
274          case STARTING:
275            snapshot = new StateSnapshot(STARTING, true, null);
276            enqueueStoppingEvent(STARTING);
277            doCancelStart();
278            break;
279          case RUNNING:
280            snapshot = new StateSnapshot(STOPPING);
281            enqueueStoppingEvent(RUNNING);
282            doStop();
283            break;
284          case STOPPING:
285          case TERMINATED:
286          case FAILED:
287            // These cases are impossible due to the if statement above.
288            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
289        }
290      } catch (Throwable shutdownFailure) {
291        notifyFailed(shutdownFailure);
292      } finally {
293        monitor.leave();
294        dispatchListenerEvents();
295      }
296    }
297    return this;
298  }
299
300  @Override
301  public final void awaitRunning() {
302    monitor.enterWhenUninterruptibly(hasReachedRunning);
303    try {
304      checkCurrentState(RUNNING);
305    } finally {
306      monitor.leave();
307    }
308  }
309
310  @Override
311  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
312    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
313      try {
314        checkCurrentState(RUNNING);
315      } finally {
316        monitor.leave();
317      }
318    } else {
319      // It is possible due to races the we are currently in the expected state even though we
320      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
321      // even check the guard. I don't think we care too much about this use case but it could lead
322      // to a confusing error message.
323      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
324    }
325  }
326
327  @Override
328  public final void awaitTerminated() {
329    monitor.enterWhenUninterruptibly(isStopped);
330    try {
331      checkCurrentState(TERMINATED);
332    } finally {
333      monitor.leave();
334    }
335  }
336
337  @Override
338  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
339    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
340      try {
341        checkCurrentState(TERMINATED);
342      } finally {
343        monitor.leave();
344      }
345    } else {
346      // It is possible due to races the we are currently in the expected state even though we
347      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
348      // even check the guard. I don't think we care too much about this use case but it could lead
349      // to a confusing error message.
350      throw new TimeoutException(
351          "Timed out waiting for "
352              + this
353              + " to reach a terminal state. "
354              + "Current state: "
355              + state());
356    }
357  }
358
359  /** Checks that the current state is equal to the expected state. */
360  @GuardedBy("monitor")
361  private void checkCurrentState(State expected) {
362    State actual = state();
363    if (actual != expected) {
364      if (actual == FAILED) {
365        // Handle this specially so that we can include the failureCause, if there is one.
366        throw new IllegalStateException(
367            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
368            failureCause());
369      }
370      throw new IllegalStateException(
371          "Expected the service " + this + " to be " + expected + ", but was " + actual);
372    }
373  }
374
375  /**
376   * Implementing classes should invoke this method once their service has started. It will cause
377   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
378   *
379   * @throws IllegalStateException if the service is not {@link State#STARTING}.
380   */
381  protected final void notifyStarted() {
382    monitor.enter();
383    try {
384      // We have to examine the internal state of the snapshot here to properly handle the stop
385      // while starting case.
386      if (snapshot.state != STARTING) {
387        IllegalStateException failure =
388            new IllegalStateException(
389                "Cannot notifyStarted() when the service is " + snapshot.state);
390        notifyFailed(failure);
391        throw failure;
392      }
393
394      if (snapshot.shutdownWhenStartupFinishes) {
395        snapshot = new StateSnapshot(STOPPING);
396        // We don't call listeners here because we already did that when we set the
397        // shutdownWhenStartupFinishes flag.
398        doStop();
399      } else {
400        snapshot = new StateSnapshot(RUNNING);
401        enqueueRunningEvent();
402      }
403    } finally {
404      monitor.leave();
405      dispatchListenerEvents();
406    }
407  }
408
409  /**
410   * Implementing classes should invoke this method once their service has stopped. It will cause
411   * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
412   * State#TERMINATED}.
413   *
414   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
415   *     State#STARTING}, or {@link State#RUNNING}.
416   */
417  protected final void notifyStopped() {
418    monitor.enter();
419    try {
420      State previous = state();
421      switch (previous) {
422        case NEW:
423        case TERMINATED:
424        case FAILED:
425          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
426        case RUNNING:
427        case STARTING:
428        case STOPPING:
429          snapshot = new StateSnapshot(TERMINATED);
430          enqueueTerminatedEvent(previous);
431          break;
432      }
433    } finally {
434      monitor.leave();
435      dispatchListenerEvents();
436    }
437  }
438
439  /**
440   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
441   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
442   * or otherwise cannot be started nor stopped.
443   */
444  protected final void notifyFailed(Throwable cause) {
445    checkNotNull(cause);
446
447    monitor.enter();
448    try {
449      State previous = state();
450      switch (previous) {
451        case NEW:
452        case TERMINATED:
453          throw new IllegalStateException("Failed while in state:" + previous, cause);
454        case RUNNING:
455        case STARTING:
456        case STOPPING:
457          snapshot = new StateSnapshot(FAILED, false, cause);
458          enqueueFailedEvent(previous, cause);
459          break;
460        case FAILED:
461          // Do nothing
462          break;
463      }
464    } finally {
465      monitor.leave();
466      dispatchListenerEvents();
467    }
468  }
469
470  @Override
471  public final boolean isRunning() {
472    return state() == RUNNING;
473  }
474
475  @Override
476  public final State state() {
477    return snapshot.externalState();
478  }
479
480  /** @since 14.0 */
481  @Override
482  public final Throwable failureCause() {
483    return snapshot.failureCause();
484  }
485
486  /** @since 13.0 */
487  @Override
488  public final void addListener(Listener listener, Executor executor) {
489    listeners.addListener(listener, executor);
490  }
491
492  @Override
493  public String toString() {
494    return getClass().getSimpleName() + " [" + state() + "]";
495  }
496
497  /**
498   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
499   * #monitor}.
500   */
501  private void dispatchListenerEvents() {
502    if (!monitor.isOccupiedByCurrentThread()) {
503      listeners.dispatch();
504    }
505  }
506
507  private void enqueueStartingEvent() {
508    listeners.enqueue(STARTING_EVENT);
509  }
510
511  private void enqueueRunningEvent() {
512    listeners.enqueue(RUNNING_EVENT);
513  }
514
515  private void enqueueStoppingEvent(final State from) {
516    if (from == State.STARTING) {
517      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
518    } else if (from == State.RUNNING) {
519      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
520    } else {
521      throw new AssertionError();
522    }
523  }
524
525  private void enqueueTerminatedEvent(final State from) {
526    switch (from) {
527      case NEW:
528        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
529        break;
530      case STARTING:
531        listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
532        break;
533      case RUNNING:
534        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
535        break;
536      case STOPPING:
537        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
538        break;
539      case TERMINATED:
540      case FAILED:
541        throw new AssertionError();
542    }
543  }
544
545  private void enqueueFailedEvent(final State from, final Throwable cause) {
546    // can't memoize this one due to the exception
547    listeners.enqueue(
548        new ListenerCallQueue.Event<Listener>() {
549          @Override
550          public void call(Listener listener) {
551            listener.failed(from, cause);
552          }
553
554          @Override
555          public String toString() {
556            return "failed({from = " + from + ", cause = " + cause + "})";
557          }
558        });
559  }
560
561  /**
562   * An immutable snapshot of the current state of the service. This class represents a consistent
563   * snapshot of the state and therefore it can be used to answer simple queries without needing to
564   * grab a lock.
565   */
566  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
567  private static final class StateSnapshot {
568    /**
569     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
570     */
571    final State state;
572
573    /** If true, the user requested a shutdown while the service was still starting up. */
574    final boolean shutdownWhenStartupFinishes;
575
576    /**
577     * The exception that caused this service to fail. This will be {@code null} unless the service
578     * has failed.
579     */
580    @CheckForNull final Throwable failure;
581
582    StateSnapshot(State internalState) {
583      this(internalState, false, null);
584    }
585
586    StateSnapshot(
587        State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) {
588      checkArgument(
589          !shutdownWhenStartupFinishes || internalState == STARTING,
590          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
591          internalState);
592      checkArgument(
593          (failure != null) == (internalState == FAILED),
594          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
595              + "instead.",
596          internalState,
597          failure);
598      this.state = internalState;
599      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
600      this.failure = failure;
601    }
602
603    /** @see Service#state() */
604    State externalState() {
605      if (shutdownWhenStartupFinishes && state == STARTING) {
606        return STOPPING;
607      } else {
608        return state;
609      }
610    }
611
612    /** @see Service#failureCause() */
613    Throwable failureCause() {
614      checkState(
615          state == FAILED,
616          "failureCause() is only valid if the service has failed, service is %s",
617          state);
618      // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED.
619      return requireNonNull(failure);
620    }
621  }
622}