001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.collect;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.collect.CollectPreconditions.checkNonnegative;
023
024import com.google.common.annotations.Beta;
025import com.google.common.annotations.GwtIncompatible;
026import com.google.common.annotations.VisibleForTesting;
027import com.google.common.collect.Serialization.FieldSetter;
028import com.google.common.math.IntMath;
029import com.google.common.primitives.Ints;
030import com.google.errorprone.annotations.CanIgnoreReturnValue;
031import com.google.j2objc.annotations.WeakOuter;
032import java.io.IOException;
033import java.io.ObjectInputStream;
034import java.io.ObjectOutputStream;
035import java.io.Serializable;
036import java.util.Collection;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.atomic.AtomicInteger;
044import javax.annotation.CheckForNull;
045import org.checkerframework.checker.nullness.qual.Nullable;
046
047/**
048 * A multiset that supports concurrent modifications and that provides atomic versions of most
049 * {@code Multiset} operations (exceptions where noted). Null elements are not supported.
050 *
051 * <p>See the Guava User Guide article on <a href=
052 * "https://github.com/google/guava/wiki/NewCollectionTypesExplained#multiset"> {@code
053 * Multiset}</a>.
054 *
055 * @author Cliff L. Biffle
056 * @author mike nonemacher
057 * @since 2.0
058 */
059@GwtIncompatible
060@ElementTypesAreNonnullByDefault
061public final class ConcurrentHashMultiset<E> extends AbstractMultiset<E> implements Serializable {
062
063  /*
064   * The ConcurrentHashMultiset's atomic operations are implemented primarily in terms of
065   * AtomicInteger's atomic operations, with some help from ConcurrentMap's atomic operations on
066   * creation and removal (including automatic removal of zeroes). If the modification of an
067   * AtomicInteger results in zero, we compareAndSet the value to zero; if that succeeds, we remove
068   * the entry from the Map. If another operation sees a zero in the map, it knows that the entry is
069   * about to be removed, so this operation may remove it (often by replacing it with a new
070   * AtomicInteger).
071   */
072
073  /** The number of occurrences of each element. */
074  private final transient ConcurrentMap<E, AtomicInteger> countMap;
075
076  // This constant allows the deserialization code to set a final field. This holder class
077  // makes sure it is not initialized unless an instance is deserialized.
078  private static class FieldSettersHolder {
079    static final FieldSetter<ConcurrentHashMultiset> COUNT_MAP_FIELD_SETTER =
080        Serialization.getFieldSetter(ConcurrentHashMultiset.class, "countMap");
081  }
082
083  /**
084   * Creates a new, empty {@code ConcurrentHashMultiset} using the default initial capacity, load
085   * factor, and concurrency settings.
086   */
087  public static <E> ConcurrentHashMultiset<E> create() {
088    // TODO(schmoe): provide a way to use this class with other (possibly arbitrary)
089    // ConcurrentMap implementors. One possibility is to extract most of this class into
090    // an AbstractConcurrentMapMultiset.
091    return new ConcurrentHashMultiset<E>(new ConcurrentHashMap<E, AtomicInteger>());
092  }
093
094  /**
095   * Creates a new {@code ConcurrentHashMultiset} containing the specified elements, using the
096   * default initial capacity, load factor, and concurrency settings.
097   *
098   * <p>This implementation is highly efficient when {@code elements} is itself a {@link Multiset}.
099   *
100   * @param elements the elements that the multiset should contain
101   */
102  public static <E> ConcurrentHashMultiset<E> create(Iterable<? extends E> elements) {
103    ConcurrentHashMultiset<E> multiset = ConcurrentHashMultiset.create();
104    Iterables.addAll(multiset, elements);
105    return multiset;
106  }
107
108  /**
109   * Creates a new, empty {@code ConcurrentHashMultiset} using {@code countMap} as the internal
110   * backing map.
111   *
112   * <p>This instance will assume ownership of {@code countMap}, and other code should not maintain
113   * references to the map or modify it in any way.
114   *
115   * <p>The returned multiset is serializable if the input map is.
116   *
117   * @param countMap backing map for storing the elements in the multiset and their counts. It must
118   *     be empty.
119   * @throws IllegalArgumentException if {@code countMap} is not empty
120   * @since 20.0
121   */
122  @Beta
123  public static <E> ConcurrentHashMultiset<E> create(ConcurrentMap<E, AtomicInteger> countMap) {
124    return new ConcurrentHashMultiset<E>(countMap);
125  }
126
127  @VisibleForTesting
128  ConcurrentHashMultiset(ConcurrentMap<E, AtomicInteger> countMap) {
129    checkArgument(countMap.isEmpty(), "the backing map (%s) must be empty", countMap);
130    this.countMap = countMap;
131  }
132
133  // Query Operations
134
135  /**
136   * Returns the number of occurrences of {@code element} in this multiset.
137   *
138   * @param element the element to look for
139   * @return the nonnegative number of occurrences of the element
140   */
141  @Override
142  public int count(@CheckForNull Object element) {
143    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
144    return (existingCounter == null) ? 0 : existingCounter.get();
145  }
146
147  /**
148   * {@inheritDoc}
149   *
150   * <p>If the data in the multiset is modified by any other threads during this method, it is
151   * undefined which (if any) of these modifications will be reflected in the result.
152   */
153  @Override
154  public int size() {
155    long sum = 0L;
156    for (AtomicInteger value : countMap.values()) {
157      sum += value.get();
158    }
159    return Ints.saturatedCast(sum);
160  }
161
162  /*
163   * Note: the superclass toArray() methods assume that size() gives a correct
164   * answer, which ours does not.
165   */
166
167  @Override
168  public Object[] toArray() {
169    return snapshot().toArray();
170  }
171
172  @Override
173  @SuppressWarnings("nullness") // b/192354773 in our checker affects toArray declarations
174  public <T extends @Nullable Object> T[] toArray(T[] array) {
175    return snapshot().toArray(array);
176  }
177
178  /*
179   * We'd love to use 'new ArrayList(this)' or 'list.addAll(this)', but
180   * either of these would recurse back to us again!
181   */
182  private List<E> snapshot() {
183    List<E> list = Lists.newArrayListWithExpectedSize(size());
184    for (Multiset.Entry<E> entry : entrySet()) {
185      E element = entry.getElement();
186      for (int i = entry.getCount(); i > 0; i--) {
187        list.add(element);
188      }
189    }
190    return list;
191  }
192
193  // Modification Operations
194
195  /**
196   * Adds a number of occurrences of the specified element to this multiset.
197   *
198   * @param element the element to add
199   * @param occurrences the number of occurrences to add
200   * @return the previous count of the element before the operation; possibly zero
201   * @throws IllegalArgumentException if {@code occurrences} is negative, or if the resulting amount
202   *     would exceed {@link Integer#MAX_VALUE}
203   */
204  @CanIgnoreReturnValue
205  @Override
206  public int add(E element, int occurrences) {
207    checkNotNull(element);
208    if (occurrences == 0) {
209      return count(element);
210    }
211    CollectPreconditions.checkPositive(occurrences, "occurrences");
212
213    while (true) {
214      AtomicInteger existingCounter = Maps.safeGet(countMap, element);
215      if (existingCounter == null) {
216        existingCounter = countMap.putIfAbsent(element, new AtomicInteger(occurrences));
217        if (existingCounter == null) {
218          return 0;
219        }
220        // existingCounter != null: fall through to operate against the existing AtomicInteger
221      }
222
223      while (true) {
224        int oldValue = existingCounter.get();
225        if (oldValue != 0) {
226          try {
227            int newValue = IntMath.checkedAdd(oldValue, occurrences);
228            if (existingCounter.compareAndSet(oldValue, newValue)) {
229              // newValue can't == 0, so no need to check & remove
230              return oldValue;
231            }
232          } catch (ArithmeticException overflow) {
233            throw new IllegalArgumentException(
234                "Overflow adding " + occurrences + " occurrences to a count of " + oldValue);
235          }
236        } else {
237          // In the case of a concurrent remove, we might observe a zero value, which means another
238          // thread is about to remove (element, existingCounter) from the map. Rather than wait,
239          // we can just do that work here.
240          AtomicInteger newCounter = new AtomicInteger(occurrences);
241          if ((countMap.putIfAbsent(element, newCounter) == null)
242              || countMap.replace(element, existingCounter, newCounter)) {
243            return 0;
244          }
245          break;
246        }
247      }
248
249      // If we're still here, there was a race, so just try again.
250    }
251  }
252
253  /**
254   * Removes a number of occurrences of the specified element from this multiset. If the multiset
255   * contains fewer than this number of occurrences to begin with, all occurrences will be removed.
256   *
257   * @param element the element whose occurrences should be removed
258   * @param occurrences the number of occurrences of the element to remove
259   * @return the count of the element before the operation; possibly zero
260   * @throws IllegalArgumentException if {@code occurrences} is negative
261   */
262  /*
263   * TODO(cpovirk): remove and removeExactly currently accept null inputs only
264   * if occurrences == 0. This satisfies both NullPointerTester and
265   * CollectionRemoveTester.testRemove_nullAllowed, but it's not clear that it's
266   * a good policy, especially because, in order for the test to pass, the
267   * parameter must be misleadingly annotated as @Nullable. I suspect that
268   * we'll want to remove @Nullable, add an eager checkNotNull, and loosen up
269   * testRemove_nullAllowed.
270   */
271  @CanIgnoreReturnValue
272  @Override
273  public int remove(@CheckForNull Object element, int occurrences) {
274    if (occurrences == 0) {
275      return count(element);
276    }
277    CollectPreconditions.checkPositive(occurrences, "occurrences");
278
279    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
280    if (existingCounter == null) {
281      return 0;
282    }
283    while (true) {
284      int oldValue = existingCounter.get();
285      if (oldValue != 0) {
286        int newValue = Math.max(0, oldValue - occurrences);
287        if (existingCounter.compareAndSet(oldValue, newValue)) {
288          if (newValue == 0) {
289            // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
290            // another thread has already replaced it with a new counter, which is fine.
291            countMap.remove(element, existingCounter);
292          }
293          return oldValue;
294        }
295      } else {
296        return 0;
297      }
298    }
299  }
300
301  /**
302   * Removes exactly the specified number of occurrences of {@code element}, or makes no change if
303   * this is not possible.
304   *
305   * <p>This method, in contrast to {@link #remove(Object, int)}, has no effect when the element
306   * count is smaller than {@code occurrences}.
307   *
308   * @param element the element to remove
309   * @param occurrences the number of occurrences of {@code element} to remove
310   * @return {@code true} if the removal was possible (including if {@code occurrences} is zero)
311   * @throws IllegalArgumentException if {@code occurrences} is negative
312   */
313  @CanIgnoreReturnValue
314  public boolean removeExactly(@CheckForNull Object element, int occurrences) {
315    if (occurrences == 0) {
316      return true;
317    }
318    CollectPreconditions.checkPositive(occurrences, "occurrences");
319
320    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
321    if (existingCounter == null) {
322      return false;
323    }
324    while (true) {
325      int oldValue = existingCounter.get();
326      if (oldValue < occurrences) {
327        return false;
328      }
329      int newValue = oldValue - occurrences;
330      if (existingCounter.compareAndSet(oldValue, newValue)) {
331        if (newValue == 0) {
332          // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
333          // another thread has already replaced it with a new counter, which is fine.
334          countMap.remove(element, existingCounter);
335        }
336        return true;
337      }
338    }
339  }
340
341  /**
342   * Adds or removes occurrences of {@code element} such that the {@link #count} of the element
343   * becomes {@code count}.
344   *
345   * @return the count of {@code element} in the multiset before this call
346   * @throws IllegalArgumentException if {@code count} is negative
347   */
348  @CanIgnoreReturnValue
349  @Override
350  public int setCount(E element, int count) {
351    checkNotNull(element);
352    checkNonnegative(count, "count");
353    while (true) {
354      AtomicInteger existingCounter = Maps.safeGet(countMap, element);
355      if (existingCounter == null) {
356        if (count == 0) {
357          return 0;
358        } else {
359          existingCounter = countMap.putIfAbsent(element, new AtomicInteger(count));
360          if (existingCounter == null) {
361            return 0;
362          }
363          // existingCounter != null: fall through
364        }
365      }
366
367      while (true) {
368        int oldValue = existingCounter.get();
369        if (oldValue == 0) {
370          if (count == 0) {
371            return 0;
372          } else {
373            AtomicInteger newCounter = new AtomicInteger(count);
374            if ((countMap.putIfAbsent(element, newCounter) == null)
375                || countMap.replace(element, existingCounter, newCounter)) {
376              return 0;
377            }
378          }
379          break;
380        } else {
381          if (existingCounter.compareAndSet(oldValue, count)) {
382            if (count == 0) {
383              // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
384              // another thread has already replaced it with a new counter, which is fine.
385              countMap.remove(element, existingCounter);
386            }
387            return oldValue;
388          }
389        }
390      }
391    }
392  }
393
394  /**
395   * Sets the number of occurrences of {@code element} to {@code newCount}, but only if the count is
396   * currently {@code expectedOldCount}. If {@code element} does not appear in the multiset exactly
397   * {@code expectedOldCount} times, no changes will be made.
398   *
399   * @return {@code true} if the change was successful. This usually indicates that the multiset has
400   *     been modified, but not always: in the case that {@code expectedOldCount == newCount}, the
401   *     method will return {@code true} if the condition was met.
402   * @throws IllegalArgumentException if {@code expectedOldCount} or {@code newCount} is negative
403   */
404  @CanIgnoreReturnValue
405  @Override
406  public boolean setCount(E element, int expectedOldCount, int newCount) {
407    checkNotNull(element);
408    checkNonnegative(expectedOldCount, "oldCount");
409    checkNonnegative(newCount, "newCount");
410
411    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
412    if (existingCounter == null) {
413      if (expectedOldCount != 0) {
414        return false;
415      } else if (newCount == 0) {
416        return true;
417      } else {
418        // if our write lost the race, it must have lost to a nonzero value, so we can stop
419        return countMap.putIfAbsent(element, new AtomicInteger(newCount)) == null;
420      }
421    }
422    int oldValue = existingCounter.get();
423    if (oldValue == expectedOldCount) {
424      if (oldValue == 0) {
425        if (newCount == 0) {
426          // Just observed a 0; try to remove the entry to clean up the map
427          countMap.remove(element, existingCounter);
428          return true;
429        } else {
430          AtomicInteger newCounter = new AtomicInteger(newCount);
431          return (countMap.putIfAbsent(element, newCounter) == null)
432              || countMap.replace(element, existingCounter, newCounter);
433        }
434      } else {
435        if (existingCounter.compareAndSet(oldValue, newCount)) {
436          if (newCount == 0) {
437            // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
438            // another thread has already replaced it with a new counter, which is fine.
439            countMap.remove(element, existingCounter);
440          }
441          return true;
442        }
443      }
444    }
445    return false;
446  }
447
448  // Views
449
450  @Override
451  Set<E> createElementSet() {
452    final Set<E> delegate = countMap.keySet();
453    return new ForwardingSet<E>() {
454      @Override
455      protected Set<E> delegate() {
456        return delegate;
457      }
458
459      @Override
460      public boolean contains(@CheckForNull Object object) {
461        return object != null && Collections2.safeContains(delegate, object);
462      }
463
464      @Override
465      public boolean containsAll(Collection<?> collection) {
466        return standardContainsAll(collection);
467      }
468
469      @Override
470      public boolean remove(@CheckForNull Object object) {
471        return object != null && Collections2.safeRemove(delegate, object);
472      }
473
474      @Override
475      public boolean removeAll(Collection<?> c) {
476        return standardRemoveAll(c);
477      }
478    };
479  }
480
481  @Override
482  Iterator<E> elementIterator() {
483    throw new AssertionError("should never be called");
484  }
485
486  /** @deprecated Internal method, use {@link #entrySet()}. */
487  @Deprecated
488  @Override
489  public Set<Multiset.Entry<E>> createEntrySet() {
490    return new EntrySet();
491  }
492
493  @Override
494  int distinctElements() {
495    return countMap.size();
496  }
497
498  @Override
499  public boolean isEmpty() {
500    return countMap.isEmpty();
501  }
502
503  @Override
504  Iterator<Entry<E>> entryIterator() {
505    // AbstractIterator makes this fairly clean, but it doesn't support remove(). To support
506    // remove(), we create an AbstractIterator, and then use ForwardingIterator to delegate to it.
507    final Iterator<Entry<E>> readOnlyIterator =
508        new AbstractIterator<Entry<E>>() {
509          private final Iterator<Map.Entry<E, AtomicInteger>> mapEntries =
510              countMap.entrySet().iterator();
511
512          @Override
513          @CheckForNull
514          protected Entry<E> computeNext() {
515            while (true) {
516              if (!mapEntries.hasNext()) {
517                return endOfData();
518              }
519              Map.Entry<E, AtomicInteger> mapEntry = mapEntries.next();
520              int count = mapEntry.getValue().get();
521              if (count != 0) {
522                return Multisets.immutableEntry(mapEntry.getKey(), count);
523              }
524            }
525          }
526        };
527
528    return new ForwardingIterator<Entry<E>>() {
529      @CheckForNull private Entry<E> last;
530
531      @Override
532      protected Iterator<Entry<E>> delegate() {
533        return readOnlyIterator;
534      }
535
536      @Override
537      public Entry<E> next() {
538        last = super.next();
539        return last;
540      }
541
542      @Override
543      public void remove() {
544        checkState(last != null, "no calls to next() since the last call to remove()");
545        ConcurrentHashMultiset.this.setCount(last.getElement(), 0);
546        last = null;
547      }
548    };
549  }
550
551  @Override
552  public Iterator<E> iterator() {
553    return Multisets.iteratorImpl(this);
554  }
555
556  @Override
557  public void clear() {
558    countMap.clear();
559  }
560
561  @WeakOuter
562  private class EntrySet extends AbstractMultiset<E>.EntrySet {
563    @Override
564    ConcurrentHashMultiset<E> multiset() {
565      return ConcurrentHashMultiset.this;
566    }
567
568    /*
569     * Note: the superclass toArray() methods assume that size() gives a correct
570     * answer, which ours does not.
571     */
572
573    @Override
574    public Object[] toArray() {
575      return snapshot().toArray();
576    }
577
578    @Override
579    @SuppressWarnings("nullness") // b/192354773 in our checker affects toArray declarations
580    public <T extends @Nullable Object> T[] toArray(T[] array) {
581      return snapshot().toArray(array);
582    }
583
584    private List<Multiset.Entry<E>> snapshot() {
585      List<Multiset.Entry<E>> list = Lists.newArrayListWithExpectedSize(size());
586      // Not Iterables.addAll(list, this), because that'll forward right back here.
587      Iterators.addAll(list, iterator());
588      return list;
589    }
590  }
591
592  /** @serialData the ConcurrentMap of elements and their counts. */
593  private void writeObject(ObjectOutputStream stream) throws IOException {
594    stream.defaultWriteObject();
595    stream.writeObject(countMap);
596  }
597
598  private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
599    stream.defaultReadObject();
600    @SuppressWarnings("unchecked") // reading data stored by writeObject
601    ConcurrentMap<E, Integer> deserializedCountMap =
602        (ConcurrentMap<E, Integer>) stream.readObject();
603    FieldSettersHolder.COUNT_MAP_FIELD_SETTER.set(this, deserializedCountMap);
604  }
605
606  private static final long serialVersionUID = 1;
607}