001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.Internal.saturatedToNanos; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021 022import com.google.common.annotations.Beta; 023import com.google.common.annotations.GwtIncompatible; 024import com.google.common.base.Supplier; 025import com.google.errorprone.annotations.CanIgnoreReturnValue; 026import com.google.errorprone.annotations.concurrent.GuardedBy; 027import com.google.j2objc.annotations.WeakOuter; 028import java.time.Duration; 029import java.util.concurrent.Callable; 030import java.util.concurrent.Executor; 031import java.util.concurrent.Executors; 032import java.util.concurrent.Future; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.locks.ReentrantLock; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040import org.checkerframework.checker.nullness.qual.MonotonicNonNull; 041import org.checkerframework.checker.nullness.qual.Nullable; 042 043/** 044 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 045 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 046 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 047 * 048 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 049 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 050 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 051 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 052 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 053 * 054 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 055 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 056 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 057 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 058 * shared state without additional synchronization necessary for visibility to later executions of 059 * the life cycle methods. 060 * 061 * <h3>Usage Example</h3> 062 * 063 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 064 * rate limit itself. 065 * 066 * <pre>{@code 067 * class CrawlingService extends AbstractScheduledService { 068 * private Set<Uri> visited; 069 * private Queue<Uri> toCrawl; 070 * protected void startUp() throws Exception { 071 * toCrawl = readStartingUris(); 072 * } 073 * 074 * protected void runOneIteration() throws Exception { 075 * Uri uri = toCrawl.remove(); 076 * Collection<Uri> newUris = crawl(uri); 077 * visited.add(uri); 078 * for (Uri newUri : newUris) { 079 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 080 * } 081 * } 082 * 083 * protected void shutDown() throws Exception { 084 * saveUris(toCrawl); 085 * } 086 * 087 * protected Scheduler scheduler() { 088 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 089 * } 090 * } 091 * }</pre> 092 * 093 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 094 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 095 * rate limit the number of queries we perform. 096 * 097 * @author Luke Sandberg 098 * @since 11.0 099 */ 100@Beta 101@GwtIncompatible 102public abstract class AbstractScheduledService implements Service { 103 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 104 105 /** 106 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 107 * task. 108 * 109 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 110 * methods, these provide {@link Scheduler} instances for the common use case of running the 111 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 112 * CustomScheduler}. 113 * 114 * @author Luke Sandberg 115 * @since 11.0 116 */ 117 public abstract static class Scheduler { 118 /** 119 * Returns a {@link Scheduler} that schedules the task using the {@link 120 * ScheduledExecutorService#scheduleWithFixedDelay} method. 121 * 122 * @param initialDelay the time to delay first execution 123 * @param delay the delay between the termination of one execution and the commencement of the 124 * next 125 * @since 28.0 126 */ 127 public static Scheduler newFixedDelaySchedule(Duration initialDelay, Duration delay) { 128 return newFixedDelaySchedule( 129 saturatedToNanos(initialDelay), saturatedToNanos(delay), TimeUnit.NANOSECONDS); 130 } 131 132 /** 133 * Returns a {@link Scheduler} that schedules the task using the {@link 134 * ScheduledExecutorService#scheduleWithFixedDelay} method. 135 * 136 * @param initialDelay the time to delay first execution 137 * @param delay the delay between the termination of one execution and the commencement of the 138 * next 139 * @param unit the time unit of the initialDelay and delay parameters 140 */ 141 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 142 public static Scheduler newFixedDelaySchedule( 143 final long initialDelay, final long delay, final TimeUnit unit) { 144 checkNotNull(unit); 145 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 146 return new Scheduler() { 147 @Override 148 public Future<?> schedule( 149 AbstractService service, ScheduledExecutorService executor, Runnable task) { 150 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 151 } 152 }; 153 } 154 155 /** 156 * Returns a {@link Scheduler} that schedules the task using the {@link 157 * ScheduledExecutorService#scheduleAtFixedRate} method. 158 * 159 * @param initialDelay the time to delay first execution 160 * @param period the period between successive executions of the task 161 * @since 28.0 162 */ 163 public static Scheduler newFixedRateSchedule(Duration initialDelay, Duration period) { 164 return newFixedRateSchedule( 165 saturatedToNanos(initialDelay), saturatedToNanos(period), TimeUnit.NANOSECONDS); 166 } 167 168 /** 169 * Returns a {@link Scheduler} that schedules the task using the {@link 170 * ScheduledExecutorService#scheduleAtFixedRate} method. 171 * 172 * @param initialDelay the time to delay first execution 173 * @param period the period between successive executions of the task 174 * @param unit the time unit of the initialDelay and period parameters 175 */ 176 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 177 public static Scheduler newFixedRateSchedule( 178 final long initialDelay, final long period, final TimeUnit unit) { 179 checkNotNull(unit); 180 checkArgument(period > 0, "period must be > 0, found %s", period); 181 return new Scheduler() { 182 @Override 183 public Future<?> schedule( 184 AbstractService service, ScheduledExecutorService executor, Runnable task) { 185 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 186 } 187 }; 188 } 189 190 /** Schedules the task to run on the provided executor on behalf of the service. */ 191 abstract Future<?> schedule( 192 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 193 194 private Scheduler() {} 195 } 196 197 /* use AbstractService for state management */ 198 private final AbstractService delegate = new ServiceDelegate(); 199 200 @WeakOuter 201 private final class ServiceDelegate extends AbstractService { 202 203 // A handle to the running task so that we can stop it when a shutdown has been requested. 204 // These two fields are volatile because their values will be accessed from multiple threads. 205 @MonotonicNonNull private volatile Future<?> runningTask; 206 @MonotonicNonNull private volatile ScheduledExecutorService executorService; 207 208 // This lock protects the task so we can ensure that none of the template methods (startUp, 209 // shutDown or runOneIteration) run concurrently with one another. 210 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 211 // lock. 212 private final ReentrantLock lock = new ReentrantLock(); 213 214 @WeakOuter 215 class Task implements Runnable { 216 @Override 217 public void run() { 218 lock.lock(); 219 try { 220 if (runningTask.isCancelled()) { 221 // task may have been cancelled while blocked on the lock. 222 return; 223 } 224 AbstractScheduledService.this.runOneIteration(); 225 } catch (Throwable t) { 226 try { 227 shutDown(); 228 } catch (Exception ignored) { 229 logger.log( 230 Level.WARNING, 231 "Error while attempting to shut down the service after failure.", 232 ignored); 233 } 234 notifyFailed(t); 235 runningTask.cancel(false); // prevent future invocations. 236 } finally { 237 lock.unlock(); 238 } 239 } 240 } 241 242 private final Runnable task = new Task(); 243 244 @Override 245 protected final void doStart() { 246 executorService = 247 MoreExecutors.renamingDecorator( 248 executor(), 249 new Supplier<String>() { 250 @Override 251 public String get() { 252 return serviceName() + " " + state(); 253 } 254 }); 255 executorService.execute( 256 new Runnable() { 257 @Override 258 public void run() { 259 lock.lock(); 260 try { 261 startUp(); 262 runningTask = scheduler().schedule(delegate, executorService, task); 263 notifyStarted(); 264 } catch (Throwable t) { 265 notifyFailed(t); 266 if (runningTask != null) { 267 // prevent the task from running if possible 268 runningTask.cancel(false); 269 } 270 } finally { 271 lock.unlock(); 272 } 273 } 274 }); 275 } 276 277 @Override 278 protected final void doStop() { 279 runningTask.cancel(false); 280 executorService.execute( 281 new Runnable() { 282 @Override 283 public void run() { 284 try { 285 lock.lock(); 286 try { 287 if (state() != State.STOPPING) { 288 // This means that the state has changed since we were scheduled. This implies 289 // that an execution of runOneIteration has thrown an exception and we have 290 // transitioned to a failed state, also this means that shutDown has already 291 // been called, so we do not want to call it again. 292 return; 293 } 294 shutDown(); 295 } finally { 296 lock.unlock(); 297 } 298 notifyStopped(); 299 } catch (Throwable t) { 300 notifyFailed(t); 301 } 302 } 303 }); 304 } 305 306 @Override 307 public String toString() { 308 return AbstractScheduledService.this.toString(); 309 } 310 } 311 312 /** Constructor for use by subclasses. */ 313 protected AbstractScheduledService() {} 314 315 /** 316 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 317 * the service will transition to the {@link Service.State#FAILED} state and this method will no 318 * longer be called. 319 */ 320 protected abstract void runOneIteration() throws Exception; 321 322 /** 323 * Start the service. 324 * 325 * <p>By default this method does nothing. 326 */ 327 protected void startUp() throws Exception {} 328 329 /** 330 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 331 * 332 * <p>By default this method does nothing. 333 */ 334 protected void shutDown() throws Exception {} 335 336 /** 337 * Returns the {@link Scheduler} object used to configure this service. This method will only be 338 * called once. 339 */ 340 // TODO(cpovirk): @ForOverride 341 protected abstract Scheduler scheduler(); 342 343 /** 344 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 345 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 346 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 347 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 348 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 349 * instance. This method is guaranteed to only be called once. 350 * 351 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 352 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 353 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 354 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 355 * fails}. 356 */ 357 protected ScheduledExecutorService executor() { 358 @WeakOuter 359 class ThreadFactoryImpl implements ThreadFactory { 360 @Override 361 public Thread newThread(Runnable runnable) { 362 return MoreExecutors.newThread(serviceName(), runnable); 363 } 364 } 365 final ScheduledExecutorService executor = 366 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 367 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 368 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 369 // Technically this listener is added after start() was called so it is a little gross, but it 370 // is called within doStart() so we know that the service cannot terminate or fail concurrently 371 // with adding this listener so it is impossible to miss an event that we are interested in. 372 addListener( 373 new Listener() { 374 @Override 375 public void terminated(State from) { 376 executor.shutdown(); 377 } 378 379 @Override 380 public void failed(State from, Throwable failure) { 381 executor.shutdown(); 382 } 383 }, 384 directExecutor()); 385 return executor; 386 } 387 388 /** 389 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 390 * debugging output. 391 * 392 * @since 14.0 393 */ 394 protected String serviceName() { 395 return getClass().getSimpleName(); 396 } 397 398 @Override 399 public String toString() { 400 return serviceName() + " [" + state() + "]"; 401 } 402 403 @Override 404 public final boolean isRunning() { 405 return delegate.isRunning(); 406 } 407 408 @Override 409 public final State state() { 410 return delegate.state(); 411 } 412 413 /** @since 13.0 */ 414 @Override 415 public final void addListener(Listener listener, Executor executor) { 416 delegate.addListener(listener, executor); 417 } 418 419 /** @since 14.0 */ 420 @Override 421 public final Throwable failureCause() { 422 return delegate.failureCause(); 423 } 424 425 /** @since 15.0 */ 426 @CanIgnoreReturnValue 427 @Override 428 public final Service startAsync() { 429 delegate.startAsync(); 430 return this; 431 } 432 433 /** @since 15.0 */ 434 @CanIgnoreReturnValue 435 @Override 436 public final Service stopAsync() { 437 delegate.stopAsync(); 438 return this; 439 } 440 441 /** @since 15.0 */ 442 @Override 443 public final void awaitRunning() { 444 delegate.awaitRunning(); 445 } 446 447 /** @since 15.0 */ 448 @Override 449 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 450 delegate.awaitRunning(timeout, unit); 451 } 452 453 /** @since 15.0 */ 454 @Override 455 public final void awaitTerminated() { 456 delegate.awaitTerminated(); 457 } 458 459 /** @since 15.0 */ 460 @Override 461 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 462 delegate.awaitTerminated(timeout, unit); 463 } 464 465 /** 466 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 467 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 468 * cancelled, the {@link #getNextSchedule} method will be called. 469 * 470 * @author Luke Sandberg 471 * @since 11.0 472 */ 473 @Beta 474 public abstract static class CustomScheduler extends Scheduler { 475 476 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 477 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 478 479 /** The underlying task. */ 480 private final Runnable wrappedRunnable; 481 482 /** The executor on which this Callable will be scheduled. */ 483 private final ScheduledExecutorService executor; 484 485 /** 486 * The service that is managing this callable. This is used so that failure can be reported 487 * properly. 488 */ 489 private final AbstractService service; 490 491 /** 492 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 493 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 494 * ensure that it is assigned atomically with being scheduled. 495 */ 496 private final ReentrantLock lock = new ReentrantLock(); 497 498 /** The future that represents the next execution of this task. */ 499 @GuardedBy("lock") 500 private @Nullable Future<Void> currentFuture; 501 502 ReschedulableCallable( 503 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 504 this.wrappedRunnable = runnable; 505 this.executor = executor; 506 this.service = service; 507 } 508 509 @Override 510 public Void call() throws Exception { 511 wrappedRunnable.run(); 512 reschedule(); 513 return null; 514 } 515 516 /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */ 517 public void reschedule() { 518 // invoke the callback outside the lock, prevents some shenanigans. 519 Schedule schedule; 520 try { 521 schedule = CustomScheduler.this.getNextSchedule(); 522 } catch (Throwable t) { 523 service.notifyFailed(t); 524 return; 525 } 526 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 527 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 528 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 529 // correct order. 530 Throwable scheduleFailure = null; 531 lock.lock(); 532 try { 533 if (currentFuture == null || !currentFuture.isCancelled()) { 534 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 535 } 536 } catch (Throwable e) { 537 // If an exception is thrown by the subclass then we need to make sure that the service 538 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 539 // because the service does not monitor the state of the future so if the exception is not 540 // caught and forwarded to the service the task would stop executing but the service would 541 // have no idea. 542 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 543 // the AbstractService could monitor the future directly. Rescheduling is still hard... 544 // but it would help with some of these lock ordering issues. 545 scheduleFailure = e; 546 } finally { 547 lock.unlock(); 548 } 549 // Call notifyFailed outside the lock to avoid lock ordering issues. 550 if (scheduleFailure != null) { 551 service.notifyFailed(scheduleFailure); 552 } 553 } 554 555 // N.B. Only protect cancel and isCancelled because those are the only methods that are 556 // invoked by the AbstractScheduledService. 557 @Override 558 public boolean cancel(boolean mayInterruptIfRunning) { 559 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 560 lock.lock(); 561 try { 562 return currentFuture.cancel(mayInterruptIfRunning); 563 } finally { 564 lock.unlock(); 565 } 566 } 567 568 @Override 569 public boolean isCancelled() { 570 lock.lock(); 571 try { 572 return currentFuture.isCancelled(); 573 } finally { 574 lock.unlock(); 575 } 576 } 577 578 @Override 579 protected Future<Void> delegate() { 580 throw new UnsupportedOperationException( 581 "Only cancel and isCancelled is supported by this future"); 582 } 583 } 584 585 @Override 586 final Future<?> schedule( 587 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 588 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 589 task.reschedule(); 590 return task; 591 } 592 593 /** 594 * A value object that represents an absolute delay until a task should be invoked. 595 * 596 * @author Luke Sandberg 597 * @since 11.0 598 */ 599 @Beta 600 protected static final class Schedule { 601 602 private final long delay; 603 private final TimeUnit unit; 604 605 /** 606 * @param delay the time from now to delay execution 607 * @param unit the time unit of the delay parameter 608 */ 609 public Schedule(long delay, TimeUnit unit) { 610 this.delay = delay; 611 this.unit = checkNotNull(unit); 612 } 613 } 614 615 /** 616 * Calculates the time at which to next invoke the task. 617 * 618 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 619 * on the same thread as the previous execution of {@link 620 * AbstractScheduledService#runOneIteration}. 621 * 622 * @return a schedule that defines the delay before the next execution. 623 */ 624 // TODO(cpovirk): @ForOverride 625 protected abstract Schedule getNextSchedule() throws Exception; 626 } 627}