001    /*
002     * Copyright (C) 2011 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import com.google.common.annotations.Beta;
020    import com.google.common.base.Preconditions;
021    import com.google.common.base.Throwables;
022    
023    import java.util.concurrent.Callable;
024    import java.util.concurrent.Executors;
025    import java.util.concurrent.Future;
026    import java.util.concurrent.ScheduledExecutorService;
027    import java.util.concurrent.TimeUnit;
028    import java.util.concurrent.locks.ReentrantLock;
029    import java.util.logging.Level;
030    import java.util.logging.Logger;
031    
032    import javax.annotation.concurrent.GuardedBy;
033    
034    /**
035     * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 
036     * the "running" state need to perform a periodic task.  Subclasses can implement {@link #startUp},
037     * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically.
038     * 
039     * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run
040     * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 
041     * {@link #runOneIteration} that will be executed periodically as specified by its 
042     * {@link Scheduler}. When this service is asked to stop via {@link #stop} or {@link #stopAndWait}, 
043     * it will cancel the periodic task (but not interrupt it) and wait for it to stop before running 
044     * the {@link #shutDown} method.  
045     * 
046     * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 
047     * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link
048     * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 
049     * late.  Also, all life cycle methods are executed with a lock held, so subclasses can safely 
050     * modify shared state without additional synchronization necessary for visibility to later 
051     * executions of the life cycle methods.
052     * 
053     * <h3>Usage Example</h3>
054     * 
055     * Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 
056     * rate limit itself. <pre> {@code
057     * class CrawlingService extends AbstractScheduledService {
058     *   private Set<Uri> visited;
059     *   private Queue<Uri> toCrawl; 
060     *   protected void startUp() throws Exception {
061     *     toCrawl = readStartingUris();
062     *   }
063     * 
064     *   protected void runOneIteration() throws Exception {
065     *     Uri uri = toCrawl.remove();
066     *     Collection<Uri> newUris = crawl(uri);
067     *     visited.add(uri);
068     *     for (Uri newUri : newUris) {
069     *       if (!visited.contains(newUri)) { toCrawl.add(newUri); }
070     *     }
071     *   }
072     *   
073     *   protected void shutDown() throws Exception {
074     *     saveUris(toCrawl);
075     *   }
076     * 
077     *   protected Scheduler scheduler() {
078     *     return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS);
079     *   }
080     * }}</pre>
081     * 
082     * This class uses the life cycle methods to read in a list of starting URIs and save the set of 
083     * outstanding URIs when shutting down.  Also, it takes advantage of the scheduling functionality to
084     * rate limit the number of queries we perform.
085     * 
086     * @author Luke Sandberg
087     * @since 11.0
088     */
089    @Beta
090    public abstract class AbstractScheduledService implements Service {
091      private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName());
092      
093      /**
094       * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 
095       * task.
096       * 
097       * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 
098       * methods, these provide {@link Scheduler} instances for the common use case of running the 
099       * service with a fixed schedule.  If more flexibility is needed then consider subclassing the 
100       * {@link CustomScheduler} abstract class in preference to creating your own {@link Scheduler} 
101       * implementation. 
102       * 
103       * @author Luke Sandberg
104       * @since 11.0
105       */
106      public abstract static class Scheduler {
107        /**
108         * Returns a {@link Scheduler} that schedules the task using the 
109         * {@link ScheduledExecutorService#scheduleWithFixedDelay} method.
110         * 
111         * @param initialDelay the time to delay first execution
112         * @param delay the delay between the termination of one execution and the commencement of the 
113         *        next
114         * @param unit the time unit of the initialDelay and delay parameters
115         */
116        public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay, 
117            final TimeUnit unit) {
118          return new Scheduler() {
119            @Override
120            public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
121                Runnable task) {
122              return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
123            } 
124          };
125        }
126    
127        /**
128         * Returns a {@link Scheduler} that schedules the task using the 
129         * {@link ScheduledExecutorService#scheduleAtFixedRate} method.
130         * 
131         * @param initialDelay the time to delay first execution
132         * @param period the period between successive executions of the task
133         * @param unit the time unit of the initialDelay and period parameters
134         */
135        public static Scheduler newFixedRateSchedule(final long initialDelay, final long period, 
136            final TimeUnit unit) {
137          return new Scheduler() {
138            @Override
139            public Future<?> schedule(AbstractService service, ScheduledExecutorService executor,
140                Runnable task) {
141              return executor.scheduleAtFixedRate(task, initialDelay, period, unit);
142            }
143          };
144        }
145        
146        /** Schedules the task to run on the provided executor on behalf of the service.  */
147        abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 
148            Runnable runnable);
149        
150        private Scheduler() {}
151      }
152      
153      /* use AbstractService for state management */
154      private final AbstractService delegate = new AbstractService() {
155        
156        // A handle to the running task so that we can stop it when a shutdown has been requested.
157        // These two fields are volatile because their values will be accessed from multiple threads.
158        private volatile Future<?> runningTask;
159        private volatile ScheduledExecutorService executorService;
160        
161        // This lock protects the task so we can ensure that none of the template methods (startUp, 
162        // shutDown or runOneIteration) run concurrently with one another.
163        private final ReentrantLock lock = new ReentrantLock();
164        
165        private final Runnable task = new Runnable() {
166          @Override public void run() {
167            lock.lock();
168            try {
169              AbstractScheduledService.this.runOneIteration();
170            } catch (Throwable t) {
171              try {
172                shutDown();
173              } catch (Exception ignored) {
174                logger.log(Level.WARNING, 
175                    "Error while attempting to shut down the service after failure.", ignored);
176              }
177              notifyFailed(t);
178              throw Throwables.propagate(t);
179            } finally {
180              lock.unlock();
181            }
182          }
183        };
184        
185        @Override protected final void doStart() {
186          executorService = executor();
187          executorService.execute(new Runnable() {
188            @Override public void run() {
189              lock.lock();
190              try {
191                startUp();
192                runningTask = scheduler().schedule(delegate, executorService, task);
193                notifyStarted();
194              } catch (Throwable t) {
195                notifyFailed(t);
196                throw Throwables.propagate(t);
197              } finally {
198                lock.unlock();
199              }
200            }
201          });
202        }
203    
204        @Override protected final void doStop() {
205          runningTask.cancel(false); 
206          executorService.execute(new Runnable() {
207            @Override public void run() {
208              try {
209                lock.lock();
210                try {
211                  if (state() != State.STOPPING) {
212                    // This means that the state has changed since we were scheduled.  This implies that
213                    // an execution of runOneIteration has thrown an exception and we have transitioned
214                    // to a failed state, also this means that shutDown has already been called, so we
215                    // do not want to call it again.
216                    return;
217                  }
218                  shutDown();
219                } finally {
220                  lock.unlock();
221                }
222                notifyStopped();
223              } catch (Throwable t) {
224                notifyFailed(t);
225                throw Throwables.propagate(t);
226              }
227            }
228          });
229        }
230      };
231      
232      /** 
233       * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 
234       * the service will transition to the {@link Service.State#FAILED} state and this method will no 
235       * longer be called.
236       */
237      protected abstract void runOneIteration() throws Exception;
238    
239      /** Start the service. */
240      protected abstract void startUp() throws Exception;
241    
242      /** Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. */
243      protected abstract void shutDown() throws Exception;
244    
245      /**
246       * Returns the {@link Scheduler} object used to configure this service.  This method will only be
247       * called once. 
248       */
249      protected abstract Scheduler scheduler();
250      
251      /**
252       * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp},
253       * {@link #runOneIteration} and {@link #shutDown} methods.  The executor will not be 
254       * {@link ScheduledExecutorService#shutdown} when this service stops. Subclasses may override this
255       * method to use a custom {@link ScheduledExecutorService} instance.
256       * 
257       * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread
258       * pool.  This method will only be called once.
259       */
260      protected ScheduledExecutorService executor() {
261        return Executors.newSingleThreadScheduledExecutor();
262      }
263    
264      @Override public String toString() {
265        return getClass().getSimpleName() + " [" + state() + "]";
266      }
267    
268      // We override instead of using ForwardingService so that these can be final.
269    
270      @Override public final ListenableFuture<State> start() {
271        return delegate.start();
272      }
273    
274      @Override public final State startAndWait() {
275        return delegate.startAndWait();
276      }
277    
278      @Override public final boolean isRunning() {
279        return delegate.isRunning();
280      }
281    
282      @Override public final State state() {
283        return delegate.state();
284      }
285    
286      @Override public final ListenableFuture<State> stop() {
287        return delegate.stop();
288      }
289    
290      @Override public final State stopAndWait() {
291        return delegate.stopAndWait();
292      }
293      
294      /**
295       * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 
296       * use a dynamically changing schedule.  After every execution of the task, assuming it hasn't 
297       * been cancelled, the {@link #getNextSchedule} method will be called.
298       * 
299       * @author Luke Sandberg
300       * @since 11.0
301       */ 
302      @Beta
303      public abstract static class CustomScheduler extends Scheduler {
304    
305        /**
306         * A callable class that can reschedule itself using a {@link CustomScheduler}.
307         */
308        private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> {
309          
310          /** The underlying task. */
311          private final Runnable wrappedRunnable;
312          
313          /** The executor on which this Callable will be scheduled. */
314          private final ScheduledExecutorService executor;
315          
316          /** 
317           * The service that is managing this callable.  This is used so that failure can be 
318           * reported properly.
319           */
320          private final AbstractService service;
321          
322          /**
323           * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 
324           * not scheduled while a cancel is ongoing.  Also it protects the currentFuture variable to 
325           * ensure that it is assigned atomically with being scheduled.
326           */ 
327          private final ReentrantLock lock = new ReentrantLock();
328          
329          /** The future that represents the next execution of this task.*/
330          @GuardedBy("lock")
331          private Future<Void> currentFuture;
332          
333          ReschedulableCallable(AbstractService service, ScheduledExecutorService executor, 
334              Runnable runnable) {
335            this.wrappedRunnable = runnable;
336            this.executor = executor;
337            this.service = service;
338          }
339          
340          @Override
341          public Void call() throws Exception {
342            wrappedRunnable.run();
343            reschedule();
344            return null;
345          }
346    
347          /**
348           * Atomically reschedules this task and assigns the new future to {@link #currentFuture}.
349           */
350          public void reschedule() {
351            // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that
352            // cancel calls cancel on the correct future. 2. we want to make sure that the assignment
353            // to currentFuture doesn't race with itself so that currentFuture is assigned in the 
354            // correct order.
355            lock.lock();
356            try {
357              if (currentFuture == null || !currentFuture.isCancelled()) {
358                final Schedule schedule = CustomScheduler.this.getNextSchedule();
359                currentFuture = executor.schedule(this, schedule.delay, schedule.unit);
360              }
361            } catch (Throwable e) {
362              // If an exception is thrown by the subclass then we need to make sure that the service
363              // notices and transitions to the FAILED state.  We do it by calling notifyFailed directly
364              // because the service does not monitor the state of the future so if the exception is not
365              // caught and forwarded to the service the task would stop executing but the service would
366              // have no idea.
367              service.notifyFailed(e);
368            } finally {
369              lock.unlock();
370            }
371          }
372          
373          // N.B. Only protect cancel and isCancelled because those are the only methods that are 
374          // invoked by the AbstractScheduledService.
375          @Override
376          public boolean cancel(boolean mayInterruptIfRunning) {
377            // Ensure that a task cannot be rescheduled while a cancel is ongoing.
378            lock.lock();
379            try {
380              return currentFuture.cancel(mayInterruptIfRunning);
381            } finally {
382              lock.unlock();
383            }
384          }
385    
386          @Override
387          protected Future<Void> delegate() {
388            throw new UnsupportedOperationException("Only cancel is supported by this future");
389          }
390        }
391        
392        @Override
393        final Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 
394            Runnable runnable) {
395          ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable);
396          task.reschedule();
397          return task;
398        }
399        
400        /**
401         * A value object that represents an absolute delay until a task should be invoked.
402         * 
403         * @author Luke Sandberg
404         * @since 11.0
405         */
406        @Beta
407        protected static final class Schedule {
408          
409          private final long delay;
410          private final TimeUnit unit;
411          
412          /**
413           * @param delay the time from now to delay execution
414           * @param unit the time unit of the delay parameter
415           */
416          public Schedule(long delay, TimeUnit unit) {
417            this.delay = delay;
418            this.unit = Preconditions.checkNotNull(unit);
419          }
420        }
421        
422        /**
423         * Calculates the time at which to next invoke the task.
424         * 
425         * <p>This is guaranteed to be called immediately after the task has completed an iteration and
426         * on the same thread as the previous execution of {@link 
427         * AbstractScheduledService#runOneIteration}.
428         * 
429         * @return a schedule that defines the delay before the next execution.
430         */
431        protected abstract Schedule getNextSchedule() throws Exception;
432      }
433    }