001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.util.concurrent.Internal.toNanosSaturated;
020import static java.util.Objects.requireNonNull;
021
022import com.google.common.annotations.GwtCompatible;
023import com.google.common.annotations.GwtIncompatible;
024import com.google.common.annotations.J2ktIncompatible;
025import com.google.common.annotations.VisibleForTesting;
026import com.google.common.base.Supplier;
027import com.google.common.base.Throwables;
028import com.google.common.collect.Lists;
029import com.google.common.collect.Queues;
030import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
031import com.google.errorprone.annotations.CanIgnoreReturnValue;
032import com.google.errorprone.annotations.concurrent.GuardedBy;
033import java.lang.reflect.InvocationTargetException;
034import java.time.Duration;
035import java.util.Collection;
036import java.util.Collections;
037import java.util.Iterator;
038import java.util.List;
039import java.util.concurrent.BlockingQueue;
040import java.util.concurrent.Callable;
041import java.util.concurrent.Delayed;
042import java.util.concurrent.ExecutionException;
043import java.util.concurrent.Executor;
044import java.util.concurrent.ExecutorService;
045import java.util.concurrent.Executors;
046import java.util.concurrent.Future;
047import java.util.concurrent.RejectedExecutionException;
048import java.util.concurrent.ScheduledExecutorService;
049import java.util.concurrent.ScheduledFuture;
050import java.util.concurrent.ScheduledThreadPoolExecutor;
051import java.util.concurrent.ThreadFactory;
052import java.util.concurrent.ThreadPoolExecutor;
053import java.util.concurrent.TimeUnit;
054import java.util.concurrent.TimeoutException;
055import org.checkerframework.checker.nullness.qual.Nullable;
056
057/**
058 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link ExecutorService},
059 * and {@link java.util.concurrent.ThreadFactory}.
060 *
061 * @author Eric Fellheimer
062 * @author Kyle Littlefield
063 * @author Justin Mahoney
064 * @since 3.0
065 */
066@GwtCompatible(emulated = true)
067@ElementTypesAreNonnullByDefault
068public final class MoreExecutors {
069  private MoreExecutors() {}
070
071  /**
072   * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
073   * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
074   * completion.
075   *
076   * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
077   *
078   * @param executor the executor to modify to make sure it exits when the application is finished
079   * @param terminationTimeout how long to wait for the executor to finish before terminating the
080   *     JVM
081   * @return an unmodifiable version of the input which will not hang the JVM
082   * @since 28.0
083   */
084  @J2ktIncompatible
085  @GwtIncompatible // TODO
086  public static ExecutorService getExitingExecutorService(
087      ThreadPoolExecutor executor, Duration terminationTimeout) {
088    return getExitingExecutorService(
089        executor, toNanosSaturated(terminationTimeout), TimeUnit.NANOSECONDS);
090  }
091
092  /**
093   * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
094   * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
095   * completion.
096   *
097   * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
098   *
099   * @param executor the executor to modify to make sure it exits when the application is finished
100   * @param terminationTimeout how long to wait for the executor to finish before terminating the
101   *     JVM
102   * @param timeUnit unit of time for the time parameter
103   * @return an unmodifiable version of the input which will not hang the JVM
104   */
105  @J2ktIncompatible
106  @GwtIncompatible // TODO
107  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
108  public static ExecutorService getExitingExecutorService(
109      ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
110    return new Application().getExitingExecutorService(executor, terminationTimeout, timeUnit);
111  }
112
113  /**
114   * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
115   * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
116   * completion.
117   *
118   * <p>This method waits 120 seconds before continuing with JVM termination, even if the executor
119   * has not finished its work.
120   *
121   * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
122   *
123   * @param executor the executor to modify to make sure it exits when the application is finished
124   * @return an unmodifiable version of the input which will not hang the JVM
125   */
126  @J2ktIncompatible
127  @GwtIncompatible // concurrency
128  public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
129    return new Application().getExitingExecutorService(executor);
130  }
131
132  /**
133   * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
134   * the application is complete. It does so by using daemon threads and adding a shutdown hook to
135   * wait for their completion.
136   *
137   * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
138   *
139   * @param executor the executor to modify to make sure it exits when the application is finished
140   * @param terminationTimeout how long to wait for the executor to finish before terminating the
141   *     JVM
142   * @return an unmodifiable version of the input which will not hang the JVM
143   * @since 28.0
144   */
145  @J2ktIncompatible
146  @GwtIncompatible // java.time.Duration
147  public static ScheduledExecutorService getExitingScheduledExecutorService(
148      ScheduledThreadPoolExecutor executor, Duration terminationTimeout) {
149    return getExitingScheduledExecutorService(
150        executor, toNanosSaturated(terminationTimeout), TimeUnit.NANOSECONDS);
151  }
152
153  /**
154   * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
155   * the application is complete. It does so by using daemon threads and adding a shutdown hook to
156   * wait for their completion.
157   *
158   * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
159   *
160   * @param executor the executor to modify to make sure it exits when the application is finished
161   * @param terminationTimeout how long to wait for the executor to finish before terminating the
162   *     JVM
163   * @param timeUnit unit of time for the time parameter
164   * @return an unmodifiable version of the input which will not hang the JVM
165   */
166  @J2ktIncompatible
167  @GwtIncompatible // TODO
168  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
169  public static ScheduledExecutorService getExitingScheduledExecutorService(
170      ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
171    return new Application()
172        .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
173  }
174
175  /**
176   * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
177   * the application is complete. It does so by using daemon threads and adding a shutdown hook to
178   * wait for their completion.
179   *
180   * <p>This method waits 120 seconds before continuing with JVM termination, even if the executor
181   * has not finished its work.
182   *
183   * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
184   *
185   * @param executor the executor to modify to make sure it exits when the application is finished
186   * @return an unmodifiable version of the input which will not hang the JVM
187   */
188  @J2ktIncompatible
189  @GwtIncompatible // TODO
190  public static ScheduledExecutorService getExitingScheduledExecutorService(
191      ScheduledThreadPoolExecutor executor) {
192    return new Application().getExitingScheduledExecutorService(executor);
193  }
194
195  /**
196   * Add a shutdown hook to wait for thread completion in the given {@link ExecutorService service}.
197   * This is useful if the given service uses daemon threads, and we want to keep the JVM from
198   * exiting immediately on shutdown, instead giving these daemon threads a chance to terminate
199   * normally.
200   *
201   * @param service ExecutorService which uses daemon threads
202   * @param terminationTimeout how long to wait for the executor to finish before terminating the
203   *     JVM
204   * @since 28.0
205   */
206  @J2ktIncompatible
207  @GwtIncompatible // java.time.Duration
208  public static void addDelayedShutdownHook(ExecutorService service, Duration terminationTimeout) {
209    addDelayedShutdownHook(service, toNanosSaturated(terminationTimeout), TimeUnit.NANOSECONDS);
210  }
211
212  /**
213   * Add a shutdown hook to wait for thread completion in the given {@link ExecutorService service}.
214   * This is useful if the given service uses daemon threads, and we want to keep the JVM from
215   * exiting immediately on shutdown, instead giving these daemon threads a chance to terminate
216   * normally.
217   *
218   * @param service ExecutorService which uses daemon threads
219   * @param terminationTimeout how long to wait for the executor to finish before terminating the
220   *     JVM
221   * @param timeUnit unit of time for the time parameter
222   */
223  @J2ktIncompatible
224  @GwtIncompatible // TODO
225  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
226  public static void addDelayedShutdownHook(
227      ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
228    new Application().addDelayedShutdownHook(service, terminationTimeout, timeUnit);
229  }
230
231  /** Represents the current application to register shutdown hooks. */
232  @J2ktIncompatible
233  @GwtIncompatible // TODO
234  @VisibleForTesting
235  static class Application {
236
237    final ExecutorService getExitingExecutorService(
238        ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
239      useDaemonThreadFactory(executor);
240      ExecutorService service = Executors.unconfigurableExecutorService(executor);
241      addDelayedShutdownHook(executor, terminationTimeout, timeUnit);
242      return service;
243    }
244
245    final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
246      return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
247    }
248
249    final ScheduledExecutorService getExitingScheduledExecutorService(
250        ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
251      useDaemonThreadFactory(executor);
252      ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
253      addDelayedShutdownHook(executor, terminationTimeout, timeUnit);
254      return service;
255    }
256
257    final ScheduledExecutorService getExitingScheduledExecutorService(
258        ScheduledThreadPoolExecutor executor) {
259      return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
260    }
261
262    final void addDelayedShutdownHook(
263        final ExecutorService service, final long terminationTimeout, final TimeUnit timeUnit) {
264      checkNotNull(service);
265      checkNotNull(timeUnit);
266      addShutdownHook(
267          MoreExecutors.newThread(
268              "DelayedShutdownHook-for-" + service,
269              new Runnable() {
270                @Override
271                public void run() {
272                  try {
273                    // We'd like to log progress and failures that may arise in the
274                    // following code, but unfortunately the behavior of logging
275                    // is undefined in shutdown hooks.
276                    // This is because the logging code installs a shutdown hook of its
277                    // own. See Cleaner class inside {@link LogManager}.
278                    service.shutdown();
279                    service.awaitTermination(terminationTimeout, timeUnit);
280                  } catch (InterruptedException ignored) {
281                    // We're shutting down anyway, so just ignore.
282                  }
283                }
284              }));
285    }
286
287    @VisibleForTesting
288    void addShutdownHook(Thread hook) {
289      Runtime.getRuntime().addShutdownHook(hook);
290    }
291  }
292
293  @J2ktIncompatible
294  @GwtIncompatible // TODO
295  private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
296    executor.setThreadFactory(
297        new ThreadFactoryBuilder()
298            .setDaemon(true)
299            .setThreadFactory(executor.getThreadFactory())
300            .build());
301  }
302
303  // See newDirectExecutorService javadoc for behavioral notes.
304  @J2ktIncompatible
305  @GwtIncompatible // TODO
306  private static final class DirectExecutorService extends AbstractListeningExecutorService {
307    /** Lock used whenever accessing the state variables (runningTasks, shutdown) of the executor */
308    private final Object lock = new Object();
309
310    /*
311     * Conceptually, these two variables describe the executor being in
312     * one of three states:
313     *   - Active: shutdown == false
314     *   - Shutdown: runningTasks > 0 and shutdown == true
315     *   - Terminated: runningTasks == 0 and shutdown == true
316     */
317    @GuardedBy("lock")
318    private int runningTasks = 0;
319
320    @GuardedBy("lock")
321    private boolean shutdown = false;
322
323    @Override
324    public void execute(Runnable command) {
325      startTask();
326      try {
327        command.run();
328      } finally {
329        endTask();
330      }
331    }
332
333    @Override
334    public boolean isShutdown() {
335      synchronized (lock) {
336        return shutdown;
337      }
338    }
339
340    @Override
341    public void shutdown() {
342      synchronized (lock) {
343        shutdown = true;
344        if (runningTasks == 0) {
345          lock.notifyAll();
346        }
347      }
348    }
349
350    // See newDirectExecutorService javadoc for unusual behavior of this method.
351    @Override
352    public List<Runnable> shutdownNow() {
353      shutdown();
354      return Collections.emptyList();
355    }
356
357    @Override
358    public boolean isTerminated() {
359      synchronized (lock) {
360        return shutdown && runningTasks == 0;
361      }
362    }
363
364    @Override
365    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
366      long nanos = unit.toNanos(timeout);
367      synchronized (lock) {
368        while (true) {
369          if (shutdown && runningTasks == 0) {
370            return true;
371          } else if (nanos <= 0) {
372            return false;
373          } else {
374            long now = System.nanoTime();
375            TimeUnit.NANOSECONDS.timedWait(lock, nanos);
376            nanos -= System.nanoTime() - now; // subtract the actual time we waited
377          }
378        }
379      }
380    }
381
382    /**
383     * Checks if the executor has been shut down and increments the running task count.
384     *
385     * @throws RejectedExecutionException if the executor has been previously shutdown
386     */
387    private void startTask() {
388      synchronized (lock) {
389        if (shutdown) {
390          throw new RejectedExecutionException("Executor already shutdown");
391        }
392        runningTasks++;
393      }
394    }
395
396    /** Decrements the running task count. */
397    private void endTask() {
398      synchronized (lock) {
399        int numRunning = --runningTasks;
400        if (numRunning == 0) {
401          lock.notifyAll();
402        }
403      }
404    }
405  }
406
407  /**
408   * Creates an executor service that runs each task in the thread that invokes {@code
409   * execute/submit}, as in {@code ThreadPoolExecutor.CallerRunsPolicy}. This applies both to
410   * individually submitted tasks and to collections of tasks submitted via {@code invokeAll} or
411   * {@code invokeAny}. In the latter case, tasks will run serially on the calling thread. Tasks are
412   * run to completion before a {@code Future} is returned to the caller (unless the executor has
413   * been shutdown).
414   *
415   * <p>Although all tasks are immediately executed in the thread that submitted the task, this
416   * {@code ExecutorService} imposes a small locking overhead on each task submission in order to
417   * implement shutdown and termination behavior.
418   *
419   * <p>The implementation deviates from the {@code ExecutorService} specification with regards to
420   * the {@code shutdownNow} method. First, "best-effort" with regards to canceling running tasks is
421   * implemented as "no-effort". No interrupts or other attempts are made to stop threads executing
422   * tasks. Second, the returned list will always be empty, as any submitted task is considered to
423   * have started execution. This applies also to tasks given to {@code invokeAll} or {@code
424   * invokeAny} which are pending serial execution, even the subset of the tasks that have not yet
425   * started execution. It is unclear from the {@code ExecutorService} specification if these should
426   * be included, and it's much easier to implement the interpretation that they not be. Finally, a
427   * call to {@code shutdown} or {@code shutdownNow} may result in concurrent calls to {@code
428   * invokeAll/invokeAny} throwing RejectedExecutionException, although a subset of the tasks may
429   * already have been executed.
430   *
431   * @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0)
432   */
433  @J2ktIncompatible
434  @GwtIncompatible // TODO
435  public static ListeningExecutorService newDirectExecutorService() {
436    return new DirectExecutorService();
437  }
438
439  /**
440   * Returns an {@link Executor} that runs each task in the thread that invokes {@link
441   * Executor#execute execute}, as in {@code ThreadPoolExecutor.CallerRunsPolicy}.
442   *
443   * <p>This executor is appropriate for tasks that are lightweight and not deeply chained.
444   * Inappropriate {@code directExecutor} usage can cause problems, and these problems can be
445   * difficult to reproduce because they depend on timing. For example:
446   *
447   * <ul>
448   *   <li>When a {@code ListenableFuture} listener is registered to run under {@code
449   *       directExecutor}, the listener can execute in any of three possible threads:
450   *       <ol>
451   *         <li>When a thread attaches a listener to a {@code ListenableFuture} that's already
452   *             complete, the listener runs immediately in that thread.
453   *         <li>When a thread attaches a listener to a {@code ListenableFuture} that's
454   *             <em>in</em>complete and the {@code ListenableFuture} later completes normally, the
455   *             listener runs in the thread that completes the {@code ListenableFuture}.
456   *         <li>When a listener is attached to a {@code ListenableFuture} and the {@code
457   *             ListenableFuture} gets cancelled, the listener runs immediately in the thread that
458   *             cancelled the {@code Future}.
459   *       </ol>
460   *       Given all these possibilities, it is frequently possible for listeners to execute in UI
461   *       threads, RPC network threads, or other latency-sensitive threads. In those cases, slow
462   *       listeners can harm responsiveness, slow the system as a whole, or worse. (See also the
463   *       note about locking below.)
464   *   <li>If many tasks will be triggered by the same event, one heavyweight task may delay other
465   *       tasks -- even tasks that are not themselves {@code directExecutor} tasks.
466   *   <li>If many such tasks are chained together (such as with {@code
467   *       future.transform(...).transform(...).transform(...)....}), they may overflow the stack.
468   *       (In simple cases, callers can avoid this by registering all tasks with the same {@link
469   *       MoreExecutors#newSequentialExecutor} wrapper around {@code directExecutor()}. More
470   *       complex cases may require using thread pools or making deeper changes.)
471   *   <li>If an exception propagates out of a {@code Runnable}, it is not necessarily seen by any
472   *       {@code UncaughtExceptionHandler} for the thread. For example, if the callback passed to
473   *       {@link Futures#addCallback} throws an exception, that exception will be typically be
474   *       logged by the {@link ListenableFuture} implementation, even if the thread is configured
475   *       to do something different. In other cases, no code will catch the exception, and it may
476   *       terminate whichever thread happens to trigger the execution.
477   * </ul>
478   *
479   * A specific warning about locking: Code that executes user-supplied tasks, such as {@code
480   * ListenableFuture} listeners, should take care not to do so while holding a lock. Additionally,
481   * as a further line of defense, prefer not to perform any locking inside a task that will be run
482   * under {@code directExecutor}: Not only might the wait for a lock be long, but if the running
483   * thread was holding a lock, the listener may deadlock or break lock isolation.
484   *
485   * <p>This instance is equivalent to:
486   *
487   * <pre>{@code
488   * final class DirectExecutor implements Executor {
489   *   public void execute(Runnable r) {
490   *     r.run();
491   *   }
492   * }
493   * }</pre>
494   *
495   * <p>This should be preferred to {@link #newDirectExecutorService()} because implementing the
496   * {@link ExecutorService} subinterface necessitates significant performance overhead.
497   *
498   * @since 18.0
499   */
500  public static Executor directExecutor() {
501    return DirectExecutor.INSTANCE;
502  }
503
504  /**
505   * Returns an {@link Executor} that runs each task executed sequentially, such that no two tasks
506   * are running concurrently.
507   *
508   * <p>{@linkplain Executor#execute executed} tasks have a happens-before order as defined in the
509   * Java Language Specification. Tasks execute with the same happens-before order that the function
510   * calls to {@link Executor#execute `execute()`} that submitted those tasks had.
511   *
512   * <p>The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in
513   * turn, and does not create any threads of its own.
514   *
515   * <p>After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are
516   * polled and executed from a task queue until there are no more tasks. The thread will not be
517   * released until there are no more tasks to run.
518   *
519   * <p>If a task is submitted while a thread is executing tasks from the task queue, the thread
520   * will not be released until that submitted task is also complete.
521   *
522   * <p>If a task is {@linkplain Thread#interrupt interrupted} while a task is running:
523   *
524   * <ol>
525   *   <li>execution will not stop until the task queue is empty.
526   *   <li>tasks will begin execution with the thread marked as not interrupted - any interruption
527   *       applies only to the task that was running at the point of interruption.
528   *   <li>if the thread was interrupted before the SequentialExecutor's worker begins execution,
529   *       the interrupt will be restored to the thread after it completes so that its {@code
530   *       delegate} Executor may process the interrupt.
531   *   <li>subtasks are run with the thread uninterrupted and interrupts received during execution
532   *       of a task are ignored.
533   * </ol>
534   *
535   * <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
536   * If an {@code Error} is thrown, the error will propagate and execution will stop until the next
537   * time a task is submitted.
538   *
539   * <p>When an {@code Error} is thrown by an executed task, previously submitted tasks may never
540   * run. An attempt will be made to restart execution on the next call to {@code execute}. If the
541   * {@code delegate} has begun to reject execution, the previously submitted tasks may never run,
542   * despite not throwing a RejectedExecutionException synchronously with the call to {@code
543   * execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link
544   * Executors#newSingleThreadExecutor}).
545   *
546   * @since 23.3 (since 23.1 as {@code sequentialExecutor})
547   */
548  @J2ktIncompatible
549  @GwtIncompatible
550  public static Executor newSequentialExecutor(Executor delegate) {
551    return new SequentialExecutor(delegate);
552  }
553
554  /**
555   * Creates an {@link ExecutorService} whose {@code submit} and {@code invokeAll} methods submit
556   * {@link ListenableFutureTask} instances to the given delegate executor. Those methods, as well
557   * as {@code execute} and {@code invokeAny}, are implemented in terms of calls to {@code
558   * delegate.execute}. All other methods are forwarded unchanged to the delegate. This implies that
559   * the returned {@code ListeningExecutorService} never calls the delegate's {@code submit}, {@code
560   * invokeAll}, and {@code invokeAny} methods, so any special handling of tasks must be implemented
561   * in the delegate's {@code execute} method or by wrapping the returned {@code
562   * ListeningExecutorService}.
563   *
564   * <p>If the delegate executor was already an instance of {@code ListeningExecutorService}, it is
565   * returned untouched, and the rest of this documentation does not apply.
566   *
567   * @since 10.0
568   */
569  @J2ktIncompatible
570  @GwtIncompatible // TODO
571  public static ListeningExecutorService listeningDecorator(ExecutorService delegate) {
572    return (delegate instanceof ListeningExecutorService)
573        ? (ListeningExecutorService) delegate
574        : (delegate instanceof ScheduledExecutorService)
575            ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
576            : new ListeningDecorator(delegate);
577  }
578
579  /**
580   * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code invokeAll} methods
581   * submit {@link ListenableFutureTask} instances to the given delegate executor. Those methods, as
582   * well as {@code execute} and {@code invokeAny}, are implemented in terms of calls to {@code
583   * delegate.execute}. All other methods are forwarded unchanged to the delegate. This implies that
584   * the returned {@code ListeningScheduledExecutorService} never calls the delegate's {@code
585   * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special handling of tasks
586   * must be implemented in the delegate's {@code execute} method or by wrapping the returned {@code
587   * ListeningScheduledExecutorService}.
588   *
589   * <p>If the delegate executor was already an instance of {@code
590   * ListeningScheduledExecutorService}, it is returned untouched, and the rest of this
591   * documentation does not apply.
592   *
593   * @since 10.0
594   */
595  @J2ktIncompatible
596  @GwtIncompatible // TODO
597  public static ListeningScheduledExecutorService listeningDecorator(
598      ScheduledExecutorService delegate) {
599    return (delegate instanceof ListeningScheduledExecutorService)
600        ? (ListeningScheduledExecutorService) delegate
601        : new ScheduledListeningDecorator(delegate);
602  }
603
604  @J2ktIncompatible
605  @GwtIncompatible // TODO
606  private static class ListeningDecorator extends AbstractListeningExecutorService {
607    private final ExecutorService delegate;
608
609    ListeningDecorator(ExecutorService delegate) {
610      this.delegate = checkNotNull(delegate);
611    }
612
613    @Override
614    public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
615      return delegate.awaitTermination(timeout, unit);
616    }
617
618    @Override
619    public final boolean isShutdown() {
620      return delegate.isShutdown();
621    }
622
623    @Override
624    public final boolean isTerminated() {
625      return delegate.isTerminated();
626    }
627
628    @Override
629    public final void shutdown() {
630      delegate.shutdown();
631    }
632
633    @Override
634    public final List<Runnable> shutdownNow() {
635      return delegate.shutdownNow();
636    }
637
638    @Override
639    public final void execute(Runnable command) {
640      delegate.execute(command);
641    }
642
643    @Override
644    public final String toString() {
645      return super.toString() + "[" + delegate + "]";
646    }
647  }
648
649  @J2ktIncompatible
650  @GwtIncompatible // TODO
651  private static final class ScheduledListeningDecorator extends ListeningDecorator
652      implements ListeningScheduledExecutorService {
653    @SuppressWarnings("hiding")
654    final ScheduledExecutorService delegate;
655
656    ScheduledListeningDecorator(ScheduledExecutorService delegate) {
657      super(delegate);
658      this.delegate = checkNotNull(delegate);
659    }
660
661    @Override
662    public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
663      TrustedListenableFutureTask<@Nullable Void> task =
664          TrustedListenableFutureTask.create(command, null);
665      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
666      return new ListenableScheduledTask<@Nullable Void>(task, scheduled);
667    }
668
669    @Override
670    public <V extends @Nullable Object> ListenableScheduledFuture<V> schedule(
671        Callable<V> callable, long delay, TimeUnit unit) {
672      TrustedListenableFutureTask<V> task = TrustedListenableFutureTask.create(callable);
673      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
674      return new ListenableScheduledTask<>(task, scheduled);
675    }
676
677    @Override
678    public ListenableScheduledFuture<?> scheduleAtFixedRate(
679        Runnable command, long initialDelay, long period, TimeUnit unit) {
680      NeverSuccessfulListenableFutureTask task = new NeverSuccessfulListenableFutureTask(command);
681      ScheduledFuture<?> scheduled = delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
682      return new ListenableScheduledTask<@Nullable Void>(task, scheduled);
683    }
684
685    @Override
686    public ListenableScheduledFuture<?> scheduleWithFixedDelay(
687        Runnable command, long initialDelay, long delay, TimeUnit unit) {
688      NeverSuccessfulListenableFutureTask task = new NeverSuccessfulListenableFutureTask(command);
689      ScheduledFuture<?> scheduled =
690          delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
691      return new ListenableScheduledTask<@Nullable Void>(task, scheduled);
692    }
693
694    private static final class ListenableScheduledTask<V extends @Nullable Object>
695        extends SimpleForwardingListenableFuture<V> implements ListenableScheduledFuture<V> {
696
697      private final ScheduledFuture<?> scheduledDelegate;
698
699      public ListenableScheduledTask(
700          ListenableFuture<V> listenableDelegate, ScheduledFuture<?> scheduledDelegate) {
701        super(listenableDelegate);
702        this.scheduledDelegate = scheduledDelegate;
703      }
704
705      @Override
706      public boolean cancel(boolean mayInterruptIfRunning) {
707        boolean cancelled = super.cancel(mayInterruptIfRunning);
708        if (cancelled) {
709          // Unless it is cancelled, the delegate may continue being scheduled
710          scheduledDelegate.cancel(mayInterruptIfRunning);
711
712          // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled.
713        }
714        return cancelled;
715      }
716
717      @Override
718      public long getDelay(TimeUnit unit) {
719        return scheduledDelegate.getDelay(unit);
720      }
721
722      @Override
723      public int compareTo(Delayed other) {
724        return scheduledDelegate.compareTo(other);
725      }
726    }
727
728    @J2ktIncompatible
729    @GwtIncompatible // TODO
730    private static final class NeverSuccessfulListenableFutureTask
731        extends AbstractFuture.TrustedFuture<@Nullable Void> implements Runnable {
732      private final Runnable delegate;
733
734      public NeverSuccessfulListenableFutureTask(Runnable delegate) {
735        this.delegate = checkNotNull(delegate);
736      }
737
738      @Override
739      public void run() {
740        try {
741          delegate.run();
742        } catch (Throwable t) {
743          // Any Exception is either a RuntimeException or sneaky checked exception.
744          setException(t);
745          throw t;
746        }
747      }
748
749      @Override
750      protected String pendingToString() {
751        return "task=[" + delegate + "]";
752      }
753    }
754  }
755
756  /*
757   * This following method is a modified version of one found in
758   * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
759   * which contained the following notice:
760   *
761   * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to
762   * the public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/
763   *
764   * Other contributors include Andrew Wright, Jeffrey Hayes, Pat Fisher, Mike Judd.
765   */
766
767  /**
768   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
769   * implementations.
770   */
771  @J2ktIncompatible
772  @GwtIncompatible
773  @ParametricNullness
774  static <T extends @Nullable Object> T invokeAnyImpl(
775      ListeningExecutorService executorService,
776      Collection<? extends Callable<T>> tasks,
777      boolean timed,
778      Duration timeout)
779      throws InterruptedException, ExecutionException, TimeoutException {
780    return invokeAnyImpl(
781        executorService, tasks, timed, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
782  }
783
784  /**
785   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
786   * implementations.
787   */
788  @SuppressWarnings({
789    "GoodTime", // should accept a java.time.Duration
790    "CatchingUnchecked", // sneaky checked exception
791  })
792  @J2ktIncompatible
793  @GwtIncompatible
794  @ParametricNullness
795  static <T extends @Nullable Object> T invokeAnyImpl(
796      ListeningExecutorService executorService,
797      Collection<? extends Callable<T>> tasks,
798      boolean timed,
799      long timeout,
800      TimeUnit unit)
801      throws InterruptedException, ExecutionException, TimeoutException {
802    checkNotNull(executorService);
803    checkNotNull(unit);
804    int ntasks = tasks.size();
805    checkArgument(ntasks > 0);
806    List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
807    BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
808    long timeoutNanos = unit.toNanos(timeout);
809
810    // For efficiency, especially in executors with limited
811    // parallelism, check to see if previously submitted tasks are
812    // done before submitting more of them. This interleaving
813    // plus the exception mechanics account for messiness of main
814    // loop.
815
816    try {
817      // Record exceptions so that if we fail to obtain any
818      // result, we can throw the last exception we got.
819      ExecutionException ee = null;
820      long lastTime = timed ? System.nanoTime() : 0;
821      Iterator<? extends Callable<T>> it = tasks.iterator();
822
823      futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
824      --ntasks;
825      int active = 1;
826
827      while (true) {
828        Future<T> f = futureQueue.poll();
829        if (f == null) {
830          if (ntasks > 0) {
831            --ntasks;
832            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
833            ++active;
834          } else if (active == 0) {
835            break;
836          } else if (timed) {
837            f = futureQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
838            if (f == null) {
839              throw new TimeoutException();
840            }
841            long now = System.nanoTime();
842            timeoutNanos -= now - lastTime;
843            lastTime = now;
844          } else {
845            f = futureQueue.take();
846          }
847        }
848        if (f != null) {
849          --active;
850          try {
851            return f.get();
852          } catch (ExecutionException eex) {
853            ee = eex;
854          } catch (InterruptedException iex) {
855            throw iex;
856          } catch (Exception rex) { // sneaky checked exception
857            ee = new ExecutionException(rex);
858          }
859        }
860      }
861
862      if (ee == null) {
863        ee = new ExecutionException(null);
864      }
865      throw ee;
866    } finally {
867      for (Future<T> f : futures) {
868        f.cancel(true);
869      }
870    }
871  }
872
873  /**
874   * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
875   */
876  @J2ktIncompatible
877  @GwtIncompatible // TODO
878  private static <T extends @Nullable Object> ListenableFuture<T> submitAndAddQueueListener(
879      ListeningExecutorService executorService,
880      Callable<T> task,
881      final BlockingQueue<Future<T>> queue) {
882    final ListenableFuture<T> future = executorService.submit(task);
883    future.addListener(
884        new Runnable() {
885          @Override
886          public void run() {
887            queue.add(future);
888          }
889        },
890        directExecutor());
891    return future;
892  }
893
894  /**
895   * Returns a default thread factory used to create new threads.
896   *
897   * <p>When running on AppEngine with access to <a
898   * href="https://cloud.google.com/appengine/docs/standard/java/javadoc/">AppEngine legacy
899   * APIs</a>, this method returns {@code ThreadManager.currentRequestThreadFactory()}. Otherwise,
900   * it returns {@link Executors#defaultThreadFactory()}.
901   *
902   * @since 14.0
903   */
904  @J2ktIncompatible
905  @GwtIncompatible // concurrency
906  public static ThreadFactory platformThreadFactory() {
907    if (!isAppEngineWithApiClasses()) {
908      return Executors.defaultThreadFactory();
909    }
910    try {
911      return (ThreadFactory)
912          Class.forName("com.google.appengine.api.ThreadManager")
913              .getMethod("currentRequestThreadFactory")
914              .invoke(null);
915    } catch (IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
916      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
917    } catch (InvocationTargetException e) {
918      throw Throwables.propagate(e.getCause());
919    }
920  }
921
922  @J2ktIncompatible
923  @GwtIncompatible // TODO
924  private static boolean isAppEngineWithApiClasses() {
925    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
926      return false;
927    }
928    try {
929      Class.forName("com.google.appengine.api.utils.SystemProperty");
930    } catch (ClassNotFoundException e) {
931      return false;
932    }
933    try {
934      // If the current environment is null, we're not inside AppEngine.
935      return Class.forName("com.google.apphosting.api.ApiProxy")
936              .getMethod("getCurrentEnvironment")
937              .invoke(null)
938          != null;
939    } catch (ClassNotFoundException e) {
940      // If ApiProxy doesn't exist, we're not on AppEngine at all.
941      return false;
942    } catch (InvocationTargetException e) {
943      // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
944      return false;
945    } catch (IllegalAccessException e) {
946      // If the method isn't accessible, we're not on a supported version of AppEngine;
947      return false;
948    } catch (NoSuchMethodException e) {
949      // If the method doesn't exist, we're not on a supported version of AppEngine;
950      return false;
951    }
952  }
953
954  /**
955   * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name} unless
956   * changing the name is forbidden by the security manager.
957   */
958  @J2ktIncompatible
959  @GwtIncompatible // concurrency
960  static Thread newThread(String name, Runnable runnable) {
961    checkNotNull(name);
962    checkNotNull(runnable);
963    // TODO(b/139726489): Confirm that null is impossible here.
964    Thread result = requireNonNull(platformThreadFactory().newThread(runnable));
965    try {
966      result.setName(name);
967    } catch (SecurityException e) {
968      // OK if we can't set the name in this environment.
969    }
970    return result;
971  }
972
973  // TODO(lukes): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService?
974  // TODO(lukes): provide overloads that take constant strings? Function<Runnable, String>s to
975  // calculate names?
976
977  /**
978   * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in.
979   *
980   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
981   * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
982   * prevents the renaming then it will be skipped but the tasks will still execute.
983   *
984   * @param executor The executor to decorate
985   * @param nameSupplier The source of names for each task
986   */
987  @J2ktIncompatible
988  @GwtIncompatible // concurrency
989  static Executor renamingDecorator(final Executor executor, final Supplier<String> nameSupplier) {
990    checkNotNull(executor);
991    checkNotNull(nameSupplier);
992    return new Executor() {
993      @Override
994      public void execute(Runnable command) {
995        executor.execute(Callables.threadRenaming(command, nameSupplier));
996      }
997    };
998  }
999
1000  /**
1001   * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run
1002   * in.
1003   *
1004   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
1005   * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
1006   * prevents the renaming then it will be skipped but the tasks will still execute.
1007   *
1008   * @param service The executor to decorate
1009   * @param nameSupplier The source of names for each task
1010   */
1011  @J2ktIncompatible
1012  @GwtIncompatible // concurrency
1013  static ExecutorService renamingDecorator(
1014      final ExecutorService service, final Supplier<String> nameSupplier) {
1015    checkNotNull(service);
1016    checkNotNull(nameSupplier);
1017    return new WrappingExecutorService(service) {
1018      @Override
1019      protected <T extends @Nullable Object> Callable<T> wrapTask(Callable<T> callable) {
1020        return Callables.threadRenaming(callable, nameSupplier);
1021      }
1022
1023      @Override
1024      protected Runnable wrapTask(Runnable command) {
1025        return Callables.threadRenaming(command, nameSupplier);
1026      }
1027    };
1028  }
1029
1030  /**
1031   * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its
1032   * tasks run in.
1033   *
1034   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
1035   * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
1036   * prevents the renaming then it will be skipped but the tasks will still execute.
1037   *
1038   * @param service The executor to decorate
1039   * @param nameSupplier The source of names for each task
1040   */
1041  @J2ktIncompatible
1042  @GwtIncompatible // concurrency
1043  static ScheduledExecutorService renamingDecorator(
1044      final ScheduledExecutorService service, final Supplier<String> nameSupplier) {
1045    checkNotNull(service);
1046    checkNotNull(nameSupplier);
1047    return new WrappingScheduledExecutorService(service) {
1048      @Override
1049      protected <T extends @Nullable Object> Callable<T> wrapTask(Callable<T> callable) {
1050        return Callables.threadRenaming(callable, nameSupplier);
1051      }
1052
1053      @Override
1054      protected Runnable wrapTask(Runnable command) {
1055        return Callables.threadRenaming(command, nameSupplier);
1056      }
1057    };
1058  }
1059
1060  /**
1061   * Shuts down the given executor service gradually, first disabling new submissions and later, if
1062   * necessary, cancelling remaining tasks.
1063   *
1064   * <p>The method takes the following steps:
1065   *
1066   * <ol>
1067   *   <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
1068   *   <li>awaits executor service termination for half of the specified timeout.
1069   *   <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
1070   *       pending tasks and interrupting running tasks.
1071   *   <li>awaits executor service termination for the other half of the specified timeout.
1072   * </ol>
1073   *
1074   * <p>If, at any step of the process, the calling thread is interrupted, the method calls {@link
1075   * ExecutorService#shutdownNow()} and returns.
1076   *
1077   * @param service the {@code ExecutorService} to shut down
1078   * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
1079   * @return {@code true} if the {@code ExecutorService} was terminated successfully, {@code false}
1080   *     if the call timed out or was interrupted
1081   * @since 28.0
1082   */
1083  @CanIgnoreReturnValue
1084  @J2ktIncompatible
1085  @GwtIncompatible // java.time.Duration
1086  public static boolean shutdownAndAwaitTermination(ExecutorService service, Duration timeout) {
1087    return shutdownAndAwaitTermination(service, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
1088  }
1089
1090  /**
1091   * Shuts down the given executor service gradually, first disabling new submissions and later, if
1092   * necessary, cancelling remaining tasks.
1093   *
1094   * <p>The method takes the following steps:
1095   *
1096   * <ol>
1097   *   <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
1098   *   <li>awaits executor service termination for half of the specified timeout.
1099   *   <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
1100   *       pending tasks and interrupting running tasks.
1101   *   <li>awaits executor service termination for the other half of the specified timeout.
1102   * </ol>
1103   *
1104   * <p>If, at any step of the process, the calling thread is interrupted, the method calls {@link
1105   * ExecutorService#shutdownNow()} and returns.
1106   *
1107   * @param service the {@code ExecutorService} to shut down
1108   * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
1109   * @param unit the time unit of the timeout argument
1110   * @return {@code true} if the {@code ExecutorService} was terminated successfully, {@code false}
1111   *     if the call timed out or was interrupted
1112   * @since 17.0
1113   */
1114  @CanIgnoreReturnValue
1115  @J2ktIncompatible
1116  @GwtIncompatible // concurrency
1117  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
1118  public static boolean shutdownAndAwaitTermination(
1119      ExecutorService service, long timeout, TimeUnit unit) {
1120    long halfTimeoutNanos = unit.toNanos(timeout) / 2;
1121    // Disable new tasks from being submitted
1122    service.shutdown();
1123    try {
1124      // Wait for half the duration of the timeout for existing tasks to terminate
1125      if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
1126        // Cancel currently executing tasks
1127        service.shutdownNow();
1128        // Wait the other half of the timeout for tasks to respond to being cancelled
1129        service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
1130      }
1131    } catch (InterruptedException ie) {
1132      // Preserve interrupt status
1133      Thread.currentThread().interrupt();
1134      // (Re-)Cancel if current thread also interrupted
1135      service.shutdownNow();
1136    }
1137    return service.isTerminated();
1138  }
1139
1140  /**
1141   * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate
1142   * executor to the given {@code future}.
1143   *
1144   * <p>Note, the returned executor can only be used once.
1145   */
1146  static Executor rejectionPropagatingExecutor(
1147      final Executor delegate, final AbstractFuture<?> future) {
1148    checkNotNull(delegate);
1149    checkNotNull(future);
1150    if (delegate == directExecutor()) {
1151      // directExecutor() cannot throw RejectedExecutionException
1152      return delegate;
1153    }
1154    return new Executor() {
1155      @Override
1156      public void execute(Runnable command) {
1157        try {
1158          delegate.execute(command);
1159        } catch (RejectedExecutionException e) {
1160          future.setException(e);
1161        }
1162      }
1163    };
1164  }
1165}