001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Service.State.FAILED;
021import static com.google.common.util.concurrent.Service.State.NEW;
022import static com.google.common.util.concurrent.Service.State.RUNNING;
023import static com.google.common.util.concurrent.Service.State.STARTING;
024import static com.google.common.util.concurrent.Service.State.STOPPING;
025import static com.google.common.util.concurrent.Service.State.TERMINATED;
026
027import com.google.common.annotations.Beta;
028import com.google.common.annotations.GwtIncompatible;
029import com.google.common.util.concurrent.Monitor.Guard;
030import com.google.common.util.concurrent.Service.State; // javadoc needs this
031import com.google.errorprone.annotations.CanIgnoreReturnValue;
032import com.google.errorprone.annotations.ForOverride;
033import com.google.j2objc.annotations.WeakOuter;
034import java.util.concurrent.Executor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import javax.annotation.Nullable;
038import javax.annotation.concurrent.GuardedBy;
039
040/**
041 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
042 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
043 * callbacks. Its subclasses must manage threads manually; consider {@link
044 * AbstractExecutionThreadService} if you need only a single execution thread.
045 *
046 * @author Jesse Wilson
047 * @author Luke Sandberg
048 * @since 1.0
049 */
050@Beta
051@GwtIncompatible
052public abstract class AbstractService implements Service {
053  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
054      new ListenerCallQueue.Event<Listener>() {
055        @Override
056        public void call(Listener listener) {
057          listener.starting();
058        }
059
060        @Override
061        public String toString() {
062          return "starting()";
063        }
064      };
065  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
066      new ListenerCallQueue.Event<Listener>() {
067        @Override
068        public void call(Listener listener) {
069          listener.running();
070        }
071
072        @Override
073        public String toString() {
074          return "running()";
075        }
076      };
077  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
078      stoppingEvent(STARTING);
079  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
080      stoppingEvent(RUNNING);
081
082  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
083      terminatedEvent(NEW);
084  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
085      terminatedEvent(RUNNING);
086  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
087      terminatedEvent(STOPPING);
088
089  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
090    return new ListenerCallQueue.Event<Listener>() {
091      @Override
092      public void call(Listener listener) {
093        listener.terminated(from);
094      }
095
096      @Override
097      public String toString() {
098        return "terminated({from = " + from + "})";
099      }
100    };
101  }
102
103  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
104    return new ListenerCallQueue.Event<Listener>() {
105      @Override
106      public void call(Listener listener) {
107        listener.stopping(from);
108      }
109
110      @Override
111      public String toString() {
112        return "stopping({from = " + from + "})";
113      }
114    };
115  }
116
117  private final Monitor monitor = new Monitor();
118
119  private final Guard isStartable = new IsStartableGuard();
120
121  @WeakOuter
122  private final class IsStartableGuard extends Guard {
123    IsStartableGuard() {
124      super(AbstractService.this.monitor);
125    }
126
127    @Override
128    public boolean isSatisfied() {
129      return state() == NEW;
130    }
131  }
132
133  private final Guard isStoppable = new IsStoppableGuard();
134
135  @WeakOuter
136  private final class IsStoppableGuard extends Guard {
137    IsStoppableGuard() {
138      super(AbstractService.this.monitor);
139    }
140
141    @Override
142    public boolean isSatisfied() {
143      return state().compareTo(RUNNING) <= 0;
144    }
145  }
146
147  private final Guard hasReachedRunning = new HasReachedRunningGuard();
148
149  @WeakOuter
150  private final class HasReachedRunningGuard extends Guard {
151    HasReachedRunningGuard() {
152      super(AbstractService.this.monitor);
153    }
154
155    @Override
156    public boolean isSatisfied() {
157      return state().compareTo(RUNNING) >= 0;
158    }
159  }
160
161  private final Guard isStopped = new IsStoppedGuard();
162
163  @WeakOuter
164  private final class IsStoppedGuard extends Guard {
165    IsStoppedGuard() {
166      super(AbstractService.this.monitor);
167    }
168
169    @Override
170    public boolean isSatisfied() {
171      return state().isTerminal();
172    }
173  }
174
175  /** The listeners to notify during a state transition. */
176  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
177
178  /**
179   * The current state of the service. This should be written with the lock held but can be read
180   * without it because it is an immutable object in a volatile field. This is desirable so that
181   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
182   * without grabbing the lock.
183   *
184   * <p>To update this field correctly the lock must be held to guarantee that the state is
185   * consistent.
186   */
187  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
188
189  /** Constructor for use by subclasses. */
190  protected AbstractService() {}
191
192  /**
193   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
194   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
195   * or after it has returned. If startup fails, the invocation should cause a call to {@link
196   * #notifyFailed(Throwable)} instead.
197   *
198   * <p>This method should return promptly; prefer to do work on a different thread where it is
199   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
200   * called multiple times.
201   */
202  @ForOverride
203  protected abstract void doStart();
204
205  /**
206   * This method should be used to initiate service shutdown. The invocation of this method should
207   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
208   * returned. If shutdown fails, the invocation should cause a call to {@link
209   * #notifyFailed(Throwable)} instead.
210   *
211   * <p>This method should return promptly; prefer to do work on a different thread where it is
212   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
213   * called multiple times.
214   */
215  @ForOverride
216  protected abstract void doStop();
217
218  @CanIgnoreReturnValue
219  @Override
220  public final Service startAsync() {
221    if (monitor.enterIf(isStartable)) {
222      try {
223        snapshot = new StateSnapshot(STARTING);
224        enqueueStartingEvent();
225        doStart();
226      } catch (Throwable startupFailure) {
227        notifyFailed(startupFailure);
228      } finally {
229        monitor.leave();
230        dispatchListenerEvents();
231      }
232    } else {
233      throw new IllegalStateException("Service " + this + " has already been started");
234    }
235    return this;
236  }
237
238  @CanIgnoreReturnValue
239  @Override
240  public final Service stopAsync() {
241    if (monitor.enterIf(isStoppable)) {
242      try {
243        State previous = state();
244        switch (previous) {
245          case NEW:
246            snapshot = new StateSnapshot(TERMINATED);
247            enqueueTerminatedEvent(NEW);
248            break;
249          case STARTING:
250            snapshot = new StateSnapshot(STARTING, true, null);
251            enqueueStoppingEvent(STARTING);
252            break;
253          case RUNNING:
254            snapshot = new StateSnapshot(STOPPING);
255            enqueueStoppingEvent(RUNNING);
256            doStop();
257            break;
258          case STOPPING:
259          case TERMINATED:
260          case FAILED:
261            // These cases are impossible due to the if statement above.
262            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
263          default:
264            throw new AssertionError("Unexpected state: " + previous);
265        }
266      } catch (Throwable shutdownFailure) {
267        notifyFailed(shutdownFailure);
268      } finally {
269        monitor.leave();
270        dispatchListenerEvents();
271      }
272    }
273    return this;
274  }
275
276  @Override
277  public final void awaitRunning() {
278    monitor.enterWhenUninterruptibly(hasReachedRunning);
279    try {
280      checkCurrentState(RUNNING);
281    } finally {
282      monitor.leave();
283    }
284  }
285
286  @Override
287  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
288    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
289      try {
290        checkCurrentState(RUNNING);
291      } finally {
292        monitor.leave();
293      }
294    } else {
295      // It is possible due to races the we are currently in the expected state even though we
296      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
297      // even check the guard. I don't think we care too much about this use case but it could lead
298      // to a confusing error message.
299      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
300    }
301  }
302
303  @Override
304  public final void awaitTerminated() {
305    monitor.enterWhenUninterruptibly(isStopped);
306    try {
307      checkCurrentState(TERMINATED);
308    } finally {
309      monitor.leave();
310    }
311  }
312
313  @Override
314  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
315    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
316      try {
317        checkCurrentState(TERMINATED);
318      } finally {
319        monitor.leave();
320      }
321    } else {
322      // It is possible due to races the we are currently in the expected state even though we
323      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
324      // even check the guard. I don't think we care too much about this use case but it could lead
325      // to a confusing error message.
326      throw new TimeoutException(
327          "Timed out waiting for "
328              + this
329              + " to reach a terminal state. "
330              + "Current state: "
331              + state());
332    }
333  }
334
335  /** Checks that the current state is equal to the expected state. */
336  @GuardedBy("monitor")
337  private void checkCurrentState(State expected) {
338    State actual = state();
339    if (actual != expected) {
340      if (actual == FAILED) {
341        // Handle this specially so that we can include the failureCause, if there is one.
342        throw new IllegalStateException(
343            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
344            failureCause());
345      }
346      throw new IllegalStateException(
347          "Expected the service " + this + " to be " + expected + ", but was " + actual);
348    }
349  }
350
351  /**
352   * Implementing classes should invoke this method once their service has started. It will cause
353   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
354   *
355   * @throws IllegalStateException if the service is not {@link State#STARTING}.
356   */
357  protected final void notifyStarted() {
358    monitor.enter();
359    try {
360      // We have to examine the internal state of the snapshot here to properly handle the stop
361      // while starting case.
362      if (snapshot.state != STARTING) {
363        IllegalStateException failure =
364            new IllegalStateException(
365                "Cannot notifyStarted() when the service is " + snapshot.state);
366        notifyFailed(failure);
367        throw failure;
368      }
369
370      if (snapshot.shutdownWhenStartupFinishes) {
371        snapshot = new StateSnapshot(STOPPING);
372        // We don't call listeners here because we already did that when we set the
373        // shutdownWhenStartupFinishes flag.
374        doStop();
375      } else {
376        snapshot = new StateSnapshot(RUNNING);
377        enqueueRunningEvent();
378      }
379    } finally {
380      monitor.leave();
381      dispatchListenerEvents();
382    }
383  }
384
385  /**
386   * Implementing classes should invoke this method once their service has stopped. It will cause
387   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
388   *
389   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link
390   *     State#RUNNING}.
391   */
392  protected final void notifyStopped() {
393    monitor.enter();
394    try {
395      // We check the internal state of the snapshot instead of state() directly so we don't allow
396      // notifyStopped() to be called while STARTING, even if stop() has already been called.
397      State previous = snapshot.state;
398      if (previous != STOPPING && previous != RUNNING) {
399        IllegalStateException failure =
400            new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
401        notifyFailed(failure);
402        throw failure;
403      }
404      snapshot = new StateSnapshot(TERMINATED);
405      enqueueTerminatedEvent(previous);
406    } finally {
407      monitor.leave();
408      dispatchListenerEvents();
409    }
410  }
411
412  /**
413   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
414   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
415   * or otherwise cannot be started nor stopped.
416   */
417  protected final void notifyFailed(Throwable cause) {
418    checkNotNull(cause);
419
420    monitor.enter();
421    try {
422      State previous = state();
423      switch (previous) {
424        case NEW:
425        case TERMINATED:
426          throw new IllegalStateException("Failed while in state:" + previous, cause);
427        case RUNNING:
428        case STARTING:
429        case STOPPING:
430          snapshot = new StateSnapshot(FAILED, false, cause);
431          enqueueFailedEvent(previous, cause);
432          break;
433        case FAILED:
434          // Do nothing
435          break;
436        default:
437          throw new AssertionError("Unexpected state: " + previous);
438      }
439    } finally {
440      monitor.leave();
441      dispatchListenerEvents();
442    }
443  }
444
445  @Override
446  public final boolean isRunning() {
447    return state() == RUNNING;
448  }
449
450  @Override
451  public final State state() {
452    return snapshot.externalState();
453  }
454
455  /**
456   * @since 14.0
457   */
458  @Override
459  public final Throwable failureCause() {
460    return snapshot.failureCause();
461  }
462
463  /**
464   * @since 13.0
465   */
466  @Override
467  public final void addListener(Listener listener, Executor executor) {
468    listeners.addListener(listener, executor);
469  }
470
471  @Override
472  public String toString() {
473    return getClass().getSimpleName() + " [" + state() + "]";
474  }
475
476  /**
477   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
478   * #monitor}.
479   */
480  private void dispatchListenerEvents() {
481    if (!monitor.isOccupiedByCurrentThread()) {
482      listeners.dispatch();
483    }
484  }
485
486  private void enqueueStartingEvent() {
487    listeners.enqueue(STARTING_EVENT);
488  }
489
490  private void enqueueRunningEvent() {
491    listeners.enqueue(RUNNING_EVENT);
492  }
493
494  private void enqueueStoppingEvent(final State from) {
495    if (from == State.STARTING) {
496      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
497    } else if (from == State.RUNNING) {
498      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
499    } else {
500      throw new AssertionError();
501    }
502  }
503
504  private void enqueueTerminatedEvent(final State from) {
505    switch (from) {
506      case NEW:
507        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
508        break;
509      case RUNNING:
510        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
511        break;
512      case STOPPING:
513        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
514        break;
515      case STARTING:
516      case TERMINATED:
517      case FAILED:
518      default:
519        throw new AssertionError();
520    }
521  }
522
523  private void enqueueFailedEvent(final State from, final Throwable cause) {
524    // can't memoize this one due to the exception
525    listeners.enqueue(
526        new ListenerCallQueue.Event<Listener>() {
527          @Override
528          public void call(Listener listener) {
529            listener.failed(from, cause);
530          }
531
532          @Override
533          public String toString() {
534            return "failed({from = " + from + ", cause = " + cause + "})";
535          }
536        });
537  }
538
539  /**
540   * An immutable snapshot of the current state of the service. This class represents a consistent
541   * snapshot of the state and therefore it can be used to answer simple queries without needing to
542   * grab a lock.
543   */
544  // @Immutable except that Throwable is mutable (initCause(), setStackTrace(), mutable subclasses).
545  private static final class StateSnapshot {
546    /**
547     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
548     */
549    final State state;
550
551    /**
552     * If true, the user requested a shutdown while the service was still starting up.
553     */
554    final boolean shutdownWhenStartupFinishes;
555
556    /**
557     * The exception that caused this service to fail. This will be {@code null} unless the service
558     * has failed.
559     */
560    @Nullable final Throwable failure;
561
562    StateSnapshot(State internalState) {
563      this(internalState, false, null);
564    }
565
566    StateSnapshot(
567        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
568      checkArgument(
569          !shutdownWhenStartupFinishes || internalState == STARTING,
570          "shutdownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
571          internalState);
572      checkArgument(
573          !(failure != null ^ internalState == FAILED),
574          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
575              + "instead.",
576          internalState,
577          failure);
578      this.state = internalState;
579      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
580      this.failure = failure;
581    }
582
583    /** @see Service#state() */
584    State externalState() {
585      if (shutdownWhenStartupFinishes && state == STARTING) {
586        return STOPPING;
587      } else {
588        return state;
589      }
590    }
591
592    /** @see Service#failureCause() */
593    Throwable failureCause() {
594      checkState(
595          state == FAILED,
596          "failureCause() is only valid if the service has failed, service is %s",
597          state);
598      return failure;
599    }
600  }
601}