/*
 * Copyright (C) 2009 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Service.State.FAILED;
021import static com.google.common.util.concurrent.Service.State.NEW;
022import static com.google.common.util.concurrent.Service.State.RUNNING;
023import static com.google.common.util.concurrent.Service.State.STARTING;
024import static com.google.common.util.concurrent.Service.State.STOPPING;
025import static com.google.common.util.concurrent.Service.State.TERMINATED;
026
027import com.google.common.annotations.Beta;
028import com.google.common.annotations.GwtIncompatible;
029import com.google.common.util.concurrent.Monitor.Guard;
030import com.google.common.util.concurrent.Service.State; // javadoc needs this
031import com.google.errorprone.annotations.CanIgnoreReturnValue;
032import com.google.errorprone.annotations.ForOverride;
033import com.google.errorprone.annotations.concurrent.GuardedBy;
034import com.google.j2objc.annotations.WeakOuter;
035import java.util.concurrent.Executor;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import org.checkerframework.checker.nullness.compatqual.NullableDecl;
039
040/**
041 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
042 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
043 * callbacks. Its subclasses must manage threads manually; consider {@link
044 * AbstractExecutionThreadService} if you need only a single execution thread.
045 *
046 * @author Jesse Wilson
047 * @author Luke Sandberg
048 * @since 1.0
049 */
050@GwtIncompatible
051public abstract class AbstractService implements Service {
052  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
053      new ListenerCallQueue.Event<Listener>() {
054        @Override
055        public void call(Listener listener) {
056          listener.starting();
057        }
058
059        @Override
060        public String toString() {
061          return "starting()";
062        }
063      };
064  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
065      new ListenerCallQueue.Event<Listener>() {
066        @Override
067        public void call(Listener listener) {
068          listener.running();
069        }
070
071        @Override
072        public String toString() {
073          return "running()";
074        }
075      };
076  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
077      stoppingEvent(STARTING);
078  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
079      stoppingEvent(RUNNING);
080
081  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
082      terminatedEvent(NEW);
083  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STARTING_EVENT =
084      terminatedEvent(STARTING);
085  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
086      terminatedEvent(RUNNING);
087  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
088      terminatedEvent(STOPPING);
089
090  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
091    return new ListenerCallQueue.Event<Listener>() {
092      @Override
093      public void call(Listener listener) {
094        listener.terminated(from);
095      }
096
097      @Override
098      public String toString() {
099        return "terminated({from = " + from + "})";
100      }
101    };
102  }
103
104  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
105    return new ListenerCallQueue.Event<Listener>() {
106      @Override
107      public void call(Listener listener) {
108        listener.stopping(from);
109      }
110
111      @Override
112      public String toString() {
113        return "stopping({from = " + from + "})";
114      }
115    };
116  }
117
118  private final Monitor monitor = new Monitor();
119
120  private final Guard isStartable = new IsStartableGuard();
121
122  @WeakOuter
123  private final class IsStartableGuard extends Guard {
124    IsStartableGuard() {
125      super(AbstractService.this.monitor);
126    }
127
128    @Override
129    public boolean isSatisfied() {
130      return state() == NEW;
131    }
132  }
133
134  private final Guard isStoppable = new IsStoppableGuard();
135
136  @WeakOuter
137  private final class IsStoppableGuard extends Guard {
138    IsStoppableGuard() {
139      super(AbstractService.this.monitor);
140    }
141
142    @Override
143    public boolean isSatisfied() {
144      return state().compareTo(RUNNING) <= 0;
145    }
146  }
147
148  private final Guard hasReachedRunning = new HasReachedRunningGuard();
149
150  @WeakOuter
151  private final class HasReachedRunningGuard extends Guard {
152    HasReachedRunningGuard() {
153      super(AbstractService.this.monitor);
154    }
155
156    @Override
157    public boolean isSatisfied() {
158      return state().compareTo(RUNNING) >= 0;
159    }
160  }
161
162  private final Guard isStopped = new IsStoppedGuard();
163
164  @WeakOuter
165  private final class IsStoppedGuard extends Guard {
166    IsStoppedGuard() {
167      super(AbstractService.this.monitor);
168    }
169
170    @Override
171    public boolean isSatisfied() {
172      return state().isTerminal();
173    }
174  }
175
176  /** The listeners to notify during a state transition. */
177  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
178
179  /**
180   * The current state of the service. This should be written with the lock held but can be read
181   * without it because it is an immutable object in a volatile field. This is desirable so that
182   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
183   * without grabbing the lock.
184   *
185   * <p>To update this field correctly the lock must be held to guarantee that the state is
186   * consistent.
187   */
188  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
189
190  /** Constructor for use by subclasses. */
191  protected AbstractService() {}
192
193  /**
194   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
195   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
196   * or after it has returned. If startup fails, the invocation should cause a call to {@link
197   * #notifyFailed(Throwable)} instead.
198   *
199   * <p>This method should return promptly; prefer to do work on a different thread where it is
200   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
201   * called multiple times.
202   */
203  @ForOverride
204  protected abstract void doStart();
205
206  /**
207   * This method should be used to initiate service shutdown. The invocation of this method should
208   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
209   * returned. If shutdown fails, the invocation should cause a call to {@link
210   * #notifyFailed(Throwable)} instead.
211   *
212   * <p>This method should return promptly; prefer to do work on a different thread where it is
213   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
214   * called multiple times.
215   *
216   * <p>If {@link #stopAsync} is called on a {@link State#STARTING} service, this method is not
217   * invoked immediately. Instead, it will be deferred until after the service is {@link
218   * State#RUNNING}. Services that need to cancel startup work can override {@link #doCancelStart}.
219   */
220  @ForOverride
221  protected abstract void doStop();
222
223  /**
224   * This method is called by {@link #stopAsync} when the service is still starting (i.e. {@link
225   * #startAsync} has been called but {@link #notifyStarted} has not). Subclasses can override the
226   * method to cancel pending work and then call {@link #notifyStopped} to stop the service.
227   *
228   * <p>This method should return promptly; prefer to do work on a different thread where it is
229   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
230   * called multiple times.
231   *
232   * <p>When this method is called {@link #state()} will return {@link State#STOPPING}, which is the
233   * external state observable by the caller of {@link #stopAsync}.
234   *
235   * @since 27.0
236   */
237  @Beta
238  @ForOverride
239  protected void doCancelStart() {}
240
241  @CanIgnoreReturnValue
242  @Override
243  public final Service startAsync() {
244    if (monitor.enterIf(isStartable)) {
245      try {
246        snapshot = new StateSnapshot(STARTING);
247        enqueueStartingEvent();
248        doStart();
249      } catch (Throwable startupFailure) {
250        notifyFailed(startupFailure);
251      } finally {
252        monitor.leave();
253        dispatchListenerEvents();
254      }
255    } else {
256      throw new IllegalStateException("Service " + this + " has already been started");
257    }
258    return this;
259  }
260
261  @CanIgnoreReturnValue
262  @Override
263  public final Service stopAsync() {
264    if (monitor.enterIf(isStoppable)) {
265      try {
266        State previous = state();
267        switch (previous) {
268          case NEW:
269            snapshot = new StateSnapshot(TERMINATED);
270            enqueueTerminatedEvent(NEW);
271            break;
272          case STARTING:
273            snapshot = new StateSnapshot(STARTING, true, null);
274            enqueueStoppingEvent(STARTING);
275            doCancelStart();
276            break;
277          case RUNNING:
278            snapshot = new StateSnapshot(STOPPING);
279            enqueueStoppingEvent(RUNNING);
280            doStop();
281            break;
282          case STOPPING:
283          case TERMINATED:
284          case FAILED:
285            // These cases are impossible due to the if statement above.
286            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
287        }
288      } catch (Throwable shutdownFailure) {
289        notifyFailed(shutdownFailure);
290      } finally {
291        monitor.leave();
292        dispatchListenerEvents();
293      }
294    }
295    return this;
296  }
297
298  @Override
299  public final void awaitRunning() {
300    monitor.enterWhenUninterruptibly(hasReachedRunning);
301    try {
302      checkCurrentState(RUNNING);
303    } finally {
304      monitor.leave();
305    }
306  }
307
308  @Override
309  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
310    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
311      try {
312        checkCurrentState(RUNNING);
313      } finally {
314        monitor.leave();
315      }
316    } else {
317      // It is possible due to races the we are currently in the expected state even though we
318      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
319      // even check the guard. I don't think we care too much about this use case but it could lead
320      // to a confusing error message.
321      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
322    }
323  }
324
325  @Override
326  public final void awaitTerminated() {
327    monitor.enterWhenUninterruptibly(isStopped);
328    try {
329      checkCurrentState(TERMINATED);
330    } finally {
331      monitor.leave();
332    }
333  }
334
335  @Override
336  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
337    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
338      try {
339        checkCurrentState(TERMINATED);
340      } finally {
341        monitor.leave();
342      }
343    } else {
344      // It is possible due to races the we are currently in the expected state even though we
345      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
346      // even check the guard. I don't think we care too much about this use case but it could lead
347      // to a confusing error message.
348      throw new TimeoutException(
349          "Timed out waiting for "
350              + this
351              + " to reach a terminal state. "
352              + "Current state: "
353              + state());
354    }
355  }
356
357  /** Checks that the current state is equal to the expected state. */
358  @GuardedBy("monitor")
359  private void checkCurrentState(State expected) {
360    State actual = state();
361    if (actual != expected) {
362      if (actual == FAILED) {
363        // Handle this specially so that we can include the failureCause, if there is one.
364        throw new IllegalStateException(
365            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
366            failureCause());
367      }
368      throw new IllegalStateException(
369          "Expected the service " + this + " to be " + expected + ", but was " + actual);
370    }
371  }
372
373  /**
374   * Implementing classes should invoke this method once their service has started. It will cause
375   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
376   *
377   * @throws IllegalStateException if the service is not {@link State#STARTING}.
378   */
379  protected final void notifyStarted() {
380    monitor.enter();
381    try {
382      // We have to examine the internal state of the snapshot here to properly handle the stop
383      // while starting case.
384      if (snapshot.state != STARTING) {
385        IllegalStateException failure =
386            new IllegalStateException(
387                "Cannot notifyStarted() when the service is " + snapshot.state);
388        notifyFailed(failure);
389        throw failure;
390      }
391
392      if (snapshot.shutdownWhenStartupFinishes) {
393        snapshot = new StateSnapshot(STOPPING);
394        // We don't call listeners here because we already did that when we set the
395        // shutdownWhenStartupFinishes flag.
396        doStop();
397      } else {
398        snapshot = new StateSnapshot(RUNNING);
399        enqueueRunningEvent();
400      }
401    } finally {
402      monitor.leave();
403      dispatchListenerEvents();
404    }
405  }
406
407  /**
408   * Implementing classes should invoke this method once their service has stopped. It will cause
409   * the service to transition from {@link State#STARTING} or {@link State#STOPPING} to {@link
410   * State#TERMINATED}.
411   *
412   * @throws IllegalStateException if the service is not one of {@link State#STOPPING}, {@link
413   *     State#STARTING}, or {@link State#RUNNING}.
414   */
415  protected final void notifyStopped() {
416    monitor.enter();
417    try {
418      State previous = state();
419      switch (previous) {
420        case NEW:
421        case TERMINATED:
422        case FAILED:
423          throw new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
424        case RUNNING:
425        case STARTING:
426        case STOPPING:
427          snapshot = new StateSnapshot(TERMINATED);
428          enqueueTerminatedEvent(previous);
429          break;
430      }
431    } finally {
432      monitor.leave();
433      dispatchListenerEvents();
434    }
435  }
436
437  /**
438   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
439   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
440   * or otherwise cannot be started nor stopped.
441   */
442  protected final void notifyFailed(Throwable cause) {
443    checkNotNull(cause);
444
445    monitor.enter();
446    try {
447      State previous = state();
448      switch (previous) {
449        case NEW:
450        case TERMINATED:
451          throw new IllegalStateException("Failed while in state:" + previous, cause);
452        case RUNNING:
453        case STARTING:
454        case STOPPING:
455          snapshot = new StateSnapshot(FAILED, false, cause);
456          enqueueFailedEvent(previous, cause);
457          break;
458        case FAILED:
459          // Do nothing
460          break;
461      }
462    } finally {
463      monitor.leave();
464      dispatchListenerEvents();
465    }
466  }
467
468  @Override
469  public final boolean isRunning() {
470    return state() == RUNNING;
471  }
472
473  @Override
474  public final State state() {
475    return snapshot.externalState();
476  }
477
478  /** @since 14.0 */
479  @Override
480  public final Throwable failureCause() {
481    return snapshot.failureCause();
482  }
483
484  /** @since 13.0 */
485  @Override
486  public final void addListener(Listener listener, Executor executor) {
487    listeners.addListener(listener, executor);
488  }
489
490  @Override
491  public String toString() {
492    return getClass().getSimpleName() + " [" + state() + "]";
493  }
494
495  /**
496   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
497   * #monitor}.
498   */
499  private void dispatchListenerEvents() {
500    if (!monitor.isOccupiedByCurrentThread()) {
501      listeners.dispatch();
502    }
503  }
504
505  private void enqueueStartingEvent() {
506    listeners.enqueue(STARTING_EVENT);
507  }
508
509  private void enqueueRunningEvent() {
510    listeners.enqueue(RUNNING_EVENT);
511  }
512
513  private void enqueueStoppingEvent(final State from) {
514    if (from == State.STARTING) {
515      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
516    } else if (from == State.RUNNING) {
517      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
518    } else {
519      throw new AssertionError();
520    }
521  }
522
523  private void enqueueTerminatedEvent(final State from) {
524    switch (from) {
525      case NEW:
526        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
527        break;
528      case STARTING:
529        listeners.enqueue(TERMINATED_FROM_STARTING_EVENT);
530        break;
531      case RUNNING:
532        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
533        break;
534      case STOPPING:
535        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
536        break;
537      case TERMINATED:
538      case FAILED:
539        throw new AssertionError();
540    }
541  }
542
543  private void enqueueFailedEvent(final State from, final Throwable cause) {
544    // can't memoize this one due to the exception
545    listeners.enqueue(
546        new ListenerCallQueue.Event<Listener>() {
547          @Override
548          public void call(Listener listener) {
549            listener.failed(from, cause);
550          }
551
552          @Override
553          public String toString() {
554            return "failed({from = " + from + ", cause = " + cause + "})";
555          }
556        });
557  }
558
559  /**
560   * An immutable snapshot of the current state of the service. This class represents a consistent
561   * snapshot of the state and therefore it can be used to answer simple queries without needing to
562   * grab a lock.
563   */
564  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
565  private static final class StateSnapshot {
566    /**
567     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
568     */
569    final State state;
570
571    /** If true, the user requested a shutdown while the service was still starting up. */
572    final boolean shutdownWhenStartupFinishes;
573
574    /**
575     * The exception that caused this service to fail. This will be {@code null} unless the service
576     * has failed.
577     */
578    @NullableDecl final Throwable failure;
579
580    StateSnapshot(State internalState) {
581      this(internalState, false, null);
582    }
583
584    StateSnapshot(
585        State internalState, boolean shutdownWhenStartupFinishes, @NullableDecl Throwable failure) {
586      checkArgument(
587          !shutdownWhenStartupFinishes || internalState == STARTING,
588          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
589          internalState);
590      checkArgument(
591          !(failure != null ^ internalState == FAILED),
592          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
593              + "instead.",
594          internalState,
595          failure);
596      this.state = internalState;
597      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
598      this.failure = failure;
599    }
600
601    /** @see Service#state() */
602    State externalState() {
603      if (shutdownWhenStartupFinishes && state == STARTING) {
604        return STOPPING;
605      } else {
606        return state;
607      }
608    }
609
610    /** @see Service#failureCause() */
611    Throwable failureCause() {
612      checkState(
613          state == FAILED,
614          "failureCause() is only valid if the service has failed, service is %s",
615          state);
616      return failure;
617    }
618  }
619}