001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import static com.google.common.base.Preconditions.checkNotNull;
018import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
019
020import com.google.common.annotations.Beta;
021import com.google.common.annotations.GwtIncompatible;
022import java.util.concurrent.Executor;
023import java.util.concurrent.Executors;
024import java.util.concurrent.Future;
025import java.util.concurrent.ThreadFactory;
026import java.util.concurrent.atomic.AtomicBoolean;
027
028/**
029 * Utilities necessary for working with libraries that supply plain {@link Future} instances. Note
030 * that, whenver possible, it is strongly preferred to modify those libraries to return {@code
031 * ListenableFuture} directly.
032 *
033 * @author Sven Mawson
034 * @since 10.0 (replacing {@code Futures.makeListenable}, which existed in 1.0)
035 */
036@Beta
037@GwtIncompatible
038public final class JdkFutureAdapters {
039  /**
040   * Assigns a thread to the given {@link Future} to provide {@link ListenableFuture} functionality.
041   *
042   * <p><b>Warning:</b> If the input future does not already implement {@code ListenableFuture}, the
043   * returned future will emulate {@link ListenableFuture#addListener} by taking a thread from an
044   * internal, unbounded pool at the first call to {@code addListener} and holding it until the
045   * future is {@linkplain Future#isDone() done}.
046   *
047   * <p>Prefer to create {@code ListenableFuture} instances with {@link SettableFuture}, {@link
048   * MoreExecutors#listeningDecorator( java.util.concurrent.ExecutorService)}, {@link
049   * ListenableFutureTask}, {@link AbstractFuture}, and other utilities over creating plain {@code
050   * Future} instances to be upgraded to {@code ListenableFuture} after the fact.
051   */
052  public static <V> ListenableFuture<V> listenInPoolThread(Future<V> future) {
053    if (future instanceof ListenableFuture) {
054      return (ListenableFuture<V>) future;
055    }
056    return new ListenableFutureAdapter<V>(future);
057  }
058
059  /**
060   * Submits a blocking task for the given {@link Future} to provide {@link ListenableFuture}
061   * functionality.
062   *
063   * <p><b>Warning:</b> If the input future does not already implement {@code ListenableFuture}, the
064   * returned future will emulate {@link ListenableFuture#addListener} by submitting a task to the
065   * given executor at the first call to {@code addListener}. The task must be started by the
066   * executor promptly, or else the returned {@code ListenableFuture} may fail to work. The task's
067   * execution consists of blocking until the input future is {@linkplain Future#isDone() done}, so
068   * each call to this method may claim and hold a thread for an arbitrary length of time. Use of
069   * bounded executors or other executors that may fail to execute a task promptly may result in
070   * deadlocks.
071   *
072   * <p>Prefer to create {@code ListenableFuture} instances with {@link SettableFuture}, {@link
073   * MoreExecutors#listeningDecorator( java.util.concurrent.ExecutorService)}, {@link
074   * ListenableFutureTask}, {@link AbstractFuture}, and other utilities over creating plain {@code
075   * Future} instances to be upgraded to {@code ListenableFuture} after the fact.
076   *
077   * @since 12.0
078   */
079  public static <V> ListenableFuture<V> listenInPoolThread(Future<V> future, Executor executor) {
080    checkNotNull(executor);
081    if (future instanceof ListenableFuture) {
082      return (ListenableFuture<V>) future;
083    }
084    return new ListenableFutureAdapter<V>(future, executor);
085  }
086
087  /**
088   * An adapter to turn a {@link Future} into a {@link ListenableFuture}. This will wait on the
089   * future to finish, and when it completes, run the listeners. This implementation will wait on
090   * the source future indefinitely, so if the source future never completes, the adapter will never
091   * complete either.
092   *
093   * <p>If the delegate future is interrupted or throws an unexpected unchecked exception, the
094   * listeners will not be invoked.
095   */
096  private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
097      implements ListenableFuture<V> {
098
099    private static final ThreadFactory threadFactory =
100        new ThreadFactoryBuilder()
101            .setDaemon(true)
102            .setNameFormat("ListenableFutureAdapter-thread-%d")
103            .build();
104    private static final Executor defaultAdapterExecutor =
105        Executors.newCachedThreadPool(threadFactory);
106
107    private final Executor adapterExecutor;
108
109    // The execution list to hold our listeners.
110    private final ExecutionList executionList = new ExecutionList();
111
112    // This allows us to only start up a thread waiting on the delegate future when the first
113    // listener is added.
114    private final AtomicBoolean hasListeners = new AtomicBoolean(false);
115
116    // The delegate future.
117    private final Future<V> delegate;
118
119    ListenableFutureAdapter(Future<V> delegate) {
120      this(delegate, defaultAdapterExecutor);
121    }
122
123    ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
124      this.delegate = checkNotNull(delegate);
125      this.adapterExecutor = checkNotNull(adapterExecutor);
126    }
127
128    @Override
129    protected Future<V> delegate() {
130      return delegate;
131    }
132
133    @Override
134    public void addListener(Runnable listener, Executor exec) {
135      executionList.add(listener, exec);
136
137      // When a listener is first added, we run a task that will wait for the delegate to finish,
138      // and when it is done will run the listeners.
139      if (hasListeners.compareAndSet(false, true)) {
140        if (delegate.isDone()) {
141          // If the delegate is already done, run the execution list immediately on the current
142          // thread.
143          executionList.execute();
144          return;
145        }
146
147        // TODO(lukes): handle RejectedExecutionException
148        adapterExecutor.execute(
149            new Runnable() {
150              @Override
151              public void run() {
152                try {
153                  /*
154                   * Threads from our private pool are never interrupted. Threads from a
155                   * user-supplied executor might be, but... what can we do? This is another reason
156                   * to return a proper ListenableFuture instead of using listenInPoolThread.
157                   */
158                  getUninterruptibly(delegate);
159                } catch (Throwable e) {
160                  // ExecutionException / CancellationException / RuntimeException / Error
161                  // The task is presumably done, run the listeners.
162                }
163                executionList.execute();
164              }
165            });
166      }
167    }
168  }
169
170  private JdkFutureAdapters() {}
171}