001    /*
002     * Copyright (C) 2011 The Guava Authors
003     *
004     * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005     * in compliance with the License. You may obtain a copy of the License at
006     *
007     * http://www.apache.org/licenses/LICENSE-2.0
008     *
009     * Unless required by applicable law or agreed to in writing, software distributed under the License
010     * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011     * or implied. See the License for the specific language governing permissions and limitations under
012     * the License.
013     */
014    
015    package com.google.common.collect;
016    
017    import com.google.common.annotations.Beta;
018    import com.google.common.base.Preconditions;
019    
020    import java.util.ArrayDeque;
021    import java.util.Collection;
022    import java.util.PriorityQueue;
023    import java.util.Queue;
024    import java.util.concurrent.ArrayBlockingQueue;
025    import java.util.concurrent.BlockingQueue;
026    import java.util.concurrent.ConcurrentLinkedQueue;
027    import java.util.concurrent.LinkedBlockingDeque;
028    import java.util.concurrent.LinkedBlockingQueue;
029    import java.util.concurrent.PriorityBlockingQueue;
030    import java.util.concurrent.SynchronousQueue;
031    import java.util.concurrent.TimeUnit;
032    
033    /**
034     * Static utility methods pertaining to {@link Queue}
035     * instances. Also see this class's counterparts
036     * {@link Lists}, {@link Sets}, and {@link Maps}.
037     *
038     * @author Kurt Alfred Kluever
039     * @since 11.0
040     */
041    @Beta
042    public final class Queues {
043      private Queues() {}
044    
045      // ArrayBlockingQueue
046    
047      /**
048       * Creates an empty {@code ArrayBlockingQueue} instance.
049       *
050       * @return a new, empty {@code ArrayBlockingQueue}
051       */
052      public static <E> ArrayBlockingQueue<E> newArrayBlockingQueue(int capacity) {
053        return new ArrayBlockingQueue<E>(capacity);
054      }
055    
056      // ArrayDeque
057    
058      // ConcurrentLinkedQueue
059    
060      /**
061       * Creates an empty {@code ConcurrentLinkedQueue} instance.
062       *
063       * @return a new, empty {@code ConcurrentLinkedQueue}
064       */
065      public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue() {
066        return new ConcurrentLinkedQueue<E>();
067      }
068    
069      /**
070       * Creates an {@code ConcurrentLinkedQueue} instance containing the given elements.
071       *
072       * @param elements the elements that the queue should contain, in order
073       * @return a new {@code ConcurrentLinkedQueue} containing those elements
074       */
075      public static <E> ConcurrentLinkedQueue<E> newConcurrentLinkedQueue(
076          Iterable<? extends E> elements) {
077        if (elements instanceof Collection) {
078          return new ConcurrentLinkedQueue<E>(Collections2.cast(elements));
079        }
080        ConcurrentLinkedQueue<E> queue = new ConcurrentLinkedQueue<E>();
081        Iterables.addAll(queue, elements);
082        return queue;
083      }
084    
085      // LinkedBlockingDeque
086    
087      // LinkedBlockingQueue
088    
089      /**
090       * Creates an empty {@code LinkedBlockingQueue} instance.
091       *
092       * @return a new, empty {@code LinkedBlockingQueue}
093       */
094      public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue() {
095        return new LinkedBlockingQueue<E>();
096      }
097    
098      /**
099       * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
100       *
101       * @param capacity the capacity of this queue
102       * @return a new, empty {@code LinkedBlockingQueue}
103       * @throws IllegalArgumentException if {@code capacity} is less than 1
104       */
105      public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(int capacity) {
106        return new LinkedBlockingQueue<E>(capacity);
107      }
108    
109      /**
110       * Creates an {@code LinkedBlockingQueue} instance containing the given elements.
111       *
112       * @param elements the elements that the queue should contain, in order
113       * @return a new {@code LinkedBlockingQueue} containing those elements
114       */
115      public static <E> LinkedBlockingQueue<E> newLinkedBlockingQueue(Iterable<? extends E> elements) {
116        if (elements instanceof Collection) {
117          return new LinkedBlockingQueue<E>(Collections2.cast(elements));
118        }
119        LinkedBlockingQueue<E> queue = new LinkedBlockingQueue<E>();
120        Iterables.addAll(queue, elements);
121        return queue;
122      }
123    
124      // LinkedList: see {@link com.google.common.collect.Lists}
125    
126      // PriorityBlockingQueue
127    
128      /**
129       * Creates an empty {@code PriorityBlockingQueue} instance.
130       *
131       * @return a new, empty {@code PriorityBlockingQueue}
132       */
133      public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue() {
134        return new PriorityBlockingQueue<E>();
135      }
136    
137      /**
138       * Creates an {@code PriorityBlockingQueue} instance containing the given elements.
139       *
140       * @param elements the elements that the queue should contain, in order
141       * @return a new {@code PriorityBlockingQueue} containing those elements
142       */
143      public static <E> PriorityBlockingQueue<E> newPriorityBlockingQueue(
144          Iterable<? extends E> elements) {
145        if (elements instanceof Collection) {
146          return new PriorityBlockingQueue<E>(Collections2.cast(elements));
147        }
148        PriorityBlockingQueue<E> queue = new PriorityBlockingQueue<E>();
149        Iterables.addAll(queue, elements);
150        return queue;
151      }
152    
153      // PriorityQueue
154    
155      /**
156       * Creates an empty {@code PriorityQueue} instance.
157       *
158       * @return a new, empty {@code PriorityQueue}
159       */
160      public static <E> PriorityQueue<E> newPriorityQueue() {
161        return new PriorityQueue<E>();
162      }
163    
164      /**
165       * Creates an {@code PriorityQueue} instance containing the given elements.
166       *
167       * @param elements the elements that the queue should contain, in order
168       * @return a new {@code PriorityQueue} containing those elements
169       */
170      public static <E> PriorityQueue<E> newPriorityQueue(Iterable<? extends E> elements) {
171        if (elements instanceof Collection) {
172          return new PriorityQueue<E>(Collections2.cast(elements));
173        }
174        PriorityQueue<E> queue = new PriorityQueue<E>();
175        Iterables.addAll(queue, elements);
176        return queue;
177      }
178    
179      // SynchronousQueue
180    
181      /**
182       * Creates an empty {@code SynchronousQueue} instance.
183       *
184       * @return a new, empty {@code SynchronousQueue}
185       */
186      public static <E> SynchronousQueue<E> newSynchronousQueue() {
187        return new SynchronousQueue<E>();
188      }
189    
190      /**
191       * Drains the queue as {@link BlockingQueue#drainTo(Collection, int)}, but if the requested
192       * {@code numElements} elements are not available, it will wait for them up to the specified
193       * timeout.
194       *
195       * @param q the blocking queue to be drained
196       * @param buffer where to add the transferred elements
197       * @param numElements the number of elements to be waited for
198       * @param timeout how long to wait before giving up, in units of {@code unit}
199       * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
200       * @return the number of elements transferred
201       * @throws InterruptedException if interrupted while waiting
202       */
203      public static <E> int drain(BlockingQueue<E> q, Collection<? super E> buffer, int numElements,
204          long timeout, TimeUnit unit) throws InterruptedException {
205        Preconditions.checkNotNull(buffer);
206        /*
207         * This code performs one System.nanoTime() more than necessary, and in return, the time to
208         * execute Queue#drainTo is not added *on top* of waiting for the timeout (which could make
209         * the timeout arbitrarily inaccurate, given a queue that is slow to drain).
210         */
211        long deadline = System.nanoTime() + unit.toNanos(timeout);
212        int added = 0;
213        while (added < numElements) {
214          // we could rely solely on #poll, but #drainTo might be more efficient when there are multiple
215          // elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
216          added += q.drainTo(buffer, numElements - added);
217          if (added < numElements) { // not enough elements immediately available; will have to poll
218            E e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
219            if (e == null) {
220              break; // we already waited enough, and there are no more elements in sight
221            }
222            buffer.add(e);
223            added++;
224          }
225        }
226        return added;
227      }
228    
229      /**
230       * Drains the queue as {@linkplain #drain(BlockingQueue, Collection, int, long, TimeUnit)},
231       * but with a different behavior in case it is interrupted while waiting. In that case, the
232       * operation will continue as usual, and in the end the thread's interruption status will be set
233       * (no {@code InterruptedException} is thrown).
234       *
235       * @param q the blocking queue to be drained
236       * @param buffer where to add the transferred elements
237       * @param numElements the number of elements to be waited for
238       * @param timeout how long to wait before giving up, in units of {@code unit}
239       * @param unit a {@code TimeUnit} determining how to interpret the timeout parameter
240       * @return the number of elements transferred
241       */
242      public static <E> int drainUninterruptibly(BlockingQueue<E> q, Collection<? super E> buffer,
243          int numElements, long timeout, TimeUnit unit) {
244        Preconditions.checkNotNull(buffer);
245        long deadline = System.nanoTime() + unit.toNanos(timeout);
246        int added = 0;
247        boolean interrupted = false;
248        try {
249          while (added < numElements) {
250            // we could rely solely on #poll, but #drainTo might be more efficient when there are
251            // multiple elements already available (e.g. LinkedBlockingQueue#drainTo locks only once)
252            added += q.drainTo(buffer, numElements - added);
253            if (added < numElements) { // not enough elements immediately available; will have to poll
254              E e; // written exactly once, by a successful (uninterrupted) invocation of #poll
255              while (true) {
256                try {
257                  e = q.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS);
258                  break;
259                } catch (InterruptedException ex) {
260                  interrupted = true; // note interruption and retry
261                }
262              }
263              if (e == null) {
264                break; // we already waited enough, and there are no more elements in sight
265              }
266              buffer.add(e);
267              added++;
268            }
269          }
270        } finally {
271          if (interrupted) {
272            Thread.currentThread().interrupt();
273          }
274        }
275        return added;
276      }
277    }