001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.Futures.immediateCancelledFuture; 020import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 021import static java.util.Objects.requireNonNull; 022 023import com.google.common.annotations.GwtIncompatible; 024import com.google.common.base.Supplier; 025import com.google.errorprone.annotations.CanIgnoreReturnValue; 026import com.google.errorprone.annotations.concurrent.GuardedBy; 027import com.google.j2objc.annotations.WeakOuter; 028import java.util.concurrent.Callable; 029import java.util.concurrent.Executor; 030import java.util.concurrent.Executors; 031import java.util.concurrent.Future; 032import java.util.concurrent.ScheduledExecutorService; 033import java.util.concurrent.ScheduledFuture; 034import java.util.concurrent.ThreadFactory; 035import java.util.concurrent.TimeUnit; 036import java.util.concurrent.TimeoutException; 037import java.util.concurrent.locks.ReentrantLock; 038import java.util.logging.Level; 039import java.util.logging.Logger; 040import javax.annotation.CheckForNull; 041import org.checkerframework.checker.nullness.qual.Nullable; 042 043/** 044 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 045 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 046 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 047 * 048 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 049 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 050 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 051 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 052 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 053 * 054 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 055 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 056 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 057 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 058 * shared state without additional synchronization necessary for visibility to later executions of 059 * the life cycle methods. 060 * 061 * <h3>Usage Example</h3> 062 * 063 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 064 * rate limit itself. 065 * 066 * <pre>{@code 067 * class CrawlingService extends AbstractScheduledService { 068 * private Set<Uri> visited; 069 * private Queue<Uri> toCrawl; 070 * protected void startUp() throws Exception { 071 * toCrawl = readStartingUris(); 072 * } 073 * 074 * protected void runOneIteration() throws Exception { 075 * Uri uri = toCrawl.remove(); 076 * Collection<Uri> newUris = crawl(uri); 077 * visited.add(uri); 078 * for (Uri newUri : newUris) { 079 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 080 * } 081 * } 082 * 083 * protected void shutDown() throws Exception { 084 * saveUris(toCrawl); 085 * } 086 * 087 * protected Scheduler scheduler() { 088 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 089 * } 090 * } 091 * }</pre> 092 * 093 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 094 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 095 * rate limit the number of queries we perform. 096 * 097 * @author Luke Sandberg 098 * @since 11.0 099 */ 100@GwtIncompatible 101@ElementTypesAreNonnullByDefault 102public abstract class AbstractScheduledService implements Service { 103 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 104 105 /** 106 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 107 * task. 108 * 109 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 110 * methods, these provide {@link Scheduler} instances for the common use case of running the 111 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 112 * CustomScheduler}. 113 * 114 * @author Luke Sandberg 115 * @since 11.0 116 */ 117 public abstract static class Scheduler { 118 /** 119 * Returns a {@link Scheduler} that schedules the task using the {@link 120 * ScheduledExecutorService#scheduleWithFixedDelay} method. 121 * 122 * @param initialDelay the time to delay first execution 123 * @param delay the delay between the termination of one execution and the commencement of the 124 * next 125 * @param unit the time unit of the initialDelay and delay parameters 126 */ 127 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 128 public static Scheduler newFixedDelaySchedule( 129 final long initialDelay, final long delay, final TimeUnit unit) { 130 checkNotNull(unit); 131 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 132 return new Scheduler() { 133 @Override 134 public Cancellable schedule( 135 AbstractService service, ScheduledExecutorService executor, Runnable task) { 136 return new FutureAsCancellable( 137 executor.scheduleWithFixedDelay(task, initialDelay, delay, unit)); 138 } 139 }; 140 } 141 142 /** 143 * Returns a {@link Scheduler} that schedules the task using the {@link 144 * ScheduledExecutorService#scheduleAtFixedRate} method. 145 * 146 * @param initialDelay the time to delay first execution 147 * @param period the period between successive executions of the task 148 * @param unit the time unit of the initialDelay and period parameters 149 */ 150 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 151 public static Scheduler newFixedRateSchedule( 152 final long initialDelay, final long period, final TimeUnit unit) { 153 checkNotNull(unit); 154 checkArgument(period > 0, "period must be > 0, found %s", period); 155 return new Scheduler() { 156 @Override 157 public Cancellable schedule( 158 AbstractService service, ScheduledExecutorService executor, Runnable task) { 159 return new FutureAsCancellable( 160 executor.scheduleAtFixedRate(task, initialDelay, period, unit)); 161 } 162 }; 163 } 164 165 /** Schedules the task to run on the provided executor on behalf of the service. */ 166 abstract Cancellable schedule( 167 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 168 169 private Scheduler() {} 170 } 171 172 /* use AbstractService for state management */ 173 private final AbstractService delegate = new ServiceDelegate(); 174 175 @WeakOuter 176 private final class ServiceDelegate extends AbstractService { 177 178 // A handle to the running task so that we can stop it when a shutdown has been requested. 179 // These two fields are volatile because their values will be accessed from multiple threads. 180 @CheckForNull private volatile Cancellable runningTask; 181 @CheckForNull private volatile ScheduledExecutorService executorService; 182 183 // This lock protects the task so we can ensure that none of the template methods (startUp, 184 // shutDown or runOneIteration) run concurrently with one another. 185 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 186 // lock. 187 private final ReentrantLock lock = new ReentrantLock(); 188 189 @WeakOuter 190 class Task implements Runnable { 191 @Override 192 public void run() { 193 lock.lock(); 194 try { 195 /* 196 * requireNonNull is safe because Task isn't run (or at least it doesn't succeed in taking 197 * the lock) until after it's scheduled and the runningTask field is set. 198 */ 199 if (requireNonNull(runningTask).isCancelled()) { 200 // task may have been cancelled while blocked on the lock. 201 return; 202 } 203 AbstractScheduledService.this.runOneIteration(); 204 } catch (Throwable t) { 205 try { 206 shutDown(); 207 } catch (Exception ignored) { 208 logger.log( 209 Level.WARNING, 210 "Error while attempting to shut down the service after failure.", 211 ignored); 212 } 213 notifyFailed(t); 214 // requireNonNull is safe now, just as it was above. 215 requireNonNull(runningTask).cancel(false); // prevent future invocations. 216 } finally { 217 lock.unlock(); 218 } 219 } 220 } 221 222 private final Runnable task = new Task(); 223 224 @Override 225 protected final void doStart() { 226 executorService = 227 MoreExecutors.renamingDecorator( 228 executor(), 229 new Supplier<String>() { 230 @Override 231 public String get() { 232 return serviceName() + " " + state(); 233 } 234 }); 235 executorService.execute( 236 new Runnable() { 237 @Override 238 public void run() { 239 lock.lock(); 240 try { 241 startUp(); 242 runningTask = scheduler().schedule(delegate, executorService, task); 243 notifyStarted(); 244 } catch (Throwable t) { 245 notifyFailed(t); 246 if (runningTask != null) { 247 // prevent the task from running if possible 248 runningTask.cancel(false); 249 } 250 } finally { 251 lock.unlock(); 252 } 253 } 254 }); 255 } 256 257 @Override 258 protected final void doStop() { 259 // Both requireNonNull calls are safe because doStop can run only after a successful doStart. 260 requireNonNull(runningTask); 261 requireNonNull(executorService); 262 runningTask.cancel(false); 263 executorService.execute( 264 new Runnable() { 265 @Override 266 public void run() { 267 try { 268 lock.lock(); 269 try { 270 if (state() != State.STOPPING) { 271 // This means that the state has changed since we were scheduled. This implies 272 // that an execution of runOneIteration has thrown an exception and we have 273 // transitioned to a failed state, also this means that shutDown has already 274 // been called, so we do not want to call it again. 275 return; 276 } 277 shutDown(); 278 } finally { 279 lock.unlock(); 280 } 281 notifyStopped(); 282 } catch (Throwable t) { 283 notifyFailed(t); 284 } 285 } 286 }); 287 } 288 289 @Override 290 public String toString() { 291 return AbstractScheduledService.this.toString(); 292 } 293 } 294 295 /** Constructor for use by subclasses. */ 296 protected AbstractScheduledService() {} 297 298 /** 299 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 300 * the service will transition to the {@link Service.State#FAILED} state and this method will no 301 * longer be called. 302 */ 303 protected abstract void runOneIteration() throws Exception; 304 305 /** 306 * Start the service. 307 * 308 * <p>By default this method does nothing. 309 */ 310 protected void startUp() throws Exception {} 311 312 /** 313 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 314 * 315 * <p>By default this method does nothing. 316 */ 317 protected void shutDown() throws Exception {} 318 319 /** 320 * Returns the {@link Scheduler} object used to configure this service. This method will only be 321 * called once. 322 */ 323 // TODO(cpovirk): @ForOverride 324 protected abstract Scheduler scheduler(); 325 326 /** 327 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 328 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 329 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 330 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 331 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 332 * instance. This method is guaranteed to only be called once. 333 * 334 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread pool 335 * that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, the 336 * pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 337 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 338 * fails}. 339 */ 340 protected ScheduledExecutorService executor() { 341 @WeakOuter 342 class ThreadFactoryImpl implements ThreadFactory { 343 @Override 344 public Thread newThread(Runnable runnable) { 345 return MoreExecutors.newThread(serviceName(), runnable); 346 } 347 } 348 final ScheduledExecutorService executor = 349 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 350 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 351 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 352 // Technically this listener is added after start() was called so it is a little gross, but it 353 // is called within doStart() so we know that the service cannot terminate or fail concurrently 354 // with adding this listener so it is impossible to miss an event that we are interested in. 355 addListener( 356 new Listener() { 357 @Override 358 public void terminated(State from) { 359 executor.shutdown(); 360 } 361 362 @Override 363 public void failed(State from, Throwable failure) { 364 executor.shutdown(); 365 } 366 }, 367 directExecutor()); 368 return executor; 369 } 370 371 /** 372 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 373 * debugging output. 374 * 375 * @since 14.0 376 */ 377 protected String serviceName() { 378 return getClass().getSimpleName(); 379 } 380 381 @Override 382 public String toString() { 383 return serviceName() + " [" + state() + "]"; 384 } 385 386 @Override 387 public final boolean isRunning() { 388 return delegate.isRunning(); 389 } 390 391 @Override 392 public final State state() { 393 return delegate.state(); 394 } 395 396 /** @since 13.0 */ 397 @Override 398 public final void addListener(Listener listener, Executor executor) { 399 delegate.addListener(listener, executor); 400 } 401 402 /** @since 14.0 */ 403 @Override 404 public final Throwable failureCause() { 405 return delegate.failureCause(); 406 } 407 408 /** @since 15.0 */ 409 @CanIgnoreReturnValue 410 @Override 411 public final Service startAsync() { 412 delegate.startAsync(); 413 return this; 414 } 415 416 /** @since 15.0 */ 417 @CanIgnoreReturnValue 418 @Override 419 public final Service stopAsync() { 420 delegate.stopAsync(); 421 return this; 422 } 423 424 /** @since 15.0 */ 425 @Override 426 public final void awaitRunning() { 427 delegate.awaitRunning(); 428 } 429 430 /** @since 15.0 */ 431 @Override 432 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 433 delegate.awaitRunning(timeout, unit); 434 } 435 436 /** @since 15.0 */ 437 @Override 438 public final void awaitTerminated() { 439 delegate.awaitTerminated(); 440 } 441 442 /** @since 15.0 */ 443 @Override 444 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 445 delegate.awaitTerminated(timeout, unit); 446 } 447 448 interface Cancellable { 449 void cancel(boolean mayInterruptIfRunning); 450 451 boolean isCancelled(); 452 } 453 454 private static final class FutureAsCancellable implements Cancellable { 455 private final Future<?> delegate; 456 457 FutureAsCancellable(Future<?> delegate) { 458 this.delegate = delegate; 459 } 460 461 @Override 462 public void cancel(boolean mayInterruptIfRunning) { 463 delegate.cancel(mayInterruptIfRunning); 464 } 465 466 @Override 467 public boolean isCancelled() { 468 return delegate.isCancelled(); 469 } 470 } 471 472 /** 473 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 474 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 475 * cancelled, the {@link #getNextSchedule} method will be called. 476 * 477 * @author Luke Sandberg 478 * @since 11.0 479 */ 480 public abstract static class CustomScheduler extends Scheduler { 481 482 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 483 private final class ReschedulableCallable implements Callable<@Nullable Void> { 484 485 /** The underlying task. */ 486 private final Runnable wrappedRunnable; 487 488 /** The executor on which this Callable will be scheduled. */ 489 private final ScheduledExecutorService executor; 490 491 /** 492 * The service that is managing this callable. This is used so that failure can be reported 493 * properly. 494 */ 495 /* 496 * This reference is part of a reference cycle, which is typically something we want to avoid 497 * under j2objc -- but it is not detected by our j2objc cycle test. The cycle: 498 * 499 * - CustomScheduler.service contains an instance of ServiceDelegate. (It needs it so that it 500 * can call notifyFailed.) 501 * 502 * - ServiceDelegate.runningTask contains an instance of ReschedulableCallable (at least in 503 * the case that the service is using CustomScheduler). (It needs it so that it can cancel 504 * the task and detect whether it has been cancelled.) 505 * 506 * - ReschedulableCallable has a reference back to its enclosing CustomScheduler. (It needs it 507 * so that it can call getNextSchedule). 508 * 509 * Maybe there is a way to avoid this cycle. But we think the cycle is safe enough to ignore: 510 * Each task is retained for only as long as it is running -- so it's retained only as long as 511 * it would already be retained by the underlying executor. 512 * 513 * If the cycle test starts reporting this cycle in the future, we should add an entry to 514 * cycle_suppress_list.txt. 515 */ 516 private final AbstractService service; 517 518 /** 519 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 520 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 521 * ensure that it is assigned atomically with being scheduled. 522 */ 523 private final ReentrantLock lock = new ReentrantLock(); 524 525 /** The future that represents the next execution of this task. */ 526 @GuardedBy("lock") 527 @CheckForNull 528 private SupplantableFuture cancellationDelegate; 529 530 ReschedulableCallable( 531 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 532 this.wrappedRunnable = runnable; 533 this.executor = executor; 534 this.service = service; 535 } 536 537 @Override 538 @CheckForNull 539 public Void call() throws Exception { 540 wrappedRunnable.run(); 541 reschedule(); 542 return null; 543 } 544 545 /** 546 * Atomically reschedules this task and assigns the new future to {@link 547 * #cancellationDelegate}. 548 */ 549 @CanIgnoreReturnValue 550 public Cancellable reschedule() { 551 // invoke the callback outside the lock, prevents some shenanigans. 552 Schedule schedule; 553 try { 554 schedule = CustomScheduler.this.getNextSchedule(); 555 } catch (Throwable t) { 556 service.notifyFailed(t); 557 return new FutureAsCancellable(immediateCancelledFuture()); 558 } 559 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 560 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 561 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 562 // correct order. 563 Throwable scheduleFailure = null; 564 Cancellable toReturn; 565 lock.lock(); 566 try { 567 toReturn = initializeOrUpdateCancellationDelegate(schedule); 568 } catch (Throwable e) { 569 // If an exception is thrown by the subclass then we need to make sure that the service 570 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 571 // because the service does not monitor the state of the future so if the exception is not 572 // caught and forwarded to the service the task would stop executing but the service would 573 // have no idea. 574 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 575 // the AbstractService could monitor the future directly. Rescheduling is still hard... 576 // but it would help with some of these lock ordering issues. 577 scheduleFailure = e; 578 toReturn = new FutureAsCancellable(immediateCancelledFuture()); 579 } finally { 580 lock.unlock(); 581 } 582 // Call notifyFailed outside the lock to avoid lock ordering issues. 583 if (scheduleFailure != null) { 584 service.notifyFailed(scheduleFailure); 585 } 586 return toReturn; 587 } 588 589 @GuardedBy("lock") 590 /* 591 * The GuardedBy checker warns us that we're not holding cancellationDelegate.lock. But in 592 * fact we are holding it because it is the same as this.lock, which we know we are holding, 593 * thanks to @GuardedBy above. (cancellationDelegate.lock is initialized to this.lock in the 594 * call to `new SupplantableFuture` below.) 595 */ 596 @SuppressWarnings("GuardedBy") 597 private Cancellable initializeOrUpdateCancellationDelegate(Schedule schedule) { 598 if (cancellationDelegate == null) { 599 return cancellationDelegate = new SupplantableFuture(lock, submitToExecutor(schedule)); 600 } 601 if (!cancellationDelegate.currentFuture.isCancelled()) { 602 cancellationDelegate.currentFuture = submitToExecutor(schedule); 603 } 604 return cancellationDelegate; 605 } 606 607 private ScheduledFuture<@Nullable Void> submitToExecutor(Schedule schedule) { 608 return executor.schedule(this, schedule.delay, schedule.unit); 609 } 610 } 611 612 /** 613 * Contains the most recently submitted {@code Future}, which may be cancelled or updated, 614 * always under a lock. 615 */ 616 private static final class SupplantableFuture implements Cancellable { 617 private final ReentrantLock lock; 618 619 @GuardedBy("lock") 620 private Future<@Nullable Void> currentFuture; 621 622 SupplantableFuture(ReentrantLock lock, Future<@Nullable Void> currentFuture) { 623 this.lock = lock; 624 this.currentFuture = currentFuture; 625 } 626 627 @Override 628 public void cancel(boolean mayInterruptIfRunning) { 629 /* 630 * Lock to ensure that a task cannot be rescheduled while a cancel is ongoing. 631 * 632 * In theory, cancel() could execute arbitrary listeners -- bad to do while holding a lock. 633 * However, we don't expose currentFuture to users, so they can't attach listeners. And the 634 * Future might not even be a ListenableFuture, just a plain Future. That said, similar 635 * problems can exist with methods like FutureTask.done(), not to mention slow calls to 636 * Thread.interrupt() (as discussed in InterruptibleTask). At the end of the day, it's 637 * unlikely that cancel() will be slow, so we can probably get away with calling it while 638 * holding a lock. Still, it would be nice to avoid somehow. 639 */ 640 lock.lock(); 641 try { 642 currentFuture.cancel(mayInterruptIfRunning); 643 } finally { 644 lock.unlock(); 645 } 646 } 647 648 @Override 649 public boolean isCancelled() { 650 lock.lock(); 651 try { 652 return currentFuture.isCancelled(); 653 } finally { 654 lock.unlock(); 655 } 656 } 657 } 658 659 @Override 660 final Cancellable schedule( 661 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 662 return new ReschedulableCallable(service, executor, runnable).reschedule(); 663 } 664 665 /** 666 * A value object that represents an absolute delay until a task should be invoked. 667 * 668 * @author Luke Sandberg 669 * @since 11.0 670 */ 671 protected static final class Schedule { 672 673 private final long delay; 674 private final TimeUnit unit; 675 676 /** 677 * @param delay the time from now to delay execution 678 * @param unit the time unit of the delay parameter 679 */ 680 public Schedule(long delay, TimeUnit unit) { 681 this.delay = delay; 682 this.unit = checkNotNull(unit); 683 } 684 } 685 686 /** 687 * Calculates the time at which to next invoke the task. 688 * 689 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 690 * on the same thread as the previous execution of {@link 691 * AbstractScheduledService#runOneIteration}. 692 * 693 * @return a schedule that defines the delay before the next execution. 694 */ 695 // TODO(cpovirk): @ForOverride 696 protected abstract Schedule getNextSchedule() throws Exception; 697 } 698}