001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020 021import com.google.common.annotations.Beta; 022import com.google.common.annotations.GwtIncompatible; 023import com.google.common.base.Supplier; 024import com.google.errorprone.annotations.CanIgnoreReturnValue; 025import com.google.errorprone.annotations.concurrent.GuardedBy; 026import com.google.j2objc.annotations.WeakOuter; 027import java.util.concurrent.Callable; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038import org.checkerframework.checker.nullness.compatqual.MonotonicNonNullDecl; 039import org.checkerframework.checker.nullness.compatqual.NullableDecl; 040 041/** 042 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 043 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 044 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 045 * 046 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 047 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 048 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 049 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 050 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 051 * 052 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 053 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 054 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 055 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 056 * shared state without additional synchronization necessary for visibility to later executions of 057 * the life cycle methods. 058 * 059 * <h3>Usage Example</h3> 060 * 061 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 062 * rate limit itself. 063 * 064 * <pre>{@code 065 * class CrawlingService extends AbstractScheduledService { 066 * private Set<Uri> visited; 067 * private Queue<Uri> toCrawl; 068 * protected void startUp() throws Exception { 069 * toCrawl = readStartingUris(); 070 * } 071 * 072 * protected void runOneIteration() throws Exception { 073 * Uri uri = toCrawl.remove(); 074 * Collection<Uri> newUris = crawl(uri); 075 * visited.add(uri); 076 * for (Uri newUri : newUris) { 077 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 078 * } 079 * } 080 * 081 * protected void shutDown() throws Exception { 082 * saveUris(toCrawl); 083 * } 084 * 085 * protected Scheduler scheduler() { 086 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 087 * } 088 * } 089 * }</pre> 090 * 091 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 092 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 093 * rate limit the number of queries we perform. 094 * 095 * @author Luke Sandberg 096 * @since 11.0 097 */ 098@Beta 099@GwtIncompatible 100public abstract class AbstractScheduledService implements Service { 101 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 102 103 /** 104 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 105 * task. 106 * 107 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 108 * methods, these provide {@link Scheduler} instances for the common use case of running the 109 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 110 * CustomScheduler}. 111 * 112 * @author Luke Sandberg 113 * @since 11.0 114 */ 115 public abstract static class Scheduler { 116 /** 117 * Returns a {@link Scheduler} that schedules the task using the {@link 118 * ScheduledExecutorService#scheduleWithFixedDelay} method. 119 * 120 * @param initialDelay the time to delay first execution 121 * @param delay the delay between the termination of one execution and the commencement of the 122 * next 123 * @param unit the time unit of the initialDelay and delay parameters 124 */ 125 public static Scheduler newFixedDelaySchedule( 126 final long initialDelay, final long delay, final TimeUnit unit) { 127 checkNotNull(unit); 128 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 129 return new Scheduler() { 130 @Override 131 public Future<?> schedule( 132 AbstractService service, ScheduledExecutorService executor, Runnable task) { 133 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 134 } 135 }; 136 } 137 138 /** 139 * Returns a {@link Scheduler} that schedules the task using the {@link 140 * ScheduledExecutorService#scheduleAtFixedRate} method. 141 * 142 * @param initialDelay the time to delay first execution 143 * @param period the period between successive executions of the task 144 * @param unit the time unit of the initialDelay and period parameters 145 */ 146 public static Scheduler newFixedRateSchedule( 147 final long initialDelay, final long period, final TimeUnit unit) { 148 checkNotNull(unit); 149 checkArgument(period > 0, "period must be > 0, found %s", period); 150 return new Scheduler() { 151 @Override 152 public Future<?> schedule( 153 AbstractService service, ScheduledExecutorService executor, Runnable task) { 154 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 155 } 156 }; 157 } 158 159 /** Schedules the task to run on the provided executor on behalf of the service. */ 160 abstract Future<?> schedule( 161 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 162 163 private Scheduler() {} 164 } 165 166 /* use AbstractService for state management */ 167 private final AbstractService delegate = new ServiceDelegate(); 168 169 @WeakOuter 170 private final class ServiceDelegate extends AbstractService { 171 172 // A handle to the running task so that we can stop it when a shutdown has been requested. 173 // These two fields are volatile because their values will be accessed from multiple threads. 174 @MonotonicNonNullDecl private volatile Future<?> runningTask; 175 @MonotonicNonNullDecl private volatile ScheduledExecutorService executorService; 176 177 // This lock protects the task so we can ensure that none of the template methods (startUp, 178 // shutDown or runOneIteration) run concurrently with one another. 179 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 180 // lock. 181 private final ReentrantLock lock = new ReentrantLock(); 182 183 @WeakOuter 184 class Task implements Runnable { 185 @Override 186 public void run() { 187 lock.lock(); 188 try { 189 if (runningTask.isCancelled()) { 190 // task may have been cancelled while blocked on the lock. 191 return; 192 } 193 AbstractScheduledService.this.runOneIteration(); 194 } catch (Throwable t) { 195 try { 196 shutDown(); 197 } catch (Exception ignored) { 198 logger.log( 199 Level.WARNING, 200 "Error while attempting to shut down the service after failure.", 201 ignored); 202 } 203 notifyFailed(t); 204 runningTask.cancel(false); // prevent future invocations. 205 } finally { 206 lock.unlock(); 207 } 208 } 209 } 210 211 private final Runnable task = new Task(); 212 213 @Override 214 protected final void doStart() { 215 executorService = 216 MoreExecutors.renamingDecorator( 217 executor(), 218 new Supplier<String>() { 219 @Override 220 public String get() { 221 return serviceName() + " " + state(); 222 } 223 }); 224 executorService.execute( 225 new Runnable() { 226 @Override 227 public void run() { 228 lock.lock(); 229 try { 230 startUp(); 231 runningTask = scheduler().schedule(delegate, executorService, task); 232 notifyStarted(); 233 } catch (Throwable t) { 234 notifyFailed(t); 235 if (runningTask != null) { 236 // prevent the task from running if possible 237 runningTask.cancel(false); 238 } 239 } finally { 240 lock.unlock(); 241 } 242 } 243 }); 244 } 245 246 @Override 247 protected final void doStop() { 248 runningTask.cancel(false); 249 executorService.execute( 250 new Runnable() { 251 @Override 252 public void run() { 253 try { 254 lock.lock(); 255 try { 256 if (state() != State.STOPPING) { 257 // This means that the state has changed since we were scheduled. This implies 258 // that an execution of runOneIteration has thrown an exception and we have 259 // transitioned to a failed state, also this means that shutDown has already 260 // been called, so we do not want to call it again. 261 return; 262 } 263 shutDown(); 264 } finally { 265 lock.unlock(); 266 } 267 notifyStopped(); 268 } catch (Throwable t) { 269 notifyFailed(t); 270 } 271 } 272 }); 273 } 274 275 @Override 276 public String toString() { 277 return AbstractScheduledService.this.toString(); 278 } 279 } 280 281 /** Constructor for use by subclasses. */ 282 protected AbstractScheduledService() {} 283 284 /** 285 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 286 * the service will transition to the {@link Service.State#FAILED} state and this method will no 287 * longer be called. 288 */ 289 protected abstract void runOneIteration() throws Exception; 290 291 /** 292 * Start the service. 293 * 294 * <p>By default this method does nothing. 295 */ 296 protected void startUp() throws Exception {} 297 298 /** 299 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 300 * 301 * <p>By default this method does nothing. 302 */ 303 protected void shutDown() throws Exception {} 304 305 /** 306 * Returns the {@link Scheduler} object used to configure this service. This method will only be 307 * called once. 308 */ 309 // TODO(cpovirk): @ForOverride 310 protected abstract Scheduler scheduler(); 311 312 /** 313 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 314 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 315 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 316 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 317 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 318 * instance. This method is guaranteed to only be called once. 319 * 320 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 321 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 322 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 323 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 324 * fails}. 325 */ 326 protected ScheduledExecutorService executor() { 327 @WeakOuter 328 class ThreadFactoryImpl implements ThreadFactory { 329 @Override 330 public Thread newThread(Runnable runnable) { 331 return MoreExecutors.newThread(serviceName(), runnable); 332 } 333 } 334 final ScheduledExecutorService executor = 335 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 336 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 337 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 338 // Technically this listener is added after start() was called so it is a little gross, but it 339 // is called within doStart() so we know that the service cannot terminate or fail concurrently 340 // with adding this listener so it is impossible to miss an event that we are interested in. 341 addListener( 342 new Listener() { 343 @Override 344 public void terminated(State from) { 345 executor.shutdown(); 346 } 347 348 @Override 349 public void failed(State from, Throwable failure) { 350 executor.shutdown(); 351 } 352 }, 353 directExecutor()); 354 return executor; 355 } 356 357 /** 358 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 359 * debugging output. 360 * 361 * @since 14.0 362 */ 363 protected String serviceName() { 364 return getClass().getSimpleName(); 365 } 366 367 @Override 368 public String toString() { 369 return serviceName() + " [" + state() + "]"; 370 } 371 372 @Override 373 public final boolean isRunning() { 374 return delegate.isRunning(); 375 } 376 377 @Override 378 public final State state() { 379 return delegate.state(); 380 } 381 382 /** @since 13.0 */ 383 @Override 384 public final void addListener(Listener listener, Executor executor) { 385 delegate.addListener(listener, executor); 386 } 387 388 /** @since 14.0 */ 389 @Override 390 public final Throwable failureCause() { 391 return delegate.failureCause(); 392 } 393 394 /** @since 15.0 */ 395 @CanIgnoreReturnValue 396 @Override 397 public final Service startAsync() { 398 delegate.startAsync(); 399 return this; 400 } 401 402 /** @since 15.0 */ 403 @CanIgnoreReturnValue 404 @Override 405 public final Service stopAsync() { 406 delegate.stopAsync(); 407 return this; 408 } 409 410 /** @since 15.0 */ 411 @Override 412 public final void awaitRunning() { 413 delegate.awaitRunning(); 414 } 415 416 /** @since 15.0 */ 417 @Override 418 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 419 delegate.awaitRunning(timeout, unit); 420 } 421 422 /** @since 15.0 */ 423 @Override 424 public final void awaitTerminated() { 425 delegate.awaitTerminated(); 426 } 427 428 /** @since 15.0 */ 429 @Override 430 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 431 delegate.awaitTerminated(timeout, unit); 432 } 433 434 /** 435 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 436 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 437 * cancelled, the {@link #getNextSchedule} method will be called. 438 * 439 * @author Luke Sandberg 440 * @since 11.0 441 */ 442 @Beta 443 public abstract static class CustomScheduler extends Scheduler { 444 445 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 446 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 447 448 /** The underlying task. */ 449 private final Runnable wrappedRunnable; 450 451 /** The executor on which this Callable will be scheduled. */ 452 private final ScheduledExecutorService executor; 453 454 /** 455 * The service that is managing this callable. This is used so that failure can be reported 456 * properly. 457 */ 458 private final AbstractService service; 459 460 /** 461 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 462 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 463 * ensure that it is assigned atomically with being scheduled. 464 */ 465 private final ReentrantLock lock = new ReentrantLock(); 466 467 /** The future that represents the next execution of this task. */ 468 @GuardedBy("lock") 469 @NullableDecl 470 private Future<Void> currentFuture; 471 472 ReschedulableCallable( 473 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 474 this.wrappedRunnable = runnable; 475 this.executor = executor; 476 this.service = service; 477 } 478 479 @Override 480 public Void call() throws Exception { 481 wrappedRunnable.run(); 482 reschedule(); 483 return null; 484 } 485 486 /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */ 487 public void reschedule() { 488 // invoke the callback outside the lock, prevents some shenanigans. 489 Schedule schedule; 490 try { 491 schedule = CustomScheduler.this.getNextSchedule(); 492 } catch (Throwable t) { 493 service.notifyFailed(t); 494 return; 495 } 496 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 497 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 498 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 499 // correct order. 500 Throwable scheduleFailure = null; 501 lock.lock(); 502 try { 503 if (currentFuture == null || !currentFuture.isCancelled()) { 504 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 505 } 506 } catch (Throwable e) { 507 // If an exception is thrown by the subclass then we need to make sure that the service 508 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 509 // because the service does not monitor the state of the future so if the exception is not 510 // caught and forwarded to the service the task would stop executing but the service would 511 // have no idea. 512 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 513 // the AbstractService could monitor the future directly. Rescheduling is still hard... 514 // but it would help with some of these lock ordering issues. 515 scheduleFailure = e; 516 } finally { 517 lock.unlock(); 518 } 519 // Call notifyFailed outside the lock to avoid lock ordering issues. 520 if (scheduleFailure != null) { 521 service.notifyFailed(scheduleFailure); 522 } 523 } 524 525 // N.B. Only protect cancel and isCancelled because those are the only methods that are 526 // invoked by the AbstractScheduledService. 527 @Override 528 public boolean cancel(boolean mayInterruptIfRunning) { 529 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 530 lock.lock(); 531 try { 532 return currentFuture.cancel(mayInterruptIfRunning); 533 } finally { 534 lock.unlock(); 535 } 536 } 537 538 @Override 539 public boolean isCancelled() { 540 lock.lock(); 541 try { 542 return currentFuture.isCancelled(); 543 } finally { 544 lock.unlock(); 545 } 546 } 547 548 @Override 549 protected Future<Void> delegate() { 550 throw new UnsupportedOperationException( 551 "Only cancel and isCancelled is supported by this future"); 552 } 553 } 554 555 @Override 556 final Future<?> schedule( 557 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 558 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 559 task.reschedule(); 560 return task; 561 } 562 563 /** 564 * A value object that represents an absolute delay until a task should be invoked. 565 * 566 * @author Luke Sandberg 567 * @since 11.0 568 */ 569 @Beta 570 protected static final class Schedule { 571 572 private final long delay; 573 private final TimeUnit unit; 574 575 /** 576 * @param delay the time from now to delay execution 577 * @param unit the time unit of the delay parameter 578 */ 579 public Schedule(long delay, TimeUnit unit) { 580 this.delay = delay; 581 this.unit = checkNotNull(unit); 582 } 583 } 584 585 /** 586 * Calculates the time at which to next invoke the task. 587 * 588 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 589 * on the same thread as the previous execution of {@link 590 * AbstractScheduledService#runOneIteration}. 591 * 592 * @return a schedule that defines the delay before the next execution. 593 */ 594 // TODO(cpovirk): @ForOverride 595 protected abstract Schedule getNextSchedule() throws Exception; 596 } 597}