001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022
023import com.google.common.annotations.Beta;
024import com.google.common.collect.Lists;
025import com.google.common.collect.Queues;
026import com.google.common.util.concurrent.Service.State; // javadoc needs this
027
028import java.util.List;
029import java.util.Queue;
030import java.util.concurrent.ExecutionException;
031import java.util.concurrent.Executor;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034import java.util.concurrent.locks.ReentrantLock;
035import java.util.logging.Level;
036import java.util.logging.Logger;
037
038import javax.annotation.Nullable;
039import javax.annotation.concurrent.GuardedBy;
040import javax.annotation.concurrent.Immutable;
041
042/**
043 * Base class for implementing services that can handle {@link #doStart} and {@link #doStop}
044 * requests, responding to them with {@link #notifyStarted()} and {@link #notifyStopped()}
045 * callbacks. Its subclasses must manage threads manually; consider
046 * {@link AbstractExecutionThreadService} if you need only a single execution thread.
047 *
048 * @author Jesse Wilson
049 * @author Luke Sandberg
050 * @since 1.0
051 */
052@Beta
053public abstract class AbstractService implements Service {
054  private static final Logger logger = Logger.getLogger(AbstractService.class.getName());
055  private final ReentrantLock lock = new ReentrantLock();
056
057  private final Transition startup = new Transition();
058  private final Transition shutdown = new Transition();
059
060  /**
061   * The listeners to notify during a state transition.
062   */
063  @GuardedBy("lock")
064  private final List<ListenerExecutorPair> listeners = Lists.newArrayList();
065  
066  /**
067   * The queue of listeners that are waiting to be executed.
068   *
069   * <p>Enqueue operations should be protected by {@link #lock} while dequeue operations should be
070   * protected by the implicit lock on this object. Dequeue operations should be executed atomically
071   * with the execution of the {@link Runnable} and additionally the {@link #lock} should not be
072   * held when the listeners are being executed. Use {@link #executeListeners} for this operation.
073   * This is necessary to ensure that elements on the queue are executed in the correct order.
074   * Enqueue operations should be protected so that listeners are added in the correct order. We use
075   * a concurrent queue implementation so that enqueues can be executed concurrently with dequeues.
076   */
077  @GuardedBy("queuedListeners")
078  private final Queue<Runnable> queuedListeners = Queues.newConcurrentLinkedQueue();
079  
080  /**
081   * The current state of the service.  This should be written with the lock held but can be read
082   * without it because it is an immutable object in a volatile field.  This is desirable so that
083   * methods like {@link #state}, {@link #failureCause} and notably {@link #toString} can be run
084   * without grabbing the lock.  
085   * 
086   * <p>To update this field correctly the lock must be held to guarantee that the state is 
087   * consistent.
088   */
089  @GuardedBy("lock")
090  private volatile StateSnapshot snapshot = new StateSnapshot(State.NEW);
091
092  /** Constructor for use by subclasses. */
093  protected AbstractService() {
094    // Add a listener to update the futures. This needs to be added first so that it is executed 
095    // before the other listeners. This way the other listeners can access the completed futures.
096    addListener(
097        new Listener() {
098          @Override public void starting() {}
099          
100          @Override public void running() { 
101            startup.set(State.RUNNING);
102          }
103    
104          @Override public void stopping(State from) {
105            if (from == State.STARTING) {
106              startup.set(State.STOPPING);
107            }
108          }
109    
110          @Override public void terminated(State from) {
111            if (from == State.NEW) {
112              startup.set(State.TERMINATED);
113            } 
114            shutdown.set(State.TERMINATED);
115          }
116    
117          @Override public void failed(State from, Throwable failure) {
118            switch (from) {
119              case STARTING:
120                startup.setException(failure);
121                shutdown.setException(new Exception("Service failed to start.", failure));
122                break;
123              case RUNNING:
124                shutdown.setException(new Exception("Service failed while running", failure));
125                break;
126              case STOPPING:
127                shutdown.setException(failure);
128                break;
129              case TERMINATED:  /* fall-through */
130              case FAILED:  /* fall-through */
131              case NEW:  /* fall-through */
132              default:
133                throw new AssertionError("Unexpected from state: " + from);
134            }
135          }
136        }, 
137        MoreExecutors.sameThreadExecutor());
138  }
139  
140  /**
141   * This method is called by {@link #start} to initiate service startup. The invocation of this
142   * method should cause a call to {@link #notifyStarted()}, either during this method's run, or
143   * after it has returned. If startup fails, the invocation should cause a call to
144   * {@link #notifyFailed(Throwable)} instead.
145   *
146   * <p>This method should return promptly; prefer to do work on a different thread where it is
147   * convenient. It is invoked exactly once on service startup, even when {@link #start} is called
148   * multiple times.
149   */
150  protected abstract void doStart();
151
152  /**
153   * This method should be used to initiate service shutdown. The invocation of this method should
154   * cause a call to {@link #notifyStopped()}, either during this method's run, or after it has
155   * returned. If shutdown fails, the invocation should cause a call to
156   * {@link #notifyFailed(Throwable)} instead.
157   *
158   * <p> This method should return promptly; prefer to do work on a different thread where it is
159   * convenient. It is invoked exactly once on service shutdown, even when {@link #stop} is called
160   * multiple times.
161   */
162  protected abstract void doStop();
163
164  @Override
165  public final ListenableFuture<State> start() {
166    lock.lock();
167    try {
168      if (snapshot.state == State.NEW) {
169        snapshot = new StateSnapshot(State.STARTING);
170        starting();
171        doStart();
172      }
173    } catch (Throwable startupFailure) {
174      notifyFailed(startupFailure);
175    } finally {
176      lock.unlock();
177      executeListeners();
178    }
179
180    return startup;
181  }
182
183  @Override
184  public final ListenableFuture<State> stop() {
185    lock.lock();
186    try {
187      switch (snapshot.state) {
188        case NEW:
189          snapshot = new StateSnapshot(State.TERMINATED);
190          terminated(State.NEW);
191          break;
192        case STARTING:
193          snapshot = new StateSnapshot(State.STARTING, true, null);
194          stopping(State.STARTING);
195          break;
196        case RUNNING:
197          snapshot = new StateSnapshot(State.STOPPING);
198          stopping(State.RUNNING);
199          doStop();
200          break;
201        case STOPPING:
202        case TERMINATED:
203        case FAILED:
204          // do nothing
205          break;
206        default:
207          throw new AssertionError("Unexpected state: " + snapshot.state);
208      }
209    } catch (Throwable shutdownFailure) {
210      notifyFailed(shutdownFailure);
211    } finally {
212      lock.unlock();
213      executeListeners();
214    }
215
216    return shutdown;
217  }
218
219  @Override
220  public State startAndWait() {
221    return Futures.getUnchecked(start());
222  }
223
224  @Override
225  public State stopAndWait() {
226    return Futures.getUnchecked(stop());
227  }
228
229  /**
230   * Implementing classes should invoke this method once their service has started. It will cause
231   * the service to transition from {@link State#STARTING} to {@link State#RUNNING}.
232   *
233   * @throws IllegalStateException if the service is not {@link State#STARTING}.
234   */
235  protected final void notifyStarted() {
236    lock.lock();
237    try {
238      if (snapshot.state != State.STARTING) {
239        IllegalStateException failure = new IllegalStateException(
240            "Cannot notifyStarted() when the service is " + snapshot.state);
241        notifyFailed(failure);
242        throw failure;
243      }
244
245      if (snapshot.shutdownWhenStartupFinishes) {
246        snapshot = new StateSnapshot(State.STOPPING);
247        // We don't call listeners here because we already did that when we set the 
248        // shutdownWhenStartupFinishes flag.
249        doStop();
250      } else {
251        snapshot = new StateSnapshot(State.RUNNING);
252        running();
253      }
254    } finally {
255      lock.unlock();
256      executeListeners();
257    }
258  }
259
260  /**
261   * Implementing classes should invoke this method once their service has stopped. It will cause
262   * the service to transition from {@link State#STOPPING} to {@link State#TERMINATED}.
263   *
264   * @throws IllegalStateException if the service is neither {@link State#STOPPING} nor
265   *         {@link State#RUNNING}.
266   */
267  protected final void notifyStopped() {
268    lock.lock();
269    try {
270      if (snapshot.state != State.STOPPING && snapshot.state != State.RUNNING) {
271        IllegalStateException failure = new IllegalStateException(
272            "Cannot notifyStopped() when the service is " + snapshot.state);
273        notifyFailed(failure);
274        throw failure;
275      }
276      State previous = snapshot.state;
277      snapshot = new StateSnapshot(State.TERMINATED);
278      terminated(previous);
279    } finally {
280      lock.unlock();
281      executeListeners();
282    }
283  }
284
285  /**
286   * Invoke this method to transition the service to the {@link State#FAILED}. The service will
287   * <b>not be stopped</b> if it is running. Invoke this method when a service has failed critically
288   * or otherwise cannot be started nor stopped.
289   */
290  protected final void notifyFailed(Throwable cause) {
291    checkNotNull(cause);
292
293    lock.lock();
294    try {
295      switch (snapshot.state) {
296        case NEW:
297        case TERMINATED:
298          throw new IllegalStateException("Failed while in state:" + snapshot.state, cause);
299        case RUNNING:
300        case STARTING:
301        case STOPPING:
302          State previous = snapshot.state;
303          snapshot = new StateSnapshot(State.FAILED, false, cause);
304          failed(previous, cause);
305          break;
306        case FAILED:
307          // Do nothing
308          break;
309        default:
310          throw new AssertionError("Unexpected state: " + snapshot.state);
311      }
312    } finally {
313      lock.unlock();
314      executeListeners();
315    }
316  }
317
318  @Override
319  public final boolean isRunning() {
320    return state() == State.RUNNING;
321  }
322
323  @Override
324  public final State state() {
325    return snapshot.externalState();
326  }
327  
328  /**
329   * @since 14.0
330   */
331  @Override
332  public final Throwable failureCause() {
333    return snapshot.failureCause();
334  }
335  
336  /**
337   * @since 13.0
338   */
339  @Override
340  public final void addListener(Listener listener, Executor executor) {
341    checkNotNull(listener, "listener");
342    checkNotNull(executor, "executor");
343    lock.lock();
344    try {
345      if (snapshot.state != State.TERMINATED && snapshot.state != State.FAILED) {
346        listeners.add(new ListenerExecutorPair(listener, executor));
347      }
348    } finally {
349      lock.unlock();
350    }
351  }
352
353  @Override public String toString() {
354    return getClass().getSimpleName() + " [" + state() + "]";
355  }
356
357  /**
358   * A change from one service state to another, plus the result of the change.
359   */
360  private class Transition extends AbstractFuture<State> {
361    @Override
362    public State get(long timeout, TimeUnit unit)
363        throws InterruptedException, TimeoutException, ExecutionException {
364      try {
365        return super.get(timeout, unit);
366      } catch (TimeoutException e) {
367        throw new TimeoutException(AbstractService.this.toString());
368      }
369    }
370  }
371  
372  /** 
373   * Attempts to execute all the listeners in {@link #queuedListeners} while not holding the
374   * {@link #lock}.
375   */
376  private void executeListeners() {
377    if (!lock.isHeldByCurrentThread()) {
378      synchronized (queuedListeners) {
379        Runnable listener;
380        while ((listener = queuedListeners.poll()) != null) {
381          listener.run();
382        }
383      }
384    }
385  }
386  
387  @GuardedBy("lock")
388  private void starting() {
389    for (final ListenerExecutorPair pair : listeners) {
390      queuedListeners.add(new Runnable() {
391        @Override public void run() {
392          pair.execute(new Runnable() {
393            @Override public void run() {
394              pair.listener.starting();
395            }
396          });
397        }
398      });
399    }
400  }
401
402  @GuardedBy("lock")
403  private void running() {
404    for (final ListenerExecutorPair pair : listeners) {
405      queuedListeners.add(new Runnable() {
406        @Override public void run() {
407          pair.execute(new Runnable() {
408            @Override public void run() {
409              pair.listener.running();
410            }
411          });
412        }
413      });
414    }
415  }
416
417  @GuardedBy("lock")
418  private void stopping(final State from) {
419    for (final ListenerExecutorPair pair : listeners) {
420      queuedListeners.add(new Runnable() {
421        @Override public void run() {
422          pair.execute(new Runnable() {
423            @Override public void run() {
424              pair.listener.stopping(from);
425            }
426          });
427        }
428      });
429    }
430  }
431
432  @GuardedBy("lock")
433  private void terminated(final State from) {
434    for (final ListenerExecutorPair pair : listeners) {
435      queuedListeners.add(new Runnable() {
436        @Override public void run() {
437          pair.execute(new Runnable() {
438            @Override public void run() {
439              pair.listener.terminated(from);
440            }
441          });
442        }
443      });
444    }
445    // There are no more state transitions so we can clear this out.
446    listeners.clear();
447  }
448
449  @GuardedBy("lock")
450  private void failed(final State from, final Throwable cause) {
451    for (final ListenerExecutorPair pair : listeners) {
452      queuedListeners.add(new Runnable() {
453        @Override public void run() {
454          pair.execute(new Runnable() {
455            @Override public void run() {
456              pair.listener.failed(from, cause);
457            }
458          });
459        }
460      });
461    }
462    // There are no more state transitions so we can clear this out.
463    listeners.clear();
464  }
465  
466  /** A simple holder for a listener and its executor. */
467  private static class ListenerExecutorPair {
468    final Listener listener;
469    final Executor executor;
470
471    ListenerExecutorPair(Listener listener, Executor executor) {
472      this.listener = listener;
473      this.executor = executor;
474    }
475
476    /**
477     * Executes the given {@link Runnable} on {@link #executor} logging and swallowing all 
478     * exceptions
479     */
480    void execute(Runnable runnable) {
481      try {
482        executor.execute(runnable);
483      } catch (Exception e) {
484        logger.log(Level.SEVERE, "Exception while executing listener " + listener 
485            + " with executor " + executor, e);
486      }
487    }
488  }
489  
490  /**
491   * An immutable snapshot of the current state of the service. This class represents a consistent
492   * snapshot of the state and therefore it can be used to answer simple queries without needing to
493   * grab a lock.
494   */
495  @Immutable
496  private static final class StateSnapshot {
497    /**
498     * The internal state, which equals external state unless
499     * shutdownWhenStartupFinishes is true.
500     */
501    final State state;
502
503    /**
504     * If true, the user requested a shutdown while the service was still starting
505     * up.
506     */
507    final boolean shutdownWhenStartupFinishes;
508    
509    /**
510     * The exception that caused this service to fail.  This will be {@code null}
511     * unless the service has failed.
512     */
513    @Nullable
514    final Throwable failure;
515    
516    StateSnapshot(State internalState) {
517      this(internalState, false, null);
518    }
519    
520    StateSnapshot(
521        State internalState, boolean shutdownWhenStartupFinishes, @Nullable Throwable failure) {
522      checkArgument(!shutdownWhenStartupFinishes || internalState == State.STARTING, 
523          "shudownWhenStartupFinishes can only be set if state is STARTING. Got %s instead.", 
524          internalState);
525      checkArgument(!(failure != null ^ internalState == State.FAILED),
526          "A failure cause should be set if and only if the state is failed.  Got %s and %s "
527          + "instead.", internalState, failure);
528      this.state = internalState;
529      this.shutdownWhenStartupFinishes = shutdownWhenStartupFinishes;
530      this.failure = failure;
531    }
532    
533    /** @see Service#state() */
534    State externalState() {
535      if (shutdownWhenStartupFinishes && state == State.STARTING) {
536        return State.STOPPING;
537      } else {
538        return state;
539      }
540    }
541    
542    /** @see Service#failureCause() */
543    Throwable failureCause() {
544      checkState(state == State.FAILED, 
545          "failureCause() is only valid if the service has failed, service is %s", state);
546      return failure;
547    }
548  }
549}