001    /*
002     * Copyright (C) 2009 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import com.google.common.annotations.Beta;
022    
023    import java.util.concurrent.Executor;
024    import java.util.concurrent.Executors;
025    import java.util.concurrent.Future;
026    import java.util.concurrent.ThreadFactory;
027    import java.util.concurrent.atomic.AtomicBoolean;
028    
029    /**
030     * Utilities necessary for working with libraries that supply plain {@link
031     * Future} instances. Note that, whenver possible, it is strongly preferred to
032     * modify those libraries to return {@code ListenableFuture} directly.
033     *
034     * @author Sven Mawson
035     * @since 10.0 (replacing {@code Futures.makeListenable}, which
036     *     existed in 1.0)
037     */
038    @Beta
039    public final class JdkFutureAdapters {
040      /**
041       * Assigns a thread to the given {@link Future} to provide {@link
042       * ListenableFuture} functionality.
043       *
044       * <p><b>Warning:</b> If the input future does not already implement {@code
045       * ListenableFuture}, the returned future will emulate {@link
046       * ListenableFuture#addListener} by taking a thread from an internal,
047       * unbounded pool at the first call to {@code addListener} and holding it
048       * until the future is {@linkplain Future#isDone() done}.
049       *
050       * <p>Prefer to create {@code ListenableFuture} instances with {@link
051       * SettableFuture}, {@link MoreExecutors#listeningDecorator(
052       * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
053       * {@link AbstractFuture}, and other utilities over creating plain {@code
054       * Future} instances to be upgraded to {@code ListenableFuture} after the
055       * fact.
056       */
057      public static <V> ListenableFuture<V> listenInPoolThread(
058          Future<V> future) {
059        if (future instanceof ListenableFuture) {
060          return (ListenableFuture<V>) future;
061        }
062        return new ListenableFutureAdapter<V>(future);
063      }
064    
065      /**
066       * Submits a blocking task for the given {@link Future} to provide {@link
067       * ListenableFuture} functionality.
068       *
069       * <p><b>Warning:</b> If the input future does not already implement {@code
070       * ListenableFuture}, the returned future will emulate {@link
071       * ListenableFuture#addListener} by submitting a task to the given executor at
072       * at the first call to {@code addListener}. The task must be started by the
073       * executor promptly, or else the returned {@code ListenableFuture} may fail
074       * to work.  The task's execution consists of blocking until the input future
075       * is {@linkplain Future#isDone() done}, so each call to this method may
076       * claim and hold a thread for an arbitrary length of time. Use of bounded
077       * executors or other executors that may fail to execute a task promptly may
078       * result in deadlocks.
079       *
080       * <p>Prefer to create {@code ListenableFuture} instances with {@link
081       * SettableFuture}, {@link MoreExecutors#listeningDecorator(
082       * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
083       * {@link AbstractFuture}, and other utilities over creating plain {@code
084       * Future} instances to be upgraded to {@code ListenableFuture} after the
085       * fact.
086       *
087       * @since 12.0
088       */
089      public static <V> ListenableFuture<V> listenInPoolThread(
090          Future<V> future, Executor executor) {
091        checkNotNull(executor);
092        if (future instanceof ListenableFuture) {
093          return (ListenableFuture<V>) future;
094        }
095        return new ListenableFutureAdapter<V>(future, executor);
096      }
097    
098      /**
099       * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
100       * will wait on the future to finish, and when it completes, run the
101       * listeners.  This implementation will wait on the source future
102       * indefinitely, so if the source future never completes, the adapter will
103       * never complete either.
104       *
105       * <p>If the delegate future is interrupted or throws an unexpected unchecked
106       * exception, the listeners will not be invoked.
107       */
108      private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
109          implements ListenableFuture<V> {
110    
111        private static final ThreadFactory threadFactory =
112            new ThreadFactoryBuilder()
113                .setDaemon(true)
114                .setNameFormat("ListenableFutureAdapter-thread-%d")
115                .build();
116        private static final Executor defaultAdapterExecutor =
117            Executors.newCachedThreadPool(threadFactory);
118    
119        private final Executor adapterExecutor;
120    
121        // The execution list to hold our listeners.
122        private final ExecutionList executionList = new ExecutionList();
123    
124        // This allows us to only start up a thread waiting on the delegate future
125        // when the first listener is added.
126        private final AtomicBoolean hasListeners = new AtomicBoolean(false);
127    
128        // The delegate future.
129        private final Future<V> delegate;
130    
131        ListenableFutureAdapter(Future<V> delegate) {
132          this(delegate, defaultAdapterExecutor);
133        }
134    
135        ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
136          this.delegate = checkNotNull(delegate);
137          this.adapterExecutor = checkNotNull(adapterExecutor);
138        }
139    
140        @Override
141        protected Future<V> delegate() {
142          return delegate;
143        }
144    
145        @Override
146        public void addListener(Runnable listener, Executor exec) {
147          executionList.add(listener, exec);
148    
149          // When a listener is first added, we run a task that will wait for
150          // the delegate to finish, and when it is done will run the listeners.
151          if (hasListeners.compareAndSet(false, true)) {
152            if (delegate.isDone()) {
153              // If the delegate is already done, run the execution list
154              // immediately on the current thread.
155              executionList.execute();
156              return;
157            }
158    
159            adapterExecutor.execute(new Runnable() {
160              @Override
161              public void run() {
162                try {
163                  delegate.get();
164                } catch (Error e) {
165                  throw e;
166                } catch (InterruptedException e) {
167                  Thread.currentThread().interrupt();
168                  // Threads from our private pool are never interrupted.
169                  throw new AssertionError(e);
170                } catch (Throwable e) {
171                  // ExecutionException / CancellationException / RuntimeException
172                  // The task is done, run the listeners.
173                }
174                executionList.execute();
175              }
176            });
177          }
178        }
179      }
180    
181      private JdkFutureAdapters() {}
182    }