001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static com.google.common.util.concurrent.Platform.restoreInterruptIfIsInterruptedException; 022import static java.util.Objects.requireNonNull; 023 024import com.google.common.annotations.GwtIncompatible; 025import com.google.common.annotations.J2ktIncompatible; 026import com.google.errorprone.annotations.CanIgnoreReturnValue; 027import com.google.errorprone.annotations.concurrent.GuardedBy; 028import com.google.j2objc.annotations.WeakOuter; 029import java.util.concurrent.Callable; 030import java.util.concurrent.Executor; 031import java.util.concurrent.Executors; 032import java.util.concurrent.Future; 033import java.util.concurrent.ScheduledExecutorService; 034import java.util.concurrent.ScheduledFuture; 035import java.util.concurrent.ThreadFactory; 036import java.util.concurrent.TimeUnit; 037import java.util.concurrent.TimeoutException; 038import java.util.concurrent.locks.ReentrantLock; 039import java.util.logging.Level; 040import java.util.logging.Logger; 041import javax.annotation.CheckForNull; 042import org.checkerframework.checker.nullness.qual.Nullable; 043 044/** 045 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 046 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 047 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 048 * 049 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 050 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 051 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 052 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 053 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 054 * 055 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 056 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 057 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 058 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 059 * shared state without additional synchronization necessary for visibility to later executions of 060 * the life cycle methods. 061 * 062 * <h3>Usage Example</h3> 063 * 064 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 065 * rate limit itself. 066 * 067 * <pre>{@code 068 * class CrawlingService extends AbstractScheduledService { 069 * private Set<Uri> visited; 070 * private Queue<Uri> toCrawl; 071 * protected void startUp() throws Exception { 072 * toCrawl = readStartingUris(); 073 * } 074 * 075 * protected void runOneIteration() throws Exception { 076 * Uri uri = toCrawl.remove(); 077 * Collection<Uri> newUris = crawl(uri); 078 * visited.add(uri); 079 * for (Uri newUri : newUris) { 080 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 081 * } 082 * } 083 * 084 * protected void shutDown() throws Exception { 085 * saveUris(toCrawl); 086 * } 087 * 088 * protected Scheduler scheduler() { 089 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 090 * } 091 * } 092 * }</pre> 093 * 094 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 095 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 096 * rate limit the number of queries we perform. 097 * 098 * @author Luke Sandberg 099 * @since 11.0 100 */ 101@GwtIncompatible 102@J2ktIncompatible 103@ElementTypesAreNonnullByDefault 104public abstract class AbstractScheduledService implements Service { 105 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 106 107 /** 108 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 109 * task. 110 * 111 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 112 * methods, these provide {@link Scheduler} instances for the common use case of running the 113 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 114 * CustomScheduler}. 115 * 116 * @author Luke Sandberg 117 * @since 11.0 118 */ 119 public abstract static class Scheduler { 120 /** 121 * Returns a {@link Scheduler} that schedules the task using the {@link 122 * ScheduledExecutorService#scheduleWithFixedDelay} method. 123 * 124 * @param initialDelay the time to delay first execution 125 * @param delay the delay between the termination of one execution and the commencement of the 126 * next 127 * @param unit the time unit of the initialDelay and delay parameters 128 */ 129 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 130 public static Scheduler newFixedDelaySchedule( 131 final long initialDelay, final long delay, final TimeUnit unit) { 132 checkNotNull(unit); 133 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 134 return new Scheduler() { 135 @Override 136 public Cancellable schedule( 137 AbstractService service, ScheduledExecutorService executor, Runnable task) { 138 return new FutureAsCancellable( 139 executor.scheduleWithFixedDelay(task, initialDelay, delay, unit)); 140 } 141 }; 142 } 143 144 /** 145 * Returns a {@link Scheduler} that schedules the task using the {@link 146 * ScheduledExecutorService#scheduleAtFixedRate} method. 147 * 148 * @param initialDelay the time to delay first execution 149 * @param period the period between successive executions of the task 150 * @param unit the time unit of the initialDelay and period parameters 151 */ 152 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 153 public static Scheduler newFixedRateSchedule( 154 final long initialDelay, final long period, final TimeUnit unit) { 155 checkNotNull(unit); 156 checkArgument(period > 0, "period must be > 0, found %s", period); 157 return new Scheduler() { 158 @Override 159 public Cancellable schedule( 160 AbstractService service, ScheduledExecutorService executor, Runnable task) { 161 return new FutureAsCancellable( 162 executor.scheduleAtFixedRate(task, initialDelay, period, unit)); 163 } 164 }; 165 } 166 167 /** Schedules the task to run on the provided executor on behalf of the service. */ 168 abstract Cancellable schedule( 169 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 170 171 private Scheduler() {} 172 } 173 174 /* use AbstractService for state management */ 175 private final AbstractService delegate = new ServiceDelegate(); 176 177 @WeakOuter 178 private final class ServiceDelegate extends AbstractService { 179 180 // A handle to the running task so that we can stop it when a shutdown has been requested. 181 // These two fields are volatile because their values will be accessed from multiple threads. 182 @CheckForNull private volatile Cancellable runningTask; 183 @CheckForNull private volatile ScheduledExecutorService executorService; 184 185 // This lock protects the task so we can ensure that none of the template methods (startUp, 186 // shutDown or runOneIteration) run concurrently with one another. 187 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 188 // lock. 189 private final ReentrantLock lock = new ReentrantLock(); 190 191 @WeakOuter 192 class Task implements Runnable { 193 @Override 194 public void run() { 195 lock.lock(); 196 try { 197 /* 198 * requireNonNull is safe because Task isn't run (or at least it doesn't succeed in taking 199 * the lock) until after it's scheduled and the runningTask field is set. 200 */ 201 if (requireNonNull(runningTask).isCancelled()) { 202 // task may have been cancelled while blocked on the lock. 203 return; 204 } 205 AbstractScheduledService.this.runOneIteration(); 206 } catch (Throwable t) { 207 restoreInterruptIfIsInterruptedException(t); 208 try { 209 shutDown(); 210 } catch (Exception ignored) { 211 restoreInterruptIfIsInterruptedException(ignored); 212 logger.log( 213 Level.WARNING, 214 "Error while attempting to shut down the service after failure.", 215 ignored); 216 } 217 notifyFailed(t); 218 // requireNonNull is safe now, just as it was above. 219 requireNonNull(runningTask).cancel(false); // prevent future invocations. 220 } finally { 221 lock.unlock(); 222 } 223 } 224 } 225 226 private final Runnable task = new Task(); 227 228 @Override 229 protected final void doStart() { 230 executorService = 231 MoreExecutors.renamingDecorator(executor(), () -> serviceName() + " " + state()); 232 executorService.execute( 233 () -> { 234 lock.lock(); 235 try { 236 startUp(); 237 /* 238 * requireNonNull is safe because executorService is never cleared after the 239 * assignment above. 240 */ 241 requireNonNull(executorService); 242 runningTask = scheduler().schedule(delegate, executorService, task); 243 notifyStarted(); 244 } catch (Throwable t) { 245 restoreInterruptIfIsInterruptedException(t); 246 notifyFailed(t); 247 if (runningTask != null) { 248 // prevent the task from running if possible 249 runningTask.cancel(false); 250 } 251 } finally { 252 lock.unlock(); 253 } 254 }); 255 } 256 257 @Override 258 protected final void doStop() { 259 // Both requireNonNull calls are safe because doStop can run only after a successful doStart. 260 requireNonNull(runningTask); 261 requireNonNull(executorService); 262 runningTask.cancel(false); 263 executorService.execute( 264 () -> { 265 try { 266 lock.lock(); 267 try { 268 if (state() != State.STOPPING) { 269 // This means that the state has changed since we were scheduled. This implies 270 // that an execution of runOneIteration has thrown an exception and we have 271 // transitioned to a failed state, also this means that shutDown has already 272 // been called, so we do not want to call it again. 273 return; 274 } 275 shutDown(); 276 } finally { 277 lock.unlock(); 278 } 279 notifyStopped(); 280 } catch (Throwable t) { 281 restoreInterruptIfIsInterruptedException(t); 282 notifyFailed(t); 283 } 284 }); 285 } 286 287 @Override 288 public String toString() { 289 return AbstractScheduledService.this.toString(); 290 } 291 } 292 293 /** Constructor for use by subclasses. */ 294 protected AbstractScheduledService() {} 295 296 /** 297 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 298 * the service will transition to the {@link Service.State#FAILED} state and this method will no 299 * longer be called. 300 */ 301 protected abstract void runOneIteration() throws Exception; 302 303 /** 304 * Start the service. 305 * 306 * <p>By default this method does nothing. 307 */ 308 protected void startUp() throws Exception {} 309 310 /** 311 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 312 * 313 * <p>By default this method does nothing. 314 */ 315 protected void shutDown() throws Exception {} 316 317 /** 318 * Returns the {@link Scheduler} object used to configure this service. This method will only be 319 * called once. 320 */ 321 // TODO(cpovirk): @ForOverride 322 protected abstract Scheduler scheduler(); 323 324 /** 325 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 326 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 327 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 328 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 329 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 330 * instance. This method is guaranteed to only be called once. 331 * 332 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread pool 333 * that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, the 334 * pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 335 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 336 * fails}. 337 */ 338 protected ScheduledExecutorService executor() { 339 @WeakOuter 340 class ThreadFactoryImpl implements ThreadFactory { 341 @Override 342 public Thread newThread(Runnable runnable) { 343 return MoreExecutors.newThread(serviceName(), runnable); 344 } 345 } 346 final ScheduledExecutorService executor = 347 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 348 // Add a listener to shut down the executor after the service is stopped. This ensures that the 349 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 350 // Technically this listener is added after start() was called so it is a little gross, but it 351 // is called within doStart() so we know that the service cannot terminate or fail concurrently 352 // with adding this listener so it is impossible to miss an event that we are interested in. 353 addListener( 354 new Listener() { 355 @Override 356 public void terminated(State from) { 357 executor.shutdown(); 358 } 359 360 @Override 361 public void failed(State from, Throwable failure) { 362 executor.shutdown(); 363 } 364 }, 365 directExecutor()); 366 return executor; 367 } 368 369 /** 370 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 371 * debugging output. 372 * 373 * @since 14.0 374 */ 375 protected String serviceName() { 376 return getClass().getSimpleName(); 377 } 378 379 @Override 380 public String toString() { 381 return serviceName() + " [" + state() + "]"; 382 } 383 384 @Override 385 public final boolean isRunning() { 386 return delegate.isRunning(); 387 } 388 389 @Override 390 public final State state() { 391 return delegate.state(); 392 } 393 394 /** @since 13.0 */ 395 @Override 396 public final void addListener(Listener listener, Executor executor) { 397 delegate.addListener(listener, executor); 398 } 399 400 /** @since 14.0 */ 401 @Override 402 public final Throwable failureCause() { 403 return delegate.failureCause(); 404 } 405 406 /** @since 15.0 */ 407 @CanIgnoreReturnValue 408 @Override 409 public final Service startAsync() { 410 delegate.startAsync(); 411 return this; 412 } 413 414 /** @since 15.0 */ 415 @CanIgnoreReturnValue 416 @Override 417 public final Service stopAsync() { 418 delegate.stopAsync(); 419 return this; 420 } 421 422 /** @since 15.0 */ 423 @Override 424 public final void awaitRunning() { 425 delegate.awaitRunning(); 426 } 427 428 /** @since 15.0 */ 429 @Override 430 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 431 delegate.awaitRunning(timeout, unit); 432 } 433 434 /** @since 15.0 */ 435 @Override 436 public final void awaitTerminated() { 437 delegate.awaitTerminated(); 438 } 439 440 /** @since 15.0 */ 441 @Override 442 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 443 delegate.awaitTerminated(timeout, unit); 444 } 445 446 interface Cancellable { 447 void cancel(boolean mayInterruptIfRunning); 448 449 boolean isCancelled(); 450 } 451 452 private static final class FutureAsCancellable implements Cancellable { 453 private final Future<?> delegate; 454 455 FutureAsCancellable(Future<?> delegate) { 456 this.delegate = delegate; 457 } 458 459 @Override 460 public void cancel(boolean mayInterruptIfRunning) { 461 delegate.cancel(mayInterruptIfRunning); 462 } 463 464 @Override 465 public boolean isCancelled() { 466 return delegate.isCancelled(); 467 } 468 } 469 470 /** 471 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 472 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 473 * cancelled, the {@link #getNextSchedule} method will be called. 474 * 475 * @author Luke Sandberg 476 * @since 11.0 477 */ 478 public abstract static class CustomScheduler extends Scheduler { 479 480 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 481 private final class ReschedulableCallable implements Callable<@Nullable Void> { 482 483 /** The underlying task. */ 484 private final Runnable wrappedRunnable; 485 486 /** The executor on which this Callable will be scheduled. */ 487 private final ScheduledExecutorService executor; 488 489 /** 490 * The service that is managing this callable. This is used so that failure can be reported 491 * properly. 492 */ 493 /* 494 * This reference is part of a reference cycle, which is typically something we want to avoid 495 * under j2objc -- but it is not detected by our j2objc cycle test. The cycle: 496 * 497 * - CustomScheduler.service contains an instance of ServiceDelegate. (It needs it so that it 498 * can call notifyFailed.) 499 * 500 * - ServiceDelegate.runningTask contains an instance of ReschedulableCallable (at least in 501 * the case that the service is using CustomScheduler). (It needs it so that it can cancel 502 * the task and detect whether it has been cancelled.) 503 * 504 * - ReschedulableCallable has a reference back to its enclosing CustomScheduler. (It needs it 505 * so that it can call getNextSchedule). 506 * 507 * Maybe there is a way to avoid this cycle. But we think the cycle is safe enough to ignore: 508 * Each task is retained for only as long as it is running -- so it's retained only as long as 509 * it would already be retained by the underlying executor. 510 * 511 * If the cycle test starts reporting this cycle in the future, we should add an entry to 512 * cycle_suppress_list.txt. 513 */ 514 private final AbstractService service; 515 516 /** 517 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 518 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 519 * ensure that it is assigned atomically with being scheduled. 520 */ 521 private final ReentrantLock lock = new ReentrantLock(); 522 523 /** The future that represents the next execution of this task. */ 524 @GuardedBy("lock") 525 @CheckForNull 526 private SupplantableFuture cancellationDelegate; 527 528 ReschedulableCallable( 529 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 530 this.wrappedRunnable = runnable; 531 this.executor = executor; 532 this.service = service; 533 } 534 535 @Override 536 @CheckForNull 537 public Void call() throws Exception { 538 wrappedRunnable.run(); 539 reschedule(); 540 return null; 541 } 542 543 /** 544 * Atomically reschedules this task and assigns the new future to {@link 545 * #cancellationDelegate}. 546 */ 547 @CanIgnoreReturnValue 548 public Cancellable reschedule() { 549 // invoke the callback outside the lock, prevents some shenanigans. 550 Schedule schedule; 551 try { 552 schedule = CustomScheduler.this.getNextSchedule(); 553 } catch (Throwable t) { 554 restoreInterruptIfIsInterruptedException(t); 555 service.notifyFailed(t); 556 return new FutureAsCancellable(immediateCancelledFuture()); 557 } 558 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 559 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 560 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 561 // correct order. 562 Throwable scheduleFailure = null; 563 Cancellable toReturn; 564 lock.lock(); 565 try { 566 toReturn = initializeOrUpdateCancellationDelegate(schedule); 567 } catch (RuntimeException | Error e) { 568 // If an exception is thrown by the subclass then we need to make sure that the service 569 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 570 // because the service does not monitor the state of the future so if the exception is not 571 // caught and forwarded to the service the task would stop executing but the service would 572 // have no idea. 573 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 574 // the AbstractService could monitor the future directly. Rescheduling is still hard... 575 // but it would help with some of these lock ordering issues. 576 scheduleFailure = e; 577 toReturn = new FutureAsCancellable(immediateCancelledFuture()); 578 } finally { 579 lock.unlock(); 580 } 581 // Call notifyFailed outside the lock to avoid lock ordering issues. 582 if (scheduleFailure != null) { 583 service.notifyFailed(scheduleFailure); 584 } 585 return toReturn; 586 } 587 588 @GuardedBy("lock") 589 /* 590 * The GuardedBy checker warns us that we're not holding cancellationDelegate.lock. But in 591 * fact we are holding it because it is the same as this.lock, which we know we are holding, 592 * thanks to @GuardedBy above. (cancellationDelegate.lock is initialized to this.lock in the 593 * call to `new SupplantableFuture` below.) 594 */ 595 @SuppressWarnings("GuardedBy") 596 private Cancellable initializeOrUpdateCancellationDelegate(Schedule schedule) { 597 if (cancellationDelegate == null) { 598 return cancellationDelegate = new SupplantableFuture(lock, submitToExecutor(schedule)); 599 } 600 if (!cancellationDelegate.currentFuture.isCancelled()) { 601 cancellationDelegate.currentFuture = submitToExecutor(schedule); 602 } 603 return cancellationDelegate; 604 } 605 606 private ScheduledFuture<@Nullable Void> submitToExecutor(Schedule schedule) { 607 return executor.schedule(this, schedule.delay, schedule.unit); 608 } 609 } 610 611 /** 612 * Contains the most recently submitted {@code Future}, which may be cancelled or updated, 613 * always under a lock. 614 */ 615 private static final class SupplantableFuture implements Cancellable { 616 private final ReentrantLock lock; 617 618 @GuardedBy("lock") 619 private Future<@Nullable Void> currentFuture; 620 621 SupplantableFuture(ReentrantLock lock, Future<@Nullable Void> currentFuture) { 622 this.lock = lock; 623 this.currentFuture = currentFuture; 624 } 625 626 @Override 627 public void cancel(boolean mayInterruptIfRunning) { 628 /* 629 * Lock to ensure that a task cannot be rescheduled while a cancel is ongoing. 630 * 631 * In theory, cancel() could execute arbitrary listeners -- bad to do while holding a lock. 632 * However, we don't expose currentFuture to users, so they can't attach listeners. And the 633 * Future might not even be a ListenableFuture, just a plain Future. That said, similar 634 * problems can exist with methods like FutureTask.done(), not to mention slow calls to 635 * Thread.interrupt() (as discussed in InterruptibleTask). At the end of the day, it's 636 * unlikely that cancel() will be slow, so we can probably get away with calling it while 637 * holding a lock. Still, it would be nice to avoid somehow. 638 */ 639 lock.lock(); 640 try { 641 currentFuture.cancel(mayInterruptIfRunning); 642 } finally { 643 lock.unlock(); 644 } 645 } 646 647 @Override 648 public boolean isCancelled() { 649 lock.lock(); 650 try { 651 return currentFuture.isCancelled(); 652 } finally { 653 lock.unlock(); 654 } 655 } 656 } 657 658 @Override 659 final Cancellable schedule( 660 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 661 return new ReschedulableCallable(service, executor, runnable).reschedule(); 662 } 663 664 /** 665 * A value object that represents an absolute delay until a task should be invoked. 666 * 667 * @author Luke Sandberg 668 * @since 11.0 669 */ 670 protected static final class Schedule { 671 672 private final long delay; 673 private final TimeUnit unit; 674 675 /** 676 * @param delay the time from now to delay execution 677 * @param unit the time unit of the delay parameter 678 */ 679 public Schedule(long delay, TimeUnit unit) { 680 this.delay = delay; 681 this.unit = checkNotNull(unit); 682 } 683 } 684 685 /** 686 * Calculates the time at which to next invoke the task. 687 * 688 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 689 * on the same thread as the previous execution of {@link 690 * AbstractScheduledService#runOneIteration}. 691 * 692 * @return a schedule that defines the delay before the next execution. 693 */ 694 // TODO(cpovirk): @ForOverride 695 protected abstract Schedule getNextSchedule() throws Exception; 696 } 697}