001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021
022import com.google.common.annotations.Beta;
023import com.google.common.annotations.VisibleForTesting;
024import com.google.common.base.Supplier;
025import com.google.common.base.Throwables;
026import com.google.common.collect.Lists;
027import com.google.common.collect.Queues;
028import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
029
030import java.lang.reflect.InvocationTargetException;
031import java.util.Collection;
032import java.util.Collections;
033import java.util.Iterator;
034import java.util.List;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.Callable;
037import java.util.concurrent.Delayed;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.Executor;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.Executors;
042import java.util.concurrent.Future;
043import java.util.concurrent.RejectedExecutionException;
044import java.util.concurrent.ScheduledExecutorService;
045import java.util.concurrent.ScheduledFuture;
046import java.util.concurrent.ScheduledThreadPoolExecutor;
047import java.util.concurrent.ThreadFactory;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
050import java.util.concurrent.TimeUnit;
051import java.util.concurrent.TimeoutException;
052import java.util.concurrent.locks.Condition;
053import java.util.concurrent.locks.Lock;
054import java.util.concurrent.locks.ReentrantLock;
055
056/**
057 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
058 * ExecutorService}, and {@link ThreadFactory}.
059 *
060 * @author Eric Fellheimer
061 * @author Kyle Littlefield
062 * @author Justin Mahoney
063 * @since 3.0
064 */
065public final class MoreExecutors {
066  private MoreExecutors() {}
067
068  /**
069   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
070   * when the application is complete.  It does so by using daemon threads and
071   * adding a shutdown hook to wait for their completion.
072   *
073   * <p>This is mainly for fixed thread pools.
074   * See {@link Executors#newFixedThreadPool(int)}.
075   *
076   * @param executor the executor to modify to make sure it exits when the
077   *        application is finished
078   * @param terminationTimeout how long to wait for the executor to
079   *        finish before terminating the JVM
080   * @param timeUnit unit of time for the time parameter
081   * @return an unmodifiable version of the input which will not hang the JVM
082   */
083  @Beta
084  public static ExecutorService getExitingExecutorService(
085      ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
086    return new Application()
087        .getExitingExecutorService(executor, terminationTimeout, timeUnit);
088  }
089
090  /**
091   * Converts the given ScheduledThreadPoolExecutor into a
092   * ScheduledExecutorService that exits when the application is complete.  It
093   * does so by using daemon threads and adding a shutdown hook to wait for
094   * their completion.
095   *
096   * <p>This is mainly for fixed thread pools.
097   * See {@link Executors#newScheduledThreadPool(int)}.
098   *
099   * @param executor the executor to modify to make sure it exits when the
100   *        application is finished
101   * @param terminationTimeout how long to wait for the executor to
102   *        finish before terminating the JVM
103   * @param timeUnit unit of time for the time parameter
104   * @return an unmodifiable version of the input which will not hang the JVM
105   */
106  @Beta
107  public static ScheduledExecutorService getExitingScheduledExecutorService(
108      ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
109    return new Application()
110        .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
111  }
112
113  /**
114   * Add a shutdown hook to wait for thread completion in the given
115   * {@link ExecutorService service}.  This is useful if the given service uses
116   * daemon threads, and we want to keep the JVM from exiting immediately on
117   * shutdown, instead giving these daemon threads a chance to terminate
118   * normally.
119   * @param service ExecutorService which uses daemon threads
120   * @param terminationTimeout how long to wait for the executor to finish
121   *        before terminating the JVM
122   * @param timeUnit unit of time for the time parameter
123   */
124  @Beta
125  public static void addDelayedShutdownHook(
126      ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
127    new Application()
128        .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
129  }
130
131  /**
132   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
133   * when the application is complete.  It does so by using daemon threads and
134   * adding a shutdown hook to wait for their completion.
135   *
136   * <p>This method waits 120 seconds before continuing with JVM termination,
137   * even if the executor has not finished its work.
138   *
139   * <p>This is mainly for fixed thread pools.
140   * See {@link Executors#newFixedThreadPool(int)}.
141   *
142   * @param executor the executor to modify to make sure it exits when the
143   *        application is finished
144   * @return an unmodifiable version of the input which will not hang the JVM
145   */
146  @Beta
147  public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
148    return new Application().getExitingExecutorService(executor);
149  }
150
151  /**
152   * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
153   * exits when the application is complete.  It does so by using daemon threads
154   * and adding a shutdown hook to wait for their completion.
155   *
156   * <p>This method waits 120 seconds before continuing with JVM termination,
157   * even if the executor has not finished its work.
158   *
159   * <p>This is mainly for fixed thread pools.
160   * See {@link Executors#newScheduledThreadPool(int)}.
161   *
162   * @param executor the executor to modify to make sure it exits when the
163   *        application is finished
164   * @return an unmodifiable version of the input which will not hang the JVM
165   */
166  @Beta
167  public static ScheduledExecutorService getExitingScheduledExecutorService(
168      ScheduledThreadPoolExecutor executor) {
169    return new Application().getExitingScheduledExecutorService(executor);
170  }
171
172  /** Represents the current application to register shutdown hooks. */
173  @VisibleForTesting static class Application {
174
175    final ExecutorService getExitingExecutorService(
176        ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
177      useDaemonThreadFactory(executor);
178      ExecutorService service = Executors.unconfigurableExecutorService(executor);
179      addDelayedShutdownHook(service, terminationTimeout, timeUnit);
180      return service;
181    }
182
183    final ScheduledExecutorService getExitingScheduledExecutorService(
184        ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
185      useDaemonThreadFactory(executor);
186      ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
187      addDelayedShutdownHook(service, terminationTimeout, timeUnit);
188      return service;
189    }
190
191    final void addDelayedShutdownHook(
192        final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
193      checkNotNull(service);
194      checkNotNull(timeUnit);
195      addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
196        @Override
197        public void run() {
198          try {
199            // We'd like to log progress and failures that may arise in the
200            // following code, but unfortunately the behavior of logging
201            // is undefined in shutdown hooks.
202            // This is because the logging code installs a shutdown hook of its
203            // own. See Cleaner class inside {@link LogManager}.
204            service.shutdown();
205            service.awaitTermination(terminationTimeout, timeUnit);
206          } catch (InterruptedException ignored) {
207            // We're shutting down anyway, so just ignore.
208          }
209        }
210      }));
211    }
212
213    final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
214      return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
215    }
216
217    final ScheduledExecutorService getExitingScheduledExecutorService(
218        ScheduledThreadPoolExecutor executor) {
219      return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
220    }
221
222    @VisibleForTesting void addShutdownHook(Thread hook) {
223      Runtime.getRuntime().addShutdownHook(hook);
224    }
225  }
226
227  private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
228    executor.setThreadFactory(new ThreadFactoryBuilder()
229        .setDaemon(true)
230        .setThreadFactory(executor.getThreadFactory())
231        .build());
232  }
233
234  /**
235   * Creates an executor service that runs each task in the thread
236   * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
237   * applies both to individually submitted tasks and to collections of tasks
238   * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
239   * tasks will run serially on the calling thread.  Tasks are run to
240   * completion before a {@code Future} is returned to the caller (unless the
241   * executor has been shutdown).
242   *
243   * <p>Although all tasks are immediately executed in the thread that
244   * submitted the task, this {@code ExecutorService} imposes a small
245   * locking overhead on each task submission in order to implement shutdown
246   * and termination behavior.
247   *
248   * <p>The implementation deviates from the {@code ExecutorService}
249   * specification with regards to the {@code shutdownNow} method.  First,
250   * "best-effort" with regards to canceling running tasks is implemented
251   * as "no-effort".  No interrupts or other attempts are made to stop
252   * threads executing tasks.  Second, the returned list will always be empty,
253   * as any submitted task is considered to have started execution.
254   * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
255   * which are pending serial execution, even the subset of the tasks that
256   * have not yet started execution.  It is unclear from the
257   * {@code ExecutorService} specification if these should be included, and
258   * it's much easier to implement the interpretation that they not be.
259   * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
260   * in concurrent calls to {@code invokeAll/invokeAny} throwing
261   * RejectedExecutionException, although a subset of the tasks may already
262   * have been executed.
263   *
264   * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
265   *        >mostly source-compatible</a> since 3.0)
266   * @deprecated Use {@link #directExecutor()} if you only require an {@link Executor} and
267   *     {@link #newDirectExecutorService()} if you need a {@link ListeningExecutorService}.
268   */
269  @Deprecated public static ListeningExecutorService sameThreadExecutor() {
270    return new DirectExecutorService();
271  }
272
273  // See sameThreadExecutor javadoc for behavioral notes.
274  private static class DirectExecutorService
275      extends AbstractListeningExecutorService {
276    /**
277     * Lock used whenever accessing the state variables
278     * (runningTasks, shutdown, terminationCondition) of the executor
279     */
280    private final Lock lock = new ReentrantLock();
281
282    /** Signaled after the executor is shutdown and running tasks are done */
283    private final Condition termination = lock.newCondition();
284
285    /*
286     * Conceptually, these two variables describe the executor being in
287     * one of three states:
288     *   - Active: shutdown == false
289     *   - Shutdown: runningTasks > 0 and shutdown == true
290     *   - Terminated: runningTasks == 0 and shutdown == true
291     */
292    private int runningTasks = 0;
293    private boolean shutdown = false;
294
295    @Override
296    public void execute(Runnable command) {
297      startTask();
298      try {
299        command.run();
300      } finally {
301        endTask();
302      }
303    }
304
305    @Override
306    public boolean isShutdown() {
307      lock.lock();
308      try {
309        return shutdown;
310      } finally {
311        lock.unlock();
312      }
313    }
314
315    @Override
316    public void shutdown() {
317      lock.lock();
318      try {
319        shutdown = true;
320      } finally {
321        lock.unlock();
322      }
323    }
324
325    // See sameThreadExecutor javadoc for unusual behavior of this method.
326    @Override
327    public List<Runnable> shutdownNow() {
328      shutdown();
329      return Collections.emptyList();
330    }
331
332    @Override
333    public boolean isTerminated() {
334      lock.lock();
335      try {
336        return shutdown && runningTasks == 0;
337      } finally {
338        lock.unlock();
339      }
340    }
341
342    @Override
343    public boolean awaitTermination(long timeout, TimeUnit unit)
344        throws InterruptedException {
345      long nanos = unit.toNanos(timeout);
346      lock.lock();
347      try {
348        for (;;) {
349          if (isTerminated()) {
350            return true;
351          } else if (nanos <= 0) {
352            return false;
353          } else {
354            nanos = termination.awaitNanos(nanos);
355          }
356        }
357      } finally {
358        lock.unlock();
359      }
360    }
361
362    /**
363     * Checks if the executor has been shut down and increments the running
364     * task count.
365     *
366     * @throws RejectedExecutionException if the executor has been previously
367     *         shutdown
368     */
369    private void startTask() {
370      lock.lock();
371      try {
372        if (isShutdown()) {
373          throw new RejectedExecutionException("Executor already shutdown");
374        }
375        runningTasks++;
376      } finally {
377        lock.unlock();
378      }
379    }
380
381    /**
382     * Decrements the running task count.
383     */
384    private void endTask() {
385      lock.lock();
386      try {
387        runningTasks--;
388        if (isTerminated()) {
389          termination.signalAll();
390        }
391      } finally {
392        lock.unlock();
393      }
394    }
395  }
396
397  /**
398   * Creates an executor service that runs each task in the thread
399   * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
400   * applies both to individually submitted tasks and to collections of tasks
401   * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
402   * tasks will run serially on the calling thread.  Tasks are run to
403   * completion before a {@code Future} is returned to the caller (unless the
404   * executor has been shutdown).
405   *
406   * <p>Although all tasks are immediately executed in the thread that
407   * submitted the task, this {@code ExecutorService} imposes a small
408   * locking overhead on each task submission in order to implement shutdown
409   * and termination behavior.
410   *
411   * <p>The implementation deviates from the {@code ExecutorService}
412   * specification with regards to the {@code shutdownNow} method.  First,
413   * "best-effort" with regards to canceling running tasks is implemented
414   * as "no-effort".  No interrupts or other attempts are made to stop
415   * threads executing tasks.  Second, the returned list will always be empty,
416   * as any submitted task is considered to have started execution.
417   * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
418   * which are pending serial execution, even the subset of the tasks that
419   * have not yet started execution.  It is unclear from the
420   * {@code ExecutorService} specification if these should be included, and
421   * it's much easier to implement the interpretation that they not be.
422   * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
423   * in concurrent calls to {@code invokeAll/invokeAny} throwing
424   * RejectedExecutionException, although a subset of the tasks may already
425   * have been executed.
426   *
427   * @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0)
428   */
429  public static ListeningExecutorService newDirectExecutorService() {
430    return new DirectExecutorService();
431  }
432
433  /**
434   * Returns an {@link Executor} that runs each task in the thread that invokes
435   * {@link Executor#execute execute}, as in {@link CallerRunsPolicy}.
436   *
437   * <p>This instance is equivalent to: <pre>   {@code
438   *   final class DirectExecutor implements Executor {
439   *     public void execute(Runnable r) {
440   *       r.run();
441   *     }
442   *   }}</pre>
443   *
444   * <p>This should be preferred to {@link #newDirectExecutorService()} because the implementing the
445   * {@link ExecutorService} subinterface necessitates significant performance overhead.
446   *
447   * @since 18.0
448   */
449  public static Executor directExecutor() {
450    return DirectExecutor.INSTANCE;
451  }
452
453  /** See {@link #directExecutor} for behavioral notes. */
454  private enum DirectExecutor implements Executor {
455    INSTANCE;
456    @Override public void execute(Runnable command) {
457      command.run();
458    }
459  }
460
461  /**
462   * Creates an {@link ExecutorService} whose {@code submit} and {@code
463   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
464   * given delegate executor. Those methods, as well as {@code execute} and
465   * {@code invokeAny}, are implemented in terms of calls to {@code
466   * delegate.execute}. All other methods are forwarded unchanged to the
467   * delegate. This implies that the returned {@code ListeningExecutorService}
468   * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
469   * invokeAny} methods, so any special handling of tasks must be implemented in
470   * the delegate's {@code execute} method or by wrapping the returned {@code
471   * ListeningExecutorService}.
472   *
473   * <p>If the delegate executor was already an instance of {@code
474   * ListeningExecutorService}, it is returned untouched, and the rest of this
475   * documentation does not apply.
476   *
477   * @since 10.0
478   */
479  public static ListeningExecutorService listeningDecorator(
480      ExecutorService delegate) {
481    return (delegate instanceof ListeningExecutorService)
482        ? (ListeningExecutorService) delegate
483        : (delegate instanceof ScheduledExecutorService)
484        ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
485        : new ListeningDecorator(delegate);
486  }
487
488  /**
489   * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
490   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
491   * given delegate executor. Those methods, as well as {@code execute} and
492   * {@code invokeAny}, are implemented in terms of calls to {@code
493   * delegate.execute}. All other methods are forwarded unchanged to the
494   * delegate. This implies that the returned {@code
495   * ListeningScheduledExecutorService} never calls the delegate's {@code
496   * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
497   * handling of tasks must be implemented in the delegate's {@code execute}
498   * method or by wrapping the returned {@code
499   * ListeningScheduledExecutorService}.
500   *
501   * <p>If the delegate executor was already an instance of {@code
502   * ListeningScheduledExecutorService}, it is returned untouched, and the rest
503   * of this documentation does not apply.
504   *
505   * @since 10.0
506   */
507  public static ListeningScheduledExecutorService listeningDecorator(
508      ScheduledExecutorService delegate) {
509    return (delegate instanceof ListeningScheduledExecutorService)
510        ? (ListeningScheduledExecutorService) delegate
511        : new ScheduledListeningDecorator(delegate);
512  }
513
514  private static class ListeningDecorator
515      extends AbstractListeningExecutorService {
516    private final ExecutorService delegate;
517
518    ListeningDecorator(ExecutorService delegate) {
519      this.delegate = checkNotNull(delegate);
520    }
521
522    @Override
523    public boolean awaitTermination(long timeout, TimeUnit unit)
524        throws InterruptedException {
525      return delegate.awaitTermination(timeout, unit);
526    }
527
528    @Override
529    public boolean isShutdown() {
530      return delegate.isShutdown();
531    }
532
533    @Override
534    public boolean isTerminated() {
535      return delegate.isTerminated();
536    }
537
538    @Override
539    public void shutdown() {
540      delegate.shutdown();
541    }
542
543    @Override
544    public List<Runnable> shutdownNow() {
545      return delegate.shutdownNow();
546    }
547
548    @Override
549    public void execute(Runnable command) {
550      delegate.execute(command);
551    }
552  }
553
554  private static class ScheduledListeningDecorator
555      extends ListeningDecorator implements ListeningScheduledExecutorService {
556    @SuppressWarnings("hiding")
557    final ScheduledExecutorService delegate;
558
559    ScheduledListeningDecorator(ScheduledExecutorService delegate) {
560      super(delegate);
561      this.delegate = checkNotNull(delegate);
562    }
563
564    @Override
565    public ListenableScheduledFuture<?> schedule(
566        Runnable command, long delay, TimeUnit unit) {
567      ListenableFutureTask<Void> task =
568          ListenableFutureTask.create(command, null);
569      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
570      return new ListenableScheduledTask<Void>(task, scheduled);
571    }
572
573    @Override
574    public <V> ListenableScheduledFuture<V> schedule(
575        Callable<V> callable, long delay, TimeUnit unit) {
576      ListenableFutureTask<V> task = ListenableFutureTask.create(callable);
577      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
578      return new ListenableScheduledTask<V>(task, scheduled);
579    }
580
581    @Override
582    public ListenableScheduledFuture<?> scheduleAtFixedRate(
583        Runnable command, long initialDelay, long period, TimeUnit unit) {
584      NeverSuccessfulListenableFutureTask task =
585          new NeverSuccessfulListenableFutureTask(command);
586      ScheduledFuture<?> scheduled =
587          delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
588      return new ListenableScheduledTask<Void>(task, scheduled);
589    }
590
591    @Override
592    public ListenableScheduledFuture<?> scheduleWithFixedDelay(
593        Runnable command, long initialDelay, long delay, TimeUnit unit) {
594      NeverSuccessfulListenableFutureTask task =
595          new NeverSuccessfulListenableFutureTask(command);
596      ScheduledFuture<?> scheduled =
597          delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
598      return new ListenableScheduledTask<Void>(task, scheduled);
599    }
600
601    private static final class ListenableScheduledTask<V>
602        extends SimpleForwardingListenableFuture<V>
603        implements ListenableScheduledFuture<V> {
604
605      private final ScheduledFuture<?> scheduledDelegate;
606
607      public ListenableScheduledTask(
608          ListenableFuture<V> listenableDelegate,
609          ScheduledFuture<?> scheduledDelegate) {
610        super(listenableDelegate);
611        this.scheduledDelegate = scheduledDelegate;
612      }
613
614      @Override
615      public boolean cancel(boolean mayInterruptIfRunning) {
616        boolean cancelled = super.cancel(mayInterruptIfRunning);
617        if (cancelled) {
618          // Unless it is cancelled, the delegate may continue being scheduled
619          scheduledDelegate.cancel(mayInterruptIfRunning);
620
621          // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled.
622        }
623        return cancelled;
624      }
625
626      @Override
627      public long getDelay(TimeUnit unit) {
628        return scheduledDelegate.getDelay(unit);
629      }
630
631      @Override
632      public int compareTo(Delayed other) {
633        return scheduledDelegate.compareTo(other);
634      }
635    }
636
637    private static final class NeverSuccessfulListenableFutureTask
638        extends AbstractFuture<Void>
639        implements Runnable {
640      private final Runnable delegate;
641
642      public NeverSuccessfulListenableFutureTask(Runnable delegate) {
643        this.delegate = checkNotNull(delegate);
644      }
645
646      @Override public void run() {
647        try {
648          delegate.run();
649        } catch (Throwable t) {
650          setException(t);
651          throw Throwables.propagate(t);
652        }
653      }
654    }
655  }
656
657  /*
658   * This following method is a modified version of one found in
659   * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
660   * which contained the following notice:
661   *
662   * Written by Doug Lea with assistance from members of JCP JSR-166
663   * Expert Group and released to the public domain, as explained at
664   * http://creativecommons.org/publicdomain/zero/1.0/
665   * Other contributors include Andrew Wright, Jeffrey Hayes,
666   * Pat Fisher, Mike Judd.
667   */
668
669  /**
670   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
671   * implementations.
672   */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
673      Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
674          throws InterruptedException, ExecutionException, TimeoutException {
675    checkNotNull(executorService);
676    int ntasks = tasks.size();
677    checkArgument(ntasks > 0);
678    List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
679    BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
680
681    // For efficiency, especially in executors with limited
682    // parallelism, check to see if previously submitted tasks are
683    // done before submitting more of them. This interleaving
684    // plus the exception mechanics account for messiness of main
685    // loop.
686
687    try {
688      // Record exceptions so that if we fail to obtain any
689      // result, we can throw the last exception we got.
690      ExecutionException ee = null;
691      long lastTime = timed ? System.nanoTime() : 0;
692      Iterator<? extends Callable<T>> it = tasks.iterator();
693
694      futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
695      --ntasks;
696      int active = 1;
697
698      for (;;) {
699        Future<T> f = futureQueue.poll();
700        if (f == null) {
701          if (ntasks > 0) {
702            --ntasks;
703            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
704            ++active;
705          } else if (active == 0) {
706            break;
707          } else if (timed) {
708            f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
709            if (f == null) {
710              throw new TimeoutException();
711            }
712            long now = System.nanoTime();
713            nanos -= now - lastTime;
714            lastTime = now;
715          } else {
716            f = futureQueue.take();
717          }
718        }
719        if (f != null) {
720          --active;
721          try {
722            return f.get();
723          } catch (ExecutionException eex) {
724            ee = eex;
725          } catch (RuntimeException rex) {
726            ee = new ExecutionException(rex);
727          }
728        }
729      }
730
731      if (ee == null) {
732        ee = new ExecutionException(null);
733      }
734      throw ee;
735    } finally {
736      for (Future<T> f : futures) {
737        f.cancel(true);
738      }
739    }
740  }
741
742  /**
743   * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
744   */
745  private static <T> ListenableFuture<T> submitAndAddQueueListener(
746      ListeningExecutorService executorService, Callable<T> task,
747      final BlockingQueue<Future<T>> queue) {
748    final ListenableFuture<T> future = executorService.submit(task);
749    future.addListener(new Runnable() {
750      @Override public void run() {
751        queue.add(future);
752      }
753    }, directExecutor());
754    return future;
755  }
756
757  /**
758   * Returns a default thread factory used to create new threads.
759   *
760   * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}.
761   * Otherwise, returns {@link Executors#defaultThreadFactory()}.
762   *
763   * @since 14.0
764   */
765  @Beta
766  public static ThreadFactory platformThreadFactory() {
767    if (!isAppEngine()) {
768      return Executors.defaultThreadFactory();
769    }
770    try {
771      return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
772          .getMethod("currentRequestThreadFactory")
773          .invoke(null);
774    } catch (IllegalAccessException e) {
775      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
776    } catch (ClassNotFoundException e) {
777      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
778    } catch (NoSuchMethodException e) {
779      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
780    } catch (InvocationTargetException e) {
781      throw Throwables.propagate(e.getCause());
782    }
783  }
784
785  private static boolean isAppEngine() {
786    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
787      return false;
788    }
789    try {
790      // If the current environment is null, we're not inside AppEngine.
791      return Class.forName("com.google.apphosting.api.ApiProxy")
792          .getMethod("getCurrentEnvironment")
793          .invoke(null) != null;
794    } catch (ClassNotFoundException e) {
795      // If ApiProxy doesn't exist, we're not on AppEngine at all.
796      return false;
797    } catch (InvocationTargetException e) {
798      // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
799      return false;
800    } catch (IllegalAccessException e) {
801      // If the method isn't accessible, we're not on a supported version of AppEngine;
802      return false;
803    } catch (NoSuchMethodException e) {
804      // If the method doesn't exist, we're not on a supported version of AppEngine;
805      return false;
806    }
807  }
808
809  /**
810   * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name}
811   * unless changing the name is forbidden by the security manager.
812   */
813  static Thread newThread(String name, Runnable runnable) {
814    checkNotNull(name);
815    checkNotNull(runnable);
816    Thread result = platformThreadFactory().newThread(runnable);
817    try {
818      result.setName(name);
819    } catch (SecurityException e) {
820      // OK if we can't set the name in this environment.
821    }
822    return result;
823  }
824
825  // TODO(user): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService?
826  // TODO(user): provide overloads that take constant strings? Function<Runnable, String>s to
827  // calculate names?
828
829  /**
830   * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in.
831   *
832   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
833   * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
834   * prevents the renaming then it will be skipped but the tasks will still execute.
835   *
836   *
837   * @param executor The executor to decorate
838   * @param nameSupplier The source of names for each task
839   */
840  static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
841    checkNotNull(executor);
842    checkNotNull(nameSupplier);
843    if (isAppEngine()) {
844      // AppEngine doesn't support thread renaming, so don't even try
845      return executor;
846    }
847    return new Executor() {
848      @Override public void execute(Runnable command) {
849        executor.execute(Callables.threadRenaming(command, nameSupplier));
850      }
851    };
852  }
853
854  /**
855   * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run
856   * in.
857   *
858   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
859   * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
860   * prevents the renaming then it will be skipped but the tasks will still execute.
861   *
862   *
863   * @param service The executor to decorate
864   * @param nameSupplier The source of names for each task
865   */
866  static ExecutorService renamingDecorator(final ExecutorService service,
867      final Supplier<String> nameSupplier) {
868    checkNotNull(service);
869    checkNotNull(nameSupplier);
870    if (isAppEngine()) {
871      // AppEngine doesn't support thread renaming, so don't even try.
872      return service;
873    }
874    return new WrappingExecutorService(service) {
875      @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
876        return Callables.threadRenaming(callable, nameSupplier);
877      }
878      @Override protected Runnable wrapTask(Runnable command) {
879        return Callables.threadRenaming(command, nameSupplier);
880      }
881    };
882  }
883
884  /**
885   * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its
886   * tasks run in.
887   *
888   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
889   * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
890   * prevents the renaming then it will be skipped but the tasks will still execute.
891   *
892   *
893   * @param service The executor to decorate
894   * @param nameSupplier The source of names for each task
895   */
896  static ScheduledExecutorService renamingDecorator(final ScheduledExecutorService service,
897      final Supplier<String> nameSupplier) {
898    checkNotNull(service);
899    checkNotNull(nameSupplier);
900    if (isAppEngine()) {
901      // AppEngine doesn't support thread renaming, so don't even try.
902      return service;
903    }
904    return new WrappingScheduledExecutorService(service) {
905      @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
906        return Callables.threadRenaming(callable, nameSupplier);
907      }
908      @Override protected Runnable wrapTask(Runnable command) {
909        return Callables.threadRenaming(command, nameSupplier);
910      }
911    };
912  }
913
914  /**
915   * Shuts down the given executor gradually, first disabling new submissions and later cancelling
916   * existing tasks.
917   *
918   * <p>The method takes the following steps:
919   * <ol>
920   *  <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
921   *  <li>waits for half of the specified timeout.
922   *  <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
923   *  pending tasks and interrupting running tasks.
924   *  <li>waits for the other half of the specified timeout.
925   * </ol>
926   *
927   * <p>If, at any step of the process, the given executor is terminated or the calling thread is
928   * interrupted, the method calls {@link ExecutorService#shutdownNow()}, cancelling
929   * pending tasks and interrupting running tasks.
930   *
931   * @param service the {@code ExecutorService} to shut down
932   * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
933   * @param unit the time unit of the timeout argument
934   * @return {@code true} if the pool was terminated successfully, {@code false} if the
935   *     {@code ExecutorService} could not terminate <b>or</b> the thread running this method
936   *     is interrupted while waiting for the {@code ExecutorService} to terminate
937   * @since 17.0
938   */
939  @Beta
940  public static boolean shutdownAndAwaitTermination(
941      ExecutorService service, long timeout, TimeUnit unit) {
942    checkNotNull(unit);
943    // Disable new tasks from being submitted
944    service.shutdown();
945    try {
946      long halfTimeoutNanos = TimeUnit.NANOSECONDS.convert(timeout, unit) / 2;
947      // Wait for half the duration of the timeout for existing tasks to terminate
948      if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
949        // Cancel currently executing tasks
950        service.shutdownNow();
951        // Wait the other half of the timeout for tasks to respond to being cancelled
952        service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
953      }
954    } catch (InterruptedException ie) {
955      // Preserve interrupt status
956      Thread.currentThread().interrupt();
957      // (Re-)Cancel if current thread also interrupted
958      service.shutdownNow();
959    }
960    return service.isTerminated();
961  }
962}