001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException;
021import static com.google.common.util.concurrent.Service.State.FAILED;
022import static com.google.common.util.concurrent.Service.State.NEW;
023import static com.google.common.util.concurrent.Service.State.RUNNING;
024import static com.google.common.util.concurrent.Service.State.STARTING;
025import static com.google.common.util.concurrent.Service.State.STOPPING;
026import static com.google.common.util.concurrent.Service.State.TERMINATED;
027import static java.util.Objects.requireNonNull;
028
029import com.google.common.annotations.GwtIncompatible;
030import com.google.common.annotations.J2ktIncompatible;
031import com.google.common.util.concurrent.Monitor.Guard;
032import com.google.common.util.concurrent.Service.State; // javadoc needs this
033import com.google.errorprone.annotations.CanIgnoreReturnValue;
034import com.google.errorprone.annotations.ForOverride;
035import com.google.errorprone.annotations.concurrent.GuardedBy;
036import com.google.j2objc.annotations.WeakOuter;
037import java.time.Duration;
038import java.util.concurrent.Executor;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.TimeoutException;
041import javax.annotation.CheckForNull;
042
043/**
044 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
045 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
046 * callbacks. Its subclasses must manage threads manually; consider {@link
047 * AbstractExecutionThreadService} if you need only a single execution thread.
048 *
049 * @author Jesse Wilson
050 * @author Luke Sandberg
051 * @since 1.0
052 */
053@GwtIncompatible
054@J2ktIncompatible
055@ElementTypesAreNonnullByDefault
056public abstract class AbstractService implements Service {
057  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
058      new ListenerCallQueue.Event<Listener>() {
059        @Override
060        public void call(Listener listener) {
061          listener.starting();
062        }
063
064        @Override
065        public String toString() {
066          return "starting()";
067        }
068      };
069  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
070      new ListenerCallQueue.Event<Listener>() {
071        @Override
072        public void call(Listener listener) {
073          listener.running();
074        }
075
076        @Override
077        public String toString() {
078          return "running()";
079        }
080      };
081  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
082      stoppingEvent(STARTING);
083  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
084      stoppingEvent(RUNNING);
085
086  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
087      terminatedEvent(NEW);
088  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
089      terminatedEvent(STARTING);
090  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
091      terminatedEvent(RUNNING);
092  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
093      terminatedEvent(STOPPING);
094
095  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
096    return new ListenerCallQueue.Event<Listener>() {
097      @Override
098      public void call(Listener listener) {
099        listener.terminated(from);
100      }
101
102      @Override
103      public String toString() {
104        return "terminated({from = " + from + "})";
105      }
106    };
107  }
108
109  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
110    return new ListenerCallQueue.Event<Listener>() {
111      @Override
112      public void call(Listener listener) {
113        listener.stopping(from);
114      }
115
116      @Override
117      public String toString() {
118        return "stopping({from = " + from + "})";
119      }
120    };
121  }
122
123  private final Monitor monitor = new Monitor();
124
125  private final Guard isStartable = new IsStartableGuard();
126
127  @WeakOuter
128  private final class IsStartableGuard extends Guard {
129    IsStartableGuard() {
130      super(AbstractService.this.monitor);
131    }
132
133    @Override
134    public boolean isSatisfied() {
135      return state() == NEW;
136    }
137  }
138
139  private final Guard isStoppable = new IsStoppableGuard();
140
141  @WeakOuter
142  private final class IsStoppableGuard extends Guard {
143    IsStoppableGuard() {
144      super(AbstractService.this.monitor);
145    }
146
147    @Override
148    public boolean isSatisfied() {
149      return state().compareTo(RUNNING) <= 0;
150    }
151  }
152
153  private final Guard hasReachedRunning = new HasReachedRunningGuard();
154
155  @WeakOuter
156  private final class HasReachedRunningGuard extends Guard {
157    HasReachedRunningGuard() {
158      super(AbstractService.this.monitor);
159    }
160
161    @Override
162    public boolean isSatisfied() {
163      return state().compareTo(RUNNING) >= 0;
164    }
165  }
166
167  private final Guard isStopped = new IsStoppedGuard();
168
169  @WeakOuter
170  private final class IsStoppedGuard extends Guard {
171    IsStoppedGuard() {
172      super(AbstractService.this.monitor);
173    }
174
175    @Override
176    public boolean isSatisfied() {
177      return state().compareTo(TERMINATED) >= 0;
178    }
179  }
180
181  /** The listeners to notify during a state transition. */
182  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
183
184  /**
185   * The current state of the service. This should be written with the lock held but can be read
186   * without it because it is an immutable object in a volatile field. This is desirable so that
187   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
188   * without grabbing the lock.
189   *
190   * <p>To update this field correctly the lock must be held to guarantee that the state is
191   * consistent.
192   */
193  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
194
195  /** Constructor for use by subclasses. */
196  protected AbstractService() {}
197
198  /**
199   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
200   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
201   * or after it has returned. If startup fails, the invocation should cause a call to {@link
202   * #notifyFailed(Throwable)} instead.
203   *
204   * <p>This method should return promptly; prefer to do work on a different thread where it is
205   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
206   * called multiple times.
207   */
208  @ForOverride
209  protected abstract void doStart();
210
211  /**
212   * This method should be used to initiate service shutdown. The invocation of this method should
213   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
214   * returned. If shutdown fails, the invocation should cause a call to {@link
215   * #notifyFailed(Throwable)} instead.
216   *
217   * <p>This method should return promptly; prefer to do work on a different thread where it is
218   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
219   * called multiple times.
220   *
221   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
222   * invoked immediately. Instead, it will be deferred until after the service is {@link
223   * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}.
224   */
225  @ForOverride
226  protected abstract void doStop();
227
228  /**
229   * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
230   * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
231   * method to cancel pending work and then call {@link #notifyStopped} to stop the service.
232   *
233   * <p>This method should return promptly; prefer to do work on a different thread where it is
234   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
235   * called multiple times.
236   *
237   * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the
238   * external state observable by the caller of {@link #stopAsync}.
239   *
240   * @since 27.0
241   */
242  @ForOverride
243  protected void doCancelStart() {}
244
245  @CanIgnoreReturnValue
246  @Override
247  public final Service startAsync() {
248    if (monitor.enterIf(isStartable)) {
249      try {
250        snapshot = new StateSnapshot(STARTING);
251        enqueueStartingEvent();
252        doStart();
253      } catch (Throwable startupFailure) {
254        restoreInterruptIfIsInterruptedException(startupFailure);
255        notifyFailed(startupFailure);
256      } finally {
257        monitor.leave();
258        dispatchListenerEvents();
259      }
260    } else {
261      throw new IllegalStateException("Service " + this + " has already been started");
262    }
263    return this;
264  }
265
266  @CanIgnoreReturnValue
267  @Override
268  public final Service stopAsync() {
269    if (monitor.enterIf(isStoppable)) {
270      try {
271        State previous = state();
272        switch (previous) {
273          case NEW:
274            snapshot = new StateSnapshot(TERMINATED);
275            enqueueTerminatedEvent(NEW);
276            break;
277          case STARTING:
278            snapshot = new StateSnapshot(STARTING, true, null);
279            enqueueStoppingEvent(STARTING);
280            doCancelStart();
281            break;
282          case RUNNING:
283            snapshot = new StateSnapshot(STOPPING);
284            enqueueStoppingEvent(RUNNING);
285            doStop();
286            break;
287          case STOPPING:
288          case TERMINATED:
289          case FAILED:
290            // These cases are impossible due to the if statement above.
291            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
292        }
293      } catch (Throwable shutdownFailure) {
294        restoreInterruptIfIsInterruptedException(shutdownFailure);
295        notifyFailed(shutdownFailure);
296      } finally {
297        monitor.leave();
298        dispatchListenerEvents();
299      }
300    }
301    return this;
302  }
303
304  @Override
305  public final void awaitRunning() {
306    monitor.enterWhenUninterruptibly(hasReachedRunning);
307    try {
308      checkCurrentState(RUNNING);
309    } finally {
310      monitor.leave();
311    }
312  }
313
314  /** @since 28.0 */
315  @Override
316  public final void awaitRunning(Duration timeout) throws TimeoutException {
317    Service.super.awaitRunning(timeout);
318  }
319
320  @Override
321  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
322    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
323      try {
324        checkCurrentState(RUNNING);
325      } finally {
326        monitor.leave();
327      }
328    } else {
329      // It is possible due to races that we are currently in the expected state even though we
330      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
331      // even check the guard. I don't think we care too much about this use case but it could lead
332      // to a confusing error message.
333      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
334    }
335  }
336
337  @Override
338  public final void awaitTerminated() {
339    monitor.enterWhenUninterruptibly(isStopped);
340    try {
341      checkCurrentState(TERMINATED);
342    } finally {
343      monitor.leave();
344    }
345  }
346
347  /** @since 28.0 */
348  @Override
349  public final void awaitTerminated(Duration timeout) throws TimeoutException {
350    Service.super.awaitTerminated(timeout);
351  }
352
353  @Override
354  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
355    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
356      try {
357        checkCurrentState(TERMINATED);
358      } finally {
359        monitor.leave();
360      }
361    } else {
362      // It is possible due to races that we are currently in the expected state even though we
363      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
364      // even check the guard. I don't think we care too much about this use case but it could lead
365      // to a confusing error message.
366      throw new TimeoutException(
367          "Timed out waiting for "
368              + this
369              + " to reach a terminal state. "
370              + "Current state: "
371              + state());
372    }
373  }
374
375  /** Checks that the current state is equal to the expected state. */
376  @GuardedBy("monitor")
377  private void checkCurrentState(State expected) {
378    State actual = state();
379    if (actual != expected) {
380      if (actual == FAILED) {
381        // Handle this specially so that we can include the failureCause, if there is one.
382        throw new IllegalStateException(
383            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
384            failureCause());
385      }
386      throw new IllegalStateException(
387          "Expected the service " + this + " to be " + expected + ", but was " + actual);
388    }
389  }
390
391  /**
392   * Implementing classes should invoke this method once their service has started. It will cause
393   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
394   *
395   * @throws IllegalStateException if the service is not {@link State#STARTING}.
396   */
397  protected final void notifyStarted() {
398    monitor.enter();
399    try {
400      // We have to examine the internal state of the snapshot here to properly handle the stop
401      // while starting case.
402      if (snapshot.state != STARTING) {
403        IllegalStateException failure =
404            new IllegalStateException(
405                "Cannot notifyStarted() when the service is " + snapshot.state);
406        notifyFailed(failure);
407        throw failure;
408      }
409
410      if (snapshot.shutdownWhenStartupFinishes) {
411        snapshot = new StateSnapshot(STOPPING);
412        // We don't call listeners here because we already did that when we set the
413        // shutdownWhenStartupFinishes flag.
414        doStop();
415      } else {
416        snapshot = new StateSnapshot(RUNNING);
417        enqueueRunningEvent();
418      }
419    } finally {
420      monitor.leave();
421      dispatchListenerEvents();
422    }
423  }
424
425  /**
426   * Implementing classes should invoke this method once their service has stopped. It will cause
427   * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
428   * State#TERMINATED}.
429   *
430   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
431   *     State#STARTING}, or {@link State#RUNNING}.
432   */
433  protected final void notifyStopped() {
434    monitor.enter();
435    try {
436      State previous = state();
437      switch (previous) {
438        case NEW:
439        case TERMINATED:
440        case FAILED:
441          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
442        case RUNNING:
443        case STARTING:
444        case STOPPING:
445          snapshot = new StateSnapshot(TERMINATED);
446          enqueueTerminatedEvent(previous);
447          break;
448      }
449    } finally {
450      monitor.leave();
451      dispatchListenerEvents();
452    }
453  }
454
455  /**
456   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
457   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
458   * or otherwise cannot be started nor stopped.
459   */
460  protected final void notifyFailed(Throwable cause) {
461    checkNotNull(cause);
462
463    monitor.enter();
464    try {
465      State previous = state();
466      switch (previous) {
467        case NEW:
468        case TERMINATED:
469          throw new IllegalStateException("Failed while in state:" + previous, cause);
470        case RUNNING:
471        case STARTING:
472        case STOPPING:
473          snapshot = new StateSnapshot(FAILED, false, cause);
474          enqueueFailedEvent(previous, cause);
475          break;
476        case FAILED:
477          // Do nothing
478          break;
479      }
480    } finally {
481      monitor.leave();
482      dispatchListenerEvents();
483    }
484  }
485
486  @Override
487  public final boolean isRunning() {
488    return state() == RUNNING;
489  }
490
491  @Override
492  public final State state() {
493    return snapshot.externalState();
494  }
495
496  /** @since 14.0 */
497  @Override
498  public final Throwable failureCause() {
499    return snapshot.failureCause();
500  }
501
502  /** @since 13.0 */
503  @Override
504  public final void addListener(Listener listener, Executor executor) {
505    listeners.addListener(listener, executor);
506  }
507
508  @Override
509  public String toString() {
510    return getClass().getSimpleName() + " [" + state() + "]";
511  }
512
513  /**
514   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
515   * #monitor}.
516   */
517  private void dispatchListenerEvents() {
518    if (!monitor.isOccupiedByCurrentThread()) {
519      listeners.dispatch();
520    }
521  }
522
523  private void enqueueStartingEvent() {
524    listeners.enqueue(STARTING_EVENT);
525  }
526
527  private void enqueueRunningEvent() {
528    listeners.enqueue(RUNNING_EVENT);
529  }
530
531  private void enqueueStoppingEvent(final State from) {
532    if (from == State.STARTING) {
533      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
534    } else if (from == State.RUNNING) {
535      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
536    } else {
537      throw new AssertionError();
538    }
539  }
540
541  private void enqueueTerminatedEvent(final State from) {
542    switch (from) {
543      case NEW:
544        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
545        break;
546      case STARTING:
547        listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
548        break;
549      case RUNNING:
550        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
551        break;
552      case STOPPING:
553        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
554        break;
555      case TERMINATED:
556      case FAILED:
557        throw new AssertionError();
558    }
559  }
560
561  private void enqueueFailedEvent(final State from, final Throwable cause) {
562    // can't memoize this one due to the exception
563    listeners.enqueue(
564        new ListenerCallQueue.Event<Listener>() {
565          @Override
566          public void call(Listener listener) {
567            listener.failed(from, cause);
568          }
569
570          @Override
571          public String toString() {
572            return "failed({from = " + from + ", cause = " + cause + "})";
573          }
574        });
575  }
576
577  /**
578   * An immutable snapshot of the current state of the service. This class represents a consistent
579   * snapshot of the state and therefore it can be used to answer simple queries without needing to
580   * grab a lock.
581   */
582  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
583  private static final class StateSnapshot {
584    /**
585     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
586     */
587    final State state;
588
589    /** If true, the user requested a shutdown while the service was still starting up. */
590    final boolean shutdownWhenStartupFinishes;
591
592    /**
593     * The exception that caused this service to fail. This will be {@code null} unless the service
594     * has failed.
595     */
596    @CheckForNull final Throwable failure;
597
598    StateSnapshot(State internalState) {
599      this(internalState, false, null);
600    }
601
602    StateSnapshot(
603        State internalState, boolean shutdownWhenStartupFinishes, @CheckForNull Throwable failure) {
604      checkArgument(
605          !shutdownWhenStartupFinishes || internalState == STARTING,
606          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
607          internalState);
608      checkArgument(
609          (failure != null) == (internalState == FAILED),
610          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
611              + "instead.",
612          internalState,
613          failure);
614      this.state = internalState;
615      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
616      this.failure = failure;
617    }
618
619    /** @see Service#state() */
620    State externalState() {
621      if (shutdownWhenStartupFinishes && state == STARTING) {
622        return STOPPING;
623      } else {
624        return state;
625      }
626    }
627
628    /** @see Service#failureCause() */
629    Throwable failureCause() {
630      checkState(
631          state == FAILED,
632          "failureCause() is only valid if the service has failed, service is %s",
633          state);
634      // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED.
635      return requireNonNull(failure);
636    }
637  }
638}