001    /*
002     * Copyright (C) 2007 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkArgument;
020    import static com.google.common.base.Preconditions.checkNotNull;
021    
022    import com.google.common.annotations.Beta;
023    import com.google.common.collect.Lists;
024    import com.google.common.collect.Queues;
025    
026    import java.util.Collection;
027    import java.util.Collections;
028    import java.util.Iterator;
029    import java.util.List;
030    import java.util.concurrent.BlockingQueue;
031    import java.util.concurrent.Callable;
032    import java.util.concurrent.ExecutionException;
033    import java.util.concurrent.ExecutorService;
034    import java.util.concurrent.Executors;
035    import java.util.concurrent.Future;
036    import java.util.concurrent.RejectedExecutionException;
037    import java.util.concurrent.ScheduledExecutorService;
038    import java.util.concurrent.ScheduledFuture;
039    import java.util.concurrent.ScheduledThreadPoolExecutor;
040    import java.util.concurrent.ThreadFactory;
041    import java.util.concurrent.ThreadPoolExecutor;
042    import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
043    import java.util.concurrent.TimeUnit;
044    import java.util.concurrent.TimeoutException;
045    import java.util.concurrent.locks.Condition;
046    import java.util.concurrent.locks.Lock;
047    import java.util.concurrent.locks.ReentrantLock;
048    
049    /**
050     * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
051     * ExecutorService}, and {@link ThreadFactory}.
052     *
053     * @author Eric Fellheimer
054     * @author Kyle Littlefield
055     * @author Justin Mahoney
056     * @since 3.0
057     */
058    public final class MoreExecutors {
  // Static utility holder: the private constructor keeps other classes from
  // instantiating it.
  private MoreExecutors() {}
060    
061      /**
062       * Converts the given ThreadPoolExecutor into an ExecutorService that exits
063       * when the application is complete.  It does so by using daemon threads and
064       * adding a shutdown hook to wait for their completion.
065       *
066       * <p>This is mainly for fixed thread pools.
067       * See {@link Executors#newFixedThreadPool(int)}.
068       *
069       * @param executor the executor to modify to make sure it exits when the
070       *        application is finished
071       * @param terminationTimeout how long to wait for the executor to
072       *        finish before terminating the JVM
073       * @param timeUnit unit of time for the time parameter
074       * @return an unmodifiable version of the input which will not hang the JVM
075       */
076      @Beta
077      public static ExecutorService getExitingExecutorService(
078          ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
079        executor.setThreadFactory(new ThreadFactoryBuilder()
080            .setDaemon(true)
081            .setThreadFactory(executor.getThreadFactory())
082            .build());
083    
084        ExecutorService service = Executors.unconfigurableExecutorService(executor);
085    
086        addDelayedShutdownHook(service, terminationTimeout, timeUnit);
087    
088        return service;
089      }
090    
091      /**
092       * Converts the given ScheduledThreadPoolExecutor into a
093       * ScheduledExecutorService that exits when the application is complete.  It
094       * does so by using daemon threads and adding a shutdown hook to wait for
095       * their completion.
096       *
097       * <p>This is mainly for fixed thread pools.
098       * See {@link Executors#newScheduledThreadPool(int)}.
099       *
100       * @param executor the executor to modify to make sure it exits when the
101       *        application is finished
102       * @param terminationTimeout how long to wait for the executor to
103       *        finish before terminating the JVM
104       * @param timeUnit unit of time for the time parameter
105       * @return an unmodifiable version of the input which will not hang the JVM
106       */
107      @Beta
108      public static ScheduledExecutorService getExitingScheduledExecutorService(
109          ScheduledThreadPoolExecutor executor, long terminationTimeout,
110          TimeUnit timeUnit) {
111        executor.setThreadFactory(new ThreadFactoryBuilder()
112            .setDaemon(true)
113            .setThreadFactory(executor.getThreadFactory())
114            .build());
115    
116        ScheduledExecutorService service =
117            Executors.unconfigurableScheduledExecutorService(executor);
118    
119        addDelayedShutdownHook(service, terminationTimeout, timeUnit);
120    
121        return service;
122      }
123    
124      /**
125       * Add a shutdown hook to wait for thread completion in the given
126       * {@link ExecutorService service}.  This is useful if the given service uses
127       * daemon threads, and we want to keep the JVM from exiting immediately on
128       * shutdown, instead giving these daemon threads a chance to terminate
129       * normally.
130       * @param service ExecutorService which uses daemon threads
131       * @param terminationTimeout how long to wait for the executor to finish
132       *        before terminating the JVM
133       * @param timeUnit unit of time for the time parameter
134       */
135      @Beta
136      public static void addDelayedShutdownHook(
137          final ExecutorService service, final long terminationTimeout,
138          final TimeUnit timeUnit) {
139        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
140          @Override
141          public void run() {
142            try {
143              // We'd like to log progress and failures that may arise in the
144              // following code, but unfortunately the behavior of logging
145              // is undefined in shutdown hooks.
146              // This is because the logging code installs a shutdown hook of its
147              // own. See Cleaner class inside {@link LogManager}.
148              service.shutdown();
149              service.awaitTermination(terminationTimeout, timeUnit);
150            } catch (InterruptedException ignored) {
151              // We're shutting down anyway, so just ignore.
152            }
153          }
154        }, "DelayedShutdownHook-for-" + service));
155      }
156    
157      /**
158       * Converts the given ThreadPoolExecutor into an ExecutorService that exits
159       * when the application is complete.  It does so by using daemon threads and
160       * adding a shutdown hook to wait for their completion.
161       *
162       * <p>This method waits 120 seconds before continuing with JVM termination,
163       * even if the executor has not finished its work.
164       *
165       * <p>This is mainly for fixed thread pools.
166       * See {@link Executors#newFixedThreadPool(int)}.
167       *
168       * @param executor the executor to modify to make sure it exits when the
169       *        application is finished
170       * @return an unmodifiable version of the input which will not hang the JVM
171       */
172      @Beta
173      public static ExecutorService getExitingExecutorService(
174          ThreadPoolExecutor executor) {
175        return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
176      }
177    
178      /**
179       * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
180       * exits when the application is complete.  It does so by using daemon threads
181       * and adding a shutdown hook to wait for their completion.
182       *
183       * <p>This method waits 120 seconds before continuing with JVM termination,
184       * even if the executor has not finished its work.
185       *
186       * <p>This is mainly for fixed thread pools.
187       * See {@link Executors#newScheduledThreadPool(int)}.
188       *
189       * @param executor the executor to modify to make sure it exits when the
190       *        application is finished
191       * @return an unmodifiable version of the input which will not hang the JVM
192       */
193      @Beta
194      public static ScheduledExecutorService getExitingScheduledExecutorService(
195          ScheduledThreadPoolExecutor executor) {
196        return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
197      }
198    
199      /**
200       * Creates an executor service that runs each task in the thread
201       * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
202       * applies both to individually submitted tasks and to collections of tasks
203       * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
204       * tasks will run serially on the calling thread.  Tasks are run to
205       * completion before a {@code Future} is returned to the caller (unless the
206       * executor has been shutdown).
207       *
208       * <p>Although all tasks are immediately executed in the thread that
209       * submitted the task, this {@code ExecutorService} imposes a small
210       * locking overhead on each task submission in order to implement shutdown
211       * and termination behavior.
212       *
213       * <p>The implementation deviates from the {@code ExecutorService}
214       * specification with regards to the {@code shutdownNow} method.  First,
215       * "best-effort" with regards to canceling running tasks is implemented
216       * as "no-effort".  No interrupts or other attempts are made to stop
217       * threads executing tasks.  Second, the returned list will always be empty,
218       * as any submitted task is considered to have started execution.
219       * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
220       * which are pending serial execution, even the subset of the tasks that
221       * have not yet started execution.  It is unclear from the
222       * {@code ExecutorService} specification if these should be included, and
223       * it's much easier to implement the interpretation that they not be.
224       * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
225       * in concurrent calls to {@code invokeAll/invokeAny} throwing
226       * RejectedExecutionException, although a subset of the tasks may already
227       * have been executed.
228       *
229       * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
230       *        >mostly source-compatible</a> since 3.0)
231       */
232      public static ListeningExecutorService sameThreadExecutor() {
233        return new SameThreadExecutorService();
234      }
235    
236      // See sameThreadExecutor javadoc for behavioral notes.
237      private static class SameThreadExecutorService
238          extends AbstractListeningExecutorService {
239        /**
240         * Lock used whenever accessing the state variables
241         * (runningTasks, shutdown, terminationCondition) of the executor
242         */
243        private final Lock lock = new ReentrantLock();
244    
245        /** Signaled after the executor is shutdown and running tasks are done */
246        private final Condition termination = lock.newCondition();
247    
248        /*
249         * Conceptually, these two variables describe the executor being in
250         * one of three states:
251         *   - Active: shutdown == false
252         *   - Shutdown: runningTasks > 0 and shutdown == true
253         *   - Terminated: runningTasks == 0 and shutdown == true
254         */
255        private int runningTasks = 0;
256        private boolean shutdown = false;
257    
258        @Override
259        public void execute(Runnable command) {
260          startTask();
261          try {
262            command.run();
263          } finally {
264            endTask();
265          }
266        }
267    
268        @Override
269        public boolean isShutdown() {
270          lock.lock();
271          try {
272            return shutdown;
273          } finally {
274            lock.unlock();
275          }
276        }
277    
278        @Override
279        public void shutdown() {
280          lock.lock();
281          try {
282            shutdown = true;
283          } finally {
284            lock.unlock();
285          }
286        }
287    
288        // See sameThreadExecutor javadoc for unusual behavior of this method.
289        @Override
290        public List<Runnable> shutdownNow() {
291          shutdown();
292          return Collections.emptyList();
293        }
294    
295        @Override
296        public boolean isTerminated() {
297          lock.lock();
298          try {
299            return shutdown && runningTasks == 0;
300          } finally {
301            lock.unlock();
302          }
303        }
304    
305        @Override
306        public boolean awaitTermination(long timeout, TimeUnit unit)
307            throws InterruptedException {
308          long nanos = unit.toNanos(timeout);
309          lock.lock();
310          try {
311            for (;;) {
312              if (isTerminated()) {
313                return true;
314              } else if (nanos <= 0) {
315                return false;
316              } else {
317                nanos = termination.awaitNanos(nanos);
318              }
319            }
320          } finally {
321            lock.unlock();
322          }
323        }
324    
325        /**
326         * Checks if the executor has been shut down and increments the running
327         * task count.
328         *
329         * @throws RejectedExecutionException if the executor has been previously
330         *         shutdown
331         */
332        private void startTask() {
333          lock.lock();
334          try {
335            if (isShutdown()) {
336              throw new RejectedExecutionException("Executor already shutdown");
337            }
338            runningTasks++;
339          } finally {
340            lock.unlock();
341          }
342        }
343    
344        /**
345         * Decrements the running task count.
346         */
347        private void endTask() {
348          lock.lock();
349          try {
350            runningTasks--;
351            if (isTerminated()) {
352              termination.signalAll();
353            }
354          } finally {
355            lock.unlock();
356          }
357        }
358      }
359    
360      /**
361       * Creates an {@link ExecutorService} whose {@code submit} and {@code
362       * invokeAll} methods submit {@link ListenableFutureTask} instances to the
363       * given delegate executor. Those methods, as well as {@code execute} and
364       * {@code invokeAny}, are implemented in terms of calls to {@code
365       * delegate.execute}. All other methods are forwarded unchanged to the
366       * delegate. This implies that the returned {@code ListeningExecutorService}
367       * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
368       * invokeAny} methods, so any special handling of tasks must be implemented in
369       * the delegate's {@code execute} method or by wrapping the returned {@code
370       * ListeningExecutorService}.
371       *
372       * <p>If the delegate executor was already an instance of {@code
373       * ListeningExecutorService}, it is returned untouched, and the rest of this
374       * documentation does not apply.
375       *
376       * @since 10.0
377       */
378      public static ListeningExecutorService listeningDecorator(
379          ExecutorService delegate) {
380        return (delegate instanceof ListeningExecutorService)
381            ? (ListeningExecutorService) delegate
382            : (delegate instanceof ScheduledExecutorService)
383            ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
384            : new ListeningDecorator(delegate);
385      }
386    
387      /**
388       * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
389       * invokeAll} methods submit {@link ListenableFutureTask} instances to the
390       * given delegate executor. Those methods, as well as {@code execute} and
391       * {@code invokeAny}, are implemented in terms of calls to {@code
392       * delegate.execute}. All other methods are forwarded unchanged to the
393       * delegate. This implies that the returned {@code
394       * SchedulingListeningExecutorService} never calls the delegate's {@code
395       * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
396       * handling of tasks must be implemented in the delegate's {@code execute}
397       * method or by wrapping the returned {@code
398       * SchedulingListeningExecutorService}.
399       *
400       * <p>If the delegate executor was already an instance of {@code
401       * ListeningScheduledExecutorService}, it is returned untouched, and the rest
402       * of this documentation does not apply.
403       *
404       * @since 10.0
405       */
406      public static ListeningScheduledExecutorService listeningDecorator(
407          ScheduledExecutorService delegate) {
408        return (delegate instanceof ListeningScheduledExecutorService)
409            ? (ListeningScheduledExecutorService) delegate
410            : new ScheduledListeningDecorator(delegate);
411      }
412    
413      private static class ListeningDecorator
414          extends AbstractListeningExecutorService {
415        final ExecutorService delegate;
416    
417        ListeningDecorator(ExecutorService delegate) {
418          this.delegate = checkNotNull(delegate);
419        }
420    
421        @Override
422        public boolean awaitTermination(long timeout, TimeUnit unit)
423            throws InterruptedException {
424          return delegate.awaitTermination(timeout, unit);
425        }
426    
427        @Override
428        public boolean isShutdown() {
429          return delegate.isShutdown();
430        }
431    
432        @Override
433        public boolean isTerminated() {
434          return delegate.isTerminated();
435        }
436    
437        @Override
438        public void shutdown() {
439          delegate.shutdown();
440        }
441    
442        @Override
443        public List<Runnable> shutdownNow() {
444          return delegate.shutdownNow();
445        }
446    
447        @Override
448        public void execute(Runnable command) {
449          delegate.execute(command);
450        }
451      }
452    
453      private static class ScheduledListeningDecorator
454          extends ListeningDecorator implements ListeningScheduledExecutorService {
455        @SuppressWarnings("hiding")
456        final ScheduledExecutorService delegate;
457    
458        ScheduledListeningDecorator(ScheduledExecutorService delegate) {
459          super(delegate);
460          this.delegate = checkNotNull(delegate);
461        }
462    
463        @Override
464        public ScheduledFuture<?> schedule(
465            Runnable command, long delay, TimeUnit unit) {
466          return delegate.schedule(command, delay, unit);
467        }
468    
469        @Override
470        public <V> ScheduledFuture<V> schedule(
471            Callable<V> callable, long delay, TimeUnit unit) {
472          return delegate.schedule(callable, delay, unit);
473        }
474    
475        @Override
476        public ScheduledFuture<?> scheduleAtFixedRate(
477            Runnable command, long initialDelay, long period, TimeUnit unit) {
478          return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
479        }
480    
481        @Override
482        public ScheduledFuture<?> scheduleWithFixedDelay(
483            Runnable command, long initialDelay, long delay, TimeUnit unit) {
484          return delegate.scheduleWithFixedDelay(
485              command, initialDelay, delay, unit);
486        }
487      }
488    
  /*
   * The following method is a modified version of one found in
   * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
   * which contained the following notice:
   *
   * Written by Doug Lea with assistance from members of JCP JSR-166
   * Expert Group and released to the public domain, as explained at
   * http://creativecommons.org/publicdomain/zero/1.0/
   * Other contributors include Andrew Wright, Jeffrey Hayes,
   * Pat Fisher, Mike Judd.
   */
