001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020 021import com.google.common.annotations.Beta; 022import com.google.common.annotations.GwtIncompatible; 023import com.google.common.base.Supplier; 024import com.google.errorprone.annotations.CanIgnoreReturnValue; 025import com.google.errorprone.annotations.concurrent.GuardedBy; 026import com.google.j2objc.annotations.WeakOuter; 027import java.util.concurrent.Callable; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038import org.checkerframework.checker.nullness.compatqual.MonotonicNonNullDecl; 039import org.checkerframework.checker.nullness.compatqual.NullableDecl; 040 041/** 042 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 043 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 044 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 045 * 046 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 047 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 048 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 049 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 050 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 051 * 052 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 053 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 054 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 055 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 056 * shared state without additional synchronization necessary for visibility to later executions of 057 * the life cycle methods. 058 * 059 * <h3>Usage Example</h3> 060 * 061 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 062 * rate limit itself. 063 * 064 * <pre>{@code 065 * class CrawlingService extends AbstractScheduledService { 066 * private Set<Uri> visited; 067 * private Queue<Uri> toCrawl; 068 * protected void startUp() throws Exception { 069 * toCrawl = readStartingUris(); 070 * } 071 * 072 * protected void runOneIteration() throws Exception { 073 * Uri uri = toCrawl.remove(); 074 * Collection<Uri> newUris = crawl(uri); 075 * visited.add(uri); 076 * for (Uri newUri : newUris) { 077 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 078 * } 079 * } 080 * 081 * protected void shutDown() throws Exception { 082 * saveUris(toCrawl); 083 * } 084 * 085 * protected Scheduler scheduler() { 086 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 087 * } 088 * } 089 * }</pre> 090 * 091 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 092 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 093 * rate limit the number of queries we perform. 094 * 095 * @author Luke Sandberg 096 * @since 11.0 097 */ 098@Beta 099@GwtIncompatible 100public abstract class AbstractScheduledService implements Service { 101 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 102 103 /** 104 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 105 * task. 106 * 107 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 108 * methods, these provide {@link Scheduler} instances for the common use case of running the 109 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 110 * CustomScheduler}. 111 * 112 * @author Luke Sandberg 113 * @since 11.0 114 */ 115 public abstract static class Scheduler { 116 /** 117 * Returns a {@link Scheduler} that schedules the task using the {@link 118 * ScheduledExecutorService#scheduleWithFixedDelay} method. 119 * 120 * @param initialDelay the time to delay first execution 121 * @param delay the delay between the termination of one execution and the commencement of the 122 * next 123 * @param unit the time unit of the initialDelay and delay parameters 124 */ 125 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 126 public static Scheduler newFixedDelaySchedule( 127 final long initialDelay, final long delay, final TimeUnit unit) { 128 checkNotNull(unit); 129 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 130 return new Scheduler() { 131 @Override 132 public Future<?> schedule( 133 AbstractService service, ScheduledExecutorService executor, Runnable task) { 134 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 135 } 136 }; 137 } 138 139 /** 140 * Returns a {@link Scheduler} that schedules the task using the {@link 141 * ScheduledExecutorService#scheduleAtFixedRate} method. 142 * 143 * @param initialDelay the time to delay first execution 144 * @param period the period between successive executions of the task 145 * @param unit the time unit of the initialDelay and period parameters 146 */ 147 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 148 public static Scheduler newFixedRateSchedule( 149 final long initialDelay, final long period, final TimeUnit unit) { 150 checkNotNull(unit); 151 checkArgument(period > 0, "period must be > 0, found %s", period); 152 return new Scheduler() { 153 @Override 154 public Future<?> schedule( 155 AbstractService service, ScheduledExecutorService executor, Runnable task) { 156 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 157 } 158 }; 159 } 160 161 /** Schedules the task to run on the provided executor on behalf of the service. */ 162 abstract Future<?> schedule( 163 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 164 165 private Scheduler() {} 166 } 167 168 /* use AbstractService for state management */ 169 private final AbstractService delegate = new ServiceDelegate(); 170 171 @WeakOuter 172 private final class ServiceDelegate extends AbstractService { 173 174 // A handle to the running task so that we can stop it when a shutdown has been requested. 175 // These two fields are volatile because their values will be accessed from multiple threads. 176 @MonotonicNonNullDecl private volatile Future<?> runningTask; 177 @MonotonicNonNullDecl private volatile ScheduledExecutorService executorService; 178 179 // This lock protects the task so we can ensure that none of the template methods (startUp, 180 // shutDown or runOneIteration) run concurrently with one another. 181 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 182 // lock. 183 private final ReentrantLock lock = new ReentrantLock(); 184 185 @WeakOuter 186 class Task implements Runnable { 187 @Override 188 public void run() { 189 lock.lock(); 190 try { 191 if (runningTask.isCancelled()) { 192 // task may have been cancelled while blocked on the lock. 193 return; 194 } 195 AbstractScheduledService.this.runOneIteration(); 196 } catch (Throwable t) { 197 try { 198 shutDown(); 199 } catch (Exception ignored) { 200 logger.log( 201 Level.WARNING, 202 "Error while attempting to shut down the service after failure.", 203 ignored); 204 } 205 notifyFailed(t); 206 runningTask.cancel(false); // prevent future invocations. 207 } finally { 208 lock.unlock(); 209 } 210 } 211 } 212 213 private final Runnable task = new Task(); 214 215 @Override 216 protected final void doStart() { 217 executorService = 218 MoreExecutors.renamingDecorator( 219 executor(), 220 new Supplier<String>() { 221 @Override 222 public String get() { 223 return serviceName() + " " + state(); 224 } 225 }); 226 executorService.execute( 227 new Runnable() { 228 @Override 229 public void run() { 230 lock.lock(); 231 try { 232 startUp(); 233 runningTask = scheduler().schedule(delegate, executorService, task); 234 notifyStarted(); 235 } catch (Throwable t) { 236 notifyFailed(t); 237 if (runningTask != null) { 238 // prevent the task from running if possible 239 runningTask.cancel(false); 240 } 241 } finally { 242 lock.unlock(); 243 } 244 } 245 }); 246 } 247 248 @Override 249 protected final void doStop() { 250 runningTask.cancel(false); 251 executorService.execute( 252 new Runnable() { 253 @Override 254 public void run() { 255 try { 256 lock.lock(); 257 try { 258 if (state() != State.STOPPING) { 259 // This means that the state has changed since we were scheduled. This implies 260 // that an execution of runOneIteration has thrown an exception and we have 261 // transitioned to a failed state, also this means that shutDown has already 262 // been called, so we do not want to call it again. 263 return; 264 } 265 shutDown(); 266 } finally { 267 lock.unlock(); 268 } 269 notifyStopped(); 270 } catch (Throwable t) { 271 notifyFailed(t); 272 } 273 } 274 }); 275 } 276 277 @Override 278 public String toString() { 279 return AbstractScheduledService.this.toString(); 280 } 281 } 282 283 /** Constructor for use by subclasses. */ 284 protected AbstractScheduledService() {} 285 286 /** 287 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 288 * the service will transition to the {@link Service.State#FAILED} state and this method will no 289 * longer be called. 290 */ 291 protected abstract void runOneIteration() throws Exception; 292 293 /** 294 * Start the service. 295 * 296 * <p>By default this method does nothing. 297 */ 298 protected void startUp() throws Exception {} 299 300 /** 301 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 302 * 303 * <p>By default this method does nothing. 304 */ 305 protected void shutDown() throws Exception {} 306 307 /** 308 * Returns the {@link Scheduler} object used to configure this service. This method will only be 309 * called once. 310 */ 311 // TODO(cpovirk): @ForOverride 312 protected abstract Scheduler scheduler(); 313 314 /** 315 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 316 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 317 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 318 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 319 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 320 * instance. This method is guaranteed to only be called once. 321 * 322 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 323 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 324 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 325 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 326 * fails}. 327 */ 328 protected ScheduledExecutorService executor() { 329 @WeakOuter 330 class ThreadFactoryImpl implements ThreadFactory { 331 @Override 332 public Thread newThread(Runnable runnable) { 333 return MoreExecutors.newThread(serviceName(), runnable); 334 } 335 } 336 final ScheduledExecutorService executor = 337 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 338 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 339 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 340 // Technically this listener is added after start() was called so it is a little gross, but it 341 // is called within doStart() so we know that the service cannot terminate or fail concurrently 342 // with adding this listener so it is impossible to miss an event that we are interested in. 343 addListener( 344 new Listener() { 345 @Override 346 public void terminated(State from) { 347 executor.shutdown(); 348 } 349 350 @Override 351 public void failed(State from, Throwable failure) { 352 executor.shutdown(); 353 } 354 }, 355 directExecutor()); 356 return executor; 357 } 358 359 /** 360 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 361 * debugging output. 362 * 363 * @since 14.0 364 */ 365 protected String serviceName() { 366 return getClass().getSimpleName(); 367 } 368 369 @Override 370 public String toString() { 371 return serviceName() + " [" + state() + "]"; 372 } 373 374 @Override 375 public final boolean isRunning() { 376 return delegate.isRunning(); 377 } 378 379 @Override 380 public final State state() { 381 return delegate.state(); 382 } 383 384 /** @since 13.0 */ 385 @Override 386 public final void addListener(Listener listener, Executor executor) { 387 delegate.addListener(listener, executor); 388 } 389 390 /** @since 14.0 */ 391 @Override 392 public final Throwable failureCause() { 393 return delegate.failureCause(); 394 } 395 396 /** @since 15.0 */ 397 @CanIgnoreReturnValue 398 @Override 399 public final Service startAsync() { 400 delegate.startAsync(); 401 return this; 402 } 403 404 /** @since 15.0 */ 405 @CanIgnoreReturnValue 406 @Override 407 public final Service stopAsync() { 408 delegate.stopAsync(); 409 return this; 410 } 411 412 /** @since 15.0 */ 413 @Override 414 public final void awaitRunning() { 415 delegate.awaitRunning(); 416 } 417 418 /** @since 15.0 */ 419 @Override 420 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 421 delegate.awaitRunning(timeout, unit); 422 } 423 424 /** @since 15.0 */ 425 @Override 426 public final void awaitTerminated() { 427 delegate.awaitTerminated(); 428 } 429 430 /** @since 15.0 */ 431 @Override 432 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 433 delegate.awaitTerminated(timeout, unit); 434 } 435 436 /** 437 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 438 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 439 * cancelled, the {@link #getNextSchedule} method will be called. 440 * 441 * @author Luke Sandberg 442 * @since 11.0 443 */ 444 @Beta 445 public abstract static class CustomScheduler extends Scheduler { 446 447 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 448 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 449 450 /** The underlying task. */ 451 private final Runnable wrappedRunnable; 452 453 /** The executor on which this Callable will be scheduled. */ 454 private final ScheduledExecutorService executor; 455 456 /** 457 * The service that is managing this callable. This is used so that failure can be reported 458 * properly. 459 */ 460 private final AbstractService service; 461 462 /** 463 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 464 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 465 * ensure that it is assigned atomically with being scheduled. 466 */ 467 private final ReentrantLock lock = new ReentrantLock(); 468 469 /** The future that represents the next execution of this task. */ 470 @GuardedBy("lock") 471 @NullableDecl 472 private Future<Void> currentFuture; 473 474 ReschedulableCallable( 475 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 476 this.wrappedRunnable = runnable; 477 this.executor = executor; 478 this.service = service; 479 } 480 481 @Override 482 public Void call() throws Exception { 483 wrappedRunnable.run(); 484 reschedule(); 485 return null; 486 } 487 488 /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */ 489 public void reschedule() { 490 // invoke the callback outside the lock, prevents some shenanigans. 491 Schedule schedule; 492 try { 493 schedule = CustomScheduler.this.getNextSchedule(); 494 } catch (Throwable t) { 495 service.notifyFailed(t); 496 return; 497 } 498 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 499 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 500 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 501 // correct order. 502 Throwable scheduleFailure = null; 503 lock.lock(); 504 try { 505 if (currentFuture == null || !currentFuture.isCancelled()) { 506 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 507 } 508 } catch (Throwable e) { 509 // If an exception is thrown by the subclass then we need to make sure that the service 510 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 511 // because the service does not monitor the state of the future so if the exception is not 512 // caught and forwarded to the service the task would stop executing but the service would 513 // have no idea. 514 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 515 // the AbstractService could monitor the future directly. Rescheduling is still hard... 516 // but it would help with some of these lock ordering issues. 517 scheduleFailure = e; 518 } finally { 519 lock.unlock(); 520 } 521 // Call notifyFailed outside the lock to avoid lock ordering issues. 522 if (scheduleFailure != null) { 523 service.notifyFailed(scheduleFailure); 524 } 525 } 526 527 // N.B. Only protect cancel and isCancelled because those are the only methods that are 528 // invoked by the AbstractScheduledService. 529 @Override 530 public boolean cancel(boolean mayInterruptIfRunning) { 531 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 532 lock.lock(); 533 try { 534 return currentFuture.cancel(mayInterruptIfRunning); 535 } finally { 536 lock.unlock(); 537 } 538 } 539 540 @Override 541 public boolean isCancelled() { 542 lock.lock(); 543 try { 544 return currentFuture.isCancelled(); 545 } finally { 546 lock.unlock(); 547 } 548 } 549 550 @Override 551 protected Future<Void> delegate() { 552 throw new UnsupportedOperationException( 553 "Only cancel and isCancelled is supported by this future"); 554 } 555 } 556 557 @Override 558 final Future<?> schedule( 559 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 560 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 561 task.reschedule(); 562 return task; 563 } 564 565 /** 566 * A value object that represents an absolute delay until a task should be invoked. 567 * 568 * @author Luke Sandberg 569 * @since 11.0 570 */ 571 @Beta 572 protected static final class Schedule { 573 574 private final long delay; 575 private final TimeUnit unit; 576 577 /** 578 * @param delay the time from now to delay execution 579 * @param unit the time unit of the delay parameter 580 */ 581 public Schedule(long delay, TimeUnit unit) { 582 this.delay = delay; 583 this.unit = checkNotNull(unit); 584 } 585 } 586 587 /** 588 * Calculates the time at which to next invoke the task. 589 * 590 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 591 * on the same thread as the previous execution of {@link 592 * AbstractScheduledService#runOneIteration}. 593 * 594 * @return a schedule that defines the delay before the next execution. 595 */ 596 // TODO(cpovirk): @ForOverride 597 protected abstract Schedule getNextSchedule() throws Exception; 598 } 599}