001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021
022import com.google.common.annotations.Beta;
023import com.google.common.annotations.VisibleForTesting;
024import com.google.common.base.Supplier;
025import com.google.common.base.Throwables;
026import com.google.common.collect.Lists;
027import com.google.common.collect.Queues;
028import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
029
030import java.lang.reflect.InvocationTargetException;
031import java.util.Collection;
032import java.util.Collections;
033import java.util.Iterator;
034import java.util.List;
035import java.util.concurrent.BlockingQueue;
036import java.util.concurrent.Callable;
037import java.util.concurrent.Delayed;
038import java.util.concurrent.ExecutionException;
039import java.util.concurrent.Executor;
040import java.util.concurrent.ExecutorService;
041import java.util.concurrent.Executors;
042import java.util.concurrent.Future;
043import java.util.concurrent.RejectedExecutionException;
044import java.util.concurrent.ScheduledExecutorService;
045import java.util.concurrent.ScheduledFuture;
046import java.util.concurrent.ScheduledThreadPoolExecutor;
047import java.util.concurrent.ThreadFactory;
048import java.util.concurrent.ThreadPoolExecutor;
049import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
050import java.util.concurrent.TimeUnit;
051import java.util.concurrent.TimeoutException;
052import java.util.concurrent.locks.Condition;
053import java.util.concurrent.locks.Lock;
054import java.util.concurrent.locks.ReentrantLock;
055
056/**
057 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link
058 * ExecutorService}, and {@link ThreadFactory}.
059 *
060 * @author Eric Fellheimer
061 * @author Kyle Littlefield
062 * @author Justin Mahoney
063 * @since 3.0
064 */
065public final class MoreExecutors {
066  private MoreExecutors() {}
067
068  /**
069   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
070   * when the application is complete.  It does so by using daemon threads and
071   * adding a shutdown hook to wait for their completion.
072   *
073   * <p>This is mainly for fixed thread pools.
074   * See {@link Executors#newFixedThreadPool(int)}.
075   *
076   * @param executor the executor to modify to make sure it exits when the
077   *        application is finished
078   * @param terminationTimeout how long to wait for the executor to
079   *        finish before terminating the JVM
080   * @param timeUnit unit of time for the time parameter
081   * @return an unmodifiable version of the input which will not hang the JVM
082   */
083  @Beta
084  public static ExecutorService getExitingExecutorService(
085      ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
086    return new Application()
087        .getExitingExecutorService(executor, terminationTimeout, timeUnit);
088  }
089
090  /**
091   * Converts the given ScheduledThreadPoolExecutor into a
092   * ScheduledExecutorService that exits when the application is complete.  It
093   * does so by using daemon threads and adding a shutdown hook to wait for
094   * their completion.
095   *
096   * <p>This is mainly for fixed thread pools.
097   * See {@link Executors#newScheduledThreadPool(int)}.
098   *
099   * @param executor the executor to modify to make sure it exits when the
100   *        application is finished
101   * @param terminationTimeout how long to wait for the executor to
102   *        finish before terminating the JVM
103   * @param timeUnit unit of time for the time parameter
104   * @return an unmodifiable version of the input which will not hang the JVM
105   */
106  @Beta
107  public static ScheduledExecutorService getExitingScheduledExecutorService(
108      ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
109    return new Application()
110        .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
111  }
112
113  /**
114   * Add a shutdown hook to wait for thread completion in the given
115   * {@link ExecutorService service}.  This is useful if the given service uses
116   * daemon threads, and we want to keep the JVM from exiting immediately on
117   * shutdown, instead giving these daemon threads a chance to terminate
118   * normally.
119   * @param service ExecutorService which uses daemon threads
120   * @param terminationTimeout how long to wait for the executor to finish
121   *        before terminating the JVM
122   * @param timeUnit unit of time for the time parameter
123   */
124  @Beta
125  public static void addDelayedShutdownHook(
126      ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
127    new Application()
128        .addDelayedShutdownHook(service, terminationTimeout, timeUnit);
129  }
130
131  /**
132   * Converts the given ThreadPoolExecutor into an ExecutorService that exits
133   * when the application is complete.  It does so by using daemon threads and
134   * adding a shutdown hook to wait for their completion.
135   *
136   * <p>This method waits 120 seconds before continuing with JVM termination,
137   * even if the executor has not finished its work.
138   *
139   * <p>This is mainly for fixed thread pools.
140   * See {@link Executors#newFixedThreadPool(int)}.
141   *
142   * @param executor the executor to modify to make sure it exits when the
143   *        application is finished
144   * @return an unmodifiable version of the input which will not hang the JVM
145   */
146  @Beta
147  public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
148    return new Application().getExitingExecutorService(executor);
149  }
150
151  /**
152   * Converts the given ThreadPoolExecutor into a ScheduledExecutorService that
153   * exits when the application is complete.  It does so by using daemon threads
154   * and adding a shutdown hook to wait for their completion.
155   *
156   * <p>This method waits 120 seconds before continuing with JVM termination,
157   * even if the executor has not finished its work.
158   *
159   * <p>This is mainly for fixed thread pools.
160   * See {@link Executors#newScheduledThreadPool(int)}.
161   *
162   * @param executor the executor to modify to make sure it exits when the
163   *        application is finished
164   * @return an unmodifiable version of the input which will not hang the JVM
165   */
166  @Beta
167  public static ScheduledExecutorService getExitingScheduledExecutorService(
168      ScheduledThreadPoolExecutor executor) {
169    return new Application().getExitingScheduledExecutorService(executor);
170  }
171
172  /** Represents the current application to register shutdown hooks. */
173  @VisibleForTesting static class Application {
174
175    final ExecutorService getExitingExecutorService(
176        ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
177      useDaemonThreadFactory(executor);
178      ExecutorService service = Executors.unconfigurableExecutorService(executor);
179      addDelayedShutdownHook(service, terminationTimeout, timeUnit);
180      return service;
181    }
182
183    final ScheduledExecutorService getExitingScheduledExecutorService(
184        ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
185      useDaemonThreadFactory(executor);
186      ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
187      addDelayedShutdownHook(service, terminationTimeout, timeUnit);
188      return service;
189    }
190
191    final void addDelayedShutdownHook(
192        final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
193      checkNotNull(service);
194      checkNotNull(timeUnit);
195      addShutdownHook(MoreExecutors.newThread("DelayedShutdownHook-for-" + service, new Runnable() {
196        @Override
197        public void run() {
198          try {
199            // We'd like to log progress and failures that may arise in the
200            // following code, but unfortunately the behavior of logging
201            // is undefined in shutdown hooks.
202            // This is because the logging code installs a shutdown hook of its
203            // own. See Cleaner class inside {@link LogManager}.
204            service.shutdown();
205            service.awaitTermination(terminationTimeout, timeUnit);
206          } catch (InterruptedException ignored) {
207            // We're shutting down anyway, so just ignore.
208          }
209        }
210      }));
211    }
212
213    final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
214      return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
215    }
216
217    final ScheduledExecutorService getExitingScheduledExecutorService(
218        ScheduledThreadPoolExecutor executor) {
219      return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
220    }
221
222    @VisibleForTesting void addShutdownHook(Thread hook) {
223      Runtime.getRuntime().addShutdownHook(hook);
224    }
225  }
226
227  private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
228    executor.setThreadFactory(new ThreadFactoryBuilder()
229        .setDaemon(true)
230        .setThreadFactory(executor.getThreadFactory())
231        .build());
232  }
233
234  /**
235   * Creates an executor service that runs each task in the thread
236   * that invokes {@code execute/submit}, as in {@link CallerRunsPolicy}  This
237   * applies both to individually submitted tasks and to collections of tasks
238   * submitted via {@code invokeAll} or {@code invokeAny}.  In the latter case,
239   * tasks will run serially on the calling thread.  Tasks are run to
240   * completion before a {@code Future} is returned to the caller (unless the
241   * executor has been shutdown).
242   *
243   * <p>Although all tasks are immediately executed in the thread that
244   * submitted the task, this {@code ExecutorService} imposes a small
245   * locking overhead on each task submission in order to implement shutdown
246   * and termination behavior.
247   *
248   * <p>The implementation deviates from the {@code ExecutorService}
249   * specification with regards to the {@code shutdownNow} method.  First,
250   * "best-effort" with regards to canceling running tasks is implemented
251   * as "no-effort".  No interrupts or other attempts are made to stop
252   * threads executing tasks.  Second, the returned list will always be empty,
253   * as any submitted task is considered to have started execution.
254   * This applies also to tasks given to {@code invokeAll} or {@code invokeAny}
255   * which are pending serial execution, even the subset of the tasks that
256   * have not yet started execution.  It is unclear from the
257   * {@code ExecutorService} specification if these should be included, and
258   * it's much easier to implement the interpretation that they not be.
259   * Finally, a call to {@code shutdown} or {@code shutdownNow} may result
260   * in concurrent calls to {@code invokeAll/invokeAny} throwing
261   * RejectedExecutionException, although a subset of the tasks may already
262   * have been executed.
263   *
264   * @since 10.0 (<a href="http://code.google.com/p/guava-libraries/wiki/Compatibility"
265   *        >mostly source-compatible</a> since 3.0)
266   */
267  public static ListeningExecutorService sameThreadExecutor() {
268    return new SameThreadExecutorService();
269  }
270
271  // See sameThreadExecutor javadoc for behavioral notes.
272  private static class SameThreadExecutorService
273      extends AbstractListeningExecutorService {
274    /**
275     * Lock used whenever accessing the state variables
276     * (runningTasks, shutdown, terminationCondition) of the executor
277     */
278    private final Lock lock = new ReentrantLock();
279
280    /** Signaled after the executor is shutdown and running tasks are done */
281    private final Condition termination = lock.newCondition();
282
283    /*
284     * Conceptually, these two variables describe the executor being in
285     * one of three states:
286     *   - Active: shutdown == false
287     *   - Shutdown: runningTasks > 0 and shutdown == true
288     *   - Terminated: runningTasks == 0 and shutdown == true
289     */
290    private int runningTasks = 0;
291    private boolean shutdown = false;
292
293    @Override
294    public void execute(Runnable command) {
295      startTask();
296      try {
297        command.run();
298      } finally {
299        endTask();
300      }
301    }
302
303    @Override
304    public boolean isShutdown() {
305      lock.lock();
306      try {
307        return shutdown;
308      } finally {
309        lock.unlock();
310      }
311    }
312
313    @Override
314    public void shutdown() {
315      lock.lock();
316      try {
317        shutdown = true;
318      } finally {
319        lock.unlock();
320      }
321    }
322
323    // See sameThreadExecutor javadoc for unusual behavior of this method.
324    @Override
325    public List<Runnable> shutdownNow() {
326      shutdown();
327      return Collections.emptyList();
328    }
329
330    @Override
331    public boolean isTerminated() {
332      lock.lock();
333      try {
334        return shutdown && runningTasks == 0;
335      } finally {
336        lock.unlock();
337      }
338    }
339
340    @Override
341    public boolean awaitTermination(long timeout, TimeUnit unit)
342        throws InterruptedException {
343      long nanos = unit.toNanos(timeout);
344      lock.lock();
345      try {
346        for (;;) {
347          if (isTerminated()) {
348            return true;
349          } else if (nanos <= 0) {
350            return false;
351          } else {
352            nanos = termination.awaitNanos(nanos);
353          }
354        }
355      } finally {
356        lock.unlock();
357      }
358    }
359
360    /**
361     * Checks if the executor has been shut down and increments the running
362     * task count.
363     *
364     * @throws RejectedExecutionException if the executor has been previously
365     *         shutdown
366     */
367    private void startTask() {
368      lock.lock();
369      try {
370        if (isShutdown()) {
371          throw new RejectedExecutionException("Executor already shutdown");
372        }
373        runningTasks++;
374      } finally {
375        lock.unlock();
376      }
377    }
378
379    /**
380     * Decrements the running task count.
381     */
382    private void endTask() {
383      lock.lock();
384      try {
385        runningTasks--;
386        if (isTerminated()) {
387          termination.signalAll();
388        }
389      } finally {
390        lock.unlock();
391      }
392    }
393  }
394
395  /**
396   * Creates an {@link ExecutorService} whose {@code submit} and {@code
397   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
398   * given delegate executor. Those methods, as well as {@code execute} and
399   * {@code invokeAny}, are implemented in terms of calls to {@code
400   * delegate.execute}. All other methods are forwarded unchanged to the
401   * delegate. This implies that the returned {@code ListeningExecutorService}
402   * never calls the delegate's {@code submit}, {@code invokeAll}, and {@code
403   * invokeAny} methods, so any special handling of tasks must be implemented in
404   * the delegate's {@code execute} method or by wrapping the returned {@code
405   * ListeningExecutorService}.
406   *
407   * <p>If the delegate executor was already an instance of {@code
408   * ListeningExecutorService}, it is returned untouched, and the rest of this
409   * documentation does not apply.
410   *
411   * @since 10.0
412   */
413  public static ListeningExecutorService listeningDecorator(
414      ExecutorService delegate) {
415    return (delegate instanceof ListeningExecutorService)
416        ? (ListeningExecutorService) delegate
417        : (delegate instanceof ScheduledExecutorService)
418        ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
419        : new ListeningDecorator(delegate);
420  }
421
422  /**
423   * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code
424   * invokeAll} methods submit {@link ListenableFutureTask} instances to the
425   * given delegate executor. Those methods, as well as {@code execute} and
426   * {@code invokeAny}, are implemented in terms of calls to {@code
427   * delegate.execute}. All other methods are forwarded unchanged to the
428   * delegate. This implies that the returned {@code
429   * ListeningScheduledExecutorService} never calls the delegate's {@code
430   * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special
431   * handling of tasks must be implemented in the delegate's {@code execute}
432   * method or by wrapping the returned {@code
433   * ListeningScheduledExecutorService}.
434   *
435   * <p>If the delegate executor was already an instance of {@code
436   * ListeningScheduledExecutorService}, it is returned untouched, and the rest
437   * of this documentation does not apply.
438   *
439   * @since 10.0
440   */
441  public static ListeningScheduledExecutorService listeningDecorator(
442      ScheduledExecutorService delegate) {
443    return (delegate instanceof ListeningScheduledExecutorService)
444        ? (ListeningScheduledExecutorService) delegate
445        : new ScheduledListeningDecorator(delegate);
446  }
447
448  private static class ListeningDecorator
449      extends AbstractListeningExecutorService {
450    private final ExecutorService delegate;
451
452    ListeningDecorator(ExecutorService delegate) {
453      this.delegate = checkNotNull(delegate);
454    }
455
456    @Override
457    public boolean awaitTermination(long timeout, TimeUnit unit)
458        throws InterruptedException {
459      return delegate.awaitTermination(timeout, unit);
460    }
461
462    @Override
463    public boolean isShutdown() {
464      return delegate.isShutdown();
465    }
466
467    @Override
468    public boolean isTerminated() {
469      return delegate.isTerminated();
470    }
471
472    @Override
473    public void shutdown() {
474      delegate.shutdown();
475    }
476
477    @Override
478    public List<Runnable> shutdownNow() {
479      return delegate.shutdownNow();
480    }
481
482    @Override
483    public void execute(Runnable command) {
484      delegate.execute(command);
485    }
486  }
487
488  private static class ScheduledListeningDecorator
489      extends ListeningDecorator implements ListeningScheduledExecutorService {
490    @SuppressWarnings("hiding")
491    final ScheduledExecutorService delegate;
492
493    ScheduledListeningDecorator(ScheduledExecutorService delegate) {
494      super(delegate);
495      this.delegate = checkNotNull(delegate);
496    }
497
498    @Override
499    public ListenableScheduledFuture<?> schedule(
500        Runnable command, long delay, TimeUnit unit) {
501      ListenableFutureTask<Void> task =
502          ListenableFutureTask.create(command, null);
503      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
504      return new ListenableScheduledTask<Void>(task, scheduled);
505    }
506
507    @Override
508    public <V> ListenableScheduledFuture<V> schedule(
509        Callable<V> callable, long delay, TimeUnit unit) {
510      ListenableFutureTask<V> task = ListenableFutureTask.create(callable);
511      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
512      return new ListenableScheduledTask<V>(task, scheduled);
513    }
514
515    @Override
516    public ListenableScheduledFuture<?> scheduleAtFixedRate(
517        Runnable command, long initialDelay, long period, TimeUnit unit) {
518      NeverSuccessfulListenableFutureTask task =
519          new NeverSuccessfulListenableFutureTask(command);
520      ScheduledFuture<?> scheduled =
521          delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
522      return new ListenableScheduledTask<Void>(task, scheduled);
523    }
524
525    @Override
526    public ListenableScheduledFuture<?> scheduleWithFixedDelay(
527        Runnable command, long initialDelay, long delay, TimeUnit unit) {
528      NeverSuccessfulListenableFutureTask task =
529          new NeverSuccessfulListenableFutureTask(command);
530      ScheduledFuture<?> scheduled =
531          delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
532      return new ListenableScheduledTask<Void>(task, scheduled);
533    }
534
535    private static final class ListenableScheduledTask<V>
536        extends SimpleForwardingListenableFuture<V>
537        implements ListenableScheduledFuture<V> {
538
539      private final ScheduledFuture<?> scheduledDelegate;
540
541      public ListenableScheduledTask(
542          ListenableFuture<V> listenableDelegate,
543          ScheduledFuture<?> scheduledDelegate) {
544        super(listenableDelegate);
545        this.scheduledDelegate = scheduledDelegate;
546      }
547
548      @Override
549      public boolean cancel(boolean mayInterruptIfRunning) {
550        boolean cancelled = super.cancel(mayInterruptIfRunning);
551        if (cancelled) {
552          // Unless it is cancelled, the delegate may continue being scheduled
553          scheduledDelegate.cancel(mayInterruptIfRunning);
554
555          // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled.
556        }
557        return cancelled;
558      }
559
560      @Override
561      public long getDelay(TimeUnit unit) {
562        return scheduledDelegate.getDelay(unit);
563      }
564
565      @Override
566      public int compareTo(Delayed other) {
567        return scheduledDelegate.compareTo(other);
568      }
569    }
570
571    private static final class NeverSuccessfulListenableFutureTask
572        extends AbstractFuture<Void>
573        implements Runnable {
574      private final Runnable delegate;
575
576      public NeverSuccessfulListenableFutureTask(Runnable delegate) {
577        this.delegate = checkNotNull(delegate);
578      }
579
580      @Override public void run() {
581        try {
582          delegate.run();
583        } catch (Throwable t) {
584          setException(t);
585          throw Throwables.propagate(t);
586        }
587      }
588    }
589  }
590
591  /*
592   * This following method is a modified version of one found in
593   * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
594   * which contained the following notice:
595   *
596   * Written by Doug Lea with assistance from members of JCP JSR-166
597   * Expert Group and released to the public domain, as explained at
598   * http://creativecommons.org/publicdomain/zero/1.0/
599   * Other contributors include Andrew Wright, Jeffrey Hayes,
600   * Pat Fisher, Mike Judd.
601   */
602
603  /**
604   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
605   * implementations.
606   */ static <T> T invokeAnyImpl(ListeningExecutorService executorService,
607      Collection<? extends Callable<T>> tasks, boolean timed, long nanos)
608          throws InterruptedException, ExecutionException, TimeoutException {
609    checkNotNull(executorService);
610    int ntasks = tasks.size();
611    checkArgument(ntasks > 0);
612    List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
613    BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
614
615    // For efficiency, especially in executors with limited
616    // parallelism, check to see if previously submitted tasks are
617    // done before submitting more of them. This interleaving
618    // plus the exception mechanics account for messiness of main
619    // loop.
620
621    try {
622      // Record exceptions so that if we fail to obtain any
623      // result, we can throw the last exception we got.
624      ExecutionException ee = null;
625      long lastTime = timed ? System.nanoTime() : 0;
626      Iterator<? extends Callable<T>> it = tasks.iterator();
627
628      futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
629      --ntasks;
630      int active = 1;
631
632      for (;;) {
633        Future<T> f = futureQueue.poll();
634        if (f == null) {
635          if (ntasks > 0) {
636            --ntasks;
637            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
638            ++active;
639          } else if (active == 0) {
640            break;
641          } else if (timed) {
642            f = futureQueue.poll(nanos, TimeUnit.NANOSECONDS);
643            if (f == null) {
644              throw new TimeoutException();
645            }
646            long now = System.nanoTime();
647            nanos -= now - lastTime;
648            lastTime = now;
649          } else {
650            f = futureQueue.take();
651          }
652        }
653        if (f != null) {
654          --active;
655          try {
656            return f.get();
657          } catch (ExecutionException eex) {
658            ee = eex;
659          } catch (RuntimeException rex) {
660            ee = new ExecutionException(rex);
661          }
662        }
663      }
664
665      if (ee == null) {
666        ee = new ExecutionException(null);
667      }
668      throw ee;
669    } finally {
670      for (Future<T> f : futures) {
671        f.cancel(true);
672      }
673    }
674  }
675
676  /**
677   * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
678   */
679  private static <T> ListenableFuture<T> submitAndAddQueueListener(
680      ListeningExecutorService executorService, Callable<T> task,
681      final BlockingQueue<Future<T>> queue) {
682    final ListenableFuture<T> future = executorService.submit(task);
683    future.addListener(new Runnable() {
684      @Override public void run() {
685        queue.add(future);
686      }
687    }, MoreExecutors.sameThreadExecutor());
688    return future;
689  }
690
691  /**
692   * Returns a default thread factory used to create new threads.
693   *
694   * <p>On AppEngine, returns {@code ThreadManager.currentRequestThreadFactory()}.
695   * Otherwise, returns {@link Executors#defaultThreadFactory()}.
696   *
697   * @since 14.0
698   */
699  @Beta
700  public static ThreadFactory platformThreadFactory() {
701    if (!isAppEngine()) {
702      return Executors.defaultThreadFactory();
703    }
704    try {
705      return (ThreadFactory) Class.forName("com.google.appengine.api.ThreadManager")
706          .getMethod("currentRequestThreadFactory")
707          .invoke(null);
708    } catch (IllegalAccessException e) {
709      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
710    } catch (ClassNotFoundException e) {
711      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
712    } catch (NoSuchMethodException e) {
713      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
714    } catch (InvocationTargetException e) {
715      throw Throwables.propagate(e.getCause());
716    }
717  }
718
719  private static boolean isAppEngine() {
720    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
721      return false;
722    }
723    try {
724      // If the current environment is null, we're not inside AppEngine.
725      return Class.forName("com.google.apphosting.api.ApiProxy")
726          .getMethod("getCurrentEnvironment")
727          .invoke(null) != null;
728    } catch (ClassNotFoundException e) {
729      // If ApiProxy doesn't exist, we're not on AppEngine at all.
730      return false;
731    } catch (InvocationTargetException e) {
732      // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
733      return false;
734    } catch (IllegalAccessException e) {
735      // If the method isn't accessible, we're not on a supported version of AppEngine;
736      return false;
737    } catch (NoSuchMethodException e) {
738      // If the method doesn't exist, we're not on a supported version of AppEngine;
739      return false;
740    }
741  }
742
743  /**
744   * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name}
745   * unless changing the name is forbidden by the security manager.
746   */
747  static Thread newThread(String name, Runnable runnable) {
748    checkNotNull(name);
749    checkNotNull(runnable);
750    Thread result = platformThreadFactory().newThread(runnable);
751    try {
752      result.setName(name);
753    } catch (SecurityException e) {
754      // OK if we can't set the name in this environment.
755    }
756    return result;
757  }
758
759  // TODO(user): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService?
760  // TODO(user): provide overloads that take constant strings? Function<Runnable, String>s to
761  // calculate names?
762
763  /**
764   * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in.
765   *
766   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
767   * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
768   * prevents the renaming then it will be skipped but the tasks will still execute.
769   *
770   *
771   * @param executor The executor to decorate
772   * @param nameSupplier The source of names for each task
773   */
774  static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
775    checkNotNull(executor);
776    checkNotNull(nameSupplier);
777    if (isAppEngine()) {
778      // AppEngine doesn't support thread renaming, so don't even try
779      return executor;
780    }
781    return new Executor() {
782      @Override public void execute(Runnable command) {
783        executor.execute(Callables.threadRenaming(command, nameSupplier));
784      }
785    };
786  }
787
788  /**
789   * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run
790   * in.
791   *
792   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
793   * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
794   * prevents the renaming then it will be skipped but the tasks will still execute.
795   *
796   *
797   * @param service The executor to decorate
798   * @param nameSupplier The source of names for each task
799   */
800  static ExecutorService renamingDecorator(final ExecutorService service,
801      final Supplier<String> nameSupplier) {
802    checkNotNull(service);
803    checkNotNull(nameSupplier);
804    if (isAppEngine()) {
805      // AppEngine doesn't support thread renaming, so don't even try.
806      return service;
807    }
808    return new WrappingExecutorService(service) {
809      @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
810        return Callables.threadRenaming(callable, nameSupplier);
811      }
812      @Override protected Runnable wrapTask(Runnable command) {
813        return Callables.threadRenaming(command, nameSupplier);
814      }
815    };
816  }
817
818  /**
819   * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its
820   * tasks run in.
821   *
822   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
823   * right before each task is run.  The renaming is best effort, if a {@link SecurityManager}
824   * prevents the renaming then it will be skipped but the tasks will still execute.
825   *
826   *
827   * @param service The executor to decorate
828   * @param nameSupplier The source of names for each task
829   */
830  static ScheduledExecutorService renamingDecorator(final ScheduledExecutorService service,
831      final Supplier<String> nameSupplier) {
832    checkNotNull(service);
833    checkNotNull(nameSupplier);
834    if (isAppEngine()) {
835      // AppEngine doesn't support thread renaming, so don't even try.
836      return service;
837    }
838    return new WrappingScheduledExecutorService(service) {
839      @Override protected <T> Callable<T> wrapTask(Callable<T> callable) {
840        return Callables.threadRenaming(callable, nameSupplier);
841      }
842      @Override protected Runnable wrapTask(Runnable command) {
843        return Callables.threadRenaming(command, nameSupplier);
844      }
845    };
846  }
847
848  /**
849   * Shuts down the given executor gradually, first disabling new submissions and later cancelling
850   * existing tasks.
851   *
852   * <p>The method takes the following steps:
853   * <ol>
854   *  <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
855   *  <li>waits for half of the specified timeout.
856   *  <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
857   *  pending tasks and interrupting running tasks.
858   *  <li>waits for the other half of the specified timeout.
859   * </ol>
860   *
861   * <p>If, at any step of the process, the given executor is terminated or the calling thread is
862   * interrupted, the method may return without executing any remaining steps.
863   *
864   * @param service the {@code ExecutorService} to shut down
865   * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
866   * @param unit the time unit of the timeout argument
867   * @return {@code true) if the pool was terminated successfully, {@code false} if the
868   *     {@code ExecutorService} could not terminate <b>or</b> the thread running this method
869   *     is interrupted while waiting for the {@code ExecutorService} to terminate
870   * @since 17.0
871   */
872  @Beta
873  public static boolean shutdownAndAwaitTermination(
874      ExecutorService service, long timeout, TimeUnit unit) {
875    checkNotNull(unit);
876    // Disable new tasks from being submitted
877    service.shutdown();
878    try {
879      long halfTimeoutNanos = TimeUnit.NANOSECONDS.convert(timeout, unit) / 2;
880      // Wait for half the duration of the timeout for existing tasks to terminate
881      if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
882        // Cancel currently executing tasks
883        service.shutdownNow();
884        // Wait the other half of the timeout for tasks to respond to being cancelled
885        service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
886      }
887    } catch (InterruptedException ie) {
888      // Preserve interrupt status
889      Thread.currentThread().interrupt();
890      // (Re-)Cancel if current thread also interrupted
891      service.shutdownNow();
892    }
893    return service.isTerminated();
894  }
895}