001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import com.google.common.annotations.Beta; 020import com.google.common.base.Preconditions; 021import com.google.common.base.Supplier; 022import com.google.common.base.Throwables; 023 024import java.util.concurrent.Callable; 025import java.util.concurrent.Executor; 026import java.util.concurrent.Executors; 027import java.util.concurrent.Future; 028import java.util.concurrent.ScheduledExecutorService; 029import java.util.concurrent.ThreadFactory; 030import java.util.concurrent.TimeUnit; 031import java.util.concurrent.TimeoutException; 032import java.util.concurrent.locks.ReentrantLock; 033import java.util.logging.Level; 034import java.util.logging.Logger; 035 036import javax.annotation.concurrent.GuardedBy; 037 038/** 039 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 040 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 041 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 042 * 043 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 044 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 045 * {@link #runOneIteration} that will be executed periodically as specified by its 046 * {@link Scheduler}. When this service is asked to stop via {@link #stop} or {@link #stopAndWait}, 047 * it will cancel the periodic task (but not interrupt it) and wait for it to stop before running 048 * the {@link #shutDown} method. 049 * 050 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 051 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 052 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 053 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely 054 * modify shared state without additional synchronization necessary for visibility to later 055 * executions of the life cycle methods. 056 * 057 * <h3>Usage Example</h3> 058 * 059 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 060 * rate limit itself. <pre> {@code 061 * class CrawlingService extends AbstractScheduledService { 062 * private Set<Uri> visited; 063 * private Queue<Uri> toCrawl; 064 * protected void startUp() throws Exception { 065 * toCrawl = readStartingUris(); 066 * } 067 * 068 * protected void runOneIteration() throws Exception { 069 * Uri uri = toCrawl.remove(); 070 * Collection<Uri> newUris = crawl(uri); 071 * visited.add(uri); 072 * for (Uri newUri : newUris) { 073 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 074 * } 075 * } 076 * 077 * protected void shutDown() throws Exception { 078 * saveUris(toCrawl); 079 * } 080 * 081 * protected Scheduler scheduler() { 082 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 083 * } 084 * }}</pre> 085 * 086 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 087 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 088 * rate limit the number of queries we perform. 089 * 090 * @author Luke Sandberg 091 * @since 11.0 092 */ 093@Beta 094public abstract class AbstractScheduledService implements Service { 095 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 096 097 /** 098 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 099 * task. 100 * 101 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 102 * methods, these provide {@link Scheduler} instances for the common use case of running the 103 * service with a fixed schedule. If more flexibility is needed then consider subclassing 104 * {@link CustomScheduler}. 105 * 106 * @author Luke Sandberg 107 * @since 11.0 108 */ 109 public abstract static class Scheduler { 110 /** 111 * Returns a {@link Scheduler} that schedules the task using the 112 * {@link ScheduledExecutorService#scheduleWithFixedDelay} method. 113 * 114 * @param initialDelay the time to delay first execution 115 * @param delay the delay between the termination of one execution and the commencement of the 116 * next 117 * @param unit the time unit of the initialDelay and delay parameters 118 */ 119 public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay, 120 final TimeUnit unit) { 121 return new Scheduler() { 122 @Override 123 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 124 Runnable task) { 125 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 126 } 127 }; 128 } 129 130 /** 131 * Returns a {@link Scheduler} that schedules the task using the 132 * {@link ScheduledExecutorService#scheduleAtFixedRate} method. 133 * 134 * @param initialDelay the time to delay first execution 135 * @param period the period between successive executions of the task 136 * @param unit the time unit of the initialDelay and period parameters 137 */ 138 public static Scheduler newFixedRateSchedule(final long initialDelay, final long period, 139 final TimeUnit unit) { 140 return new Scheduler() { 141 @Override 142 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 143 Runnable task) { 144 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 145 } 146 }; 147 } 148 149 /** Schedules the task to run on the provided executor on behalf of the service. */ 150 abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 151 Runnable runnable); 152 153 private Scheduler() {} 154 } 155 156 /* use AbstractService for state management */ 157 private final AbstractService delegate = new AbstractService() { 158 159 // A handle to the running task so that we can stop it when a shutdown has been requested. 160 // These two fields are volatile because their values will be accessed from multiple threads. 161 private volatile Future<?> runningTask; 162 private volatile ScheduledExecutorService executorService; 163 164 // This lock protects the task so we can ensure that none of the template methods (startUp, 165 // shutDown or runOneIteration) run concurrently with one another. 166 private final ReentrantLock lock = new ReentrantLock(); 167 168 private final Runnable task = new Runnable() { 169 @Override public void run() { 170 lock.lock(); 171 try { 172 AbstractScheduledService.this.runOneIteration(); 173 } catch (Throwable t) { 174 try { 175 shutDown(); 176 } catch (Exception ignored) { 177 logger.log(Level.WARNING, 178 "Error while attempting to shut down the service after failure.", ignored); 179 } 180 notifyFailed(t); 181 throw Throwables.propagate(t); 182 } finally { 183 lock.unlock(); 184 } 185 } 186 }; 187 188 @Override protected final void doStart() { 189 executorService = MoreExecutors.renamingDecorator(executor(), new Supplier<String>() { 190 @Override public String get() { 191 return serviceName() + " " + state(); 192 } 193 }); 194 executorService.execute(new Runnable() { 195 @Override public void run() { 196 lock.lock(); 197 try { 198 startUp(); 199 runningTask = scheduler().schedule(delegate, executorService, task); 200 notifyStarted(); 201 } catch (Throwable t) { 202 notifyFailed(t); 203 throw Throwables.propagate(t); 204 } finally { 205 lock.unlock(); 206 } 207 } 208 }); 209 } 210 211 @Override protected final void doStop() { 212 runningTask.cancel(false); 213 executorService.execute(new Runnable() { 214 @Override public void run() { 215 try { 216 lock.lock(); 217 try { 218 if (state() != State.STOPPING) { 219 // This means that the state has changed since we were scheduled. This implies that 220 // an execution of runOneIteration has thrown an exception and we have transitioned 221 // to a failed state, also this means that shutDown has already been called, so we 222 // do not want to call it again. 223 return; 224 } 225 shutDown(); 226 } finally { 227 lock.unlock(); 228 } 229 notifyStopped(); 230 } catch (Throwable t) { 231 notifyFailed(t); 232 throw Throwables.propagate(t); 233 } 234 } 235 }); 236 } 237 }; 238 239 /** Constructor for use by subclasses. */ 240 protected AbstractScheduledService() {} 241 242 /** 243 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 244 * the service will transition to the {@link Service.State#FAILED} state and this method will no 245 * longer be called. 246 */ 247 protected abstract void runOneIteration() throws Exception; 248 249 /** 250 * Start the service. 251 * 252 * <p>By default this method does nothing. 253 */ 254 protected void startUp() throws Exception {} 255 256 /** 257 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 258 * 259 * <p>By default this method does nothing. 260 */ 261 protected void shutDown() throws Exception {} 262 263 /** 264 * Returns the {@link Scheduler} object used to configure this service. This method will only be 265 * called once. 266 */ 267 protected abstract Scheduler scheduler(); 268 269 /** 270 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 271 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 272 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this 273 * service {@linkplain Service.State#TERMINATED terminates} or 274 * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a 275 * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called 276 * once. 277 * 278 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 279 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. 280 * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the 281 * service {@linkplain Service.State#TERMINATED terminates} or 282 * {@linkplain Service.State#TERMINATED fails}. 283 */ 284 protected ScheduledExecutorService executor() { 285 final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( 286 new ThreadFactory() { 287 @Override public Thread newThread(Runnable runnable) { 288 return MoreExecutors.newThread(serviceName(), runnable); 289 } 290 }); 291 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 292 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 293 // Technically this listener is added after start() was called so it is a little gross, but it 294 // is called within doStart() so we know that the service cannot terminate or fail concurrently 295 // with adding this listener so it is impossible to miss an event that we are interested in. 296 addListener(new Listener() { 297 @Override public void terminated(State from) { 298 executor.shutdown(); 299 } 300 @Override public void failed(State from, Throwable failure) { 301 executor.shutdown(); 302 }}, MoreExecutors.sameThreadExecutor()); 303 return executor; 304 } 305 306 /** 307 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 308 * debugging output. 309 * 310 * @since 14.0 311 */ 312 protected String serviceName() { 313 return getClass().getSimpleName(); 314 } 315 316 @Override public String toString() { 317 return serviceName() + " [" + state() + "]"; 318 } 319 320 // We override instead of using ForwardingService so that these can be final. 321 322 @Deprecated 323 @Override 324 public final ListenableFuture<State> start() { 325 return delegate.start(); 326 } 327 328 @Deprecated 329 @Override 330 public final State startAndWait() { 331 return delegate.startAndWait(); 332 } 333 334 @Override public final boolean isRunning() { 335 return delegate.isRunning(); 336 } 337 338 @Override public final State state() { 339 return delegate.state(); 340 } 341 342 @Deprecated 343 @Override 344 public final ListenableFuture<State> stop() { 345 return delegate.stop(); 346 } 347 348 @Deprecated 349 @Override 350 public final State stopAndWait() { 351 return delegate.stopAndWait(); 352 } 353 354 /** 355 * @since 13.0 356 */ 357 @Override public final void addListener(Listener listener, Executor executor) { 358 delegate.addListener(listener, executor); 359 } 360 361 /** 362 * @since 14.0 363 */ 364 @Override public final Throwable failureCause() { 365 return delegate.failureCause(); 366 } 367 368 /** 369 * @since 15.0 370 */ 371 @Override public final Service startAsync() { 372 delegate.startAsync(); 373 return this; 374 } 375 376 /** 377 * @since 15.0 378 */ 379 @Override public final Service stopAsync() { 380 delegate.stopAsync(); 381 return this; 382 } 383 384 /** 385 * @since 15.0 386 */ 387 @Override public final void awaitRunning() { 388 delegate.awaitRunning(); 389 } 390 391 /** 392 * @since 15.0 393 */ 394 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 395 delegate.awaitRunning(timeout, unit); 396 } 397 398 /** 399 * @since 15.0 400 */ 401 @Override public final void awaitTerminated() { 402 delegate.awaitTerminated(); 403 } 404 405 /** 406 * @since 15.0 407 */ 408 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 409 delegate.awaitTerminated(timeout, unit); 410 } 411 412 /** 413 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 414 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't 415 * been cancelled, the {@link #getNextSchedule} method will be called. 416 * 417 * @author Luke Sandberg 418 * @since 11.0 419 */ 420 @Beta 421 public abstract static class CustomScheduler extends Scheduler { 422 423 /** 424 * A callable class that can reschedule itself using a {@link CustomScheduler}. 425 */ 426 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 427 428 /** The underlying task. */ 429 private final Runnable wrappedRunnable; 430 431 /** The executor on which this Callable will be scheduled. */ 432 private final ScheduledExecutorService executor; 433 434 /** 435 * The service that is managing this callable. This is used so that failure can be 436 * reported properly. 437 */ 438 private final AbstractService service; 439 440 /** 441 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 442 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 443 * ensure that it is assigned atomically with being scheduled. 444 */ 445 private final ReentrantLock lock = new ReentrantLock(); 446 447 /** The future that represents the next execution of this task.*/ 448 @GuardedBy("lock") 449 private Future<Void> currentFuture; 450 451 ReschedulableCallable(AbstractService service, ScheduledExecutorService executor, 452 Runnable runnable) { 453 this.wrappedRunnable = runnable; 454 this.executor = executor; 455 this.service = service; 456 } 457 458 @Override 459 public Void call() throws Exception { 460 wrappedRunnable.run(); 461 reschedule(); 462 return null; 463 } 464 465 /** 466 * Atomically reschedules this task and assigns the new future to {@link #currentFuture}. 467 */ 468 public void reschedule() { 469 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 470 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 471 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 472 // correct order. 473 lock.lock(); 474 try { 475 if (currentFuture == null || !currentFuture.isCancelled()) { 476 final Schedule schedule = CustomScheduler.this.getNextSchedule(); 477 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 478 } 479 } catch (Throwable e) { 480 // If an exception is thrown by the subclass then we need to make sure that the service 481 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 482 // because the service does not monitor the state of the future so if the exception is not 483 // caught and forwarded to the service the task would stop executing but the service would 484 // have no idea. 485 service.notifyFailed(e); 486 } finally { 487 lock.unlock(); 488 } 489 } 490 491 // N.B. Only protect cancel and isCancelled because those are the only methods that are 492 // invoked by the AbstractScheduledService. 493 @Override 494 public boolean cancel(boolean mayInterruptIfRunning) { 495 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 496 lock.lock(); 497 try { 498 return currentFuture.cancel(mayInterruptIfRunning); 499 } finally { 500 lock.unlock(); 501 } 502 } 503 504 @Override 505 protected Future<Void> delegate() { 506 throw new UnsupportedOperationException("Only cancel is supported by this future"); 507 } 508 } 509 510 @Override 511 final Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 512 Runnable runnable) { 513 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 514 task.reschedule(); 515 return task; 516 } 517 518 /** 519 * A value object that represents an absolute delay until a task should be invoked. 520 * 521 * @author Luke Sandberg 522 * @since 11.0 523 */ 524 @Beta 525 protected static final class Schedule { 526 527 private final long delay; 528 private final TimeUnit unit; 529 530 /** 531 * @param delay the time from now to delay execution 532 * @param unit the time unit of the delay parameter 533 */ 534 public Schedule(long delay, TimeUnit unit) { 535 this.delay = delay; 536 this.unit = Preconditions.checkNotNull(unit); 537 } 538 } 539 540 /** 541 * Calculates the time at which to next invoke the task. 542 * 543 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 544 * on the same thread as the previous execution of {@link 545 * AbstractScheduledService#runOneIteration}. 546 * 547 * @return a schedule that defines the delay before the next execution. 548 */ 549 protected abstract Schedule getNextSchedule() throws Exception; 550 } 551}