001 /* 002 * Copyright (C) 2007 Google Inc. 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import com.google.common.annotations.Beta; 020 021 import java.util.Collections; 022 import java.util.List; 023 import java.util.concurrent.AbstractExecutorService; 024 import java.util.concurrent.ExecutorService; 025 import java.util.concurrent.Executors; 026 import java.util.concurrent.RejectedExecutionException; 027 import java.util.concurrent.ScheduledExecutorService; 028 import java.util.concurrent.ScheduledThreadPoolExecutor; 029 import java.util.concurrent.ThreadFactory; 030 import java.util.concurrent.ThreadPoolExecutor; 031 import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; 032 import java.util.concurrent.TimeUnit; 033 import java.util.concurrent.locks.Condition; 034 import java.util.concurrent.locks.Lock; 035 import java.util.concurrent.locks.ReentrantLock; 036 037 /** 038 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link 039 * ExecutorService}, and {@link ThreadFactory}. 040 * 041 * @author Eric Fellheimer 042 * @author Kyle Littlefield 043 * @author Justin Mahoney 044 * @since 3 045 */ 046 @Beta 047 public final class MoreExecutors { 048 private MoreExecutors() {} 049 050 /** 051 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 052 * when the application is complete. It does so by using daemon threads and 053 * adding a shutdown hook to wait for their completion. 054 * 055 * <p>This is mainly for fixed thread pools. 056 * See {@link Executors#newFixedThreadPool(int)}. 057 * 058 * @param executor the executor to modify to make sure it exits when the 059 * application is finished 060 * @param terminationTimeout how long to wait for the executor to 061 * finish before terminating the JVM 062 * @param timeUnit unit of time for the time parameter 063 * @return an unmodifiable version of the input which will not hang the JVM 064 */ 065 public static ExecutorService getExitingExecutorService( 066 ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) { 067 executor.setThreadFactory(new ThreadFactoryBuilder() 068 .setDaemon(true) 069 .setThreadFactory(executor.getThreadFactory()) 070 .build()); 071 072 ExecutorService service = Executors.unconfigurableExecutorService(executor); 073 074 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 075 076 return service; 077 } 078 079 /** 080 * Converts the given ScheduledThreadPoolExecutor into a 081 * ScheduledExecutorService that exits when the application is complete. It 082 * does so by using daemon threads and adding a shutdown hook to wait for 083 * their completion. 084 * 085 * <p>This is mainly for fixed thread pools. 086 * See {@link Executors#newScheduledThreadPool(int)}. 087 * 088 * @param executor the executor to modify to make sure it exits when the 089 * application is finished 090 * @param terminationTimeout how long to wait for the executor to 091 * finish before terminating the JVM 092 * @param timeUnit unit of time for the time parameter 093 * @return an unmodifiable version of the input which will not hang the JVM 094 */ 095 public static ScheduledExecutorService getExitingScheduledExecutorService( 096 ScheduledThreadPoolExecutor executor, long terminationTimeout, 097 TimeUnit timeUnit) { 098 executor.setThreadFactory(new ThreadFactoryBuilder() 099 .setDaemon(true) 100 .setThreadFactory(executor.getThreadFactory()) 101 .build()); 102 103 ScheduledExecutorService service = 104 Executors.unconfigurableScheduledExecutorService(executor); 105 106 addDelayedShutdownHook(service, terminationTimeout, timeUnit); 107 108 return service; 109 } 110 111 /** 112 * Add a shutdown hook to wait for thread completion in the given 113 * {@link ExecutorService service}. This is useful if the given service uses 114 * daemon threads, and we want to keep the JVM from exiting immediately on 115 * shutdown, instead giving these daemon threads a chance to terminate 116 * normally. 117 * @param service ExecutorService which uses daemon threads 118 * @param terminationTimeout how long to wait for the executor to finish 119 * before terminating the JVM 120 * @param timeUnit unit of time for the time parameter 121 */ 122 public static void addDelayedShutdownHook( 123 final ExecutorService service, final long terminationTimeout, 124 final TimeUnit timeUnit) { 125 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { 126 public void run() { 127 try { 128 // We'd like to log progress and failures that may arise in the 129 // following code, but unfortunately the behavior of logging 130 // is undefined in shutdown hooks. 131 // This is because the logging code installs a shutdown hook of its 132 // own. See Cleaner class inside {@link LogManager}. 133 service.shutdown(); 134 service.awaitTermination(terminationTimeout, timeUnit); 135 } catch (InterruptedException ignored) { 136 // We're shutting down anyway, so just ignore. 137 } 138 } 139 })); 140 } 141 142 /** 143 * Converts the given ThreadPoolExecutor into an ExecutorService that exits 144 * when the application is complete. It does so by using daemon threads and 145 * adding a shutdown hook to wait for their completion. 146 * 147 * <p>This method waits 120 seconds before continuing with JVM termination, 148 * even if the executor has not finished its work. 149 * 150 * <p>This is mainly for fixed thread pools. 151 * See {@link Executors#newFixedThreadPool(int)}. 152 * 153 * @param executor the executor to modify to make sure it exits when the 154 * application is finished 155 * @return an unmodifiable version of the input which will not hang the JVM 156 */ 157 public static ExecutorService getExitingExecutorService( 158 ThreadPoolExecutor executor) { 159 return getExitingExecutorService(executor, 120, TimeUnit.SECONDS); 160 } 161 162 /** 163 * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that 164 * exits when the application is complete. It does so by using daemon threads 165 * and adding a shutdown hook to wait for their completion. 166 * 167 * <p>This method waits 120 seconds before continuing with JVM termination, 168 * even if the executor has not finished its work. 169 * 170 * <p>This is mainly for fixed thread pools. 171 * See {@link Executors#newScheduledThreadPool(int)}. 172 * 173 * @param executor the executor to modify to make sure it exits when the 174 * application is finished 175 * @return an unmodifiable version of the input which will not hang the JVM 176 */ 177 public static ScheduledExecutorService getExitingScheduledExecutorService( 178 ScheduledThreadPoolExecutor executor) { 179 return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS); 180 } 181 182 /** 183 * Creates an executor service that runs each task in the thread 184 * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy} This 185 * applies both to individually submitted tasks and to collections of tasks 186 * submitted via {@code invokeAll} or {@code invokeAny}. In the latter case, 187 * tasks will run serially on the calling thread. Tasks are run to 188 * completion before a {@code Future} is returned to the caller (unless the 189 * executor has been shutdown). 190 * 191 * <p>Although all tasks are immediately executed in the thread that 192 * submitted the task, this {@code ExecutorService} imposes a small 193 * locking overhead on each task submission in order to implement shutdown 194 * and termination behavior. 195 * 196 * <p>The implementation deviates from the {@code ExecutorService} 197 * specification with regards to the {@code shutdownNow} method. First, 198 * "best-effort" with regards to canceling running tasks is implemented 199 * as "no-effort". No interrupts or other attempts are made to stop 200 * threads executing tasks. Second, the returned list will always be empty, 201 * as any submitted task is considered to have started execution. 202 * This applies also to tasks given to {@code invokeAll} or {@code invokeAny} 203 * which are pending serial execution, even the subset of the tasks that 204 * have not yet started execution. It is unclear from the 205 * {@code ExecutorService} specification if these should be included, and 206 * it's much easier to implement the interpretation that they not be. 207 * Finally, a call to {@code shutdown} or {@code shutdownNow} may result 208 * in concurrent calls to {@code invokeAll/invokeAny} throwing 209 * RejectedExecutionException, although a subset of the tasks may already 210 * have been executed. 211 */ 212 public static ExecutorService sameThreadExecutor() { 213 return new SameThreadExecutorService(); 214 } 215 216 // See sameThreadExecutor javadoc for behavioral notes. 217 private static class SameThreadExecutorService 218 extends AbstractExecutorService { 219 /** 220 * Lock used whenever accessing the state variables 221 * (runningTasks, shutdown, terminationCondition) of the executor 222 */ 223 private final Lock lock = new ReentrantLock(); 224 225 /** Signaled after the executor is shutdown and running tasks are done */ 226 private final Condition termination = lock.newCondition(); 227 228 /* 229 * Conceptually, these two variables describe the executor being in 230 * one of three states: 231 * - Active: shutdown == false 232 * - Shutdown: runningTasks > 0 and shutdown == true 233 * - Terminated: runningTasks == 0 and shutdown == true 234 */ 235 private int runningTasks = 0; 236 private boolean shutdown = false; 237 238 @Override 239 public void execute(Runnable command) { 240 startTask(); 241 try { 242 command.run(); 243 } finally { 244 endTask(); 245 } 246 } 247 248 @Override 249 public boolean isShutdown() { 250 lock.lock(); 251 try { 252 return shutdown; 253 } finally { 254 lock.unlock(); 255 } 256 } 257 258 @Override 259 public void shutdown() { 260 lock.lock(); 261 try { 262 shutdown = true; 263 } finally { 264 lock.unlock(); 265 } 266 } 267 268 // See sameThreadExecutor javadoc for unusual behavior of this method. 269 @Override 270 public List<Runnable> shutdownNow() { 271 shutdown(); 272 return Collections.emptyList(); 273 } 274 275 @Override 276 public boolean isTerminated() { 277 lock.lock(); 278 try { 279 return shutdown && runningTasks == 0; 280 } finally { 281 lock.unlock(); 282 } 283 } 284 285 @Override 286 public boolean awaitTermination(long timeout, TimeUnit unit) 287 throws InterruptedException { 288 long nanos = unit.toNanos(timeout); 289 lock.lock(); 290 try { 291 for (;;) { 292 if (isTerminated()) { 293 return true; 294 } else if (nanos <= 0) { 295 return false; 296 } else { 297 nanos = termination.awaitNanos(nanos); 298 } 299 } 300 } finally { 301 lock.unlock(); 302 } 303 } 304 305 /** 306 * Checks if the executor has been shut down and increments the running 307 * task count. 308 * 309 * @throws RejectedExecutionException if the executor has been previously 310 * shutdown 311 */ 312 private void startTask() { 313 lock.lock(); 314 try { 315 if (isShutdown()) { 316 throw new RejectedExecutionException("Executor already shutdown"); 317 } 318 runningTasks++; 319 } finally { 320 lock.unlock(); 321 } 322 } 323 324 /** 325 * Decrements the running task count. 326 */ 327 private void endTask() { 328 lock.lock(); 329 try { 330 runningTasks--; 331 if (isTerminated()) { 332 termination.signalAll(); 333 } 334 } finally { 335 lock.unlock(); 336 } 337 } 338 } 339 }