001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.base.Preconditions.checkArgument; 020import static com.google.common.base.Preconditions.checkNotNull; 021import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 022 023import com.google.common.annotations.Beta; 024import com.google.common.base.Supplier; 025import com.google.j2objc.annotations.WeakOuter; 026 027import java.util.concurrent.Callable; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038 039import javax.annotation.concurrent.GuardedBy; 040 041/** 042 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 043 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 044 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 045 * 046 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 047 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 048 * {@link #runOneIteration} that will be executed periodically as specified by its 049 * {@link Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the 050 * periodic task (but not interrupt it) and wait for it to stop before running the 051 * {@link #shutDown} method. 052 * 053 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 054 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 055 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 056 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely 057 * modify shared state without additional synchronization necessary for visibility to later 058 * executions of the life cycle methods. 059 * 060 * <h3>Usage Example</h3> 061 * 062 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 063 * rate limit itself. <pre> {@code 064 * class CrawlingService extends AbstractScheduledService { 065 * private Set<Uri> visited; 066 * private Queue<Uri> toCrawl; 067 * protected void startUp() throws Exception { 068 * toCrawl = readStartingUris(); 069 * } 070 * 071 * protected void runOneIteration() throws Exception { 072 * Uri uri = toCrawl.remove(); 073 * Collection<Uri> newUris = crawl(uri); 074 * visited.add(uri); 075 * for (Uri newUri : newUris) { 076 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 077 * } 078 * } 079 * 080 * protected void shutDown() throws Exception { 081 * saveUris(toCrawl); 082 * } 083 * 084 * protected Scheduler scheduler() { 085 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 086 * } 087 * }}</pre> 088 * 089 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 090 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 091 * rate limit the number of queries we perform. 092 * 093 * @author Luke Sandberg 094 * @since 11.0 095 */ 096@Beta 097public abstract class AbstractScheduledService implements Service { 098 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 099 100 /** 101 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 102 * task. 103 * 104 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 105 * methods, these provide {@link Scheduler} instances for the common use case of running the 106 * service with a fixed schedule. If more flexibility is needed then consider subclassing 107 * {@link CustomScheduler}. 108 * 109 * @author Luke Sandberg 110 * @since 11.0 111 */ 112 public abstract static class Scheduler { 113 /** 114 * Returns a {@link Scheduler} that schedules the task using the 115 * {@link ScheduledExecutorService#scheduleWithFixedDelay} method. 116 * 117 * @param initialDelay the time to delay first execution 118 * @param delay the delay between the termination of one execution and the commencement of the 119 * next 120 * @param unit the time unit of the initialDelay and delay parameters 121 */ 122 public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay, 123 final TimeUnit unit) { 124 checkNotNull(unit); 125 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 126 return new Scheduler() { 127 @Override 128 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 129 Runnable task) { 130 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 131 } 132 }; 133 } 134 135 /** 136 * Returns a {@link Scheduler} that schedules the task using the 137 * {@link ScheduledExecutorService#scheduleAtFixedRate} method. 138 * 139 * @param initialDelay the time to delay first execution 140 * @param period the period between successive executions of the task 141 * @param unit the time unit of the initialDelay and period parameters 142 */ 143 public static Scheduler newFixedRateSchedule(final long initialDelay, final long period, 144 final TimeUnit unit) { 145 checkNotNull(unit); 146 checkArgument(period > 0, "period must be > 0, found %s", period); 147 return new Scheduler() { 148 @Override 149 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 150 Runnable task) { 151 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 152 } 153 }; 154 } 155 156 /** Schedules the task to run on the provided executor on behalf of the service. */ 157 abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 158 Runnable runnable); 159 160 private Scheduler() {} 161 } 162 163 /* use AbstractService for state management */ 164 private final AbstractService delegate = new ServiceDelegate(); 165 166 @WeakOuter 167 private final class ServiceDelegate extends AbstractService { 168 169 // A handle to the running task so that we can stop it when a shutdown has been requested. 170 // These two fields are volatile because their values will be accessed from multiple threads. 171 private volatile Future<?> runningTask; 172 private volatile ScheduledExecutorService executorService; 173 174 // This lock protects the task so we can ensure that none of the template methods (startUp, 175 // shutDown or runOneIteration) run concurrently with one another. 176 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 177 // lock. 178 private final ReentrantLock lock = new ReentrantLock(); 179 180 @WeakOuter 181 class Task implements Runnable { 182 @Override public void run() { 183 lock.lock(); 184 try { 185 if (runningTask.isCancelled()) { 186 // task may have been cancelled while blocked on the lock. 187 return; 188 } 189 AbstractScheduledService.this.runOneIteration(); 190 } catch (Throwable t) { 191 try { 192 shutDown(); 193 } catch (Exception ignored) { 194 logger.log(Level.WARNING, 195 "Error while attempting to shut down the service after failure.", ignored); 196 } 197 notifyFailed(t); 198 runningTask.cancel(false); // prevent future invocations. 199 } finally { 200 lock.unlock(); 201 } 202 } 203 } 204 205 private final Runnable task = new Task(); 206 207 @Override protected final void doStart() { 208 executorService = MoreExecutors.renamingDecorator(executor(), new Supplier<String>() { 209 @Override public String get() { 210 return serviceName() + " " + state(); 211 } 212 }); 213 executorService.execute(new Runnable() { 214 @Override public void run() { 215 lock.lock(); 216 try { 217 startUp(); 218 runningTask = scheduler().schedule(delegate, executorService, task); 219 notifyStarted(); 220 } catch (Throwable t) { 221 notifyFailed(t); 222 if (runningTask != null) { 223 // prevent the task from running if possible 224 runningTask.cancel(false); 225 } 226 } finally { 227 lock.unlock(); 228 } 229 } 230 }); 231 } 232 233 @Override protected final void doStop() { 234 runningTask.cancel(false); 235 executorService.execute(new Runnable() { 236 @Override public void run() { 237 try { 238 lock.lock(); 239 try { 240 if (state() != State.STOPPING) { 241 // This means that the state has changed since we were scheduled. This implies that 242 // an execution of runOneIteration has thrown an exception and we have transitioned 243 // to a failed state, also this means that shutDown has already been called, so we 244 // do not want to call it again. 245 return; 246 } 247 shutDown(); 248 } finally { 249 lock.unlock(); 250 } 251 notifyStopped(); 252 } catch (Throwable t) { 253 notifyFailed(t); 254 } 255 } 256 }); 257 } 258 259 @Override public String toString() { 260 return AbstractScheduledService.this.toString(); 261 } 262 } 263 264 /** Constructor for use by subclasses. */ 265 protected AbstractScheduledService() {} 266 267 /** 268 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 269 * the service will transition to the {@link Service.State#FAILED} state and this method will no 270 * longer be called. 271 */ 272 protected abstract void runOneIteration() throws Exception; 273 274 /** 275 * Start the service. 276 * 277 * <p>By default this method does nothing. 278 */ 279 protected void startUp() throws Exception {} 280 281 /** 282 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 283 * 284 * <p>By default this method does nothing. 285 */ 286 protected void shutDown() throws Exception {} 287 288 /** 289 * Returns the {@link Scheduler} object used to configure this service. This method will only be 290 * called once. 291 */ 292 protected abstract Scheduler scheduler(); 293 294 /** 295 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 296 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 297 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this 298 * service {@linkplain Service.State#TERMINATED terminates} or 299 * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a 300 * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called 301 * once. 302 * 303 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 304 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. 305 * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the 306 * service {@linkplain Service.State#TERMINATED terminates} or 307 * {@linkplain Service.State#TERMINATED fails}. 308 */ 309 protected ScheduledExecutorService executor() { 310 @WeakOuter class ThreadFactoryImpl implements ThreadFactory { 311 @Override public Thread newThread(Runnable runnable) { 312 return MoreExecutors.newThread(serviceName(), runnable); 313 } 314 } 315 final ScheduledExecutorService executor = 316 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 317 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 318 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 319 // Technically this listener is added after start() was called so it is a little gross, but it 320 // is called within doStart() so we know that the service cannot terminate or fail concurrently 321 // with adding this listener so it is impossible to miss an event that we are interested in. 322 addListener(new Listener() { 323 @Override public void terminated(State from) { 324 executor.shutdown(); 325 } 326 @Override public void failed(State from, Throwable failure) { 327 executor.shutdown(); 328 } 329 }, directExecutor()); 330 return executor; 331 } 332 333 /** 334 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 335 * debugging output. 336 * 337 * @since 14.0 338 */ 339 protected String serviceName() { 340 return getClass().getSimpleName(); 341 } 342 343 @Override public String toString() { 344 return serviceName() + " [" + state() + "]"; 345 } 346 347 @Override public final boolean isRunning() { 348 return delegate.isRunning(); 349 } 350 351 @Override public final State state() { 352 return delegate.state(); 353 } 354 355 /** 356 * @since 13.0 357 */ 358 @Override public final void addListener(Listener listener, Executor executor) { 359 delegate.addListener(listener, executor); 360 } 361 362 /** 363 * @since 14.0 364 */ 365 @Override public final Throwable failureCause() { 366 return delegate.failureCause(); 367 } 368 369 /** 370 * @since 15.0 371 */ 372 @Override public final Service startAsync() { 373 delegate.startAsync(); 374 return this; 375 } 376 377 /** 378 * @since 15.0 379 */ 380 @Override public final Service stopAsync() { 381 delegate.stopAsync(); 382 return this; 383 } 384 385 /** 386 * @since 15.0 387 */ 388 @Override public final void awaitRunning() { 389 delegate.awaitRunning(); 390 } 391 392 /** 393 * @since 15.0 394 */ 395 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 396 delegate.awaitRunning(timeout, unit); 397 } 398 399 /** 400 * @since 15.0 401 */ 402 @Override public final void awaitTerminated() { 403 delegate.awaitTerminated(); 404 } 405 406 /** 407 * @since 15.0 408 */ 409 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 410 delegate.awaitTerminated(timeout, unit); 411 } 412 413 /** 414 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 415 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't 416 * been cancelled, the {@link #getNextSchedule} method will be called. 417 * 418 * @author Luke Sandberg 419 * @since 11.0 420 */ 421 @Beta 422 public abstract static class CustomScheduler extends Scheduler { 423 424 /** 425 * A callable class that can reschedule itself using a {@link CustomScheduler}. 426 */ 427 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 428 429 /** The underlying task. */ 430 private final Runnable wrappedRunnable; 431 432 /** The executor on which this Callable will be scheduled. */ 433 private final ScheduledExecutorService executor; 434 435 /** 436 * The service that is managing this callable. This is used so that failure can be 437 * reported properly. 438 */ 439 private final AbstractService service; 440 441 /** 442 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 443 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 444 * ensure that it is assigned atomically with being scheduled. 445 */ 446 private final ReentrantLock lock = new ReentrantLock(); 447 448 /** The future that represents the next execution of this task.*/ 449 @GuardedBy("lock") 450 private Future<Void> currentFuture; 451 452 ReschedulableCallable(AbstractService service, ScheduledExecutorService executor, 453 Runnable runnable) { 454 this.wrappedRunnable = runnable; 455 this.executor = executor; 456 this.service = service; 457 } 458 459 @Override 460 public Void call() throws Exception { 461 wrappedRunnable.run(); 462 reschedule(); 463 return null; 464 } 465 466 /** 467 * Atomically reschedules this task and assigns the new future to {@link #currentFuture}. 468 */ 469 public void reschedule() { 470 // invoke the callback outside the lock, prevents some shenanigans. 471 Schedule schedule; 472 try { 473 schedule = CustomScheduler.this.getNextSchedule(); 474 } catch (Throwable t) { 475 service.notifyFailed(t); 476 return; 477 } 478 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 479 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 480 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 481 // correct order. 482 Throwable scheduleFailure = null; 483 lock.lock(); 484 try { 485 if (currentFuture == null || !currentFuture.isCancelled()) { 486 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 487 } 488 } catch (Throwable e) { 489 // If an exception is thrown by the subclass then we need to make sure that the service 490 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 491 // because the service does not monitor the state of the future so if the exception is not 492 // caught and forwarded to the service the task would stop executing but the service would 493 // have no idea. 494 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 495 // the AbstractService could monitor the future directly. Rescheduling is still hard... 496 // but it would help with some of these lock ordering issues. 497 scheduleFailure = e; 498 } finally { 499 lock.unlock(); 500 } 501 // Call notifyFailed outside the lock to avoid lock ordering issues. 502 if (scheduleFailure != null) { 503 service.notifyFailed(scheduleFailure); 504 } 505 } 506 507 // N.B. Only protect cancel and isCancelled because those are the only methods that are 508 // invoked by the AbstractScheduledService. 509 @Override 510 public boolean cancel(boolean mayInterruptIfRunning) { 511 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 512 lock.lock(); 513 try { 514 return currentFuture.cancel(mayInterruptIfRunning); 515 } finally { 516 lock.unlock(); 517 } 518 } 519 520 @Override 521 public boolean isCancelled() { 522 lock.lock(); 523 try { 524 return currentFuture.isCancelled(); 525 } finally { 526 lock.unlock(); 527 } 528 } 529 530 @Override 531 protected Future<Void> delegate() { 532 throw new UnsupportedOperationException( 533 "Only cancel and isCancelled is supported by this future"); 534 } 535 } 536 537 @Override 538 final Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 539 Runnable runnable) { 540 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 541 task.reschedule(); 542 return task; 543 } 544 545 /** 546 * A value object that represents an absolute delay until a task should be invoked. 547 * 548 * @author Luke Sandberg 549 * @since 11.0 550 */ 551 @Beta 552 protected static final class Schedule { 553 554 private final long delay; 555 private final TimeUnit unit; 556 557 /** 558 * @param delay the time from now to delay execution 559 * @param unit the time unit of the delay parameter 560 */ 561 public Schedule(long delay, TimeUnit unit) { 562 this.delay = delay; 563 this.unit = checkNotNull(unit); 564 } 565 } 566 567 /** 568 * Calculates the time at which to next invoke the task. 569 * 570 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 571 * on the same thread as the previous execution of {@link 572 * AbstractScheduledService#runOneIteration}. 573 * 574 * @return a schedule that defines the delay before the next execution. 575 */ 576 protected abstract Schedule getNextSchedule() throws Exception; 577 } 578}