001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021
022import com.google.common.annotations.Beta;
023import com.google.common.annotations.VisibleForTesting;
024import com.google.common.base.Throwables;
025import com.google.common.collect.Lists;
026import com.google.common.collect.Queues;
027
028import java.lang.reflect.InvocationTargetException;
029import java.util.Collection;
030import java.util.Collections;
031import java.util.Iterator;
032import java.util.List;
033import java.util.concurrent.BlockingQueue;
034import java.util.concurrent.Callable;
035import java.util.concurrent.ExecutionException;
036import java.util.concurrent.ExecutorService;
037import java.util.concurrent.Executors;
038import java.util.concurrent.Future;
039import java.util.concurrent.RejectedExecutionException;
040import java.util.concurrent.ScheduledExecutorService;
041import java.util.concurrent.ScheduledFuture;
042import java.util.concurrent.ScheduledThreadPoolExecutor;
043import java.util.concurrent.ThreadFactory;
044import java.util.concurrent.ThreadPoolExecutor;
045import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
046import java.util.concurrent.TimeUnit;
047import java.util.concurrent.TimeoutException;
048import java.util.concurrent.locks.Condition;
049import java.util.concurrent.locks.Lock;
050import java.util.concurrent.locks.ReentrantLock;
051
052/**
053 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
054 * ExecutorService}, and {@link ThreadFactory}.
055 *
056 * @author Eric Fellheimer
057 * @author Kyle Littlefield
058 * @author Justin Mahoney
059 * @since 3.0
060 */
061public final class MoreExecutors {
062  private MoreExecutors() {}
063
064  /**
065   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
066   * when the application is complete.  It does so by using daemon threads and
067   * adding a shutdown hook to wait for their completion.
068   *
069   * <p>This is mainly for fixed thread pools.
070   * See {@link Executors#newFixedThreadPool(int)}.
071   *
072   * @param executor the executor to modify to make sure it exits when the
073   *        application is finished
074   * @param terminationTimeout how long to wait for the executor to
075   *        finish before terminating the JVM
076   * @param timeUnit unit of time for the time parameter
077   * @return an unmodifiable version of the input which will not hang the JVM
078   */
079  @Beta
080  public static ExecutorService getExitingExecutorService(
081      ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
082    return new Application()
083        .getExitingExecutorService(executor, terminationTimeout, timeUnit);
084  }
085
086  /**
087   * Converts the given ScheduledThreadPoolExecutor into a
088   * ScheduledExecutorService that exits when the application is complete.  It
089   * does so by using daemon threads and adding a shutdown hook to wait for
090   * their completion.
091   *
092   * <p>This is mainly for fixed thread pools.
093   * See {@link Executors#newScheduledThreadPool(int)}.
094   *
095   * @param executor the executor to modify to make sure it exits when the
096   *        application is finished
097   * @param terminationTimeout how long to wait for the executor to
098   *        finish before terminating the JVM
099   * @param timeUnit unit of time for the time parameter
100   * @return an unmodifiable version of the input which will not hang the JVM
101   */
102  @Beta
103  public static ScheduledExecutorService getExitingScheduledExecutorService(
104      ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
105    return new Application()
106        .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
107  }
108
109  /**
110   * Add a shutdown hook to wait for thread completion in the given
111   * {@link ExecutorService service}.  This is useful if the given service uses
112   * daemon threads, and we want to keep the JVM from exiting immediately on
113   * shutdown, instead giving these daemon threads a chance to terminate
114   * normally.
115   * @param service ExecutorService which uses daemon threads
116   * @param terminationTimeout how long to wait for the executor to finish
117   *        before terminating the JVM
118   * @param timeUnit unit of time for the time parameter
119   */
120  @Beta
121  public static void addDelayedShutdownHook(
122      ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
123    new Application()
124        .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
125  }
126
127  /**
128   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
129   * when the application is complete.  It does so by using daemon threads and
130   * adding a shutdown hook to wait for their completion.
131   *
132   * <p>This method waits 120 seconds before continuing with JVM termination,
133   * even if the executor has not finished its work.
134   *
135   * <p>This is mainly for fixed thread pools.
136   * See {@link Executors#newFixedThreadPool(int)}.
137   *
138   * @param executor the executor to modify to make sure it exits when the
139   *        application is finished
140   * @return an unmodifiable version of the input which will not hang the JVM
141   */
142  @Beta
143  public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
144    return new Application().getExitingExecutorService(executor);
145  }
146
147  /**
148   * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
149   * exits when the application is complete.  It does so by using daemon threads
150   * and adding a shutdown hook to wait for their completion.
151   *
152   * <p>This method waits 120 seconds before continuing with JVM termination,
153   * even if the executor has not finished its work.
154   *
155   * <p>This is mainly for fixed thread pools.
156   * See {@link Executors#newScheduledThreadPool(int)}.
157   *
158   * @param executor the executor to modify to make sure it exits when the
159   *        application is finished
160   * @return an unmodifiable version of the input which will not hang the JVM
161   */
162  @Beta
163  public static ScheduledExecutorService getExitingScheduledExecutorService(
164      ScheduledThreadPoolExecutor executor) {
165    return new Application().getExitingScheduledExecutorService(executor);
166  }
167
168  /** Represents the current application to register shutdown hooks. */
169  @VisibleForTesting static class Application {
170
171    final ExecutorService getExitingExecutorService(
172        ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
173      useDaemonThreadFactory(executor);
174      ExecutorService service = Executors.unconfigurableExecutorService(executor);
175      addDelayedShutdownHook(service, terminationTimeout, timeUnit);
176      return service;
177    }
178
179    final ScheduledExecutorService getExitingScheduledExecutorService(
180        ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
181      useDaemonThreadFactory(executor);
182      ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
183      addDelayedShutdownHook(service, terminationTimeout, timeUnit);
184      return service;
185    }
186
187    final void addDelayedShutdownHook(
188        final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
189      checkNotNull(service);
190      checkNotNull(timeUnit);
191      addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
192        @Override
193        public void run() {
194          try {
195            // We'd like to log progress and failures that may arise in the
196            // following code, but unfortunately the behavior of logging
197            // is undefined in shutdown hooks.
198            // This is because the logging code installs a shutdown hook of its
199            // own. See Cleaner class inside {@link LogManager}.
200            service.shutdown();
201            service.awaitTermination(terminationTimeout, timeUnit);
202          } catch (InterruptedException ignored) {
203            // We're shutting down anyway, so just ignore.
204          }
205        }
206      }));
207    }
208
209    final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
210      return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
211    }
212
213    final ScheduledExecutorService getExitingScheduledExecutorService(
214        ScheduledThreadPoolExecutor executor) {
215      return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
216    }
217
218    @VisibleForTesting void addShutdownHook(Thread hook) {
219      Runtime.getRuntime().addShutdownHook(hook);
220    }
221  }
222
223  private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
224    executor.setThreadFactory(new ThreadFactoryBuilder()
225        .setDaemon(true)
226        .setThreadFactory(executor.getThreadFactory())
227        .build());
228  }
229
230  /**
231   * Creates an executor service that runs each task in the thread
232   * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
233   * applies both to individually submitted tasks and to collections of tasks
234   * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
235   * tasks will run serially on the calling thread.  Tasks are run to
236   * completion before a {@code Future} is returned to the caller (unless the
237   * executor has been shutdown).
238   *
239   * <p>Although all tasks are immediately executed in the thread that
240   * submitted the task, this {@code ExecutorService} imposes a small
241   * locking overhead on each task submission in order to implement shutdown
242   * and termination behavior.
243   *
244   * <p>The implementation deviates from the {@code ExecutorService}
245   * specification with regards to the {@code shutdownNow} method.  First,
246   * "best-effort" with regards to canceling running tasks is implemented
247   * as "no-effort".  No interrupts or other attempts are made to stop
248   * threads executing tasks.  Second, the returned list will always be empty,
249   * as any submitted task is considered to have started execution.
250   * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
251   * which are pending serial execution, even the subset of the tasks that
252   * have not yet started execution.  It is unclear from the
253   * {@code ExecutorService} specification if these should be included, and
254   * it's much easier to implement the interpretation that they not be.
255   * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
256   * in concurrent calls to {@code invokeAll/invokeAny} throwing
257   * RejectedExecutionException, although a subset of the tasks may already
258   * have been executed.
259   *
260   * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
261   *        >mostly source-compatible</a> since 3.0)
262   */
263  public static ListeningExecutorService sameThreadExecutor() {
264    return new SameThreadExecutorService();
265  }
266
267  // See sameThreadExecutor javadoc for behavioral notes.
268  private static class SameThreadExecutorService
269      extends AbstractListeningExecutorService {
270    /**
271     * Lock used whenever accessing the state variables
272     * (runningTasks, shutdown, terminationCondition) of the executor
273     */
274    private final Lock lock = new ReentrantLock();
275
276    /** Signaled after the executor is shutdown and running tasks are done */
277    private final Condition termination = lock.newCondition();
278
279    /*
280     * Conceptually, these two variables describe the executor being in
281     * one of three states:
282     *   - Active: shutdown == false
283     *   - Shutdown: runningTasks > 0 and shutdown == true
284     *   - Terminated: runningTasks == 0 and shutdown == true
285     */
286    private int runningTasks = 0;
287    private boolean shutdown = false;
288
289    @Override
290    public void execute(Runnable command) {
291      startTask();
292      try {
293        command.run();
294      } finally {
295        endTask();
296      }
297    }
298
299    @Override
300    public boolean isShutdown() {
301      lock.lock();
302      try {
303        return shutdown;
304      } finally {
305        lock.unlock();
306      }
307    }
308
309    @Override
310    public void shutdown() {
311      lock.lock();
312      try {
313        shutdown = true;
314      } finally {
315        lock.unlock();
316      }
317    }
318
319    // See sameThreadExecutor javadoc for unusual behavior of this method.
320    @Override
321    public List<Runnable> shutdownNow() {
322      shutdown();
323      return Collections.emptyList();
324    }
325
326    @Override
327    public boolean isTerminated() {
328      lock.lock();
329      try {
330        return shutdown && runningTasks == 0;
331      } finally {
332        lock.unlock();
333      }
334    }
335
336    @Override
337    public boolean awaitTermination(long timeout, TimeUnit unit)
338        throws InterruptedException {
339      long nanos = unit.toNanos(timeout);
340      lock.lock();
341      try {
342        for (;;) {
343          if (isTerminated()) {
344            return true;
345          } else if (nanos <= 0) {
346            return false;
347          } else {
348            nanos = termination.awaitNanos(nanos);
349          }
350        }
351      } finally {
352        lock.unlock();
353      }
354    }
355
356    /**
357     * Checks if the executor has been shut down and increments the running
358     * task count.
359     *
360     * @throws RejectedExecutionException if the executor has been previously
361     *         shutdown
362     */
363    private void startTask() {
364      lock.lock();
365      try {
366        if (isShutdown()) {
367          throw new RejectedExecutionException("Executor already shutdown");
368        }
369        runningTasks++;
370      } finally {
371        lock.unlock();
372      }
373    }
374
375    /**
376     * Decrements the running task count.
377     */
378    private void endTask() {
379      lock.lock();
380      try {
381        runningTasks--;
382        if (isTerminated()) {
383          termination.signalAll();
384        }
385      } finally {
386        lock.unlock();
387      }
388    }
389  }
390
391  /**
392   * Creates an {@link ExecutorService} whose {@code submit} and {@code
393   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
394   * given delegate executor. Those methods, as well as {@code execute} and
395   * {@code invokeAny}, are implemented in terms of calls to {@code
396   * delegate.execute}. All other methods are forwarded unchanged to the
397   * delegate. This implies that the returned {@code ListeningExecutorService}
398   * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
399   * invokeAny} methods, so any special handling of tasks must be implemented in
400   * the delegate's {@code execute} method or by wrapping the returned {@code
401   * ListeningExecutorService}.
402   *
403   * <p>If the delegate executor was already an instance of {@code
404   * ListeningExecutorService}, it is returned untouched, and the rest of this
405   * documentation does not apply.
406   *
407   * @since 10.0
408   */
409  public static ListeningExecutorService listeningDecorator(
410      ExecutorService delegate) {
411    return (delegate instanceof ListeningExecutorService)
412        ? (ListeningExecutorService) delegate
413        : (delegate instanceof ScheduledExecutorService)
414        ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
415        : new ListeningDecorator(delegate);
416  }
417
418  /**
419   * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
420   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
421   * given delegate executor. Those methods, as well as {@code execute} and
422   * {@code invokeAny}, are implemented in terms of calls to {@code
423   * delegate.execute}. All other methods are forwarded unchanged to the
424   * delegate. This implies that the returned {@code
425   * SchedulingListeningExecutorService} never calls the delegate's {@code
426   * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
427   * handling of tasks must be implemented in the delegate's {@code execute}
428   * method or by wrapping the returned {@code
429   * SchedulingListeningExecutorService}.
430   *
431   * <p>If the delegate executor was already an instance of {@code
432   * ListeningScheduledExecutorService}, it is returned untouched, and the rest
433   * of this documentation does not apply.
434   *
435   * @since 10.0
436   */
437  public static ListeningScheduledExecutorService listeningDecorator(
438      ScheduledExecutorService delegate) {
439    return (delegate instanceof ListeningScheduledExecutorService)
440        ? (ListeningScheduledExecutorService) delegate
441        : new ScheduledListeningDecorator(delegate);
442  }
443
444  private static class ListeningDecorator
445      extends AbstractListeningExecutorService {
446    final ExecutorService delegate;
447
448    ListeningDecorator(ExecutorService delegate) {
449      this.delegate = checkNotNull(delegate);
450    }
451
452    @Override
453    public boolean awaitTermination(long timeout, TimeUnit unit)
454        throws InterruptedException {
455      return delegate.awaitTermination(timeout, unit);
456    }
457
458    @Override
459    public boolean isShutdown() {
460      return delegate.isShutdown();
461    }
462
463    @Override
464    public boolean isTerminated() {
465      return delegate.isTerminated();
466    }
467
468    @Override
469    public void shutdown() {
470      delegate.shutdown();
471    }
472
473    @Override
474    public List<Runnable> shutdownNow() {
475      return delegate.shutdownNow();
476    }
477
478    @Override
479    public void execute(Runnable command) {
480      delegate.execute(command);
481    }
482  }
483
484  private static class ScheduledListeningDecorator
485      extends ListeningDecorator implements ListeningScheduledExecutorService {
486    @SuppressWarnings("hiding")
487    final ScheduledExecutorService delegate;
488
489    ScheduledListeningDecorator(ScheduledExecutorService delegate) {
490      super(delegate);
491      this.delegate = checkNotNull(delegate);
492    }
493
494    @Override
495    public ScheduledFuture<?> schedule(
496        Runnable command, long delay, TimeUnit unit) {
497      return delegate.schedule(command, delay, unit);
498    }
499
500    @Override
501    public <V> ScheduledFuture<V> schedule(
502        Callable<V> callable, long delay, TimeUnit unit) {
503      return delegate.schedule(callable, delay, unit);
504    }
505
506    @Override
507    public ScheduledFuture<?> scheduleAtFixedRate(
508        Runnable command, long initialDelay, long period, TimeUnit unit) {
509      return delegate.scheduleAtFixedRate(command, initialDelay, period, unit);
510    }
511
512    @Override
513    public ScheduledFuture<?> scheduleWithFixedDelay(
514        Runnable command, long initialDelay, long delay, TimeUnit unit) {
515      return delegate.scheduleWithFixedDelay(
516          command, initialDelay, delay, unit);
517    }
518  }
519
520  /*
521   * This following method is a modified version of one found in
522   * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
523   * which contained the following notice:
524   *
525   * Written by Doug Lea with assistance from members of JCP JSR-166
526   * Expert Group and released to the public domain, as explained at
527   * http://creativecommons.org/publicdomain/zero/1.0/
528   * Other contributors include Andrew Wright, Jeffrey Hayes,
529   * Pat Fisher, Mike Judd.
530   */
531
532  /**
533   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
534   * implementations.
535   */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
536      Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
537          throws InterruptedException, ExecutionException, TimeoutException {
538    checkNotNull(executorService);
539    int ntasks = tasks.size();
540    checkArgument(ntasks > 0);
541    List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
542    BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
543
544    // For efficiency, especially in executors with limited
545    // parallelism, check to see if previously submitted tasks are
546    // done before submitting more of them. This interleaving
547    // plus the exception mechanics account for messiness of main
548    // loop.
549
550    try {
551      // Record exceptions so that if we fail to obtain any
552      // result, we can throw the last exception we got.
553      ExecutionException ee = null;
554      long lastTime = timed ? System.nanoTime() : 0;
555      Iterator<? extends Callable<T>> it = tasks.iterator();
556
557      futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
558      --ntasks;
559      int active = 1;
560
561      for (;;) {
562        Future<T> f = futureQueue.poll();
563        if (f == null) {
564          if (ntasks > 0) {
565            --ntasks;
566            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
567            ++active;
568          } else if (active == 0) {
569            break;
570          } else if (timed) {
571            f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
572            if (f == null) {
573              throw new TimeoutException();
574            }
575            long now = System.nanoTime();
576            nanos -= now - lastTime;
577            lastTime = now;
578          } else {
579            f = futureQueue.take();
580          }
581        }
582        if (f != null) {
583          --active;
584          try {
585            return f.get();
586          } catch (ExecutionException eex) {
587            ee = eex;
588          } catch (RuntimeException rex) {
589            ee = new ExecutionException(rex);
590          }
591        }
592      }
593
594      if (ee == null) {
595        ee = new ExecutionException(null);
596      }
597      throw ee;
598    } finally {
599      for (Future<T> f : futures) {
600        f.cancel(true);
601      }
602    }
603  }
604
605  /**
606   * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
607   */
608  private static <T> ListenableFuture<T> submitAndAddQueueListener(
609      ListeningExecutorService executorService, Callable<T> task,
610      final BlockingQueue<Future<T>> queue) {
611    final ListenableFuture<T> future = executorService.submit(task);
612    future.addListener(new Runnable() {
613      @Override public void run() {
614        queue.add(future);
615      }
616    }, MoreExecutors.sameThreadExecutor());
617    return future;
618  }
619
620  /**
621   * Returns a default thread factory used to create new threads.
622   *
623   * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}.
624   * Otherwise, returns {@link Executors#defaultThreadFactory()}.
625   *
626   * @since 14.0
627   */
628  @Beta
629  public static ThreadFactory platformThreadFactory() {
630    if (!isAppEngine()) {
631      return Executors.defaultThreadFactory();
632    }
633    try {
634      return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
635          .getMethod("currentRequestThreadFactory")
636          .invoke(null);
637    } catch (IllegalAccessException e) {
638      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
639    } catch (ClassNotFoundException e) {
640      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
641    } catch (NoSuchMethodException e) {
642      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
643    } catch (InvocationTargetException e) {
644      throw Throwables.propagate(e.getCause());
645    }
646  }
647
648  private static boolean isAppEngine() {
649    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
650      return false;
651    }
652    try {
653      // If the current environment is null, we're not inside AppEngine.
654      return Class.forName("com.google.apphosting.api.ApiProxy")
655          .getMethod("getCurrentEnvironment")
656          .invoke(null) != null;
657    } catch (ClassNotFoundException e) {
658      // If ApiProxy doesn't exist, we're not on AppEngine at all.
659      return false;
660    } catch (InvocationTargetException e) {
661      // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
662      return false;
663    } catch (IllegalAccessException e) {
664      // If the method isn't accessible, we're not on a supported version of AppEngine;
665      return false;
666    } catch (NoSuchMethodException e) {
667      // If the method doesn't exist, we're not on a supported version of AppEngine;
668      return false;
669    }
670  }
671
672  /**
673   * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name}
674   * unless changing the name is forbidden by the security manager.
675   */
676  static Thread newThread(String name, Runnable runnable) {
677    checkNotNull(name);
678    checkNotNull(runnable);
679    Thread result = platformThreadFactory().newThread(runnable);
680    try {
681      result.setName(name);
682    } catch (SecurityException e) {
683      // OK if we can't set the name in this environment.
684    }
685    return result;
686  }
687}