001/*
002 * This file is a modified version of
003 * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.35
004 * which contained the following notice:
005 *
006 * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to the
007 * public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/
008 *
009 * Rationale for copying:
010 * Guava targets JDK5, whose AbstractExecutorService class lacks the newTaskFor protected
011 * customization methods needed by MoreExecutors.listeningDecorator. This class is a copy of
012 * AbstractExecutorService from the JSR166 CVS repository. It contains the desired methods.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.util.concurrent.MoreExecutors.invokeAnyImpl;
018
019import com.google.common.annotations.Beta;
020
021import java.util.ArrayList;
022import java.util.Collection;
023import java.util.Iterator;
024import java.util.List;
025import java.util.concurrent.Callable;
026import java.util.concurrent.CancellationException;
027import java.util.concurrent.ExecutionException;
028import java.util.concurrent.Future;
029import java.util.concurrent.TimeUnit;
030import java.util.concurrent.TimeoutException;
031
032import javax.annotation.Nullable;
033
034/**
035 * Implements {@link ListeningExecutorService} execution methods atop the abstract {@link #execute}
036 * method. More concretely, the {@code submit}, {@code invokeAny} and {@code invokeAll} methods
037 * create {@link ListenableFutureTask} instances and pass them to {@link #execute}.
038 *
039 * <p>In addition to {@link #execute}, subclasses must implement all methods related to shutdown and
040 * termination.
041 *
042 * @author Doug Lea
043 * @since 14.0
044 */
045@Beta
046public abstract class AbstractListeningExecutorService implements ListeningExecutorService {
047  @Override public ListenableFuture<?> submit(Runnable task) {
048    ListenableFutureTask<Void> ftask = ListenableFutureTask.create(task, null);
049    execute(ftask);
050    return ftask;
051  }
052
053  @Override public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) {
054    ListenableFutureTask<T> ftask = ListenableFutureTask.create(task, result);
055    execute(ftask);
056    return ftask;
057  }
058
059  @Override public <T> ListenableFuture<T> submit(Callable<T> task) {
060    ListenableFutureTask<T> ftask = ListenableFutureTask.create(task);
061    execute(ftask);
062    return ftask;
063  }
064
065  @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
066      throws InterruptedException, ExecutionException {
067    try {
068      return invokeAnyImpl(this, tasks, false, 0);
069    } catch (TimeoutException cannotHappen) {
070      throw new AssertionError();
071    }
072  }
073
074  @Override public <T> T invokeAny(
075      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
076      throws InterruptedException, ExecutionException, TimeoutException {
077    return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout));
078  }
079
080  @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
081      throws InterruptedException {
082    if (tasks == null) {
083      throw new NullPointerException();
084    }
085    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
086    boolean done = false;
087    try {
088      for (Callable<T> t : tasks) {
089        ListenableFutureTask<T> f = ListenableFutureTask.create(t);
090        futures.add(f);
091        execute(f);
092      }
093      for (Future<T> f : futures) {
094        if (!f.isDone()) {
095          try {
096            f.get();
097          } catch (CancellationException ignore) {
098          } catch (ExecutionException ignore) {
099          }
100        }
101      }
102      done = true;
103      return futures;
104    } finally {
105      if (!done) {
106        for (Future<T> f : futures) {
107          f.cancel(true);
108        }
109      }
110    }
111  }
112
113  @Override public <T> List<Future<T>> invokeAll(
114      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
115      throws InterruptedException {
116    if (tasks == null || unit == null) {
117      throw new NullPointerException();
118    }
119    long nanos = unit.toNanos(timeout);
120    List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
121    boolean done = false;
122    try {
123      for (Callable<T> t : tasks) {
124        futures.add(ListenableFutureTask.create(t));
125      }
126
127      long lastTime = System.nanoTime();
128
129      // Interleave time checks and calls to execute in case
130      // executor doesn't have any/much parallelism.
131      Iterator<Future<T>> it = futures.iterator();
132      while (it.hasNext()) {
133        execute((Runnable) (it.next()));
134        long now = System.nanoTime();
135        nanos -= now - lastTime;
136        lastTime = now;
137        if (nanos <= 0) {
138          return futures;
139        }
140      }
141
142      for (Future<T> f : futures) {
143        if (!f.isDone()) {
144          if (nanos <= 0) {
145            return futures;
146          }
147          try {
148            f.get(nanos, TimeUnit.NANOSECONDS);
149          } catch (CancellationException ignore) {
150          } catch (ExecutionException ignore) {
151          } catch (TimeoutException toe) {
152            return futures;
153          }
154          long now = System.nanoTime();
155          nanos -= now - lastTime;
156          lastTime = now;
157        }
158      }
159      done = true;
160      return futures;
161    } finally {
162      if (!done) {
163        for (Future<T> f : futures) {
164          f.cancel(true);
165        }
166      }
167    }
168  }
169}