001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 022import static java.util.Objects.requireNonNull; 023 024import com.google.common.annotations.GwtIncompatible; 025import com.google.common.annotations.J2ktIncompatible; 026import com.google.errorprone.annotations.CanIgnoreReturnValue; 027import com.google.errorprone.annotations.concurrent.GuardedBy; 028import com.google.j2objc.annotations.WeakOuter; 029import java.util.concurrent.Callable; 030import java.util.concurrent.Executor; 031import java.util.concurrent.Executors; 032import java.util.concurrent.Future; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.ScheduledFuture; 035import java.util.concurrent.ThreadFactory; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.locks.ReentrantLock; 039import java.util.logging.Level; 040import javax.annotation.CheckForNull; 041import org.checkerframework.checker.nullness.qual.Nullable; 042 043/** 044 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 045 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 046 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 047 * 048 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 049 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 050 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 051 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 052 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 053 * 054 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 055 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 056 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 057 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 058 * shared state without additional synchronization necessary for visibility to later executions of 059 * the life cycle methods. 060 * 061 * <h3>Usage Example</h3> 062 * 063 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 064 * rate limit itself. 065 * 066 * <pre>{@code 067 * class CrawlingService extends AbstractScheduledService { 068 * private Set<Uri> visited; 069 * private Queue<Uri> toCrawl; 070 * protected void startUp() throws Exception { 071 * toCrawl = readStartingUris(); 072 * } 073 * 074 * protected void runOneIteration() throws Exception { 075 * Uri uri = toCrawl.remove(); 076 * Collection<Uri> newUris = crawl(uri); 077 * visited.add(uri); 078 * for (Uri newUri : newUris) { 079 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 080 * } 081 * } 082 * 083 * protected void shutDown() throws Exception { 084 * saveUris(toCrawl); 085 * } 086 * 087 * protected Scheduler scheduler() { 088 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 089 * } 090 * } 091 * }</pre> 092 * 093 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 094 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 095 * rate limit the number of queries we perform. 096 * 097 * @author Luke Sandberg 098 * @since 11.0 099 */ 100@GwtIncompatible 101@J2ktIncompatible 102@ElementTypesAreNonnullByDefault 103public abstract class AbstractScheduledService implements Service { 104 private static final LazyLogger logger = new LazyLogger(AbstractScheduledService.class); 105 106 /** 107 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 108 * task. 109 * 110 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 111 * methods, these provide {@link Scheduler} instances for the common use case of running the 112 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 113 * CustomScheduler}. 114 * 115 * @author Luke Sandberg 116 * @since 11.0 117 */ 118 public abstract static class Scheduler { 119 /** 120 * Returns a {@link Scheduler} that schedules the task using the {@link 121 * ScheduledExecutorService#scheduleWithFixedDelay} method. 122 * 123 * @param initialDelay the time to delay first execution 124 * @param delay the delay between the termination of one execution and the commencement of the 125 * next 126 * @param unit the time unit of the initialDelay and delay parameters 127 */ 128 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 129 public static Scheduler newFixedDelaySchedule( 130 final long initialDelay, final long delay, final TimeUnit unit) { 131 checkNotNull(unit); 132 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 133 return new Scheduler() { 134 @Override 135 public Cancellable schedule( 136 AbstractService service, ScheduledExecutorService executor, Runnable task) { 137 return new FutureAsCancellable( 138 executor.scheduleWithFixedDelay(task, initialDelay, delay, unit)); 139 } 140 }; 141 } 142 143 /** 144 * Returns a {@link Scheduler} that schedules the task using the {@link 145 * ScheduledExecutorService#scheduleAtFixedRate} method. 146 * 147 * @param initialDelay the time to delay first execution 148 * @param period the period between successive executions of the task 149 * @param unit the time unit of the initialDelay and period parameters 150 */ 151 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 152 public static Scheduler newFixedRateSchedule( 153 final long initialDelay, final long period, final TimeUnit unit) { 154 checkNotNull(unit); 155 checkArgument(period > 0, "period must be > 0, found %s", period); 156 return new Scheduler() { 157 @Override 158 public Cancellable schedule( 159 AbstractService service, ScheduledExecutorService executor, Runnable task) { 160 return new FutureAsCancellable( 161 executor.scheduleAtFixedRate(task, initialDelay, period, unit)); 162 } 163 }; 164 } 165 166 /** Schedules the task to run on the provided executor on behalf of the service. */ 167 abstract Cancellable schedule( 168 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 169 170 private Scheduler() {} 171 } 172 173 /* use AbstractService for state management */ 174 private final AbstractService delegate = new ServiceDelegate(); 175 176 @WeakOuter 177 private final class ServiceDelegate extends AbstractService { 178 179 // A handle to the running task so that we can stop it when a shutdown has been requested. 180 // These two fields are volatile because their values will be accessed from multiple threads. 181 @CheckForNull private volatile Cancellable runningTask; 182 @CheckForNull private volatile ScheduledExecutorService executorService; 183 184 // This lock protects the task so we can ensure that none of the template methods (startUp, 185 // shutDown or runOneIteration) run concurrently with one another. 186 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 187 // lock. 188 private final ReentrantLock lock = new ReentrantLock(); 189 190 @WeakOuter 191 class Task implements Runnable { 192 @Override 193 public void run() { 194 lock.lock(); 195 try { 196 /* 197 * requireNonNull is safe because Task isn't run (or at least it doesn't succeed in taking 198 * the lock) until after it's scheduled and the runningTask field is set. 199 */ 200 if (requireNonNull(runningTask).isCancelled()) { 201 // task may have been cancelled while blocked on the lock. 202 return; 203 } 204 AbstractScheduledService.this.runOneIteration(); 205 } catch (Throwable t) { 206 restoreInterruptIfIsInterruptedException(t); 207 try { 208 shutDown(); 209 } catch (Exception ignored) { 210 restoreInterruptIfIsInterruptedException(ignored); 211 logger 212 .get() 213 .log( 214 Level.WARNING, 215 "Error while attempting to shut down the service after failure.", 216 ignored); 217 } 218 notifyFailed(t); 219 // requireNonNull is safe now, just as it was above. 220 requireNonNull(runningTask).cancel(false); // prevent future invocations. 221 } finally { 222 lock.unlock(); 223 } 224 } 225 } 226 227 private final Runnable task = new Task(); 228 229 @Override 230 protected final void doStart() { 231 executorService = 232 MoreExecutors.renamingDecorator(executor(), () -> serviceName() + " " + state()); 233 executorService.execute( 234 () -> { 235 lock.lock(); 236 try { 237 startUp(); 238 /* 239 * requireNonNull is safe because executorService is never cleared after the 240 * assignment above. 241 */ 242 requireNonNull(executorService); 243 runningTask = scheduler().schedule(delegate, executorService, task); 244 notifyStarted(); 245 } catch (Throwable t) { 246 restoreInterruptIfIsInterruptedException(t); 247 notifyFailed(t); 248 if (runningTask != null) { 249 // prevent the task from running if possible 250 runningTask.cancel(false); 251 } 252 } finally { 253 lock.unlock(); 254 } 255 }); 256 } 257 258 @Override 259 protected final void doStop() { 260 // Both requireNonNull calls are safe because doStop can run only after a successful doStart. 261 requireNonNull(runningTask); 262 requireNonNull(executorService); 263 runningTask.cancel(false); 264 executorService.execute( 265 () -> { 266 try { 267 lock.lock(); 268 try { 269 if (state() != State.STOPPING) { 270 // This means that the state has changed since we were scheduled. This implies 271 // that an execution of runOneIteration has thrown an exception and we have 272 // transitioned to a failed state, also this means that shutDown has already 273 // been called, so we do not want to call it again. 274 return; 275 } 276 shutDown(); 277 } finally { 278 lock.unlock(); 279 } 280 notifyStopped(); 281 } catch (Throwable t) { 282 restoreInterruptIfIsInterruptedException(t); 283 notifyFailed(t); 284 } 285 }); 286 } 287 288 @Override 289 public String toString() { 290 return AbstractScheduledService.this.toString(); 291 } 292 } 293 294 /** Constructor for use by subclasses. */ 295 protected AbstractScheduledService() {} 296 297 /** 298 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 299 * the service will transition to the {@link Service.State#FAILED} state and this method will no 300 * longer be called. 301 */ 302 protected abstract void runOneIteration() throws Exception; 303 304 /** 305 * Start the service. 306 * 307 * <p>By default this method does nothing. 308 */ 309 protected void startUp() throws Exception {} 310 311 /** 312 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 313 * 314 * <p>By default this method does nothing. 315 */ 316 protected void shutDown() throws Exception {} 317 318 /** 319 * Returns the {@link Scheduler} object used to configure this service. This method will only be 320 * called once. 321 */ 322 // TODO(cpovirk): @ForOverride 323 protected abstract Scheduler scheduler(); 324 325 /** 326 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 327 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 328 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 329 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 330 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 331 * instance. This method is guaranteed to only be called once. 332 * 333 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread pool 334 * that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, the 335 * pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 336 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 337 * fails}. 338 */ 339 protected ScheduledExecutorService executor() { 340 @WeakOuter 341 class ThreadFactoryImpl implements ThreadFactory { 342 @Override 343 public Thread newThread(Runnable runnable) { 344 return MoreExecutors.newThread(serviceName(), runnable); 345 } 346 } 347 final ScheduledExecutorService executor = 348 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 349 // Add a listener to shut down the executor after the service is stopped. This ensures that the 350 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 351 // Technically this listener is added after start() was called so it is a little gross, but it 352 // is called within doStart() so we know that the service cannot terminate or fail concurrently 353 // with adding this listener so it is impossible to miss an event that we are interested in. 354 addListener( 355 new Listener() { 356 @Override 357 public void terminated(State from) { 358 executor.shutdown(); 359 } 360 361 @Override 362 public void failed(State from, Throwable failure) { 363 executor.shutdown(); 364 } 365 }, 366 directExecutor()); 367 return executor; 368 } 369 370 /** 371 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 372 * debugging output. 373 * 374 * @since 14.0 375 */ 376 protected String serviceName() { 377 return getClass().getSimpleName(); 378 } 379 380 @Override 381 public String toString() { 382 return serviceName() + " [" + state() + "]"; 383 } 384 385 @Override 386 public final boolean isRunning() { 387 return delegate.isRunning(); 388 } 389 390 @Override 391 public final State state() { 392 return delegate.state(); 393 } 394 395 /** @since 13.0 */ 396 @Override 397 public final void addListener(Listener listener, Executor executor) { 398 delegate.addListener(listener, executor); 399 } 400 401 /** @since 14.0 */ 402 @Override 403 public final Throwable failureCause() { 404 return delegate.failureCause(); 405 } 406 407 /** @since 15.0 */ 408 @CanIgnoreReturnValue 409 @Override 410 public final Service startAsync() { 411 delegate.startAsync(); 412 return this; 413 } 414 415 /** @since 15.0 */ 416 @CanIgnoreReturnValue 417 @Override 418 public final Service stopAsync() { 419 delegate.stopAsync(); 420 return this; 421 } 422 423 /** @since 15.0 */ 424 @Override 425 public final void awaitRunning() { 426 delegate.awaitRunning(); 427 } 428 429 /** @since 15.0 */ 430 @Override 431 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 432 delegate.awaitRunning(timeout, unit); 433 } 434 435 /** @since 15.0 */ 436 @Override 437 public final void awaitTerminated() { 438 delegate.awaitTerminated(); 439 } 440 441 /** @since 15.0 */ 442 @Override 443 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 444 delegate.awaitTerminated(timeout, unit); 445 } 446 447 interface Cancellable { 448 void cancel(boolean mayInterruptIfRunning); 449 450 boolean isCancelled(); 451 } 452 453 private static final class FutureAsCancellable implements Cancellable { 454 private final Future<?> delegate; 455 456 FutureAsCancellable(Future<?> delegate) { 457 this.delegate = delegate; 458 } 459 460 @Override 461 public void cancel(boolean mayInterruptIfRunning) { 462 delegate.cancel(mayInterruptIfRunning); 463 } 464 465 @Override 466 public boolean isCancelled() { 467 return delegate.isCancelled(); 468 } 469 } 470 471 /** 472 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 473 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 474 * cancelled, the {@link #getNextSchedule} method will be called. 475 * 476 * @author Luke Sandberg 477 * @since 11.0 478 */ 479 public abstract static class CustomScheduler extends Scheduler { 480 481 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 482 private final class ReschedulableCallable implements Callable<@Nullable Void> { 483 484 /** The underlying task. */ 485 private final Runnable wrappedRunnable; 486 487 /** The executor on which this Callable will be scheduled. */ 488 private final ScheduledExecutorService executor; 489 490 /** 491 * The service that is managing this callable. This is used so that failure can be reported 492 * properly. 493 */ 494 /* 495 * This reference is part of a reference cycle, which is typically something we want to avoid 496 * under j2objc -- but it is not detected by our j2objc cycle test. The cycle: 497 * 498 * - CustomScheduler.service contains an instance of ServiceDelegate. (It needs it so that it 499 * can call notifyFailed.) 500 * 501 * - ServiceDelegate.runningTask contains an instance of ReschedulableCallable (at least in 502 * the case that the service is using CustomScheduler). (It needs it so that it can cancel 503 * the task and detect whether it has been cancelled.) 504 * 505 * - ReschedulableCallable has a reference back to its enclosing CustomScheduler. (It needs it 506 * so that it can call getNextSchedule). 507 * 508 * Maybe there is a way to avoid this cycle. But we think the cycle is safe enough to ignore: 509 * Each task is retained for only as long as it is running -- so it's retained only as long as 510 * it would already be retained by the underlying executor. 511 * 512 * If the cycle test starts reporting this cycle in the future, we should add an entry to 513 * cycle_suppress_list.txt. 514 */ 515 private final AbstractService service; 516 517 /** 518 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 519 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 520 * ensure that it is assigned atomically with being scheduled. 521 */ 522 private final ReentrantLock lock = new ReentrantLock(); 523 524 /** The future that represents the next execution of this task. */ 525 @GuardedBy("lock") 526 @CheckForNull 527 private SupplantableFuture cancellationDelegate; 528 529 ReschedulableCallable( 530 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 531 this.wrappedRunnable = runnable; 532 this.executor = executor; 533 this.service = service; 534 } 535 536 @Override 537 @CheckForNull 538 public Void call() throws Exception { 539 wrappedRunnable.run(); 540 reschedule(); 541 return null; 542 } 543 544 /** 545 * Atomically reschedules this task and assigns the new future to {@link 546 * #cancellationDelegate}. 547 */ 548 @CanIgnoreReturnValue 549 public Cancellable reschedule() { 550 // invoke the callback outside the lock, prevents some shenanigans. 551 Schedule schedule; 552 try { 553 schedule = CustomScheduler.this.getNextSchedule(); 554 } catch (Throwable t) { 555 restoreInterruptIfIsInterruptedException(t); 556 service.notifyFailed(t); 557 return new FutureAsCancellable(immediateCancelledFuture()); 558 } 559 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 560 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 561 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 562 // correct order. 563 Throwable scheduleFailure = null; 564 Cancellable toReturn; 565 lock.lock(); 566 try { 567 toReturn = initializeOrUpdateCancellationDelegate(schedule); 568 } catch (Throwable e) { 569 // Any Exception is either a RuntimeException or sneaky checked exception. 570 // 571 // If an exception is thrown by the subclass then we need to make sure that the service 572 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 573 // because the service does not monitor the state of the future so if the exception is not 574 // caught and forwarded to the service the task would stop executing but the service would 575 // have no idea. 576 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 577 // the AbstractService could monitor the future directly. Rescheduling is still hard... 578 // but it would help with some of these lock ordering issues. 579 scheduleFailure = e; 580 toReturn = new FutureAsCancellable(immediateCancelledFuture()); 581 } finally { 582 lock.unlock(); 583 } 584 // Call notifyFailed outside the lock to avoid lock ordering issues. 585 if (scheduleFailure != null) { 586 service.notifyFailed(scheduleFailure); 587 } 588 return toReturn; 589 } 590 591 @GuardedBy("lock") 592 /* 593 * The GuardedBy checker warns us that we're not holding cancellationDelegate.lock. But in 594 * fact we are holding it because it is the same as this.lock, which we know we are holding, 595 * thanks to @GuardedBy above. (cancellationDelegate.lock is initialized to this.lock in the 596 * call to `new SupplantableFuture` below.) 597 */ 598 @SuppressWarnings("GuardedBy") 599 private Cancellable initializeOrUpdateCancellationDelegate(Schedule schedule) { 600 if (cancellationDelegate == null) { 601 return cancellationDelegate = new SupplantableFuture(lock, submitToExecutor(schedule)); 602 } 603 if (!cancellationDelegate.currentFuture.isCancelled()) { 604 cancellationDelegate.currentFuture = submitToExecutor(schedule); 605 } 606 return cancellationDelegate; 607 } 608 609 private ScheduledFuture<@Nullable Void> submitToExecutor(Schedule schedule) { 610 return executor.schedule(this, schedule.delay, schedule.unit); 611 } 612 } 613 614 /** 615 * Contains the most recently submitted {@code Future}, which may be cancelled or updated, 616 * always under a lock. 617 */ 618 private static final class SupplantableFuture implements Cancellable { 619 private final ReentrantLock lock; 620 621 @GuardedBy("lock") 622 private Future<@Nullable Void> currentFuture; 623 624 SupplantableFuture(ReentrantLock lock, Future<@Nullable Void> currentFuture) { 625 this.lock = lock; 626 this.currentFuture = currentFuture; 627 } 628 629 @Override 630 public void cancel(boolean mayInterruptIfRunning) { 631 /* 632 * Lock to ensure that a task cannot be rescheduled while a cancel is ongoing. 633 * 634 * In theory, cancel() could execute arbitrary listeners -- bad to do while holding a lock. 635 * However, we don't expose currentFuture to users, so they can't attach listeners. And the 636 * Future might not even be a ListenableFuture, just a plain Future. That said, similar 637 * problems can exist with methods like FutureTask.done(), not to mention slow calls to 638 * Thread.interrupt() (as discussed in InterruptibleTask). At the end of the day, it's 639 * unlikely that cancel() will be slow, so we can probably get away with calling it while 640 * holding a lock. Still, it would be nice to avoid somehow. 641 */ 642 lock.lock(); 643 try { 644 currentFuture.cancel(mayInterruptIfRunning); 645 } finally { 646 lock.unlock(); 647 } 648 } 649 650 @Override 651 public boolean isCancelled() { 652 lock.lock(); 653 try { 654 return currentFuture.isCancelled(); 655 } finally { 656 lock.unlock(); 657 } 658 } 659 } 660 661 @Override 662 final Cancellable schedule( 663 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 664 return new ReschedulableCallable(service, executor, runnable).reschedule(); 665 } 666 667 /** 668 * A value object that represents an absolute delay until a task should be invoked. 669 * 670 * @author Luke Sandberg 671 * @since 11.0 672 */ 673 protected static final class Schedule { 674 675 private final long delay; 676 private final TimeUnit unit; 677 678 /** 679 * @param delay the time from now to delay execution 680 * @param unit the time unit of the delay parameter 681 */ 682 public Schedule(long delay, TimeUnit unit) { 683 this.delay = delay; 684 this.unit = checkNotNull(unit); 685 } 686 } 687 688 /** 689 * Calculates the time at which to next invoke the task. 690 * 691 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 692 * on the same thread as the previous execution of {@link 693 * AbstractScheduledService#runOneIteration}. 694 * 695 * @return a schedule that defines the delay before the next execution. 696 */ 697 // TODO(cpovirk): @ForOverride 698 protected abstract Schedule getNextSchedule() throws Exception; 699 } 700}