001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017package com.google.common.util.concurrent; 018 019import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 020 021import com.google.common.annotations.Beta; 022import com.google.common.base.Preconditions; 023import com.google.common.base.Supplier; 024import com.google.common.base.Throwables; 025 026import java.util.concurrent.Callable; 027import java.util.concurrent.Executor; 028import java.util.concurrent.Executors; 029import java.util.concurrent.Future; 030import java.util.concurrent.ScheduledExecutorService; 031import java.util.concurrent.ThreadFactory; 032import java.util.concurrent.TimeUnit; 033import java.util.concurrent.TimeoutException; 034import java.util.concurrent.locks.ReentrantLock; 035import java.util.logging.Level; 036import java.util.logging.Logger; 037 038import javax.annotation.concurrent.GuardedBy; 039 040/** 041 * Base class for services that can implement {@link #startUp} and {@link #shutDown} but while in 042 * the "running" state need to perform a periodic task. Subclasses can implement {@link #startUp}, 043 * {@link #shutDown} and also a {@link #runOneIteration} method that will be executed periodically. 044 * 045 * <p>This class uses the {@link ScheduledExecutorService} returned from {@link #executor} to run 046 * the {@link #startUp} and {@link #shutDown} methods and also uses that service to schedule the 047 * {@link #runOneIteration} that will be executed periodically as specified by its 048 * {@link Scheduler}. When this service is asked to stop via {@link #stopAsync} it will cancel the 049 * periodic task (but not interrupt it) and wait for it to stop before running the 050 * {@link #shutDown} method. 051 * 052 * <p>Subclasses are guaranteed that the life cycle methods ({@link #runOneIteration}, {@link 053 * #startUp} and {@link #shutDown}) will never run concurrently. Notably, if any execution of {@link 054 * #runOneIteration} takes longer than its schedule defines, then subsequent executions may start 055 * late. Also, all life cycle methods are executed with a lock held, so subclasses can safely 056 * modify shared state without additional synchronization necessary for visibility to later 057 * executions of the life cycle methods. 058 * 059 * <h3>Usage Example</h3> 060 * 061 * <p>Here is a sketch of a service which crawls a website and uses the scheduling capabilities to 062 * rate limit itself. <pre> {@code 063 * class CrawlingService extends AbstractScheduledService { 064 * private Set<Uri> visited; 065 * private Queue<Uri> toCrawl; 066 * protected void startUp() throws Exception { 067 * toCrawl = readStartingUris(); 068 * } 069 * 070 * protected void runOneIteration() throws Exception { 071 * Uri uri = toCrawl.remove(); 072 * Collection<Uri> newUris = crawl(uri); 073 * visited.add(uri); 074 * for (Uri newUri : newUris) { 075 * if (!visited.contains(newUri)) { toCrawl.add(newUri); } 076 * } 077 * } 078 * 079 * protected void shutDown() throws Exception { 080 * saveUris(toCrawl); 081 * } 082 * 083 * protected Scheduler scheduler() { 084 * return Scheduler.newFixedRateSchedule(0, 1, TimeUnit.SECONDS); 085 * } 086 * }}</pre> 087 * 088 * <p>This class uses the life cycle methods to read in a list of starting URIs and save the set of 089 * outstanding URIs when shutting down. Also, it takes advantage of the scheduling functionality to 090 * rate limit the number of queries we perform. 091 * 092 * @author Luke Sandberg 093 * @since 11.0 094 */ 095@Beta 096public abstract class AbstractScheduledService implements Service { 097 private static final Logger logger = Logger.getLogger(AbstractScheduledService.class.getName()); 098 099 /** 100 * A scheduler defines the policy for how the {@link AbstractScheduledService} should run its 101 * task. 102 * 103 * <p>Consider using the {@link #newFixedDelaySchedule} and {@link #newFixedRateSchedule} factory 104 * methods, these provide {@link Scheduler} instances for the common use case of running the 105 * service with a fixed schedule. If more flexibility is needed then consider subclassing 106 * {@link CustomScheduler}. 107 * 108 * @author Luke Sandberg 109 * @since 11.0 110 */ 111 public abstract static class Scheduler { 112 /** 113 * Returns a {@link Scheduler} that schedules the task using the 114 * {@link ScheduledExecutorService#scheduleWithFixedDelay} method. 115 * 116 * @param initialDelay the time to delay first execution 117 * @param delay the delay between the termination of one execution and the commencement of the 118 * next 119 * @param unit the time unit of the initialDelay and delay parameters 120 */ 121 public static Scheduler newFixedDelaySchedule(final long initialDelay, final long delay, 122 final TimeUnit unit) { 123 return new Scheduler() { 124 @Override 125 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 126 Runnable task) { 127 return executor.scheduleWithFixedDelay(task, initialDelay, delay, unit); 128 } 129 }; 130 } 131 132 /** 133 * Returns a {@link Scheduler} that schedules the task using the 134 * {@link ScheduledExecutorService#scheduleAtFixedRate} method. 135 * 136 * @param initialDelay the time to delay first execution 137 * @param period the period between successive executions of the task 138 * @param unit the time unit of the initialDelay and period parameters 139 */ 140 public static Scheduler newFixedRateSchedule(final long initialDelay, final long period, 141 final TimeUnit unit) { 142 return new Scheduler() { 143 @Override 144 public Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 145 Runnable task) { 146 return executor.scheduleAtFixedRate(task, initialDelay, period, unit); 147 } 148 }; 149 } 150 151 /** Schedules the task to run on the provided executor on behalf of the service. */ 152 abstract Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 153 Runnable runnable); 154 155 private Scheduler() {} 156 } 157 158 /* use AbstractService for state management */ 159 private final AbstractService delegate = new AbstractService() { 160 161 // A handle to the running task so that we can stop it when a shutdown has been requested. 162 // These two fields are volatile because their values will be accessed from multiple threads. 163 private volatile Future<?> runningTask; 164 private volatile ScheduledExecutorService executorService; 165 166 // This lock protects the task so we can ensure that none of the template methods (startUp, 167 // shutDown or runOneIteration) run concurrently with one another. 168 private final ReentrantLock lock = new ReentrantLock(); 169 170 private final Runnable task = new Runnable() { 171 @Override public void run() { 172 lock.lock(); 173 try { 174 AbstractScheduledService.this.runOneIteration(); 175 } catch (Throwable t) { 176 try { 177 shutDown(); 178 } catch (Exception ignored) { 179 logger.log(Level.WARNING, 180 "Error while attempting to shut down the service after failure.", ignored); 181 } 182 notifyFailed(t); 183 throw Throwables.propagate(t); 184 } finally { 185 lock.unlock(); 186 } 187 } 188 }; 189 190 @Override protected final void doStart() { 191 executorService = MoreExecutors.renamingDecorator(executor(), new Supplier<String>() { 192 @Override public String get() { 193 return serviceName() + " " + state(); 194 } 195 }); 196 executorService.execute(new Runnable() { 197 @Override public void run() { 198 lock.lock(); 199 try { 200 startUp(); 201 runningTask = scheduler().schedule(delegate, executorService, task); 202 notifyStarted(); 203 } catch (Throwable t) { 204 notifyFailed(t); 205 throw Throwables.propagate(t); 206 } finally { 207 lock.unlock(); 208 } 209 } 210 }); 211 } 212 213 @Override protected final void doStop() { 214 runningTask.cancel(false); 215 executorService.execute(new Runnable() { 216 @Override public void run() { 217 try { 218 lock.lock(); 219 try { 220 if (state() != State.STOPPING) { 221 // This means that the state has changed since we were scheduled. This implies that 222 // an execution of runOneIteration has thrown an exception and we have transitioned 223 // to a failed state, also this means that shutDown has already been called, so we 224 // do not want to call it again. 225 return; 226 } 227 shutDown(); 228 } finally { 229 lock.unlock(); 230 } 231 notifyStopped(); 232 } catch (Throwable t) { 233 notifyFailed(t); 234 throw Throwables.propagate(t); 235 } 236 } 237 }); 238 } 239 }; 240 241 /** Constructor for use by subclasses. */ 242 protected AbstractScheduledService() {} 243 244 /** 245 * Run one iteration of the scheduled task. If any invocation of this method throws an exception, 246 * the service will transition to the {@link Service.State#FAILED} state and this method will no 247 * longer be called. 248 */ 249 protected abstract void runOneIteration() throws Exception; 250 251 /** 252 * Start the service. 253 * 254 * <p>By default this method does nothing. 255 */ 256 protected void startUp() throws Exception {} 257 258 /** 259 * Stop the service. This is guaranteed not to run concurrently with {@link #runOneIteration}. 260 * 261 * <p>By default this method does nothing. 262 */ 263 protected void shutDown() throws Exception {} 264 265 /** 266 * Returns the {@link Scheduler} object used to configure this service. This method will only be 267 * called once. 268 */ 269 protected abstract Scheduler scheduler(); 270 271 /** 272 * Returns the {@link ScheduledExecutorService} that will be used to execute the {@link #startUp}, 273 * {@link #runOneIteration} and {@link #shutDown} methods. If this method is overridden the 274 * executor will not be {@linkplain ScheduledExecutorService#shutdown shutdown} when this 275 * service {@linkplain Service.State#TERMINATED terminates} or 276 * {@linkplain Service.State#TERMINATED fails}. Subclasses may override this method to supply a 277 * custom {@link ScheduledExecutorService} instance. This method is guaranteed to only be called 278 * once. 279 * 280 * <p>By default this returns a new {@link ScheduledExecutorService} with a single thread thread 281 * pool that sets the name of the thread to the {@linkplain #serviceName() service name}. 282 * Also, the pool will be {@linkplain ScheduledExecutorService#shutdown() shut down} when the 283 * service {@linkplain Service.State#TERMINATED terminates} or 284 * {@linkplain Service.State#TERMINATED fails}. 285 */ 286 protected ScheduledExecutorService executor() { 287 final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( 288 new ThreadFactory() { 289 @Override public Thread newThread(Runnable runnable) { 290 return MoreExecutors.newThread(serviceName(), runnable); 291 } 292 }); 293 // Add a listener to shutdown the executor after the service is stopped. This ensures that the 294 // JVM shutdown will not be prevented from exiting after this service has stopped or failed. 295 // Technically this listener is added after start() was called so it is a little gross, but it 296 // is called within doStart() so we know that the service cannot terminate or fail concurrently 297 // with adding this listener so it is impossible to miss an event that we are interested in. 298 addListener(new Listener() { 299 @Override public void terminated(State from) { 300 executor.shutdown(); 301 } 302 @Override public void failed(State from, Throwable failure) { 303 executor.shutdown(); 304 } 305 }, directExecutor()); 306 return executor; 307 } 308 309 /** 310 * Returns the name of this service. {@link AbstractScheduledService} may include the name in 311 * debugging output. 312 * 313 * @since 14.0 314 */ 315 protected String serviceName() { 316 return getClass().getSimpleName(); 317 } 318 319 @Override public String toString() { 320 return serviceName() + " [" + state() + "]"; 321 } 322 323 @Override public final boolean isRunning() { 324 return delegate.isRunning(); 325 } 326 327 @Override public final State state() { 328 return delegate.state(); 329 } 330 331 /** 332 * @since 13.0 333 */ 334 @Override public final void addListener(Listener listener, Executor executor) { 335 delegate.addListener(listener, executor); 336 } 337 338 /** 339 * @since 14.0 340 */ 341 @Override public final Throwable failureCause() { 342 return delegate.failureCause(); 343 } 344 345 /** 346 * @since 15.0 347 */ 348 @Override public final Service startAsync() { 349 delegate.startAsync(); 350 return this; 351 } 352 353 /** 354 * @since 15.0 355 */ 356 @Override public final Service stopAsync() { 357 delegate.stopAsync(); 358 return this; 359 } 360 361 /** 362 * @since 15.0 363 */ 364 @Override public final void awaitRunning() { 365 delegate.awaitRunning(); 366 } 367 368 /** 369 * @since 15.0 370 */ 371 @Override public final void awaitRunning(long timeout, TimeUnit unit) throws TimeoutException { 372 delegate.awaitRunning(timeout, unit); 373 } 374 375 /** 376 * @since 15.0 377 */ 378 @Override public final void awaitTerminated() { 379 delegate.awaitTerminated(); 380 } 381 382 /** 383 * @since 15.0 384 */ 385 @Override public final void awaitTerminated(long timeout, TimeUnit unit) throws TimeoutException { 386 delegate.awaitTerminated(timeout, unit); 387 } 388 389 /** 390 * A {@link Scheduler} that provides a convenient way for the {@link AbstractScheduledService} to 391 * use a dynamically changing schedule. After every execution of the task, assuming it hasn't 392 * been cancelled, the {@link #getNextSchedule} method will be called. 393 * 394 * @author Luke Sandberg 395 * @since 11.0 396 */ 397 @Beta 398 public abstract static class CustomScheduler extends Scheduler { 399 400 /** 401 * A callable class that can reschedule itself using a {@link CustomScheduler}. 402 */ 403 private class ReschedulableCallable extends ForwardingFuture<Void> implements Callable<Void> { 404 405 /** The underlying task. */ 406 private final Runnable wrappedRunnable; 407 408 /** The executor on which this Callable will be scheduled. */ 409 private final ScheduledExecutorService executor; 410 411 /** 412 * The service that is managing this callable. This is used so that failure can be 413 * reported properly. 414 */ 415 private final AbstractService service; 416 417 /** 418 * This lock is used to ensure safe and correct cancellation, it ensures that a new task is 419 * not scheduled while a cancel is ongoing. Also it protects the currentFuture variable to 420 * ensure that it is assigned atomically with being scheduled. 421 */ 422 private final ReentrantLock lock = new ReentrantLock(); 423 424 /** The future that represents the next execution of this task.*/ 425 @GuardedBy("lock") 426 private Future<Void> currentFuture; 427 428 ReschedulableCallable(AbstractService service, ScheduledExecutorService executor, 429 Runnable runnable) { 430 this.wrappedRunnable = runnable; 431 this.executor = executor; 432 this.service = service; 433 } 434 435 @Override 436 public Void call() throws Exception { 437 wrappedRunnable.run(); 438 reschedule(); 439 return null; 440 } 441 442 /** 443 * Atomically reschedules this task and assigns the new future to {@link #currentFuture}. 444 */ 445 public void reschedule() { 446 // We reschedule ourselves with a lock held for two reasons. 1. we want to make sure that 447 // cancel calls cancel on the correct future. 2. we want to make sure that the assignment 448 // to currentFuture doesn't race with itself so that currentFuture is assigned in the 449 // correct order. 450 lock.lock(); 451 try { 452 if (currentFuture == null || !currentFuture.isCancelled()) { 453 final Schedule schedule = CustomScheduler.this.getNextSchedule(); 454 currentFuture = executor.schedule(this, schedule.delay, schedule.unit); 455 } 456 } catch (Throwable e) { 457 // If an exception is thrown by the subclass then we need to make sure that the service 458 // notices and transitions to the FAILED state. We do it by calling notifyFailed directly 459 // because the service does not monitor the state of the future so if the exception is not 460 // caught and forwarded to the service the task would stop executing but the service would 461 // have no idea. 462 service.notifyFailed(e); 463 } finally { 464 lock.unlock(); 465 } 466 } 467 468 // N.B. Only protect cancel and isCancelled because those are the only methods that are 469 // invoked by the AbstractScheduledService. 470 @Override 471 public boolean cancel(boolean mayInterruptIfRunning) { 472 // Ensure that a task cannot be rescheduled while a cancel is ongoing. 473 lock.lock(); 474 try { 475 return currentFuture.cancel(mayInterruptIfRunning); 476 } finally { 477 lock.unlock(); 478 } 479 } 480 481 @Override 482 protected Future<Void> delegate() { 483 throw new UnsupportedOperationException("Only cancel is supported by this future"); 484 } 485 } 486 487 @Override 488 final Future<?> schedule(AbstractService service, ScheduledExecutorService executor, 489 Runnable runnable) { 490 ReschedulableCallable task = new ReschedulableCallable(service, executor, runnable); 491 task.reschedule(); 492 return task; 493 } 494 495 /** 496 * A value object that represents an absolute delay until a task should be invoked. 497 * 498 * @author Luke Sandberg 499 * @since 11.0 500 */ 501 @Beta 502 protected static final class Schedule { 503 504 private final long delay; 505 private final TimeUnit unit; 506 507 /** 508 * @param delay the time from now to delay execution 509 * @param unit the time unit of the delay parameter 510 */ 511 public Schedule(long delay, TimeUnit unit) { 512 this.delay = delay; 513 this.unit = Preconditions.checkNotNull(unit); 514 } 515 } 516 517 /** 518 * Calculates the time at which to next invoke the task. 519 * 520 * <p>This is guaranteed to be called immediately after the task has completed an iteration and 521 * on the same thread as the previous execution of {@link 522 * AbstractScheduledService#runOneIteration}. 523 * 524 * @return a schedule that defines the delay before the next execution. 525 */ 526 protected abstract Schedule getNextSchedule() throws Exception; 527 } 528}