001    /*
002     * Copyright (C) 2011 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005     * in compliance with the License. You may obtain a copy of the License at
006     *
007     * http://www.apache.org/licenses/LICENSE-2.0
008     *
009     * Unless required by applicable law or agreed to in writing, software distributed under the License
010     * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011     * or implied. See the License for the specific language governing permissions and limitations under
012     * the License.
013     */
014    
015    package com.google.common.collect;
016    
017    import com.google.common.annotations.Beta;
018    import com.google.common.base.Preconditions;
019    
020    import java.util.ArrayDeque;
021    import java.util.Collection;
022    import java.util.Deque;
023    import java.util.PriorityQueue;
024    import java.util.Queue;
025    import java.util.concurrent.ArrayBlockingQueue;
026    import java.util.concurrent.BlockingQueue;
027    import java.util.concurrent.ConcurrentLinkedQueue;
028    import java.util.concurrent.LinkedBlockingDeque;
029    import java.util.concurrent.LinkedBlockingQueue;
030    import java.util.concurrent.PriorityBlockingQueue;
031    import java.util.concurrent.SynchronousQueue;
032    import java.util.concurrent.TimeUnit;
033    
034    /**
035     * Static utility methods pertaining to {@link Queue} and {@link Deque} instances.
036     * Also see this class's counterparts {@link Lists}, {@link Sets}, and {@link Maps}.
037     *
038     * @author Kurt Alfred Kluever
039     * @since 11.0
040     */
041    @Beta
042    public final class Queues {
043      private Queues() {}
044    
045      // ArrayBlockingQueue
046    
047      /**
048       * Creates an empty {@code ArrayBlockingQueue} instance.
049       *
050       * @return a new, empty {@code ArrayBlockingQueue}
051       */
052      public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
053        return new ArrayBlockingQueue<E>(capacity);
054      }
055    
056      // ArrayDeque
057    
058      /**
059       * Creates an empty {@code ArrayDeque} instance.
060       *
061       * @return a new, empty {@code ArrayDeque}
062       * @since 12.0
063       */
064      public static <E> ArrayDeque<E> newArrayDeque() {
065        return new ArrayDeque<E>();
066      }
067    
068      /**
069       * Creates an {@code ArrayDeque} instance containing the given elements.
070       *
071       * @param elements the elements that the queue should contain, in order
072       * @return a new {@code ArrayDeque} containing those elements
073       * @since 12.0
074       */
075      public static <E> ArrayDeque<E> newArrayDeque(Iterable<? extends E> elements) {
076        if (elements instanceof Collection) {
077          return new ArrayDeque<E>(Collections2.cast(elements));
078        }
079        ArrayDeque<E> deque = new ArrayDeque<E>();
080        Iterables.addAll(deque, elements);
081        return deque;
082      }
083    
084      // ConcurrentLinkedQueue
085    
086      /**
087       * Creates an empty {@code ConcurrentLinkedQueue} instance.
088       *
089       * @return a new, empty {@code ConcurrentLinkedQueue}
090       */
091      public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
092        return new ConcurrentLinkedQueue<E>();
093      }
094    
095      /**
096       * Creates an {@code ConcurrentLinkedQueue} instance containing the given elements.
097       *
098       * @param elements the elements that the queue should contain, in order
099       * @return a new {@code ConcurrentLinkedQueue} containing those elements
100       */
101      public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
102          Iterable<? extends E> elements) {
103        if (elements instanceof Collection) {
104          return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
105        }
106        ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
107        Iterables.addAll(queue, elements);
108        return queue;
109      }
110    
111      // LinkedBlockingDeque
112    
113      /**
114       * Creates an empty {@code LinkedBlockingDeque} instance.
115       *
116       * @return a new, empty {@code LinkedBlockingDeque}
117       * @since 12.0
118       */
119      public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque() {
120        return new LinkedBlockingDeque<E>();
121      }
122    
123      /**
124       * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
125       *
126       * @param capacity the capacity of this deque
127       * @return a new, empty {@code LinkedBlockingDeque}
128       * @throws IllegalArgumentException if {@code capacity} is less than 1
129       * @since 12.0
130       */
131      public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(int capacity) {
132        return new LinkedBlockingDeque<E>(capacity);
133      }
134    
135      /**
136       * Creates an {@code LinkedBlockingDeque} instance containing the given elements.
137       *
138       * @param elements the elements that the queue should contain, in order
139       * @return a new {@code LinkedBlockingDeque} containing those elements
140       * @since 12.0
141       */
142      public static <E> LinkedBlockingDeque<E> newLinkedBlockingDeque(Iterable<? extends E> elements) {
143        if (elements instanceof Collection) {
144          return new LinkedBlockingDeque<E>(Collections2.cast(elements));
145        }
146        LinkedBlockingDeque<E> deque = new LinkedBlockingDeque<E>();
147        Iterables.addAll(deque, elements);
148        return deque;
149      }
150    
151      // LinkedBlockingQueue
152    
153      /**
154       * Creates an empty {@code LinkedBlockingQueue} instance.
155       *
156       * @return a new, empty {@code LinkedBlockingQueue}
157       */
158      public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
159        return new LinkedBlockingQueue<E>();
160      }
161    
162      /**
163       * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
164       *
165       * @param capacity the capacity of this queue
166       * @return a new, empty {@code LinkedBlockingQueue}
167       * @throws IllegalArgumentException if {@code capacity} is less than 1
168       */
169      public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
170        return new LinkedBlockingQueue<E>(capacity);
171      }
172    
173      /**
174       * Creates an {@code LinkedBlockingQueue} instance containing the given elements.
175       *
176       * @param elements the elements that the queue should contain, in order
177       * @return a new {@code LinkedBlockingQueue} containing those elements
178       */
179      public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
180        if (elements instanceof Collection) {
181          return new LinkedBlockingQueue<E>(Collections2.cast(elements));
182        }
183        LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
184        Iterables.addAll(queue, elements);
185        return queue;
186      }
187    
188      // LinkedList: see {@link com.google.common.collect.Lists}
189    
190      // PriorityBlockingQueue
191    
192      /**
193       * Creates an empty {@code PriorityBlockingQueue} instance.
194       *
195       * @return a new, empty {@code PriorityBlockingQueue}
196       */
197      public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
198        return new PriorityBlockingQueue<E>();
199      }
200    
201      /**
202       * Creates an {@code PriorityBlockingQueue} instance containing the given elements.
203       *
204       * @param elements the elements that the queue should contain, in order
205       * @return a new {@code PriorityBlockingQueue} containing those elements
206       */
207      public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue(
208          Iterable<? extends E> elements) {
209        if (elements instanceof Collection) {
210          return new PriorityBlockingQueue<E>(Collections2.cast(elements));
211        }
212        PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
213        Iterables.addAll(queue, elements);
214        return queue;
215      }
216    
217      // PriorityQueue
218    
219      /**
220       * Creates an empty {@code PriorityQueue} instance.
221       *
222       * @return a new, empty {@code PriorityQueue}
223       */
224      public static <E> PriorityQueue<E> newPriorityQueue() {
225        return new PriorityQueue<E>();
226      }
227    
228      /**
229       * Creates an {@code PriorityQueue} instance containing the given elements.
230       *
231       * @param elements the elements that the queue should contain, in order
232       * @return a new {@code PriorityQueue} containing those elements
233       */
234      public static <E> PriorityQueue<E> newPriorityQueue(Iterable<? extends E> elements) {
235        if (elements instanceof Collection) {
236          return new PriorityQueue<E>(Collections2.cast(elements));
237        }
238        PriorityQueue<E> queue = new PriorityQueue<E>();
239        Iterables.addAll(queue, elements);
240        return queue;
241      }
242    
243      // SynchronousQueue
244    
245      /**
246       * Creates an empty {@code SynchronousQueue} instance.
247       *
248       * @return a new, empty {@code SynchronousQueue}
249       */
250      public static <E> SynchronousQueue<E> newSynchronousQueue() {
251        return new SynchronousQueue<E>();
252      }
253      
254      /**
255       * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested 
256       * {@code numElements} elements are not available, it will wait for them up to the specified
257       * timeout.
258       * 
259       * @param q the blocking queue to be drained
260       * @param buffer where to add the transferred elements
261       * @param numElements the number of elements to be waited for
262       * @param timeout how long to wait before giving up, in units of {@code unit}
263       * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
264       * @return the number of elements transferred
265       * @throws InterruptedException if interrupted while waiting
266       */
267      public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
268          long timeout, TimeUnit unit) throws InterruptedException {
269        Preconditions.checkNotNull(buffer);
270        /*
271         * This code performs one System.nanoTime() more than necessary, and in return, the time to
272         * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
273         * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
274         */
275        long deadline = System.nanoTime() + unit.toNanos(timeout);
276        int added = 0;
277        while (added < numElements) {
278          // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
279          // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
280          added += q.drainTo(buffer, numElements - added);
281          if (added < numElements) { // not enough elements immediately available; will have to poll
282            E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
283            if (e == null) {
284              break; // we already waited enough, and there are no more elements in sight
285            }
286            buffer.add(e);
287            added++;
288          }
289        }
290        return added;
291      }
292      
293      /**
294       * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)}, 
295       * but with a different behavior in case it is interrupted while waiting. In that case, the 
296       * operation will continue as usual, and in the end the thread's interruption status will be set 
297       * (no {@code InterruptedException} is thrown). 
298       * 
299       * @param q the blocking queue to be drained
300       * @param buffer where to add the transferred elements
301       * @param numElements the number of elements to be waited for
302       * @param timeout how long to wait before giving up, in units of {@code unit}
303       * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
304       * @return the number of elements transferred
305       */
306      public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer, 
307          int numElements, long timeout, TimeUnit unit) {
308        Preconditions.checkNotNull(buffer);
309        long deadline = System.nanoTime() + unit.toNanos(timeout);
310        int added = 0;
311        boolean interrupted = false;
312        try {
313          while (added < numElements) {
314            // we could rely solely on #poll, but #drainTo might be more efficient when there are 
315            // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
316            added += q.drainTo(buffer, numElements - added);
317            if (added < numElements) { // not enough elements immediately available; will have to poll
318              E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
319              while (true) {
320                try {
321                  e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
322                  break;
323                } catch (InterruptedException ex) {
324                  interrupted = true; // note interruption and retry
325                }
326              }
327              if (e == null) {
328                break; // we already waited enough, and there are no more elements in sight
329              }
330              buffer.add(e);
331              added++;
332            }
333          }
334        } finally {
335          if (interrupted) {
336            Thread.currentThread().interrupt();
337          }
338        }
339        return added;
340      }
341    }