001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020
021import com.google.common.annotations.Beta;
022
023import java.util.concurrent.Executor;
024import java.util.concurrent.Executors;
025import java.util.concurrent.Future;
026import java.util.concurrent.ThreadFactory;
027import java.util.concurrent.atomic.AtomicBoolean;
028
029/**
030 * Utilities necessary for working with libraries that supply plain {@link
031 * Future} instances. Note that, whenver possible, it is strongly preferred to
032 * modify those libraries to return {@code ListenableFuture} directly.
033 *
034 * @author Sven Mawson
035 * @since 10.0 (replacing {@code Futures.makeListenable}, which
036 *     existed in 1.0)
037 */
038@Beta
039public final class JdkFutureAdapters {
040  /**
041   * Assigns a thread to the given {@link Future} to provide {@link
042   * ListenableFuture} functionality.
043   *
044   * <p><b>Warning:</b> If the input future does not already implement {@code
045   * ListenableFuture}, the returned future will emulate {@link
046   * ListenableFuture#addListener} by taking a thread from an internal,
047   * unbounded pool at the first call to {@code addListener} and holding it
048   * until the future is {@linkplain Future#isDone() done}.
049   *
050   * <p>Prefer to create {@code ListenableFuture} instances with {@link
051   * SettableFuture}, {@link MoreExecutors#listeningDecorator(
052   * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
053   * {@link AbstractFuture}, and other utilities over creating plain {@code
054   * Future} instances to be upgraded to {@code ListenableFuture} after the
055   * fact.
056   */
057  public static <V> ListenableFuture<V> listenInPoolThread(
058      Future<V> future) {
059    if (future instanceof ListenableFuture) {
060      return (ListenableFuture<V>) future;
061    }
062    return new ListenableFutureAdapter<V>(future);
063  }
064
065  /**
066   * Submits a blocking task for the given {@link Future} to provide {@link
067   * ListenableFuture} functionality.
068   *
069   * <p><b>Warning:</b> If the input future does not already implement {@code
070   * ListenableFuture}, the returned future will emulate {@link
071   * ListenableFuture#addListener} by submitting a task to the given executor at
072   * at the first call to {@code addListener}. The task must be started by the
073   * executor promptly, or else the returned {@code ListenableFuture} may fail
074   * to work.  The task's execution consists of blocking until the input future
075   * is {@linkplain Future#isDone() done}, so each call to this method may
076   * claim and hold a thread for an arbitrary length of time. Use of bounded
077   * executors or other executors that may fail to execute a task promptly may
078   * result in deadlocks.
079   *
080   * <p>Prefer to create {@code ListenableFuture} instances with {@link
081   * SettableFuture}, {@link MoreExecutors#listeningDecorator(
082   * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
083   * {@link AbstractFuture}, and other utilities over creating plain {@code
084   * Future} instances to be upgraded to {@code ListenableFuture} after the
085   * fact.
086   *
087   * @since 12.0
088   */
089  public static <V> ListenableFuture<V> listenInPoolThread(
090      Future<V> future, Executor executor) {
091    checkNotNull(executor);
092    if (future instanceof ListenableFuture) {
093      return (ListenableFuture<V>) future;
094    }
095    return new ListenableFutureAdapter<V>(future, executor);
096  }
097
098  /**
099   * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
100   * will wait on the future to finish, and when it completes, run the
101   * listeners.  This implementation will wait on the source future
102   * indefinitely, so if the source future never completes, the adapter will
103   * never complete either.
104   *
105   * <p>If the delegate future is interrupted or throws an unexpected unchecked
106   * exception, the listeners will not be invoked.
107   */
108  private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
109      implements ListenableFuture<V> {
110
111    private static final ThreadFactory threadFactory =
112        new ThreadFactoryBuilder()
113            .setDaemon(true)
114            .setNameFormat("ListenableFutureAdapter-thread-%d")
115            .build();
116    private static final Executor defaultAdapterExecutor =
117        Executors.newCachedThreadPool(threadFactory);
118
119    private final Executor adapterExecutor;
120
121    // The execution list to hold our listeners.
122    private final ExecutionList executionList = new ExecutionList();
123
124    // This allows us to only start up a thread waiting on the delegate future
125    // when the first listener is added.
126    private final AtomicBoolean hasListeners = new AtomicBoolean(false);
127
128    // The delegate future.
129    private final Future<V> delegate;
130
131    ListenableFutureAdapter(Future<V> delegate) {
132      this(delegate, defaultAdapterExecutor);
133    }
134
135    ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
136      this.delegate = checkNotNull(delegate);
137      this.adapterExecutor = checkNotNull(adapterExecutor);
138    }
139
140    @Override
141    protected Future<V> delegate() {
142      return delegate;
143    }
144
145    @Override
146    public void addListener(Runnable listener, Executor exec) {
147      executionList.add(listener, exec);
148
149      // When a listener is first added, we run a task that will wait for
150      // the delegate to finish, and when it is done will run the listeners.
151      if (hasListeners.compareAndSet(false, true)) {
152        if (delegate.isDone()) {
153          // If the delegate is already done, run the execution list
154          // immediately on the current thread.
155          executionList.execute();
156          return;
157        }
158
159        adapterExecutor.execute(new Runnable() {
160          @Override
161          public void run() {
162            try {
163              delegate.get();
164            } catch (Error e) {
165              throw e;
166            } catch (InterruptedException e) {
167              Thread.currentThread().interrupt();
168              // Threads from our private pool are never interrupted.
169              throw new AssertionError(e);
170            } catch (Throwable e) {
171              // ExecutionException / CancellationException / RuntimeException
172              // The task is done, run the listeners.
173            }
174            executionList.execute();
175          }
176        });
177      }
178    }
179  }
180
181  private JdkFutureAdapters() {}
182}