001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.util.concurrent.Service.State.FAILED;
023import static com.google.common.util.concurrent.Service.State.NEW;
024import static com.google.common.util.concurrent.Service.State.RUNNING;
025import static com.google.common.util.concurrent.Service.State.STARTING;
026import static com.google.common.util.concurrent.Service.State.STOPPING;
027import static com.google.common.util.concurrent.Service.State.TERMINATED;
028
029import com.google.common.annotations.Beta;
030import com.google.common.util.concurrent.ListenerCallQueue.Callback;
031import com.google.common.util.concurrent.Monitor.Guard;
032import com.google.common.util.concurrent.Service.State; // javadoc needs this
033
034import java.util.ArrayList;
035import java.util.Collections;
036import java.util.List;
037import java.util.concurrent.Executor;
038import java.util.concurrent.TimeUnit;
039import java.util.concurrent.TimeoutException;
040
041import javax.annotation.Nullable;
042import javax.annotation.concurrent.GuardedBy;
043import javax.annotation.concurrent.Immutable;
044
045/**
046 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
047 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
048 * callbacks. Its subclasses must manage threads manually; consider
049 * {@link AbstractExecutionThreadService} if you need only a single execution thread.
050 *
051 * @author Jesse Wilson
052 * @author Luke Sandberg
053 * @since 1.0
054 */
055@Beta
056public abstract class AbstractService implements Service {
057  private static final Callback<Listener> STARTING_CALLBACK = 
058      new Callback<Listener>("starting()") {
059        @Override void call(Listener listener) {
060          listener.starting();
061        }
062      };
063  private static final Callback<Listener> RUNNING_CALLBACK = 
064      new Callback<Listener>("running()") {
065        @Override void call(Listener listener) {
066          listener.running();
067        }
068      };
069  private static final Callback<Listener> STOPPING_FROM_STARTING_CALLBACK = 
070      stoppingCallback(STARTING);
071  private static final Callback<Listener> STOPPING_FROM_RUNNING_CALLBACK = 
072      stoppingCallback(RUNNING);
073  
074  private static final Callback<Listener> TERMINATED_FROM_NEW_CALLBACK = 
075      terminatedCallback(NEW);
076  private static final Callback<Listener> TERMINATED_FROM_RUNNING_CALLBACK = 
077      terminatedCallback(RUNNING);
078  private static final Callback<Listener> TERMINATED_FROM_STOPPING_CALLBACK = 
079      terminatedCallback(STOPPING);
080
081  private static Callback<Listener> terminatedCallback(final State from) {
082    return new Callback<Listener>("terminated({from = " + from + "})") {
083      @Override void call(Listener listener) {
084        listener.terminated(from);
085      }
086    };
087  }
088  
089  private static Callback<Listener> stoppingCallback(final State from) {
090    return new Callback<Listener>("stopping({from = " + from + "})") {
091      @Override void call(Listener listener) {
092        listener.stopping(from);
093      }
094    };
095  }
096  
097  private final Monitor monitor = new Monitor();
098
099  private final Guard isStartable = new Guard(monitor) {
100    @Override public boolean isSatisfied() {
101      return state() == NEW;
102    }
103  };
104    
105  private final Guard isStoppable = new Guard(monitor) {
106    @Override public boolean isSatisfied() {
107      return state().compareTo(RUNNING) <= 0;
108    }
109  };
110  
111  private final Guard hasReachedRunning = new Guard(monitor) {
112    @Override public boolean isSatisfied() {
113      return state().compareTo(RUNNING) >= 0;
114    }
115  };
116  
117  private final Guard isStopped = new Guard(monitor) {
118    @Override public boolean isSatisfied() {
119      return state().isTerminal();
120    }
121  };
122
123  /**
124   * The listeners to notify during a state transition.
125   */
126  @GuardedBy("monitor")
127  private final List<ListenerCallQueue<Listener>> listeners = 
128      Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
129  
130  /**
131   * The current state of the service.  This should be written with the lock held but can be read
132   * without it because it is an immutable object in a volatile field.  This is desirable so that
133   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
134   * without grabbing the lock.  
135   * 
136   * <p>To update this field correctly the lock must be held to guarantee that the state is 
137   * consistent.
138   */
139  @GuardedBy("monitor")
140  private volatile StateSnapshot snapshot = new StateSnapshot(NEW);
141
142  /** Constructor for use by subclasses. */
143  protected AbstractService() {}
144  
145  /**
146   * This method is called by {@link #startAsync} to initiate service startup. The invocation of 
147   * this method should cause a call to {@link #notifyStarted()}, either during this method's run,
148   * or after it has returned. If startup fails, the invocation should cause a call to
149   * {@link #notifyFailed(Throwable)} instead.
150   *
151   * <p>This method should return promptly; prefer to do work on a different thread where it is
152   * convenient. It is invoked exactly once on service startup, even when {@link #startAsync} is 
153   * called multiple times.
154   */
155  protected abstract void doStart();
156
157  /**
158   * This method should be used to initiate service shutdown. The invocation of this method should
159   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
160   * returned. If shutdown fails, the invocation should cause a call to
161   * {@link #notifyFailed(Throwable)} instead.
162   *
163   * <p> This method should return promptly; prefer to do work on a different thread where it is
164   * convenient. It is invoked exactly once on service shutdown, even when {@link #stopAsync} is 
165   * called multiple times.
166   */
167  protected abstract void doStop();
168
169  @Override public final Service startAsync() {
170    if (monitor.enterIf(isStartable)) {
171      try {
172        snapshot = new StateSnapshot(STARTING);
173        starting();
174        doStart();
175      } catch (Throwable startupFailure) {
176        notifyFailed(startupFailure);
177      } finally {
178        monitor.leave();
179        executeListeners();
180      }
181    } else {
182      throw new IllegalStateException("Service " + this + " has already been started");
183    }
184    return this;
185  }
186
187  @Override public final Service stopAsync() {
188    if (monitor.enterIf(isStoppable)) {
189      try {
190        State previous = state();
191        switch (previous) {
192          case NEW:
193            snapshot = new StateSnapshot(TERMINATED);
194            terminated(NEW);
195            break;
196          case STARTING:
197            snapshot = new StateSnapshot(STARTING, true, null);
198            stopping(STARTING);
199            break;
200          case RUNNING:
201            snapshot = new StateSnapshot(STOPPING);
202            stopping(RUNNING);
203            doStop();
204            break;
205          case STOPPING:
206          case TERMINATED:
207          case FAILED:
208            // These cases are impossible due to the if statement above.
209            throw new AssertionError("isStoppable is incorrectly implemented, saw: " + previous);
210          default:
211            throw new AssertionError("Unexpected state: " + previous);
212        }
213      } catch (Throwable shutdownFailure) {
214        notifyFailed(shutdownFailure);
215      } finally {
216        monitor.leave();
217        executeListeners();
218      }
219    }
220    return this;
221  }
222
223  @Override public final void awaitRunning() {
224    monitor.enterWhenUninterruptibly(hasReachedRunning);
225    try {
226      checkCurrentState(RUNNING);
227    } finally {
228      monitor.leave();
229    }
230  }
231  
232  @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
233    if (monitor.enterWhenUninterruptibly(hasReachedRunning, timeout, unit)) {
234      try {
235        checkCurrentState(RUNNING);
236      } finally {
237        monitor.leave();
238      }
239    } else {
240      // It is possible due to races the we are currently in the expected state even though we 
241      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
242      // even check the guard.  I don't think we care too much about this use case but it could lead
243      // to a confusing error message.
244      throw new TimeoutException("Timed out waiting for " + this + " to reach the RUNNING state. "
245          + "Current state: " + state());
246    }
247  }
248
249  @Override public final void awaitTerminated() {
250    monitor.enterWhenUninterruptibly(isStopped);
251    try {
252      checkCurrentState(TERMINATED);
253    } finally {
254      monitor.leave();
255    }
256  }
257  
258  @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
259    if (monitor.enterWhenUninterruptibly(isStopped, timeout, unit)) {
260      try {
261        checkCurrentState(TERMINATED);
262      } finally {
263        monitor.leave();
264      }
265    } else {
266      // It is possible due to races the we are currently in the expected state even though we 
267      // timed out. e.g. if we weren't event able to grab the lock within the timeout we would never
268      // even check the guard.  I don't think we care too much about this use case but it could lead
269      // to a confusing error message.
270      throw new TimeoutException("Timed out waiting for " + this + " to reach a terminal state. "
271          + "Current state: " + state());
272    }
273  }
274  
275  /** Checks that the current state is equal to the expected state. */
276  @GuardedBy("monitor")
277  private void checkCurrentState(State expected) {
278    State actual = state();
279    if (actual != expected) {
280      if (actual == FAILED) {
281        // Handle this specially so that we can include the failureCause, if there is one.
282        throw new IllegalStateException("Expected the service to be " + expected 
283            + ", but the service has FAILED", failureCause());
284      }
285      throw new IllegalStateException("Expected the service to be " + expected + ", but was " 
286          + actual);
287    }
288  }
289
290  /**
291   * Implementing classes should invoke this method once their service has started. It will cause
292   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
293   *
294   * @throws IllegalStateException if the service is not {@link State#STARTING}.
295   */
296  protected final void notifyStarted() {
297    monitor.enter();
298    try {
299      // We have to examine the internal state of the snapshot here to properly handle the stop 
300      // while starting case.
301      if (snapshot.state != STARTING) {
302        IllegalStateException failure = new IllegalStateException(
303            "Cannot notifyStarted() when the service is " + snapshot.state);
304        notifyFailed(failure);
305        throw failure;
306      }
307
308      if (snapshot.shutdownWhenStartupFinishes) {
309        snapshot = new StateSnapshot(STOPPING);
310        // We don't call listeners here because we already did that when we set the 
311        // shutdownWhenStartupFinishes flag.
312        doStop();
313      } else {
314        snapshot = new StateSnapshot(RUNNING);
315        running();
316      }
317    } finally {
318      monitor.leave();
319      executeListeners();
320    }
321  }
322
323  /**
324   * Implementing classes should invoke this method once their service has stopped. It will cause
325   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
326   *
327   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
328   *         {@link State#RUNNING}.
329   */
330  protected final void notifyStopped() {
331    monitor.enter();
332    try {
333      // We check the internal state of the snapshot instead of state() directly so we don't allow
334      // notifyStopped() to be called while STARTING, even if stop() has already been called.
335      State previous = snapshot.state;
336      if (previous != STOPPING && previous != RUNNING) {
337        IllegalStateException failure = new IllegalStateException(
338            "Cannot notifyStopped() when the service is " + previous);
339        notifyFailed(failure);
340        throw failure;
341      }
342      snapshot = new StateSnapshot(TERMINATED);
343      terminated(previous);
344    } finally {
345      monitor.leave();
346      executeListeners();
347    }
348  }
349
350  /**
351   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
352   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
353   * or otherwise cannot be started nor stopped.
354   */
355  protected final void notifyFailed(Throwable cause) {
356    checkNotNull(cause);
357
358    monitor.enter();
359    try {
360      State previous = state();
361      switch (previous) {
362        case NEW:
363        case TERMINATED:
364          throw new IllegalStateException("Failed while in state:" + previous, cause);
365        case RUNNING:
366        case STARTING:
367        case STOPPING:
368          snapshot = new StateSnapshot(FAILED, false, cause);
369          failed(previous, cause);
370          break;
371        case FAILED:
372          // Do nothing
373          break;
374        default:
375          throw new AssertionError("Unexpected state: " + previous);
376      }
377    } finally {
378      monitor.leave();
379      executeListeners();
380    }
381  }
382
383  @Override
384  public final boolean isRunning() {
385    return state() == RUNNING;
386  }
387
388  @Override
389  public final State state() {
390    return snapshot.externalState();
391  }
392  
393  /**
394   * @since 14.0
395   */
396  @Override
397  public final Throwable failureCause() {
398    return snapshot.failureCause();
399  }
400  
401  /**
402   * @since 13.0
403   */
404  @Override
405  public final void addListener(Listener listener, Executor executor) {
406    checkNotNull(listener, "listener");
407    checkNotNull(executor, "executor");
408    monitor.enter();
409    try {
410      if (!state().isTerminal()) {
411        listeners.add(new ListenerCallQueue<Listener>(listener, executor));
412      }
413    } finally {
414      monitor.leave();
415    }
416  }
417
418  @Override public String toString() {
419    return getClass().getSimpleName() + " [" + state() + "]";
420  }
421
422  /** 
423   * Attempts to execute all the listeners in {@link #listeners} while not holding the
424   * {@link #monitor}.
425   */
426  private void executeListeners() {
427    if (!monitor.isOccupiedByCurrentThread()) {
428      // iterate by index to avoid concurrent modification exceptions
429      for (int i = 0; i < listeners.size(); i++) {
430        listeners.get(i).execute();
431      }
432    }
433  }
434
435  @GuardedBy("monitor")
436  private void starting() {
437    STARTING_CALLBACK.enqueueOn(listeners);
438  }
439
440  @GuardedBy("monitor")
441  private void running() {
442    RUNNING_CALLBACK.enqueueOn(listeners);
443  }
444
445  @GuardedBy("monitor")
446  private void stopping(final State from) {
447    if (from == State.STARTING) {
448      STOPPING_FROM_STARTING_CALLBACK.enqueueOn(listeners);
449    } else if (from == State.RUNNING) {
450      STOPPING_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
451    } else {
452      throw new AssertionError();
453    }
454  }
455
456  @GuardedBy("monitor")
457  private void terminated(final State from) {
458    switch(from) {
459      case NEW:
460        TERMINATED_FROM_NEW_CALLBACK.enqueueOn(listeners);
461        break;
462      case RUNNING:
463        TERMINATED_FROM_RUNNING_CALLBACK.enqueueOn(listeners);
464        break;
465      case STOPPING:
466        TERMINATED_FROM_STOPPING_CALLBACK.enqueueOn(listeners);
467        break;
468      case STARTING:
469      case TERMINATED:
470      case FAILED:
471      default:
472        throw new AssertionError();
473    }
474  }
475
476  @GuardedBy("monitor")
477  private void failed(final State from, final Throwable cause) {
478    // can't memoize this one due to the exception
479    new Callback<Listener>("failed({from = " + from + ", cause = " + cause + "})") {
480      @Override void call(Listener listener) {
481        listener.failed(from, cause);
482      }
483    }.enqueueOn(listeners);
484  }
485  
486  /**
487   * An immutable snapshot of the current state of the service. This class represents a consistent
488   * snapshot of the state and therefore it can be used to answer simple queries without needing to
489   * grab a lock.
490   */
491  @Immutable
492  private static final class StateSnapshot {
493    /**
494     * The internal state, which equals external state unless
495     * shutdownWhenStartupFinishes is true.
496     */
497    final State state;
498
499    /**
500     * If true, the user requested a shutdown while the service was still starting
501     * up.
502     */
503    final boolean shutdownWhenStartupFinishes;
504    
505    /**
506     * The exception that caused this service to fail.  This will be {@code null}
507     * unless the service has failed.
508     */
509    @Nullable
510    final Throwable failure;
511    
512    StateSnapshot(State internalState) {
513      this(internalState, false, null);
514    }
515    
516    StateSnapshot(
517        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
518      checkArgument(!shutdownWhenStartupFinishes || internalState == STARTING, 
519          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 
520          internalState);
521      checkArgument(!(failure != null ^ internalState == FAILED),
522          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
523          + "instead.", internalState, failure);
524      this.state = internalState;
525      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
526      this.failure = failure;
527    }
528    
529    /** @see Service#state() */
530    State externalState() {
531      if (shutdownWhenStartupFinishes && state == STARTING) {
532        return STOPPING;
533      } else {
534        return state;
535      }
536    }
537    
538    /** @see Service#failureCause() */
539    Throwable failureCause() {
540      checkState(state == FAILED, 
541          "failureCause() is only valid if the service has failed, service is %s", state);
542      return failure;
543    }
544  }
545}