001 /* 002 * Copyright (C) 2009 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); 005 * you may not use this file except in compliance with the License. 006 * You may obtain a copy of the License at 007 * 008 * http://www.apache.org/licenses/LICENSE-2.0 009 * 010 * Unless required by applicable law or agreed to in writing, software 011 * distributed under the License is distributed on an "AS IS" BASIS, 012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 013 * See the License for the specific language governing permissions and 014 * limitations under the License. 015 */ 016 017 package com.google.common.util.concurrent; 018 019 import static com.google.common.base.Preconditions.checkNotNull; 020 021 import com.google.common.annotations.Beta; 022 023 import java.util.concurrent.Executor; 024 import java.util.concurrent.Executors; 025 import java.util.concurrent.Future; 026 import java.util.concurrent.ThreadFactory; 027 import java.util.concurrent.atomic.AtomicBoolean; 028 029 /** 030 * Utilities necessary for working with libraries that supply plain {@link 031 * Future} instances. Note that, whenver possible, it is strongly preferred to 032 * modify those libraries to return {@code ListenableFuture} directly. 033 * 034 * @author Sven Mawson 035 * @since 10.0 (replacing {@code Futures.makeListenable}, which 036 * existed in 1.0) 037 */ 038 @Beta 039 public final class JdkFutureAdapters { 040 /** 041 * Assigns a thread to the given {@link Future} to provide {@link 042 * ListenableFuture} functionality. 043 * 044 * <p><b>Warning:</b> If the input future does not already implement {@code 045 * ListenableFuture}, the returned future will emulate {@link 046 * ListenableFuture#addListener} by taking a thread from an internal, 047 * unbounded pool at the first call to {@code addListener} and holding it 048 * until the future is {@linkplain Future#isDone() done}. 049 * 050 * <p>Prefer to create {@code ListenableFuture} instances with {@link 051 * SettableFuture}, {@link MoreExecutors#listeningDecorator( 052 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask}, 053 * {@link AbstractFuture}, and other utilities over creating plain {@code 054 * Future} instances to be upgraded to {@code ListenableFuture} after the 055 * fact. 056 */ 057 public static <V> ListenableFuture<V> listenInPoolThread( 058 Future<V> future) { 059 if (future instanceof ListenableFuture<?>) { 060 return (ListenableFuture<V>) future; 061 } 062 return new ListenableFutureAdapter<V>(future); 063 } 064 065 /** 066 * Submits a blocking task for the given {@link Future} to provide {@link 067 * ListenableFuture} functionality. 068 * 069 * <p><b>Warning:</b> If the input future does not already implement {@code 070 * ListenableFuture}, the returned future will emulate {@link 071 * ListenableFuture#addListener} by submitting a task to the given executor at 072 * at the first call to {@code addListener}. The task must be started by the 073 * executor promptly, or else the returned {@code ListenableFuture} may fail 074 * to work. The task's execution consists of blocking until the input future 075 * is {@linkplain Future#isDone() done}, so each call to this method may 076 * claim and hold a thread for an arbitrary length of time. Use of bounded 077 * executors or other executors that may fail to execute a task promptly may 078 * result in deadlocks. 079 * 080 * <p>Prefer to create {@code ListenableFuture} instances with {@link 081 * SettableFuture}, {@link MoreExecutors#listeningDecorator( 082 * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask}, 083 * {@link AbstractFuture}, and other utilities over creating plain {@code 084 * Future} instances to be upgraded to {@code ListenableFuture} after the 085 * fact. 086 * 087 * @since 12.0 088 */ 089 public static <V> ListenableFuture<V> listenInPoolThread( 090 Future<V> future, Executor executor) { 091 checkNotNull(executor); 092 if (future instanceof ListenableFuture<?>) { 093 return (ListenableFuture<V>) future; 094 } 095 return new ListenableFutureAdapter<V>(future, executor); 096 } 097 098 /** 099 * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This 100 * will wait on the future to finish, and when it completes, run the 101 * listeners. This implementation will wait on the source future 102 * indefinitely, so if the source future never completes, the adapter will 103 * never complete either. 104 * 105 * <p>If the delegate future is interrupted or throws an unexpected unchecked 106 * exception, the listeners will not be invoked. 107 */ 108 private static class ListenableFutureAdapter<V> extends ForwardingFuture<V> 109 implements ListenableFuture<V> { 110 111 private static final ThreadFactory threadFactory = 112 new ThreadFactoryBuilder() 113 .setDaemon(true) 114 .setNameFormat("ListenableFutureAdapter-thread-%d") 115 .build(); 116 private static final Executor defaultAdapterExecutor = 117 Executors.newCachedThreadPool(threadFactory); 118 119 private final Executor adapterExecutor; 120 121 // The execution list to hold our listeners. 122 private final ExecutionList executionList = new ExecutionList(); 123 124 // This allows us to only start up a thread waiting on the delegate future 125 // when the first listener is added. 126 private final AtomicBoolean hasListeners = new AtomicBoolean(false); 127 128 // The delegate future. 129 private final Future<V> delegate; 130 131 ListenableFutureAdapter(Future<V> delegate) { 132 this(delegate, defaultAdapterExecutor); 133 } 134 135 ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) { 136 this.delegate = checkNotNull(delegate); 137 this.adapterExecutor = checkNotNull(adapterExecutor); 138 } 139 140 @Override 141 protected Future<V> delegate() { 142 return delegate; 143 } 144 145 @Override 146 public void addListener(Runnable listener, Executor exec) { 147 executionList.add(listener, exec); 148 149 // When a listener is first added, we run a task that will wait for 150 // the delegate to finish, and when it is done will run the listeners. 151 if (hasListeners.compareAndSet(false, true)) { 152 if (delegate.isDone()) { 153 // If the delegate is already done, run the execution list 154 // immediately on the current thread. 155 executionList.execute(); 156 return; 157 } 158 159 adapterExecutor.execute(new Runnable() { 160 @Override 161 public void run() { 162 try { 163 delegate.get(); 164 } catch (Error e) { 165 throw e; 166 } catch (InterruptedException e) { 167 Thread.currentThread().interrupt(); 168 // Threads from our private pool are never interrupted. 169 throw new AssertionError(e); 170 } catch (Throwable e) { 171 // ExecutionException / CancellationException / RuntimeException 172 // The task is done, run the listeners. 173 } 174 executionList.execute(); 175 } 176 }); 177 } 178 } 179 } 180 181 private JdkFutureAdapters() {} 182 }