001/* 002 * This file is a modified version of 003 * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/main/java/util/concurrent/AbstractExecutorService.java?revision=1.35 004 * which contained the following notice: 005 * 006 * Written by Doug Lea with assistance from members of JCP JSR-166 Expert Group and released to the 007 * public domain, as explained at http://creativecommons.org/publicdomain/zero/1.0/ 008 * 009 * Rationale for copying: 010 * Guava targets JDK5, whose AbstractExecutorService class lacks the newTaskFor protected 011 * customization methods needed by MoreExecutors.listeningDecorator. This class is a copy of 012 * AbstractExecutorService from the JSR166 CVS repository. It contains the desired methods. 013 */ 014 015package com.google.common.util.concurrent; 016 017import static com.google.common.util.concurrent.MoreExecutors.invokeAnyImpl; 018 019import com.google.common.annotations.Beta; 020 021import java.util.ArrayList; 022import java.util.Collection; 023import java.util.Iterator; 024import java.util.List; 025import java.util.concurrent.Callable; 026import java.util.concurrent.CancellationException; 027import java.util.concurrent.ExecutionException; 028import java.util.concurrent.Future; 029import java.util.concurrent.TimeUnit; 030import java.util.concurrent.TimeoutException; 031 032import javax.annotation.Nullable; 033 034/** 035 * Implements {@link ListeningExecutorService} execution methods atop the abstract {@link #execute} 036 * method. More concretely, the {@code submit}, {@code invokeAny} and {@code invokeAll} methods 037 * create {@link ListenableFutureTask} instances and pass them to {@link #execute}. 038 * 039 * <p>In addition to {@link #execute}, subclasses must implement all methods related to shutdown and 040 * termination. 041 * 042 * @author Doug Lea 043 * @since 14.0 044 */ 045@Beta 046public abstract class AbstractListeningExecutorService implements ListeningExecutorService { 047 @Override public ListenableFuture<?> submit(Runnable task) { 048 ListenableFutureTask<Void> ftask = ListenableFutureTask.create(task, null); 049 execute(ftask); 050 return ftask; 051 } 052 053 @Override public <T> ListenableFuture<T> submit(Runnable task, @Nullable T result) { 054 ListenableFutureTask<T> ftask = ListenableFutureTask.create(task, result); 055 execute(ftask); 056 return ftask; 057 } 058 059 @Override public <T> ListenableFuture<T> submit(Callable<T> task) { 060 ListenableFutureTask<T> ftask = ListenableFutureTask.create(task); 061 execute(ftask); 062 return ftask; 063 } 064 065 @Override public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 066 throws InterruptedException, ExecutionException { 067 try { 068 return invokeAnyImpl(this, tasks, false, 0); 069 } catch (TimeoutException cannotHappen) { 070 throw new AssertionError(); 071 } 072 } 073 074 @Override public <T> T invokeAny( 075 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 076 throws InterruptedException, ExecutionException, TimeoutException { 077 return invokeAnyImpl(this, tasks, true, unit.toNanos(timeout)); 078 } 079 080 @Override public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 081 throws InterruptedException { 082 if (tasks == null) { 083 throw new NullPointerException(); 084 } 085 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 086 boolean done = false; 087 try { 088 for (Callable<T> t : tasks) { 089 ListenableFutureTask<T> f = ListenableFutureTask.create(t); 090 futures.add(f); 091 execute(f); 092 } 093 for (Future<T> f : futures) { 094 if (!f.isDone()) { 095 try { 096 f.get(); 097 } catch (CancellationException ignore) { 098 } catch (ExecutionException ignore) { 099 } 100 } 101 } 102 done = true; 103 return futures; 104 } finally { 105 if (!done) { 106 for (Future<T> f : futures) { 107 f.cancel(true); 108 } 109 } 110 } 111 } 112 113 @Override public <T> List<Future<T>> invokeAll( 114 Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) 115 throws InterruptedException { 116 if (tasks == null || unit == null) { 117 throw new NullPointerException(); 118 } 119 long nanos = unit.toNanos(timeout); 120 List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size()); 121 boolean done = false; 122 try { 123 for (Callable<T> t : tasks) { 124 futures.add(ListenableFutureTask.create(t)); 125 } 126 127 long lastTime = System.nanoTime(); 128 129 // Interleave time checks and calls to execute in case 130 // executor doesn't have any/much parallelism. 131 Iterator<Future<T>> it = futures.iterator(); 132 while (it.hasNext()) { 133 execute((Runnable) (it.next())); 134 long now = System.nanoTime(); 135 nanos -= now - lastTime; 136 lastTime = now; 137 if (nanos <= 0) { 138 return futures; 139 } 140 } 141 142 for (Future<T> f : futures) { 143 if (!f.isDone()) { 144 if (nanos <= 0) { 145 return futures; 146 } 147 try { 148 f.get(nanos, TimeUnit.NANOSECONDS); 149 } catch (CancellationException ignore) { 150 } catch (ExecutionException ignore) { 151 } catch (TimeoutException toe) { 152 return futures; 153 } 154 long now = System.nanoTime(); 155 nanos -= now - lastTime; 156 lastTime = now; 157 } 158 } 159 done = true; 160 return futures; 161 } finally { 162 if (!done) { 163 for (Future<T> f : futures) { 164 f.cancel(true); 165 } 166 } 167 } 168 } 169}