/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Objects.firstNonNull;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping
 * similar to that of {@code ConcurrentHashMap} in a reusable form, and extends it for
 * semaphores and read-write locks. Conceptually, lock striping is the technique of dividing a lock
 * into many <i>stripes</i>, increasing the granularity of a single lock and allowing independent
 * operations to lock different stripes and proceed concurrently, instead of creating contention
 * for a single lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)}
 * (assuming {@link Object#hashCode()} is correctly implemented for the keys). Note
 * that if {@code key1} is <strong>not</strong> equal to {@code key2}, it is <strong>not</strong>
 * guaranteed that {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless
 * be mapped to the same lock. The lower the number of stripes, the higher the probability of this
 * happening.
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>},
 * and {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
 * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak}
 * {@code Striped<Lock>}, {@linkplain #semaphore(int, int) strong} and {@linkplain
 * #lazyWeakSemaphore(int, int) weak} {@code Striped<Semaphore>}, and {@linkplain
 * #readWriteLock(int) strong} and {@linkplain #lazyWeakReadWriteLock(int) weak}
 * {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all stripes (locks/semaphores) are
 * initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable.
 * <i>Weak</i> means that locks/semaphores are created lazily, and they are allowed to be reclaimed
 * if nobody is holding on to them. This is useful, for example, if one wants to create a {@code
 * Striped<Lock>} of many locks, but worries that in most cases only a small portion of these
 * would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock
 * for all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of
 * choosing either of these extremes, {@code Striped} allows the user to trade between required
 * concurrency and memory footprint. For example, if a set of tasks is CPU-bound, one could easily
 * create a very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes,
 * instead of possibly thousands of locks which could be created in a {@code Map<K, Lock>}
 * structure.
 *
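 * <p>A simple usage sketch, guarding per-key critical sections (the {@code accountId} key and
 * the work done inside the critical section are hypothetical):
 *
 * <pre>   {@code
 *   Striped<Lock> stripedLock = Striped.lock(64);
 *
 *   void deposit(String accountId, long amount) {
 *     Lock lock = stripedLock.get(accountId);
 *     lock.lock();
 *     try {
 *       // mutate the state associated with accountId
 *     } finally {
 *       lock.unlock();
 *     }
 *   }}</pre>
 *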
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array.  (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy {@code Striped} makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
   * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0, inclusive, to
   * {@code size()}, exclusive.
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that
   * {@code getAt(indexFor(key)) == get(key)}.
   */
  abstract int indexFor(Object key);

  /**
   * Returns the total number of stripes in this instance.
   */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per
   * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
   * by this method are guaranteed not to deadlock each other.
   *
   * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
   * {@code bulkGet(keys)} with a relatively large number of keys, can cause an excessive number
   * of shared stripes (much like the birthday paradox, where far fewer people than anticipated
   * are needed before two of them share a birthday). Please consider carefully the implications
   * of the number of stripes, the intended concurrency level, and the typical number of keys used
   * in a {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
   * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
   * collisions.
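   *
   * <p>A sketch of acquiring the stripes for several keys in deadlock-free order, reusing the
   * {@code stripedLock} from the class-level example (the keys and the work done while holding
   * the locks are hypothetical):
   *
   * <pre>   {@code
   *   Iterable<Lock> locks = stripedLock.bulkGet(ImmutableList.of("k1", "k2", "k3"));
   *   for (Lock lock : locks) {
   *     lock.lock();
   *   }
   *   try {
   *     // operate on the state of all three keys atomically
   *   } finally {
   *     for (Lock lock : locks) {
   *       lock.unlock();
   *     }
   *   }}</pre>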
   *
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one for each object, derived by delegating
   *         to {@link #get(Object)}; may contain duplicates), in increasing index order
   */
  public Iterable<L> bulkGet(Iterable<?> keys) {
    // Initially using the array to store the keys, then reusing it to store the respective L's
    final Object[] array = Iterables.toArray(keys, Object.class);
    if (array.length == 0) {
      return ImmutableList.of();
    }
    int[] stripes = new int[array.length];
    for (int i = 0; i < array.length; i++) {
      stripes[i] = indexFor(array[i]);
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    array[0] = getAt(previousStripe);
    for (int i = 1; i < array.length; i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        array[i] = array[i - 1];
      } else {
        array[i] = getAt(currentStripe);
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks
     * might be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asList = (List<L>) Arrays.asList(array);
    return Collections.unmodifiableList(asList);
  }

  // Static factories

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return new CompactStriped<Lock>(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new PaddedLock();
      }
    });
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new ReentrantLock(false);
      }
    });
  }

  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
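   *
   * <p>A sketch of bounding concurrent access to a keyed resource (the {@code resourceKey} and
   * the guarded operation are hypothetical):
   *
   * <pre>   {@code
   *   Striped<Semaphore> semaphores = Striped.semaphore(64, 4);
   *   Semaphore semaphore = semaphores.get(resourceKey);
   *   semaphore.acquireUninterruptibly();
   *   try {
   *     // at most 4 threads that map to this stripe get here concurrently
   *   } finally {
   *     semaphore.release();
   *   }}</pre>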
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return new CompactStriped<Semaphore>(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new PaddedSemaphore(permits);
      }
    });
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new Semaphore(permits, false);
      }
    });
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
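   *
   * <p>A sketch of a mostly-read workload (the {@code key} and the shared state being read are
   * hypothetical):
   *
   * <pre>   {@code
   *   Striped<ReadWriteLock> rwLocks = Striped.readWriteLock(32);
   *   Lock readLock = rwLocks.get(key).readLock();
   *   readLock.lock();
   *   try {
   *     // read the shared state associated with key
   *   } finally {
   *     readLock.unlock();
   *   }}</pre>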
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
   * read-write locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  // ReentrantReadWriteLock is large enough to make padding probably unnecessary
  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
    @Override public ReadWriteLock get() {
      return new ReentrantReadWriteLock();
    }
  };

  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation. */
    final int mask;

    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override final int indexFor(Object key) {
      int hash = smear(key.hashCode());
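      // mask == capacity - 1 and capacity is a power of two, so this is hash mod capacity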
      return hash & mask;
    }

    @Override public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override public L getAt(int index) {
      return (L) array[index];
    }

    @Override public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    final AtomicReferenceArray<ArrayReference<? extends L>> locks;
    final Supplier<L> supplier;
    final int size;
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<ArrayReference<? extends L>>(size);
      this.supplier = supplier;
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    // in the array.  We could skip this if we decide we don't care about holding on to Reference
    // objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override public int size() {
      return size;
    }

    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    final ConcurrentMap<Integer, L> locks;
    final Supplier<L> supplier;
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      existing = locks.putIfAbsent(index, created);
      return firstNonNull(existing, created);
    }

    @Override public int size() {
      return size;
    }
  }

  /**
   * A bit mask where all bits are set.
   */
  private static final int ALL_SET = ~0;

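  // e.g. ceilToPowerOfTwo(10) == 16, since IntMath.log2(10, RoundingMode.CEILING) == 4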
  private static int ceilToPowerOfTwo(int x) {
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

  /*
   * This method was written by Doug Lea with assistance from members of JCP
   * JSR-166 Expert Group and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash
   * method in OpenJDK 7's java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add
     * a fourth long here, to minimize chance of interference between consecutive locks,
     * but I couldn't observe any benefit from that.
     */
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedLock() {
      super(false);
    }
  }

  private static class PaddedSemaphore extends Semaphore {
    // See the PaddedLock comment
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}