001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Service.State.FAILED;
021import static com.google.common.util.concurrent.Service.State.NEW;
022import static com.google.common.util.concurrent.Service.State.RUNNING;
023import static com.google.common.util.concurrent.Service.State.STARTING;
024import static com.google.common.util.concurrent.Service.State.STOPPING;
025import static com.google.common.util.concurrent.Service.State.TERMINATED;
026
027import com.google.common.annotations.Beta;
028import com.google.common.annotations.GwtIncompatible;
029import com.google.common.util.concurrent.Monitor.Guard;
030import com.google.common.util.concurrent.Service.State; // javadoc needs this
031import com.google.errorprone.annotations.CanIgnoreReturnValue;
032import com.google.errorprone.annotations.ForOverride;
033import com.google.errorprone.annotations.concurrent.GuardedBy;
034import com.google.j2objc.annotations.WeakOuter;
035import java.time.Duration;
036import java.util.concurrent.Executor;
037import java.util.concurrent.TimeUnit;
038import java.util.concurrent.TimeoutException;
039import org.checkerframework.checker.nullness.qual.Nullable;
040
041/**
042 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
043 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
044 * callbacks. Its subclasses must manage threads manually; consider {@link
045 * AbstractExecutionThreadService} if you need only a single execution thread.
046 *
047 * @author Jesse Wilson
048 * @author Luke Sandberg
049 * @since 1.0
050 */
051@GwtIncompatible
052public abstract class AbstractService implements Service {
053  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
054      new ListenerCallQueue.Event<Listener>() {
055        @Override
056        public void call(Listener listener) {
057          listener.starting();
058        }
059
060        @Override
061        public String toString() {
062          return "starting()";
063        }
064      };
065  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
066      new ListenerCallQueue.Event<Listener>() {
067        @Override
068        public void call(Listener listener) {
069          listener.running();
070        }
071
072        @Override
073        public String toString() {
074          return "running()";
075        }
076      };
077  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
078      stoppingEvent(STARTING);
079  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
080      stoppingEvent(RUNNING);
081
082  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
083      terminatedEvent(NEW);
084  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
085      terminatedEvent(STARTING);
086  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
087      terminatedEvent(RUNNING);
088  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
089      terminatedEvent(STOPPING);
090
091  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
092    return new ListenerCallQueue.Event<Listener>() {
093      @Override
094      public void call(Listener listener) {
095        listener.terminated(from);
096      }
097
098      @Override
099      public String toString() {
100        return "terminated({from = " + from + "})";
101      }
102    };
103  }
104
105  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
106    return new ListenerCallQueue.Event<Listener>() {
107      @Override
108      public void call(Listener listener) {
109        listener.stopping(from);
110      }
111
112      @Override
113      public String toString() {
114        return "stopping({from = " + from + "})";
115      }
116    };
117  }
118
119  private final Monitor monitor = new Monitor();
120
121  private final Guard isStartable = new IsStartableGuard();
122
123  @WeakOuter
124  private final class IsStartableGuard extends Guard {
125    IsStartableGuard() {
126      super(AbstractService.this.monitor);
127    }
128
129    @Override
130    public boolean isSatisfied() {
131      return state() == NEW;
132    }
133  }
134
135  private final Guard isStoppable = new IsStoppableGuard();
136
137  @WeakOuter
138  private final class IsStoppableGuard extends Guard {
139    IsStoppableGuard() {
140      super(AbstractService.this.monitor);
141    }
142
143    @Override
144    public boolean isSatisfied() {
145      return state().compareTo(RUNNING) <= 0;
146    }
147  }
148
149  private final Guard hasReachedRunning = new HasReachedRunningGuard();
150
151  @WeakOuter
152  private final class HasReachedRunningGuard extends Guard {
153    HasReachedRunningGuard() {
154      super(AbstractService.this.monitor);
155    }
156
157    @Override
158    public boolean isSatisfied() {
159      return state().compareTo(RUNNING) >= 0;
160    }
161  }
162
163  private final Guard isStopped = new IsStoppedGuard();
164
165  @WeakOuter
166  private final class IsStoppedGuard extends Guard {
167    IsStoppedGuard() {
168      super(AbstractService.this.monitor);
169    }
170
171    @Override
172    public boolean isSatisfied() {
173      return state().isTerminal();
174    }
175  }
176
177  /** The listeners to notify during a state transition. */
178  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
179
180  /**
181   * The current state of the service. This should be written with the lock held but can be read
182   * without it because it is an immutable object in a volatile field. This is desirable so that
183   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
184   * without grabbing the lock.
185   *
186   * <p>To update this field correctly the lock must be held to guarantee that the state is
187   * consistent.
188   */
189  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
190
191  /** Constructor for use by subclasses. */
192  protected AbstractService() {}
193
194  /**
195   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
196   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
197   * or after it has returned. If startup fails, the invocation should cause a call to {@link
198   * #notifyFailed(Throwable)} instead.
199   *
200   * <p>This method should return promptly; prefer to do work on a different thread where it is
201   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
202   * called multiple times.
203   */
204  @ForOverride
205  protected abstract void doStart();
206
207  /**
208   * This method should be used to initiate service shutdown. The invocation of this method should
209   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
210   * returned. If shutdown fails, the invocation should cause a call to {@link
211   * #notifyFailed(Throwable)} instead.
212   *
213   * <p>This method should return promptly; prefer to do work on a different thread where it is
214   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
215   * called multiple times.
216   *
217   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
218   * invoked immediately. Instead, it will be deferred until after the service is {@link
219   * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}.
220   */
221  @ForOverride
222  protected abstract void doStop();
223
224  /**
225   * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
226   * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
227   * method to cancel pending work and then call {@link #notifyStopped} to stop the service.
228   *
229   * <p>This method should return promptly; prefer to do work on a different thread where it is
230   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
231   * called multiple times.
232   *
233   * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the
234   * external state observable by the caller of {@link #stopAsync}.
235   *
236   * @since 27.0
237   */
238  @Beta
239  @ForOverride
240  protected void doCancelStart() {}
241
242  @CanIgnoreReturnValue
243  @Override
244  public final Service startAsync() {
245    if (monitor.enterIf(isStartable)) {
246      try {
247        snapshot = new StateSnapshot(STARTING);
248        enqueueStartingEvent();
249        doStart();
250      } catch (Throwable startupFailure) {
251        notifyFailed(startupFailure);
252      } finally {
253        monitor.leave();
254        dispatchListenerEvents();
255      }
256    } else {
257      throw new IllegalStateException("Service " + this + " has already been started");
258    }
259    return this;
260  }
261
262  @CanIgnoreReturnValue
263  @Override
264  public final Service stopAsync() {
265    if (monitor.enterIf(isStoppable)) {
266      try {
267        State previous = state();
268        switch (previous) {
269          case NEW:
270            snapshot = new StateSnapshot(TERMINATED);
271            enqueueTerminatedEvent(NEW);
272            break;
273          case STARTING:
274            snapshot = new StateSnapshot(STARTING, true, null);
275            enqueueStoppingEvent(STARTING);
276            doCancelStart();
277            break;
278          case RUNNING:
279            snapshot = new StateSnapshot(STOPPING);
280            enqueueStoppingEvent(RUNNING);
281            doStop();
282            break;
283          case STOPPING:
284          case TERMINATED:
285          case FAILED:
286            // These cases are impossible due to the if statement above.
287            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
288        }
289      } catch (Throwable shutdownFailure) {
290        notifyFailed(shutdownFailure);
291      } finally {
292        monitor.leave();
293        dispatchListenerEvents();
294      }
295    }
296    return this;
297  }
298
299  @Override
300  public final void awaitRunning() {
301    monitor.enterWhenUninterruptibly(hasReachedRunning);
302    try {
303      checkCurrentState(RUNNING);
304    } finally {
305      monitor.leave();
306    }
307  }
308
309  /** @since 28.0 */
310  @Override
311  public final void awaitRunning(Duration timeout) throws TimeoutException {
312    Service.super.awaitRunning(timeout);
313  }
314
315  @Override
316  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
317    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
318      try {
319        checkCurrentState(RUNNING);
320      } finally {
321        monitor.leave();
322      }
323    } else {
324      // It is possible due to races the we are currently in the expected state even though we
325      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
326      // even check the guard. I don't think we care too much about this use case but it could lead
327      // to a confusing error message.
328      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
329    }
330  }
331
332  @Override
333  public final void awaitTerminated() {
334    monitor.enterWhenUninterruptibly(isStopped);
335    try {
336      checkCurrentState(TERMINATED);
337    } finally {
338      monitor.leave();
339    }
340  }
341
342  /** @since 28.0 */
343  @Override
344  public final void awaitTerminated(Duration timeout) throws TimeoutException {
345    Service.super.awaitTerminated(timeout);
346  }
347
348  @Override
349  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
350    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
351      try {
352        checkCurrentState(TERMINATED);
353      } finally {
354        monitor.leave();
355      }
356    } else {
357      // It is possible due to races the we are currently in the expected state even though we
358      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
359      // even check the guard. I don't think we care too much about this use case but it could lead
360      // to a confusing error message.
361      throw new TimeoutException(
362          "Timed out waiting for "
363              + this
364              + " to reach a terminal state. "
365              + "Current state: "
366              + state());
367    }
368  }
369
370  /** Checks that the current state is equal to the expected state. */
371  @GuardedBy("monitor")
372  private void checkCurrentState(State expected) {
373    State actual = state();
374    if (actual != expected) {
375      if (actual == FAILED) {
376        // Handle this specially so that we can include the failureCause, if there is one.
377        throw new IllegalStateException(
378            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
379            failureCause());
380      }
381      throw new IllegalStateException(
382          "Expected the service " + this + " to be " + expected + ", but was " + actual);
383    }
384  }
385
386  /**
387   * Implementing classes should invoke this method once their service has started. It will cause
388   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
389   *
390   * @throws IllegalStateException if the service is not {@link State#STARTING}.
391   */
392  protected final void notifyStarted() {
393    monitor.enter();
394    try {
395      // We have to examine the internal state of the snapshot here to properly handle the stop
396      // while starting case.
397      if (snapshot.state != STARTING) {
398        IllegalStateException failure =
399            new IllegalStateException(
400                "Cannot notifyStarted() when the service is " + snapshot.state);
401        notifyFailed(failure);
402        throw failure;
403      }
404
405      if (snapshot.shutdownWhenStartupFinishes) {
406        snapshot = new StateSnapshot(STOPPING);
407        // We don't call listeners here because we already did that when we set the
408        // shutdownWhenStartupFinishes flag.
409        doStop();
410      } else {
411        snapshot = new StateSnapshot(RUNNING);
412        enqueueRunningEvent();
413      }
414    } finally {
415      monitor.leave();
416      dispatchListenerEvents();
417    }
418  }
419
420  /**
421   * Implementing classes should invoke this method once their service has stopped. It will cause
422   * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
423   * State#TERMINATED}.
424   *
425   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
426   *     State#STARTING}, or {@link State#RUNNING}.
427   */
428  protected final void notifyStopped() {
429    monitor.enter();
430    try {
431      State previous = state();
432      switch (previous) {
433        case NEW:
434        case TERMINATED:
435        case FAILED:
436          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
437        case RUNNING:
438        case STARTING:
439        case STOPPING:
440          snapshot = new StateSnapshot(TERMINATED);
441          enqueueTerminatedEvent(previous);
442          break;
443      }
444    } finally {
445      monitor.leave();
446      dispatchListenerEvents();
447    }
448  }
449
450  /**
451   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
452   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
453   * or otherwise cannot be started nor stopped.
454   */
455  protected final void notifyFailed(Throwable cause) {
456    checkNotNull(cause);
457
458    monitor.enter();
459    try {
460      State previous = state();
461      switch (previous) {
462        case NEW:
463        case TERMINATED:
464          throw new IllegalStateException("Failed while in state:" + previous, cause);
465        case RUNNING:
466        case STARTING:
467        case STOPPING:
468          snapshot = new StateSnapshot(FAILED, false, cause);
469          enqueueFailedEvent(previous, cause);
470          break;
471        case FAILED:
472          // Do nothing
473          break;
474      }
475    } finally {
476      monitor.leave();
477      dispatchListenerEvents();
478    }
479  }
480
481  @Override
482  public final boolean isRunning() {
483    return state() == RUNNING;
484  }
485
486  @Override
487  public final State state() {
488    return snapshot.externalState();
489  }
490
491  /** @since 14.0 */
492  @Override
493  public final Throwable failureCause() {
494    return snapshot.failureCause();
495  }
496
497  /** @since 13.0 */
498  @Override
499  public final void addListener(Listener listener, Executor executor) {
500    listeners.addListener(listener, executor);
501  }
502
503  @Override
504  public String toString() {
505    return getClass().getSimpleName() + " [" + state() + "]";
506  }
507
508  /**
509   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
510   * #monitor}.
511   */
512  private void dispatchListenerEvents() {
513    if (!monitor.isOccupiedByCurrentThread()) {
514      listeners.dispatch();
515    }
516  }
517
518  private void enqueueStartingEvent() {
519    listeners.enqueue(STARTING_EVENT);
520  }
521
522  private void enqueueRunningEvent() {
523    listeners.enqueue(RUNNING_EVENT);
524  }
525
526  private void enqueueStoppingEvent(final State from) {
527    if (from == State.STARTING) {
528      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
529    } else if (from == State.RUNNING) {
530      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
531    } else {
532      throw new AssertionError();
533    }
534  }
535
536  private void enqueueTerminatedEvent(final State from) {
537    switch (from) {
538      case NEW:
539        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
540        break;
541      case STARTING:
542        listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
543        break;
544      case RUNNING:
545        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
546        break;
547      case STOPPING:
548        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
549        break;
550      case TERMINATED:
551      case FAILED:
552        throw new AssertionError();
553    }
554  }
555
556  private void enqueueFailedEvent(final State from, final Throwable cause) {
557    // can't memoize this one due to the exception
558    listeners.enqueue(
559        new ListenerCallQueue.Event<Listener>() {
560          @Override
561          public void call(Listener listener) {
562            listener.failed(from, cause);
563          }
564
565          @Override
566          public String toString() {
567            return "failed({from = " + from + ", cause = " + cause + "})";
568          }
569        });
570  }
571
572  /**
573   * An immutable snapshot of the current state of the service. This class represents a consistent
574   * snapshot of the state and therefore it can be used to answer simple queries without needing to
575   * grab a lock.
576   */
577  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
578  private static final class StateSnapshot {
579    /**
580     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
581     */
582    final State state;
583
584    /** If true, the user requested a shutdown while the service was still starting up. */
585    final boolean shutdownWhenStartupFinishes;
586
587    /**
588     * The exception that caused this service to fail. This will be {@code null} unless the service
589     * has failed.
590     */
591    final @Nullable Throwable failure;
592
593    StateSnapshot(State internalState) {
594      this(internalState, false, null);
595    }
596
597    StateSnapshot(
598        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
599      checkArgument(
600          !shutdownWhenStartupFinishes || internalState == STARTING,
601          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
602          internalState);
603      checkArgument(
604          !(failure != null ^ internalState == FAILED),
605          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
606              + "instead.",
607          internalState,
608          failure);
609      this.state = internalState;
610      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
611      this.failure = failure;
612    }
613
614    /** @see Service#state() */
615    State externalState() {
616      if (shutdownWhenStartupFinishes && state == STARTING) {
617        return STOPPING;
618      } else {
619        return state;
620      }
621    }
622
623    /** @see Service#failureCause() */
624    Throwable failureCause() {
625      checkState(
626          state == FAILED,
627          "failureCause() is only valid if the service has failed, service is %s",
628          state);
629      return failure;
630    }
631  }
632}