/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Objects.firstNonNull;

import com.google.common.annotations.Beta;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;

import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This class offers the lock-striping technique
 * underlying {@code ConcurrentHashMap} in a reusable form, and extends it to semaphores and
 * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many
 * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations
 * to lock different stripes and proceed concurrently, instead of creating contention for a single
 * lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or
 * semaphore), i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) ==
 * striped.get(key2)} (assuming {@link Object#hashCode()} is correctly implemented for the keys).
 * Note that if {@code key1} is <strong>not</strong> equal to {@code key2}, it is
 * <strong>not</strong> guaranteed that {@code striped.get(key1) != striped.get(key2)}; the
 * elements might nevertheless be mapped to the same lock. The lower the number of stripes, the
 * higher the probability of this happening.
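 *
 * <p>For example (an illustrative sketch; the keys are arbitrary):
 *
 * <pre>   {@code
 *   Striped<Lock> striped = Striped.lock(16);
 *   // equal keys are guaranteed to map to the same stripe...
 *   assert striped.get("foo") == striped.get(new String("foo"));
 *   // ...but unequal keys may share a stripe too, especially when stripes are few
 * }</pre>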
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>},
 * and {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
 * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak}
 * {@code Striped<Lock>}, {@linkplain #semaphore(int, int) strong} and {@linkplain
 * #lazyWeakSemaphore(int, int) weak} {@code Striped<Semaphore>}, and {@linkplain
 * #readWriteLock(int) strong} and {@linkplain #lazyWeakReadWriteLock(int) weak}
 * {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all stripes (locks/semaphores) are
 * initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable.
 * <i>Weak</i> means that locks/semaphores are created lazily, and may be reclaimed if nobody is
 * holding on to them. This is useful, for example, if one wants to create a
 * {@code Striped<Lock>} of many locks, but worries that in most cases only a small portion of
 * these would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock
 * for all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of
 * choosing either of these extremes, {@code Striped} allows the user to trade between required
 * concurrency and memory footprint. For example, if a set of tasks is CPU-bound, one could easily
 * create a very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes,
 * instead of the possibly thousands of locks that a {@code Map<K, Lock>} structure could create.
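 *
 * <p>A minimal usage sketch (illustrative; {@code key} and the guarded work are hypothetical):
 *
 * <pre>   {@code
 *   Striped<Lock> stripedLock = Striped.lock(Runtime.getRuntime().availableProcessors() * 4);
 *
 *   Lock lock = stripedLock.get(key);
 *   lock.lock();
 *   try {
 *     // mutate the state associated with key
 *   } finally {
 *     lock.unlock();
 *   }
 * }</pre>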
 *
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy striped makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
   * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0 (inclusive) to
   * {@code size()} (exclusive).
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that
   * {@code getAt(indexFor(key)) == get(key)}.
   */
  abstract int indexFor(Object key);

  /**
   * Returns the total number of stripes in this instance.
   */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per
   * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
   * by this method are guaranteed to not deadlock each other.
   *
   * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
   * {@code bulkGet(keys)} with a relatively large number of keys, can cause an excessive number
   * of shared stripes (much like the birthday paradox, where far fewer birthdays than one might
   * expect suffice for a pair of them to match). Please consider carefully the implications of
   * the number of stripes, the intended concurrency level, and the typical number of keys used in
   * a {@code bulkGet(keys)} operation. See <a
   * href="http://www.mathpages.com/home/kmath199.htm">Balls in Bins model</a> for mathematical
   * formulas that can be used to estimate the probability of collisions.
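   *
   * <p>For instance, a deadlock-free multi-key locking sketch (illustrative; {@code stripedLock},
   * {@code keys} and {@code operation()} are hypothetical):
   *
   * <pre>   {@code
   *   Iterable<Lock> locks = stripedLock.bulkGet(keys);
   *   for (Lock lock : locks) {
   *     lock.lock(); // acquired in index order, so concurrent callers cannot deadlock
   *   }
   *   try {
   *     operation();
   *   } finally {
   *     for (Lock lock : locks) {
   *       lock.unlock();
   *     }
   *   }
   * }</pre>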
   *
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one for each object, derived by delegating
   *     to {@link #get(Object)}; may contain duplicates), in increasing index order
   */
  public Iterable<L> bulkGet(Iterable<?> keys) {
    // Initially using the array to store the keys, then reusing it to store the respective L's
    final Object[] array = Iterables.toArray(keys, Object.class);
    if (array.length == 0) {
      return ImmutableList.of();
    }
    int[] stripes = new int[array.length];
    for (int i = 0; i < array.length; i++) {
      stripes[i] = indexFor(array[i]);
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    array[0] = getAt(previousStripe);
    for (int i = 1; i < array.length; i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        array[i] = array[i - 1];
      } else {
        array[i] = getAt(currentStripe);
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks
     * might be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asList = (List<L>) Arrays.asList(array);
    return Collections.unmodifiableList(asList);
  }

  // Static factories

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return new CompactStriped<Lock>(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new PaddedLock();
      }
    });
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks.
   * Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(stripes, new Supplier<Lock>() {
      @Override public Lock get() {
        return new ReentrantLock(false);
      }
    });
  }

  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
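   *
   * <p>A sketch of bounding per-key concurrency (illustrative; {@code key} is hypothetical):
   *
   * <pre>   {@code
   *   Striped<Semaphore> semaphores = Striped.semaphore(64, 3);
   *
   *   Semaphore semaphore = semaphores.get(key);
   *   semaphore.acquireUninterruptibly();
   *   try {
   *     // at most 3 threads at a time proceed here for any given stripe
   *   } finally {
   *     semaphore.release();
   *   }
   * }</pre>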
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return new CompactStriped<Semaphore>(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new PaddedSemaphore(permits);
      }
    });
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(stripes, new Supplier<Semaphore>() {
      @Override public Semaphore get() {
        return new Semaphore(permits, false);
      }
    });
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
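   *
   * <p>A read-mostly usage sketch (illustrative; {@code key} and the read are hypothetical):
   *
   * <pre>   {@code
   *   Striped<ReadWriteLock> rwLocks = Striped.readWriteLock(32);
   *
   *   Lock readLock = rwLocks.get(key).readLock();
   *   readLock.lock();
   *   try {
   *     // multiple readers of keys on this stripe may proceed concurrently
   *   } finally {
   *     readLock.unlock();
   *   }
   * }</pre>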
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
   * read-write locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  // ReentrantReadWriteLock is large enough to make padding probably unnecessary
  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
    @Override public ReadWriteLock get() {
      return new ReentrantReadWriteLock();
    }
  };

  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation */
    final int mask;

    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override final int indexFor(Object key) {
      int hash = smear(key.hashCode());
      return hash & mask;
    }

    @Override public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override public L getAt(int index) {
      return (L) array[index];
    }

    @Override public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of
   * the user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly
   * referenced.
   */
  @VisibleForTesting static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    final AtomicReferenceArray<ArrayReference<? extends L>> locks;
    final Supplier<L> supplier;
    final int size;
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<ArrayReference<? extends L>>(size);
      this.supplier = supplier;
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty
    // references in the array. We could skip this if we decide we don't care about holding on
    // to Reference objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override public int size() {
      return size;
    }

    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of
   * the user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly
   * referenced.
   */
  @VisibleForTesting static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    final ConcurrentMap<Integer, L> locks;
    final Supplier<L> supplier;
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      existing = locks.putIfAbsent(index, created);
      return firstNonNull(existing, created);
    }

    @Override public int size() {
      return size;
    }
  }

  /**
   * A bit mask where all bits are set.
   */
  private static final int ALL_SET = ~0;

  private static int ceilToPowerOfTwo(int x) {
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

  /*
   * This method was written by Doug Lea with assistance from members of JCP
   * JSR-166 Expert Group and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash
   * method in OpenJDK 7's java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add
     * a fourth long here, to minimize chance of interference between consecutive locks,
     * but I couldn't observe any benefit from that.
     */
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedLock() {
      super(false);
    }
  }

  private static class PaddedSemaphore extends Semaphore {
    // See PaddedLock comment
    @SuppressWarnings("unused")
    long q1, q2, q3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}