001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import com.google.common.annotations.Beta; 020import com.google.common.base.Preconditions; 021import com.google.common.base.Supplier; 022import com.google.common.base.Throwables; 023 024import java.util.concurrent.Callable; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Executors; 027import java.util.concurrent.Future; 028import java.util.concurrent.ScheduledExecutorService; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.TimeoutException; 032import java.util.concurrent.locks.ReentrantLock; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036import javax.annotation.concurrent.GuardedBy; 037 038/** 039 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 040 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 041 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 042 * 043 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 044 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 045 * {@link #runOneIteration} that will be executed periodically as specified by its 046 * {@link Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the 047 * periodic task (but not interrupt it) and wait for it to stop before running the 048 * {@link #shutDown} method. 049 * 050 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 051 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 052 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 053 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely 054 * modify shared state without additional synchronization necessary for visibility to later 055 * executions of the life cycle methods. 056 * 057 * <h3>Usage Example</h3> 058 * 059 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 060 * rate limit itself. <pre> {@code 061 * class CrawlingService extends AbstractScheduledService { 062 * private Set<Uri> visited; 063 * private Queue<Uri> toCrawl; 064 * protected void startUp() throws Exception { 065 * toCrawl = readStartingUris(); 066 * } 067 * 068 * protected void runOneIteration() throws Exception { 069 * Uri uri = toCrawl.remove(); 070 * Collection<Uri> newUris = crawl(uri); 071 * visited.add(uri); 072 * for (Uri newUri : newUris) { 073 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 074 * } 075 * } 076 * 077 * protected void shutDown() throws Exception { 078 * saveUris(toCrawl); 079 * } 080 * 081 * protected Scheduler scheduler() { 082 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 083 * } 084 * }}</pre> 085 * 086 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 087 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 088 * rate limit the number of queries we perform. 089 * 090 * @author Luke Sandberg 091 * @since 11.0 092 */ 093@Beta 094public abstract class AbstractScheduledService implements Service { 095 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 096 097 /** 098 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 099 * task. 100 * 101 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 102 * methods, these provide {@link Scheduler} instances for the common use case of running the 103 * service with a fixed schedule. If more flexibility is needed then consider subclassing 104 * {@link CustomScheduler}. 105 * 106 * @author Luke Sandberg 107 * @since 11.0 108 */ 109 public abstract static class Scheduler { 110 /** 111 * Returns a {@link Scheduler} that schedules the task using the 112 * {@link ScheduledExecutorService#scheduleWithFixedDelay} method. 113 * 114 * @param initialDelay the time to delay first execution 115 * @param delay the delay between the termination of one execution and the commencement of the 116 * next 117 * @param unit the time unit of the initialDelay and delay parameters 118 */ 119 public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay, 120 final TimeUnit unit) { 121 return new Scheduler() { 122 @Override 123 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 124 Runnable task) { 125 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 126 } 127 }; 128 } 129 130 /** 131 * Returns a {@link Scheduler} that schedules the task using the 132 * {@link ScheduledExecutorService#scheduleAtFixedRate} method. 133 * 134 * @param initialDelay the time to delay first execution 135 * @param period the period between successive executions of the task 136 * @param unit the time unit of the initialDelay and period parameters 137 */ 138 public static Scheduler newFixedRateSchedule(final long initialDelay, final long period, 139 final TimeUnit unit) { 140 return new Scheduler() { 141 @Override 142 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 143 Runnable task) { 144 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 145 } 146 }; 147 } 148 149 /** Schedules the task to run on the provided executor on behalf of the service. */ 150 abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 151 Runnable runnable); 152 153 private Scheduler() {} 154 } 155 156 /* use AbstractService for state management */ 157 private final AbstractService delegate = new AbstractService() { 158 159 // A handle to the running task so that we can stop it when a shutdown has been requested. 160 // These two fields are volatile because their values will be accessed from multiple threads. 161 private volatile Future<?> runningTask; 162 private volatile ScheduledExecutorService executorService; 163 164 // This lock protects the task so we can ensure that none of the template methods (startUp, 165 // shutDown or runOneIteration) run concurrently with one another. 166 private final ReentrantLock lock = new ReentrantLock(); 167 168 private final Runnable task = new Runnable() { 169 @Override public void run() { 170 lock.lock(); 171 try { 172 AbstractScheduledService.this.runOneIteration(); 173 } catch (Throwable t) { 174 try { 175 shutDown(); 176 } catch (Exception ignored) { 177 logger.log(Level.WARNING, 178 "Error while attempting to shut down the service after failure.", ignored); 179 } 180 notifyFailed(t); 181 throw Throwables.propagate(t); 182 } finally { 183 lock.unlock(); 184 } 185 } 186 }; 187 188 @Override protected final void doStart() { 189 executorService = MoreExecutors.renamingDecorator(executor(), new Supplier<String>() { 190 @Override public String get() { 191 return serviceName() + " " + state(); 192 } 193 }); 194 executorService.execute(new Runnable() { 195 @Override public void run() { 196 lock.lock(); 197 try { 198 startUp(); 199 runningTask = scheduler().schedule(delegate, executorService, task); 200 notifyStarted(); 201 } catch (Throwable t) { 202 notifyFailed(t); 203 throw Throwables.propagate(t); 204 } finally { 205 lock.unlock(); 206 } 207 } 208 }); 209 } 210 211 @Override protected final void doStop() { 212 runningTask.cancel(false); 213 executorService.execute(new Runnable() { 214 @Override public void run() { 215 try { 216 lock.lock(); 217 try { 218 if (state() != State.STOPPING) { 219 // This means that the state has changed since we were scheduled. This implies that 220 // an execution of runOneIteration has thrown an exception and we have transitioned 221 // to a failed state, also this means that shutDown has already been called, so we 222 // do not want to call it again. 223 return; 224 } 225 shutDown(); 226 } finally { 227 lock.unlock(); 228 } 229 notifyStopped(); 230 } catch (Throwable t) { 231 notifyFailed(t); 232 throw Throwables.propagate(t); 233 } 234 } 235 }); 236 } 237 }; 238 239 /** Constructor for use by subclasses. */ 240 protected AbstractScheduledService() {} 241 242 /** 243 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 244 * the service will transition to the {@link Service.State#FAILED} state and this method will no 245 * longer be called. 246 */ 247 protected abstract void runOneIteration() throws Exception; 248 249 /** 250 * Start the service. 251 * 252 * <p>By default this method does nothing. 253 */ 254 protected void startUp() throws Exception {} 255 256 /** 257 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 258 * 259 * <p>By default this method does nothing. 260 */ 261 protected void shutDown() throws Exception {} 262 263 /** 264 * Returns the {@link Scheduler} object used to configure this service. This method will only be 265 * called once. 266 */ 267 protected abstract Scheduler scheduler(); 268 269 /** 270 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 271 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 272 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this 273 * service {@linkplain Service.State#TERMINATED terminates} or 274 * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a 275 * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called 276 * once. 277 * 278 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 279 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. 280 * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the 281 * service {@linkplain Service.State#TERMINATED terminates} or 282 * {@linkplain Service.State#TERMINATED fails}. 283 */ 284 protected ScheduledExecutorService executor() { 285 final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( 286 new ThreadFactory() { 287 @Override public Thread newThread(Runnable runnable) { 288 return MoreExecutors.newThread(serviceName(), runnable); 289 } 290 }); 291 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 292 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 293 // Technically this listener is added after start() was called so it is a little gross, but it 294 // is called within doStart() so we know that the service cannot terminate or fail concurrently 295 // with adding this listener so it is impossible to miss an event that we are interested in. 296 addListener(new Listener() { 297 @Override public void terminated(State from) { 298 executor.shutdown(); 299 } 300 @Override public void failed(State from, Throwable failure) { 301 executor.shutdown(); 302 }}, MoreExecutors.sameThreadExecutor()); 303 return executor; 304 } 305 306 /** 307 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 308 * debugging output. 309 * 310 * @since 14.0 311 */ 312 protected String serviceName() { 313 return getClass().getSimpleName(); 314 } 315 316 @Override public String toString() { 317 return serviceName() + " [" + state() + "]"; 318 } 319 320 @Override public final boolean isRunning() { 321 return delegate.isRunning(); 322 } 323 324 @Override public final State state() { 325 return delegate.state(); 326 } 327 328 /** 329 * @since 13.0 330 */ 331 @Override public final void addListener(Listener listener, Executor executor) { 332 delegate.addListener(listener, executor); 333 } 334 335 /** 336 * @since 14.0 337 */ 338 @Override public final Throwable failureCause() { 339 return delegate.failureCause(); 340 } 341 342 /** 343 * @since 15.0 344 */ 345 @Override public final Service startAsync() { 346 delegate.startAsync(); 347 return this; 348 } 349 350 /** 351 * @since 15.0 352 */ 353 @Override public final Service stopAsync() { 354 delegate.stopAsync(); 355 return this; 356 } 357 358 /** 359 * @since 15.0 360 */ 361 @Override public final void awaitRunning() { 362 delegate.awaitRunning(); 363 } 364 365 /** 366 * @since 15.0 367 */ 368 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 369 delegate.awaitRunning(timeout, unit); 370 } 371 372 /** 373 * @since 15.0 374 */ 375 @Override public final void awaitTerminated() { 376 delegate.awaitTerminated(); 377 } 378 379 /** 380 * @since 15.0 381 */ 382 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 383 delegate.awaitTerminated(timeout, unit); 384 } 385 386 /** 387 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 388 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't 389 * been cancelled, the {@link #getNextSchedule} method will be called. 390 * 391 * @author Luke Sandberg 392 * @since 11.0 393 */ 394 @Beta 395 public abstract static class CustomScheduler extends Scheduler { 396 397 /** 398 * A callable class that can reschedule itself using a {@link CustomScheduler}. 399 */ 400 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 401 402 /** The underlying task. */ 403 private final Runnable wrappedRunnable; 404 405 /** The executor on which this Callable will be scheduled. */ 406 private final ScheduledExecutorService executor; 407 408 /** 409 * The service that is managing this callable. This is used so that failure can be 410 * reported properly. 411 */ 412 private final AbstractService service; 413 414 /** 415 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 416 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 417 * ensure that it is assigned atomically with being scheduled. 418 */ 419 private final ReentrantLock lock = new ReentrantLock(); 420 421 /** The future that represents the next execution of this task.*/ 422 @GuardedBy("lock") 423 private Future<Void> currentFuture; 424 425 ReschedulableCallable(AbstractService service, ScheduledExecutorService executor, 426 Runnable runnable) { 427 this.wrappedRunnable = runnable; 428 this.executor = executor; 429 this.service = service; 430 } 431 432 @Override 433 public Void call() throws Exception { 434 wrappedRunnable.run(); 435 reschedule(); 436 return null; 437 } 438 439 /** 440 * Atomically reschedules this task and assigns the new future to {@link #currentFuture}. 441 */ 442 public void reschedule() { 443 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 444 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 445 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 446 // correct order. 447 lock.lock(); 448 try { 449 if (currentFuture == null || !currentFuture.isCancelled()) { 450 final Schedule schedule = CustomScheduler.this.getNextSchedule(); 451 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 452 } 453 } catch (Throwable e) { 454 // If an exception is thrown by the subclass then we need to make sure that the service 455 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 456 // because the service does not monitor the state of the future so if the exception is not 457 // caught and forwarded to the service the task would stop executing but the service would 458 // have no idea. 459 service.notifyFailed(e); 460 } finally { 461 lock.unlock(); 462 } 463 } 464 465 // N.B. Only protect cancel and isCancelled because those are the only methods that are 466 // invoked by the AbstractScheduledService. 467 @Override 468 public boolean cancel(boolean mayInterruptIfRunning) { 469 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 470 lock.lock(); 471 try { 472 return currentFuture.cancel(mayInterruptIfRunning); 473 } finally { 474 lock.unlock(); 475 } 476 } 477 478 @Override 479 protected Future<Void> delegate() { 480 throw new UnsupportedOperationException("Only cancel is supported by this future"); 481 } 482 } 483 484 @Override 485 final Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 486 Runnable runnable) { 487 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 488 task.reschedule(); 489 return task; 490 } 491 492 /** 493 * A value object that represents an absolute delay until a task should be invoked. 494 * 495 * @author Luke Sandberg 496 * @since 11.0 497 */ 498 @Beta 499 protected static final class Schedule { 500 501 private final long delay; 502 private final TimeUnit unit; 503 504 /** 505 * @param delay the time from now to delay execution 506 * @param unit the time unit of the delay parameter 507 */ 508 public Schedule(long delay, TimeUnit unit) { 509 this.delay = delay; 510 this.unit = Preconditions.checkNotNull(unit); 511 } 512 } 513 514 /** 515 * Calculates the time at which to next invoke the task. 516 * 517 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 518 * on the same thread as the previous execution of {@link 519 * AbstractScheduledService#runOneIteration}. 520 * 521 * @return a schedule that defines the delay before the next execution. 522 */ 523 protected abstract Schedule getNextSchedule() throws Exception; 524 } 525}