001 /* 002 * Copyright (C) 2007 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkArgument; 020 import static com.google.common.base.Preconditions.checkNotNull; 021 022 import com.google.common.annotations.Beta; 023 import com.google.common.collect.Lists; 024 import com.google.common.collect.Queues; 025 026 import java.util.Collection; 027 import java.util.Collections; 028 import java.util.Iterator; 029 import java.util.List; 030 import java.util.concurrent.BlockingQueue; 031 import java.util.concurrent.Callable; 032 import java.util.concurrent.ExecutionException; 033 import java.util.concurrent.ExecutorService; 034 import java.util.concurrent.Executors; 035 import java.util.concurrent.Future; 036 import java.util.concurrent.RejectedExecutionException; 037 import java.util.concurrent.ScheduledExecutorService; 038 import java.util.concurrent.ScheduledFuture; 039 import java.util.concurrent.ScheduledThreadPoolExecutor; 040 import java.util.concurrent.ThreadFactory; 041 import java.util.concurrent.ThreadPoolExecutor; 042 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; 043 import java.util.concurrent.TimeUnit; 044 import java.util.concurrent.TimeoutException; 045 import java.util.concurrent.locks.Condition; 046 import java.util.concurrent.locks.Lock; 047 import java.util.concurrent.locks.ReentrantLock; 048 049 /** 050 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link 051 * ExecutorService}, and {@link ThreadFactory}. 052 * 053 * @author Eric Fellheimer 054 * @author Kyle Littlefield 055 * @author Justin Mahoney 056 * @since 3.0 057 */ 058 public final class MoreExecutors { 059 private MoreExecutors() {} 060 061 /** 062 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 063 * when the application is complete. It does so by using daemon threads and 064 * adding a shutdown hook to wait for their completion. 065 * 066 * <p>This is mainly for fixed thread pools. 067 * See {@link Executors#newFixedThreadPool(int)}. 068 * 069 * @param executor the executor to modify to make sure it exits when the 070 * application is finished 071 * @param terminationTimeout how long to wait for the executor to 072 * finish before terminating the JVM 073 * @param timeUnit unit of time for the time parameter 074 * @return an unmodifiable version of the input which will not hang the JVM 075 */ 076 @Beta 077 public static ExecutorService getExitingExecutorService( 078 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 079 executor.setThreadFactory(new ThreadFactoryBuilder() 080 .setDaemon(true) 081 .setThreadFactory(executor.getThreadFactory()) 082 .build()); 083 084 ExecutorService service = Executors.unconfigurableExecutorService(executor); 085 086 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 087 088 return service; 089 } 090 091 /** 092 * Converts the given ScheduledThreadPoolExecutor into a 093 * ScheduledExecutorService that exits when the application is complete. It 094 * does so by using daemon threads and adding a shutdown hook to wait for 095 * their completion. 096 * 097 * <p>This is mainly for fixed thread pools. 098 * See {@link Executors#newScheduledThreadPool(int)}. 099 * 100 * @param executor the executor to modify to make sure it exits when the 101 * application is finished 102 * @param terminationTimeout how long to wait for the executor to 103 * finish before terminating the JVM 104 * @param timeUnit unit of time for the time parameter 105 * @return an unmodifiable version of the input which will not hang the JVM 106 */ 107 @Beta 108 public static ScheduledExecutorService getExitingScheduledExecutorService( 109 ScheduledThreadPoolExecutor executor, long terminationTimeout, 110 TimeUnit timeUnit) { 111 executor.setThreadFactory(new ThreadFactoryBuilder() 112 .setDaemon(true) 113 .setThreadFactory(executor.getThreadFactory()) 114 .build()); 115 116 ScheduledExecutorService service = 117 Executors.unconfigurableScheduledExecutorService(executor); 118 119 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 120 121 return service; 122 } 123 124 /** 125 * Add a shutdown hook to wait for thread completion in the given 126 * {@link ExecutorService service}. This is useful if the given service uses 127 * daemon threads, and we want to keep the JVM from exiting immediately on 128 * shutdown, instead giving these daemon threads a chance to terminate 129 * normally. 130 * @param service ExecutorService which uses daemon threads 131 * @param terminationTimeout how long to wait for the executor to finish 132 * before terminating the JVM 133 * @param timeUnit unit of time for the time parameter 134 */ 135 @Beta 136 public static void addDelayedShutdownHook( 137 final ExecutorService service, final long terminationTimeout, 138 final TimeUnit timeUnit) { 139 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 140 @Override 141 public void run() { 142 try { 143 // We'd like to log progress and failures that may arise in the 144 // following code, but unfortunately the behavior of logging 145 // is undefined in shutdown hooks. 146 // This is because the logging code installs a shutdown hook of its 147 // own. See Cleaner class inside {@link LogManager}. 148 service.shutdown(); 149 service.awaitTermination(terminationTimeout, timeUnit); 150 } catch (InterruptedException ignored) { 151 // We're shutting down anyway, so just ignore. 152 } 153 } 154 }, "DelayedShutdownHook-for-" + service)); 155 } 156 157 /** 158 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 159 * when the application is complete. It does so by using daemon threads and 160 * adding a shutdown hook to wait for their completion. 161 * 162 * <p>This method waits 120 seconds before continuing with JVM termination, 163 * even if the executor has not finished its work. 164 * 165 * <p>This is mainly for fixed thread pools. 166 * See {@link Executors#newFixedThreadPool(int)}. 167 * 168 * @param executor the executor to modify to make sure it exits when the 169 * application is finished 170 * @return an unmodifiable version of the input which will not hang the JVM 171 */ 172 @Beta 173 public static ExecutorService getExitingExecutorService( 174 ThreadPoolExecutor executor) { 175 return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); 176 } 177 178 /** 179 * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that 180 * exits when the application is complete. It does so by using daemon threads 181 * and adding a shutdown hook to wait for their completion. 182 * 183 * <p>This method waits 120 seconds before continuing with JVM termination, 184 * even if the executor has not finished its work. 185 * 186 * <p>This is mainly for fixed thread pools. 187 * See {@link Executors#newScheduledThreadPool(int)}. 188 * 189 * @param executor the executor to modify to make sure it exits when the 190 * application is finished 191 * @return an unmodifiable version of the input which will not hang the JVM 192 */ 193 @Beta 194 public static ScheduledExecutorService getExitingScheduledExecutorService( 195 ScheduledThreadPoolExecutor executor) { 196 return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); 197 } 198 199 /** 200 * Creates an executor service that runs each task in the thread 201 * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy} This 202 * applies both to individually submitted tasks and to collections of tasks 203 * submitted via {@code invokeAll} or {@code invokeAny}. In the latter case, 204 * tasks will run serially on the calling thread. Tasks are run to 205 * completion before a {@code Future} is returned to the caller (unless the 206 * executor has been shutdown). 207 * 208 * <p>Although all tasks are immediately executed in the thread that 209 * submitted the task, this {@code ExecutorService} imposes a small 210 * locking overhead on each task submission in order to implement shutdown 211 * and termination behavior. 212 * 213 * <p>The implementation deviates from the {@code ExecutorService} 214 * specification with regards to the {@code shutdownNow} method. First, 215 * "best-effort" with regards to canceling running tasks is implemented 216 * as "no-effort". No interrupts or other attempts are made to stop 217 * threads executing tasks. Second, the returned list will always be empty, 218 * as any submitted task is considered to have started execution. 219 * This applies also to tasks given to {@code invokeAll} or {@code invokeAny} 220 * which are pending serial execution, even the subset of the tasks that 221 * have not yet started execution. It is unclear from the 222 * {@code ExecutorService} specification if these should be included, and 223 * it's much easier to implement the interpretation that they not be. 224 * Finally, a call to {@code shutdown} or {@code shutdownNow} may result 225 * in concurrent calls to {@code invokeAll/invokeAny} throwing 226 * RejectedExecutionException, although a subset of the tasks may already 227 * have been executed. 228 * 229 * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility" 230 * >mostly source-compatible</a> since 3.0) 231 */ 232 public static ListeningExecutorService sameThreadExecutor() { 233 return new SameThreadExecutorService(); 234 } 235 236 // See sameThreadExecutor javadoc for behavioral notes. 237 private static class SameThreadExecutorService 238 extends AbstractListeningExecutorService { 239 /** 240 * Lock used whenever accessing the state variables 241 * (runningTasks, shutdown, terminationCondition) of the executor 242 */ 243 private final Lock lock = new ReentrantLock(); 244 245 /** Signaled after the executor is shutdown and running tasks are done */ 246 private final Condition termination = lock.newCondition(); 247 248 /* 249 * Conceptually, these two variables describe the executor being in 250 * one of three states: 251 * - Active: shutdown == false 252 * - Shutdown: runningTasks > 0 and shutdown == true 253 * - Terminated: runningTasks == 0 and shutdown == true 254 */ 255 private int runningTasks = 0; 256 private boolean shutdown = false; 257 258 @Override 259 public void execute(Runnable command) { 260 startTask(); 261 try { 262 command.run(); 263 } finally { 264 endTask(); 265 } 266 } 267 268 @Override 269 public boolean isShutdown() { 270 lock.lock(); 271 try { 272 return shutdown; 273 } finally { 274 lock.unlock(); 275 } 276 } 277 278 @Override 279 public void shutdown() { 280 lock.lock(); 281 try { 282 shutdown = true; 283 } finally { 284 lock.unlock(); 285 } 286 } 287 288 // See sameThreadExecutor javadoc for unusual behavior of this method. 289 @Override 290 public List<Runnable> shutdownNow() { 291 shutdown(); 292 return Collections.emptyList(); 293 } 294 295 @Override 296 public boolean isTerminated() { 297 lock.lock(); 298 try { 299 return shutdown && runningTasks == 0; 300 } finally { 301 lock.unlock(); 302 } 303 } 304 305 @Override 306 public boolean awaitTermination(long timeout, TimeUnit unit) 307 throws InterruptedException { 308 long nanos = unit.toNanos(timeout); 309 lock.lock(); 310 try { 311 for (;;) { 312 if (isTerminated()) { 313 return true; 314 } else if (nanos <= 0) { 315 return false; 316 } else { 317 nanos = termination.awaitNanos(nanos); 318 } 319 } 320 } finally { 321 lock.unlock(); 322 } 323 } 324 325 /** 326 * Checks if the executor has been shut down and increments the running 327 * task count. 328 * 329 * @throws RejectedExecutionException if the executor has been previously 330 * shutdown 331 */ 332 private void startTask() { 333 lock.lock(); 334 try { 335 if (isShutdown()) { 336 throw new RejectedExecutionException("Executor already shutdown"); 337 } 338 runningTasks++; 339 } finally { 340 lock.unlock(); 341 } 342 } 343 344 /** 345 * Decrements the running task count. 346 */ 347 private void endTask() { 348 lock.lock(); 349 try { 350 runningTasks--; 351 if (isTerminated()) { 352 termination.signalAll(); 353 } 354 } finally { 355 lock.unlock(); 356 } 357 } 358 } 359 360 /** 361 * Creates an {@link ExecutorService} whose {@code submit} and {@code 362 * invokeAll} methods submit {@link ListenableFutureTask} instances to the 363 * given delegate executor. Those methods, as well as {@code execute} and 364 * {@code invokeAny}, are implemented in terms of calls to {@code 365 * delegate.execute}. All other methods are forwarded unchanged to the 366 * delegate. This implies that the returned {@code ListeningExecutorService} 367 * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code 368 * invokeAny} methods, so any special handling of tasks must be implemented in 369 * the delegate's {@code execute} method or by wrapping the returned {@code 370 * ListeningExecutorService}. 371 * 372 * <p>If the delegate executor was already an instance of {@code 373 * ListeningExecutorService}, it is returned untouched, and the rest of this 374 * documentation does not apply. 375 * 376 * @since 10.0 377 */ 378 public static ListeningExecutorService listeningDecorator( 379 ExecutorService delegate) { 380 return (delegate instanceof ListeningExecutorService) 381 ? (ListeningExecutorService) delegate 382 : (delegate instanceof ScheduledExecutorService) 383 ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate) 384 : new ListeningDecorator(delegate); 385 } 386 387 /** 388 * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code 389 * invokeAll} methods submit {@link ListenableFutureTask} instances to the 390 * given delegate executor. Those methods, as well as {@code execute} and 391 * {@code invokeAny}, are implemented in terms of calls to {@code 392 * delegate.execute}. All other methods are forwarded unchanged to the 393 * delegate. This implies that the returned {@code 394 * SchedulingListeningExecutorService} never calls the delegate's {@code 395 * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special 396 * handling of tasks must be implemented in the delegate's {@code execute} 397 * method or by wrapping the returned {@code 398 * SchedulingListeningExecutorService}. 399 * 400 * <p>If the delegate executor was already an instance of {@code 401 * ListeningScheduledExecutorService}, it is returned untouched, and the rest 402 * of this documentation does not apply. 403 * 404 * @since 10.0 405 */ 406 public static ListeningScheduledExecutorService listeningDecorator( 407 ScheduledExecutorService delegate) { 408 return (delegate instanceof ListeningScheduledExecutorService) 409 ? (ListeningScheduledExecutorService) delegate 410 : new ScheduledListeningDecorator(delegate); 411 } 412 413 private static class ListeningDecorator 414 extends AbstractListeningExecutorService { 415 final ExecutorService delegate; 416 417 ListeningDecorator(ExecutorService delegate) { 418 this.delegate = checkNotNull(delegate); 419 } 420 421 @Override 422 public boolean awaitTermination(long timeout, TimeUnit unit) 423 throws InterruptedException { 424 return delegate.awaitTermination(timeout, unit); 425 } 426 427 @Override 428 public boolean isShutdown() { 429 return delegate.isShutdown(); 430 } 431 432 @Override 433 public boolean isTerminated() { 434 return delegate.isTerminated(); 435 } 436 437 @Override 438 public void shutdown() { 439 delegate.shutdown(); 440 } 441 442 @Override 443 public List<Runnable> shutdownNow() { 444 return delegate.shutdownNow(); 445 } 446 447 @Override 448 public void execute(Runnable command) { 449 delegate.execute(command); 450 } 451 } 452 453 private static class ScheduledListeningDecorator 454 extends ListeningDecorator implements ListeningScheduledExecutorService { 455 @SuppressWarnings("hiding") 456 final ScheduledExecutorService delegate; 457 458 ScheduledListeningDecorator(ScheduledExecutorService delegate) { 459 super(delegate); 460 this.delegate = checkNotNull(delegate); 461 } 462 463 @Override 464 public ScheduledFuture<?> schedule( 465 Runnable command, long delay, TimeUnit unit) { 466 return delegate.schedule(command, delay, unit); 467 } 468 469 @Override 470 public <V> ScheduledFuture<V> schedule( 471 Callable<V> callable, long delay, TimeUnit unit) { 472 return delegate.schedule(callable, delay, unit); 473 } 474 475 @Override 476 public ScheduledFuture<?> scheduleAtFixedRate( 477 Runnable command, long initialDelay, long period, TimeUnit unit) { 478 return delegate.scheduleAtFixedRate(command, initialDelay, period, unit); 479 } 480 481 @Override 482 public ScheduledFuture<?> scheduleWithFixedDelay( 483 Runnable command, long initialDelay, long delay, TimeUnit unit) { 484 return delegate.scheduleWithFixedDelay( 485 command, initialDelay, delay, unit); 486 } 487 } 488 489 /* 490 * This following method is a modified version of one found in 491 * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30 492 * which contained the following notice: 493 * 494 * Written by Doug Lea with assistance from members of JCP JSR-166 495 * Expert Group and released to the public domain, as explained at 496 * http://creativecommons.org/publicdomain/zero/1.0/ 497 * Other contributors include Andrew Wright, Jeffrey Hayes, 498 * Pat Fisher, Mike Judd. 499 */ 500 501 /** 502 * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService} 503 * implementations. 504 */ static <T> T invokeAnyImpl(ListeningExecutorService executorService, 505 Collection<? extends Callable<T>> tasks, boolean timed, long nanos) 506 throws InterruptedException, ExecutionException, TimeoutException { 507 int ntasks = tasks.size(); 508 checkArgument(ntasks > 0); 509 List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks); 510 BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue(); 511 512 // For efficiency, especially in executors with limited 513 // parallelism, check to see if previously submitted tasks are 514 // done before submitting more of them. This interleaving 515 // plus the exception mechanics account for messiness of main 516 // loop. 517 518 try { 519 // Record exceptions so that if we fail to obtain any 520 // result, we can throw the last exception we got. 521 ExecutionException ee = null; 522 long lastTime = timed ? System.nanoTime() : 0; 523 Iterator<? extends Callable<T>> it = tasks.iterator(); 524 525 futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue)); 526 --ntasks; 527 int active = 1; 528 529 for (;;) { 530 Future<T> f = futureQueue.poll(); 531 if (f == null) { 532 if (ntasks > 0) { 533 --ntasks; 534 futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue)); 535 ++active; 536 } else if (active == 0) { 537 break; 538 } else if (timed) { 539 f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS); 540 if (f == null) { 541 throw new TimeoutException(); 542 } 543 long now = System.nanoTime(); 544 nanos -= now - lastTime; 545 lastTime = now; 546 } else { 547 f = futureQueue.take(); 548 } 549 } 550 if (f != null) { 551 --active; 552 try { 553 return f.get(); 554 } catch (ExecutionException eex) { 555 ee = eex; 556 } catch (RuntimeException rex) { 557 ee = new ExecutionException(rex); 558 } 559 } 560 } 561 562 if (ee == null) { 563 ee = new ExecutionException(null); 564 } 565 throw ee; 566 } finally { 567 for (Future<T> f : futures) { 568 f.cancel(true); 569 } 570 } 571 } 572 573 /** 574 * Submits the task and adds a listener that adds the future to {@code queue} when it completes. 575 */ 576 private static <T> ListenableFuture<T> submitAndAddQueueListener( 577 ListeningExecutorService executorService, Callable<T> task, 578 final BlockingQueue<Future<T>> queue) { 579 final ListenableFuture<T> future = executorService.submit(task); 580 future.addListener(new Runnable() { 581 @Override public void run() { 582 queue.add(future); 583 } 584 }, MoreExecutors.sameThreadExecutor()); 585 return future; 586 } 587 }