001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020 021import com.google.common.annotations.Beta; 022import com.google.common.annotations.GwtIncompatible; 023import com.google.common.base.Supplier; 024import com.google.errorprone.annotations.CanIgnoreReturnValue; 025import com.google.j2objc.annotations.WeakOuter; 026import java.util.concurrent.Callable; 027import java.util.concurrent.Executor; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.TimeoutException; 034import java.util.concurrent.locks.ReentrantLock; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037import javax.annotation.concurrent.GuardedBy; 038 039/** 040 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 041 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 042 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 043 * 044 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 045 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 046 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 047 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 048 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 049 * 050 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 051 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 052 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 053 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 054 * shared state without additional synchronization necessary for visibility to later executions of 055 * the life cycle methods. 056 * 057 * <h3>Usage Example</h3> 058 * 059 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 060 * rate limit itself. <pre> {@code 061 * class CrawlingService extends AbstractScheduledService { 062 * private Set<Uri> visited; 063 * private Queue<Uri> toCrawl; 064 * protected void startUp() throws Exception { 065 * toCrawl = readStartingUris(); 066 * } 067 * 068 * protected void runOneIteration() throws Exception { 069 * Uri uri = toCrawl.remove(); 070 * Collection<Uri> newUris = crawl(uri); 071 * visited.add(uri); 072 * for (Uri newUri : newUris) { 073 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 074 * } 075 * } 076 * 077 * protected void shutDown() throws Exception { 078 * saveUris(toCrawl); 079 * } 080 * 081 * protected Scheduler scheduler() { 082 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 083 * } 084 * }}</pre> 085 * 086 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 087 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 088 * rate limit the number of queries we perform. 089 * 090 * @author Luke Sandberg 091 * @since 11.0 092 */ 093@Beta 094@GwtIncompatible 095public abstract class AbstractScheduledService implements Service { 096 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 097 098 /** 099 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 100 * task. 101 * 102 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 103 * methods, these provide {@link Scheduler} instances for the common use case of running the 104 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 105 * CustomScheduler}. 106 * 107 * @author Luke Sandberg 108 * @since 11.0 109 */ 110 public abstract static class Scheduler { 111 /** 112 * Returns a {@link Scheduler} that schedules the task using the {@link 113 * ScheduledExecutorService#scheduleWithFixedDelay} method. 114 * 115 * @param initialDelay the time to delay first execution 116 * @param delay the delay between the termination of one execution and the commencement of the 117 * next 118 * @param unit the time unit of the initialDelay and delay parameters 119 */ 120 public static Scheduler newFixedDelaySchedule( 121 final long initialDelay, final long delay, final TimeUnit unit) { 122 checkNotNull(unit); 123 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 124 return new Scheduler() { 125 @Override 126 public Future<?> schedule( 127 AbstractService service, ScheduledExecutorService executor, Runnable task) { 128 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 129 } 130 }; 131 } 132 133 /** 134 * Returns a {@link Scheduler} that schedules the task using the {@link 135 * ScheduledExecutorService#scheduleAtFixedRate} method. 136 * 137 * @param initialDelay the time to delay first execution 138 * @param period the period between successive executions of the task 139 * @param unit the time unit of the initialDelay and period parameters 140 */ 141 public static Scheduler newFixedRateSchedule( 142 final long initialDelay, final long period, final TimeUnit unit) { 143 checkNotNull(unit); 144 checkArgument(period > 0, "period must be > 0, found %s", period); 145 return new Scheduler() { 146 @Override 147 public Future<?> schedule( 148 AbstractService service, ScheduledExecutorService executor, Runnable task) { 149 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 150 } 151 }; 152 } 153 154 /** Schedules the task to run on the provided executor on behalf of the service. */ 155 abstract Future<?> schedule( 156 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 157 158 private Scheduler() {} 159 } 160 161 /* use AbstractService for state management */ 162 private final AbstractService delegate = new ServiceDelegate(); 163 164 @WeakOuter 165 private final class ServiceDelegate extends AbstractService { 166 167 // A handle to the running task so that we can stop it when a shutdown has been requested. 168 // These two fields are volatile because their values will be accessed from multiple threads. 169 private volatile Future<?> runningTask; 170 private volatile ScheduledExecutorService executorService; 171 172 // This lock protects the task so we can ensure that none of the template methods (startUp, 173 // shutDown or runOneIteration) run concurrently with one another. 174 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 175 // lock. 176 private final ReentrantLock lock = new ReentrantLock(); 177 178 @WeakOuter 179 class Task implements Runnable { 180 @Override 181 public void run() { 182 lock.lock(); 183 try { 184 if (runningTask.isCancelled()) { 185 // task may have been cancelled while blocked on the lock. 186 return; 187 } 188 AbstractScheduledService.this.runOneIteration(); 189 } catch (Throwable t) { 190 try { 191 shutDown(); 192 } catch (Exception ignored) { 193 logger.log( 194 Level.WARNING, 195 "Error while attempting to shut down the service after failure.", 196 ignored); 197 } 198 notifyFailed(t); 199 runningTask.cancel(false); // prevent future invocations. 200 } finally { 201 lock.unlock(); 202 } 203 } 204 } 205 206 private final Runnable task = new Task(); 207 208 @Override 209 protected final void doStart() { 210 executorService = 211 MoreExecutors.renamingDecorator( 212 executor(), 213 new Supplier<String>() { 214 @Override 215 public String get() { 216 return serviceName() + " " + state(); 217 } 218 }); 219 executorService.execute( 220 new Runnable() { 221 @Override 222 public void run() { 223 lock.lock(); 224 try { 225 startUp(); 226 runningTask = scheduler().schedule(delegate, executorService, task); 227 notifyStarted(); 228 } catch (Throwable t) { 229 notifyFailed(t); 230 if (runningTask != null) { 231 // prevent the task from running if possible 232 runningTask.cancel(false); 233 } 234 } finally { 235 lock.unlock(); 236 } 237 } 238 }); 239 } 240 241 @Override 242 protected final void doStop() { 243 runningTask.cancel(false); 244 executorService.execute( 245 new Runnable() { 246 @Override 247 public void run() { 248 try { 249 lock.lock(); 250 try { 251 if (state() != State.STOPPING) { 252 // This means that the state has changed since we were scheduled. This implies 253 // that an execution of runOneIteration has thrown an exception and we have 254 // transitioned to a failed state, also this means that shutDown has already 255 // been called, so we do not want to call it again. 256 return; 257 } 258 shutDown(); 259 } finally { 260 lock.unlock(); 261 } 262 notifyStopped(); 263 } catch (Throwable t) { 264 notifyFailed(t); 265 } 266 } 267 }); 268 } 269 270 @Override 271 public String toString() { 272 return AbstractScheduledService.this.toString(); 273 } 274 } 275 276 /** Constructor for use by subclasses. */ 277 protected AbstractScheduledService() {} 278 279 /** 280 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 281 * the service will transition to the {@link Service.State#FAILED} state and this method will no 282 * longer be called. 283 */ 284 protected abstract void runOneIteration() throws Exception; 285 286 /** 287 * Start the service. 288 * 289 * <p>By default this method does nothing. 290 */ 291 protected void startUp() throws Exception {} 292 293 /** 294 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 295 * 296 * <p>By default this method does nothing. 297 */ 298 protected void shutDown() throws Exception {} 299 300 /** 301 * Returns the {@link Scheduler} object used to configure this service. This method will only be 302 * called once. 303 */ 304 // TODO(cpovirk): @ForOverride 305 protected abstract Scheduler scheduler(); 306 307 /** 308 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 309 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 310 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 311 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 312 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 313 * instance. This method is guaranteed to only be called once. 314 * 315 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 316 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 317 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 318 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 319 * fails}. 320 */ 321 protected ScheduledExecutorService executor() { 322 @WeakOuter 323 class ThreadFactoryImpl implements ThreadFactory { 324 @Override 325 public Thread newThread(Runnable runnable) { 326 return MoreExecutors.newThread(serviceName(), runnable); 327 } 328 } 329 final ScheduledExecutorService executor = 330 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 331 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 332 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 333 // Technically this listener is added after start() was called so it is a little gross, but it 334 // is called within doStart() so we know that the service cannot terminate or fail concurrently 335 // with adding this listener so it is impossible to miss an event that we are interested in. 336 addListener( 337 new Listener() { 338 @Override 339 public void terminated(State from) { 340 executor.shutdown(); 341 } 342 343 @Override 344 public void failed(State from, Throwable failure) { 345 executor.shutdown(); 346 } 347 }, 348 directExecutor()); 349 return executor; 350 } 351 352 /** 353 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 354 * debugging output. 355 * 356 * @since 14.0 357 */ 358 protected String serviceName() { 359 return getClass().getSimpleName(); 360 } 361 362 @Override 363 public String toString() { 364 return serviceName() + " [" + state() + "]"; 365 } 366 367 @Override 368 public final boolean isRunning() { 369 return delegate.isRunning(); 370 } 371 372 @Override 373 public final State state() { 374 return delegate.state(); 375 } 376 377 /** 378 * @since 13.0 379 */ 380 @Override 381 public final void addListener(Listener listener, Executor executor) { 382 delegate.addListener(listener, executor); 383 } 384 385 /** 386 * @since 14.0 387 */ 388 @Override 389 public final Throwable failureCause() { 390 return delegate.failureCause(); 391 } 392 393 /** 394 * @since 15.0 395 */ 396 @CanIgnoreReturnValue 397 @Override 398 public final Service startAsync() { 399 delegate.startAsync(); 400 return this; 401 } 402 403 /** 404 * @since 15.0 405 */ 406 @CanIgnoreReturnValue 407 @Override 408 public final Service stopAsync() { 409 delegate.stopAsync(); 410 return this; 411 } 412 413 /** 414 * @since 15.0 415 */ 416 @Override 417 public final void awaitRunning() { 418 delegate.awaitRunning(); 419 } 420 421 /** 422 * @since 15.0 423 */ 424 @Override 425 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 426 delegate.awaitRunning(timeout, unit); 427 } 428 429 /** 430 * @since 15.0 431 */ 432 @Override 433 public final void awaitTerminated() { 434 delegate.awaitTerminated(); 435 } 436 437 /** 438 * @since 15.0 439 */ 440 @Override 441 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 442 delegate.awaitTerminated(timeout, unit); 443 } 444 445 /** 446 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 447 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 448 * cancelled, the {@link #getNextSchedule} method will be called. 449 * 450 * @author Luke Sandberg 451 * @since 11.0 452 */ 453 @Beta 454 public abstract static class CustomScheduler extends Scheduler { 455 456 /** 457 * A callable class that can reschedule itself using a {@link CustomScheduler}. 458 */ 459 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 460 461 /** The underlying task. */ 462 private final Runnable wrappedRunnable; 463 464 /** The executor on which this Callable will be scheduled. */ 465 private final ScheduledExecutorService executor; 466 467 /** 468 * The service that is managing this callable. This is used so that failure can be reported 469 * properly. 470 */ 471 private final AbstractService service; 472 473 /** 474 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 475 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 476 * ensure that it is assigned atomically with being scheduled. 477 */ 478 private final ReentrantLock lock = new ReentrantLock(); 479 480 /** The future that represents the next execution of this task. */ 481 @GuardedBy("lock") 482 private Future<Void> currentFuture; 483 484 ReschedulableCallable( 485 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 486 this.wrappedRunnable = runnable; 487 this.executor = executor; 488 this.service = service; 489 } 490 491 @Override 492 public Void call() throws Exception { 493 wrappedRunnable.run(); 494 reschedule(); 495 return null; 496 } 497 498 /** 499 * Atomically reschedules this task and assigns the new future to {@link #currentFuture}. 500 */ 501 public void reschedule() { 502 // invoke the callback outside the lock, prevents some shenanigans. 503 Schedule schedule; 504 try { 505 schedule = CustomScheduler.this.getNextSchedule(); 506 } catch (Throwable t) { 507 service.notifyFailed(t); 508 return; 509 } 510 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 511 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 512 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 513 // correct order. 514 Throwable scheduleFailure = null; 515 lock.lock(); 516 try { 517 if (currentFuture == null || !currentFuture.isCancelled()) { 518 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 519 } 520 } catch (Throwable e) { 521 // If an exception is thrown by the subclass then we need to make sure that the service 522 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 523 // because the service does not monitor the state of the future so if the exception is not 524 // caught and forwarded to the service the task would stop executing but the service would 525 // have no idea. 526 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 527 // the AbstractService could monitor the future directly. Rescheduling is still hard... 528 // but it would help with some of these lock ordering issues. 529 scheduleFailure = e; 530 } finally { 531 lock.unlock(); 532 } 533 // Call notifyFailed outside the lock to avoid lock ordering issues. 534 if (scheduleFailure != null) { 535 service.notifyFailed(scheduleFailure); 536 } 537 } 538 539 // N.B. Only protect cancel and isCancelled because those are the only methods that are 540 // invoked by the AbstractScheduledService. 541 @Override 542 public boolean cancel(boolean mayInterruptIfRunning) { 543 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 544 lock.lock(); 545 try { 546 return currentFuture.cancel(mayInterruptIfRunning); 547 } finally { 548 lock.unlock(); 549 } 550 } 551 552 @Override 553 public boolean isCancelled() { 554 lock.lock(); 555 try { 556 return currentFuture.isCancelled(); 557 } finally { 558 lock.unlock(); 559 } 560 } 561 562 @Override 563 protected Future<Void> delegate() { 564 throw new UnsupportedOperationException( 565 "Only cancel and isCancelled is supported by this future"); 566 } 567 } 568 569 @Override 570 final Future<?> schedule( 571 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 572 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 573 task.reschedule(); 574 return task; 575 } 576 577 /** 578 * A value object that represents an absolute delay until a task should be invoked. 579 * 580 * @author Luke Sandberg 581 * @since 11.0 582 */ 583 @Beta 584 protected static final class Schedule { 585 586 private final long delay; 587 private final TimeUnit unit; 588 589 /** 590 * @param delay the time from now to delay execution 591 * @param unit the time unit of the delay parameter 592 */ 593 public Schedule(long delay, TimeUnit unit) { 594 this.delay = delay; 595 this.unit = checkNotNull(unit); 596 } 597 } 598 599 /** 600 * Calculates the time at which to next invoke the task. 601 * 602 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 603 * on the same thread as the previous execution of {@link 604 * AbstractScheduledService#runOneIteration}. 605 * 606 * @return a schedule that defines the delay before the next execution. 607 */ 608 // TODO(cpovirk): @ForOverride 609 protected abstract Schedule getNextSchedule() throws Exception; 610 } 611}