001/* 002 * Copyright (C) 2011 The Guava Authors 003 * 004 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except 005 * in compliance with the License. You may obtain a copy of the License at 006 * 007 * http://www.apache.org/licenses/LICENSE-2.0 008 * 009 * Unless required by applicable law or agreed to in writing, software distributed under the License 010 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express 011 * or implied. See the License for the specific language governing permissions and limitations under 012 * the License. 013 */ 014 015package com.google.common.util.concurrent; 016 017import com.google.common.annotations.Beta; 018import com.google.common.annotations.GwtIncompatible; 019import com.google.common.annotations.VisibleForTesting; 020import com.google.common.base.MoreObjects; 021import com.google.common.base.Preconditions; 022import com.google.common.base.Supplier; 023import com.google.common.collect.ImmutableList; 024import com.google.common.collect.Iterables; 025import com.google.common.collect.MapMaker; 026import com.google.common.math.IntMath; 027import com.google.common.primitives.Ints; 028import java.lang.ref.Reference; 029import java.lang.ref.ReferenceQueue; 030import java.lang.ref.WeakReference; 031import java.math.RoundingMode; 032import java.util.Arrays; 033import java.util.Collections; 034import java.util.List; 035import java.util.concurrent.ConcurrentMap; 036import java.util.concurrent.Semaphore; 037import java.util.concurrent.atomic.AtomicReferenceArray; 038import java.util.concurrent.locks.Lock; 039import java.util.concurrent.locks.ReadWriteLock; 040import java.util.concurrent.locks.ReentrantLock; 041import java.util.concurrent.locks.ReentrantReadWriteLock; 042 043/** 044 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar 045 * to that of {@code ConcurrentHashMap} in a reusable form, and extends it for semaphores and 046 * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many 047 * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations 048 * to lock different stripes and proceed concurrently, instead of creating contention for a single 049 * lock. 050 * 051 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore), 052 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming 053 * {@link Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is 054 * <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> guaranteed that 055 * {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the 056 * same lock. The lower the number of stripes, the higher the probability of this happening. 057 * 058 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, and 059 * {@code Striped<ReadWriteLock>}. For each type, two implementations are offered: 060 * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} {@code Striped<Lock>}, 061 * {@linkplain #semaphore(int, int) strong} and {@linkplain #lazyWeakSemaphore(int, int) weak} 062 * {@code Striped<Semaphore>}, and {@linkplain #readWriteLock(int) strong} and 063 * {@linkplain #lazyWeakReadWriteLock(int) weak} {@code Striped<ReadWriteLock>}. <i>Strong</i> means 064 * that all stripes (locks/semaphores) are initialized eagerly, and are not reclaimed unless 065 * {@code Striped} itself is reclaimable. <i>Weak</i> means that locks/semaphores are created 066 * lazily, and they are allowed to be reclaimed if nobody is holding on to them. This is useful, for 067 * example, if one wants to create a {@code 068 * Striped<Lock>} of many locks, but worries that in most cases only a small portion of these would 069 * be in use. 070 * 071 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K} 072 * represents the task. This maximizes concurrency by having each unique key mapped to a unique 073 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock for 074 * all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of choosing 075 * either of these extremes, {@code Striped} allows the user to trade between required concurrency 076 * and memory footprint. For example, if a set of tasks are CPU-bound, one could easily create a 077 * very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, instead of 078 * possibly thousands of locks which could be created in a {@code Map<K, Lock>} structure. 079 * 080 * @author Dimitris Andreou 081 * @since 13.0 082 */ 083@Beta 084@GwtIncompatible 085public abstract class Striped<L> { 086 /** 087 * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be 088 * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As 089 * always, if many stripes are in use, a non-lazy striped makes more sense.) 090 */ 091 private static final int LARGE_LAZY_CUTOFF = 1024; 092 093 private Striped() {} 094 095 /** 096 * Returns the stripe that corresponds to the passed key. It is always guaranteed that if 097 * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}. 098 * 099 * @param key an arbitrary, non-null key 100 * @return the stripe that the passed key corresponds to 101 */ 102 public abstract L get(Object key); 103 104 /** 105 * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to {@code size()}, 106 * exclusively. 107 * 108 * @param index the index of the stripe to return; must be in {@code [0...size())} 109 * @return the stripe at the specified index 110 */ 111 public abstract L getAt(int index); 112 113 /** 114 * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key). 115 */ 116 abstract int indexFor(Object key); 117 118 /** 119 * Returns the total number of stripes in this instance. 120 */ 121 public abstract int size(); 122 123 /** 124 * Returns the stripes that correspond to the passed objects, in ascending (as per 125 * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned by this 126 * method are guaranteed to not deadlock each other. 127 * 128 * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and 129 * {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number of 130 * shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays are 131 * needed for a pair of them to match). Please consider carefully the implications of the number 132 * of stripes, the intended concurrency level, and the typical number of keys used in a 133 * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls 134 * in Bins model</a> for mathematical formulas that can be used to estimate the probability of 135 * collisions. 136 * 137 * @param keys arbitrary non-null keys 138 * @return the stripes corresponding to the objects (one per each object, derived by delegating to 139 * {@link #get(Object)}; may contain duplicates), in an increasing index order. 140 */ 141 public Iterable<L> bulkGet(Iterable<?> keys) { 142 // Initially using the array to store the keys, then reusing it to store the respective L's 143 final Object[] array = Iterables.toArray(keys, Object.class); 144 if (array.length == 0) { 145 return ImmutableList.of(); 146 } 147 int[] stripes = new int[array.length]; 148 for (int i = 0; i < array.length; i++) { 149 stripes[i] = indexFor(array[i]); 150 } 151 Arrays.sort(stripes); 152 // optimize for runs of identical stripes 153 int previousStripe = stripes[0]; 154 array[0] = getAt(previousStripe); 155 for (int i = 1; i < array.length; i++) { 156 int currentStripe = stripes[i]; 157 if (currentStripe == previousStripe) { 158 array[i] = array[i - 1]; 159 } else { 160 array[i] = getAt(currentStripe); 161 previousStripe = currentStripe; 162 } 163 } 164 /* 165 * Note that the returned Iterable holds references to the returned stripes, to avoid 166 * error-prone code like: 167 * 168 * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)' 169 * Iterable<Lock> locks = stripedLock.bulkGet(keys); 170 * for (Lock lock : locks) { 171 * lock.lock(); 172 * } 173 * operation(); 174 * for (Lock lock : locks) { 175 * lock.unlock(); 176 * } 177 * 178 * If we only held the int[] stripes, translating it on the fly to L's, the original locks might 179 * be garbage collected after locking them, ending up in a huge mess. 180 */ 181 @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's 182 List<L> asList = (List<L>) Arrays.asList(array); 183 return Collections.unmodifiableList(asList); 184 } 185 186 // Static factories 187 188 /** 189 * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks. Every lock 190 * is reentrant. 191 * 192 * @param stripes the minimum number of stripes (locks) required 193 * @return a new {@code Striped<Lock>} 194 */ 195 public static Striped<Lock> lock(int stripes) { 196 return new CompactStriped<Lock>( 197 stripes, 198 new Supplier<Lock>() { 199 @Override 200 public Lock get() { 201 return new PaddedLock(); 202 } 203 }); 204 } 205 206 /** 207 * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks. Every lock is 208 * reentrant. 209 * 210 * @param stripes the minimum number of stripes (locks) required 211 * @return a new {@code Striped<Lock>} 212 */ 213 public static Striped<Lock> lazyWeakLock(int stripes) { 214 return lazy( 215 stripes, 216 new Supplier<Lock>() { 217 @Override 218 public Lock get() { 219 return new ReentrantLock(false); 220 } 221 }); 222 } 223 224 private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) { 225 return stripes < LARGE_LAZY_CUTOFF 226 ? new SmallLazyStriped<L>(stripes, supplier) 227 : new LargeLazyStriped<L>(stripes, supplier); 228 } 229 230 /** 231 * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores, 232 * with the specified number of permits. 233 * 234 * @param stripes the minimum number of stripes (semaphores) required 235 * @param permits the number of permits in each semaphore 236 * @return a new {@code Striped<Semaphore>} 237 */ 238 public static Striped<Semaphore> semaphore(int stripes, final int permits) { 239 return new CompactStriped<Semaphore>( 240 stripes, 241 new Supplier<Semaphore>() { 242 @Override 243 public Semaphore get() { 244 return new PaddedSemaphore(permits); 245 } 246 }); 247 } 248 249 /** 250 * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores, 251 * with the specified number of permits. 252 * 253 * @param stripes the minimum number of stripes (semaphores) required 254 * @param permits the number of permits in each semaphore 255 * @return a new {@code Striped<Semaphore>} 256 */ 257 public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) { 258 return lazy( 259 stripes, 260 new Supplier<Semaphore>() { 261 @Override 262 public Semaphore get() { 263 return new Semaphore(permits, false); 264 } 265 }); 266 } 267 268 /** 269 * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced 270 * read-write locks. Every lock is reentrant. 271 * 272 * @param stripes the minimum number of stripes (locks) required 273 * @return a new {@code Striped<ReadWriteLock>} 274 */ 275 public static Striped<ReadWriteLock> readWriteLock(int stripes) { 276 return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER); 277 } 278 279 /** 280 * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced read-write 281 * locks. Every lock is reentrant. 282 * 283 * @param stripes the minimum number of stripes (locks) required 284 * @return a new {@code Striped<ReadWriteLock>} 285 */ 286 public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) { 287 return lazy(stripes, READ_WRITE_LOCK_SUPPLIER); 288 } 289 290 // ReentrantReadWriteLock is large enough to make padding probably unnecessary 291 private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER = 292 new Supplier<ReadWriteLock>() { 293 @Override 294 public ReadWriteLock get() { 295 return new ReentrantReadWriteLock(); 296 } 297 }; 298 299 private abstract static class PowerOfTwoStriped<L> extends Striped<L> { 300 /** Capacity (power of two) minus one, for fast mod evaluation */ 301 final int mask; 302 303 PowerOfTwoStriped(int stripes) { 304 Preconditions.checkArgument(stripes > 0, "Stripes must be positive"); 305 this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1; 306 } 307 308 @Override 309 final int indexFor(Object key) { 310 int hash = smear(key.hashCode()); 311 return hash & mask; 312 } 313 314 @Override 315 public final L get(Object key) { 316 return getAt(indexFor(key)); 317 } 318 } 319 320 /** 321 * Implementation of Striped where 2^k stripes are represented as an array of the same length, 322 * eagerly initialized. 323 */ 324 private static class CompactStriped<L> extends PowerOfTwoStriped<L> { 325 /** Size is a power of two. */ 326 private final Object[] array; 327 328 private CompactStriped(int stripes, Supplier<L> supplier) { 329 super(stripes); 330 Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30)"); 331 332 this.array = new Object[mask + 1]; 333 for (int i = 0; i < array.length; i++) { 334 array[i] = supplier.get(); 335 } 336 } 337 338 @SuppressWarnings("unchecked") // we only put L's in the array 339 @Override 340 public L getAt(int index) { 341 return (L) array[index]; 342 } 343 344 @Override 345 public int size() { 346 return array.length; 347 } 348 } 349 350 /** 351 * Implementation of Striped where up to 2^k stripes can be represented, using an 352 * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the 353 * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced. 354 */ 355 @VisibleForTesting 356 static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> { 357 final AtomicReferenceArray<ArrayReference<? extends L>> locks; 358 final Supplier<L> supplier; 359 final int size; 360 final ReferenceQueue<L> queue = new ReferenceQueue<L>(); 361 362 SmallLazyStriped(int stripes, Supplier<L> supplier) { 363 super(stripes); 364 this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1; 365 this.locks = new AtomicReferenceArray<ArrayReference<? extends L>>(size); 366 this.supplier = supplier; 367 } 368 369 @Override 370 public L getAt(int index) { 371 if (size != Integer.MAX_VALUE) { 372 Preconditions.checkElementIndex(index, size()); 373 } // else no check necessary, all index values are valid 374 ArrayReference<? extends L> existingRef = locks.get(index); 375 L existing = existingRef == null ? null : existingRef.get(); 376 if (existing != null) { 377 return existing; 378 } 379 L created = supplier.get(); 380 ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue); 381 while (!locks.compareAndSet(index, existingRef, newRef)) { 382 // we raced, we need to re-read and try again 383 existingRef = locks.get(index); 384 existing = existingRef == null ? null : existingRef.get(); 385 if (existing != null) { 386 return existing; 387 } 388 } 389 drainQueue(); 390 return created; 391 } 392 393 // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references 394 // in the array. We could skip this if we decide we don't care about holding on to Reference 395 // objects indefinitely. 396 private void drainQueue() { 397 Reference<? extends L> ref; 398 while ((ref = queue.poll()) != null) { 399 // We only ever register ArrayReferences with the queue so this is always safe. 400 ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref; 401 // Try to clear out the array slot, n.b. if we fail that is fine, in either case the 402 // arrayRef will be out of the array after this step. 403 locks.compareAndSet(arrayRef.index, arrayRef, null); 404 } 405 } 406 407 @Override 408 public int size() { 409 return size; 410 } 411 412 private static final class ArrayReference<L> extends WeakReference<L> { 413 final int index; 414 415 ArrayReference(L referent, int index, ReferenceQueue<L> queue) { 416 super(referent, queue); 417 this.index = index; 418 } 419 } 420 } 421 422 /** 423 * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap 424 * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the 425 * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced. 426 */ 427 @VisibleForTesting 428 static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> { 429 final ConcurrentMap<Integer, L> locks; 430 final Supplier<L> supplier; 431 final int size; 432 433 LargeLazyStriped(int stripes, Supplier<L> supplier) { 434 super(stripes); 435 this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1; 436 this.supplier = supplier; 437 this.locks = new MapMaker().weakValues().makeMap(); 438 } 439 440 @Override 441 public L getAt(int index) { 442 if (size != Integer.MAX_VALUE) { 443 Preconditions.checkElementIndex(index, size()); 444 } // else no check necessary, all index values are valid 445 L existing = locks.get(index); 446 if (existing != null) { 447 return existing; 448 } 449 L created = supplier.get(); 450 existing = locks.putIfAbsent(index, created); 451 return MoreObjects.firstNonNull(existing, created); 452 } 453 454 @Override 455 public int size() { 456 return size; 457 } 458 } 459 460 /** 461 * A bit mask were all bits are set. 462 */ 463 private static final int ALL_SET = ~0; 464 465 private static int ceilToPowerOfTwo(int x) { 466 return 1 << IntMath.log2(x, RoundingMode.CEILING); 467 } 468 469 /* 470 * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group 471 * and released to the public domain, as explained at 472 * http://creativecommons.org/licenses/publicdomain 473 * 474 * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's 475 * java.util.HashMap class. 476 */ 477 // Copied from java/com/google/common/collect/Hashing.java 478 private static int smear(int hashCode) { 479 hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12); 480 return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4); 481 } 482 483 private static class PaddedLock extends ReentrantLock { 484 /* 485 * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add a fourth 486 * long here, to minimize chance of interference between consecutive locks, but I couldn't 487 * observe any benefit from that. 488 */ 489 long unused1; 490 long unused2; 491 long unused3; 492 493 PaddedLock() { 494 super(false); 495 } 496 } 497 498 private static class PaddedSemaphore extends Semaphore { 499 // See PaddedReentrantLock comment 500 long unused1; 501 long unused2; 502 long unused3; 503 504 PaddedSemaphore(int permits) { 505 super(permits, false); 506 } 507 } 508}