001    /*
002     * Copyright (C) 2009 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License");
005     * you may not use this file except in compliance with the License.
006     * You may obtain a copy of the License at
007     *
008     * http://www.apache.org/licenses/LICENSE-2.0
009     *
010     * Unless required by applicable law or agreed to in writing, software
011     * distributed under the License is distributed on an "AS IS" BASIS,
012     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013     * See the License for the specific language governing permissions and
014     * limitations under the License.
015     */
016    
017    package com.google.common.util.concurrent;
018    
019    import static com.google.common.base.Preconditions.checkNotNull;
020    
021    import com.google.common.annotations.Beta;
022    import com.google.common.annotations.VisibleForTesting;
023    
024    import java.util.concurrent.Executor;
025    import java.util.concurrent.Executors;
026    import java.util.concurrent.Future;
027    import java.util.concurrent.ThreadFactory;
028    import java.util.concurrent.atomic.AtomicBoolean;
029    
030    /**
031     * Utilities necessary for working with libraries that supply plain {@link
032     * Future} instances.
033     *
034     * @author Sven Mawson
035     * @since 10.0 (replacing {@code Futures.makeListenable}, which
036     *     existed in 1.0)
037     */
038    @Beta
039    public
040    final class JdkFutureAdapters {
041      /**
042       * Assigns a thread to the given {@link Future} to provide {@link
043       * ListenableFuture} functionality.
044       *
045       * <p><b>Warning:</b> If the input future does not already implement {@link
046       * ListenableFuture}, the returned future will emulate {@link
047       * ListenableFuture#addListener} by taking a thread from an internal,
048       * unbounded pool at the first call to {@code addListener} and holding it
049       * until the future is {@linkplain Future#isDone() done}.
050       *
051       * <p>Prefer to create {@code ListenableFuture} instances with {@link
052       * SettableFuture}, {@link MoreExecutors#listeningDecorator(
053       * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
054       * {@link AbstractFuture}, and other utilities over creating plain {@code
055       * Future} instances to be upgraded to {@code ListenableFuture} after the
056       * fact.
057       */
058      public static <V> ListenableFuture<V> listenInPoolThread(
059          Future<V> future) {
060        if (future instanceof ListenableFuture<?>) {
061          return (ListenableFuture<V>) future;
062        }
063        return new ListenableFutureAdapter<V>(future);
064      }
065    
066      @VisibleForTesting
067      static <V> ListenableFuture<V> listenInPoolThread(
068          Future<V> future, Executor executor) {
069        checkNotNull(executor);
070        if (future instanceof ListenableFuture<?>) {
071          return (ListenableFuture<V>) future;
072        }
073        return new ListenableFutureAdapter<V>(future, executor);
074      }
075    
076      /**
077       * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
078       * will wait on the future to finish, and when it completes, run the
079       * listeners.  This implementation will wait on the source future
080       * indefinitely, so if the source future never completes, the adapter will
081       * never complete either.
082       *
083       * <p>If the delegate future is interrupted or throws an unexpected unchecked
084       * exception, the listeners will not be invoked.
085       */
086      private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
087          implements ListenableFuture<V> {
088    
089        private static final ThreadFactory threadFactory =
090            new ThreadFactoryBuilder()
091                .setNameFormat("ListenableFutureAdapter-thread-%d")
092                .build();
093        private static final Executor defaultAdapterExecutor =
094            Executors.newCachedThreadPool(threadFactory);
095    
096        private final Executor adapterExecutor;
097    
098        // The execution list to hold our listeners.
099        private final ExecutionList executionList = new ExecutionList();
100    
101        // This allows us to only start up a thread waiting on the delegate future
102        // when the first listener is added.
103        private final AtomicBoolean hasListeners = new AtomicBoolean(false);
104    
105        // The delegate future.
106        private final Future<V> delegate;
107    
108        ListenableFutureAdapter(Future<V> delegate) {
109          this(delegate, defaultAdapterExecutor);
110        }
111    
112        ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
113          this.delegate = checkNotNull(delegate);
114          this.adapterExecutor = checkNotNull(adapterExecutor);
115        }
116    
117        @Override
118        protected Future<V> delegate() {
119          return delegate;
120        }
121    
122        @Override
123        public void addListener(Runnable listener, Executor exec) {
124          executionList.add(listener, exec);
125    
126          // When a listener is first added, we run a task that will wait for
127          // the delegate to finish, and when it is done will run the listeners.
128          if (hasListeners.compareAndSet(false, true)) {
129            if (delegate.isDone()) {
130              // If the delegate is already done, run the execution list
131              // immediately on the current thread.
132              executionList.execute();
133              return;
134            }
135    
136            adapterExecutor.execute(new Runnable() {
137              @Override
138              public void run() {
139                try {
140                  delegate.get();
141                } catch (Error e) {
142                  throw e;
143                } catch (InterruptedException e) {
144                  Thread.currentThread().interrupt();
145                  // Threads from our private pool are never interrupted.
146                  throw new AssertionError(e);
147                } catch (Throwable e) {
148                  // ExecutionException / CancellationException / RuntimeException
149                  // The task is done, run the listeners.
150                }
151                executionList.execute();
152              }
153            });
154          }
155        }
156      }
157    
158      private JdkFutureAdapters() {}
159    }