001 /*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017 package com.google.common.collect;
018
019 import static com.google.common.base.Preconditions.checkArgument;
020 import static com.google.common.base.Preconditions.checkState;
021 import static com.google.common.collect.Multisets.checkNonnegative;
022
023 import com.google.common.annotations.Beta;
024 import com.google.common.annotations.VisibleForTesting;
025 import com.google.common.collect.Serialization.FieldSetter;
026 import com.google.common.primitives.Ints;
027
028 import java.io.IOException;
029 import java.io.ObjectInputStream;
030 import java.io.ObjectOutputStream;
031 import java.io.Serializable;
032 import java.util.Iterator;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.Set;
036 import java.util.concurrent.ConcurrentHashMap;
037 import java.util.concurrent.ConcurrentMap;
038 import java.util.concurrent.atomic.AtomicInteger;
039
040 import javax.annotation.Nullable;
041
042 /**
043 * A multiset that supports concurrent modifications and that provides atomic
044 * versions of most {@code Multiset} operations (exceptions where noted). Null
045 * elements are not supported.
046 *
047 * @author Cliff L. Biffle
048 * @author mike nonemacher
049 * @since 2.0 (imported from Google Collections Library)
050 */
051 public final class ConcurrentHashMultiset<E> extends AbstractMultiset<E> implements Serializable {
052
053 /*
054 * The ConcurrentHashMultiset's atomic operations are implemented primarily in terms of
055 * AtomicInteger's atomic operations, with some help from ConcurrentMap's atomic operations on
056 * creation and removal (including automatic removal of zeroes). If the modification of an
057 * AtomicInteger results in zero, we compareAndSet the value to zero; if that succeeds, we remove
058 * the entry from the Map. If another operation sees a zero in the map, it knows that the entry is
059 * about to be removed, so this operation may remove it (often by replacing it with a new
060 * AtomicInteger).
061 */
062
063 /** The number of occurrences of each element. */
064 private final transient ConcurrentMap<E, AtomicInteger> countMap;
065
066 // This constant allows the deserialization code to set a final field. This
067 // holder class makes sure it is not initialized unless an instance is
068 // deserialized.
069 private static class FieldSettersHolder {
070 static final FieldSetter<ConcurrentHashMultiset> COUNT_MAP_FIELD_SETTER =
071 Serialization.getFieldSetter(ConcurrentHashMultiset.class, "countMap");
072 }
073
074 /**
075 * Creates a new, empty {@code ConcurrentHashMultiset} using the default
076 * initial capacity, load factor, and concurrency settings.
077 */
078 public static <E> ConcurrentHashMultiset<E> create() {
079 // TODO(schmoe): provide a way to use this class with other (possibly arbitrary)
080 // ConcurrentMap implementors. One possibility is to extract most of this class into
081 // an AbstractConcurrentMapMultiset.
082 return new ConcurrentHashMultiset<E>(new ConcurrentHashMap<E, AtomicInteger>());
083 }
084
085 /**
086 * Creates a new {@code ConcurrentHashMultiset} containing the specified
087 * elements, using the default initial capacity, load factor, and concurrency
088 * settings.
089 *
090 * <p>This implementation is highly efficient when {@code elements} is itself
091 * a {@link Multiset}.
092 *
093 * @param elements the elements that the multiset should contain
094 */
095 public static <E> ConcurrentHashMultiset<E> create(Iterable<? extends E> elements) {
096 ConcurrentHashMultiset<E> multiset = ConcurrentHashMultiset.create();
097 Iterables.addAll(multiset, elements);
098 return multiset;
099 }
100
101 /**
102 * Creates a new, empty {@code ConcurrentHashMultiset} using {@code mapMaker}
103 * to construct the internal backing map.
104 *
105 * <p>If this {@link MapMaker} is configured to use entry eviction of any kind, this eviction
106 * applies to all occurrences of a given element as a single unit. However, most updates to the
107 * multiset do not count as map updates at all, since we're usually just mutating the value
108 * stored in the map, so {@link MapMaker#expireAfterAccess} makes sense (evict the entry that
109 * was queried or updated longest ago), but {@link MapMaker#expireAfterWrite} doesn't, because
110 * the eviction time is measured from when we saw the first occurrence of the object.
111 *
112 * <p>The returned multiset is serializable but any serialization caveats
113 * given in {@code MapMaker} apply.
114 *
115 * <p>Finally, soft/weak values can be used but are not very useful: the values are created
116 * internally and not exposed externally, so no one else will have a strong reference to the
117 * values. Weak keys on the other hand can be useful in some scenarios.
118 *
119 * @since 7.0
120 */
121 @Beta
122 public static <E> ConcurrentHashMultiset<E> create(
123 GenericMapMaker<? super E, ? super Number> mapMaker) {
124 return new ConcurrentHashMultiset<E>(mapMaker.<E, AtomicInteger>makeMap());
125 }
126
127 /**
128 * Creates an instance using {@code countMap} to store elements and their
129 * counts.
130 *
131 * <p>This instance will assume ownership of {@code countMap}, and other code
132 * should not maintain references to the map or modify it in any way.
133 *
134 * @param countMap backing map for storing the elements in the multiset and
135 * their counts. It must be empty.
136 * @throws IllegalArgumentException if {@code countMap} is not empty
137 */
138 @VisibleForTesting ConcurrentHashMultiset(ConcurrentMap<E, AtomicInteger> countMap) {
139 checkArgument(countMap.isEmpty());
140 this.countMap = countMap;
141 }
142
143 // Query Operations
144
145 /**
146 * Returns the number of occurrences of {@code element} in this multiset.
147 *
148 * @param element the element to look for
149 * @return the nonnegative number of occurrences of the element
150 */
151 @Override public int count(@Nullable Object element) {
152 AtomicInteger existingCounter = safeGet(element);
153 return (existingCounter == null) ? 0 : existingCounter.get();
154 }
155
156 /**
157 * Depending on the type of the underlying map, map.get may throw NullPointerException or
158 * ClassCastException, if the object is null or of the wrong type. We usually just want to treat
159 * those cases as if the element isn't in the map, by catching the exceptions and returning null.
160 */
161 private AtomicInteger safeGet(Object element) {
162 try {
163 return countMap.get(element);
164 } catch (NullPointerException e) {
165 return null;
166 } catch (ClassCastException e) {
167 return null;
168 }
169 }
170
171 /**
172 * {@inheritDoc}
173 *
174 * <p>If the data in the multiset is modified by any other threads during this
175 * method, it is undefined which (if any) of these modifications will be
176 * reflected in the result.
177 */
178 @Override public int size() {
179 long sum = 0L;
180 for (AtomicInteger value : countMap.values()) {
181 sum += value.get();
182 }
183 return Ints.saturatedCast(sum);
184 }
185
186 /*
187 * Note: the superclass toArray() methods assume that size() gives a correct
188 * answer, which ours does not.
189 */
190
191 @Override public Object[] toArray() {
192 return snapshot().toArray();
193 }
194
195 @Override public <T> T[] toArray(T[] array) {
196 return snapshot().toArray(array);
197 }
198
199 /*
200 * We'd love to use 'new ArrayList(this)' or 'list.addAll(this)', but
201 * either of these would recurse back to us again!
202 */
203 private List<E> snapshot() {
204 List<E> list = Lists.newArrayListWithExpectedSize(size());
205 for (Multiset.Entry<E> entry : entrySet()) {
206 E element = entry.getElement();
207 for (int i = entry.getCount(); i > 0; i--) {
208 list.add(element);
209 }
210 }
211 return list;
212 }
213
214 // Modification Operations
215
216 /**
217 * Adds a number of occurrences of the specified element to this multiset.
218 *
219 * @param element the element to add
220 * @param occurrences the number of occurrences to add
221 * @return the previous count of the element before the operation; possibly
222 * zero
223 * @throws IllegalArgumentException if {@code occurrences} is negative, or if
224 * the resulting amount would exceed {@link Integer#MAX_VALUE}
225 */
226 @Override public int add(E element, int occurrences) {
227 if (occurrences == 0) {
228 return count(element);
229 }
230 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences);
231
232 while (true) {
233 AtomicInteger existingCounter = safeGet(element);
234 if (existingCounter == null) {
235 existingCounter = countMap.putIfAbsent(element, new AtomicInteger(occurrences));
236 if (existingCounter == null) {
237 return 0;
238 }
239 // existingCounter != null: fall through to operate against the existing AtomicInteger
240 }
241
242 while (true) {
243 int oldValue = existingCounter.get();
244 if (oldValue != 0) {
245 checkArgument(occurrences <= Integer.MAX_VALUE - oldValue,
246 "Overflow adding %s occurrences to a count of %s",
247 occurrences, oldValue);
248 int newValue = oldValue + occurrences;
249 if (existingCounter.compareAndSet(oldValue, newValue)) {
250 // newValue can't == 0, so no need to check & remove
251 return oldValue;
252 }
253 } else {
254 // In the case of a concurrent remove, we might observe a zero value, which means another
255 // thread is about to remove (element, existingCounter) from the map. Rather than wait,
256 // we can just do that work here.
257 AtomicInteger newCounter = new AtomicInteger(occurrences);
258 if ((countMap.putIfAbsent(element, newCounter) == null)
259 || countMap.replace(element, existingCounter, newCounter)) {
260 return 0;
261 }
262 break;
263 }
264 }
265
266 // If we're still here, there was a race, so just try again.
267 }
268 }
269
270 /**
271 * Removes a number of occurrences of the specified element from this
272 * multiset. If the multiset contains fewer than this number of occurrences to
273 * begin with, all occurrences will be removed.
274 *
275 * @param element the element whose occurrences should be removed
276 * @param occurrences the number of occurrences of the element to remove
277 * @return the count of the element before the operation; possibly zero
278 * @throws IllegalArgumentException if {@code occurrences} is negative
279 */
280 @Override public int remove(@Nullable Object element, int occurrences) {
281 if (occurrences == 0) {
282 return count(element);
283 }
284 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences);
285
286 AtomicInteger existingCounter = safeGet(element);
287 if (existingCounter == null) {
288 return 0;
289 }
290 while (true) {
291 int oldValue = existingCounter.get();
292 if (oldValue != 0) {
293 int newValue = Math.max(0, oldValue - occurrences);
294 if (existingCounter.compareAndSet(oldValue, newValue)) {
295 if (newValue == 0) {
296 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
297 // another thread has already replaced it with a new counter, which is fine.
298 countMap.remove(element, existingCounter);
299 }
300 return oldValue;
301 }
302 } else {
303 return 0;
304 }
305 }
306 }
307
308 /**
309 * Removes exactly the specified number of occurrences of {@code element}, or
310 * makes no change if this is not possible.
311 *
312 * <p>This method, in contrast to {@link #remove(Object, int)}, has no effect
313 * when the element count is smaller than {@code occurrences}.
314 *
315 * @param element the element to remove
316 * @param occurrences the number of occurrences of {@code element} to remove
317 * @return {@code true} if the removal was possible (including if {@code
318 * occurrences} is zero)
319 */
320 public boolean removeExactly(@Nullable Object element, int occurrences) {
321 if (occurrences == 0) {
322 return true;
323 }
324 checkArgument(occurrences > 0, "Invalid occurrences: %s", occurrences);
325
326 AtomicInteger existingCounter = safeGet(element);
327 if (existingCounter == null) {
328 return false;
329 }
330 while (true) {
331 int oldValue = existingCounter.get();
332 if (oldValue < occurrences) {
333 return false;
334 }
335 int newValue = oldValue - occurrences;
336 if (existingCounter.compareAndSet(oldValue, newValue)) {
337 if (newValue == 0) {
338 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
339 // another thread has already replaced it with a new counter, which is fine.
340 countMap.remove(element, existingCounter);
341 }
342 return true;
343 }
344 }
345 }
346
347 /**
348 * Adds or removes occurrences of {@code element} such that the {@link #count}
349 * of the element becomes {@code count}.
350 *
351 * @return the count of {@code element} in the multiset before this call
352 * @throws IllegalArgumentException if {@code count} is negative
353 */
354 @Override public int setCount(E element, int count) {
355 checkNonnegative(count, "count");
356 while (true) {
357 AtomicInteger existingCounter = safeGet(element);
358 if (existingCounter == null) {
359 if (count == 0) {
360 return 0;
361 } else {
362 existingCounter = countMap.putIfAbsent(element, new AtomicInteger(count));
363 if (existingCounter == null) {
364 return 0;
365 }
366 // existingCounter != null: fall through
367 }
368 }
369
370 while (true) {
371 int oldValue = existingCounter.get();
372 if (oldValue == 0) {
373 if (count == 0) {
374 return 0;
375 } else {
376 AtomicInteger newCounter = new AtomicInteger(count);
377 if ((countMap.putIfAbsent(element, newCounter) == null)
378 || countMap.replace(element, existingCounter, newCounter)) {
379 return 0;
380 }
381 }
382 break;
383 } else {
384 if (existingCounter.compareAndSet(oldValue, count)) {
385 if (count == 0) {
386 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
387 // another thread has already replaced it with a new counter, which is fine.
388 countMap.remove(element, existingCounter);
389 }
390 return oldValue;
391 }
392 }
393 }
394 }
395 }
396
397 /**
398 * Sets the number of occurrences of {@code element} to {@code newCount}, but
399 * only if the count is currently {@code expectedOldCount}. If {@code element} does
400 * not appear in the multiset exactly {@code expectedOldCount} times, no changes will
401 * be made.
402 *
403 * @return {@code true} if the change was successful. This usually indicates
404 * that the multiset has been modified, but not always: in the case that
405 * {@code expectedOldCount == newCount}, the method will return {@code true} if
406 * the condition was met.
407 * @throws IllegalArgumentException if {@code expectedOldCount} or {@code newCount} is negative
408 */
409 @Override public boolean setCount(E element, int expectedOldCount, int newCount) {
410 checkNonnegative(expectedOldCount, "oldCount");
411 checkNonnegative(newCount, "newCount");
412
413 AtomicInteger existingCounter = safeGet(element);
414 if (existingCounter == null) {
415 if (expectedOldCount != 0) {
416 return false;
417 } else if (newCount == 0) {
418 return true;
419 } else {
420 // if our write lost the race, it must have lost to a nonzero value, so we can stop
421 return countMap.putIfAbsent(element, new AtomicInteger(newCount)) == null;
422 }
423 }
424 int oldValue = existingCounter.get();
425 if (oldValue == expectedOldCount) {
426 if (oldValue == 0) {
427 if (newCount == 0) {
428 // Just observed a 0; try to remove the entry to clean up the map
429 countMap.remove(element, existingCounter);
430 return true;
431 } else {
432 AtomicInteger newCounter = new AtomicInteger(newCount);
433 return (countMap.putIfAbsent(element, newCounter) == null)
434 || countMap.replace(element, existingCounter, newCounter);
435 }
436 } else {
437 if (existingCounter.compareAndSet(oldValue, newCount)) {
438 if (newCount == 0) {
439 // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
440 // another thread has already replaced it with a new counter, which is fine.
441 countMap.remove(element, existingCounter);
442 }
443 return true;
444 }
445 }
446 }
447 return false;
448 }
449
450 // Views
451
452 @Override Set<E> createElementSet() {
453 final Set<E> delegate = countMap.keySet();
454 return new ForwardingSet<E>() {
455 @Override protected Set<E> delegate() {
456 return delegate;
457 }
458 @Override public boolean remove(Object object) {
459 try {
460 return delegate.remove(object);
461 } catch (NullPointerException e) {
462 return false;
463 } catch (ClassCastException e) {
464 return false;
465 }
466 }
467 };
468 }
469
470 private transient EntrySet entrySet;
471
472 @Override public Set<Multiset.Entry<E>> entrySet() {
473 EntrySet result = entrySet;
474 if (result == null) {
475 entrySet = result = new EntrySet();
476 }
477 return result;
478 }
479
480 @Override int distinctElements() {
481 return countMap.size();
482 }
483
484 @Override public boolean isEmpty() {
485 return countMap.isEmpty();
486 }
487
488 @Override Iterator<Entry<E>> entryIterator() {
489 // AbstractIterator makes this fairly clean, but it doesn't support remove(). To support
490 // remove(), we create an AbstractIterator, and then use ForwardingIterator to delegate to it.
491 final Iterator<Entry<E>> readOnlyIterator =
492 new AbstractIterator<Entry<E>>() {
493 private Iterator<Map.Entry<E, AtomicInteger>> mapEntries = countMap.entrySet().iterator();
494
495 @Override protected Entry<E> computeNext() {
496 while (true) {
497 if (!mapEntries.hasNext()) {
498 return endOfData();
499 }
500 Map.Entry<E, AtomicInteger> mapEntry = mapEntries.next();
501 int count = mapEntry.getValue().get();
502 if (count != 0) {
503 return Multisets.immutableEntry(mapEntry.getKey(), count);
504 }
505 }
506 }
507 };
508
509 return new ForwardingIterator<Entry<E>>() {
510 private Entry<E> last;
511
512 @Override protected Iterator<Entry<E>> delegate() {
513 return readOnlyIterator;
514 }
515
516 @Override public Entry<E> next() {
517 last = super.next();
518 return last;
519 }
520
521 @Override public void remove() {
522 checkState(last != null);
523 ConcurrentHashMultiset.this.setCount(last.getElement(), 0);
524 last = null;
525 }
526 };
527 }
528
529 @Override public void clear() {
530 countMap.clear();
531 }
532
533 private class EntrySet extends AbstractMultiset<E>.EntrySet {
534 @Override ConcurrentHashMultiset<E> multiset() {
535 return ConcurrentHashMultiset.this;
536 }
537
538 /*
539 * Note: the superclass toArray() methods assume that size() gives a correct
540 * answer, which ours does not.
541 */
542
543 @Override public Object[] toArray() {
544 return snapshot().toArray();
545 }
546
547 @Override public <T> T[] toArray(T[] array) {
548 return snapshot().toArray(array);
549 }
550
551 private List<Multiset.Entry<E>> snapshot() {
552 List<Multiset.Entry<E>> list = Lists.newArrayListWithExpectedSize(size());
553 // Not Iterables.addAll(list, this), because that'll forward right back here.
554 Iterators.addAll(list, iterator());
555 return list;
556 }
557
558 @Override public boolean remove(Object object) {
559 if (object instanceof Multiset.Entry) {
560 Multiset.Entry<?> entry = (Multiset.Entry<?>) object;
561 Object element = entry.getElement();
562 int entryCount = entry.getCount();
563 if (entryCount != 0) {
564 // Safe as long as we never add a new entry, which we won't.
565 @SuppressWarnings("unchecked")
566 Multiset<Object> multiset = (Multiset) multiset();
567 return multiset.setCount(element, entryCount, 0);
568 }
569 }
570 return false;
571 }
572 }
573
574 /**
575 * @serialData the ConcurrentMap of elements and their counts.
576 */
577 private void writeObject(ObjectOutputStream stream) throws IOException {
578 stream.defaultWriteObject();
579 stream.writeObject(countMap);
580 }
581
582 private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
583 stream.defaultReadObject();
584 @SuppressWarnings("unchecked") // reading data stored by writeObject
585 ConcurrentMap<E, Integer> deserializedCountMap =
586 (ConcurrentMap<E, Integer>) stream.readObject();
587 FieldSettersHolder.COUNT_MAP_FIELD_SETTER.set(this, deserializedCountMap);
588 }
589
590 private static final long serialVersionUID = 1;
591 }
592