001/*
002 * Copyright (C) 2011 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.util.concurrent;
018
019import static java.util.concurrent.TimeUnit.NANOSECONDS;
020
021import com.google.common.annotations.Beta;
022import com.google.common.annotations.GwtCompatible;
023import com.google.common.annotations.GwtIncompatible;
024import com.google.common.base.Preconditions;
025
026import java.util.concurrent.BlockingQueue;
027import java.util.concurrent.CancellationException;
028import java.util.concurrent.CountDownLatch;
029import java.util.concurrent.ExecutionException;
030import java.util.concurrent.Future;
031import java.util.concurrent.Semaphore;
032import java.util.concurrent.TimeUnit;
033import java.util.concurrent.TimeoutException;
034
035/**
036 * Utilities for treating interruptible operations as uninterruptible.
037 * In all cases, if a thread is interrupted during such a call, the call
038 * continues to block until the result is available or the timeout elapses,
039 * and only then re-interrupts the thread.
040 *
041 * @author Anthony Zana
042 * @since 10.0
043 */
044@Beta
045@GwtCompatible(emulated = true)
046public final class Uninterruptibles {
047
048  // Implementation Note: As of 3-7-11, the logic for each blocking/timeout
049  // methods is identical, save for method being invoked.
050
051  /**
052   * Invokes {@code latch.}{@link CountDownLatch#await() await()}
053   * uninterruptibly.
054   */
055  @GwtIncompatible("concurrency")
056  public static void awaitUninterruptibly(CountDownLatch latch) {
057    boolean interrupted = false;
058    try {
059      while (true) {
060        try {
061          latch.await();
062          return;
063        } catch (InterruptedException e) {
064          interrupted = true;
065        }
066      }
067    } finally {
068      if (interrupted) {
069        Thread.currentThread().interrupt();
070      }
071    }
072  }
073
074  /**
075   * Invokes
076   * {@code latch.}{@link CountDownLatch#await(long, TimeUnit)
077   * await(timeout, unit)} uninterruptibly.
078   */
079  @GwtIncompatible("concurrency")
080  public static boolean awaitUninterruptibly(CountDownLatch latch,
081      long timeout, TimeUnit unit) {
082    boolean interrupted = false;
083    try {
084      long remainingNanos = unit.toNanos(timeout);
085      long end = System.nanoTime() + remainingNanos;
086
087      while (true) {
088        try {
089          // CountDownLatch treats negative timeouts just like zero.
090          return latch.await(remainingNanos, NANOSECONDS);
091        } catch (InterruptedException e) {
092          interrupted = true;
093          remainingNanos = end - System.nanoTime();
094        }
095      }
096    } finally {
097      if (interrupted) {
098        Thread.currentThread().interrupt();
099      }
100    }
101  }
102
103  /**
104   * Invokes {@code toJoin.}{@link Thread#join() join()} uninterruptibly.
105   */
106  @GwtIncompatible("concurrency")
107  public static void joinUninterruptibly(Thread toJoin) {
108    boolean interrupted = false;
109    try {
110      while (true) {
111        try {
112          toJoin.join();
113          return;
114        } catch (InterruptedException e) {
115          interrupted = true;
116        }
117      }
118    } finally {
119      if (interrupted) {
120        Thread.currentThread().interrupt();
121      }
122    }
123  }
124
125  /**
126   * Invokes {@code future.}{@link Future#get() get()} uninterruptibly.
127   * To get uninterruptibility and remove checked exceptions, see
128   * {@link Futures#getUnchecked}.
129   *
130   * <p>If instead, you wish to treat {@link InterruptedException} uniformly
131   * with other exceptions, see {@link Futures#getChecked(Future, Class)
132   * Futures.getChecked}.
133   *
134   * @throws ExecutionException if the computation threw an exception
135   * @throws CancellationException if the computation was cancelled
136   */
137  public static <V> V getUninterruptibly(Future<V> future)
138      throws ExecutionException {
139    boolean interrupted = false;
140    try {
141      while (true) {
142        try {
143          return future.get();
144        } catch (InterruptedException e) {
145          interrupted = true;
146        }
147      }
148    } finally {
149      if (interrupted) {
150        Thread.currentThread().interrupt();
151      }
152    }
153  }
154
155  /**
156   * Invokes
157   * {@code future.}{@link Future#get(long, TimeUnit) get(timeout, unit)}
158   * uninterruptibly.
159   *
160   * <p>If instead, you wish to treat {@link InterruptedException} uniformly
161   * with other exceptions, see {@link Futures#getChecked(Future, Class)
162   * Futures.getChecked}.
163   *
164   * @throws ExecutionException if the computation threw an exception
165   * @throws CancellationException if the computation was cancelled
166   * @throws TimeoutException if the wait timed out
167   */
168  @GwtIncompatible("TODO")
169  public static <V> V getUninterruptibly(
170      Future<V> future, long timeout, TimeUnit unit)
171          throws ExecutionException, TimeoutException {
172    boolean interrupted = false;
173    try {
174      long remainingNanos = unit.toNanos(timeout);
175      long end = System.nanoTime() + remainingNanos;
176
177      while (true) {
178        try {
179          // Future treats negative timeouts just like zero.
180          return future.get(remainingNanos, NANOSECONDS);
181        } catch (InterruptedException e) {
182          interrupted = true;
183          remainingNanos = end - System.nanoTime();
184        }
185      }
186    } finally {
187      if (interrupted) {
188        Thread.currentThread().interrupt();
189      }
190    }
191  }
192
193  /**
194   * Invokes
195   * {@code unit.}{@link TimeUnit#timedJoin(Thread, long)
196   * timedJoin(toJoin, timeout)} uninterruptibly.
197   */
198  @GwtIncompatible("concurrency")
199  public static void joinUninterruptibly(Thread toJoin,
200      long timeout, TimeUnit unit) {
201    Preconditions.checkNotNull(toJoin);
202    boolean interrupted = false;
203    try {
204      long remainingNanos = unit.toNanos(timeout);
205      long end = System.nanoTime() + remainingNanos;
206      while (true) {
207        try {
208          // TimeUnit.timedJoin() treats negative timeouts just like zero.
209          NANOSECONDS.timedJoin(toJoin, remainingNanos);
210          return;
211        } catch (InterruptedException e) {
212          interrupted = true;
213          remainingNanos = end - System.nanoTime();
214        }
215      }
216    } finally {
217      if (interrupted) {
218        Thread.currentThread().interrupt();
219      }
220    }
221  }
222
223  /**
224   * Invokes {@code queue.}{@link BlockingQueue#take() take()} uninterruptibly.
225   */
226  @GwtIncompatible("concurrency")
227  public static <E> E takeUninterruptibly(BlockingQueue<E> queue) {
228    boolean interrupted = false;
229    try {
230      while (true) {
231        try {
232          return queue.take();
233        } catch (InterruptedException e) {
234          interrupted = true;
235        }
236      }
237    } finally {
238      if (interrupted) {
239        Thread.currentThread().interrupt();
240      }
241    }
242  }
243
244  /**
245   * Invokes {@code queue.}{@link BlockingQueue#put(Object) put(element)}
246   * uninterruptibly.
247   *
248   * @throws ClassCastException if the class of the specified element prevents
249   *     it from being added to the given queue
250   * @throws IllegalArgumentException if some property of the specified element
251   *     prevents it from being added to the given queue
252   */
253  @GwtIncompatible("concurrency")
254  public static <E> void putUninterruptibly(BlockingQueue<E> queue, E element) {
255    boolean interrupted = false;
256    try {
257      while (true) {
258        try {
259          queue.put(element);
260          return;
261        } catch (InterruptedException e) {
262          interrupted = true;
263        }
264      }
265    } finally {
266      if (interrupted) {
267        Thread.currentThread().interrupt();
268      }
269    }
270  }
271
272  // TODO(user): Support Sleeper somehow (wrapper or interface method)?
273  /**
274   * Invokes {@code unit.}{@link TimeUnit#sleep(long) sleep(sleepFor)}
275   * uninterruptibly.
276   */
277  @GwtIncompatible("concurrency")
278  public static void sleepUninterruptibly(long sleepFor, TimeUnit unit) {
279    boolean interrupted = false;
280    try {
281      long remainingNanos = unit.toNanos(sleepFor);
282      long end = System.nanoTime() + remainingNanos;
283      while (true) {
284        try {
285          // TimeUnit.sleep() treats negative timeouts just like zero.
286          NANOSECONDS.sleep(remainingNanos);
287          return;
288        } catch (InterruptedException e) {
289          interrupted = true;
290          remainingNanos = end - System.nanoTime();
291        }
292      }
293    } finally {
294      if (interrupted) {
295        Thread.currentThread().interrupt();
296      }
297    }
298  }
299
300  /**
301   * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit)
302   * tryAcquire(1, timeout, unit)} uninterruptibly.
303   *
304   * @since 18.0
305   */
306  @GwtIncompatible("concurrency")
307  public static boolean tryAcquireUninterruptibly(
308      Semaphore semaphore, long timeout, TimeUnit unit) {
309    return tryAcquireUninterruptibly(semaphore, 1, timeout, unit);
310  }
311
312  /**
313   * Invokes {@code semaphore.}{@link Semaphore#tryAcquire(int, long, TimeUnit)
314   * tryAcquire(permits, timeout, unit)} uninterruptibly.
315   *
316   * @since 18.0
317   */
318  @GwtIncompatible("concurrency")
319  public static boolean tryAcquireUninterruptibly(
320      Semaphore semaphore, int permits, long timeout, TimeUnit unit) {
321    boolean interrupted = false;
322    try {
323      long remainingNanos = unit.toNanos(timeout);
324      long end = System.nanoTime() + remainingNanos;
325
326      while (true) {
327        try {
328          // Semaphore treats negative timeouts just like zero.
329          return semaphore.tryAcquire(permits, remainingNanos, NANOSECONDS);
330        } catch (InterruptedException e) {
331          interrupted = true;
332          remainingNanos = end - System.nanoTime();
333        }
334      }
335    } finally {
336      if (interrupted) {
337        Thread.currentThread().interrupt();
338      }
339    }
340  }
341
342  // TODO(user): Add support for waitUninterruptibly.
343
344  private Uninterruptibles() {}
345}