001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.base.Preconditions.checkState;
020import static com.google.common.util.concurrent.Service.State.FAILED;
021import static com.google.common.util.concurrent.Service.State.NEW;
022import static com.google.common.util.concurrent.Service.State.RUNNING;
023import static com.google.common.util.concurrent.Service.State.STARTING;
024import static com.google.common.util.concurrent.Service.State.STOPPING;
025import static com.google.common.util.concurrent.Service.State.TERMINATED;
026
027import com.google.common.annotations.Beta;
028import com.google.common.annotations.GwtIncompatible;
029import com.google.common.util.concurrent.Monitor.Guard;
030import com.google.common.util.concurrent.Service.State; // javadoc needs this
031import com.google.errorprone.annotations.CanIgnoreReturnValue;
032import com.google.errorprone.annotations.ForOverride;
033import com.google.j2objc.annotations.WeakOuter;
034import java.util.concurrent.Executor;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import javax.annotation.Nullable;
038import javax.annotation.concurrent.GuardedBy;
039import javax.annotation.concurrent.Immutable;
040
041/**
042 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
043 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
044 * callbacks. Its subclasses must manage threads manually; consider {@link
045 * AbstractExecutionThreadService} if you need only a single execution thread.
046 *
047 * @author Jesse Wilson
048 * @author Luke Sandberg
049 * @since 1.0
050 */
051@Beta
052@GwtIncompatible
053public abstract class AbstractService implements Service {
054  private static final ListenerCallQueue.Event<Listener> STARTING_EVENT =
055      new ListenerCallQueue.Event<Listener>() {
056        @Override
057        public void call(Listener listener) {
058          listener.starting();
059        }
060
061        @Override
062        public String toString() {
063          return "starting()";
064        }
065      };
066  private static final ListenerCallQueue.Event<Listener> RUNNING_EVENT =
067      new ListenerCallQueue.Event<Listener>() {
068        @Override
069        public void call(Listener listener) {
070          listener.running();
071        }
072
073        @Override
074        public String toString() {
075          return "running()";
076        }
077      };
078  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_STARTING_EVENT =
079      stoppingEvent(STARTING);
080  private static final ListenerCallQueue.Event<Listener> STOPPING_FROM_RUNNING_EVENT =
081      stoppingEvent(RUNNING);
082
083  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_NEW_EVENT =
084      terminatedEvent(NEW);
085  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_RUNNING_EVENT =
086      terminatedEvent(RUNNING);
087  private static final ListenerCallQueue.Event<Listener> TERMINATED_FROM_STOPPING_EVENT =
088      terminatedEvent(STOPPING);
089
090  private static ListenerCallQueue.Event<Listener> terminatedEvent(final State from) {
091    return new ListenerCallQueue.Event<Listener>() {
092      @Override
093      public void call(Listener listener) {
094        listener.terminated(from);
095      }
096
097      @Override
098      public String toString() {
099        return "terminated({from = " + from + "})";
100      }
101    };
102  }
103
104  private static ListenerCallQueue.Event<Listener> stoppingEvent(final State from) {
105    return new ListenerCallQueue.Event<Listener>() {
106      @Override
107      public void call(Listener listener) {
108        listener.stopping(from);
109      }
110
111      @Override
112      public String toString() {
113        return "stopping({from = " + from + "})";
114      }
115    };
116  }
117
118  private final Monitor monitor = new Monitor();
119
120  private final Guard isStartable = new IsStartableGuard();
121
122  @WeakOuter
123  private final class IsStartableGuard extends Guard {
124    IsStartableGuard() {
125      super(AbstractService.this.monitor);
126    }
127
128    @Override
129    public boolean isSatisfied() {
130      return state() == NEW;
131    }
132  }
133
134  private final Guard isStoppable = new IsStoppableGuard();
135
136  @WeakOuter
137  private final class IsStoppableGuard extends Guard {
138    IsStoppableGuard() {
139      super(AbstractService.this.monitor);
140    }
141
142    @Override
143    public boolean isSatisfied() {
144      return state().compareTo(RUNNING) <= 0;
145    }
146  }
147
148  private final Guard hasReachedRunning = new HasReachedRunningGuard();
149
150  @WeakOuter
151  private final class HasReachedRunningGuard extends Guard {
152    HasReachedRunningGuard() {
153      super(AbstractService.this.monitor);
154    }
155
156    @Override
157    public boolean isSatisfied() {
158      return state().compareTo(RUNNING) >= 0;
159    }
160  }
161
162  private final Guard isStopped = new IsStoppedGuard();
163
164  @WeakOuter
165  private final class IsStoppedGuard extends Guard {
166    IsStoppedGuard() {
167      super(AbstractService.this.monitor);
168    }
169
170    @Override
171    public boolean isSatisfied() {
172      return state().isTerminal();
173    }
174  }
175
176  /** The listeners to notify during a state transition. */
177  private final ListenerCallQueue<Listener> listeners = new ListenerCallQueue<>();
178
179  /**
180   * The current state of the service. This should be written with the lock held but can be read
181   * without it because it is an immutable object in a volatile field. This is desirable so that
182   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
183   * without grabbing the lock.
184   *
185   * <p>To update this field correctly the lock must be held to guarantee that the state is
186   * consistent.
187   */
188  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
189
190  /** Constructor for use by subclasses. */
191  protected AbstractService() {}
192
193  /**
194   * This method is called by {@link #startAsync} to initiate service startup. The invocation of
195   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
196   * or after it has returned. If startup fails, the invocation should cause a call to {@link
197   * #notifyFailed(Throwable)} instead.
198   *
199   * <p>This method should return promptly; prefer to do work on a different thread where it is
200   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is
201   * called multiple times.
202   */
203  @ForOverride
204  protected abstract void doStart();
205
206  /**
207   * This method should be used to initiate service shutdown. The invocation of this method should
208   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
209   * returned. If shutdown fails, the invocation should cause a call to {@link
210   * #notifyFailed(Throwable)} instead.
211   *
212   * <p>This method should return promptly; prefer to do work on a different thread where it is
213   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is
214   * called multiple times.
215   */
216  @ForOverride
217  protected abstract void doStop();
218
219  @CanIgnoreReturnValue
220  @Override
221  public final Service startAsync() {
222    if (monitor.enterIf(isStartable)) {
223      try {
224        snapshot = new StateSnapshot(STARTING);
225        enqueueStartingEvent();
226        doStart();
227      } catch (Throwable startupFailure) {
228        notifyFailed(startupFailure);
229      } finally {
230        monitor.leave();
231        dispatchListenerEvents();
232      }
233    } else {
234      throw new IllegalStateException("Service " + this + " has already been started");
235    }
236    return this;
237  }
238
239  @CanIgnoreReturnValue
240  @Override
241  public final Service stopAsync() {
242    if (monitor.enterIf(isStoppable)) {
243      try {
244        State previous = state();
245        switch (previous) {
246          case NEW:
247            snapshot = new StateSnapshot(TERMINATED);
248            enqueueTerminatedEvent(NEW);
249            break;
250          case STARTING:
251            snapshot = new StateSnapshot(STARTING, true, null);
252            enqueueStoppingEvent(STARTING);
253            break;
254          case RUNNING:
255            snapshot = new StateSnapshot(STOPPING);
256            enqueueStoppingEvent(RUNNING);
257            doStop();
258            break;
259          case STOPPING:
260          case TERMINATED:
261          case FAILED:
262            // These cases are impossible due to the if statement above.
263            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
264          default:
265            throw new AssertionError("Unexpected state: " + previous);
266        }
267      } catch (Throwable shutdownFailure) {
268        notifyFailed(shutdownFailure);
269      } finally {
270        monitor.leave();
271        dispatchListenerEvents();
272      }
273    }
274    return this;
275  }
276
277  @Override
278  public final void awaitRunning() {
279    monitor.enterWhenUninterruptibly(hasReachedRunning);
280    try {
281      checkCurrentState(RUNNING);
282    } finally {
283      monitor.leave();
284    }
285  }
286
287  @Override
288  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
289    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
290      try {
291        checkCurrentState(RUNNING);
292      } finally {
293        monitor.leave();
294      }
295    } else {
296      // It is possible due to races the we are currently in the expected state even though we
297      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
298      // even check the guard. I don't think we care too much about this use case but it could lead
299      // to a confusing error message.
300      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state.");
301    }
302  }
303
304  @Override
305  public final void awaitTerminated() {
306    monitor.enterWhenUninterruptibly(isStopped);
307    try {
308      checkCurrentState(TERMINATED);
309    } finally {
310      monitor.leave();
311    }
312  }
313
314  @Override
315  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
316    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
317      try {
318        checkCurrentState(TERMINATED);
319      } finally {
320        monitor.leave();
321      }
322    } else {
323      // It is possible due to races the we are currently in the expected state even though we
324      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
325      // even check the guard. I don't think we care too much about this use case but it could lead
326      // to a confusing error message.
327      throw new TimeoutException(
328          "Timed out waiting for "
329              + this
330              + " to reach a terminal state. "
331              + "Current state: "
332              + state());
333    }
334  }
335
336  /** Checks that the current state is equal to the expected state. */
337  @GuardedBy("monitor")
338  private void checkCurrentState(State expected) {
339    State actual = state();
340    if (actual != expected) {
341      if (actual == FAILED) {
342        // Handle this specially so that we can include the failureCause, if there is one.
343        throw new IllegalStateException(
344            "Expected the service " + this + " to be " + expected + ", but the service has FAILED",
345            failureCause());
346      }
347      throw new IllegalStateException(
348          "Expected the service " + this + " to be " + expected + ", but was " + actual);
349    }
350  }
351
352  /**
353   * Implementing classes should invoke this method once their service has started. It will cause
354   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
355   *
356   * @throws IllegalStateException if the service is not {@link State#STARTING}.
357   */
358  protected final void notifyStarted() {
359    monitor.enter();
360    try {
361      // We have to examine the internal state of the snapshot here to properly handle the stop
362      // while starting case.
363      if (snapshot.state != STARTING) {
364        IllegalStateException failure =
365            new IllegalStateException(
366                "Cannot notifyStarted() when the service is " + snapshot.state);
367        notifyFailed(failure);
368        throw failure;
369      }
370
371      if (snapshot.shutdownWhenStartupFinishes) {
372        snapshot = new StateSnapshot(STOPPING);
373        // We don't call listeners here because we already did that when we set the
374        // shutdownWhenStartupFinishes flag.
375        doStop();
376      } else {
377        snapshot = new StateSnapshot(RUNNING);
378        enqueueRunningEvent();
379      }
380    } finally {
381      monitor.leave();
382      dispatchListenerEvents();
383    }
384  }
385
386  /**
387   * Implementing classes should invoke this method once their service has stopped. It will cause
388   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
389   *
390   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor {@link
391   *     State#RUNNING}.
392   */
393  protected final void notifyStopped() {
394    monitor.enter();
395    try {
396      // We check the internal state of the snapshot instead of state() directly so we don't allow
397      // notifyStopped() to be called while STARTING, even if stop() has already been called.
398      State previous = snapshot.state;
399      if (previous != STOPPING && previous != RUNNING) {
400        IllegalStateException failure =
401            new IllegalStateException("Cannot notifyStopped() when the service is " + previous);
402        notifyFailed(failure);
403        throw failure;
404      }
405      snapshot = new StateSnapshot(TERMINATED);
406      enqueueTerminatedEvent(previous);
407    } finally {
408      monitor.leave();
409      dispatchListenerEvents();
410    }
411  }
412
413  /**
414   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
415   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
416   * or otherwise cannot be started nor stopped.
417   */
418  protected final void notifyFailed(Throwable cause) {
419    checkNotNull(cause);
420
421    monitor.enter();
422    try {
423      State previous = state();
424      switch (previous) {
425        case NEW:
426        case TERMINATED:
427          throw new IllegalStateException("Failed while in state:" + previous, cause);
428        case RUNNING:
429        case STARTING:
430        case STOPPING:
431          snapshot = new StateSnapshot(FAILED, false, cause);
432          enqueueFailedEvent(previous, cause);
433          break;
434        case FAILED:
435          // Do nothing
436          break;
437        default:
438          throw new AssertionError("Unexpected state: " + previous);
439      }
440    } finally {
441      monitor.leave();
442      dispatchListenerEvents();
443    }
444  }
445
446  @Override
447  public final boolean isRunning() {
448    return state() == RUNNING;
449  }
450
451  @Override
452  public final State state() {
453    return snapshot.externalState();
454  }
455
456  /**
457   * @since 14.0
458   */
459  @Override
460  public final Throwable failureCause() {
461    return snapshot.failureCause();
462  }
463
464  /**
465   * @since 13.0
466   */
467  @Override
468  public final void addListener(Listener listener, Executor executor) {
469    listeners.addListener(listener, executor);
470  }
471
472  @Override
473  public String toString() {
474    return getClass().getSimpleName() + " [" + state() + "]";
475  }
476
477  /**
478   * Attempts to execute all the listeners in {@link #listeners} while not holding the {@link
479   * #monitor}.
480   */
481  private void dispatchListenerEvents() {
482    if (!monitor.isOccupiedByCurrentThread()) {
483      listeners.dispatch();
484    }
485  }
486
487  private void enqueueStartingEvent() {
488    listeners.enqueue(STARTING_EVENT);
489  }
490
491  private void enqueueRunningEvent() {
492    listeners.enqueue(RUNNING_EVENT);
493  }
494
495  private void enqueueStoppingEvent(final State from) {
496    if (from == State.STARTING) {
497      listeners.enqueue(STOPPING_FROM_STARTING_EVENT);
498    } else if (from == State.RUNNING) {
499      listeners.enqueue(STOPPING_FROM_RUNNING_EVENT);
500    } else {
501      throw new AssertionError();
502    }
503  }
504
505  private void enqueueTerminatedEvent(final State from) {
506    switch (from) {
507      case NEW:
508        listeners.enqueue(TERMINATED_FROM_NEW_EVENT);
509        break;
510      case RUNNING:
511        listeners.enqueue(TERMINATED_FROM_RUNNING_EVENT);
512        break;
513      case STOPPING:
514        listeners.enqueue(TERMINATED_FROM_STOPPING_EVENT);
515        break;
516      case STARTING:
517      case TERMINATED:
518      case FAILED:
519      default:
520        throw new AssertionError();
521    }
522  }
523
524  private void enqueueFailedEvent(final State from, final Throwable cause) {
525    // can't memoize this one due to the exception
526    listeners.enqueue(
527        new ListenerCallQueue.Event<Listener>() {
528          @Override
529          public void call(Listener listener) {
530            listener.failed(from, cause);
531          }
532
533          @Override
534          public String toString() {
535            return "failed({from = " + from + ", cause = " + cause + "})";
536          }
537        });
538  }
539
540  /**
541   * An immutable snapshot of the current state of the service. This class represents a consistent
542   * snapshot of the state and therefore it can be used to answer simple queries without needing to
543   * grab a lock.
544   */
545  @Immutable
546  private static final class StateSnapshot {
547    /**
548     * The internal state, which equals external state unless shutdownWhenStartupFinishes is true.
549     */
550    final State state;
551
552    /**
553     * If true, the user requested a shutdown while the service was still starting up.
554     */
555    final boolean shutdownWhenStartupFinishes;
556
557    /**
558     * The exception that caused this service to fail. This will be {@code null} unless the service
559     * has failed.
560     */
561    @Nullable final Throwable failure;
562
563    StateSnapshot(State internalState) {
564      this(internalState, false, null);
565    }
566
567    StateSnapshot(
568        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
569      checkArgument(
570          !shutdownWhenStartupFinishes || internalState == STARTING,
571          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.",
572          internalState);
573      checkArgument(
574          !(failure != null ^ internalState == FAILED),
575          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
576              + "instead.",
577          internalState,
578          failure);
579      this.state = internalState;
580      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
581      this.failure = failure;
582    }
583
584    /** @see Service#state() */
585    State externalState() {
586      if (shutdownWhenStartupFinishes && state == STARTING) {
587        return STOPPING;
588      } else {
589        return state;
590      }
591    }
592
593    /** @see Service#failureCause() */
594    Throwable failureCause() {
595      checkState(
596          state == FAILED,
597          "failureCause() is only valid if the service has failed, service is %s",
598          state);
599      return failure;
600    }
601  }
602}