500    
  /**
   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
   * implementations.
   *
   * <p>Tasks are submitted one at a time. Between submissions the completion queue is polled, so
   * a task that has already completed successfully ends the call before further tasks are
   * submitted. Every future submitted by this call is cancelled before the method exits, whether
   * it returns or throws.
   *
   * @param executorService the executor the tasks are submitted to
   * @param tasks the candidate tasks; must contain at least one element
   * @param timed whether {@code nanos} bounds the wait for a result
   * @param nanos the wait budget in nanoseconds; only consulted when {@code timed} is true
   * @return the result of the first dequeued future whose {@code get()} returns normally
   * @throws IllegalArgumentException if {@code tasks} is empty
   * @throws InterruptedException if interrupted while waiting for a task to complete
   * @throws ExecutionException if every submitted task has been dequeued and none yielded a
   *         result; this is the most recently recorded failure
   * @throws TimeoutException if {@code timed} is true and a timed wait for the next completed
   *         task expires
   */
  static <T> T invokeAnyImpl(ListeningExecutorService executorService,
      Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
          throws InterruptedException, ExecutionException, TimeoutException {
    // ntasks counts the tasks that have not been submitted yet.
    int ntasks = tasks.size();
    checkArgument(ntasks > 0);
    // Every submitted future, kept so that all of them can be cancelled on exit.
    List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
    // Futures are added here by their completion listener
    // (see submitAndAddQueueListener).
    BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();

    // For efficiency, especially in executors with limited
    // parallelism, check to see if previously submitted tasks are
    // done before submitting more of them. This interleaving
    // plus the exception mechanics account for messiness of main
    // loop.

    try {
      // Record exceptions so that if we fail to obtain any
      // result, we can throw the last exception we got.
      ExecutionException ee = null;
      long lastTime = timed ? System.nanoTime() : 0;
      Iterator<? extends Callable<T>> it = tasks.iterator();

      // Start one task for sure; the rest are started incrementally below.
      futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
      --ntasks;
      // active counts submitted futures that have not been dequeued yet.
      int active = 1;

      for (;;) {
        // Non-blocking check for a completed task.
        Future<T> f = futureQueue.poll();
        if (f == null) {
          if (ntasks > 0) {
            // Nothing has completed yet and tasks remain: submit the next one.
            --ntasks;
            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
            ++active;
          } else if (active == 0) {
            // Everything was submitted and dequeued without producing a result.
            break;
          } else if (timed) {
            // Everything is submitted: block for the next completion, within budget.
            f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
            if (f == null) {
              throw new TimeoutException();
            }
            // Charge the time elapsed since the last measurement against the budget.
            long now = System.nanoTime();
            nanos -= now - lastTime;
            lastTime = now;
          } else {
            // Untimed: block until some task completes.
            f = futureQueue.take();
          }
        }
        if (f != null) {
          --active;
          try {
            // The first future whose get() returns normally supplies the result.
            return f.get();
          } catch (ExecutionException eex) {
            ee = eex;
          } catch (RuntimeException rex) {
            // Wrap so the failure can be rethrown as an ExecutionException.
            ee = new ExecutionException(rex);
          }
        }
      }

      // NOTE(review): this looks like a defensive fallback; every dequeued
      // future either returns above or records an exception in ee.
      if (ee == null) {
        ee = new ExecutionException(null);
      }
      throw ee;
    } finally {
      // Cancel everything that was submitted, on both the return and throw paths.
      for (Future<T> f : futures) {
        f.cancel(true);
      }
    }
  }
572    
573      /**
574       * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
575       */
576      private static <T> ListenableFuture<T> submitAndAddQueueListener(
577          ListeningExecutorService executorService, Callable<T> task,
578          final BlockingQueue<Future<T>> queue) {
579        final ListenableFuture<T> future = executorService.submit(task);
580        future.addListener(new Runnable() {
581          @Override public void run() {
582            queue.add(future);
583          }
584        }, MoreExecutors.sameThreadExecutor());
585        return future;
586      }
587    }