/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping
 * similar to that of {@code ConcurrentHashMap} in a reusable form, and extends it for
 * semaphores and read-write locks. Conceptually, lock striping is the technique of dividing a lock
 * into many <i>stripes</i>, increasing the granularity of a single lock and allowing independent
 * operations to lock different stripes and proceed concurrently, instead of creating contention
 * for a single lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)}
 * (assuming {@link Object#hashCode()} is correctly implemented for the keys). Note
 * that if {@code key1} is <strong>not</strong> equal to {@code key2}, it is <strong>not</strong>
 * guaranteed that {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless
 * be mapped to the same lock. The lower the number of stripes, the higher the probability of this
 * happening.
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>},
 * and {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
 * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak}
 * {@code Striped<Lock>}, {@linkplain #semaphore(int, int) strong} and {@linkplain
 * #lazyWeakSemaphore(int, int) weak} {@code Striped<Semaphore>}, and {@linkplain
 * #readWriteLock(int) strong} and {@linkplain #lazyWeakReadWriteLock(int) weak}
 * {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all stripes (locks/semaphores) are
 * initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable.
 * <i>Weak</i> means that locks/semaphores are created lazily, and they are allowed to be reclaimed
 * if nobody is holding on to them. This is useful, for example, if one wants to create a {@code
 * Striped<Lock>} of many locks, but worries that in most cases only a small portion of these
 * would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock
 * for all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of
 * choosing either of these extremes, {@code Striped} allows the user to trade between required
 * concurrency and memory footprint. For example, if a set of tasks are CPU-bound, one could easily
 * create a very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes,
 * instead of possibly thousands of locks which could be created in a {@code Map<K, Lock>}
 * structure.
 *
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy striped makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
   * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to
   * {@code size()}, exclusively.
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
   */
  abstract int indexFor(Object key);

  /**
   * Returns the total number of stripes in this instance.
   */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per
   * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
   * by this method are guaranteed to not deadlock each other.
   *
   * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
   * {@code bulkGet(keys)} with a relatively large number of keys can cause an excessive number
   * of shared stripes (much like the birthday paradox, where far fewer than anticipated birthdays
   * are needed for a pair of them to match). Please consider carefully the implications of the
   * number of stripes, the intended concurrency level, and the typical number of keys used in a
   * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
   * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
   * collisions.
   *
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one per each object, derived by delegating
   *         to {@link #get(Object)}; may contain duplicates), in an increasing index order.
   */
  public Iterable<L> bulkGet(Iterable<?> keys) {
    // Initially using the array to store the keys, then reusing it to store the respective L's
    final Object[] array = Iterables.toArray(keys, Object.class);
    if (array.length == 0) {
      return ImmutableList.of();
    }
    int[] stripes = new int[array.length];
    for (int i = 0; i < array.length; i++) {
      stripes[i] = indexFor(array[i]);
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    array[0] = getAt(previousStripe);
    for (int i = 1; i < array.length; i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        array[i] = array[i - 1];
      } else {
        array[i] = getAt(currentStripe);
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks
     * might be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asList = (List<L>) Arrays.asList(array);
    return Collections.unmodifiableList(asList);
  }

  // Static factories

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return new CompactStriped<Lock>(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new PaddedLock();
      }
    });
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new ReentrantLock(false);
      }
    });
  }

  /**
   * Picks the lazy implementation for the requested stripe count: an array-backed one below
   * {@link #LARGE_LAZY_CUTOFF}, a map-backed one at or above it.
   */
  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return new CompactStriped<Semaphore>(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new PaddedSemaphore(permits);
      }
    });
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new Semaphore(permits, false);
      }
    });
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
   * read-write locks. Every lock is reentrant.
   *
   * <p>The {@code Lock}s returned by {@link ReadWriteLock#readLock()} and
   * {@link ReadWriteLock#writeLock()} of each stripe (and any {@code Condition} created from
   * them) keep their stripe strongly reachable, so a stripe cannot be reclaimed while a caller
   * still holds on to one of its read or write locks.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER);
  }

  // ReentrantReadWriteLock is large enough to make padding probably unnecessary
  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override public ReadWriteLock get() {
          return new ReentrantReadWriteLock();
        }
      };

  /** Supplier used by the weak flavor; see {@link WeakSafeReadWriteLock} for why it differs. */
  private static final Supplier<ReadWriteLock> WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override public ReadWriteLock get() {
          return new WeakSafeReadWriteLock();
        }
      };

  /**
   * {@code ReadWriteLock} whose read and write locks retain a strong reference back to this
   * object.
   *
   * <p>The lazy stripes are only weakly referenced by the {@code Striped}. The lock views handed
   * out by a plain {@code ReentrantReadWriteLock} are not specified to keep their parent
   * reachable, so a caller doing {@code Lock l = striped.get(key).readLock(); l.lock();} could see
   * the stripe reclaimed, and a fresh, unrelated lock created for the same index, while {@code l}
   * is still held. Wrapping the views closes that hole.
   */
  private static final class WeakSafeReadWriteLock implements ReadWriteLock {
    private final ReadWriteLock delegate = new ReentrantReadWriteLock();
    // Created once so that readLock()/writeLock() keep returning the same instance, as
    // ReentrantReadWriteLock does.
    private final Lock readLock;
    private final Lock writeLock;

    WeakSafeReadWriteLock() {
      this.readLock = new WeakSafeLock(delegate.readLock(), this);
      this.writeLock = new WeakSafeLock(delegate.writeLock(), this);
    }

    @Override public Lock readLock() {
      return readLock;
    }

    @Override public Lock writeLock() {
      return writeLock;
    }

    @Override public String toString() {
      return delegate.toString();
    }
  }

  /** Forwarding {@code Lock} that keeps a specified object strongly reachable. */
  private static final class WeakSafeLock implements Lock {
    private final Lock delegate;
    /** Never read for its value, except to hand on to conditions; held only for reachability. */
    private final Object strongReference;

    WeakSafeLock(Lock delegate, Object strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override public void lock() {
      delegate.lock();
    }

    @Override public void lockInterruptibly() throws InterruptedException {
      delegate.lockInterruptibly();
    }

    @Override public boolean tryLock() {
      return delegate.tryLock();
    }

    @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
      return delegate.tryLock(time, unit);
    }

    @Override public void unlock() {
      delegate.unlock();
    }

    @Override public Condition newCondition() {
      // Let the delegate decide whether conditions are supported before wrapping the result.
      return new WeakSafeCondition(delegate.newCondition(), strongReference);
    }

    @Override public String toString() {
      return delegate.toString();
    }
  }

  /** Forwarding {@code Condition} that keeps a specified object strongly reachable. */
  private static final class WeakSafeCondition implements Condition {
    private final Condition delegate;

    @SuppressWarnings("unused") // held only to keep the owning lock reachable
    private final Object strongReference;

    WeakSafeCondition(Condition delegate, Object strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override public void await() throws InterruptedException {
      delegate.await();
    }

    @Override public void awaitUninterruptibly() {
      delegate.awaitUninterruptibly();
    }

    @Override public long awaitNanos(long nanosTimeout) throws InterruptedException {
      return delegate.awaitNanos(nanosTimeout);
    }

    @Override public boolean await(long time, TimeUnit unit) throws InterruptedException {
      return delegate.await(time, unit);
    }

    @Override public boolean awaitUntil(Date deadline) throws InterruptedException {
      return delegate.awaitUntil(deadline);
    }

    @Override public void signal() {
      delegate.signal();
    }

    @Override public void signalAll() {
      delegate.signalAll();
    }
  }

  /**
   * Base class for implementations whose stripe count is a power of two, so that a key's stripe
   * index is a bit mask of its smeared hash code.
   */
  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation */
    final int mask;

    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
      // Above 2^30 there is no larger int power of two to round up to; use every hash bit instead.
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override final int indexFor(Object key) {
      int hash = smear(key.hashCode());
      return hash & mask;
    }

    @Override public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override public L getAt(int index) {
      return (L) array[index];
    }

    @Override public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    final AtomicReferenceArray<ArrayReference<? extends L>> locks;
    final Supplier<L> supplier;
    final int size;
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<ArrayReference<? extends L>>(size);
      this.supplier = supplier;
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    // in the array. We could skip this if we decide we don't care about holding on to Reference
    // objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override public int size() {
      return size;
    }

    /** Weak reference that remembers which array slot it was stored in. */
    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    final ConcurrentMap<Integer, L> locks;
    final Supplier<L> supplier;
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      // If another thread installed a stripe first, use theirs so all callers share one instance.
      existing = locks.putIfAbsent(index, created);
      return MoreObjects.firstNonNull(existing, created);
    }

    @Override public int size() {
      return size;
    }
  }

  /**
   * A bit mask where all bits are set.
   */
  private static final int ALL_SET = ~0;

  /** Returns the smallest power of two that is greater than or equal to {@code x}. */
  private static int ceilToPowerOfTwo(int x) {
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

  /*
   * This method was written by Doug Lea with assistance from members of JCP
   * JSR-166 Expert Group and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash
   * method in OpenJDK 7's java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  /** Non-fair {@code ReentrantLock} with extra fields to pad the object size. */
  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add
     * a fourth long here, to minimize chance of interference between consecutive locks,
     * but I couldn't observe any benefit from that.
     */
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedLock() {
      super(false);
    }
  }

  /** Non-fair {@code Semaphore} with extra fields to pad the object size. */
  private static class PaddedSemaphore extends Semaphore {
    // See PaddedLock comment
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}