001    /*
002     * Copyright (C) 2011 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import com.google.common.annotations.Beta;
020    import com.google.common.base.Preconditions;
021    import com.google.common.base.Throwables;
022    
023    import java.util.concurrent.Callable;
024    import java.util.concurrent.Executors;
025    import java.util.concurrent.Future;
026    import java.util.concurrent.ScheduledExecutorService;
027    import java.util.concurrent.TimeUnit;
028    import java.util.concurrent.locks.ReentrantLock;
029    import java.util.logging.Level;
030    import java.util.logging.Logger;
031    
032    import javax.annotation.concurrent.GuardedBy;
033    
034    /**
035     * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 
036     * the "running" state need to perform a periodic task.  Subclasses can implement {@link #startUp},
037     * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically.
038     * 
039     * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run
040     * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 
041     * {@link #runOneIteration} that will be executed periodically as specified by its 
042     * {@link Scheduler}. When this service is asked to stop via {@link #stop} or {@link #stopAndWait}, 
043     * it will cancel the periodic task (but not interrupt it) and wait for it to stop before running 
044     * the {@link #shutDown} method.  
045     * 
046     * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 
047     * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link
048     * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 
049     * late.  Also, all life cycle methods are executed with a lock held, so subclasses can safely 
050     * modify shared state without additional synchronization necessary for visibility to later 
051     * executions of the life cycle methods.
052     * 
053     * <h3>Usage Example</h3>
054     * 
055     * Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 
056     * rate limit itself. <pre> {@code
057     * class CrawlingService extends AbstractScheduledService {
058     *   private Set<Uri> visited;
059     *   private Queue<Uri> toCrawl; 
060     *   protected void startUp() throws Exception {
061     *     toCrawl = readStartingUris();
062     *   }
063     * 
064     *   protected void runOneIteration() throws Exception {
065     *     Uri uri = toCrawl.remove();
066     *     Collection<Uri> newUris = crawl(uri);
067     *     visited.add(uri);
068     *     for (Uri newUri : newUris) {
069     *       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
070     *     }
071     *   }
072     *   
073     *   protected void shutDown() throws Exception {
074     *     saveUris(toCrawl);
075     *   }
076     * 
077     *   protected Scheduler scheduler() {
078     *     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
079     *   }
080     * }}</pre>
081     * 
082     * This class uses the life cycle methods to read in a list of starting URIs and save the set of 
083     * outstanding URIs when shutting down.  Also, it takes advantage of the scheduling functionality to
084     * rate limit the number of queries we perform.
085     * 
086     * @author Luke Sandberg
087     * @since 11.0
088     */
089    @Beta
090    public abstract class AbstractScheduledService implements Service {
091      private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName());
092      
093      /**
094       * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 
095       * task.
096       * 
097       * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 
098       * methods, these provide {@link Scheduler} instances for the common use case of running the 
099       * service with a fixed schedule.  If more flexibility is needed then consider subclassing the 
100       * {@link CustomScheduler} abstract class in preference to creating your own {@link Scheduler} 
101       * implementation. 
102       * 
103       * @author Luke Sandberg
104       * @since 11.0
105       */
106      public abstract static class Scheduler {
107        /**
108         * Returns a {@link Scheduler} that schedules the task using the 
109         * {@link ScheduledExecutorService#scheduleWithFixedDelay} method.
110         * 
111         * @param initialDelay the time to delay first execution
112         * @param delay the delay between the termination of one execution and the commencement of the 
113         *        next
114         * @param unit the time unit of the initialDelay and delay parameters
115         */
116        public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay, 
117            final TimeUnit unit) {
118          return new Scheduler() {
119            @Override
120            public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
121                Runnable task) {
122              return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
123            } 
124          };
125        }
126    
127        /**
128         * Returns a {@link Scheduler} that schedules the task using the 
129         * {@link ScheduledExecutorService#scheduleAtFixedRate} method.
130         * 
131         * @param initialDelay the time to delay first execution
132         * @param period the period between successive executions of the task
133         * @param unit the time unit of the initialDelay and period parameters
134         */
135        public static Scheduler newFixedRateSchedule(final long initialDelay, final long period, 
136            final TimeUnit unit) {
137          return new Scheduler() {
138            @Override
139            public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
140                Runnable task) {
141              return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
142            }
143          };
144        }
145        
146        /** Schedules the task to run on the provided executor on behalf of the service.  */
147        abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 
148            Runnable runnable);
149        
150        private Scheduler() {}
151      }
152      
153      /* use AbstractService for state management */
154      private final AbstractService delegate = new AbstractService() {
155        
156        // A handle to the running task so that we can stop it when a shutdown has been requested.
157        // These two fields are volatile because their values will be accessed from multiple threads.
158        private volatile Future<?> runningTask;
159        private volatile ScheduledExecutorService executorService;
160        
161        // This lock protects the task so we can ensure that none of the template methods (startUp, 
162        // shutDown or runOneIteration) run concurrently with one another.
163        private final ReentrantLock lock = new ReentrantLock();
164        
165        private final Runnable task = new Runnable() {
166          @Override public void run() {
167            lock.lock();
168            try {
169              AbstractScheduledService.this.runOneIteration();
170            } catch (Throwable t) {
171              try {
172                shutDown();
173              } catch (Exception ignored) {
174                logger.log(Level.WARNING, 
175                    "Error while attempting to shut down the service after failure.", ignored);
176              }
177              notifyFailed(t);
178              throw Throwables.propagate(t);
179            } finally {
180              lock.unlock();
181            }
182          }
183        };
184        
185        @Override protected final void doStart() {
186          executorService = executor();
187          executorService.execute(new Runnable() {
188            @Override public void run() {
189              lock.lock();
190              try {
191                startUp();
192                runningTask = scheduler().schedule(delegate, executorService, task);
193                notifyStarted();
194              } catch (Throwable t) {
195                notifyFailed(t);
196                throw Throwables.propagate(t);
197              } finally {
198                lock.unlock();
199              }
200            }
201          });
202        }
203    
204        @Override protected final void doStop() {
205          runningTask.cancel(false); 
206          executorService.execute(new Runnable() {
207            @Override public void run() {
208              try {
209                lock.lock();
210                try {
211                  if (state() != State.STOPPING) {
212                    // This means that the state has changed since we were scheduled.  This implies that
213                    // an execution of runOneIteration has thrown an exception and we have transitioned
214                    // to a failed state, also this means that shutDown has already been called, so we
215                    // do not want to call it again.
216                    return;
217                  }
218                  shutDown();
219                } finally {
220                  lock.unlock();
221                }
222                notifyStopped();
223              } catch (Throwable t) {
224                notifyFailed(t);
225                throw Throwables.propagate(t);
226              }
227            }
228          });
229        }
230      };
231      
232      /** 
233       * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 
234       * the service will transition to the {@link Service.State#FAILED} state and this method will no 
235       * longer be called.
236       */
237      protected abstract void runOneIteration() throws Exception;
238    
239      /** 
240       * Start the service.
241       * 
242       * <p>By default this method does nothing.
243       */
244      protected void startUp() throws Exception {}
245    
246      /**
247       * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}.
248       * 
249       * <p>By default this method does nothing. 
250       */
251      protected void shutDown() throws Exception {}
252    
253      /**
254       * Returns the {@link Scheduler} object used to configure this service.  This method will only be
255       * called once. 
256       */
257      protected abstract Scheduler scheduler();
258      
259      /**
260       * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
261       * {@link #runOneIteration} and {@link #shutDown} methods.  The executor will not be 
262       * {@link ScheduledExecutorService#shutdown} when this service stops. Subclasses may override this
263       * method to use a custom {@link ScheduledExecutorService} instance.
264       * 
265       * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
266       * pool.  This method will only be called once.
267       */
268      protected ScheduledExecutorService executor() {
269        return Executors.newSingleThreadScheduledExecutor();
270      }
271    
272      @Override public String toString() {
273        return getClass().getSimpleName() + " [" + state() + "]";
274      }
275    
276      // We override instead of using ForwardingService so that these can be final.
277    
278      @Override public final ListenableFuture<State> start() {
279        return delegate.start();
280      }
281    
282      @Override public final State startAndWait() {
283        return delegate.startAndWait();
284      }
285    
286      @Override public final boolean isRunning() {
287        return delegate.isRunning();
288      }
289    
290      @Override public final State state() {
291        return delegate.state();
292      }
293    
294      @Override public final ListenableFuture<State> stop() {
295        return delegate.stop();
296      }
297    
298      @Override public final State stopAndWait() {
299        return delegate.stopAndWait();
300      }
301      
302      /**
303       * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 
304       * use a dynamically changing schedule.  After every execution of the task, assuming it hasn't 
305       * been cancelled, the {@link #getNextSchedule} method will be called.
306       * 
307       * @author Luke Sandberg
308       * @since 11.0
309       */ 
310      @Beta
311      public abstract static class CustomScheduler extends Scheduler {
312    
313        /**
314         * A callable class that can reschedule itself using a {@link CustomScheduler}.
315         */
316        private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> {
317          
318          /** The underlying task. */
319          private final Runnable wrappedRunnable;
320          
321          /** The executor on which this Callable will be scheduled. */
322          private final ScheduledExecutorService executor;
323          
324          /** 
325           * The service that is managing this callable.  This is used so that failure can be 
326           * reported properly.
327           */
328          private final AbstractService service;
329          
330          /**
331           * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 
332           * not scheduled while a cancel is ongoing.  Also it protects the currentFuture variable to 
333           * ensure that it is assigned atomically with being scheduled.
334           */ 
335          private final ReentrantLock lock = new ReentrantLock();
336          
337          /** The future that represents the next execution of this task.*/
338          @GuardedBy("lock")
339          private Future<Void> currentFuture;
340          
341          ReschedulableCallable(AbstractService service, ScheduledExecutorService executor, 
342              Runnable runnable) {
343            this.wrappedRunnable = runnable;
344            this.executor = executor;
345            this.service = service;
346          }
347          
348          @Override
349          public Void call() throws Exception {
350            wrappedRunnable.run();
351            reschedule();
352            return null;
353          }
354    
355          /**
356           * Atomically reschedules this task and assigns the new future to {@link #currentFuture}.
357           */
358          public void reschedule() {
359            // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
360            // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
361            // to currentFuture doesn't race with itself so that currentFuture is assigned in the 
362            // correct order.
363            lock.lock();
364            try {
365              if (currentFuture == null || !currentFuture.isCancelled()) {
366                final Schedule schedule = CustomScheduler.this.getNextSchedule();
367                currentFuture = executor.schedule(this, schedule.delay, schedule.unit);
368              }
369            } catch (Throwable e) {
370              // If an exception is thrown by the subclass then we need to make sure that the service
371              // notices and transitions to the FAILED state.  We do it by calling notifyFailed directly
372              // because the service does not monitor the state of the future so if the exception is not
373              // caught and forwarded to the service the task would stop executing but the service would
374              // have no idea.
375              service.notifyFailed(e);
376            } finally {
377              lock.unlock();
378            }
379          }
380          
381          // N.B. Only protect cancel and isCancelled because those are the only methods that are 
382          // invoked by the AbstractScheduledService.
383          @Override
384          public boolean cancel(boolean mayInterruptIfRunning) {
385            // Ensure that a task cannot be rescheduled while a cancel is ongoing.
386            lock.lock();
387            try {
388              return currentFuture.cancel(mayInterruptIfRunning);
389            } finally {
390              lock.unlock();
391            }
392          }
393    
394          @Override
395          protected Future<Void> delegate() {
396            throw new UnsupportedOperationException("Only cancel is supported by this future");
397          }
398        }
399        
400        @Override
401        final Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 
402            Runnable runnable) {
403          ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable);
404          task.reschedule();
405          return task;
406        }
407        
408        /**
409         * A value object that represents an absolute delay until a task should be invoked.
410         * 
411         * @author Luke Sandberg
412         * @since 11.0
413         */
414        @Beta
415        protected static final class Schedule {
416          
417          private final long delay;
418          private final TimeUnit unit;
419          
420          /**
421           * @param delay the time from now to delay execution
422           * @param unit the time unit of the delay parameter
423           */
424          public Schedule(long delay, TimeUnit unit) {
425            this.delay = delay;
426            this.unit = Preconditions.checkNotNull(unit);
427          }
428        }
429        
430        /**
431         * Calculates the time at which to next invoke the task.
432         * 
433         * <p>This is guaranteed to be called immediately after the task has completed an iteration and
434         * on the same thread as the previous execution of {@link 
435         * AbstractScheduledService#runOneIteration}.
436         * 
437         * @return a schedule that defines the delay before the next execution.
438         */
439        protected abstract Schedule getNextSchedule() throws Exception;
440      }
441    }