001    /*
002     * Copyright (C) 2009 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import com.google.common.annotations.Beta;
022    import com.google.common.annotations.VisibleForTesting;
023    
024    import java.util.concurrent.Executor;
025    import java.util.concurrent.Executors;
026    import java.util.concurrent.Future;
027    import java.util.concurrent.ThreadFactory;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    
030    /**
031     * Utilities necessary for working with libraries that supply plain {@link
032     * Future} instances. Note that, whenver possible, it is strongly preferred to
033     * modify those libraries to return {@code ListenableFuture} directly.
034     *
035     * @author Sven Mawson
036     * @since 10.0 (replacing {@code Futures.makeListenable}, which
037     *     existed in 1.0)
038     */
039    @Beta
040    public final class JdkFutureAdapters {
041      /**
042       * Assigns a thread to the given {@link Future} to provide {@link
043       * ListenableFuture} functionality.
044       *
045       * <p><b>Warning:</b> If the input future does not already implement {@link
046       * ListenableFuture}, the returned future will emulate {@link
047       * ListenableFuture#addListener} by taking a thread from an internal,
048       * unbounded pool at the first call to {@code addListener} and holding it
049       * until the future is {@linkplain Future#isDone() done}.
050       *
051       * <p>Prefer to create {@code ListenableFuture} instances with {@link
052       * SettableFuture}, {@link MoreExecutors#listeningDecorator(
053       * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
054       * {@link AbstractFuture}, and other utilities over creating plain {@code
055       * Future} instances to be upgraded to {@code ListenableFuture} after the
056       * fact.
057       */
058      public static <V> ListenableFuture<V> listenInPoolThread(
059          Future<V> future) {
060        if (future instanceof ListenableFuture<?>) {
061          return (ListenableFuture<V>) future;
062        }
063        return new ListenableFutureAdapter<V>(future);
064      }
065    
066      @VisibleForTesting
067      static <V> ListenableFuture<V> listenInPoolThread(
068          Future<V> future, Executor executor) {
069        checkNotNull(executor);
070        if (future instanceof ListenableFuture<?>) {
071          return (ListenableFuture<V>) future;
072        }
073        return new ListenableFutureAdapter<V>(future, executor);
074      }
075    
076      /**
077       * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
078       * will wait on the future to finish, and when it completes, run the
079       * listeners.  This implementation will wait on the source future
080       * indefinitely, so if the source future never completes, the adapter will
081       * never complete either.
082       *
083       * <p>If the delegate future is interrupted or throws an unexpected unchecked
084       * exception, the listeners will not be invoked.
085       */
086      private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
087          implements ListenableFuture<V> {
088    
089        private static final ThreadFactory threadFactory =
090            new ThreadFactoryBuilder()
091                .setDaemon(true)
092                .setNameFormat("ListenableFutureAdapter-thread-%d")
093                .build();
094        private static final Executor defaultAdapterExecutor =
095            Executors.newCachedThreadPool(threadFactory);
096    
097        private final Executor adapterExecutor;
098    
099        // The execution list to hold our listeners.
100        private final ExecutionList executionList = new ExecutionList();
101    
102        // This allows us to only start up a thread waiting on the delegate future
103        // when the first listener is added.
104        private final AtomicBoolean hasListeners = new AtomicBoolean(false);
105    
106        // The delegate future.
107        private final Future<V> delegate;
108    
109        ListenableFutureAdapter(Future<V> delegate) {
110          this(delegate, defaultAdapterExecutor);
111        }
112    
113        ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
114          this.delegate = checkNotNull(delegate);
115          this.adapterExecutor = checkNotNull(adapterExecutor);
116        }
117    
118        @Override
119        protected Future<V> delegate() {
120          return delegate;
121        }
122    
123        @Override
124        public void addListener(Runnable listener, Executor exec) {
125          executionList.add(listener, exec);
126    
127          // When a listener is first added, we run a task that will wait for
128          // the delegate to finish, and when it is done will run the listeners.
129          if (hasListeners.compareAndSet(false, true)) {
130            if (delegate.isDone()) {
131              // If the delegate is already done, run the execution list
132              // immediately on the current thread.
133              executionList.execute();
134              return;
135            }
136    
137            adapterExecutor.execute(new Runnable() {
138              @Override
139              public void run() {
140                try {
141                  delegate.get();
142                } catch (Error e) {
143                  throw e;
144                } catch (InterruptedException e) {
145                  Thread.currentThread().interrupt();
146                  // Threads from our private pool are never interrupted.
147                  throw new AssertionError(e);
148                } catch (Throwable e) {
149                  // ExecutionException / CancellationException / RuntimeException
150                  // The task is done, run the listeners.
151                }
152                executionList.execute();
153              }
154            });
155          }
156        }
157      }
158    
159      private JdkFutureAdapters() {}
160    }