001/*
002 * Copyright (C) 2009 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static com.google.common.base.Preconditions.checkNotNull;
020import static com.google.common.util.concurrent.Uninterruptibles.getUninterruptibly;
021
022import com.google.common.annotations.Beta;
023
024import java.util.concurrent.Executor;
025import java.util.concurrent.Executors;
026import java.util.concurrent.Future;
027import java.util.concurrent.ThreadFactory;
028import java.util.concurrent.atomic.AtomicBoolean;
029
030/**
031 * Utilities necessary for working with libraries that supply plain {@link
032 * Future} instances. Note that, whenver possible, it is strongly preferred to
033 * modify those libraries to return {@code ListenableFuture} directly.
034 *
035 * @author Sven Mawson
036 * @since 10.0 (replacing {@code Futures.makeListenable}, which
037 *     existed in 1.0)
038 */
039@Beta
040public final class JdkFutureAdapters {
041  /**
042   * Assigns a thread to the given {@link Future} to provide {@link
043   * ListenableFuture} functionality.
044   *
045   * <p><b>Warning:</b> If the input future does not already implement {@code
046   * ListenableFuture}, the returned future will emulate {@link
047   * ListenableFuture#addListener} by taking a thread from an internal,
048   * unbounded pool at the first call to {@code addListener} and holding it
049   * until the future is {@linkplain Future#isDone() done}.
050   *
051   * <p>Prefer to create {@code ListenableFuture} instances with {@link
052   * SettableFuture}, {@link MoreExecutors#listeningDecorator(
053   * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
054   * {@link AbstractFuture}, and other utilities over creating plain {@code
055   * Future} instances to be upgraded to {@code ListenableFuture} after the
056   * fact.
057   */
058  public static <V> ListenableFuture<V> listenInPoolThread(
059      Future<V> future) {
060    if (future instanceof ListenableFuture) {
061      return (ListenableFuture<V>) future;
062    }
063    return new ListenableFutureAdapter<V>(future);
064  }
065
066  /**
067   * Submits a blocking task for the given {@link Future} to provide {@link
068   * ListenableFuture} functionality.
069   *
070   * <p><b>Warning:</b> If the input future does not already implement {@code
071   * ListenableFuture}, the returned future will emulate {@link
072   * ListenableFuture#addListener} by submitting a task to the given executor at
073   * the first call to {@code addListener}. The task must be started by the
074   * executor promptly, or else the returned {@code ListenableFuture} may fail
075   * to work.  The task's execution consists of blocking until the input future
076   * is {@linkplain Future#isDone() done}, so each call to this method may
077   * claim and hold a thread for an arbitrary length of time. Use of bounded
078   * executors or other executors that may fail to execute a task promptly may
079   * result in deadlocks.
080   *
081   * <p>Prefer to create {@code ListenableFuture} instances with {@link
082   * SettableFuture}, {@link MoreExecutors#listeningDecorator(
083   * java.util.concurrent.ExecutorService)}, {@link ListenableFutureTask},
084   * {@link AbstractFuture}, and other utilities over creating plain {@code
085   * Future} instances to be upgraded to {@code ListenableFuture} after the
086   * fact.
087   *
088   * @since 12.0
089   */
090  public static <V> ListenableFuture<V> listenInPoolThread(
091      Future<V> future, Executor executor) {
092    checkNotNull(executor);
093    if (future instanceof ListenableFuture) {
094      return (ListenableFuture<V>) future;
095    }
096    return new ListenableFutureAdapter<V>(future, executor);
097  }
098
099  /**
100   * An adapter to turn a {@link Future} into a {@link ListenableFuture}.  This
101   * will wait on the future to finish, and when it completes, run the
102   * listeners.  This implementation will wait on the source future
103   * indefinitely, so if the source future never completes, the adapter will
104   * never complete either.
105   *
106   * <p>If the delegate future is interrupted or throws an unexpected unchecked
107   * exception, the listeners will not be invoked.
108   */
109  private static class ListenableFutureAdapter<V> extends ForwardingFuture<V>
110      implements ListenableFuture<V> {
111
112    private static final ThreadFactory threadFactory =
113        new ThreadFactoryBuilder()
114            .setDaemon(true)
115            .setNameFormat("ListenableFutureAdapter-thread-%d")
116            .build();
117    private static final Executor defaultAdapterExecutor =
118        Executors.newCachedThreadPool(threadFactory);
119
120    private final Executor adapterExecutor;
121
122    // The execution list to hold our listeners.
123    private final ExecutionList executionList = new ExecutionList();
124
125    // This allows us to only start up a thread waiting on the delegate future
126    // when the first listener is added.
127    private final AtomicBoolean hasListeners = new AtomicBoolean(false);
128
129    // The delegate future.
130    private final Future<V> delegate;
131
132    ListenableFutureAdapter(Future<V> delegate) {
133      this(delegate, defaultAdapterExecutor);
134    }
135
136    ListenableFutureAdapter(Future<V> delegate, Executor adapterExecutor) {
137      this.delegate = checkNotNull(delegate);
138      this.adapterExecutor = checkNotNull(adapterExecutor);
139    }
140
141    @Override
142    protected Future<V> delegate() {
143      return delegate;
144    }
145
146    @Override
147    public void addListener(Runnable listener, Executor exec) {
148      executionList.add(listener, exec);
149
150      // When a listener is first added, we run a task that will wait for
151      // the delegate to finish, and when it is done will run the listeners.
152      if (hasListeners.compareAndSet(false, true)) {
153        if (delegate.isDone()) {
154          // If the delegate is already done, run the execution list
155          // immediately on the current thread.
156          executionList.execute();
157          return;
158        }
159
160        // TODO(lukes): handle RejectedExecutionException
161        adapterExecutor.execute(new Runnable() {
162          @Override
163          public void run() {
164            try {
165              /*
166               * Threads from our private pool are never interrupted. Threads
167               * from a user-supplied executor might be, but... what can we do?
168               * This is another reason to return a proper ListenableFuture
169               * instead of using listenInPoolThread.
170               */
171              getUninterruptibly(delegate);
172            } catch (Throwable e) {
173              // ExecutionException / CancellationException / RuntimeException / Error
174              // The task is presumably done, run the listeners.
175            }
176            executionList.execute();
177          }
178        });
179      }
180    }
181  }
182
183  private JdkFutureAdapters() {}
184}