001 /*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.collect;
018
019 import static com.google.common.base.Preconditions.checkArgument;
020 import static com.google.common.base.Preconditions.checkState;
021 import static com.google.common.collect.Multisets.checkNonnegative;
022
023 import com.google.common.annotations.Beta;
024 import com.google.common.annotations.VisibleForTesting;
025 import com.google.common.collect.Serialization.FieldSetter;
026 import com.google.common.math.IntMath;
027 import com.google.common.primitives.Ints;
028
029 import java.io.IOException;
030 import java.io.ObjectInputStream;
031 import java.io.ObjectOutputStream;
032 import java.io.Serializable;
033 import java.util.Collection;
034 import java.util.Iterator;
035 import java.util.List;
036 import java.util.Map;
037 import java.util.Set;
038 import java.util.concurrent.ConcurrentHashMap;
039 import java.util.concurrent.ConcurrentMap;
040 import java.util.concurrent.atomic.AtomicInteger;
041
042 import javax.annotation.Nullable;
043
044 /**
045 * A multiset that supports concurrent modifications and that provides atomic versions of most
046 * {@code Multiset} operations (exceptions where noted). Null elements are not supported.
047 *
048 * <p>See the Guava User Guide article on <a href=
049 * "http://code.google.com/p/guava-libraries/wiki/NewCollectionTypesExplained#Multiset">
050 * {@code Multiset}</a>.
051 *
052 * @author Cliff L. Biffle
053 * @author mike nonemacher
054 * @since 2.0 (imported from Google Collections Library)
055 */
056 public final class ConcurrentHashMultiset<E> extends AbstractMultiset<E> implements Serializable {
057
058 /*
059 * The ConcurrentHashMultiset's atomic operations are implemented primarily in terms of
060 * AtomicInteger's atomic operations, with some help from ConcurrentMap's atomic operations on
061 * creation and removal (including automatic removal of zeroes). If the modification of an
062 * AtomicInteger results in zero, we compareAndSet the value to zero; if that succeeds, we remove
063 * the entry from the Map. If another operation sees a zero in the map, it knows that the entry is
064 * about to be removed, so this operation may remove it (often by replacing it with a new
065 * AtomicInteger).
066 */
067
068 /** The number of occurrences of each element. */
069 private final transient ConcurrentMap<E, AtomicInteger> countMap;
070
071 // This constant allows the deserialization code to set a final field. This holder class
072 // makes sure it is not initialized unless an instance is deserialized.
073 private static class FieldSettersHolder {
074 static final FieldSetter<ConcurrentHashMultiset> COUNT_MAP_FIELD_SETTER =
075 Serialization.getFieldSetter(ConcurrentHashMultiset.class, "countMap");
076 }
077
078 /**
079 * Creates a new, empty {@code ConcurrentHashMultiset} using the default
080 * initial capacity, load factor, and concurrency settings.
081 */
082 public static <E> ConcurrentHashMultiset<E> create() {
083 // TODO(schmoe): provide a way to use this class with other (possibly arbitrary)
084 // ConcurrentMap implementors. One possibility is to extract most of this class into
085 // an AbstractConcurrentMapMultiset.
086 return new ConcurrentHashMultiset<E>(new ConcurrentHashMap<E, AtomicInteger>());
087 }
088
089 /**
090 * Creates a new {@code ConcurrentHashMultiset} containing the specified elements, using
091 * the default initial capacity, load factor, and concurrency settings.
092 *
093 * <p>This implementation is highly efficient when {@code elements} is itself a {@link Multiset}.
094 *
095 * @param elements the elements that the multiset should contain
096 */
097 public static <E> ConcurrentHashMultiset<E> create(Iterable<? extends E> elements) {
098 ConcurrentHashMultiset<E> multiset = ConcurrentHashMultiset.create();
099 Iterables.addAll(multiset, elements);
100 return multiset;
101 }
102
103 /**
104 * Creates a new, empty {@code ConcurrentHashMultiset} using {@code mapMaker}
105 * to construct the internal backing map.
106 *
107 * <p>If this {@link MapMaker} is configured to use entry eviction of any kind, this eviction
108 * applies to all occurrences of a given element as a single unit. However, most updates to the
109 * multiset do not count as map updates at all, since we're usually just mutating the value
110 * stored in the map, so {@link MapMaker#expireAfterAccess} makes sense (evict the entry that
111 * was queried or updated longest ago), but {@link MapMaker#expireAfterWrite} doesn't, because
112 * the eviction time is measured from when we saw the first occurrence of the object.
113 *
114 * <p>The returned multiset is serializable but any serialization caveats
115 * given in {@code MapMaker} apply.
116 *
117 * <p>Finally, soft/weak values can be used but are not very useful: the values are created
118 * internally and not exposed externally, so no one else will have a strong reference to the
119 * values. Weak keys on the other hand can be useful in some scenarios.
120 *
121 * @since 7.0
122 */
123 @Beta
124 public static <E> ConcurrentHashMultiset<E> create(
125 GenericMapMaker<? super E, ? super Number> mapMaker) {
126 return new ConcurrentHashMultiset<E>(mapMaker.<E, AtomicInteger>makeMap());
127 }
128
129 /**
130 * Creates an instance using {@code countMap} to store elements and their counts.
131 *
132 * <p>This instance will assume ownership of {@code countMap}, and other code
133 * should not maintain references to the map or modify it in any way.
134 *
135 * @param countMap backing map for storing the elements in the multiset and
136 * their counts. It must be empty.
137 * @throws IllegalArgumentException if {@code countMap} is not empty
138 */
139 @VisibleForTesting ConcurrentHashMultiset(ConcurrentMap<E, AtomicInteger> countMap) {
140 checkArgument(countMap.isEmpty());
141 this.countMap = countMap;
142 }
143
144 // Query Operations
145
146 /**
147 * Returns the number of occurrences of {@code element} in this multiset.
148 *
149 * @param element the element to look for
150 * @return the nonnegative number of occurrences of the element
151 */
152 @Override public int count(@Nullable Object element) {
153 AtomicInteger existingCounter = safeGet(element);
154 return (existingCounter == null) ? 0 : existingCounter.get();
155 }
156
157 /**
158 * Depending on the type of the underlying map, map.get may throw NullPointerException or
159 * ClassCastException, if the object is null or of the wrong type. We usually just want to treat
160 * those cases as if the element isn't in the map, by catching the exceptions and returning null.
161 */
162 private AtomicInteger safeGet(Object element) {
163 try {
164 return countMap.get(element);
165 } catch (NullPointerException e) {
166 return null;
167 } catch (ClassCastException e) {
168 return null;
169 }
170 }
171
172 /**
173 * {@inheritDoc}
174 *
175 * <p>If the data in the multiset is modified by any other threads during this method,
176 * it is undefined which (if any) of these modifications will be reflected in the result.
177 */
178 @Override public int size() {
179 long sum = 0L;
180 for (AtomicInteger value : countMap.values()) {
181 sum += value.get();
182 }
183 return Ints.saturatedCast(sum);
184 }
185
186 /*
187 * Note: the superclass toArray() methods assume that size() gives a correct
188 * answer, which ours does not.
189 */
190
191 @Override public Object[] toArray() {
192 return snapshot().toArray();
193 }
194
195 @Override public <T> T[] toArray(T[] array) {
196 return snapshot().toArray(array);
197 }
198
199 /*
200 * We'd love to use 'new ArrayList(this)' or 'list.addAll(this)', but
201 * either of these would recurse back to us again!
202 */
203 private List<E> snapshot() {
204 List<E> list = Lists.newArrayListWithExpectedSize(size());
205 for (Multiset.Entry<E> entry : entrySet()) {
206 E element = entry.getElement();
207 for (int i = entry.getCount(); i > 0; i--) {
208 list.add(element);
209 }
210 }
211 return list;
212 }
213
214 // Modification Operations
215
216 /**
217 * Adds a number of occurrences of the specified element to this multiset.
218 *
219 * @param element the element to add
220 * @param occurrences the number of occurrences to add
221 * @return the previous count of the element before the operation; possibly zero
222 * @throws IllegalArgumentException if {@code occurrences} is negative, or if
223 * the resulting amount would exceed {@link Integer#MAX_VALUE}
224 */
225 @Override public int add(E element, int occurrences) {
226 if (occurrences == 0) {
227 return count(element);
228 }
229 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences);
230
231 while (true) {
232 AtomicInteger existingCounter = safeGet(element);
233 if (existingCounter == null) {
234 existingCounter = countMap.putIfAbsent(element, new AtomicInteger(occurrences));
235 if (existingCounter == null) {
236 return 0;
237 }
238 // existingCounter != null: fall through to operate against the existing AtomicInteger
239 }
240
241 while (true) {
242 int oldValue = existingCounter.get();
243 if (oldValue != 0) {
244 try {
245 int newValue = IntMath.checkedAdd(oldValue, occurrences);
246 if (existingCounter.compareAndSet(oldValue, newValue)) {
247 // newValue can't == 0, so no need to check & remove
248 return oldValue;
249 }
250 } catch (ArithmeticException overflow) {
251 throw new IllegalArgumentException("Overflow adding " + occurrences
252 + " occurrences to a count of " + oldValue);
253 }
254 } else {
255 // In the case of a concurrent remove, we might observe a zero value, which means another
256 // thread is about to remove (element, existingCounter) from the map. Rather than wait,
257 // we can just do that work here.
258 AtomicInteger newCounter = new AtomicInteger(occurrences);
259 if ((countMap.putIfAbsent(element, newCounter) == null)
260 || countMap.replace(element, existingCounter, newCounter)) {
261 return 0;
262 }
263 break;
264 }
265 }
266
267 // If we're still here, there was a race, so just try again.
268 }
269 }
270
271 /**
272 * Removes a number of occurrences of the specified element from this multiset. If the multiset
273 * contains fewer than this number of occurrences to begin with, all occurrences will be removed.
274 *
275 * @param element the element whose occurrences should be removed
276 * @param occurrences the number of occurrences of the element to remove
277 * @return the count of the element before the operation; possibly zero
278 * @throws IllegalArgumentException if {@code occurrences} is negative
279 */
280 @Override public int remove(@Nullable Object element, int occurrences) {
281 if (occurrences == 0) {
282 return count(element);
283 }
284 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences);
285
286 AtomicInteger existingCounter = safeGet(element);
287 if (existingCounter == null) {
288 return 0;
289 }
290 while (true) {
291 int oldValue = existingCounter.get();
292 if (oldValue != 0) {
293 int newValue = Math.max(0, oldValue - occurrences);
294 if (existingCounter.compareAndSet(oldValue, newValue)) {
295 if (newValue == 0) {
296 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
297 // another thread has already replaced it with a new counter, which is fine.
298 countMap.remove(element, existingCounter);
299 }
300 return oldValue;
301 }
302 } else {
303 return 0;
304 }
305 }
306 }
307
308 /**
309 * Removes exactly the specified number of occurrences of {@code element}, or makes no
310 * change if this is not possible.
311 *
312 * <p>This method, in contrast to {@link #remove(Object, int)}, has no effect when the
313 * element count is smaller than {@code occurrences}.
314 *
315 * @param element the element to remove
316 * @param occurrences the number of occurrences of {@code element} to remove
317 * @return {@code true} if the removal was possible (including if {@code occurrences} is zero)
318 */
319 public boolean removeExactly(@Nullable Object element, int occurrences) {
320 if (occurrences == 0) {
321 return true;
322 }
323 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences);
324
325 AtomicInteger existingCounter = safeGet(element);
326 if (existingCounter == null) {
327 return false;
328 }
329 while (true) {
330 int oldValue = existingCounter.get();
331 if (oldValue < occurrences) {
332 return false;
333 }
334 int newValue = oldValue - occurrences;
335 if (existingCounter.compareAndSet(oldValue, newValue)) {
336 if (newValue == 0) {
337 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
338 // another thread has already replaced it with a new counter, which is fine.
339 countMap.remove(element, existingCounter);
340 }
341 return true;
342 }
343 }
344 }
345
346 /**
347 * Adds or removes occurrences of {@code element} such that the {@link #count} of the
348 * element becomes {@code count}.
349 *
350 * @return the count of {@code element} in the multiset before this call
351 * @throws IllegalArgumentException if {@code count} is negative
352 */
353 @Override public int setCount(E element, int count) {
354 checkNonnegative(count, "count");
355 while (true) {
356 AtomicInteger existingCounter = safeGet(element);
357 if (existingCounter == null) {
358 if (count == 0) {
359 return 0;
360 } else {
361 existingCounter = countMap.putIfAbsent(element, new AtomicInteger(count));
362 if (existingCounter == null) {
363 return 0;
364 }
365 // existingCounter != null: fall through
366 }
367 }
368
369 while (true) {
370 int oldValue = existingCounter.get();
371 if (oldValue == 0) {
372 if (count == 0) {
373 return 0;
374 } else {
375 AtomicInteger newCounter = new AtomicInteger(count);
376 if ((countMap.putIfAbsent(element, newCounter) == null)
377 || countMap.replace(element, existingCounter, newCounter)) {
378 return 0;
379 }
380 }
381 break;
382 } else {
383 if (existingCounter.compareAndSet(oldValue, count)) {
384 if (count == 0) {
385 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
386 // another thread has already replaced it with a new counter, which is fine.
387 countMap.remove(element, existingCounter);
388 }
389 return oldValue;
390 }
391 }
392 }
393 }
394 }
395
396 /**
397 * Sets the number of occurrences of {@code element} to {@code newCount}, but only if
398 * the count is currently {@code expectedOldCount}. If {@code element} does not appear
399 * in the multiset exactly {@code expectedOldCount} times, no changes will be made.
400 *
401 * @return {@code true} if the change was successful. This usually indicates
402 * that the multiset has been modified, but not always: in the case that
403 * {@code expectedOldCount == newCount}, the method will return {@code true} if
404 * the condition was met.
405 * @throws IllegalArgumentException if {@code expectedOldCount} or {@code newCount} is negative
406 */
407 @Override public boolean setCount(E element, int expectedOldCount, int newCount) {
408 checkNonnegative(expectedOldCount, "oldCount");
409 checkNonnegative(newCount, "newCount");
410
411 AtomicInteger existingCounter = safeGet(element);
412 if (existingCounter == null) {
413 if (expectedOldCount != 0) {
414 return false;
415 } else if (newCount == 0) {
416 return true;
417 } else {
418 // if our write lost the race, it must have lost to a nonzero value, so we can stop
419 return countMap.putIfAbsent(element, new AtomicInteger(newCount)) == null;
420 }
421 }
422 int oldValue = existingCounter.get();
423 if (oldValue == expectedOldCount) {
424 if (oldValue == 0) {
425 if (newCount == 0) {
426 // Just observed a 0; try to remove the entry to clean up the map
427 countMap.remove(element, existingCounter);
428 return true;
429 } else {
430 AtomicInteger newCounter = new AtomicInteger(newCount);
431 return (countMap.putIfAbsent(element, newCounter) == null)
432 || countMap.replace(element, existingCounter, newCounter);
433 }
434 } else {
435 if (existingCounter.compareAndSet(oldValue, newCount)) {
436 if (newCount == 0) {
437 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
438 // another thread has already replaced it with a new counter, which is fine.
439 countMap.remove(element, existingCounter);
440 }
441 return true;
442 }
443 }
444 }
445 return false;
446 }
447
448 // Views
449
450 @Override Set<E> createElementSet() {
451 final Set<E> delegate = countMap.keySet();
452 return new ForwardingSet<E>() {
453 @Override protected Set<E> delegate() {
454 return delegate;
455 }
456 @Override public boolean remove(Object object) {
457 try {
458 return delegate.remove(object);
459 } catch (NullPointerException e) {
460 return false;
461 } catch (ClassCastException e) {
462 return false;
463 }
464 }
465 @Override public boolean removeAll(Collection<?> c) {
466 return standardRemoveAll(c);
467 }
468 };
469 }
470
471 private transient EntrySet entrySet;
472
473 @Override public Set<Multiset.Entry<E>> entrySet() {
474 EntrySet result = entrySet;
475 if (result == null) {
476 entrySet = result = new EntrySet();
477 }
478 return result;
479 }
480
481 @Override int distinctElements() {
482 return countMap.size();
483 }
484
485 @Override public boolean isEmpty() {
486 return countMap.isEmpty();
487 }
488
489 @Override Iterator<Entry<E>> entryIterator() {
490 // AbstractIterator makes this fairly clean, but it doesn't support remove(). To support
491 // remove(), we create an AbstractIterator, and then use ForwardingIterator to delegate to it.
492 final Iterator<Entry<E>> readOnlyIterator =
493 new AbstractIterator<Entry<E>>() {
494 private Iterator<Map.Entry<E, AtomicInteger>> mapEntries = countMap.entrySet().iterator();
495
496 @Override protected Entry<E> computeNext() {
497 while (true) {
498 if (!mapEntries.hasNext()) {
499 return endOfData();
500 }
501 Map.Entry<E, AtomicInteger> mapEntry = mapEntries.next();
502 int count = mapEntry.getValue().get();
503 if (count != 0) {
504 return Multisets.immutableEntry(mapEntry.getKey(), count);
505 }
506 }
507 }
508 };
509
510 return new ForwardingIterator<Entry<E>>() {
511 private Entry<E> last;
512
513 @Override protected Iterator<Entry<E>> delegate() {
514 return readOnlyIterator;
515 }
516
517 @Override public Entry<E> next() {
518 last = super.next();
519 return last;
520 }
521
522 @Override public void remove() {
523 checkState(last != null);
524 ConcurrentHashMultiset.this.setCount(last.getElement(), 0);
525 last = null;
526 }
527 };
528 }
529
530 @Override public void clear() {
531 countMap.clear();
532 }
533
534 private class EntrySet extends AbstractMultiset<E>.EntrySet {
535 @Override ConcurrentHashMultiset<E> multiset() {
536 return ConcurrentHashMultiset.this;
537 }
538
539 /*
540 * Note: the superclass toArray() methods assume that size() gives a correct
541 * answer, which ours does not.
542 */
543
544 @Override public Object[] toArray() {
545 return snapshot().toArray();
546 }
547
548 @Override public <T> T[] toArray(T[] array) {
549 return snapshot().toArray(array);
550 }
551
552 private List<Multiset.Entry<E>> snapshot() {
553 List<Multiset.Entry<E>> list = Lists.newArrayListWithExpectedSize(size());
554 // Not Iterables.addAll(list, this), because that'll forward right back here.
555 Iterators.addAll(list, iterator());
556 return list;
557 }
558
559 @Override public boolean remove(Object object) {
560 if (object instanceof Multiset.Entry) {
561 Multiset.Entry<?> entry = (Multiset.Entry<?>) object;
562 Object element = entry.getElement();
563 int entryCount = entry.getCount();
564 if (entryCount != 0) {
565 // Safe as long as we never add a new entry, which we won't.
566 @SuppressWarnings("unchecked")
567 Multiset<Object> multiset = (Multiset) multiset();
568 return multiset.setCount(element, entryCount, 0);
569 }
570 }
571 return false;
572 }
573 }
574
575 /**
576 * @serialData the ConcurrentMap of elements and their counts.
577 */
578 private void writeObject(ObjectOutputStream stream) throws IOException {
579 stream.defaultWriteObject();
580 stream.writeObject(countMap);
581 }
582
583 private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
584 stream.defaultReadObject();
585 @SuppressWarnings("unchecked") // reading data stored by writeObject
586 ConcurrentMap<E, Integer> deserializedCountMap =
587 (ConcurrentMap<E, Integer>) stream.readObject();
588 FieldSettersHolder.COUNT_MAP_FIELD_SETTER.set(this, deserializedCountMap);
589 }
590
591 private static final long serialVersionUID = 1;
592 }