001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.base.Preconditions.checkArgument; 018import static com.google.common.base.Preconditions.checkNotNull; 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020 021import com.google.common.annotations.Beta; 022import com.google.common.annotations.GwtIncompatible; 023import com.google.common.base.Supplier; 024import com.google.errorprone.annotations.CanIgnoreReturnValue; 025import com.google.errorprone.annotations.concurrent.GuardedBy; 026import com.google.j2objc.annotations.WeakOuter; 027import java.util.concurrent.Callable; 028import java.util.concurrent.Executor; 029import java.util.concurrent.Executors; 030import java.util.concurrent.Future; 031import java.util.concurrent.ScheduledExecutorService; 032import java.util.concurrent.ThreadFactory; 033import java.util.concurrent.TimeUnit; 034import java.util.concurrent.TimeoutException; 035import java.util.concurrent.locks.ReentrantLock; 036import java.util.logging.Level; 037import java.util.logging.Logger; 038 039/** 040 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 041 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 042 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 043 * 044 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 045 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 046 * {@link #runOneIteration} that will be executed periodically as specified by its {@link 047 * Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the periodic 048 * task (but not interrupt it) and wait for it to stop before running the {@link #shutDown} method. 049 * 050 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 051 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 052 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 053 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely modify 054 * shared state without additional synchronization necessary for visibility to later executions of 055 * the life cycle methods. 056 * 057 * <h3>Usage Example</h3> 058 * 059 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 060 * rate limit itself. 061 * 062 * <pre>{@code 063 * class CrawlingService extends AbstractScheduledService { 064 * private Set<Uri> visited; 065 * private Queue<Uri> toCrawl; 066 * protected void startUp() throws Exception { 067 * toCrawl = readStartingUris(); 068 * } 069 * 070 * protected void runOneIteration() throws Exception { 071 * Uri uri = toCrawl.remove(); 072 * Collection<Uri> newUris = crawl(uri); 073 * visited.add(uri); 074 * for (Uri newUri : newUris) { 075 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 076 * } 077 * } 078 * 079 * protected void shutDown() throws Exception { 080 * saveUris(toCrawl); 081 * } 082 * 083 * protected Scheduler scheduler() { 084 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 085 * } 086 * } 087 * }</pre> 088 * 089 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 090 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 091 * rate limit the number of queries we perform. 092 * 093 * @author Luke Sandberg 094 * @since 11.0 095 */ 096@Beta 097@GwtIncompatible 098public abstract class AbstractScheduledService implements Service { 099 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 100 101 /** 102 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 103 * task. 104 * 105 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 106 * methods, these provide {@link Scheduler} instances for the common use case of running the 107 * service with a fixed schedule. If more flexibility is needed then consider subclassing {@link 108 * CustomScheduler}. 109 * 110 * @author Luke Sandberg 111 * @since 11.0 112 */ 113 public abstract static class Scheduler { 114 /** 115 * Returns a {@link Scheduler} that schedules the task using the {@link 116 * ScheduledExecutorService#scheduleWithFixedDelay} method. 117 * 118 * @param initialDelay the time to delay first execution 119 * @param delay the delay between the termination of one execution and the commencement of the 120 * next 121 * @param unit the time unit of the initialDelay and delay parameters 122 */ 123 public static Scheduler newFixedDelaySchedule( 124 final long initialDelay, final long delay, final TimeUnit unit) { 125 checkNotNull(unit); 126 checkArgument(delay > 0, "delay must be > 0, found %s", delay); 127 return new Scheduler() { 128 @Override 129 public Future<?> schedule( 130 AbstractService service, ScheduledExecutorService executor, Runnable task) { 131 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 132 } 133 }; 134 } 135 136 /** 137 * Returns a {@link Scheduler} that schedules the task using the {@link 138 * ScheduledExecutorService#scheduleAtFixedRate} method. 139 * 140 * @param initialDelay the time to delay first execution 141 * @param period the period between successive executions of the task 142 * @param unit the time unit of the initialDelay and period parameters 143 */ 144 public static Scheduler newFixedRateSchedule( 145 final long initialDelay, final long period, final TimeUnit unit) { 146 checkNotNull(unit); 147 checkArgument(period > 0, "period must be > 0, found %s", period); 148 return new Scheduler() { 149 @Override 150 public Future<?> schedule( 151 AbstractService service, ScheduledExecutorService executor, Runnable task) { 152 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 153 } 154 }; 155 } 156 157 /** Schedules the task to run on the provided executor on behalf of the service. */ 158 abstract Future<?> schedule( 159 AbstractService service, ScheduledExecutorService executor, Runnable runnable); 160 161 private Scheduler() {} 162 } 163 164 /* use AbstractService for state management */ 165 private final AbstractService delegate = new ServiceDelegate(); 166 167 @WeakOuter 168 private final class ServiceDelegate extends AbstractService { 169 170 // A handle to the running task so that we can stop it when a shutdown has been requested. 171 // These two fields are volatile because their values will be accessed from multiple threads. 172 private volatile Future<?> runningTask; 173 private volatile ScheduledExecutorService executorService; 174 175 // This lock protects the task so we can ensure that none of the template methods (startUp, 176 // shutDown or runOneIteration) run concurrently with one another. 177 // TODO(lukes): why don't we use ListenableFuture to sequence things? Then we could drop the 178 // lock. 179 private final ReentrantLock lock = new ReentrantLock(); 180 181 @WeakOuter 182 class Task implements Runnable { 183 @Override 184 public void run() { 185 lock.lock(); 186 try { 187 if (runningTask.isCancelled()) { 188 // task may have been cancelled while blocked on the lock. 189 return; 190 } 191 AbstractScheduledService.this.runOneIteration(); 192 } catch (Throwable t) { 193 try { 194 shutDown(); 195 } catch (Exception ignored) { 196 logger.log( 197 Level.WARNING, 198 "Error while attempting to shut down the service after failure.", 199 ignored); 200 } 201 notifyFailed(t); 202 runningTask.cancel(false); // prevent future invocations. 203 } finally { 204 lock.unlock(); 205 } 206 } 207 } 208 209 private final Runnable task = new Task(); 210 211 @Override 212 protected final void doStart() { 213 executorService = 214 MoreExecutors.renamingDecorator( 215 executor(), 216 new Supplier<String>() { 217 @Override 218 public String get() { 219 return serviceName() + " " + state(); 220 } 221 }); 222 executorService.execute( 223 new Runnable() { 224 @Override 225 public void run() { 226 lock.lock(); 227 try { 228 startUp(); 229 runningTask = scheduler().schedule(delegate, executorService, task); 230 notifyStarted(); 231 } catch (Throwable t) { 232 notifyFailed(t); 233 if (runningTask != null) { 234 // prevent the task from running if possible 235 runningTask.cancel(false); 236 } 237 } finally { 238 lock.unlock(); 239 } 240 } 241 }); 242 } 243 244 @Override 245 protected final void doStop() { 246 runningTask.cancel(false); 247 executorService.execute( 248 new Runnable() { 249 @Override 250 public void run() { 251 try { 252 lock.lock(); 253 try { 254 if (state() != State.STOPPING) { 255 // This means that the state has changed since we were scheduled. This implies 256 // that an execution of runOneIteration has thrown an exception and we have 257 // transitioned to a failed state, also this means that shutDown has already 258 // been called, so we do not want to call it again. 259 return; 260 } 261 shutDown(); 262 } finally { 263 lock.unlock(); 264 } 265 notifyStopped(); 266 } catch (Throwable t) { 267 notifyFailed(t); 268 } 269 } 270 }); 271 } 272 273 @Override 274 public String toString() { 275 return AbstractScheduledService.this.toString(); 276 } 277 } 278 279 /** Constructor for use by subclasses. */ 280 protected AbstractScheduledService() {} 281 282 /** 283 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 284 * the service will transition to the {@link Service.State#FAILED} state and this method will no 285 * longer be called. 286 */ 287 protected abstract void runOneIteration() throws Exception; 288 289 /** 290 * Start the service. 291 * 292 * <p>By default this method does nothing. 293 */ 294 protected void startUp() throws Exception {} 295 296 /** 297 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 298 * 299 * <p>By default this method does nothing. 300 */ 301 protected void shutDown() throws Exception {} 302 303 /** 304 * Returns the {@link Scheduler} object used to configure this service. This method will only be 305 * called once. 306 */ 307 // TODO(cpovirk): @ForOverride 308 protected abstract Scheduler scheduler(); 309 310 /** 311 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 312 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 313 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this service 314 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 315 * fails}. Subclasses may override this method to supply a custom {@link ScheduledExecutorService} 316 * instance. This method is guaranteed to only be called once. 317 * 318 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 319 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. Also, 320 * the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the service 321 * {@linkplain Service.State#TERMINATED terminates} or {@linkplain Service.State#TERMINATED 322 * fails}. 323 */ 324 protected ScheduledExecutorService executor() { 325 @WeakOuter 326 class ThreadFactoryImpl implements ThreadFactory { 327 @Override 328 public Thread newThread(Runnable runnable) { 329 return MoreExecutors.newThread(serviceName(), runnable); 330 } 331 } 332 final ScheduledExecutorService executor = 333 Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl()); 334 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 335 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 336 // Technically this listener is added after start() was called so it is a little gross, but it 337 // is called within doStart() so we know that the service cannot terminate or fail concurrently 338 // with adding this listener so it is impossible to miss an event that we are interested in. 339 addListener( 340 new Listener() { 341 @Override 342 public void terminated(State from) { 343 executor.shutdown(); 344 } 345 346 @Override 347 public void failed(State from, Throwable failure) { 348 executor.shutdown(); 349 } 350 }, 351 directExecutor()); 352 return executor; 353 } 354 355 /** 356 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 357 * debugging output. 358 * 359 * @since 14.0 360 */ 361 protected String serviceName() { 362 return getClass().getSimpleName(); 363 } 364 365 @Override 366 public String toString() { 367 return serviceName() + " [" + state() + "]"; 368 } 369 370 @Override 371 public final boolean isRunning() { 372 return delegate.isRunning(); 373 } 374 375 @Override 376 public final State state() { 377 return delegate.state(); 378 } 379 380 /** @since 13.0 */ 381 @Override 382 public final void addListener(Listener listener, Executor executor) { 383 delegate.addListener(listener, executor); 384 } 385 386 /** @since 14.0 */ 387 @Override 388 public final Throwable failureCause() { 389 return delegate.failureCause(); 390 } 391 392 /** @since 15.0 */ 393 @CanIgnoreReturnValue 394 @Override 395 public final Service startAsync() { 396 delegate.startAsync(); 397 return this; 398 } 399 400 /** @since 15.0 */ 401 @CanIgnoreReturnValue 402 @Override 403 public final Service stopAsync() { 404 delegate.stopAsync(); 405 return this; 406 } 407 408 /** @since 15.0 */ 409 @Override 410 public final void awaitRunning() { 411 delegate.awaitRunning(); 412 } 413 414 /** @since 15.0 */ 415 @Override 416 public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 417 delegate.awaitRunning(timeout, unit); 418 } 419 420 /** @since 15.0 */ 421 @Override 422 public final void awaitTerminated() { 423 delegate.awaitTerminated(); 424 } 425 426 /** @since 15.0 */ 427 @Override 428 public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 429 delegate.awaitTerminated(timeout, unit); 430 } 431 432 /** 433 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 434 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't been 435 * cancelled, the {@link #getNextSchedule} method will be called. 436 * 437 * @author Luke Sandberg 438 * @since 11.0 439 */ 440 @Beta 441 public abstract static class CustomScheduler extends Scheduler { 442 443 /** A callable class that can reschedule itself using a {@link CustomScheduler}. */ 444 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 445 446 /** The underlying task. */ 447 private final Runnable wrappedRunnable; 448 449 /** The executor on which this Callable will be scheduled. */ 450 private final ScheduledExecutorService executor; 451 452 /** 453 * The service that is managing this callable. This is used so that failure can be reported 454 * properly. 455 */ 456 private final AbstractService service; 457 458 /** 459 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 460 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 461 * ensure that it is assigned atomically with being scheduled. 462 */ 463 private final ReentrantLock lock = new ReentrantLock(); 464 465 /** The future that represents the next execution of this task. */ 466 @GuardedBy("lock") 467 private Future<Void> currentFuture; 468 469 ReschedulableCallable( 470 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 471 this.wrappedRunnable = runnable; 472 this.executor = executor; 473 this.service = service; 474 } 475 476 @Override 477 public Void call() throws Exception { 478 wrappedRunnable.run(); 479 reschedule(); 480 return null; 481 } 482 483 /** Atomically reschedules this task and assigns the new future to {@link #currentFuture}. */ 484 public void reschedule() { 485 // invoke the callback outside the lock, prevents some shenanigans. 486 Schedule schedule; 487 try { 488 schedule = CustomScheduler.this.getNextSchedule(); 489 } catch (Throwable t) { 490 service.notifyFailed(t); 491 return; 492 } 493 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 494 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 495 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 496 // correct order. 497 Throwable scheduleFailure = null; 498 lock.lock(); 499 try { 500 if (currentFuture == null || !currentFuture.isCancelled()) { 501 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 502 } 503 } catch (Throwable e) { 504 // If an exception is thrown by the subclass then we need to make sure that the service 505 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 506 // because the service does not monitor the state of the future so if the exception is not 507 // caught and forwarded to the service the task would stop executing but the service would 508 // have no idea. 509 // TODO(lukes): consider building everything in terms of ListenableScheduledFuture then 510 // the AbstractService could monitor the future directly. Rescheduling is still hard... 511 // but it would help with some of these lock ordering issues. 512 scheduleFailure = e; 513 } finally { 514 lock.unlock(); 515 } 516 // Call notifyFailed outside the lock to avoid lock ordering issues. 517 if (scheduleFailure != null) { 518 service.notifyFailed(scheduleFailure); 519 } 520 } 521 522 // N.B. Only protect cancel and isCancelled because those are the only methods that are 523 // invoked by the AbstractScheduledService. 524 @Override 525 public boolean cancel(boolean mayInterruptIfRunning) { 526 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 527 lock.lock(); 528 try { 529 return currentFuture.cancel(mayInterruptIfRunning); 530 } finally { 531 lock.unlock(); 532 } 533 } 534 535 @Override 536 public boolean isCancelled() { 537 lock.lock(); 538 try { 539 return currentFuture.isCancelled(); 540 } finally { 541 lock.unlock(); 542 } 543 } 544 545 @Override 546 protected Future<Void> delegate() { 547 throw new UnsupportedOperationException( 548 "Only cancel and isCancelled is supported by this future"); 549 } 550 } 551 552 @Override 553 final Future<?> schedule( 554 AbstractService service, ScheduledExecutorService executor, Runnable runnable) { 555 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 556 task.reschedule(); 557 return task; 558 } 559 560 /** 561 * A value object that represents an absolute delay until a task should be invoked. 562 * 563 * @author Luke Sandberg 564 * @since 11.0 565 */ 566 @Beta 567 protected static final class Schedule { 568 569 private final long delay; 570 private final TimeUnit unit; 571 572 /** 573 * @param delay the time from now to delay execution 574 * @param unit the time unit of the delay parameter 575 */ 576 public Schedule(long delay, TimeUnit unit) { 577 this.delay = delay; 578 this.unit = checkNotNull(unit); 579 } 580 } 581 582 /** 583 * Calculates the time at which to next invoke the task. 584 * 585 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 586 * on the same thread as the previous execution of {@link 587 * AbstractScheduledService#runOneIteration}. 588 * 589 * @return a schedule that defines the delay before the next execution. 590 */ 591 // TODO(cpovirk): @ForOverride 592 protected abstract Schedule getNextSchedule() throws Exception; 593 } 594}