001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020 021import com.google.common.annotations.GwtIncompatible; 022import com.google.common.base.Supplier; 023import com.google.errorprone.annotations.CanIgnoreReturnValue; 024import com.google.errorprone.annotations.concurrent.GuardedBy; 025import com.google.j2objc.annotations.WeakOuter; 026import java.util.concurrent.Callable; 027import java.util.concurrent.Executor; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.TimeoutException; 034import java.util.concurrent.locks.ReentrantLock; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037import org.checkerframework.checker.nullness.compatqual.NullableDecl; 038 039/** 040 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 041 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 042 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 043 * 044 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 045 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 046 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 047 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 048 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 049 * 050 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 051 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 052 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 053 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 054 * shared state without additional synchronization necessary for visibility to later executions of 055 * the life cycle methods. 056 * 057 * <h3>Usage Example</h3> 058 * 059 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 060 * rate limit itself. 061 * 062 * <pre>{@code 063 * class CrawlingService extends AbstractScheduledService { 064 * private Set<Uri> visited; 065 * private Queue<Uri> toCrawl; 066 * protected void startUp() throws Exception { 067 * toCrawl = readStartingUris(); 068 * } 069 * 070 * protected void runOneIteration() throws Exception { 071 * Uri uri = toCrawl.remove(); 072 * Collection<Uri> newUris = crawl(uri); 073 * visited.add(uri); 074 * for (Uri newUri : newUris) { 075 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 076 * } 077 * } 078 * 079 * protected void shutDown() throws Exception { 080 * saveUris(toCrawl); 081 * } 082 * 083 * protected Scheduler scheduler() { 084 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 085 * } 086 * } 087 * }</pre> 088 * 089 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 090 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 091 * rate limit the number of queries we perform. 092 * 093 * @author Luke Sandberg 094 * @since 11.0 095 */ 096@GwtIncompatible 097public abstract class AbstractScheduledService implements Service { 098 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 099 100 /** 101 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 102 * task. 103 * 104 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 105 * methods, these provide {@link Scheduler} instances for the common use case of running the 106 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 107 * CustomScheduler}. 108 * 109 * @author Luke Sandberg 110 * @since 11.0 111 */ 112 public abstract static class Scheduler { 113 /** 114 * Returns a {@link Scheduler} that schedules the task using the {@link 115 * ScheduledExecutorService#scheduleWithFixedDelay} method. 116 * 117 * @param initialDelay the time to delay first execution 118 * @param delay the delay between the termination of one execution and the commencement of the 119 * next 120 * @param unit the time unit of the initialDelay and delay parameters 121 */ 122 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 123 public static Scheduler newFixedDelaySchedule( 124 final long initialDelay, final long delay, final TimeUnit unit) { 125 checkNotNull(unit); 126 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 127 return new Scheduler() { 128 @Override 129 public Future<?> schedule( 130 AbstractService service, ScheduledExecutorService executor, Runnable task) { 131 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 132 } 133 }; 134 } 135 136 /** 137 * Returns a {@link Scheduler} that schedules the task using the {@link 138 * ScheduledExecutorService#scheduleAtFixedRate} method. 139 * 140 * @param initialDelay the time to delay first execution 141 * @param period the period between successive executions of the task 142 * @param unit the time unit of the initialDelay and period parameters 143 */ 144 @SuppressWarnings("GoodTime") // should accept a java.time.Duration 145 public static Scheduler newFixedRateSchedule( 146 final long initialDelay, final long period, final TimeUnit unit) { 147 checkNotNull(unit); 148 checkArgument(period > 0, "period must be > 0, found %s", period); 149 return new Scheduler() { 150 @Override 151 public Future<?> schedule( 152 AbstractService service, ScheduledExecutorService executor, Runnable task) { 153 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 154 } 155 }; 156 } 157 158 /** Schedules the task to run on the provided executor on behalf of the service. */ 159 abstract Future<?> schedule( 160 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 161 162 private Scheduler() {} 163 } 164 165 /* use AbstractService for state management */ 166 private final AbstractService delegate = new ServiceDelegate(); 167 168 @WeakOuter 169 private final class ServiceDelegate extends AbstractService { 170 171 // A handle to the running task so that we can stop it when a shutdown has been requested. 172 // These two fields are volatile because their values will be accessed from multiple threads. 173 @NullableDecl private volatile Future<?> runningTask; 174 @NullableDecl private volatile ScheduledExecutorService executorService; 175 176 // This lock protects the task so we can ensure that none of the template methods (startUp, 177 // shutDown or runOneIteration) run concurrently with one another. 178 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 179 // lock. 180 private final ReentrantLock lock = new ReentrantLock(); 181 182 @WeakOuter 183 class Task implements Runnable { 184 @Override 185 public void run() { 186 lock.lock(); 187 try { 188 if (runningTask.isCancelled()) { 189 // task may have been cancelled while blocked on the lock. 190 return; 191 } 192 AbstractScheduledService.this.runOneIteration(); 193 } catch (Throwable t) { 194 try { 195 shutDown(); 196 } catch (Exception ignored) { 197 logger.log( 198 Level.WARNING, 199 "Error while attempting to shut down the service after failure.", 200 ignored); 201 } 202 notifyFailed(t); 203 runningTask.cancel(false); // prevent future invocations. 204 } finally { 205 lock.unlock(); 206 } 207 } 208 } 209 210 private final Runnable task = new Task(); 211 212 @Override 213 protected final void doStart() { 214 executorService = 215 MoreExecutors.renamingDecorator( 216 executor(), 217 new Supplier<String>() { 218 @Override 219 public String get() { 220 return serviceName() + " " + state(); 221 } 222 }); 223 executorService.execute( 224 new Runnable() { 225 @Override 226 public void run() { 227 lock.lock(); 228 try { 229 startUp(); 230 runningTask = scheduler().schedule(delegate, executorService, task); 231 notifyStarted(); 232 } catch (Throwable t) { 233 notifyFailed(t); 234 if (runningTask != null) { 235 // prevent the task from running if possible 236 runningTask.cancel(false); 237 } 238 } finally { 239 lock.unlock(); 240 } 241 } 242 }); 243 } 244 245 @Override 246 protected final void doStop() { 247 runningTask.cancel(false); 248 executorService.execute( 249 new Runnable() { 250 @Override 251 public void run() { 252 try { 253 lock.lock(); 254 try { 255 if (state() != State.STOPPING) { 256 // This means that the state has changed since we were scheduled. This implies 257 // that an execution of runOneIteration has thrown an exception and we have 258 // transitioned to a failed state, also this means that shutDown has already 259 // been called, so we do not want to call it again. 260 return; 261 } 262 shutDown(); 263 } finally { 264 lock.unlock(); 265 } 266 notifyStopped(); 267 } catch (Throwable t) { 268 notifyFailed(t); 269 } 270 } 271 }); 272 } 273 274 @Override 275 public String toString() { 276 return AbstractScheduledService.this.toString(); 277 } 278 } 279 280 /** Constructor for use by subclasses. */ 281 protected AbstractScheduledService() {} 282 283 /** 284 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 285 * the service will transition to the {@link Service.State#FAILED} state and this method will no 286 * longer be called. 287 */ 288 protected abstract void runOneIteration() throws Exception; 289 290 /** 291 * Start the service. 292 * 293 * <p>By default this method does nothing. 294 */ 295 protected void startUp() throws Exception {} 296 297 /** 298 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 299 * 300 * <p>By default this method does nothing. 301 */ 302 protected void shutDown() throws Exception {} 303 304 /** 305 * Returns the {@link Scheduler} object used to configure this service. This method will only be 306 * called once. 307 */ 308 // TODO(cpovirk): @ForOverride 309 protected abstract Scheduler scheduler(); 310 311 /** 312 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 313 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 314 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 315 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 316 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 317 * instance. This method is guaranteed to only be called once. 318 * 319 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 320 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 321 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 322 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 323 * fails}. 324 */ 325 protected ScheduledExecutorService executor() { 326 @WeakOuter 327 class ThreadFactoryImpl implements ThreadFactory { 328 @Override 329 public Thread newThread(Runnable runnable) { 330 return MoreExecutors.newThread(serviceName(), runnable); 331 } 332 } 333 final ScheduledExecutorService executor = 334 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 335 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 336 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 337 // Technically this listener is added after start() was called so it is a little gross, but it 338 // is called within doStart() so we know that the service cannot terminate or fail concurrently 339 // with adding this listener so it is impossible to miss an event that we are interested in. 340 addListener( 341 new Listener() { 342 @Override 343 public void terminated(State from) { 344 executor.shutdown(); 345 } 346 347 @Override 348 public void failed(State from, Throwable failure) { 349 executor.shutdown(); 350 } 351 }, 352 directExecutor()); 353 return executor; 354 } 355 356 /** 357 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 358 * debugging output. 359 * 360 * @since 14.0 361 */ 362 protected String serviceName() { 363 return getClass().getSimpleName(); 364 } 365 366 @Override 367 public String toString() { 368 return serviceName() + " [" + state() + "]"; 369 } 370 371 @Override 372 public final boolean isRunning() { 373 return delegate.isRunning(); 374 } 375 376 @Override 377 public final State state() { 378 return delegate.state(); 379 } 380 381 /** @since 13.0 */ 382 @Override 383 public final void addListener(Listener listener, Executor executor) { 384 delegate.addListener(listener, executor); 385 } 386 387 /** @since 14.0 */ 388 @Override 389 public final Throwable failureCause() { 390 return delegate.failureCause(); 391 } 392 393 /** @since 15.0 */ 394 @CanIgnoreReturnValue 395 @Override 396 public final Service startAsync() { 397 delegate.startAsync(); 398 return this; 399 } 400 401 /** @since 15.0 */ 402 @CanIgnoreReturnValue 403 @Override 404 public final Service stopAsync() { 405 delegate.stopAsync(); 406 return this; 407 } 408 409 /** @since 15.0 */ 410 @Override 411 public final void awaitRunning() { 412 delegate.awaitRunning(); 413 } 414 415 /** @since 15.0 */ 416 @Override 417 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 418 delegate.awaitRunning(timeout, unit); 419 } 420 421 /** @since 15.0 */ 422 @Override 423 public final void awaitTerminated() { 424 delegate.awaitTerminated(); 425 } 426 427 /** @since 15.0 */ 428 @Override 429 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 430 delegate.awaitTerminated(timeout, unit); 431 } 432 433 /** 434 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 435 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 436 * cancelled, the {@link #getNextSchedule} method will be called. 437 * 438 * @author Luke Sandberg 439 * @since 11.0 440 */ 441 public abstract static class CustomScheduler extends Scheduler { 442 443 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 444 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 445 446 /** The underlying task. */ 447 private final Runnable wrappedRunnable; 448 449 /** The executor on which this Callable will be scheduled. */ 450 private final ScheduledExecutorService executor; 451 452 /** 453 * The service that is managing this callable. This is used so that failure can be reported 454 * properly. 455 */ 456 private final AbstractService service; 457 458 /** 459 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 460 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 461 * ensure that it is assigned atomically with being scheduled. 462 */ 463 private final ReentrantLock lock = new ReentrantLock(); 464 465 /** The future that represents the next execution of this task. */ 466 @GuardedBy("lock") 467 @NullableDecl 468 private Future<Void> currentFuture; 469 470 ReschedulableCallable( 471 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 472 this.wrappedRunnable = runnable; 473 this.executor = executor; 474 this.service = service; 475 } 476 477 @Override 478 public Void call() throws Exception { 479 wrappedRunnable.run(); 480 reschedule(); 481 return null; 482 } 483 484 /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */ 485 public void reschedule() { 486 // invoke the callback outside the lock, prevents some shenanigans. 487 Schedule schedule; 488 try { 489 schedule = CustomScheduler.this.getNextSchedule(); 490 } catch (Throwable t) { 491 service.notifyFailed(t); 492 return; 493 } 494 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 495 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 496 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 497 // correct order. 498 Throwable scheduleFailure = null; 499 lock.lock(); 500 try { 501 if (currentFuture == null || !currentFuture.isCancelled()) { 502 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 503 } 504 } catch (Throwable e) { 505 // If an exception is thrown by the subclass then we need to make sure that the service 506 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 507 // because the service does not monitor the state of the future so if the exception is not 508 // caught and forwarded to the service the task would stop executing but the service would 509 // have no idea. 510 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 511 // the AbstractService could monitor the future directly. Rescheduling is still hard... 512 // but it would help with some of these lock ordering issues. 513 scheduleFailure = e; 514 } finally { 515 lock.unlock(); 516 } 517 // Call notifyFailed outside the lock to avoid lock ordering issues. 518 if (scheduleFailure != null) { 519 service.notifyFailed(scheduleFailure); 520 } 521 } 522 523 // N.B. Only protect cancel and isCancelled because those are the only methods that are 524 // invoked by the AbstractScheduledService. 525 @Override 526 public boolean cancel(boolean mayInterruptIfRunning) { 527 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 528 lock.lock(); 529 try { 530 return currentFuture.cancel(mayInterruptIfRunning); 531 } finally { 532 lock.unlock(); 533 } 534 } 535 536 @Override 537 public boolean isCancelled() { 538 lock.lock(); 539 try { 540 return currentFuture.isCancelled(); 541 } finally { 542 lock.unlock(); 543 } 544 } 545 546 @Override 547 protected Future<Void> delegate() { 548 throw new UnsupportedOperationException( 549 "Only cancel and isCancelled is supported by this future"); 550 } 551 } 552 553 @Override 554 final Future<?> schedule( 555 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 556 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 557 task.reschedule(); 558 return task; 559 } 560 561 /** 562 * A value object that represents an absolute delay until a task should be invoked. 563 * 564 * @author Luke Sandberg 565 * @since 11.0 566 */ 567 protected static final class Schedule { 568 569 private final long delay; 570 private final TimeUnit unit; 571 572 /** 573 * @param delay the time from now to delay execution 574 * @param unit the time unit of the delay parameter 575 */ 576 public Schedule(long delay, TimeUnit unit) { 577 this.delay = delay; 578 this.unit = checkNotNull(unit); 579 } 580 } 581 582 /** 583 * Calculates the time at which to next invoke the task. 584 * 585 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 586 * on the same thread as the previous execution of {@link 587 * AbstractScheduledService#runOneIteration}. 588 * 589 * @return a schedule that defines the delay before the next execution. 590 */ 591 // TODO(cpovirk): @ForOverride 592 protected abstract Schedule getNextSchedule() throws Exception; 593 } 594}