001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.Internal.toNanosSaturated; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021 022import com.google.common.annotations.GwtIncompatible; 023import com.google.common.base.Supplier; 024import com.google.errorprone.annotations.CanIgnoreReturnValue; 025import com.google.errorprone.annotations.concurrent.GuardedBy; 026import com.google.j2objc.annotations.WeakOuter; 027import java.time.Duration; 028import java.util.concurrent.Callable; 029import java.util.concurrent.Executor; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.ThreadFactory; 034import java.util.concurrent.TimeUnit; 035import java.util.concurrent.TimeoutException; 036import java.util.concurrent.locks.ReentrantLock; 037import java.util.logging.Level; 038import java.util.logging.Logger; 039import org.checkerframework.checker.nullness.qual.Nullable; 040 041/** 042 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 043 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 044 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 045 * 046 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 047 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 048 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 049 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 050 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 051 * 052 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 053 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 054 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 055 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 056 * shared state without additional synchronization necessary for visibility to later executions of 057 * the life cycle methods. 058 * 059 * <h3>Usage Example</h3> 060 * 061 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 062 * rate limit itself. 063 * 064 * <pre>{@code 065 * class CrawlingService extends AbstractScheduledService { 066 * private Set<Uri> visited; 067 * private Queue<Uri> toCrawl; 068 * protected void startUp() throws Exception { 069 * toCrawl = readStartingUris(); 070 * } 071 * 072 * protected void runOneIteration() throws Exception { 073 * Uri uri = toCrawl.remove(); 074 * Collection<Uri> newUris = crawl(uri); 075 * visited.add(uri); 076 * for (Uri newUri : newUris) { 077 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 078 * } 079 * } 080 * 081 * protected void shutDown() throws Exception { 082 * saveUris(toCrawl); 083 * } 084 * 085 * protected Scheduler scheduler() { 086 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 087 * } 088 * } 089 * }</pre> 090 * 091 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 092 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 093 * rate limit the number of queries we perform. 094 * 095 * @author Luke Sandberg 096 * @since 11.0 097 */ 098@GwtIncompatible 099public abstract class AbstractScheduledService implements Service { 100 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 101 102 /** 103 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 104 * task. 105 * 106 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 107 * methods, these provide {@link Scheduler} instances for the common use case of running the 108 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 109 * CustomScheduler}. 110 * 111 * @author Luke Sandberg 112 * @since 11.0 113 */ 114 public abstract static class Scheduler { 115 /** 116 * Returns a {@link Scheduler} that schedules the task using the {@link 117 * ScheduledExecutorService#scheduleWithFixedDelay} method. 118 * 119 * @param initialDelay the time to delay first execution 120 * @param delay the delay between the termination of one execution and the commencement of the 121 * next 122 * @since 28.0 123 */ 124 public static Scheduler newFixedDelaySchedule(Duration initialDelay, Duration delay) { 125 return newFixedDelaySchedule( 126 toNanosSaturated(initialDelay), toNanosSaturated(delay), TimeUnit.NANOSECONDS); 127 } 128 129 /** 130 * Returns a {@link Scheduler} that schedules the task using the {@link 131 * ScheduledExecutorService#scheduleWithFixedDelay} method. 132 * 133 * @param initialDelay the time to delay first execution 134 * @param delay the delay between the termination of one execution and the commencement of the 135 * next 136 * @param unit the time unit of the initialDelay and delay parameters 137 */ 138 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 139 public static Scheduler newFixedDelaySchedule( 140 final long initialDelay, final long delay, final TimeUnit unit) { 141 checkNotNull(unit); 142 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 143 return new Scheduler() { 144 @Override 145 public Future<?> schedule( 146 AbstractService service, ScheduledExecutorService executor, Runnable task) { 147 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 148 } 149 }; 150 } 151 152 /** 153 * Returns a {@link Scheduler} that schedules the task using the {@link 154 * ScheduledExecutorService#scheduleAtFixedRate} method. 155 * 156 * @param initialDelay the time to delay first execution 157 * @param period the period between successive executions of the task 158 * @since 28.0 159 */ 160 public static Scheduler newFixedRateSchedule(Duration initialDelay, Duration period) { 161 return newFixedRateSchedule( 162 toNanosSaturated(initialDelay), toNanosSaturated(period), TimeUnit.NANOSECONDS); 163 } 164 165 /** 166 * Returns a {@link Scheduler} that schedules the task using the {@link 167 * ScheduledExecutorService#scheduleAtFixedRate} method. 168 * 169 * @param initialDelay the time to delay first execution 170 * @param period the period between successive executions of the task 171 * @param unit the time unit of the initialDelay and period parameters 172 */ 173 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 174 public static Scheduler newFixedRateSchedule( 175 final long initialDelay, final long period, final TimeUnit unit) { 176 checkNotNull(unit); 177 checkArgument(period > 0, "period must be > 0, found %s", period); 178 return new Scheduler() { 179 @Override 180 public Future<?> schedule( 181 AbstractService service, ScheduledExecutorService executor, Runnable task) { 182 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 183 } 184 }; 185 } 186 187 /** Schedules the task to run on the provided executor on behalf of the service. */ 188 abstract Future<?> schedule( 189 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 190 191 private Scheduler() {} 192 } 193 194 /* use AbstractService for state management */ 195 private final AbstractService delegate = new ServiceDelegate(); 196 197 @WeakOuter 198 private final class ServiceDelegate extends AbstractService { 199 200 // A handle to the running task so that we can stop it when a shutdown has been requested. 201 // These two fields are volatile because their values will be accessed from multiple threads. 202 private volatile @Nullable Future<?> runningTask; 203 private volatile @Nullable ScheduledExecutorService executorService; 204 205 // This lock protects the task so we can ensure that none of the template methods (startUp, 206 // shutDown or runOneIteration) run concurrently with one another. 207 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 208 // lock. 209 private final ReentrantLock lock = new ReentrantLock(); 210 211 @WeakOuter 212 class Task implements Runnable { 213 @Override 214 public void run() { 215 lock.lock(); 216 try { 217 if (runningTask.isCancelled()) { 218 // task may have been cancelled while blocked on the lock. 219 return; 220 } 221 AbstractScheduledService.this.runOneIteration(); 222 } catch (Throwable t) { 223 try { 224 shutDown(); 225 } catch (Exception ignored) { 226 logger.log( 227 Level.WARNING, 228 "Error while attempting to shut down the service after failure.", 229 ignored); 230 } 231 notifyFailed(t); 232 runningTask.cancel(false); // prevent future invocations. 233 } finally { 234 lock.unlock(); 235 } 236 } 237 } 238 239 private final Runnable task = new Task(); 240 241 @Override 242 protected final void doStart() { 243 executorService = 244 MoreExecutors.renamingDecorator( 245 executor(), 246 new Supplier<String>() { 247 @Override 248 public String get() { 249 return serviceName() + " " + state(); 250 } 251 }); 252 executorService.execute( 253 new Runnable() { 254 @Override 255 public void run() { 256 lock.lock(); 257 try { 258 startUp(); 259 runningTask = scheduler().schedule(delegate, executorService, task); 260 notifyStarted(); 261 } catch (Throwable t) { 262 notifyFailed(t); 263 if (runningTask != null) { 264 // prevent the task from running if possible 265 runningTask.cancel(false); 266 } 267 } finally { 268 lock.unlock(); 269 } 270 } 271 }); 272 } 273 274 @Override 275 protected final void doStop() { 276 runningTask.cancel(false); 277 executorService.execute( 278 new Runnable() { 279 @Override 280 public void run() { 281 try { 282 lock.lock(); 283 try { 284 if (state() != State.STOPPING) { 285 // This means that the state has changed since we were scheduled. This implies 286 // that an execution of runOneIteration has thrown an exception and we have 287 // transitioned to a failed state, also this means that shutDown has already 288 // been called, so we do not want to call it again. 289 return; 290 } 291 shutDown(); 292 } finally { 293 lock.unlock(); 294 } 295 notifyStopped(); 296 } catch (Throwable t) { 297 notifyFailed(t); 298 } 299 } 300 }); 301 } 302 303 @Override 304 public String toString() { 305 return AbstractScheduledService.this.toString(); 306 } 307 } 308 309 /** Constructor for use by subclasses. */ 310 protected AbstractScheduledService() {} 311 312 /** 313 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 314 * the service will transition to the {@link Service.State#FAILED} state and this method will no 315 * longer be called. 316 */ 317 protected abstract void runOneIteration() throws Exception; 318 319 /** 320 * Start the service. 321 * 322 * <p>By default this method does nothing. 323 */ 324 protected void startUp() throws Exception {} 325 326 /** 327 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 328 * 329 * <p>By default this method does nothing. 330 */ 331 protected void shutDown() throws Exception {} 332 333 /** 334 * Returns the {@link Scheduler} object used to configure this service. This method will only be 335 * called once. 336 */ 337 // TODO(cpovirk): @ForOverride 338 protected abstract Scheduler scheduler(); 339 340 /** 341 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 342 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 343 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 344 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 345 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 346 * instance. This method is guaranteed to only be called once. 347 * 348 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 349 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 350 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 351 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 352 * fails}. 353 */ 354 protected ScheduledExecutorService executor() { 355 @WeakOuter 356 class ThreadFactoryImpl implements ThreadFactory { 357 @Override 358 public Thread newThread(Runnable runnable) { 359 return MoreExecutors.newThread(serviceName(), runnable); 360 } 361 } 362 final ScheduledExecutorService executor = 363 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 364 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 365 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 366 // Technically this listener is added after start() was called so it is a little gross, but it 367 // is called within doStart() so we know that the service cannot terminate or fail concurrently 368 // with adding this listener so it is impossible to miss an event that we are interested in. 369 addListener( 370 new Listener() { 371 @Override 372 public void terminated(State from) { 373 executor.shutdown(); 374 } 375 376 @Override 377 public void failed(State from, Throwable failure) { 378 executor.shutdown(); 379 } 380 }, 381 directExecutor()); 382 return executor; 383 } 384 385 /** 386 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 387 * debugging output. 388 * 389 * @since 14.0 390 */ 391 protected String serviceName() { 392 return getClass().getSimpleName(); 393 } 394 395 @Override 396 public String toString() { 397 return serviceName() + " [" + state() + "]"; 398 } 399 400 @Override 401 public final boolean isRunning() { 402 return delegate.isRunning(); 403 } 404 405 @Override 406 public final State state() { 407 return delegate.state(); 408 } 409 410 /** @since 13.0 */ 411 @Override 412 public final void addListener(Listener listener, Executor executor) { 413 delegate.addListener(listener, executor); 414 } 415 416 /** @since 14.0 */ 417 @Override 418 public final Throwable failureCause() { 419 return delegate.failureCause(); 420 } 421 422 /** @since 15.0 */ 423 @CanIgnoreReturnValue 424 @Override 425 public final Service startAsync() { 426 delegate.startAsync(); 427 return this; 428 } 429 430 /** @since 15.0 */ 431 @CanIgnoreReturnValue 432 @Override 433 public final Service stopAsync() { 434 delegate.stopAsync(); 435 return this; 436 } 437 438 /** @since 15.0 */ 439 @Override 440 public final void awaitRunning() { 441 delegate.awaitRunning(); 442 } 443 444 /** @since 28.0 */ 445 @Override 446 public final void awaitRunning(Duration timeout) throws TimeoutException { 447 Service.super.awaitRunning(timeout); 448 } 449 450 /** @since 15.0 */ 451 @Override 452 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 453 delegate.awaitRunning(timeout, unit); 454 } 455 456 /** @since 15.0 */ 457 @Override 458 public final void awaitTerminated() { 459 delegate.awaitTerminated(); 460 } 461 462 /** @since 28.0 */ 463 @Override 464 public final void awaitTerminated(Duration timeout) throws TimeoutException { 465 Service.super.awaitTerminated(timeout); 466 } 467 468 /** @since 15.0 */ 469 @Override 470 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 471 delegate.awaitTerminated(timeout, unit); 472 } 473 474 /** 475 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 476 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 477 * cancelled, the {@link #getNextSchedule} method will be called. 478 * 479 * @author Luke Sandberg 480 * @since 11.0 481 */ 482 public abstract static class CustomScheduler extends Scheduler { 483 484 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 485 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 486 487 /** The underlying task. */ 488 private final Runnable wrappedRunnable; 489 490 /** The executor on which this Callable will be scheduled. */ 491 private final ScheduledExecutorService executor; 492 493 /** 494 * The service that is managing this callable. This is used so that failure can be reported 495 * properly. 496 */ 497 private final AbstractService service; 498 499 /** 500 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 501 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 502 * ensure that it is assigned atomically with being scheduled. 503 */ 504 private final ReentrantLock lock = new ReentrantLock(); 505 506 /** The future that represents the next execution of this task. */ 507 @GuardedBy("lock") 508 private @Nullable Future<Void> currentFuture; 509 510 ReschedulableCallable( 511 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 512 this.wrappedRunnable = runnable; 513 this.executor = executor; 514 this.service = service; 515 } 516 517 @Override 518 public Void call() throws Exception { 519 wrappedRunnable.run(); 520 reschedule(); 521 return null; 522 } 523 524 /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */ 525 public void reschedule() { 526 // invoke the callback outside the lock, prevents some shenanigans. 527 Schedule schedule; 528 try { 529 schedule = CustomScheduler.this.getNextSchedule(); 530 } catch (Throwable t) { 531 service.notifyFailed(t); 532 return; 533 } 534 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 535 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 536 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 537 // correct order. 538 Throwable scheduleFailure = null; 539 lock.lock(); 540 try { 541 if (currentFuture == null || !currentFuture.isCancelled()) { 542 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 543 } 544 } catch (Throwable e) { 545 // If an exception is thrown by the subclass then we need to make sure that the service 546 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 547 // because the service does not monitor the state of the future so if the exception is not 548 // caught and forwarded to the service the task would stop executing but the service would 549 // have no idea. 550 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 551 // the AbstractService could monitor the future directly. Rescheduling is still hard... 552 // but it would help with some of these lock ordering issues. 553 scheduleFailure = e; 554 } finally { 555 lock.unlock(); 556 } 557 // Call notifyFailed outside the lock to avoid lock ordering issues. 558 if (scheduleFailure != null) { 559 service.notifyFailed(scheduleFailure); 560 } 561 } 562 563 // N.B. Only protect cancel and isCancelled because those are the only methods that are 564 // invoked by the AbstractScheduledService. 565 @Override 566 public boolean cancel(boolean mayInterruptIfRunning) { 567 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 568 lock.lock(); 569 try { 570 return currentFuture.cancel(mayInterruptIfRunning); 571 } finally { 572 lock.unlock(); 573 } 574 } 575 576 @Override 577 public boolean isCancelled() { 578 lock.lock(); 579 try { 580 return currentFuture.isCancelled(); 581 } finally { 582 lock.unlock(); 583 } 584 } 585 586 @Override 587 protected Future<Void> delegate() { 588 throw new UnsupportedOperationException( 589 "Only cancel and isCancelled is supported by this future"); 590 } 591 } 592 593 @Override 594 final Future<?> schedule( 595 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 596 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 597 task.reschedule(); 598 return task; 599 } 600 601 /** 602 * A value object that represents an absolute delay until a task should be invoked. 603 * 604 * @author Luke Sandberg 605 * @since 11.0 606 */ 607 protected static final class Schedule { 608 609 private final long delay; 610 private final TimeUnit unit; 611 612 /** 613 * @param delay the time from now to delay execution 614 * @param unit the time unit of the delay parameter 615 */ 616 public Schedule(long delay, TimeUnit unit) { 617 this.delay = delay; 618 this.unit = checkNotNull(unit); 619 } 620 } 621 622 /** 623 * Calculates the time at which to next invoke the task. 624 * 625 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 626 * on the same thread as the previous execution of {@link 627 * AbstractScheduledService#runOneIteration}. 628 * 629 * @return a schedule that defines the delay before the next execution. 630 */ 631 // TODO(cpovirk): @ForOverride 632 protected abstract Schedule getNextSchedule() throws Exception; 633 } 634}