001/*
002 * Copyright (C) 2011 The Guava Authors
003 *
004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
005 * in compliance with the License. You may obtain a copy of the License at
006 *
007 * http://www.apache.org/licenses/LICENSE-2.0
008 *
009 * Unless required by applicable law or agreed to in writing, software distributed under the License
010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
011 * or implied. See the License for the specific language governing permissions and limitations under
012 * the License.
013 */
014
015package com.google.common.util.concurrent;
016
017import com.google.common.annotations.Beta;
018import com.google.common.annotations.GwtIncompatible;
019import com.google.common.annotations.VisibleForTesting;
020import com.google.common.base.MoreObjects;
021import com.google.common.base.Preconditions;
022import com.google.common.base.Supplier;
023import com.google.common.collect.ImmutableList;
024import com.google.common.collect.Iterables;
025import com.google.common.collect.MapMaker;
026import com.google.common.math.IntMath;
027import com.google.common.primitives.Ints;
028import java.lang.ref.Reference;
029import java.lang.ref.ReferenceQueue;
030import java.lang.ref.WeakReference;
031import java.math.RoundingMode;
032import java.util.Arrays;
033import java.util.Collections;
034import java.util.List;
035import java.util.concurrent.ConcurrentMap;
036import java.util.concurrent.Semaphore;
037import java.util.concurrent.atomic.AtomicReferenceArray;
038import java.util.concurrent.locks.Condition;
039import java.util.concurrent.locks.Lock;
040import java.util.concurrent.locks.ReadWriteLock;
041import java.util.concurrent.locks.ReentrantLock;
042import java.util.concurrent.locks.ReentrantReadWriteLock;
043
044/**
045 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar
046 * to that of {@code ConcurrentHashMap} in a reusable form, and extends it for semaphores and
047 * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many
048 * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations
049 * to lock different stripes and proceed concurrently, instead of creating contention for a single
050 * lock.
051 *
052 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
053 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming
054 * {@link Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is
055 * <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> guaranteed that {@code
056 * striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the same
057 * lock. The lower the number of stripes, the higher the probability of this happening.
058 *
059 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, and
060 * {@code Striped<ReadWriteLock>}. For each type, two implementations are offered: {@linkplain
061 * #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} {@code Striped<Lock>}, {@linkplain
062 * #semaphore(int, int) strong} and {@linkplain #lazyWeakSemaphore(int, int) weak} {@code
063 * Striped<Semaphore>}, and {@linkplain #readWriteLock(int) strong} and {@linkplain
064 * #lazyWeakReadWriteLock(int) weak} {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all
065 * stripes (locks/semaphores) are initialized eagerly, and are not reclaimed unless {@code Striped}
066 * itself is reclaimable. <i>Weak</i> means that locks/semaphores are created lazily, and they are
067 * allowed to be reclaimed if nobody is holding on to them. This is useful, for example, if one
068 * wants to create a {@code Striped<Lock>} of many locks, but worries that in most cases only a
069 * small portion of these would be in use.
070 *
071 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
072 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
073 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock for
074 * all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of choosing
075 * either of these extremes, {@code Striped} allows the user to trade between required concurrency
076 * and memory footprint. For example, if a set of tasks are CPU-bound, one could easily create a
077 * very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, instead of
078 * possibly thousands of locks which could be created in a {@code Map<K, Lock>} structure.
079 *
080 * @author Dimitris Andreou
081 * @since 13.0
082 */
083@Beta
084@GwtIncompatible
085public abstract class Striped<L> {
086  /**
087   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
088   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
089   * always, if many stripes are in use, a non-lazy striped makes more sense.)
090   */
091  private static final int LARGE_LAZY_CUTOFF = 1024;
092
093  private Striped() {}
094
095  /**
096   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if {@code
097   * key1.equals(key2)}, then {@code get(key1) == get(key2)}.
098   *
099   * @param key an arbitrary, non-null key
100   * @return the stripe that the passed key corresponds to
101   */
102  public abstract L get(Object key);
103
104  /**
105   * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to {@code size()},
106   * exclusively.
107   *
108   * @param index the index of the stripe to return; must be in {@code [0...size())}
109   * @return the stripe at the specified index
110   */
111  public abstract L getAt(int index);
112
113  /**
114   * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
115   */
116  abstract int indexFor(Object key);
117
118  /** Returns the total number of stripes in this instance. */
119  public abstract int size();
120
121  /**
122   * Returns the stripes that correspond to the passed objects, in ascending (as per {@link
123   * #getAt(int)}) order. Thus, threads that use the stripes in the order returned by this method
124   * are guaranteed to not deadlock each other.
125   *
126   * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and {@code
127   * bulkGet(keys)} with a relative large number of keys can cause an excessive number of shared
128   * stripes (much like the birthday paradox, where much fewer than anticipated birthdays are needed
129   * for a pair of them to match). Please consider carefully the implications of the number of
130   * stripes, the intended concurrency level, and the typical number of keys used in a {@code
131   * bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls in
132   * Bins model</a> for mathematical formulas that can be used to estimate the probability of
133   * collisions.
134   *
135   * @param keys arbitrary non-null keys
136   * @return the stripes corresponding to the objects (one per each object, derived by delegating to
137   *     {@link #get(Object)}; may contain duplicates), in an increasing index order.
138   */
139  public Iterable<L> bulkGet(Iterable<?> keys) {
140    // Initially using the array to store the keys, then reusing it to store the respective L's
141    final Object[] array = Iterables.toArray(keys, Object.class);
142    if (array.length == 0) {
143      return ImmutableList.of();
144    }
145    int[] stripes = new int[array.length];
146    for (int i = 0; i < array.length; i++) {
147      stripes[i] = indexFor(array[i]);
148    }
149    Arrays.sort(stripes);
150    // optimize for runs of identical stripes
151    int previousStripe = stripes[0];
152    array[0] = getAt(previousStripe);
153    for (int i = 1; i < array.length; i++) {
154      int currentStripe = stripes[i];
155      if (currentStripe == previousStripe) {
156        array[i] = array[i - 1];
157      } else {
158        array[i] = getAt(currentStripe);
159        previousStripe = currentStripe;
160      }
161    }
162    /*
163     * Note that the returned Iterable holds references to the returned stripes, to avoid
164     * error-prone code like:
165     *
166     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)'
167     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
168     * for (Lock lock : locks) {
169     *   lock.lock();
170     * }
171     * operation();
172     * for (Lock lock : locks) {
173     *   lock.unlock();
174     * }
175     *
176     * If we only held the int[] stripes, translating it on the fly to L's, the original locks might
177     * be garbage collected after locking them, ending up in a huge mess.
178     */
179    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
180    List<L> asList = (List<L>) Arrays.asList(array);
181    return Collections.unmodifiableList(asList);
182  }
183
184  // Static factories
185
186  /**
187   * Creates a {@code Striped<L>} with eagerly initialized, strongly referenced locks. Every lock is
188   * obtained from the passed supplier.
189   *
190   * @param stripes the minimum number of stripes (locks) required
191   * @param supplier a {@code Supplier<L>} object to obtain locks from
192   * @return a new {@code Striped<L>}
193   */
194  static <L> Striped<L> custom(int stripes, Supplier<L> supplier) {
195    return new CompactStriped<>(stripes, supplier);
196  }
197
198  /**
199   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks. Every lock
200   * is reentrant.
201   *
202   * @param stripes the minimum number of stripes (locks) required
203   * @return a new {@code Striped<Lock>}
204   */
205  public static Striped<Lock> lock(int stripes) {
206    return custom(
207        stripes,
208        new Supplier<Lock>() {
209          @Override
210          public Lock get() {
211            return new PaddedLock();
212          }
213        });
214  }
215
216  /**
217   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks. Every lock is
218   * reentrant.
219   *
220   * @param stripes the minimum number of stripes (locks) required
221   * @return a new {@code Striped<Lock>}
222   */
223  public static Striped<Lock> lazyWeakLock(int stripes) {
224    return lazy(
225        stripes,
226        new Supplier<Lock>() {
227          @Override
228          public Lock get() {
229            return new ReentrantLock(false);
230          }
231        });
232  }
233
234  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
235    return stripes < LARGE_LAZY_CUTOFF
236        ? new SmallLazyStriped<L>(stripes, supplier)
237        : new LargeLazyStriped<L>(stripes, supplier);
238  }
239
240  /**
241   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
242   * with the specified number of permits.
243   *
244   * @param stripes the minimum number of stripes (semaphores) required
245   * @param permits the number of permits in each semaphore
246   * @return a new {@code Striped<Semaphore>}
247   */
248  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
249    return custom(
250        stripes,
251        new Supplier<Semaphore>() {
252          @Override
253          public Semaphore get() {
254            return new PaddedSemaphore(permits);
255          }
256        });
257  }
258
259  /**
260   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
261   * with the specified number of permits.
262   *
263   * @param stripes the minimum number of stripes (semaphores) required
264   * @param permits the number of permits in each semaphore
265   * @return a new {@code Striped<Semaphore>}
266   */
267  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
268    return lazy(
269        stripes,
270        new Supplier<Semaphore>() {
271          @Override
272          public Semaphore get() {
273            return new Semaphore(permits, false);
274          }
275        });
276  }
277
278  /**
279   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
280   * read-write locks. Every lock is reentrant.
281   *
282   * @param stripes the minimum number of stripes (locks) required
283   * @return a new {@code Striped<ReadWriteLock>}
284   */
285  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
286    return custom(stripes, READ_WRITE_LOCK_SUPPLIER);
287  }
288
289  /**
290   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced read-write
291   * locks. Every lock is reentrant.
292   *
293   * @param stripes the minimum number of stripes (locks) required
294   * @return a new {@code Striped<ReadWriteLock>}
295   */
296  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
297    return lazy(stripes, WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER);
298  }
299
300  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
301      new Supplier<ReadWriteLock>() {
302        @Override
303        public ReadWriteLock get() {
304          return new ReentrantReadWriteLock();
305        }
306      };
307
308  private static final Supplier<ReadWriteLock> WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER =
309      new Supplier<ReadWriteLock>() {
310        @Override
311        public ReadWriteLock get() {
312          return new WeakSafeReadWriteLock();
313        }
314      };
315
316  /**
317   * ReadWriteLock implementation whose read and write locks retain a reference back to this lock.
318   * Otherwise, a reference to just the read lock or just the write lock would not suffice to ensure
319   * the {@code ReadWriteLock} is retained.
320   */
321  private static final class WeakSafeReadWriteLock implements ReadWriteLock {
322    private final ReadWriteLock delegate;
323
324    WeakSafeReadWriteLock() {
325      this.delegate = new ReentrantReadWriteLock();
326    }
327
328    @Override
329    public Lock readLock() {
330      return new WeakSafeLock(delegate.readLock(), this);
331    }
332
333    @Override
334    public Lock writeLock() {
335      return new WeakSafeLock(delegate.writeLock(), this);
336    }
337  }
338
339  /** Lock object that ensures a strong reference is retained to a specified object. */
340  private static final class WeakSafeLock extends ForwardingLock {
341    private final Lock delegate;
342
343    @SuppressWarnings("unused")
344    private final WeakSafeReadWriteLock strongReference;
345
346    WeakSafeLock(Lock delegate, WeakSafeReadWriteLock strongReference) {
347      this.delegate = delegate;
348      this.strongReference = strongReference;
349    }
350
351    @Override
352    Lock delegate() {
353      return delegate;
354    }
355
356    @Override
357    public Condition newCondition() {
358      return new WeakSafeCondition(delegate.newCondition(), strongReference);
359    }
360  }
361
362  /** Condition object that ensures a strong reference is retained to a specified object. */
363  private static final class WeakSafeCondition extends ForwardingCondition {
364    private final Condition delegate;
365
366    @SuppressWarnings("unused")
367    private final WeakSafeReadWriteLock strongReference;
368
369    WeakSafeCondition(Condition delegate, WeakSafeReadWriteLock strongReference) {
370      this.delegate = delegate;
371      this.strongReference = strongReference;
372    }
373
374    @Override
375    Condition delegate() {
376      return delegate;
377    }
378  }
379
380  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
381    /** Capacity (power of two) minus one, for fast mod evaluation */
382    final int mask;
383
384    PowerOfTwoStriped(int stripes) {
385      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
386      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
387    }
388
389    @Override
390    final int indexFor(Object key) {
391      int hash = smear(key.hashCode());
392      return hash & mask;
393    }
394
395    @Override
396    public final L get(Object key) {
397      return getAt(indexFor(key));
398    }
399  }
400
401  /**
402   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
403   * eagerly initialized.
404   */
405  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
406    /** Size is a power of two. */
407    private final Object[] array;
408
409    private CompactStriped(int stripes, Supplier<L> supplier) {
410      super(stripes);
411      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30)");
412
413      this.array = new Object[mask + 1];
414      for (int i = 0; i < array.length; i++) {
415        array[i] = supplier.get();
416      }
417    }
418
419    @SuppressWarnings("unchecked") // we only put L's in the array
420    @Override
421    public L getAt(int index) {
422      return (L) array[index];
423    }
424
425    @Override
426    public int size() {
427      return array.length;
428    }
429  }
430
431  /**
432   * Implementation of Striped where up to 2^k stripes can be represented, using an
433   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
434   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
435   */
436  @VisibleForTesting
437  static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
438    final AtomicReferenceArray<ArrayReference<? extends L>> locks;
439    final Supplier<L> supplier;
440    final int size;
441    final ReferenceQueue<L> queue = new ReferenceQueue<L>();
442
443    SmallLazyStriped(int stripes, Supplier<L> supplier) {
444      super(stripes);
445      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
446      this.locks = new AtomicReferenceArray<>(size);
447      this.supplier = supplier;
448    }
449
450    @Override
451    public L getAt(int index) {
452      if (size != Integer.MAX_VALUE) {
453        Preconditions.checkElementIndex(index, size());
454      } // else no check necessary, all index values are valid
455      ArrayReference<? extends L> existingRef = locks.get(index);
456      L existing = existingRef == null ? null : existingRef.get();
457      if (existing != null) {
458        return existing;
459      }
460      L created = supplier.get();
461      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
462      while (!locks.compareAndSet(index, existingRef, newRef)) {
463        // we raced, we need to re-read and try again
464        existingRef = locks.get(index);
465        existing = existingRef == null ? null : existingRef.get();
466        if (existing != null) {
467          return existing;
468        }
469      }
470      drainQueue();
471      return created;
472    }
473
474    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
475    // in the array. We could skip this if we decide we don't care about holding on to Reference
476    // objects indefinitely.
477    private void drainQueue() {
478      Reference<? extends L> ref;
479      while ((ref = queue.poll()) != null) {
480        // We only ever register ArrayReferences with the queue so this is always safe.
481        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
482        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
483        // arrayRef will be out of the array after this step.
484        locks.compareAndSet(arrayRef.index, arrayRef, null);
485      }
486    }
487
488    @Override
489    public int size() {
490      return size;
491    }
492
493    private static final class ArrayReference<L> extends WeakReference<L> {
494      final int index;
495
496      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
497        super(referent, queue);
498        this.index = index;
499      }
500    }
501  }
502
503  /**
504   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
505   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
506   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
507   */
508  @VisibleForTesting
509  static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
510    final ConcurrentMap<Integer, L> locks;
511    final Supplier<L> supplier;
512    final int size;
513
514    LargeLazyStriped(int stripes, Supplier<L> supplier) {
515      super(stripes);
516      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
517      this.supplier = supplier;
518      this.locks = new MapMaker().weakValues().makeMap();
519    }
520
521    @Override
522    public L getAt(int index) {
523      if (size != Integer.MAX_VALUE) {
524        Preconditions.checkElementIndex(index, size());
525      } // else no check necessary, all index values are valid
526      L existing = locks.get(index);
527      if (existing != null) {
528        return existing;
529      }
530      L created = supplier.get();
531      existing = locks.putIfAbsent(index, created);
532      return MoreObjects.firstNonNull(existing, created);
533    }
534
535    @Override
536    public int size() {
537      return size;
538    }
539  }
540
541  /** A bit mask were all bits are set. */
542  private static final int ALL_SET = ~0;
543
544  private static int ceilToPowerOfTwo(int x) {
545    return 1 << IntMath.log2(x, RoundingMode.CEILING);
546  }
547
548  /*
549   * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group
550   * and released to the public domain, as explained at
551   * http://creativecommons.org/licenses/publicdomain
552   *
553   * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's
554   * java.util.HashMap class.
555   */
556  // Copied from java/com/google/common/collect/Hashing.java
557  private static int smear(int hashCode) {
558    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
559    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
560  }
561
562  private static class PaddedLock extends ReentrantLock {
563    /*
564     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add a fourth
565     * long here, to minimize chance of interference between consecutive locks, but I couldn't
566     * observe any benefit from that.
567     */
568    long unused1;
569    long unused2;
570    long unused3;
571
572    PaddedLock() {
573      super(false);
574    }
575  }
576
577  private static class PaddedSemaphore extends Semaphore {
578    // See PaddedReentrantLock comment
579    long unused1;
580    long unused2;
581    long unused3;
582
583    PaddedSemaphore(int permits) {
584      super(permits, false);
585    }
586  }
587}