001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkArgument;
018import static com.google.common.base.Preconditions.checkNotNull;
019import static com.google.common.util.concurrent.Callables.threadRenaming;
020import static com.google.common.util.concurrent.Internal.toNanosSaturated;
021import static com.google.common.util.concurrent.SneakyThrows.sneakyThrow;
022import static java.util.Objects.requireNonNull;
023
024import com.google.common.annotations.GwtCompatible;
025import com.google.common.annotations.GwtIncompatible;
026import com.google.common.annotations.J2ktIncompatible;
027import com.google.common.annotations.VisibleForTesting;
028import com.google.common.base.Supplier;
029import com.google.common.collect.Lists;
030import com.google.common.collect.Queues;
031import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
032import com.google.errorprone.annotations.CanIgnoreReturnValue;
033import java.lang.reflect.InvocationTargetException;
034import java.time.Duration;
035import java.util.Collection;
036import java.util.Iterator;
037import java.util.List;
038import java.util.concurrent.BlockingQueue;
039import java.util.concurrent.Callable;
040import java.util.concurrent.Delayed;
041import java.util.concurrent.ExecutionException;
042import java.util.concurrent.Executor;
043import java.util.concurrent.ExecutorService;
044import java.util.concurrent.Executors;
045import java.util.concurrent.Future;
046import java.util.concurrent.RejectedExecutionException;
047import java.util.concurrent.ScheduledExecutorService;
048import java.util.concurrent.ScheduledFuture;
049import java.util.concurrent.ScheduledThreadPoolExecutor;
050import java.util.concurrent.ThreadFactory;
051import java.util.concurrent.ThreadPoolExecutor;
052import java.util.concurrent.TimeUnit;
053import java.util.concurrent.TimeoutException;
054import org.jspecify.annotations.Nullable;
055
056/**
057 * Factory and utility methods for {@link java.util.concurrent.Executor}, {@link ExecutorService},
058 * and {@link java.util.concurrent.ThreadFactory}.
059 *
060 * @author Eric Fellheimer
061 * @author Kyle Littlefield
062 * @author Justin Mahoney
063 * @since 3.0
064 */
065@GwtCompatible(emulated = true)
066public final class MoreExecutors {
067  private MoreExecutors() {}
068
069  /**
070   * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
071   * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
072   * completion.
073   *
074   * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
075   *
076   * @param executor the executor to modify to make sure it exits when the application is finished
077   * @param terminationTimeout how long to wait for the executor to finish before terminating the
078   *     JVM
079   * @return an unmodifiable version of the input which will not hang the JVM
080   * @since 28.0 (but only since 33.4.0 in the Android flavor)
081   */
082  @J2ktIncompatible
083  @GwtIncompatible // TODO
084  public static ExecutorService getExitingExecutorService(
085      ThreadPoolExecutor executor, Duration terminationTimeout) {
086    return getExitingExecutorService(
087        executor, toNanosSaturated(terminationTimeout), TimeUnit.NANOSECONDS);
088  }
089
090  /**
091   * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
092   * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
093   * completion.
094   *
095   * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
096   *
097   * @param executor the executor to modify to make sure it exits when the application is finished
098   * @param terminationTimeout how long to wait for the executor to finish before terminating the
099   *     JVM
100   * @param timeUnit unit of time for the time parameter
101   * @return an unmodifiable version of the input which will not hang the JVM
102   */
103  @J2ktIncompatible
104  @GwtIncompatible // TODO
105  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
106  public static ExecutorService getExitingExecutorService(
107      ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
108    return new Application().getExitingExecutorService(executor, terminationTimeout, timeUnit);
109  }
110
111  /**
112   * Converts the given ThreadPoolExecutor into an ExecutorService that exits when the application
113   * is complete. It does so by using daemon threads and adding a shutdown hook to wait for their
114   * completion.
115   *
116   * <p>This method waits 120 seconds before continuing with JVM termination, even if the executor
117   * has not finished its work.
118   *
119   * <p>This is mainly for fixed thread pools. See {@link Executors#newFixedThreadPool(int)}.
120   *
121   * @param executor the executor to modify to make sure it exits when the application is finished
122   * @return an unmodifiable version of the input which will not hang the JVM
123   */
124  @J2ktIncompatible
125  @GwtIncompatible // concurrency
126  public static ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
127    return new Application().getExitingExecutorService(executor);
128  }
129
130  /**
131   * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
132   * the application is complete. It does so by using daemon threads and adding a shutdown hook to
133   * wait for their completion.
134   *
135   * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
136   *
137   * @param executor the executor to modify to make sure it exits when the application is finished
138   * @param terminationTimeout how long to wait for the executor to finish before terminating the
139   *     JVM
140   * @return an unmodifiable version of the input which will not hang the JVM
141   * @since 28.0 (but only since 33.4.0 in the Android flavor)
142   */
143  @J2ktIncompatible
144  @GwtIncompatible // java.time.Duration
145  public static ScheduledExecutorService getExitingScheduledExecutorService(
146      ScheduledThreadPoolExecutor executor, Duration terminationTimeout) {
147    return getExitingScheduledExecutorService(
148        executor, toNanosSaturated(terminationTimeout), TimeUnit.NANOSECONDS);
149  }
150
151  /**
152   * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
153   * the application is complete. It does so by using daemon threads and adding a shutdown hook to
154   * wait for their completion.
155   *
156   * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
157   *
158   * @param executor the executor to modify to make sure it exits when the application is finished
159   * @param terminationTimeout how long to wait for the executor to finish before terminating the
160   *     JVM
161   * @param timeUnit unit of time for the time parameter
162   * @return an unmodifiable version of the input which will not hang the JVM
163   */
164  @J2ktIncompatible
165  @GwtIncompatible // TODO
166  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
167  public static ScheduledExecutorService getExitingScheduledExecutorService(
168      ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
169    return new Application()
170        .getExitingScheduledExecutorService(executor, terminationTimeout, timeUnit);
171  }
172
173  /**
174   * Converts the given ScheduledThreadPoolExecutor into a ScheduledExecutorService that exits when
175   * the application is complete. It does so by using daemon threads and adding a shutdown hook to
176   * wait for their completion.
177   *
178   * <p>This method waits 120 seconds before continuing with JVM termination, even if the executor
179   * has not finished its work.
180   *
181   * <p>This is mainly for fixed thread pools. See {@link Executors#newScheduledThreadPool(int)}.
182   *
183   * @param executor the executor to modify to make sure it exits when the application is finished
184   * @return an unmodifiable version of the input which will not hang the JVM
185   */
186  @J2ktIncompatible
187  @GwtIncompatible // TODO
188  public static ScheduledExecutorService getExitingScheduledExecutorService(
189      ScheduledThreadPoolExecutor executor) {
190    return new Application().getExitingScheduledExecutorService(executor);
191  }
192
193  /**
194   * Add a shutdown hook to wait for thread completion in the given {@link ExecutorService service}.
195   * This is useful if the given service uses daemon threads, and we want to keep the JVM from
196   * exiting immediately on shutdown, instead giving these daemon threads a chance to terminate
197   * normally.
198   *
199   * @param service ExecutorService which uses daemon threads
200   * @param terminationTimeout how long to wait for the executor to finish before terminating the
201   *     JVM
202   * @since 28.0 (but only since 33.4.0 in the Android flavor)
203   */
204  @J2ktIncompatible
205  @GwtIncompatible // java.time.Duration
206  public static void addDelayedShutdownHook(ExecutorService service, Duration terminationTimeout) {
207    addDelayedShutdownHook(service, toNanosSaturated(terminationTimeout), TimeUnit.NANOSECONDS);
208  }
209
210  /**
211   * Add a shutdown hook to wait for thread completion in the given {@link ExecutorService service}.
212   * This is useful if the given service uses daemon threads, and we want to keep the JVM from
213   * exiting immediately on shutdown, instead giving these daemon threads a chance to terminate
214   * normally.
215   *
216   * @param service ExecutorService which uses daemon threads
217   * @param terminationTimeout how long to wait for the executor to finish before terminating the
218   *     JVM
219   * @param timeUnit unit of time for the time parameter
220   */
221  @J2ktIncompatible
222  @GwtIncompatible // TODO
223  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
224  public static void addDelayedShutdownHook(
225      ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
226    new Application().addDelayedShutdownHook(service, terminationTimeout, timeUnit);
227  }
228
229  /** Represents the current application to register shutdown hooks. */
230  @J2ktIncompatible
231  @GwtIncompatible // TODO
232  @VisibleForTesting
233  static class Application {
234
235    final ExecutorService getExitingExecutorService(
236        ThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
237      useDaemonThreadFactory(executor);
238      ExecutorService service = Executors.unconfigurableExecutorService(executor);
239      addDelayedShutdownHook(executor, terminationTimeout, timeUnit);
240      return service;
241    }
242
243    final ExecutorService getExitingExecutorService(ThreadPoolExecutor executor) {
244      return getExitingExecutorService(executor, 120, TimeUnit.SECONDS);
245    }
246
247    final ScheduledExecutorService getExitingScheduledExecutorService(
248        ScheduledThreadPoolExecutor executor, long terminationTimeout, TimeUnit timeUnit) {
249      useDaemonThreadFactory(executor);
250      ScheduledExecutorService service = Executors.unconfigurableScheduledExecutorService(executor);
251      addDelayedShutdownHook(executor, terminationTimeout, timeUnit);
252      return service;
253    }
254
255    final ScheduledExecutorService getExitingScheduledExecutorService(
256        ScheduledThreadPoolExecutor executor) {
257      return getExitingScheduledExecutorService(executor, 120, TimeUnit.SECONDS);
258    }
259
260    final void addDelayedShutdownHook(
261        ExecutorService service, long terminationTimeout, TimeUnit timeUnit) {
262      checkNotNull(service);
263      checkNotNull(timeUnit);
264      addShutdownHook(
265          newThread(
266              "DelayedShutdownHook-for-" + service,
267              () -> {
268                service.shutdown();
269                try {
270                  // We'd like to log progress and failures that may arise in the
271                  // following code, but unfortunately the behavior of logging
272                  // is undefined in shutdown hooks.
273                  // This is because the logging code installs a shutdown hook of its
274                  // own. See Cleaner class inside {@link LogManager}.
275                  service.awaitTermination(terminationTimeout, timeUnit);
276                } catch (InterruptedException ignored) {
277                  // We're shutting down anyway, so just ignore.
278                }
279              }));
280    }
281
282    @VisibleForTesting
283    void addShutdownHook(Thread hook) {
284      Runtime.getRuntime().addShutdownHook(hook);
285    }
286  }
287
288  @J2ktIncompatible
289  @GwtIncompatible // TODO
290  private static void useDaemonThreadFactory(ThreadPoolExecutor executor) {
291    executor.setThreadFactory(
292        new ThreadFactoryBuilder()
293            .setDaemon(true)
294            .setThreadFactory(executor.getThreadFactory())
295            .build());
296  }
297
298  /**
299   * Creates an executor service that runs each task in the thread that invokes {@code
300   * execute/submit}, as in {@code ThreadPoolExecutor.CallerRunsPolicy}. This applies both to
301   * individually submitted tasks and to collections of tasks submitted via {@code invokeAll} or
302   * {@code invokeAny}. In the latter case, tasks will run serially on the calling thread. Tasks are
303   * run to completion before a {@code Future} is returned to the caller (unless the executor has
304   * been shutdown).
305   *
306   * <p>Although all tasks are immediately executed in the thread that submitted the task, this
307   * {@code ExecutorService} imposes a small locking overhead on each task submission in order to
308   * implement shutdown and termination behavior.
309   *
310   * <p>The implementation deviates from the {@code ExecutorService} specification with regards to
311   * the {@code shutdownNow} method. First, "best-effort" with regards to canceling running tasks is
312   * implemented as "no-effort". No interrupts or other attempts are made to stop threads executing
313   * tasks. Second, the returned list will always be empty, as any submitted task is considered to
314   * have started execution. This applies also to tasks given to {@code invokeAll} or {@code
315   * invokeAny} which are pending serial execution, even the subset of the tasks that have not yet
316   * started execution. It is unclear from the {@code ExecutorService} specification if these should
317   * be included, and it's much easier to implement the interpretation that they not be. Finally, a
318   * call to {@code shutdown} or {@code shutdownNow} may result in concurrent calls to {@code
319   * invokeAll/invokeAny} throwing RejectedExecutionException, although a subset of the tasks may
320   * already have been executed.
321   *
322   * @since 18.0 (present as MoreExecutors.sameThreadExecutor() since 10.0)
323   */
324  @GwtIncompatible // TODO
325  public static ListeningExecutorService newDirectExecutorService() {
326    return new DirectExecutorService();
327  }
328
329  /**
330   * Returns an {@link Executor} that runs each task in the thread that invokes {@link
331   * Executor#execute execute}, as in {@code ThreadPoolExecutor.CallerRunsPolicy}.
332   *
333   * <p>This executor is appropriate for tasks that are lightweight and not deeply chained.
334   * Inappropriate {@code directExecutor} usage can cause problems, and these problems can be
335   * difficult to reproduce because they depend on timing. For example:
336   *
337   * <ul>
338   *   <li>When a {@code ListenableFuture} listener is registered to run under {@code
339   *       directExecutor}, the listener can execute in any of three possible threads:
340   *       <ol>
341   *         <li>When a thread attaches a listener to a {@code ListenableFuture} that's already
342   *             complete, the listener runs immediately in that thread.
343   *         <li>When a thread attaches a listener to a {@code ListenableFuture} that's
344   *             <em>in</em>complete and the {@code ListenableFuture} later completes normally, the
345   *             listener runs in the thread that completes the {@code ListenableFuture}.
346   *         <li>When a listener is attached to a {@code ListenableFuture} and the {@code
347   *             ListenableFuture} gets cancelled, the listener runs immediately in the thread that
348   *             cancelled the {@code Future}.
349   *       </ol>
350   *       Given all these possibilities, it is frequently possible for listeners to execute in UI
351   *       threads, RPC network threads, or other latency-sensitive threads. In those cases, slow
352   *       listeners can harm responsiveness, slow the system as a whole, or worse. (See also the
353   *       note about locking below.)
354   *   <li>If many tasks will be triggered by the same event, one heavyweight task may delay other
355   *       tasks -- even tasks that are not themselves {@code directExecutor} tasks.
356   *   <li>If many such tasks are chained together (such as with {@code
357   *       future.transform(...).transform(...).transform(...)....}), they may overflow the stack.
358   *       (In simple cases, callers can avoid this by registering all tasks with the same {@link
359   *       MoreExecutors#newSequentialExecutor} wrapper around {@code directExecutor()}. More
360   *       complex cases may require using thread pools or making deeper changes.)
361   *   <li>If an exception propagates out of a {@code Runnable}, it is not necessarily seen by any
362   *       {@code UncaughtExceptionHandler} for the thread. For example, if the callback passed to
363   *       {@link Futures#addCallback} throws an exception, that exception will be typically be
364   *       logged by the {@link ListenableFuture} implementation, even if the thread is configured
365   *       to do something different. In other cases, no code will catch the exception, and it may
366   *       terminate whichever thread happens to trigger the execution.
367   * </ul>
368   *
369   * A specific warning about locking: Code that executes user-supplied tasks, such as {@code
370   * ListenableFuture} listeners, should take care not to do so while holding a lock. Additionally,
371   * as a further line of defense, prefer not to perform any locking inside a task that will be run
372   * under {@code directExecutor}: Not only might the wait for a lock be long, but if the running
373   * thread was holding a lock, the listener may deadlock or break lock isolation.
374   *
375   * <p>This instance is equivalent to:
376   *
377   * <pre>{@code
378   * final class DirectExecutor implements Executor {
379   *   public void execute(Runnable r) {
380   *     r.run();
381   *   }
382   * }
383   * }</pre>
384   *
385   * <p>This should be preferred to {@link #newDirectExecutorService()} because implementing the
386   * {@link ExecutorService} subinterface necessitates significant performance overhead.
387   *
388   * @since 18.0
389   */
390  public static Executor directExecutor() {
391    return DirectExecutor.INSTANCE;
392  }
393
394  /**
395   * Returns an {@link Executor} that runs each task executed sequentially, such that no two tasks
396   * are running concurrently.
397   *
398   * <p>{@linkplain Executor#execute executed} tasks have a happens-before order as defined in the
399   * Java Language Specification. Tasks execute with the same happens-before order that the function
400   * calls to {@link Executor#execute execute()} that submitted those tasks had.
401   *
402   * <p>The executor uses {@code delegate} in order to {@link Executor#execute execute} each task in
403   * turn, and does not create any threads of its own.
404   *
405   * <p>After execution begins on a thread from the {@code delegate} {@link Executor}, tasks are
406   * polled and executed from a task queue until there are no more tasks. The thread will not be
407   * released until there are no more tasks to run.
408   *
409   * <p>If a task is submitted while a thread is executing tasks from the task queue, the thread
410   * will not be released until that submitted task is also complete.
411   *
412   * <p>If a task is {@linkplain Thread#interrupt interrupted} while a task is running:
413   *
414   * <ol>
415   *   <li>execution will not stop until the task queue is empty.
416   *   <li>tasks will begin execution with the thread marked as not interrupted - any interruption
417   *       applies only to the task that was running at the point of interruption.
418   *   <li>if the thread was interrupted before the SequentialExecutor's worker begins execution,
419   *       the interrupt will be restored to the thread after it completes so that its {@code
420   *       delegate} Executor may process the interrupt.
421   *   <li>subtasks are run with the thread uninterrupted and interrupts received during execution
422   *       of a task are ignored.
423   * </ol>
424   *
425   * <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
426   * If an {@code Error} is thrown, the error will propagate and execution will stop until the next
427   * time a task is submitted.
428   *
429   * <p>When an {@code Error} is thrown by an executed task, previously submitted tasks may never
430   * run. An attempt will be made to restart execution on the next call to {@code execute}. If the
431   * {@code delegate} has begun to reject execution, the previously submitted tasks may never run,
432   * despite not throwing a RejectedExecutionException synchronously with the call to {@code
433   * execute}. If this behaviour is problematic, use an Executor with a single thread (e.g. {@link
434   * Executors#newSingleThreadExecutor}).
435   *
436   * @since 23.3 (since 23.1 as {@code sequentialExecutor})
437   */
438  @GwtIncompatible
439  public static Executor newSequentialExecutor(Executor delegate) {
440    return new SequentialExecutor(delegate);
441  }
442
443  /**
444   * Creates an {@link ExecutorService} whose {@code submit} and {@code invokeAll} methods submit
445   * {@link ListenableFutureTask} instances to the given delegate executor. Those methods, as well
446   * as {@code execute} and {@code invokeAny}, are implemented in terms of calls to {@code
447   * delegate.execute}. All other methods are forwarded unchanged to the delegate. This implies that
448   * the returned {@code ListeningExecutorService} never calls the delegate's {@code submit}, {@code
449   * invokeAll}, and {@code invokeAny} methods, so any special handling of tasks must be implemented
450   * in the delegate's {@code execute} method or by wrapping the returned {@code
451   * ListeningExecutorService}.
452   *
453   * <p>If the delegate executor was already an instance of {@code ListeningExecutorService}, it is
454   * returned untouched, and the rest of this documentation does not apply.
455   *
456   * @since 10.0
457   */
458  @GwtIncompatible // TODO
459  public static ListeningExecutorService listeningDecorator(ExecutorService delegate) {
460    return (delegate instanceof ListeningExecutorService)
461        ? (ListeningExecutorService) delegate
462        : (delegate instanceof ScheduledExecutorService)
463            ? new ScheduledListeningDecorator((ScheduledExecutorService) delegate)
464            : new ListeningDecorator(delegate);
465  }
466
467  /**
468   * Creates a {@link ScheduledExecutorService} whose {@code submit} and {@code invokeAll} methods
469   * submit {@link ListenableFutureTask} instances to the given delegate executor. Those methods, as
470   * well as {@code execute} and {@code invokeAny}, are implemented in terms of calls to {@code
471   * delegate.execute}. All other methods are forwarded unchanged to the delegate. This implies that
472   * the returned {@code ListeningScheduledExecutorService} never calls the delegate's {@code
473   * submit}, {@code invokeAll}, and {@code invokeAny} methods, so any special handling of tasks
474   * must be implemented in the delegate's {@code execute} method or by wrapping the returned {@code
475   * ListeningScheduledExecutorService}.
476   *
477   * <p>If the delegate executor was already an instance of {@code
478   * ListeningScheduledExecutorService}, it is returned untouched, and the rest of this
479   * documentation does not apply.
480   *
481   * @since 10.0
482   */
483  @GwtIncompatible // TODO
484  public static ListeningScheduledExecutorService listeningDecorator(
485      ScheduledExecutorService delegate) {
486    return (delegate instanceof ListeningScheduledExecutorService)
487        ? (ListeningScheduledExecutorService) delegate
488        : new ScheduledListeningDecorator(delegate);
489  }
490
491  @GwtIncompatible // TODO
492  private static class ListeningDecorator extends AbstractListeningExecutorService {
493    private final ExecutorService delegate;
494
495    ListeningDecorator(ExecutorService delegate) {
496      this.delegate = checkNotNull(delegate);
497    }
498
499    @Override
500    public final boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
501      return delegate.awaitTermination(timeout, unit);
502    }
503
504    @Override
505    public final boolean isShutdown() {
506      return delegate.isShutdown();
507    }
508
509    @Override
510    public final boolean isTerminated() {
511      return delegate.isTerminated();
512    }
513
514    @Override
515    public final void shutdown() {
516      delegate.shutdown();
517    }
518
519    @Override
520    public final List<Runnable> shutdownNow() {
521      return delegate.shutdownNow();
522    }
523
524    @Override
525    public final void execute(Runnable command) {
526      delegate.execute(command);
527    }
528
529    @Override
530    public final String toString() {
531      return super.toString() + "[" + delegate + "]";
532    }
533  }
534
535  @GwtIncompatible // TODO
536  private static final class ScheduledListeningDecorator extends ListeningDecorator
537      implements ListeningScheduledExecutorService {
538    @SuppressWarnings("hiding")
539    final ScheduledExecutorService delegate;
540
541    ScheduledListeningDecorator(ScheduledExecutorService delegate) {
542      super(delegate);
543      this.delegate = checkNotNull(delegate);
544    }
545
546    @Override
547    public ListenableScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
548      TrustedListenableFutureTask<@Nullable Void> task =
549          TrustedListenableFutureTask.create(command, null);
550      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
551      return new ListenableScheduledTask<@Nullable Void>(task, scheduled);
552    }
553
554    @Override
555    public <V extends @Nullable Object> ListenableScheduledFuture<V> schedule(
556        Callable<V> callable, long delay, TimeUnit unit) {
557      TrustedListenableFutureTask<V> task = TrustedListenableFutureTask.create(callable);
558      ScheduledFuture<?> scheduled = delegate.schedule(task, delay, unit);
559      return new ListenableScheduledTask<>(task, scheduled);
560    }
561
562    @Override
563    public ListenableScheduledFuture<?> scheduleAtFixedRate(
564        Runnable command, long initialDelay, long period, TimeUnit unit) {
565      NeverSuccessfulListenableFutureTask task = new NeverSuccessfulListenableFutureTask(command);
566      ScheduledFuture<?> scheduled = delegate.scheduleAtFixedRate(task, initialDelay, period, unit);
567      return new ListenableScheduledTask<@Nullable Void>(task, scheduled);
568    }
569
570    @Override
571    public ListenableScheduledFuture<?> scheduleWithFixedDelay(
572        Runnable command, long initialDelay, long delay, TimeUnit unit) {
573      NeverSuccessfulListenableFutureTask task = new NeverSuccessfulListenableFutureTask(command);
574      ScheduledFuture<?> scheduled =
575          delegate.scheduleWithFixedDelay(task, initialDelay, delay, unit);
576      return new ListenableScheduledTask<@Nullable Void>(task, scheduled);
577    }
578
579    private static final class ListenableScheduledTask<V extends @Nullable Object>
580        extends SimpleForwardingListenableFuture<V> implements ListenableScheduledFuture<V> {
581
582      private final ScheduledFuture<?> scheduledDelegate;
583
584      public ListenableScheduledTask(
585          ListenableFuture<V> listenableDelegate, ScheduledFuture<?> scheduledDelegate) {
586        super(listenableDelegate);
587        this.scheduledDelegate = scheduledDelegate;
588      }
589
590      @Override
591      public boolean cancel(boolean mayInterruptIfRunning) {
592        boolean cancelled = super.cancel(mayInterruptIfRunning);
593        if (cancelled) {
594          // Unless it is cancelled, the delegate may continue being scheduled
595          scheduledDelegate.cancel(mayInterruptIfRunning);
596
597          // TODO(user): Cancel "this" if "scheduledDelegate" is cancelled.
598        }
599        return cancelled;
600      }
601
602      @Override
603      public long getDelay(TimeUnit unit) {
604        return scheduledDelegate.getDelay(unit);
605      }
606
607      @Override
608      public int compareTo(Delayed other) {
609        return scheduledDelegate.compareTo(other);
610      }
611    }
612
613    @GwtIncompatible // TODO
614    private static final class NeverSuccessfulListenableFutureTask
615        extends AbstractFuture.TrustedFuture<@Nullable Void> implements Runnable {
616      private final Runnable delegate;
617
618      public NeverSuccessfulListenableFutureTask(Runnable delegate) {
619        this.delegate = checkNotNull(delegate);
620      }
621
622      @Override
623      public void run() {
624        try {
625          delegate.run();
626        } catch (Throwable t) {
627          // Any Exception is either a RuntimeException or sneaky checked exception.
628          setException(t);
629          throw t;
630        }
631      }
632
633      @Override
634      protected String pendingToString() {
635        return "task=[" + delegate + "]";
636      }
637    }
638  }
639
640  /*
   * The following method is a modified version of one found in
642   * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/test/tck/AbstractExecutorServiceTest.java?revision=1.30
643   * which contained the following notice:
644   *
645   * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to
646   * the public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/
647   *
648   * Other contributors include Andrew Wright, Jeffrey Hayes, Pat Fisher, Mike Judd.
649   */
650
651  /**
652   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
653   * implementations.
654   */
655  @J2ktIncompatible
656  @GwtIncompatible
657  @ParametricNullness
658  static <T extends @Nullable Object> T invokeAnyImpl(
659      ListeningExecutorService executorService,
660      Collection<? extends Callable<T>> tasks,
661      boolean timed,
662      Duration timeout)
663      throws InterruptedException, ExecutionException, TimeoutException {
664    return invokeAnyImpl(
665        executorService, tasks, timed, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
666  }
667
668  /**
669   * An implementation of {@link ExecutorService#invokeAny} for {@link ListeningExecutorService}
670   * implementations.
671   */
672  @SuppressWarnings({
673    "GoodTime", // should accept a java.time.Duration
674    "CatchingUnchecked", // sneaky checked exception
675    "Interruption", // We copy AbstractExecutorService.invokeAny. Maybe we shouldn't: b/227335009.
676  })
677  @J2ktIncompatible
678  @GwtIncompatible
679  @ParametricNullness
680  static <T extends @Nullable Object> T invokeAnyImpl(
681      ListeningExecutorService executorService,
682      Collection<? extends Callable<T>> tasks,
683      boolean timed,
684      long timeout,
685      TimeUnit unit)
686      throws InterruptedException, ExecutionException, TimeoutException {
687    checkNotNull(executorService);
688    checkNotNull(unit);
689    int ntasks = tasks.size();
690    checkArgument(ntasks > 0);
691    List<Future<T>> futures = Lists.newArrayListWithCapacity(ntasks);
692    BlockingQueue<Future<T>> futureQueue = Queues.newLinkedBlockingQueue();
693    long timeoutNanos = unit.toNanos(timeout);
694
695    // For efficiency, especially in executors with limited
696    // parallelism, check to see if previously submitted tasks are
697    // done before submitting more of them. This interleaving
698    // plus the exception mechanics account for messiness of main
699    // loop.
700
701    try {
702      // Record exceptions so that if we fail to obtain any
703      // result, we can throw the last exception we got.
704      ExecutionException ee = null;
705      long lastTime = timed ? System.nanoTime() : 0;
706      Iterator<? extends Callable<T>> it = tasks.iterator();
707
708      futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
709      --ntasks;
710      int active = 1;
711
712      while (true) {
713        Future<T> f = futureQueue.poll();
714        if (f == null) {
715          if (ntasks > 0) {
716            --ntasks;
717            futures.add(submitAndAddQueueListener(executorService, it.next(), futureQueue));
718            ++active;
719          } else if (active == 0) {
720            break;
721          } else if (timed) {
722            f = futureQueue.poll(timeoutNanos, TimeUnit.NANOSECONDS);
723            if (f == null) {
724              throw new TimeoutException();
725            }
726            long now = System.nanoTime();
727            timeoutNanos -= now - lastTime;
728            lastTime = now;
729          } else {
730            f = futureQueue.take();
731          }
732        }
733        if (f != null) {
734          --active;
735          try {
736            return f.get();
737          } catch (ExecutionException eex) {
738            ee = eex;
739          } catch (InterruptedException iex) {
740            throw iex;
741          } catch (Exception rex) { // sneaky checked exception
742            ee = new ExecutionException(rex);
743          }
744        }
745      }
746
747      if (ee == null) {
748        ee = new ExecutionException(null);
749      }
750      throw ee;
751    } finally {
752      for (Future<T> f : futures) {
753        f.cancel(true);
754      }
755    }
756  }
757
758  /**
759   * Submits the task and adds a listener that adds the future to {@code queue} when it completes.
760   */
761  @J2ktIncompatible
762  @GwtIncompatible // TODO
763  private static <T extends @Nullable Object> ListenableFuture<T> submitAndAddQueueListener(
764      ListeningExecutorService executorService, Callable<T> task, BlockingQueue<Future<T>> queue) {
765    ListenableFuture<T> future = executorService.submit(task);
766    future.addListener(() -> queue.add(future), directExecutor());
767    return future;
768  }
769
770  /**
771   * Returns a default thread factory used to create new threads.
772   *
773   * <p>When running on AppEngine with access to <a
774   * href="https://cloud.google.com/appengine/docs/standard/java/javadoc/">AppEngine legacy
775   * APIs</a>, this method returns {@code ThreadManager.currentRequestThreadFactory()}. Otherwise,
776   * it returns {@link Executors#defaultThreadFactory()}.
777   *
778   * @since 14.0
779   */
780  @J2ktIncompatible
781  @GwtIncompatible // concurrency
782  public static ThreadFactory platformThreadFactory() {
783    if (!isAppEngineWithApiClasses()) {
784      return Executors.defaultThreadFactory();
785    }
786    try {
787      return (ThreadFactory)
788          Class.forName("com.google.appengine.api.ThreadManager")
789              .getMethod("currentRequestThreadFactory")
790              .invoke(null);
791    } catch (IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
792      throw new RuntimeException("Couldn't invoke ThreadManager.currentRequestThreadFactory", e);
793    } catch (InvocationTargetException e) {
794      // `currentRequestThreadFactory` has no `throws` clause.
795      throw sneakyThrow(e.getCause());
796    }
797  }
798
799  @J2ktIncompatible
800  @GwtIncompatible // TODO
801  private static boolean isAppEngineWithApiClasses() {
802    if (System.getProperty("com.google.appengine.runtime.environment") == null) {
803      return false;
804    }
805    try {
806      Class.forName("com.google.appengine.api.utils.SystemProperty");
807    } catch (ClassNotFoundException e) {
808      return false;
809    }
810    try {
811      // If the current environment is null, we're not inside AppEngine.
812      return Class.forName("com.google.apphosting.api.ApiProxy")
813              .getMethod("getCurrentEnvironment")
814              .invoke(null)
815          != null;
816    } catch (ClassNotFoundException e) {
817      // If ApiProxy doesn't exist, we're not on AppEngine at all.
818      return false;
819    } catch (InvocationTargetException e) {
820      // If ApiProxy throws an exception, we're not in a proper AppEngine environment.
821      return false;
822    } catch (IllegalAccessException e) {
823      // If the method isn't accessible, we're not on a supported version of AppEngine;
824      return false;
825    } catch (NoSuchMethodException e) {
826      // If the method doesn't exist, we're not on a supported version of AppEngine;
827      return false;
828    }
829  }
830
831  /**
832   * Creates a thread using {@link #platformThreadFactory}, and sets its name to {@code name} unless
833   * changing the name is forbidden by the security manager.
834   */
835  @J2ktIncompatible
836  @GwtIncompatible // concurrency
837  static Thread newThread(String name, Runnable runnable) {
838    checkNotNull(name);
839    checkNotNull(runnable);
840    // TODO(b/139726489): Confirm that null is impossible here.
841    Thread result = requireNonNull(platformThreadFactory().newThread(runnable));
842    try {
843      result.setName(name);
844    } catch (SecurityException e) {
845      // OK if we can't set the name in this environment.
846    }
847    return result;
848  }
849
850  // TODO(lukes): provide overloads for ListeningExecutorService? ListeningScheduledExecutorService?
851  // TODO(lukes): provide overloads that take constant strings? Function<Runnable, String>s to
852  // calculate names?
853
854  /**
855   * Creates an {@link Executor} that renames the {@link Thread threads} that its tasks run in.
856   *
857   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
858   * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
859   * prevents the renaming then it will be skipped but the tasks will still execute.
860   *
861   * @param executor The executor to decorate
862   * @param nameSupplier The source of names for each task
863   */
864  @J2ktIncompatible
865  @GwtIncompatible // concurrency
866  static Executor renamingDecorator(Executor executor, Supplier<String> nameSupplier) {
867    checkNotNull(executor);
868    checkNotNull(nameSupplier);
869    return command -> executor.execute(threadRenaming(command, nameSupplier));
870  }
871
872  /**
873   * Creates an {@link ExecutorService} that renames the {@link Thread threads} that its tasks run
874   * in.
875   *
876   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
877   * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
878   * prevents the renaming then it will be skipped but the tasks will still execute.
879   *
880   * @param service The executor to decorate
881   * @param nameSupplier The source of names for each task
882   */
883  @J2ktIncompatible
884  @GwtIncompatible // concurrency
885  static ExecutorService renamingDecorator(ExecutorService service, Supplier<String> nameSupplier) {
886    checkNotNull(service);
887    checkNotNull(nameSupplier);
888    return new WrappingExecutorService(service) {
889      @Override
890      protected <T extends @Nullable Object> Callable<T> wrapTask(Callable<T> callable) {
891        return threadRenaming(callable, nameSupplier);
892      }
893
894      @Override
895      protected Runnable wrapTask(Runnable command) {
896        return threadRenaming(command, nameSupplier);
897      }
898    };
899  }
900
901  /**
902   * Creates a {@link ScheduledExecutorService} that renames the {@link Thread threads} that its
903   * tasks run in.
904   *
905   * <p>The names are retrieved from the {@code nameSupplier} on the thread that is being renamed
906   * right before each task is run. The renaming is best effort, if a {@link SecurityManager}
907   * prevents the renaming then it will be skipped but the tasks will still execute.
908   *
909   * @param service The executor to decorate
910   * @param nameSupplier The source of names for each task
911   */
912  @J2ktIncompatible
913  @GwtIncompatible // concurrency
914  static ScheduledExecutorService renamingDecorator(
915      ScheduledExecutorService service, Supplier<String> nameSupplier) {
916    checkNotNull(service);
917    checkNotNull(nameSupplier);
918    return new WrappingScheduledExecutorService(service) {
919      @Override
920      protected <T extends @Nullable Object> Callable<T> wrapTask(Callable<T> callable) {
921        return threadRenaming(callable, nameSupplier);
922      }
923
924      @Override
925      protected Runnable wrapTask(Runnable command) {
926        return threadRenaming(command, nameSupplier);
927      }
928    };
929  }
930
931  /**
932   * Shuts down the given executor service gradually, first disabling new submissions and later, if
933   * necessary, cancelling remaining tasks.
934   *
935   * <p>The method takes the following steps:
936   *
937   * <ol>
938   *   <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
939   *   <li>awaits executor service termination for half of the specified timeout.
940   *   <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
941   *       pending tasks and interrupting running tasks.
942   *   <li>awaits executor service termination for the other half of the specified timeout.
943   * </ol>
944   *
945   * <p>If, at any step of the process, the calling thread is interrupted, the method calls {@link
946   * ExecutorService#shutdownNow()} and returns.
947   *
948   * <p>For a version of this method that waits <i>indefinitely</i>, use {@link
949   * ExecutorService#close}.
950   *
951   * @param service the {@code ExecutorService} to shut down
952   * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
953   * @return {@code true} if the {@code ExecutorService} was terminated successfully, {@code false}
954   *     if the call timed out or was interrupted
955   * @since 28.0 (but only since 33.4.0 in the Android flavor)
956   */
957  @CanIgnoreReturnValue
958  @J2ktIncompatible
959  @GwtIncompatible // java.time.Duration
960  public static boolean shutdownAndAwaitTermination(ExecutorService service, Duration timeout) {
961    return shutdownAndAwaitTermination(service, toNanosSaturated(timeout), TimeUnit.NANOSECONDS);
962  }
963
964  /**
965   * Shuts down the given executor service gradually, first disabling new submissions and later, if
966   * necessary, cancelling remaining tasks.
967   *
968   * <p>The method takes the following steps:
969   *
970   * <ol>
971   *   <li>calls {@link ExecutorService#shutdown()}, disabling acceptance of new submitted tasks.
972   *   <li>awaits executor service termination for half of the specified timeout.
973   *   <li>if the timeout expires, it calls {@link ExecutorService#shutdownNow()}, cancelling
974   *       pending tasks and interrupting running tasks.
975   *   <li>awaits executor service termination for the other half of the specified timeout.
976   * </ol>
977   *
978   * <p>If, at any step of the process, the calling thread is interrupted, the method calls {@link
979   * ExecutorService#shutdownNow()} and returns.
980   *
981   * <p>For a version of this method that waits <i>indefinitely</i>, use {@link
982   * ExecutorService#close}.
983   *
984   * @param service the {@code ExecutorService} to shut down
985   * @param timeout the maximum time to wait for the {@code ExecutorService} to terminate
986   * @param unit the time unit of the timeout argument
987   * @return {@code true} if the {@code ExecutorService} was terminated successfully, {@code false}
988   *     if the call timed out or was interrupted
989   * @since 17.0
990   */
991  @CanIgnoreReturnValue
992  @J2ktIncompatible
993  @GwtIncompatible // concurrency
994  @SuppressWarnings("GoodTime") // should accept a java.time.Duration
995  public static boolean shutdownAndAwaitTermination(
996      ExecutorService service, long timeout, TimeUnit unit) {
997    long halfTimeoutNanos = unit.toNanos(timeout) / 2;
998    // Disable new tasks from being submitted
999    service.shutdown();
1000    try {
1001      // Wait for half the duration of the timeout for existing tasks to terminate
1002      if (!service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS)) {
1003        // Cancel currently executing tasks
1004        service.shutdownNow();
1005        // Wait the other half of the timeout for tasks to respond to being cancelled
1006        service.awaitTermination(halfTimeoutNanos, TimeUnit.NANOSECONDS);
1007      }
1008    } catch (InterruptedException ie) {
1009      // Preserve interrupt status
1010      Thread.currentThread().interrupt();
1011      // (Re-)Cancel if current thread also interrupted
1012      service.shutdownNow();
1013    }
1014    return service.isTerminated();
1015  }
1016
1017  /**
1018   * Returns an Executor that will propagate {@link RejectedExecutionException} from the delegate
1019   * executor to the given {@code future}.
1020   *
1021   * <p>Note, the returned executor can only be used once.
1022   */
1023  static Executor rejectionPropagatingExecutor(Executor delegate, AbstractFuture<?> future) {
1024    checkNotNull(delegate);
1025    checkNotNull(future);
1026    if (delegate == directExecutor()) {
1027      // directExecutor() cannot throw RejectedExecutionException
1028      return delegate;
1029    }
1030    return command -> {
1031      try {
1032        delegate.execute(command);
1033      } catch (RejectedExecutionException e) {
1034        future.setException(e);
1035      }
1036    };
1037  }
1038}