001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException;
021import static com.google.common.util.concurrent.Service.State.FAILED;
022import static com.google.common.util.concurrent.Service.State.NEW;
023import static com.google.common.util.concurrent.Service.State.RUNNING;
024import static com.google.common.util.concurrent.Service.State.STARTING;
025import static com.google.common.util.concurrent.Service.State.STOPPING;
026import static com.google.common.util.concurrent.Service.State.TERMINATED;
027import static java.util.Objects.requireNonNull;
028
029import com.google.common.annotations.GwtIncompatible;
030import com.google.common.annotations.J2ktIncompatible;
031import com.google.common.util.concurrent.Monitor.Guard;
032import com.google.common.util.concurrent.Service.State;
033import com.google.errorprone.annotations.CanIgnoreReturnValue;
034import com.google.errorprone.annotations.ForOverride;
035import com.google.errorprone.annotations.concurrent.GuardedBy;
036import com.google.j2objc.annotations.WeakOuter;
037import java.time.Duration;
038import java.util.concurrent.Executor;
039import java.util.concurrent.TimeUnit;
040import java.util.concurrent.TimeoutException;
041import org.jspecify.annotations.Nullable;
042
043/**
044 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
045 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
046 * callbacks. Its subclasses must manage threads manually; consider {@link
047 * AbstractExecutionThreadService} if you need only a single execution thread.
048 *
049 * @author Jesse Wilson
050 * @author Luke Sandberg
051 * @since 1.0
052 */
053@GwtIncompatible
054@J2ktIncompatible
055public abstract class AbstractService implements Service {
056  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
057      new ListenerCallQueue.Event<Listener>() {
058        @Override
059        public void call(Listener listener) {
060          listener.starting();
061        }
062
063        @Override
064        public String toString() {
065          return "starting()";
066        }
067      };
068  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
069      new ListenerCallQueue.Event<Listener>() {
070        @Override
071        public void call(Listener listener) {
072          listener.running();
073        }
074
075        @Override
076        public String toString() {
077          return "running()";
078        }
079      };
080  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
081      stoppingEvent(STARTING);
082  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
083      stoppingEvent(RUNNING);
084
085  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
086      terminatedEvent(NEW);
087  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
088      terminatedEvent(STARTING);
089  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
090      terminatedEvent(RUNNING);
091  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
092      terminatedEvent(STOPPING);
093
094  private static ListenerCallQueue.Event<Listener> terminatedEvent(State from) {
095    return new ListenerCallQueue.Event<Listener>() {
096      @Override
097      public void call(Listener listener) {
098        listener.terminated(from);
099      }
100
101      @Override
102      public String toString() {
103        return "terminated({from = " + from + "})";
104      }
105    };
106  }
107
108  private static ListenerCallQueue.Event<Listener> stoppingEvent(State from) {
109    return new ListenerCallQueue.Event<Listener>() {
110      @Override
111      public void call(Listener listener) {
112        listener.stopping(from);
113      }
114
115      @Override
116      public String toString() {
117        return "stopping({from = " + from + "})";
118      }
119    };
120  }
121
122  private final Monitor monitor = new Monitor();
123
124  private final Guard isStartable = new IsStartableGuard();
125
126  @WeakOuter
127  private final class IsStartableGuard extends Guard {
128    IsStartableGuard() {
129      super(AbstractService.this.monitor);
130    }
131
132    @Override
133    public boolean isSatisfied() {
134      return state() == NEW;
135    }
136  }
137
138  private final Guard isStoppable = new IsStoppableGuard();
139
140  @WeakOuter
141  private final class IsStoppableGuard extends Guard {
142    IsStoppableGuard() {
143      super(AbstractService.this.monitor);
144    }
145
146    @Override
147    public boolean isSatisfied() {
148      return state().compareTo(RUNNING) <= 0;
149    }
150  }
151
152  private final Guard hasReachedRunning = new HasReachedRunningGuard();
153
154  @WeakOuter
155  private final class HasReachedRunningGuard extends Guard {
156    HasReachedRunningGuard() {
157      super(AbstractService.this.monitor);
158    }
159
160    @Override
161    public boolean isSatisfied() {
162      return state().compareTo(RUNNING) >= 0;
163    }
164  }
165
166  private final Guard isStopped = new IsStoppedGuard();
167
168  @WeakOuter
169  private final class IsStoppedGuard extends Guard {
170    IsStoppedGuard() {
171      super(AbstractService.this.monitor);
172    }
173
174    @Override
175    public boolean isSatisfied() {
176      return state().compareTo(TERMINATED) >= 0;
177    }
178  }
179
180  /** The listeners to notify during a state transition. */
181  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
182
183  /**
184   * The current state of the service. This should be written with the lock held but can be read
185   * without it because it is an immutable object in a volatile field. This is desirable so that
186   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
187   * without grabbing the lock.
188   *
189   * <p>To update this field correctly the lock must be held to guarantee that the state is
190   * consistent.
191   */
192  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
193
194  /** Constructor for use by subclasses. */
195  protected AbstractService() {}
196
197  /**
198   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
199   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
200   * or after it has returned. If startup fails, the invocation should cause a call to {@link
201   * #notifyFailed(Throwable)} instead.
202   *
203   * <p>This method should return promptly; prefer to do work on a different thread where it is
204   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
205   * called multiple times.
206   */
207  @ForOverride
208  protected abstract void doStart();
209
210  /**
211   * This method should be used to initiate service shutdown. The invocation of this method should
212   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
213   * returned. If shutdown fails, the invocation should cause a call to {@link
214   * #notifyFailed(Throwable)} instead.
215   *
216   * <p>This method should return promptly; prefer to do work on a different thread where it is
217   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
218   * called multiple times.
219   *
220   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
221   * invoked immediately. Instead, it will be deferred until after the service is {@link
222   * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}.
223   */
224  @ForOverride
225  protected abstract void doStop();
226
227  /**
228   * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
229   * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
230   * method to cancel pending work and then call {@link #notifyStopped} to stop the service.
231   *
232   * <p>This method should return promptly; prefer to do work on a different thread where it is
233   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
234   * called multiple times.
235   *
236   * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the
237   * external state observable by the caller of {@link #stopAsync}.
238   *
239   * @since 27.0
240   */
241  @ForOverride
242  protected void doCancelStart() {}
243
244  @CanIgnoreReturnValue
245  @Override
246  public final Service startAsync() {
247    if (monitor.enterIf(isStartable)) {
248      try {
249        snapshot = new StateSnapshot(STARTING);
250        enqueueStartingEvent();
251        doStart();
252      } catch (Throwable startupFailure) {
253        restoreInterruptIfIsInterruptedException(startupFailure);
254        notifyFailed(startupFailure);
255      } finally {
256        monitor.leave();
257        dispatchListenerEvents();
258      }
259    } else {
260      throw new IllegalStateException("Service " + this + " has already been started");
261    }
262    return this;
263  }
264
265  @CanIgnoreReturnValue
266  @Override
267  public final Service stopAsync() {
268    if (monitor.enterIf(isStoppable)) {
269      try {
270        State previous = state();
271        switch (previous) {
272          case NEW:
273            snapshot = new StateSnapshot(TERMINATED);
274            enqueueTerminatedEvent(NEW);
275            break;
276          case STARTING:
277            snapshot = new StateSnapshot(STARTING, true, null);
278            enqueueStoppingEvent(STARTING);
279            doCancelStart();
280            break;
281          case RUNNING:
282            snapshot = new StateSnapshot(STOPPING);
283            enqueueStoppingEvent(RUNNING);
284            doStop();
285            break;
286          case STOPPING:
287          case TERMINATED:
288          case FAILED:
289            // These cases are impossible due to the if statement above.
290            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
291        }
292      } catch (Throwable shutdownFailure) {
293        restoreInterruptIfIsInterruptedException(shutdownFailure);
294        notifyFailed(shutdownFailure);
295      } finally {
296        monitor.leave();
297        dispatchListenerEvents();
298      }
299    }
300    return this;
301  }
302
303  @Override
304  public final void awaitRunning() {
305    monitor.enterWhenUninterruptibly(hasReachedRunning);
306    try {
307      checkCurrentState(RUNNING);
308    } finally {
309      monitor.leave();
310    }
311  }
312
313  /**
314   * @since 28.0
315   */
316  @Override
317  public final void awaitRunning(Duration timeout) throws TimeoutException {
318    Service.super.awaitRunning(timeout);
319  }
320
321  @Override
322  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
323    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
324      try {
325        checkCurrentState(RUNNING);
326      } finally {
327        monitor.leave();
328      }
329    } else {
330      // It is possible due to races that we are currently in the expected state even though we
331      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
332      // even check the guard. I don't think we care too much about this use case but it could lead
333      // to a confusing error message.
334      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
335    }
336  }
337
338  @Override
339  public final void awaitTerminated() {
340    monitor.enterWhenUninterruptibly(isStopped);
341    try {
342      checkCurrentState(TERMINATED);
343    } finally {
344      monitor.leave();
345    }
346  }
347
348  /**
349   * @since 28.0
350   */
351  @Override
352  public final void awaitTerminated(Duration timeout) throws TimeoutException {
353    Service.super.awaitTerminated(timeout);
354  }
355
356  @Override
357  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
358    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
359      try {
360        checkCurrentState(TERMINATED);
361      } finally {
362        monitor.leave();
363      }
364    } else {
365      // It is possible due to races that we are currently in the expected state even though we
366      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
367      // even check the guard. I don't think we care too much about this use case but it could lead
368      // to a confusing error message.
369      throw new TimeoutException(
370          "Timed out waiting for "
371              + this
372              + " to reach a terminal state. "
373              + "Current state: "
374              + state());
375    }
376  }
377
378  /** Checks that the current state is equal to the expected state. */
379  @GuardedBy("monitor")
380  private void checkCurrentState(State expected) {
381    State actual = state();
382    if (actual != expected) {
383      if (actual == FAILED) {
384        // Handle this specially so that we can include the failureCause, if there is one.
385        throw new IllegalStateException(
386            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
387            failureCause());
388      }
389      throw new IllegalStateException(
390          "Expected the service " + this + " to be " + expected + ", but was " + actual);
391    }
392  }
393
394  /**
395   * Implementing classes should invoke this method once their service has started. It will cause
396   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
397   *
398   * @throws IllegalStateException if the service is not {@link State#STARTING}.
399   */
400  protected final void notifyStarted() {
401    monitor.enter();
402    try {
403      // We have to examine the internal state of the snapshot here to properly handle the stop
404      // while starting case.
405      if (snapshot.state != STARTING) {
406        IllegalStateException failure =
407            new IllegalStateException(
408                "Cannot notifyStarted() when the service is " + snapshot.state);
409        notifyFailed(failure);
410        throw failure;
411      }
412
413      if (snapshot.shutdownWhenStartupFinishes) {
414        snapshot = new StateSnapshot(STOPPING);
415        // We don't call listeners here because we already did that when we set the
416        // shutdownWhenStartupFinishes flag.
417        doStop();
418      } else {
419        snapshot = new StateSnapshot(RUNNING);
420        enqueueRunningEvent();
421      }
422    } finally {
423      monitor.leave();
424      dispatchListenerEvents();
425    }
426  }
427
428  /**
429   * Implementing classes should invoke this method once their service has stopped. It will cause
430   * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
431   * State#TERMINATED}.
432   *
433   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
434   *     State#STARTING}, or {@link State#RUNNING}.
435   */
436  protected final void notifyStopped() {
437    monitor.enter();
438    try {
439      State previous = state();
440      switch (previous) {
441        case NEW:
442        case TERMINATED:
443        case FAILED:
444          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
445        case RUNNING:
446        case STARTING:
447        case STOPPING:
448          snapshot = new StateSnapshot(TERMINATED);
449          enqueueTerminatedEvent(previous);
450          break;
451      }
452    } finally {
453      monitor.leave();
454      dispatchListenerEvents();
455    }
456  }
457
458  /**
459   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
460   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
461   * or otherwise cannot be started nor stopped.
462   */
463  protected final void notifyFailed(Throwable cause) {
464    checkNotNull(cause);
465
466    monitor.enter();
467    try {
468      State previous = state();
469      switch (previous) {
470        case NEW:
471        case TERMINATED:
472          throw new IllegalStateException("Failed while in state:" + previous, cause);
473        case RUNNING:
474        case STARTING:
475        case STOPPING:
476          snapshot = new StateSnapshot(FAILED, false, cause);
477          enqueueFailedEvent(previous, cause);
478          break;
479        case FAILED:
480          // Do nothing
481          break;
482      }
483    } finally {
484      monitor.leave();
485      dispatchListenerEvents();
486    }
487  }
488
489  @Override
490  public final boolean isRunning() {
491    return state() == RUNNING;
492  }
493
494  @Override
495  public final State state() {
496    return snapshot.externalState();
497  }
498
499  /**
500   * @since 14.0
501   */
502  @Override
503  public final Throwable failureCause() {
504    return snapshot.failureCause();
505  }
506
507  /**
508   * @since 13.0
509   */
510  @Override
511  public final void addListener(Listener listener, Executor executor) {
512    listeners.addListener(listener, executor);
513  }
514
515  @Override
516  public String toString() {
517    return getClass().getSimpleName() + " [" + state() + "]";
518  }
519
520  /**
521   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
522   * #monitor}.
523   */
524  private void dispatchListenerEvents() {
525    if (!monitor.isOccupiedByCurrentThread()) {
526      listeners.dispatch();
527    }
528  }
529
530  private void enqueueStartingEvent() {
531    listeners.enqueue(STARTING_EVENT);
532  }
533
534  private void enqueueRunningEvent() {
535    listeners.enqueue(RUNNING_EVENT);
536  }
537
538  private void enqueueStoppingEvent(State from) {
539    if (from == State.STARTING) {
540      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
541    } else if (from == State.RUNNING) {
542      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
543    } else {
544      throw new AssertionError();
545    }
546  }
547
548  private void enqueueTerminatedEvent(State from) {
549    switch (from) {
550      case NEW:
551        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
552        break;
553      case STARTING:
554        listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
555        break;
556      case RUNNING:
557        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
558        break;
559      case STOPPING:
560        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
561        break;
562      case TERMINATED:
563      case FAILED:
564        throw new AssertionError();
565    }
566  }
567
568  private void enqueueFailedEvent(State from, Throwable cause) {
569    // can't memoize this one due to the exception
570    listeners.enqueue(
571        new ListenerCallQueue.Event<Listener>() {
572          @Override
573          public void call(Listener listener) {
574            listener.failed(from, cause);
575          }
576
577          @Override
578          public String toString() {
579            return "failed({from = " + from + ", cause = " + cause + "})";
580          }
581        });
582  }
583
584  /**
585   * An immutable snapshot of the current state of the service. This class represents a consistent
586   * snapshot of the state and therefore it can be used to answer simple queries without needing to
587   * grab a lock.
588   */
589  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
590  private static final class StateSnapshot {
591    /**
592     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
593     */
594    final State state;
595
596    /** If true, the user requested a shutdown while the service was still starting up. */
597    final boolean shutdownWhenStartupFinishes;
598
599    /**
600     * The exception that caused this service to fail. This will be {@code null} unless the service
601     * has failed.
602     */
603    final @Nullable Throwable failure;
604
605    StateSnapshot(State internalState) {
606      this(internalState, false, null);
607    }
608
609    StateSnapshot(
610        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
611      checkArgument(
612          !shutdownWhenStartupFinishes || internalState == STARTING,
613          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
614          internalState);
615      checkArgument(
616          (failure != null) == (internalState == FAILED),
617          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
618              + "instead.",
619          internalState,
620          failure);
621      this.state = internalState;
622      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
623      this.failure = failure;
624    }
625
626    /**
627     * @see Service#state()
628     */
629    State externalState() {
630      if (shutdownWhenStartupFinishes && state == STARTING) {
631        return STOPPING;
632      } else {
633        return state;
634      }
635    }
636
637    /**
638     * @see Service#failureCause()
639     */
640    Throwable failureCause() {
641      checkState(
642          state == FAILED,
643          "failureCause() is only valid if the service has failed, service is %s",
644          state);
645      // requireNonNull is safe because the constructor requires a non-null cause with state=FAILED.
646      return requireNonNull(failure);
647    }
648  }
649}