001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020 021import com.google.common.annotations.Beta; 022import com.google.common.annotations.GwtIncompatible; 023import com.google.common.base.Supplier; 024import com.google.errorprone.annotations.CanIgnoreReturnValue; 025import com.google.errorprone.annotations.concurrent.GuardedBy; 026import com.google.j2objc.annotations.WeakOuter; 027import java.util.concurrent.Callable; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038import org.checkerframework.checker.nullness.compatqual.MonotonicNonNullDecl; 039import org.checkerframework.checker.nullness.compatqual.NullableDecl; 040 041/** 042 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 043 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 044 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 045 * 046 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 047 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 048 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 049 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 050 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 051 * 052 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 053 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 054 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 055 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 056 * shared state without additional synchronization necessary for visibility to later executions of 057 * the life cycle methods. 058 * 059 * <h3>Usage Example</h3> 060 * 061 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 062 * rate limit itself. 063 * 064 * <pre>{@code 065 * class CrawlingService extends AbstractScheduledService { 066 * private Set<Uri> visited; 067 * private Queue<Uri> toCrawl; 068 * protected void startUp() throws Exception { 069 * toCrawl = readStartingUris(); 070 * } 071 * 072 * protected void runOneIteration() throws Exception { 073 * Uri uri = toCrawl.remove(); 074 * Collection<Uri> newUris = crawl(uri); 075 * visited.add(uri); 076 * for (Uri newUri : newUris) { 077 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 078 * } 079 * } 080 * 081 * protected void shutDown() throws Exception { 082 * saveUris(toCrawl); 083 * } 084 * 085 * protected Scheduler scheduler() { 086 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 087 * } 088 * } 089 * }</pre> 090 * 091 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 092 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 093 * rate limit the number of queries we perform. 094 * 095 * @author Luke Sandberg 096 * @since 11.0 097 */ 098@Beta 099@GwtIncompatible 100public abstract class AbstractScheduledService implements Service { 101 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 102 103 /** 104 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 105 * task. 106 * 107 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 108 * methods, these provide {@link Scheduler} instances for the common use case of running the 109 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 110 * CustomScheduler}. 111 * 112 * @author Luke Sandberg 113 * @since 11.0 114 */ 115 public abstract static class Scheduler { 116 /** 117 * Returns a {@link Scheduler} that schedules the task using the {@link 118 * ScheduledExecutorService#scheduleWithFixedDelay} method. 119 * 120 * @param initialDelay the time to delay first execution 121 * @param delay the delay between the termination of one execution and the commencement of the 122 * next 123 * @param unit the time unit of the initialDelay and delay parameters 124 */ 125 public static Scheduler newFixedDelaySchedule( 126 final long initialDelay, final long delay, final TimeUnit unit) { 127 checkNotNull(unit); 128 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 129 return new Scheduler() { 130 @Override 131 public Future<?> schedule( 132 AbstractService service, ScheduledExecutorService executor, Runnable task) { 133 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 134 } 135 }; 136 } 137 138 /** 139 * Returns a {@link Scheduler} that schedules the task using the {@link 140 * ScheduledExecutorService#scheduleAtFixedRate} method. 141 * 142 * @param initialDelay the time to delay first execution 143 * @param period the period between successive executions of the task 144 * @param unit the time unit of the initialDelay and period parameters 145 */ 146 public static Scheduler newFixedRateSchedule( 147 final long initialDelay, final long period, final TimeUnit unit) { 148 checkNotNull(unit); 149 checkArgument(period > 0, "period must be > 0, found %s", period); 150 return new Scheduler() { 151 @Override 152 public Future<?> schedule( 153 AbstractService service, ScheduledExecutorService executor, Runnable task) { 154 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 155 } 156 }; 157 } 158 159 /** Schedules the task to run on the provided executor on behalf of the service. */ 160 abstract Future<?> schedule( 161 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 162 163 private Scheduler() {} 164 } 165 166 /* use AbstractService for state management */ 167 private final AbstractService delegate = new ServiceDelegate(); 168 169 @WeakOuter 170 private final class ServiceDelegate extends AbstractService { 171 172 // A handle to the running task so that we can stop it when a shutdown has been requested. 173 // These two fields are volatile because their values will be accessed from multiple threads. 174 @MonotonicNonNullDecl private volatile Future<?> runningTask; 175 @MonotonicNonNullDecl private volatile ScheduledExecutorService executorService; 176 177 // This lock protects the task so we can ensure that none of the template methods (startUp, 178 // shutDown or runOneIteration) run concurrently with one another. 179 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 180 // lock. 181 private final ReentrantLock lock = new ReentrantLock(); 182 183 @WeakOuter 184 class Task implements Runnable { 185 @Override 186 public void run() { 187 lock.lock(); 188 try { 189 if (runningTask.isCancelled()) { 190 // task may have been cancelled while blocked on the lock. 191 return; 192 } 193 AbstractScheduledService.this.runOneIteration(); 194 } catch (Throwable t) { 195 try { 196 shutDown(); 197 } catch (Exception ignored) { 198 logger.log( 199 Level.WARNING, 200 "Error while attempting to shut down the service after failure.", 201 ignored); 202 } 203 notifyFailed(t); 204 runningTask.cancel(false); // prevent future invocations. 205 } finally { 206 lock.unlock(); 207 } 208 } 209 } 210 211 private final Runnable task = new Task(); 212 213 @Override 214 protected final void doStart() { 215 executorService = 216 MoreExecutors.renamingDecorator( 217 executor(), 218 new Supplier<String>() { 219 @Override 220 public String get() { 221 return serviceName() + " " + state(); 222 } 223 }); 224 executorService.execute( 225 new Runnable() { 226 @Override 227 public void run() { 228 lock.lock(); 229 try { 230 startUp(); 231 runningTask = scheduler().schedule(delegate, executorService, task); 232 notifyStarted(); 233 } catch (Throwable t) { 234 notifyFailed(t); 235 if (runningTask != null) { 236 // prevent the task from running if possible 237 runningTask.cancel(false); 238 } 239 } finally { 240 lock.unlock(); 241 } 242 } 243 }); 244 } 245 246 @Override 247 protected final void doStop() { 248 runningTask.cancel(false); 249 executorService.execute( 250 new Runnable() { 251 @Override 252 public void run() { 253 try { 254 lock.lock(); 255 try { 256 if (state() != State.STOPPING) { 257 // This means that the state has changed since we were scheduled. This implies 258 // that an execution of runOneIteration has thrown an exception and we have 259 // transitioned to a failed state, also this means that shutDown has already 260 // been called, so we do not want to call it again. 261 return; 262 } 263 shutDown(); 264 } finally { 265 lock.unlock(); 266 } 267 notifyStopped(); 268 } catch (Throwable t) { 269 notifyFailed(t); 270 } 271 } 272 }); 273 } 274 275 @Override 276 public String toString() { 277 return AbstractScheduledService.this.toString(); 278 } 279 } 280 281 /** Constructor for use by subclasses. */ 282 protected AbstractScheduledService() {} 283 284 /** 285 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 286 * the service will transition to the {@link Service.State#FAILED} state and this method will no 287 * longer be called. 288 */ 289 protected abstract void runOneIteration() throws Exception; 290 291 /** 292 * Start the service. 293 * 294 * <p>By default this method does nothing. 295 */ 296 protected void startUp() throws Exception {} 297 298 /** 299 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 300 * 301 * <p>By default this method does nothing. 302 */ 303 protected void shutDown() throws Exception {} 304 305 /** 306 * Returns the {@link Scheduler} object used to configure this service. This method will only be 307 * called once. 308 */ 309 // TODO(cpovirk): @ForOverride 310 protected abstract Scheduler scheduler(); 311 312 /** 313 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 314 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 315 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 316 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 317 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 318 * instance. This method is guaranteed to only be called once. 319 * 320 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 321 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 322 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 323 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 324 * fails}. 325 */ 326 protected ScheduledExecutorService executor() { 327 @WeakOuter 328 class ThreadFactoryImpl implements ThreadFactory { 329 @Override 330 public Thread newThread(Runnable runnable) { 331 return MoreExecutors.newThread(serviceName(), runnable); 332 } 333 } 334 final ScheduledExecutorService executor = 335 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 336 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 337 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 338 // Technically this listener is added after start() was called so it is a little gross, but it 339 // is called within doStart() so we know that the service cannot terminate or fail concurrently 340 // with adding this listener so it is impossible to miss an event that we are interested in. 341 addListener( 342 new Listener() { 343 @Override 344 public void terminated(State from) { 345 executor.shutdown(); 346 } 347 348 @Override 349 public void failed(State from, Throwable failure) { 350 executor.shutdown(); 351 } 352 }, 353 directExecutor()); 354 return executor; 355 } 356 357 /** 358 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 359 * debugging output. 360 * 361 * @since 14.0 362 */ 363 protected String serviceName() { 364 return getClass().getSimpleName(); 365 } 366 367 @Override 368 public String toString() { 369 return serviceName() + " [" + state() + "]"; 370 } 371 372 @Override 373 public final boolean isRunning() { 374 return delegate.isRunning(); 375 } 376 377 @Override 378 public final State state() { 379 return delegate.state(); 380 } 381 382 /** @since 13.0 */ 383 @Override 384 public final void addListener(Listener listener, Executor executor) { 385 delegate.addListener(listener, executor); 386 } 387 388 /** @since 14.0 */ 389 @Override 390 public final Throwable failureCause() { 391 return delegate.failureCause(); 392 } 393 394 /** @since 15.0 */ 395 @CanIgnoreReturnValue 396 @Override 397 public final Service startAsync() { 398 delegate.startAsync(); 399 return this; 400 } 401 402 /** @since 15.0 */ 403 @CanIgnoreReturnValue 404 @Override 405 public final Service stopAsync() { 406 delegate.stopAsync(); 407 return this; 408 } 409 410 /** @since 15.0 */ 411 @Override 412 public final void awaitRunning() { 413 delegate.awaitRunning(); 414 } 415 416 /** @since 15.0 */ 417 @Override 418 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 419 delegate.awaitRunning(timeout, unit); 420 } 421 422 /** @since 15.0 */ 423 @Override 424 public final void awaitTerminated() { 425 delegate.awaitTerminated(); 426 } 427 428 /** @since 15.0 */ 429 @Override 430 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 431 delegate.awaitTerminated(timeout, unit); 432 } 433 434 /** 435 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 436 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 437 * cancelled, the {@link #getNextSchedule} method will be called. 438 * 439 * @author Luke Sandberg 440 * @since 11.0 441 */ 442 @Beta 443 public abstract static class CustomScheduler extends Scheduler { 444 445 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 446 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 447 448 /** The underlying task. */ 449 private final Runnable wrappedRunnable; 450 451 /** The executor on which this Callable will be scheduled. */ 452 private final ScheduledExecutorService executor; 453 454 /** 455 * The service that is managing this callable. This is used so that failure can be reported 456 * properly. 457 */ 458 private final AbstractService service; 459 460 /** 461 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 462 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 463 * ensure that it is assigned atomically with being scheduled. 464 */ 465 private final ReentrantLock lock = new ReentrantLock(); 466 467 /** The future that represents the next execution of this task. */ 468 @GuardedBy("lock") 469 @NullableDecl private Future<Void> currentFuture; 470 471 ReschedulableCallable( 472 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 473 this.wrappedRunnable = runnable; 474 this.executor = executor; 475 this.service = service; 476 } 477 478 @Override 479 public Void call() throws Exception { 480 wrappedRunnable.run(); 481 reschedule(); 482 return null; 483 } 484 485 /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */ 486 public void reschedule() { 487 // invoke the callback outside the lock, prevents some shenanigans. 488 Schedule schedule; 489 try { 490 schedule = CustomScheduler.this.getNextSchedule(); 491 } catch (Throwable t) { 492 service.notifyFailed(t); 493 return; 494 } 495 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 496 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 497 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 498 // correct order. 499 Throwable scheduleFailure = null; 500 lock.lock(); 501 try { 502 if (currentFuture == null || !currentFuture.isCancelled()) { 503 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 504 } 505 } catch (Throwable e) { 506 // If an exception is thrown by the subclass then we need to make sure that the service 507 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 508 // because the service does not monitor the state of the future so if the exception is not 509 // caught and forwarded to the service the task would stop executing but the service would 510 // have no idea. 511 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 512 // the AbstractService could monitor the future directly. Rescheduling is still hard... 513 // but it would help with some of these lock ordering issues. 514 scheduleFailure = e; 515 } finally { 516 lock.unlock(); 517 } 518 // Call notifyFailed outside the lock to avoid lock ordering issues. 519 if (scheduleFailure != null) { 520 service.notifyFailed(scheduleFailure); 521 } 522 } 523 524 // N.B. Only protect cancel and isCancelled because those are the only methods that are 525 // invoked by the AbstractScheduledService. 526 @Override 527 public boolean cancel(boolean mayInterruptIfRunning) { 528 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 529 lock.lock(); 530 try { 531 return currentFuture.cancel(mayInterruptIfRunning); 532 } finally { 533 lock.unlock(); 534 } 535 } 536 537 @Override 538 public boolean isCancelled() { 539 lock.lock(); 540 try { 541 return currentFuture.isCancelled(); 542 } finally { 543 lock.unlock(); 544 } 545 } 546 547 @Override 548 protected Future<Void> delegate() { 549 throw new UnsupportedOperationException( 550 "Only cancel and isCancelled is supported by this future"); 551 } 552 } 553 554 @Override 555 final Future<?> schedule( 556 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 557 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 558 task.reschedule(); 559 return task; 560 } 561 562 /** 563 * A value object that represents an absolute delay until a task should be invoked. 564 * 565 * @author Luke Sandberg 566 * @since 11.0 567 */ 568 @Beta 569 protected static final class Schedule { 570 571 private final long delay; 572 private final TimeUnit unit; 573 574 /** 575 * @param delay the time from now to delay execution 576 * @param unit the time unit of the delay parameter 577 */ 578 public Schedule(long delay, TimeUnit unit) { 579 this.delay = delay; 580 this.unit = checkNotNull(unit); 581 } 582 } 583 584 /** 585 * Calculates the time at which to next invoke the task. 586 * 587 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 588 * on the same thread as the previous execution of {@link 589 * AbstractScheduledService#runOneIteration}. 590 * 591 * @return a schedule that defines the delay before the next execution. 592 */ 593 // TODO(cpovirk): @ForOverride 594 protected abstract Schedule getNextSchedule() throws Exception; 595 } 596}