001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import com.google.common.annotations.Beta; 020import com.google.common.base.Preconditions; 021import com.google.common.base.Throwables; 022 023import java.util.concurrent.Callable; 024import java.util.concurrent.Executor; 025import java.util.concurrent.Executors; 026import java.util.concurrent.Future; 027import java.util.concurrent.ScheduledExecutorService; 028import java.util.concurrent.ThreadFactory; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.locks.ReentrantLock; 031import java.util.logging.Level; 032import java.util.logging.Logger; 033 034import javax.annotation.concurrent.GuardedBy; 035 036/** 037 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 038 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 039 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 040 * 041 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 042 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 043 * {@link #runOneIteration} that will be executed periodically as specified by its 044 * {@link Scheduler}. When this service is asked to stop via {@link #stop} or {@link #stopAndWait}, 045 * it will cancel the periodic task (but not interrupt it) and wait for it to stop before running 046 * the {@link #shutDown} method. 047 * 048 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 049 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 050 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 051 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely 052 * modify shared state without additional synchronization necessary for visibility to later 053 * executions of the life cycle methods. 054 * 055 * <h3>Usage Example</h3> 056 * 057 * Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 058 * rate limit itself. <pre> {@code 059 * class CrawlingService extends AbstractScheduledService { 060 * private Set<Uri> visited; 061 * private Queue<Uri> toCrawl; 062 * protected void startUp() throws Exception { 063 * toCrawl = readStartingUris(); 064 * } 065 * 066 * protected void runOneIteration() throws Exception { 067 * Uri uri = toCrawl.remove(); 068 * Collection<Uri> newUris = crawl(uri); 069 * visited.add(uri); 070 * for (Uri newUri : newUris) { 071 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 072 * } 073 * } 074 * 075 * protected void shutDown() throws Exception { 076 * saveUris(toCrawl); 077 * } 078 * 079 * protected Scheduler scheduler() { 080 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 081 * } 082 * }}</pre> 083 * 084 * This class uses the life cycle methods to read in a list of starting URIs and save the set of 085 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 086 * rate limit the number of queries we perform. 087 * 088 * @author Luke Sandberg 089 * @since 11.0 090 */ 091@Beta 092public abstract class AbstractScheduledService implements Service { 093 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 094 095 /** 096 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 097 * task. 098 * 099 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 100 * methods, these provide {@link Scheduler} instances for the common use case of running the 101 * service with a fixed schedule. If more flexibility is needed then consider subclassing 102 * {@link CustomScheduler}. 103 * 104 * @author Luke Sandberg 105 * @since 11.0 106 */ 107 public abstract static class Scheduler { 108 /** 109 * Returns a {@link Scheduler} that schedules the task using the 110 * {@link ScheduledExecutorService#scheduleWithFixedDelay} method. 111 * 112 * @param initialDelay the time to delay first execution 113 * @param delay the delay between the termination of one execution and the commencement of the 114 * next 115 * @param unit the time unit of the initialDelay and delay parameters 116 */ 117 public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay, 118 final TimeUnit unit) { 119 return new Scheduler() { 120 @Override 121 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 122 Runnable task) { 123 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 124 } 125 }; 126 } 127 128 /** 129 * Returns a {@link Scheduler} that schedules the task using the 130 * {@link ScheduledExecutorService#scheduleAtFixedRate} method. 131 * 132 * @param initialDelay the time to delay first execution 133 * @param period the period between successive executions of the task 134 * @param unit the time unit of the initialDelay and period parameters 135 */ 136 public static Scheduler newFixedRateSchedule(final long initialDelay, final long period, 137 final TimeUnit unit) { 138 return new Scheduler() { 139 @Override 140 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 141 Runnable task) { 142 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 143 } 144 }; 145 } 146 147 /** Schedules the task to run on the provided executor on behalf of the service. */ 148 abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 149 Runnable runnable); 150 151 private Scheduler() {} 152 } 153 154 /* use AbstractService for state management */ 155 private final AbstractService delegate = new AbstractService() { 156 157 // A handle to the running task so that we can stop it when a shutdown has been requested. 158 // These two fields are volatile because their values will be accessed from multiple threads. 159 private volatile Future<?> runningTask; 160 private volatile ScheduledExecutorService executorService; 161 162 // This lock protects the task so we can ensure that none of the template methods (startUp, 163 // shutDown or runOneIteration) run concurrently with one another. 164 private final ReentrantLock lock = new ReentrantLock(); 165 166 private final Runnable task = new Runnable() { 167 @Override public void run() { 168 lock.lock(); 169 try { 170 AbstractScheduledService.this.runOneIteration(); 171 } catch (Throwable t) { 172 try { 173 shutDown(); 174 } catch (Exception ignored) { 175 logger.log(Level.WARNING, 176 "Error while attempting to shut down the service after failure.", ignored); 177 } 178 notifyFailed(t); 179 throw Throwables.propagate(t); 180 } finally { 181 lock.unlock(); 182 } 183 } 184 }; 185 186 @Override protected final void doStart() { 187 executorService = executor(); 188 executorService.execute(new Runnable() { 189 @Override public void run() { 190 lock.lock(); 191 try { 192 startUp(); 193 runningTask = scheduler().schedule(delegate, executorService, task); 194 notifyStarted(); 195 } catch (Throwable t) { 196 notifyFailed(t); 197 throw Throwables.propagate(t); 198 } finally { 199 lock.unlock(); 200 } 201 } 202 }); 203 } 204 205 @Override protected final void doStop() { 206 runningTask.cancel(false); 207 executorService.execute(new Runnable() { 208 @Override public void run() { 209 try { 210 lock.lock(); 211 try { 212 if (state() != State.STOPPING) { 213 // This means that the state has changed since we were scheduled. This implies that 214 // an execution of runOneIteration has thrown an exception and we have transitioned 215 // to a failed state, also this means that shutDown has already been called, so we 216 // do not want to call it again. 217 return; 218 } 219 shutDown(); 220 } finally { 221 lock.unlock(); 222 } 223 notifyStopped(); 224 } catch (Throwable t) { 225 notifyFailed(t); 226 throw Throwables.propagate(t); 227 } 228 } 229 }); 230 } 231 }; 232 233 /** Constructor for use by subclasses. */ 234 protected AbstractScheduledService() {} 235 236 /** 237 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 238 * the service will transition to the {@link Service.State#FAILED} state and this method will no 239 * longer be called. 240 */ 241 protected abstract void runOneIteration() throws Exception; 242 243 /** 244 * Start the service. 245 * 246 * <p>By default this method does nothing. 247 */ 248 protected void startUp() throws Exception {} 249 250 /** 251 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 252 * 253 * <p>By default this method does nothing. 254 */ 255 protected void shutDown() throws Exception {} 256 257 /** 258 * Returns the {@link Scheduler} object used to configure this service. This method will only be 259 * called once. 260 */ 261 protected abstract Scheduler scheduler(); 262 263 /** 264 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 265 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 266 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this 267 * service {@linkplain Service.State#TERMINATED terminates} or 268 * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a 269 * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called 270 * once. 271 * 272 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 273 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. 274 * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the 275 * service {@linkplain Service.State#TERMINATED terminates} or 276 * {@linkplain Service.State#TERMINATED fails}. 277 */ 278 protected ScheduledExecutorService executor() { 279 final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( 280 new ThreadFactory() { 281 @Override public Thread newThread(Runnable runnable) { 282 return MoreExecutors.newThread(serviceName(), runnable); 283 } 284 }); 285 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 286 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 287 // Technically this listener is added after start() was called so it is a little gross, but it 288 // is called within doStart() so we know that the service cannot terminate or fail concurrently 289 // with adding this listener so it is impossible to miss an event that we are interested in. 290 addListener(new Listener() { 291 @Override public void starting() {} 292 @Override public void running() {} 293 @Override public void stopping(State from) {} 294 @Override public void terminated(State from) { 295 executor.shutdown(); 296 } 297 @Override public void failed(State from, Throwable failure) { 298 executor.shutdown(); 299 }}, MoreExecutors.sameThreadExecutor()); 300 return executor; 301 } 302 303 /** 304 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 305 * debugging output. 306 * 307 * @since 14.0 308 */ 309 protected String serviceName() { 310 return getClass().getSimpleName(); 311 } 312 313 @Override public String toString() { 314 return serviceName() + " [" + state() + "]"; 315 } 316 317 // We override instead of using ForwardingService so that these can be final. 318 319 @Override public final ListenableFuture<State> start() { 320 return delegate.start(); 321 } 322 323 @Override public final State startAndWait() { 324 return delegate.startAndWait(); 325 } 326 327 @Override public final boolean isRunning() { 328 return delegate.isRunning(); 329 } 330 331 @Override public final State state() { 332 return delegate.state(); 333 } 334 335 @Override public final ListenableFuture<State> stop() { 336 return delegate.stop(); 337 } 338 339 @Override public final State stopAndWait() { 340 return delegate.stopAndWait(); 341 } 342 343 /** 344 * @since 13.0 345 */ 346 @Override public final void addListener(Listener listener, Executor executor) { 347 delegate.addListener(listener, executor); 348 } 349 350 /** 351 * @since 14.0 352 */ 353 @Override public final Throwable failureCause() { 354 return delegate.failureCause(); 355 } 356 357 /** 358 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 359 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't 360 * been cancelled, the {@link #getNextSchedule} method will be called. 361 * 362 * @author Luke Sandberg 363 * @since 11.0 364 */ 365 @Beta 366 public abstract static class CustomScheduler extends Scheduler { 367 368 /** 369 * A callable class that can reschedule itself using a {@link CustomScheduler}. 370 */ 371 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 372 373 /** The underlying task. */ 374 private final Runnable wrappedRunnable; 375 376 /** The executor on which this Callable will be scheduled. */ 377 private final ScheduledExecutorService executor; 378 379 /** 380 * The service that is managing this callable. This is used so that failure can be 381 * reported properly. 382 */ 383 private final AbstractService service; 384 385 /** 386 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 387 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 388 * ensure that it is assigned atomically with being scheduled. 389 */ 390 private final ReentrantLock lock = new ReentrantLock(); 391 392 /** The future that represents the next execution of this task.*/ 393 @GuardedBy("lock") 394 private Future<Void> currentFuture; 395 396 ReschedulableCallable(AbstractService service, ScheduledExecutorService executor, 397 Runnable runnable) { 398 this.wrappedRunnable = runnable; 399 this.executor = executor; 400 this.service = service; 401 } 402 403 @Override 404 public Void call() throws Exception { 405 wrappedRunnable.run(); 406 reschedule(); 407 return null; 408 } 409 410 /** 411 * Atomically reschedules this task and assigns the new future to {@link #currentFuture}. 412 */ 413 public void reschedule() { 414 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 415 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 416 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 417 // correct order. 418 lock.lock(); 419 try { 420 if (currentFuture == null || !currentFuture.isCancelled()) { 421 final Schedule schedule = CustomScheduler.this.getNextSchedule(); 422 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 423 } 424 } catch (Throwable e) { 425 // If an exception is thrown by the subclass then we need to make sure that the service 426 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 427 // because the service does not monitor the state of the future so if the exception is not 428 // caught and forwarded to the service the task would stop executing but the service would 429 // have no idea. 430 service.notifyFailed(e); 431 } finally { 432 lock.unlock(); 433 } 434 } 435 436 // N.B. Only protect cancel and isCancelled because those are the only methods that are 437 // invoked by the AbstractScheduledService. 438 @Override 439 public boolean cancel(boolean mayInterruptIfRunning) { 440 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 441 lock.lock(); 442 try { 443 return currentFuture.cancel(mayInterruptIfRunning); 444 } finally { 445 lock.unlock(); 446 } 447 } 448 449 @Override 450 protected Future<Void> delegate() { 451 throw new UnsupportedOperationException("Only cancel is supported by this future"); 452 } 453 } 454 455 @Override 456 final Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 457 Runnable runnable) { 458 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 459 task.reschedule(); 460 return task; 461 } 462 463 /** 464 * A value object that represents an absolute delay until a task should be invoked. 465 * 466 * @author Luke Sandberg 467 * @since 11.0 468 */ 469 @Beta 470 protected static final class Schedule { 471 472 private final long delay; 473 private final TimeUnit unit; 474 475 /** 476 * @param delay the time from now to delay execution 477 * @param unit the time unit of the delay parameter 478 */ 479 public Schedule(long delay, TimeUnit unit) { 480 this.delay = delay; 481 this.unit = Preconditions.checkNotNull(unit); 482 } 483 } 484 485 /** 486 * Calculates the time at which to next invoke the task. 487 * 488 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 489 * on the same thread as the previous execution of {@link 490 * AbstractScheduledService#runOneIteration}. 491 * 492 * @return a schedule that defines the delay before the next execution. 493 */ 494 protected abstract Schedule getNextSchedule() throws Exception; 495 } 496}