001/*
002 * Copyright (C) 2011 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.util.concurrent.Internal.saturatedToNanos;
020import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
021
022import com.google.common.annotations.Beta;
023import com.google.common.annotations.GwtIncompatible;
024import com.google.common.base.Supplier;
025import com.google.errorprone.annotations.CanIgnoreReturnValue;
026import com.google.errorprone.annotations.concurrent.GuardedBy;
027import com.google.j2objc.annotations.WeakOuter;
028import java.time.Duration;
029import java.util.concurrent.Callable;
030import java.util.concurrent.Executor;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.ThreadFactory;
035import java.util.concurrent.TimeUnit;
036import java.util.concurrent.TimeoutException;
037import java.util.concurrent.locks.ReentrantLock;
038import java.util.logging.Level;
039import java.util.logging.Logger;
040import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
041import org.checkerframework.checker.nullness.qual.Nullable;
042
043/**
044 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in
045 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp},
046 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically.
047 *
048 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run
049 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the
050 * {@link #runOneIteration} that will be executed periodically as specified by its {@link
051 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic
052 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method.
053 *
054 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link
055 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link
056 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start
057 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify
058 * shared state without additional synchronization necessary for visibility to later executions of
059 * the life cycle methods.
060 *
061 * <h3>Usage Example</h3>
062 *
063 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to
064 * rate limit itself.
065 *
066 * <pre>{@code
067 * class CrawlingService extends AbstractScheduledService {
068 *   private Set<Uri> visited;
069 *   private Queue<Uri> toCrawl;
070 *   protected void startUp() throws Exception {
071 *     toCrawl = readStartingUris();
072 *   }
073 *
074 *   protected void runOneIteration() throws Exception {
075 *     Uri uri = toCrawl.remove();
076 *     Collection<Uri> newUris = crawl(uri);
077 *     visited.add(uri);
078 *     for (Uri newUri : newUris) {
079 *       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
080 *     }
081 *   }
082 *
083 *   protected void shutDown() throws Exception {
084 *     saveUris(toCrawl);
085 *   }
086 *
087 *   protected Scheduler scheduler() {
088 *     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
089 *   }
090 * }
091 * }</pre>
092 *
093 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of
094 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to
095 * rate limit the number of queries we perform.
096 *
097 * @author Luke Sandberg
098 * @since 11.0
099 */
100@Beta
101@GwtIncompatible
102public abstract class AbstractScheduledService implements Service {
103  private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName());
104
105  /**
106   * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its
107   * task.
108   *
109   * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory
110   * methods, these provide {@link Scheduler} instances for the common use case of running the
111   * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link
112   * CustomScheduler}.
113   *
114   * @author Luke Sandberg
115   * @since 11.0
116   */
117  public abstract static class Scheduler {
118    /**
119     * Returns a {@link Scheduler} that schedules the task using the {@link
120     * ScheduledExecutorService#scheduleWithFixedDelay} method.
121     *
122     * @param initialDelay the time to delay first execution
123     * @param delay the delay between the termination of one execution and the commencement of the
124     *     next
125     * @since 28.0
126     */
127    public static Scheduler newFixedDelaySchedule(Duration initialDelay, Duration delay) {
128      return newFixedDelaySchedule(
129          saturatedToNanos(initialDelay), saturatedToNanos(delay), TimeUnit.NANOSECONDS);
130    }
131
132    /**
133     * Returns a {@link Scheduler} that schedules the task using the {@link
134     * ScheduledExecutorService#scheduleWithFixedDelay} method.
135     *
136     * @param initialDelay the time to delay first execution
137     * @param delay the delay between the termination of one execution and the commencement of the
138     *     next
139     * @param unit the time unit of the initialDelay and delay parameters
140     */
141    @SuppressWarnings("GoodTime") // should accept a java.time.Duration
142    public static Scheduler newFixedDelaySchedule(
143        final long initialDelay, final long delay, final TimeUnit unit) {
144      checkNotNull(unit);
145      checkArgument(delay > 0, "delay must be > 0, found %s", delay);
146      return new Scheduler() {
147        @Override
148        public Future<?> schedule(
149            AbstractService service, ScheduledExecutorService executor, Runnable task) {
150          return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
151        }
152      };
153    }
154
155    /**
156     * Returns a {@link Scheduler} that schedules the task using the {@link
157     * ScheduledExecutorService#scheduleAtFixedRate} method.
158     *
159     * @param initialDelay the time to delay first execution
160     * @param period the period between successive executions of the task
161     * @since 28.0
162     */
163    public static Scheduler newFixedRateSchedule(Duration initialDelay, Duration period) {
164      return newFixedRateSchedule(
165          saturatedToNanos(initialDelay), saturatedToNanos(period), TimeUnit.NANOSECONDS);
166    }
167
168    /**
169     * Returns a {@link Scheduler} that schedules the task using the {@link
170     * ScheduledExecutorService#scheduleAtFixedRate} method.
171     *
172     * @param initialDelay the time to delay first execution
173     * @param period the period between successive executions of the task
174     * @param unit the time unit of the initialDelay and period parameters
175     */
176    @SuppressWarnings("GoodTime") // should accept a java.time.Duration
177    public static Scheduler newFixedRateSchedule(
178        final long initialDelay, final long period, final TimeUnit unit) {
179      checkNotNull(unit);
180      checkArgument(period > 0, "period must be > 0, found %s", period);
181      return new Scheduler() {
182        @Override
183        public Future<?> schedule(
184            AbstractService service, ScheduledExecutorService executor, Runnable task) {
185          return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
186        }
187      };
188    }
189
190    /** Schedules the task to run on the provided executor on behalf of the service. */
191    abstract Future<?> schedule(
192        AbstractService service, ScheduledExecutorService executor, Runnable runnable);
193
194    private Scheduler() {}
195  }
196
197  /* use AbstractService for state management */
198  private final AbstractService delegate = new ServiceDelegate();
199
200  @WeakOuter
201  private final class ServiceDelegate extends AbstractService {
202
203    // A handle to the running task so that we can stop it when a shutdown has been requested.
204    // These two fields are volatile because their values will be accessed from multiple threads.
205    @MonotonicNonNull private volatile Future<?> runningTask;
206    @MonotonicNonNull private volatile ScheduledExecutorService executorService;
207
208    // This lock protects the task so we can ensure that none of the template methods (startUp,
209    // shutDown or runOneIteration) run concurrently with one another.
210    // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the
211    // lock.
212    private final ReentrantLock lock = new ReentrantLock();
213
214    @WeakOuter
215    class Task implements Runnable {
216      @Override
217      public void run() {
218        lock.lock();
219        try {
220          if (runningTask.isCancelled()) {
221            // task may have been cancelled while blocked on the lock.
222            return;
223          }
224          AbstractScheduledService.this.runOneIteration();
225        } catch (Throwable t) {
226          try {
227            shutDown();
228          } catch (Exception ignored) {
229            logger.log(
230                Level.WARNING,
231                "Error while attempting to shut down the service after failure.",
232                ignored);
233          }
234          notifyFailed(t);
235          runningTask.cancel(false); // prevent future invocations.
236        } finally {
237          lock.unlock();
238        }
239      }
240    }
241
242    private final Runnable task = new Task();
243
244    @Override
245    protected final void doStart() {
246      executorService =
247          MoreExecutors.renamingDecorator(
248              executor(),
249              new Supplier<String>() {
250                @Override
251                public String get() {
252                  return serviceName() + " " + state();
253                }
254              });
255      executorService.execute(
256          new Runnable() {
257            @Override
258            public void run() {
259              lock.lock();
260              try {
261                startUp();
262                runningTask = scheduler().schedule(delegate, executorService, task);
263                notifyStarted();
264              } catch (Throwable t) {
265                notifyFailed(t);
266                if (runningTask != null) {
267                  // prevent the task from running if possible
268                  runningTask.cancel(false);
269                }
270              } finally {
271                lock.unlock();
272              }
273            }
274          });
275    }
276
277    @Override
278    protected final void doStop() {
279      runningTask.cancel(false);
280      executorService.execute(
281          new Runnable() {
282            @Override
283            public void run() {
284              try {
285                lock.lock();
286                try {
287                  if (state() != State.STOPPING) {
288                    // This means that the state has changed since we were scheduled. This implies
289                    // that an execution of runOneIteration has thrown an exception and we have
290                    // transitioned to a failed state, also this means that shutDown has already
291                    // been called, so we do not want to call it again.
292                    return;
293                  }
294                  shutDown();
295                } finally {
296                  lock.unlock();
297                }
298                notifyStopped();
299              } catch (Throwable t) {
300                notifyFailed(t);
301              }
302            }
303          });
304    }
305
306    @Override
307    public String toString() {
308      return AbstractScheduledService.this.toString();
309    }
310  }
311
312  /** Constructor for use by subclasses. */
313  protected AbstractScheduledService() {}
314
315  /**
316   * Run one iteration of the scheduled task. If any invocation of this method throws an exception,
317   * the service will transition to the {@link Service.State#FAILED} state and this method will no
318   * longer be called.
319   */
320  protected abstract void runOneIteration() throws Exception;
321
322  /**
323   * Start the service.
324   *
325   * <p>By default this method does nothing.
326   */
327  protected void startUp() throws Exception {}
328
329  /**
330   * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}.
331   *
332   * <p>By default this method does nothing.
333   */
334  protected void shutDown() throws Exception {}
335
336  /**
337   * Returns the {@link Scheduler} object used to configure this service. This method will only be
338   * called once.
339   */
340  // TODO(cpovirk): @ForOverride
341  protected abstract Scheduler scheduler();
342
343  /**
344   * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
345   * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the
346   * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service
347   * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED
348   * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService}
349   * instance. This method is guaranteed to only be called once.
350   *
351   * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
352   * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also,
353   * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service
354   * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED
355   * fails}.
356   */
357  protected ScheduledExecutorService executor() {
358    @WeakOuter
359    class ThreadFactoryImpl implements ThreadFactory {
360      @Override
361      public Thread newThread(Runnable runnable) {
362        return MoreExecutors.newThread(serviceName(), runnable);
363      }
364    }
365    final ScheduledExecutorService executor =
366        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl());
367    // Add a listener to shutdown the executor after the service is stopped. This ensures that the
368    // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
369    // Technically this listener is added after start() was called so it is a little gross, but it
370    // is called within doStart() so we know that the service cannot terminate or fail concurrently
371    // with adding this listener so it is impossible to miss an event that we are interested in.
372    addListener(
373        new Listener() {
374          @Override
375          public void terminated(State from) {
376            executor.shutdown();
377          }
378
379          @Override
380          public void failed(State from, Throwable failure) {
381            executor.shutdown();
382          }
383        },
384        directExecutor());
385    return executor;
386  }
387
388  /**
389   * Returns the name of this service. {@link AbstractScheduledService} may include the name in
390   * debugging output.
391   *
392   * @since 14.0
393   */
394  protected String serviceName() {
395    return getClass().getSimpleName();
396  }
397
398  @Override
399  public String toString() {
400    return serviceName() + " [" + state() + "]";
401  }
402
403  @Override
404  public final boolean isRunning() {
405    return delegate.isRunning();
406  }
407
408  @Override
409  public final State state() {
410    return delegate.state();
411  }
412
413  /** @since 13.0 */
414  @Override
415  public final void addListener(Listener listener, Executor executor) {
416    delegate.addListener(listener, executor);
417  }
418
419  /** @since 14.0 */
420  @Override
421  public final Throwable failureCause() {
422    return delegate.failureCause();
423  }
424
425  /** @since 15.0 */
426  @CanIgnoreReturnValue
427  @Override
428  public final Service startAsync() {
429    delegate.startAsync();
430    return this;
431  }
432
433  /** @since 15.0 */
434  @CanIgnoreReturnValue
435  @Override
436  public final Service stopAsync() {
437    delegate.stopAsync();
438    return this;
439  }
440
441  /** @since 15.0 */
442  @Override
443  public final void awaitRunning() {
444    delegate.awaitRunning();
445  }
446
447  /** @since 15.0 */
448  @Override
449  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
450    delegate.awaitRunning(timeout, unit);
451  }
452
453  /** @since 15.0 */
454  @Override
455  public final void awaitTerminated() {
456    delegate.awaitTerminated();
457  }
458
459  /** @since 15.0 */
460  @Override
461  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
462    delegate.awaitTerminated(timeout, unit);
463  }
464
465  /**
466   * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to
467   * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been
468   * cancelled, the {@link #getNextSchedule} method will be called.
469   *
470   * @author Luke Sandberg
471   * @since 11.0
472   */
473  @Beta
474  public abstract static class CustomScheduler extends Scheduler {
475
476    /** A callable class that can reschedule itself using a {@link CustomScheduler}. */
477    private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> {
478
479      /** The underlying task. */
480      private final Runnable wrappedRunnable;
481
482      /** The executor on which this Callable will be scheduled. */
483      private final ScheduledExecutorService executor;
484
485      /**
486       * The service that is managing this callable. This is used so that failure can be reported
487       * properly.
488       */
489      private final AbstractService service;
490
491      /**
492       * This lock is used to ensure safe and correct cancellation, it ensures that a new task is
493       * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to
494       * ensure that it is assigned atomically with being scheduled.
495       */
496      private final ReentrantLock lock = new ReentrantLock();
497
498      /** The future that represents the next execution of this task. */
499      @GuardedBy("lock")
500      private @Nullable Future<Void> currentFuture;
501
502      ReschedulableCallable(
503          AbstractService service, ScheduledExecutorService executor, Runnable runnable) {
504        this.wrappedRunnable = runnable;
505        this.executor = executor;
506        this.service = service;
507      }
508
509      @Override
510      public Void call() throws Exception {
511        wrappedRunnable.run();
512        reschedule();
513        return null;
514      }
515
516      /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */
517      public void reschedule() {
518        // invoke the callback outside the lock, prevents some shenanigans.
519        Schedule schedule;
520        try {
521          schedule = CustomScheduler.this.getNextSchedule();
522        } catch (Throwable t) {
523          service.notifyFailed(t);
524          return;
525        }
526        // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
527        // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
528        // to currentFuture doesn't race with itself so that currentFuture is assigned in the
529        // correct order.
530        Throwable scheduleFailure = null;
531        lock.lock();
532        try {
533          if (currentFuture == null || !currentFuture.isCancelled()) {
534            currentFuture = executor.schedule(this, schedule.delay, schedule.unit);
535          }
536        } catch (Throwable e) {
537          // If an exception is thrown by the subclass then we need to make sure that the service
538          // notices and transitions to the FAILED state. We do it by calling notifyFailed directly
539          // because the service does not monitor the state of the future so if the exception is not
540          // caught and forwarded to the service the task would stop executing but the service would
541          // have no idea.
542          // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then
543          // the AbstractService could monitor the future directly. Rescheduling is still hard...
544          // but it would help with some of these lock ordering issues.
545          scheduleFailure = e;
546        } finally {
547          lock.unlock();
548        }
549        // Call notifyFailed outside the lock to avoid lock ordering issues.
550        if (scheduleFailure != null) {
551          service.notifyFailed(scheduleFailure);
552        }
553      }
554
555      // N.B. Only protect cancel and isCancelled because those are the only methods that are
556      // invoked by the AbstractScheduledService.
557      @Override
558      public boolean cancel(boolean mayInterruptIfRunning) {
559        // Ensure that a task cannot be rescheduled while a cancel is ongoing.
560        lock.lock();
561        try {
562          return currentFuture.cancel(mayInterruptIfRunning);
563        } finally {
564          lock.unlock();
565        }
566      }
567
568      @Override
569      public boolean isCancelled() {
570        lock.lock();
571        try {
572          return currentFuture.isCancelled();
573        } finally {
574          lock.unlock();
575        }
576      }
577
578      @Override
579      protected Future<Void> delegate() {
580        throw new UnsupportedOperationException(
581            "Only cancel and isCancelled is supported by this future");
582      }
583    }
584
585    @Override
586    final Future<?> schedule(
587        AbstractService service, ScheduledExecutorService executor, Runnable runnable) {
588      ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable);
589      task.reschedule();
590      return task;
591    }
592
593    /**
594     * A value object that represents an absolute delay until a task should be invoked.
595     *
596     * @author Luke Sandberg
597     * @since 11.0
598     */
599    @Beta
600    protected static final class Schedule {
601
602      private final long delay;
603      private final TimeUnit unit;
604
605      /**
606       * @param delay the time from now to delay execution
607       * @param unit the time unit of the delay parameter
608       */
609      public Schedule(long delay, TimeUnit unit) {
610        this.delay = delay;
611        this.unit = checkNotNull(unit);
612      }
613    }
614
615    /**
616     * Calculates the time at which to next invoke the task.
617     *
618     * <p>This is guaranteed to be called immediately after the task has completed an iteration and
619     * on the same thread as the previous execution of {@link
620     * AbstractScheduledService#runOneIteration}.
621     *
622     * @return a schedule that defines the delay before the next execution.
623     */
624    // TODO(cpovirk): @ForOverride
625    protected abstract Schedule getNextSchedule() throws Exception;
626  }
627}