001/*
002 * Copyright (C) 2011 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.util.concurrent.Futures.immediateCancelledFuture;
020import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
021import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException;
022import static java.util.Objects.requireNonNull;
023
024import com.google.common.annotations.GwtIncompatible;
025import com.google.common.annotations.J2ktIncompatible;
026import com.google.errorprone.annotations.CanIgnoreReturnValue;
027import com.google.errorprone.annotations.concurrent.GuardedBy;
028import com.google.j2objc.annotations.WeakOuter;
029import java.util.concurrent.Callable;
030import java.util.concurrent.Executor;
031import java.util.concurrent.Executors;
032import java.util.concurrent.Future;
033import java.util.concurrent.ScheduledExecutorService;
034import java.util.concurrent.ScheduledFuture;
035import java.util.concurrent.ThreadFactory;
036import java.util.concurrent.TimeUnit;
037import java.util.concurrent.TimeoutException;
038import java.util.concurrent.locks.ReentrantLock;
039import java.util.logging.Level;
040import javax.annotation.CheckForNull;
041import org.checkerframework.checker.nullness.qual.Nullable;
042
043/**
044 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in
045 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp},
046 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically.
047 *
048 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run
049 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the
050 * {@link #runOneIteration} that will be executed periodically as specified by its {@link
051 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic
052 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method.
053 *
054 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link
055 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link
056 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start
057 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify
058 * shared state without additional synchronization necessary for visibility to later executions of
059 * the life cycle methods.
060 *
061 * <h3>Usage Example</h3>
062 *
063 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to
064 * rate limit itself.
065 *
066 * <pre>{@code
067 * class CrawlingService extends AbstractScheduledService {
068 *   private Set<Uri> visited;
069 *   private Queue<Uri> toCrawl;
070 *   protected void startUp() throws Exception {
071 *     toCrawl = readStartingUris();
072 *   }
073 *
074 *   protected void runOneIteration() throws Exception {
075 *     Uri uri = toCrawl.remove();
076 *     Collection<Uri> newUris = crawl(uri);
077 *     visited.add(uri);
078 *     for (Uri newUri : newUris) {
079 *       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
080 *     }
081 *   }
082 *
083 *   protected void shutDown() throws Exception {
084 *     saveUris(toCrawl);
085 *   }
086 *
087 *   protected Scheduler scheduler() {
088 *     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
089 *   }
090 * }
091 * }</pre>
092 *
093 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of
094 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to
095 * rate limit the number of queries we perform.
096 *
097 * @author Luke Sandberg
098 * @since 11.0
099 */
100@GwtIncompatible
101@J2ktIncompatible
102@ElementTypesAreNonnullByDefault
103public abstract class AbstractScheduledService implements Service {
104  private static final LazyLogger logger = new LazyLogger(AbstractScheduledService.class);
105
106  /**
107   * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its
108   * task.
109   *
110   * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory
111   * methods, these provide {@link Scheduler} instances for the common use case of running the
112   * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link
113   * CustomScheduler}.
114   *
115   * @author Luke Sandberg
116   * @since 11.0
117   */
118  public abstract static class Scheduler {
119    /**
120     * Returns a {@link Scheduler} that schedules the task using the {@link
121     * ScheduledExecutorService#scheduleWithFixedDelay} method.
122     *
123     * @param initialDelay the time to delay first execution
124     * @param delay the delay between the termination of one execution and the commencement of the
125     *     next
126     * @param unit the time unit of the initialDelay and delay parameters
127     */
128    @SuppressWarnings("GoodTime") // should accept a java.time.Duration
129    public static Scheduler newFixedDelaySchedule(
130        final long initialDelay, final long delay, final TimeUnit unit) {
131      checkNotNull(unit);
132      checkArgument(delay > 0, "delay must be > 0, found %s", delay);
133      return new Scheduler() {
134        @Override
135        public Cancellable schedule(
136            AbstractService service, ScheduledExecutorService executor, Runnable task) {
137          return new FutureAsCancellable(
138              executor.scheduleWithFixedDelay(task, initialDelay, delay, unit));
139        }
140      };
141    }
142
143    /**
144     * Returns a {@link Scheduler} that schedules the task using the {@link
145     * ScheduledExecutorService#scheduleAtFixedRate} method.
146     *
147     * @param initialDelay the time to delay first execution
148     * @param period the period between successive executions of the task
149     * @param unit the time unit of the initialDelay and period parameters
150     */
151    @SuppressWarnings("GoodTime") // should accept a java.time.Duration
152    public static Scheduler newFixedRateSchedule(
153        final long initialDelay, final long period, final TimeUnit unit) {
154      checkNotNull(unit);
155      checkArgument(period > 0, "period must be > 0, found %s", period);
156      return new Scheduler() {
157        @Override
158        public Cancellable schedule(
159            AbstractService service, ScheduledExecutorService executor, Runnable task) {
160          return new FutureAsCancellable(
161              executor.scheduleAtFixedRate(task, initialDelay, period, unit));
162        }
163      };
164    }
165
166    /** Schedules the task to run on the provided executor on behalf of the service. */
167    abstract Cancellable schedule(
168        AbstractService service, ScheduledExecutorService executor, Runnable runnable);
169
170    private Scheduler() {}
171  }
172
173  /* use AbstractService for state management */
174  private final AbstractService delegate = new ServiceDelegate();
175
176  @WeakOuter
177  private final class ServiceDelegate extends AbstractService {
178
179    // A handle to the running task so that we can stop it when a shutdown has been requested.
180    // These two fields are volatile because their values will be accessed from multiple threads.
181    @CheckForNull private volatile Cancellable runningTask;
182    @CheckForNull private volatile ScheduledExecutorService executorService;
183
184    // This lock protects the task so we can ensure that none of the template methods (startUp,
185    // shutDown or runOneIteration) run concurrently with one another.
186    // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the
187    // lock.
188    private final ReentrantLock lock = new ReentrantLock();
189
190    @WeakOuter
191    class Task implements Runnable {
192      @Override
193      public void run() {
194        lock.lock();
195        try {
196          /*
197           * requireNonNull is safe because Task isn't run (or at least it doesn't succeed in taking
198           * the lock) until after it's scheduled and the runningTask field is set.
199           */
200          if (requireNonNull(runningTask).isCancelled()) {
201            // task may have been cancelled while blocked on the lock.
202            return;
203          }
204          AbstractScheduledService.this.runOneIteration();
205        } catch (Throwable t) {
206          restoreInterruptIfIsInterruptedException(t);
207          try {
208            shutDown();
209          } catch (Exception ignored) {
210            restoreInterruptIfIsInterruptedException(ignored);
211            logger
212                .get()
213                .log(
214                    Level.WARNING,
215                    "Error while attempting to shut down the service after failure.",
216                    ignored);
217          }
218          notifyFailed(t);
219          // requireNonNull is safe now, just as it was above.
220          requireNonNull(runningTask).cancel(false); // prevent future invocations.
221        } finally {
222          lock.unlock();
223        }
224      }
225    }
226
227    private final Runnable task = new Task();
228
229    @Override
230    protected final void doStart() {
231      executorService =
232          MoreExecutors.renamingDecorator(executor(), () -> serviceName() + " " + state());
233      executorService.execute(
234          () -> {
235            lock.lock();
236            try {
237              startUp();
238              /*
239               * requireNonNull is safe because executorService is never cleared after the
240               * assignment above.
241               */
242              requireNonNull(executorService);
243              runningTask = scheduler().schedule(delegate, executorService, task);
244              notifyStarted();
245            } catch (Throwable t) {
246              restoreInterruptIfIsInterruptedException(t);
247              notifyFailed(t);
248              if (runningTask != null) {
249                // prevent the task from running if possible
250                runningTask.cancel(false);
251              }
252            } finally {
253              lock.unlock();
254            }
255          });
256    }
257
258    @Override
259    protected final void doStop() {
260      // Both requireNonNull calls are safe because doStop can run only after a successful doStart.
261      requireNonNull(runningTask);
262      requireNonNull(executorService);
263      runningTask.cancel(false);
264      executorService.execute(
265          () -> {
266            try {
267              lock.lock();
268              try {
269                if (state() != State.STOPPING) {
270                  // This means that the state has changed since we were scheduled. This implies
271                  // that an execution of runOneIteration has thrown an exception and we have
272                  // transitioned to a failed state, also this means that shutDown has already
273                  // been called, so we do not want to call it again.
274                  return;
275                }
276                shutDown();
277              } finally {
278                lock.unlock();
279              }
280              notifyStopped();
281            } catch (Throwable t) {
282              restoreInterruptIfIsInterruptedException(t);
283              notifyFailed(t);
284            }
285          });
286    }
287
288    @Override
289    public String toString() {
290      return AbstractScheduledService.this.toString();
291    }
292  }
293
294  /** Constructor for use by subclasses. */
295  protected AbstractScheduledService() {}
296
297  /**
298   * Run one iteration of the scheduled task. If any invocation of this method throws an exception,
299   * the service will transition to the {@link Service.State#FAILED} state and this method will no
300   * longer be called.
301   */
302  protected abstract void runOneIteration() throws Exception;
303
304  /**
305   * Start the service.
306   *
307   * <p>By default this method does nothing.
308   */
309  protected void startUp() throws Exception {}
310
311  /**
312   * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}.
313   *
314   * <p>By default this method does nothing.
315   */
316  protected void shutDown() throws Exception {}
317
318  /**
319   * Returns the {@link Scheduler} object used to configure this service. This method will only be
320   * called once.
321   */
322  // TODO(cpovirk): @ForOverride
323  protected abstract Scheduler scheduler();
324
325  /**
326   * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
327   * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the
328   * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service
329   * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED
330   * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService}
331   * instance. This method is guaranteed to only be called once.
332   *
333   * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread pool
334   * that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, the
335   * pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service
336   * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED
337   * fails}.
338   */
339  protected ScheduledExecutorService executor() {
340    @WeakOuter
341    class ThreadFactoryImpl implements ThreadFactory {
342      @Override
343      public Thread newThread(Runnable runnable) {
344        return MoreExecutors.newThread(serviceName(), runnable);
345      }
346    }
347    final ScheduledExecutorService executor =
348        Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl());
349    // Add a listener to shut down the executor after the service is stopped. This ensures that the
350    // JVM shutdown will not be prevented from exiting after this service has stopped or failed.
351    // Technically this listener is added after start() was called so it is a little gross, but it
352    // is called within doStart() so we know that the service cannot terminate or fail concurrently
353    // with adding this listener so it is impossible to miss an event that we are interested in.
354    addListener(
355        new Listener() {
356          @Override
357          public void terminated(State from) {
358            executor.shutdown();
359          }
360
361          @Override
362          public void failed(State from, Throwable failure) {
363            executor.shutdown();
364          }
365        },
366        directExecutor());
367    return executor;
368  }
369
370  /**
371   * Returns the name of this service. {@link AbstractScheduledService} may include the name in
372   * debugging output.
373   *
374   * @since 14.0
375   */
376  protected String serviceName() {
377    return getClass().getSimpleName();
378  }
379
380  @Override
381  public String toString() {
382    return serviceName() + " [" + state() + "]";
383  }
384
385  @Override
386  public final boolean isRunning() {
387    return delegate.isRunning();
388  }
389
390  @Override
391  public final State state() {
392    return delegate.state();
393  }
394
395  /** @since 13.0 */
396  @Override
397  public final void addListener(Listener listener, Executor executor) {
398    delegate.addListener(listener, executor);
399  }
400
401  /** @since 14.0 */
402  @Override
403  public final Throwable failureCause() {
404    return delegate.failureCause();
405  }
406
407  /** @since 15.0 */
408  @CanIgnoreReturnValue
409  @Override
410  public final Service startAsync() {
411    delegate.startAsync();
412    return this;
413  }
414
415  /** @since 15.0 */
416  @CanIgnoreReturnValue
417  @Override
418  public final Service stopAsync() {
419    delegate.stopAsync();
420    return this;
421  }
422
423  /** @since 15.0 */
424  @Override
425  public final void awaitRunning() {
426    delegate.awaitRunning();
427  }
428
429  /** @since 15.0 */
430  @Override
431  public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException {
432    delegate.awaitRunning(timeout, unit);
433  }
434
435  /** @since 15.0 */
436  @Override
437  public final void awaitTerminated() {
438    delegate.awaitTerminated();
439  }
440
441  /** @since 15.0 */
442  @Override
443  public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException {
444    delegate.awaitTerminated(timeout, unit);
445  }
446
447  interface Cancellable {
448    void cancel(boolean mayInterruptIfRunning);
449
450    boolean isCancelled();
451  }
452
453  private static final class FutureAsCancellable implements Cancellable {
454    private final Future<?> delegate;
455
456    FutureAsCancellable(Future<?> delegate) {
457      this.delegate = delegate;
458    }
459
460    @Override
461    public void cancel(boolean mayInterruptIfRunning) {
462      delegate.cancel(mayInterruptIfRunning);
463    }
464
465    @Override
466    public boolean isCancelled() {
467      return delegate.isCancelled();
468    }
469  }
470
471  /**
472   * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to
473   * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been
474   * cancelled, the {@link #getNextSchedule} method will be called.
475   *
476   * @author Luke Sandberg
477   * @since 11.0
478   */
479  public abstract static class CustomScheduler extends Scheduler {
480
481    /** A callable class that can reschedule itself using a {@link CustomScheduler}. */
482    private final class ReschedulableCallable implements Callable<@Nullable Void> {
483
484      /** The underlying task. */
485      private final Runnable wrappedRunnable;
486
487      /** The executor on which this Callable will be scheduled. */
488      private final ScheduledExecutorService executor;
489
490      /**
491       * The service that is managing this callable. This is used so that failure can be reported
492       * properly.
493       */
494      /*
495       * This reference is part of a reference cycle, which is typically something we want to avoid
496       * under j2objc -- but it is not detected by our j2objc cycle test. The cycle:
497       *
498       * - CustomScheduler.service contains an instance of ServiceDelegate. (It needs it so that it
499       *   can call notifyFailed.)
500       *
501       * - ServiceDelegate.runningTask contains an instance of ReschedulableCallable (at least in
502       *   the case that the service is using CustomScheduler). (It needs it so that it can cancel
503       *   the task and detect whether it has been cancelled.)
504       *
505       * - ReschedulableCallable has a reference back to its enclosing CustomScheduler. (It needs it
506       *   so that it can call getNextSchedule).
507       *
508       * Maybe there is a way to avoid this cycle. But we think the cycle is safe enough to ignore:
509       * Each task is retained for only as long as it is running -- so it's retained only as long as
510       * it would already be retained by the underlying executor.
511       *
512       * If the cycle test starts reporting this cycle in the future, we should add an entry to
513       * cycle_suppress_list.txt.
514       */
515      private final AbstractService service;
516
517      /**
518       * This lock is used to ensure safe and correct cancellation, it ensures that a new task is
519       * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to
520       * ensure that it is assigned atomically with being scheduled.
521       */
522      private final ReentrantLock lock = new ReentrantLock();
523
524      /** The future that represents the next execution of this task. */
525      @GuardedBy("lock")
526      @CheckForNull
527      private SupplantableFuture cancellationDelegate;
528
529      ReschedulableCallable(
530          AbstractService service, ScheduledExecutorService executor, Runnable runnable) {
531        this.wrappedRunnable = runnable;
532        this.executor = executor;
533        this.service = service;
534      }
535
536      @Override
537      @CheckForNull
538      public Void call() throws Exception {
539        wrappedRunnable.run();
540        reschedule();
541        return null;
542      }
543
544      /**
545       * Atomically reschedules this task and assigns the new future to {@link
546       * #cancellationDelegate}.
547       */
548      @CanIgnoreReturnValue
549      public Cancellable reschedule() {
550        // invoke the callback outside the lock, prevents some shenanigans.
551        Schedule schedule;
552        try {
553          schedule = CustomScheduler.this.getNextSchedule();
554        } catch (Throwable t) {
555          restoreInterruptIfIsInterruptedException(t);
556          service.notifyFailed(t);
557          return new FutureAsCancellable(immediateCancelledFuture());
558        }
559        // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
560        // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
561        // to currentFuture doesn't race with itself so that currentFuture is assigned in the
562        // correct order.
563        Throwable scheduleFailure = null;
564        Cancellable toReturn;
565        lock.lock();
566        try {
567          toReturn = initializeOrUpdateCancellationDelegate(schedule);
568        } catch (Throwable e) {
569          // Any Exception is either a RuntimeException or sneaky checked exception.
570          //
571          // If an exception is thrown by the subclass then we need to make sure that the service
572          // notices and transitions to the FAILED state. We do it by calling notifyFailed directly
573          // because the service does not monitor the state of the future so if the exception is not
574          // caught and forwarded to the service the task would stop executing but the service would
575          // have no idea.
576          // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then
577          // the AbstractService could monitor the future directly. Rescheduling is still hard...
578          // but it would help with some of these lock ordering issues.
579          scheduleFailure = e;
580          toReturn = new FutureAsCancellable(immediateCancelledFuture());
581        } finally {
582          lock.unlock();
583        }
584        // Call notifyFailed outside the lock to avoid lock ordering issues.
585        if (scheduleFailure != null) {
586          service.notifyFailed(scheduleFailure);
587        }
588        return toReturn;
589      }
590
591      @GuardedBy("lock")
592      /*
593       * The GuardedBy checker warns us that we're not holding cancellationDelegate.lock. But in
594       * fact we are holding it because it is the same as this.lock, which we know we are holding,
595       * thanks to @GuardedBy above. (cancellationDelegate.lock is initialized to this.lock in the
596       * call to `new SupplantableFuture` below.)
597       */
598      @SuppressWarnings("GuardedBy")
599      private Cancellable initializeOrUpdateCancellationDelegate(Schedule schedule) {
600        if (cancellationDelegate == null) {
601          return cancellationDelegate = new SupplantableFuture(lock, submitToExecutor(schedule));
602        }
603        if (!cancellationDelegate.currentFuture.isCancelled()) {
604          cancellationDelegate.currentFuture = submitToExecutor(schedule);
605        }
606        return cancellationDelegate;
607      }
608
609      private ScheduledFuture<@Nullable Void> submitToExecutor(Schedule schedule) {
610        return executor.schedule(this, schedule.delay, schedule.unit);
611      }
612    }
613
614    /**
615     * Contains the most recently submitted {@code Future}, which may be cancelled or updated,
616     * always under a lock.
617     */
618    private static final class SupplantableFuture implements Cancellable {
619      private final ReentrantLock lock;
620
621      @GuardedBy("lock")
622      private Future<@Nullable Void> currentFuture;
623
624      SupplantableFuture(ReentrantLock lock, Future<@Nullable Void> currentFuture) {
625        this.lock = lock;
626        this.currentFuture = currentFuture;
627      }
628
629      @Override
630      public void cancel(boolean mayInterruptIfRunning) {
631        /*
632         * Lock to ensure that a task cannot be rescheduled while a cancel is ongoing.
633         *
634         * In theory, cancel() could execute arbitrary listeners -- bad to do while holding a lock.
635         * However, we don't expose currentFuture to users, so they can't attach listeners. And the
636         * Future might not even be a ListenableFuture, just a plain Future. That said, similar
637         * problems can exist with methods like FutureTask.done(), not to mention slow calls to
638         * Thread.interrupt() (as discussed in InterruptibleTask). At the end of the day, it's
639         * unlikely that cancel() will be slow, so we can probably get away with calling it while
640         * holding a lock. Still, it would be nice to avoid somehow.
641         */
642        lock.lock();
643        try {
644          currentFuture.cancel(mayInterruptIfRunning);
645        } finally {
646          lock.unlock();
647        }
648      }
649
650      @Override
651      public boolean isCancelled() {
652        lock.lock();
653        try {
654          return currentFuture.isCancelled();
655        } finally {
656          lock.unlock();
657        }
658      }
659    }
660
661    @Override
662    final Cancellable schedule(
663        AbstractService service, ScheduledExecutorService executor, Runnable runnable) {
664      return new ReschedulableCallable(service, executor, runnable).reschedule();
665    }
666
667    /**
668     * A value object that represents an absolute delay until a task should be invoked.
669     *
670     * @author Luke Sandberg
671     * @since 11.0
672     */
673    protected static final class Schedule {
674
675      private final long delay;
676      private final TimeUnit unit;
677
678      /**
679       * @param delay the time from now to delay execution
680       * @param unit the time unit of the delay parameter
681       */
682      public Schedule(long delay, TimeUnit unit) {
683        this.delay = delay;
684        this.unit = checkNotNull(unit);
685      }
686    }
687
688    /**
689     * Calculates the time at which to next invoke the task.
690     *
691     * <p>This is guaranteed to be called immediately after the task has completed an iteration and
692     * on the same thread as the previous execution of {@link
693     * AbstractScheduledService#runOneIteration}.
694     *
695     * @return a schedule that defines the delay before the next execution.
696     */
697    // TODO(cpovirk): @ForOverride
698    protected abstract Schedule getNextSchedule() throws Exception;
699  }
700}