001/*
002 * Copyright (C) 2007 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License");
005 * you may not use this file except in compliance with the License.
006 * You may obtain a copy of the License at
007 *
008 * http://www.apache.org/licenses/LICENSE-2.0
009 *
010 * Unless required by applicable law or agreed to in writing, software
011 * distributed under the License is distributed on an "AS IS" BASIS,
012 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
013 * See the License for the specific language governing permissions and
014 * limitations under the License.
015 */
016
017package com.google.common.collect;
018
019import static com.google.common.base.Preconditions.checkArgument;
020import static com.google.common.base.Preconditions.checkNotNull;
021import static com.google.common.base.Preconditions.checkState;
022import static com.google.common.collect.CollectPreconditions.checkNonnegative;
023
024import com.google.common.annotations.Beta;
025import com.google.common.annotations.GwtIncompatible;
026import com.google.common.annotations.VisibleForTesting;
027import com.google.common.collect.Serialization.FieldSetter;
028import com.google.common.math.IntMath;
029import com.google.common.primitives.Ints;
030import com.google.errorprone.annotations.CanIgnoreReturnValue;
031import com.google.j2objc.annotations.WeakOuter;
032import java.io.IOException;
033import java.io.ObjectInputStream;
034import java.io.ObjectOutputStream;
035import java.io.Serializable;
036import java.util.Collection;
037import java.util.Iterator;
038import java.util.List;
039import java.util.Map;
040import java.util.Set;
041import java.util.concurrent.ConcurrentHashMap;
042import java.util.concurrent.ConcurrentMap;
043import java.util.concurrent.atomic.AtomicInteger;
044import javax.annotation.CheckForNull;
045import org.checkerframework.checker.nullness.qual.Nullable;
046
047/**
048 * A multiset that supports concurrent modifications and that provides atomic versions of most
049 * {@code Multiset} operations (exceptions where noted). Null elements are not supported.
050 *
051 * <p>See the Guava User Guide article on <a href=
052 * "https://github.com/google/guava/wiki/NewCollectionTypesExplained#multiset">{@code Multiset}</a>.
053 *
054 * @author Cliff L. Biffle
055 * @author mike nonemacher
056 * @since 2.0
057 */
058@GwtIncompatible
059@ElementTypesAreNonnullByDefault
060public final class ConcurrentHashMultiset<E> extends AbstractMultiset<E> implements Serializable {
061
062  /*
063   * The ConcurrentHashMultiset's atomic operations are implemented primarily in terms of
064   * AtomicInteger's atomic operations, with some help from ConcurrentMap's atomic operations on
065   * creation and removal (including automatic removal of zeroes). If the modification of an
066   * AtomicInteger results in zero, we compareAndSet the value to zero; if that succeeds, we remove
067   * the entry from the Map. If another operation sees a zero in the map, it knows that the entry is
068   * about to be removed, so this operation may remove it (often by replacing it with a new
069   * AtomicInteger).
070   */
071
072  /** The number of occurrences of each element. */
073  private final transient ConcurrentMap<E, AtomicInteger> countMap;
074
075  // This constant allows the deserialization code to set a final field. This holder class
076  // makes sure it is not initialized unless an instance is deserialized.
077  private static class FieldSettersHolder {
078    static final FieldSetter<ConcurrentHashMultiset> COUNT_MAP_FIELD_SETTER =
079        Serialization.getFieldSetter(ConcurrentHashMultiset.class, "countMap");
080  }
081
082  /**
083   * Creates a new, empty {@code ConcurrentHashMultiset} using the default initial capacity, load
084   * factor, and concurrency settings.
085   */
086  public static <E> ConcurrentHashMultiset<E> create() {
087    // TODO(schmoe): provide a way to use this class with other (possibly arbitrary)
088    // ConcurrentMap implementors. One possibility is to extract most of this class into
089    // an AbstractConcurrentMapMultiset.
090    return new ConcurrentHashMultiset<>(new ConcurrentHashMap<E, AtomicInteger>());
091  }
092
093  /**
094   * Creates a new {@code ConcurrentHashMultiset} containing the specified elements, using the
095   * default initial capacity, load factor, and concurrency settings.
096   *
097   * <p>This implementation is highly efficient when {@code elements} is itself a {@link Multiset}.
098   *
099   * @param elements the elements that the multiset should contain
100   */
101  public static <E> ConcurrentHashMultiset<E> create(Iterable<? extends E> elements) {
102    ConcurrentHashMultiset<E> multiset = ConcurrentHashMultiset.create();
103    Iterables.addAll(multiset, elements);
104    return multiset;
105  }
106
107  /**
108   * Creates a new, empty {@code ConcurrentHashMultiset} using {@code countMap} as the internal
109   * backing map.
110   *
111   * <p>This instance will assume ownership of {@code countMap}, and other code should not maintain
112   * references to the map or modify it in any way.
113   *
114   * <p>The returned multiset is serializable if the input map is.
115   *
116   * @param countMap backing map for storing the elements in the multiset and their counts. It must
117   *     be empty.
118   * @throws IllegalArgumentException if {@code countMap} is not empty
119   * @since 20.0
120   */
121  @Beta
122  public static <E> ConcurrentHashMultiset<E> create(ConcurrentMap<E, AtomicInteger> countMap) {
123    return new ConcurrentHashMultiset<>(countMap);
124  }
125
126  @VisibleForTesting
127  ConcurrentHashMultiset(ConcurrentMap<E, AtomicInteger> countMap) {
128    checkArgument(countMap.isEmpty(), "the backing map (%s) must be empty", countMap);
129    this.countMap = countMap;
130  }
131
132  // Query Operations
133
134  /**
135   * Returns the number of occurrences of {@code element} in this multiset.
136   *
137   * @param element the element to look for
138   * @return the nonnegative number of occurrences of the element
139   */
140  @Override
141  public int count(@CheckForNull Object element) {
142    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
143    return (existingCounter == null) ? 0 : existingCounter.get();
144  }
145
146  /**
147   * {@inheritDoc}
148   *
149   * <p>If the data in the multiset is modified by any other threads during this method, it is
150   * undefined which (if any) of these modifications will be reflected in the result.
151   */
152  @Override
153  public int size() {
154    long sum = 0L;
155    for (AtomicInteger value : countMap.values()) {
156      sum += value.get();
157    }
158    return Ints.saturatedCast(sum);
159  }
160
161  /*
162   * Note: the superclass toArray() methods assume that size() gives a correct
163   * answer, which ours does not.
164   */
165
166  @Override
167  public Object[] toArray() {
168    return snapshot().toArray();
169  }
170
171  @Override
172  @SuppressWarnings("nullness") // b/192354773 in our checker affects toArray declarations
173  public <T extends @Nullable Object> T[] toArray(T[] array) {
174    return snapshot().toArray(array);
175  }
176
177  /*
178   * We'd love to use 'new ArrayList(this)' or 'list.addAll(this)', but
179   * either of these would recurse back to us again!
180   */
181  private List<E> snapshot() {
182    List<E> list = Lists.newArrayListWithExpectedSize(size());
183    for (Multiset.Entry<E> entry : entrySet()) {
184      E element = entry.getElement();
185      for (int i = entry.getCount(); i > 0; i--) {
186        list.add(element);
187      }
188    }
189    return list;
190  }
191
192  // Modification Operations
193
194  /**
195   * Adds a number of occurrences of the specified element to this multiset.
196   *
197   * @param element the element to add
198   * @param occurrences the number of occurrences to add
199   * @return the previous count of the element before the operation; possibly zero
200   * @throws IllegalArgumentException if {@code occurrences} is negative, or if the resulting amount
201   *     would exceed {@link Integer#MAX_VALUE}
202   */
203  @CanIgnoreReturnValue
204  @Override
205  public int add(E element, int occurrences) {
206    checkNotNull(element);
207    if (occurrences == 0) {
208      return count(element);
209    }
210    CollectPreconditions.checkPositive(occurrences, "occurrences");
211
212    while (true) {
213      AtomicInteger existingCounter = Maps.safeGet(countMap, element);
214      if (existingCounter == null) {
215        existingCounter = countMap.putIfAbsent(element, new AtomicInteger(occurrences));
216        if (existingCounter == null) {
217          return 0;
218        }
219        // existingCounter != null: fall through to operate against the existing AtomicInteger
220      }
221
222      while (true) {
223        int oldValue = existingCounter.get();
224        if (oldValue != 0) {
225          try {
226            int newValue = IntMath.checkedAdd(oldValue, occurrences);
227            if (existingCounter.compareAndSet(oldValue, newValue)) {
228              // newValue can't == 0, so no need to check & remove
229              return oldValue;
230            }
231          } catch (ArithmeticException overflow) {
232            throw new IllegalArgumentException(
233                "Overflow adding " + occurrences + " occurrences to a count of " + oldValue);
234          }
235        } else {
236          // In the case of a concurrent remove, we might observe a zero value, which means another
237          // thread is about to remove (element, existingCounter) from the map. Rather than wait,
238          // we can just do that work here.
239          AtomicInteger newCounter = new AtomicInteger(occurrences);
240          if ((countMap.putIfAbsent(element, newCounter) == null)
241              || countMap.replace(element, existingCounter, newCounter)) {
242            return 0;
243          }
244          break;
245        }
246      }
247
248      // If we're still here, there was a race, so just try again.
249    }
250  }
251
252  /**
253   * Removes a number of occurrences of the specified element from this multiset. If the multiset
254   * contains fewer than this number of occurrences to begin with, all occurrences will be removed.
255   *
256   * @param element the element whose occurrences should be removed
257   * @param occurrences the number of occurrences of the element to remove
258   * @return the count of the element before the operation; possibly zero
259   * @throws IllegalArgumentException if {@code occurrences} is negative
260   */
261  /*
262   * TODO(cpovirk): remove and removeExactly currently accept null inputs only
263   * if occurrences == 0. This satisfies both NullPointerTester and
264   * CollectionRemoveTester.testRemove_nullAllowed, but it's not clear that it's
265   * a good policy, especially because, in order for the test to pass, the
266   * parameter must be misleadingly annotated as @Nullable. I suspect that
267   * we'll want to remove @Nullable, add an eager checkNotNull, and loosen up
268   * testRemove_nullAllowed.
269   */
270  @CanIgnoreReturnValue
271  @Override
272  public int remove(@CheckForNull Object element, int occurrences) {
273    if (occurrences == 0) {
274      return count(element);
275    }
276    CollectPreconditions.checkPositive(occurrences, "occurrences");
277
278    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
279    if (existingCounter == null) {
280      return 0;
281    }
282    while (true) {
283      int oldValue = existingCounter.get();
284      if (oldValue != 0) {
285        int newValue = Math.max(0, oldValue - occurrences);
286        if (existingCounter.compareAndSet(oldValue, newValue)) {
287          if (newValue == 0) {
288            // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
289            // another thread has already replaced it with a new counter, which is fine.
290            countMap.remove(element, existingCounter);
291          }
292          return oldValue;
293        }
294      } else {
295        return 0;
296      }
297    }
298  }
299
300  /**
301   * Removes exactly the specified number of occurrences of {@code element}, or makes no change if
302   * this is not possible.
303   *
304   * <p>This method, in contrast to {@link #remove(Object, int)}, has no effect when the element
305   * count is smaller than {@code occurrences}.
306   *
307   * @param element the element to remove
308   * @param occurrences the number of occurrences of {@code element} to remove
309   * @return {@code true} if the removal was possible (including if {@code occurrences} is zero)
310   * @throws IllegalArgumentException if {@code occurrences} is negative
311   */
312  @CanIgnoreReturnValue
313  public boolean removeExactly(@CheckForNull Object element, int occurrences) {
314    if (occurrences == 0) {
315      return true;
316    }
317    CollectPreconditions.checkPositive(occurrences, "occurrences");
318
319    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
320    if (existingCounter == null) {
321      return false;
322    }
323    while (true) {
324      int oldValue = existingCounter.get();
325      if (oldValue < occurrences) {
326        return false;
327      }
328      int newValue = oldValue - occurrences;
329      if (existingCounter.compareAndSet(oldValue, newValue)) {
330        if (newValue == 0) {
331          // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
332          // another thread has already replaced it with a new counter, which is fine.
333          countMap.remove(element, existingCounter);
334        }
335        return true;
336      }
337    }
338  }
339
340  /**
341   * Adds or removes occurrences of {@code element} such that the {@link #count} of the element
342   * becomes {@code count}.
343   *
344   * @return the count of {@code element} in the multiset before this call
345   * @throws IllegalArgumentException if {@code count} is negative
346   */
347  @CanIgnoreReturnValue
348  @Override
349  public int setCount(E element, int count) {
350    checkNotNull(element);
351    checkNonnegative(count, "count");
352    while (true) {
353      AtomicInteger existingCounter = Maps.safeGet(countMap, element);
354      if (existingCounter == null) {
355        if (count == 0) {
356          return 0;
357        } else {
358          existingCounter = countMap.putIfAbsent(element, new AtomicInteger(count));
359          if (existingCounter == null) {
360            return 0;
361          }
362          // existingCounter != null: fall through
363        }
364      }
365
366      while (true) {
367        int oldValue = existingCounter.get();
368        if (oldValue == 0) {
369          if (count == 0) {
370            return 0;
371          } else {
372            AtomicInteger newCounter = new AtomicInteger(count);
373            if ((countMap.putIfAbsent(element, newCounter) == null)
374                || countMap.replace(element, existingCounter, newCounter)) {
375              return 0;
376            }
377          }
378          break;
379        } else {
380          if (existingCounter.compareAndSet(oldValue, count)) {
381            if (count == 0) {
382              // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
383              // another thread has already replaced it with a new counter, which is fine.
384              countMap.remove(element, existingCounter);
385            }
386            return oldValue;
387          }
388        }
389      }
390    }
391  }
392
393  /**
394   * Sets the number of occurrences of {@code element} to {@code newCount}, but only if the count is
395   * currently {@code expectedOldCount}. If {@code element} does not appear in the multiset exactly
396   * {@code expectedOldCount} times, no changes will be made.
397   *
398   * @return {@code true} if the change was successful. This usually indicates that the multiset has
399   *     been modified, but not always: in the case that {@code expectedOldCount == newCount}, the
400   *     method will return {@code true} if the condition was met.
401   * @throws IllegalArgumentException if {@code expectedOldCount} or {@code newCount} is negative
402   */
403  @CanIgnoreReturnValue
404  @Override
405  public boolean setCount(E element, int expectedOldCount, int newCount) {
406    checkNotNull(element);
407    checkNonnegative(expectedOldCount, "oldCount");
408    checkNonnegative(newCount, "newCount");
409
410    AtomicInteger existingCounter = Maps.safeGet(countMap, element);
411    if (existingCounter == null) {
412      if (expectedOldCount != 0) {
413        return false;
414      } else if (newCount == 0) {
415        return true;
416      } else {
417        // if our write lost the race, it must have lost to a nonzero value, so we can stop
418        return countMap.putIfAbsent(element, new AtomicInteger(newCount)) == null;
419      }
420    }
421    int oldValue = existingCounter.get();
422    if (oldValue == expectedOldCount) {
423      if (oldValue == 0) {
424        if (newCount == 0) {
425          // Just observed a 0; try to remove the entry to clean up the map
426          countMap.remove(element, existingCounter);
427          return true;
428        } else {
429          AtomicInteger newCounter = new AtomicInteger(newCount);
430          return (countMap.putIfAbsent(element, newCounter) == null)
431              || countMap.replace(element, existingCounter, newCounter);
432        }
433      } else {
434        if (existingCounter.compareAndSet(oldValue, newCount)) {
435          if (newCount == 0) {
436            // Just CASed to 0; remove the entry to clean up the map. If the removal fails,
437            // another thread has already replaced it with a new counter, which is fine.
438            countMap.remove(element, existingCounter);
439          }
440          return true;
441        }
442      }
443    }
444    return false;
445  }
446
447  // Views
448
449  @Override
450  Set<E> createElementSet() {
451    Set<E> delegate = countMap.keySet();
452    return new ForwardingSet<E>() {
453      @Override
454      protected Set<E> delegate() {
455        return delegate;
456      }
457
458      @Override
459      public boolean contains(@CheckForNull Object object) {
460        return object != null && Collections2.safeContains(delegate, object);
461      }
462
463      @Override
464      public boolean containsAll(Collection<?> collection) {
465        return standardContainsAll(collection);
466      }
467
468      @Override
469      public boolean remove(@CheckForNull Object object) {
470        return object != null && Collections2.safeRemove(delegate, object);
471      }
472
473      @Override
474      public boolean removeAll(Collection<?> c) {
475        return standardRemoveAll(c);
476      }
477    };
478  }
479
480  @Override
481  Iterator<E> elementIterator() {
482    throw new AssertionError("should never be called");
483  }
484
485  /** @deprecated Internal method, use {@link #entrySet()}. */
486  @Deprecated
487  @Override
488  public Set<Multiset.Entry<E>> createEntrySet() {
489    return new EntrySet();
490  }
491
492  @Override
493  int distinctElements() {
494    return countMap.size();
495  }
496
497  @Override
498  public boolean isEmpty() {
499    return countMap.isEmpty();
500  }
501
502  @Override
503  Iterator<Entry<E>> entryIterator() {
504    // AbstractIterator makes this fairly clean, but it doesn't support remove(). To support
505    // remove(), we create an AbstractIterator, and then use ForwardingIterator to delegate to it.
506    Iterator<Entry<E>> readOnlyIterator =
507        new AbstractIterator<Entry<E>>() {
508          private final Iterator<Map.Entry<E, AtomicInteger>> mapEntries =
509              countMap.entrySet().iterator();
510
511          @Override
512          @CheckForNull
513          protected Entry<E> computeNext() {
514            while (true) {
515              if (!mapEntries.hasNext()) {
516                return endOfData();
517              }
518              Map.Entry<E, AtomicInteger> mapEntry = mapEntries.next();
519              int count = mapEntry.getValue().get();
520              if (count != 0) {
521                return Multisets.immutableEntry(mapEntry.getKey(), count);
522              }
523            }
524          }
525        };
526
527    return new ForwardingIterator<Entry<E>>() {
528      @CheckForNull private Entry<E> last;
529
530      @Override
531      protected Iterator<Entry<E>> delegate() {
532        return readOnlyIterator;
533      }
534
535      @Override
536      public Entry<E> next() {
537        last = super.next();
538        return last;
539      }
540
541      @Override
542      public void remove() {
543        checkState(last != null, "no calls to next() since the last call to remove()");
544        ConcurrentHashMultiset.this.setCount(last.getElement(), 0);
545        last = null;
546      }
547    };
548  }
549
550  @Override
551  public Iterator<E> iterator() {
552    return Multisets.iteratorImpl(this);
553  }
554
555  @Override
556  public void clear() {
557    countMap.clear();
558  }
559
560  @WeakOuter
561  private class EntrySet extends AbstractMultiset<E>.EntrySet {
562    @Override
563    ConcurrentHashMultiset<E> multiset() {
564      return ConcurrentHashMultiset.this;
565    }
566
567    /*
568     * Note: the superclass toArray() methods assume that size() gives a correct
569     * answer, which ours does not.
570     */
571
572    @Override
573    public Object[] toArray() {
574      return snapshot().toArray();
575    }
576
577    @Override
578    @SuppressWarnings("nullness") // b/192354773 in our checker affects toArray declarations
579    public <T extends @Nullable Object> T[] toArray(T[] array) {
580      return snapshot().toArray(array);
581    }
582
583    private List<Multiset.Entry<E>> snapshot() {
584      List<Multiset.Entry<E>> list = Lists.newArrayListWithExpectedSize(size());
585      // Not Iterables.addAll(list, this), because that'll forward right back here.
586      Iterators.addAll(list, iterator());
587      return list;
588    }
589  }
590
591  /** @serialData the ConcurrentMap of elements and their counts. */
592  private void writeObject(ObjectOutputStream stream) throws IOException {
593    stream.defaultWriteObject();
594    stream.writeObject(countMap);
595  }
596
597  private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
598    stream.defaultReadObject();
599    @SuppressWarnings("unchecked") // reading data stored by writeObject
600    ConcurrentMap<E, Integer> deserializedCountMap =
601        (ConcurrentMap<E, Integer>) stream.readObject();
602    FieldSettersHolder.COUNT_MAP_FIELD_SETTER.set(this, deserializedCountMap);
603  }
604
605  private static final long serialVersionUID = 1;
606}