/*
 * Copyright (C) 2011 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import com.google.common.annotations.Beta;
import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.MapMaker;
import com.google.common.math.IntMath;
import com.google.common.primitives.Ints;
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping similar
 * to that of {@code ConcurrentHashMap} in a reusable form, and extends it for semaphores and
 * read-write locks. Conceptually, lock striping is the technique of dividing a lock into many
 * <i>stripes</i>, increasing the granularity of a single lock and allowing independent operations
 * to lock different stripes and proceed concurrently, instead of creating contention for a single
 * lock.
 *
 * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
 * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)} (assuming
 * {@link Object#hashCode()} is correctly implemented for the keys). Note that if {@code key1} is
 * <strong>not</strong> equal to {@code key2}, it is <strong>not</strong> guaranteed that {@code
 * striped.get(key1) != striped.get(key2)}; the elements might nevertheless be mapped to the same
 * lock. The lower the number of stripes, the higher the probability of this happening.
 *
 * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>}, and
 * {@code Striped<ReadWriteLock>}. For each type, two implementations are offered: {@linkplain
 * #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak} {@code Striped<Lock>}, {@linkplain
 * #semaphore(int, int) strong} and {@linkplain #lazyWeakSemaphore(int, int) weak} {@code
 * Striped<Semaphore>}, and {@linkplain #readWriteLock(int) strong} and {@linkplain
 * #lazyWeakReadWriteLock(int) weak} {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all
 * stripes (locks/semaphores) are initialized eagerly, and are not reclaimed unless {@code Striped}
 * itself is reclaimable. <i>Weak</i> means that locks/semaphores are created lazily, and they are
 * allowed to be reclaimed if nobody is holding on to them. This is useful, for example, if one
 * wants to create a {@code Striped<Lock>} of many locks, but worries that in most cases only a
 * small portion of these would be in use.
 *
 * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
 * represents the task. This maximizes concurrency by having each unique key mapped to a unique
 * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock for
 * all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of choosing
 * either of these extremes, {@code Striped} allows the user to trade between required concurrency
 * and memory footprint. For example, if a set of tasks are CPU-bound, one could easily create a
 * very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes, instead of
 * possibly thousands of locks which could be created in a {@code Map<K, Lock>} structure.
 *
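 * <p>For example, a minimal usage sketch (the {@code deposit} operation and the state it guards
 * are hypothetical):
 *
 * <pre>{@code
 * Striped<Lock> locks = Striped.lock(64); // 64 stripes, eagerly initialized
 *
 * void deposit(String accountId, long amount) {
 *   Lock lock = locks.get(accountId); // equal keys always map to the same lock
 *   lock.lock();
 *   try {
 *     // update the balance associated with accountId
 *   } finally {
 *     lock.unlock();
 *   }
 * }
 * }</pre>
 *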
 * @author Dimitris Andreou
 * @since 13.0
 */
@Beta
@GwtIncompatible
public abstract class Striped<L> {
  /**
   * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
   * smaller than a large array. (This assumes that in the lazy case, most stripes are unused. As
   * always, if many stripes are in use, a non-lazy striped makes more sense.)
   */
  private static final int LARGE_LAZY_CUTOFF = 1024;

  private Striped() {}

  /**
   * Returns the stripe that corresponds to the passed key. It is always guaranteed that if {@code
   * key1.equals(key2)}, then {@code get(key1) == get(key2)}.
   *
   * @param key an arbitrary, non-null key
   * @return the stripe that the passed key corresponds to
   */
  public abstract L get(Object key);

  /**
   * Returns the stripe at the specified index. Valid indexes are 0, inclusive, to {@code size()},
   * exclusive.
   *
   * @param index the index of the stripe to return; must be in {@code [0...size())}
   * @return the stripe at the specified index
   */
  public abstract L getAt(int index);

  /**
   * Returns the index to which the given key is mapped, so that {@code getAt(indexFor(key)) ==
   * get(key)}.
   */
  abstract int indexFor(Object key);

  /** Returns the total number of stripes in this instance. */
  public abstract int size();

  /**
   * Returns the stripes that correspond to the passed objects, in ascending (as per {@link
   * #getAt(int)}) order. Thus, threads that use the stripes in the order returned by this method
   * are guaranteed not to deadlock with each other.
   *
   * <p>Note that using a {@code Striped<L>} with relatively few stripes, and {@code
   * bulkGet(keys)} with a relatively large number of keys can cause an excessive number of shared
   * stripes (much like the birthday paradox, where far fewer birthdays than anticipated are needed
   * for a pair of them to match). Please consider carefully the implications of the number of
   * stripes, the intended concurrency level, and the typical number of keys used in a {@code
   * bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls in
   * Bins model</a> for mathematical formulas that can be used to estimate the probability of
   * collisions.
   *
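   * <p>For example, a sketch of deadlock-free multi-key locking ({@code stripedLock}, {@code
   * keys}, and {@code operation()} are hypothetical):
   *
   * <pre>{@code
   * Iterable<Lock> locks = stripedLock.bulkGet(keys);
   * for (Lock lock : locks) {
   *   lock.lock(); // ascending stripe order avoids lock-ordering deadlocks
   * }
   * try {
   *   operation();
   * } finally {
   *   for (Lock lock : locks) {
   *     lock.unlock();
   *   }
   * }
   * }</pre>
   *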
   * @param keys arbitrary non-null keys
   * @return the stripes corresponding to the objects (one for each object, derived by delegating
   *     to {@link #get(Object)}; may contain duplicates), in increasing index order.
   */
  public Iterable<L> bulkGet(Iterable<?> keys) {
    // Initially using the array to store the keys, then reusing it to store the respective L's
    final Object[] array = Iterables.toArray(keys, Object.class);
    if (array.length == 0) {
      return ImmutableList.of();
    }
    int[] stripes = new int[array.length];
    for (int i = 0; i < array.length; i++) {
      stripes[i] = indexFor(array[i]);
    }
    Arrays.sort(stripes);
    // optimize for runs of identical stripes
    int previousStripe = stripes[0];
    array[0] = getAt(previousStripe);
    for (int i = 1; i < array.length; i++) {
      int currentStripe = stripes[i];
      if (currentStripe == previousStripe) {
        array[i] = array[i - 1];
      } else {
        array[i] = getAt(currentStripe);
        previousStripe = currentStripe;
      }
    }
    /*
     * Note that the returned Iterable holds references to the returned stripes, to avoid
     * error-prone code like:
     *
     * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...);
     * Iterable<Lock> locks = stripedLock.bulkGet(keys);
     * for (Lock lock : locks) {
     *   lock.lock();
     * }
     * operation();
     * for (Lock lock : locks) {
     *   lock.unlock();
     * }
     *
     * If we only held the int[] stripes, translating it on the fly to L's, the original locks
     * might be garbage collected after locking them, ending up in a huge mess.
     */
    @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    List<L> asList = (List<L>) Arrays.asList(array);
    return Collections.unmodifiableList(asList);
  }

  // Static factories

  /**
   * Creates a {@code Striped<L>} with eagerly initialized, strongly referenced locks. Every lock is
   * obtained from the passed supplier.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @param supplier a {@code Supplier<L>} object to obtain locks from
   * @return a new {@code Striped<L>}
   */
  static <L> Striped<L> custom(int stripes, Supplier<L> supplier) {
    return new CompactStriped<>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks. Every lock
   * is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lock(int stripes) {
    return custom(
        stripes,
        new Supplier<Lock>() {
          @Override
          public Lock get() {
            return new PaddedLock();
          }
        });
  }

  /**
   * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks. Every lock is
   * reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<Lock>}
   */
  public static Striped<Lock> lazyWeakLock(int stripes) {
    return lazy(
        stripes,
        new Supplier<Lock>() {
          @Override
          public Lock get() {
            return new ReentrantLock(false);
          }
        });
  }

  private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    return stripes < LARGE_LAZY_CUTOFF
        ? new SmallLazyStriped<L>(stripes, supplier)
        : new LargeLazyStriped<L>(stripes, supplier);
  }

  /**
   * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
   * with the specified number of permits.
   *
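   * <p>For example, a sketch that bounds concurrent work per key ({@code fetch} and {@code url}
   * are hypothetical):
   *
   * <pre>{@code
   * Striped<Semaphore> semaphores = Striped.semaphore(256, 3); // up to 3 holders per stripe
   *
   * void download(String url) throws InterruptedException {
   *   Semaphore semaphore = semaphores.get(url);
   *   semaphore.acquire();
   *   try {
   *     fetch(url);
   *   } finally {
   *     semaphore.release();
   *   }
   * }
   * }</pre>
   *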
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    return custom(
        stripes,
        new Supplier<Semaphore>() {
          @Override
          public Semaphore get() {
            return new PaddedSemaphore(permits);
          }
        });
  }

  /**
   * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
   * with the specified number of permits.
   *
   * @param stripes the minimum number of stripes (semaphores) required
   * @param permits the number of permits in each semaphore
   * @return a new {@code Striped<Semaphore>}
   */
  public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    return lazy(
        stripes,
        new Supplier<Semaphore>() {
          @Override
          public Semaphore get() {
            return new Semaphore(permits, false);
          }
        });
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
   * read-write locks. Every lock is reentrant.
   *
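   * <p>For example, a sketch of per-key read-mostly access ({@code lookup} and {@code Value} are
   * hypothetical):
   *
   * <pre>{@code
   * Striped<ReadWriteLock> rwLocks = Striped.readWriteLock(32);
   *
   * Value read(String key) {
   *   Lock readLock = rwLocks.get(key).readLock(); // readers of the same stripe may overlap
   *   readLock.lock();
   *   try {
   *     return lookup(key);
   *   } finally {
   *     readLock.unlock();
   *   }
   * }
   * }</pre>
   *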
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    return custom(stripes, READ_WRITE_LOCK_SUPPLIER);
  }

  /**
   * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced read-write
   * locks. Every lock is reentrant.
   *
   * @param stripes the minimum number of stripes (locks) required
   * @return a new {@code Striped<ReadWriteLock>}
   */
  public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    return lazy(stripes, WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER);
  }

  private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override
        public ReadWriteLock get() {
          return new ReentrantReadWriteLock();
        }
      };

  private static final Supplier<ReadWriteLock> WEAK_SAFE_READ_WRITE_LOCK_SUPPLIER =
      new Supplier<ReadWriteLock>() {
        @Override
        public ReadWriteLock get() {
          return new WeakSafeReadWriteLock();
        }
      };

  /**
   * ReadWriteLock implementation whose read and write locks retain a reference back to this lock.
   * Otherwise, a reference to just the read lock or just the write lock would not suffice to ensure
   * the {@code ReadWriteLock} is retained.
   */
  private static final class WeakSafeReadWriteLock implements ReadWriteLock {
    private final ReadWriteLock delegate;

    WeakSafeReadWriteLock() {
      this.delegate = new ReentrantReadWriteLock();
    }

    @Override
    public Lock readLock() {
      return new WeakSafeLock(delegate.readLock(), this);
    }

    @Override
    public Lock writeLock() {
      return new WeakSafeLock(delegate.writeLock(), this);
    }
  }

  /** Lock object that ensures a strong reference is retained to a specified object. */
  private static final class WeakSafeLock extends ForwardingLock {
    private final Lock delegate;

    @SuppressWarnings("unused")
    private final WeakSafeReadWriteLock strongReference;

    WeakSafeLock(Lock delegate, WeakSafeReadWriteLock strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override
    Lock delegate() {
      return delegate;
    }

    @Override
    public Condition newCondition() {
      return new WeakSafeCondition(delegate.newCondition(), strongReference);
    }
  }

  /** Condition object that ensures a strong reference is retained to a specified object. */
  private static final class WeakSafeCondition extends ForwardingCondition {
    private final Condition delegate;

    @SuppressWarnings("unused")
    private final WeakSafeReadWriteLock strongReference;

    WeakSafeCondition(Condition delegate, WeakSafeReadWriteLock strongReference) {
      this.delegate = delegate;
      this.strongReference = strongReference;
    }

    @Override
    Condition delegate() {
      return delegate;
    }
  }

  private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    /** Capacity (power of two) minus one, for fast mod evaluation */
    final int mask;

    PowerOfTwoStriped(int stripes) {
      Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
      this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    }

    @Override
    final int indexFor(Object key) {
      int hash = smear(key.hashCode());
      return hash & mask;
    }

    @Override
    public final L get(Object key) {
      return getAt(indexFor(key));
    }
  }

  /**
   * Implementation of Striped where 2^k stripes are represented as an array of the same length,
   * eagerly initialized.
   */
  private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    /** Size is a power of two. */
    private final Object[] array;

    private CompactStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30");

      this.array = new Object[mask + 1];
      for (int i = 0; i < array.length; i++) {
        array[i] = supplier.get();
      }
    }

    @SuppressWarnings("unchecked") // we only put L's in the array
    @Override
    public L getAt(int index) {
      return (L) array[index];
    }

    @Override
    public int size() {
      return array.length;
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using an
   * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    final AtomicReferenceArray<ArrayReference<? extends L>> locks;
    final Supplier<L> supplier;
    final int size;
    final ReferenceQueue<L> queue = new ReferenceQueue<L>();

    SmallLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.locks = new AtomicReferenceArray<>(size);
      this.supplier = supplier;
    }

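    // getAt() publishes lazily created stripes with a compareAndSet loop: a new stripe is
    // installed only if the slot is empty or its weak reference has been cleared; a losing racer
    // adopts the winner's stripe, so equal keys still observe the same instance.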
    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      ArrayReference<? extends L> existingRef = locks.get(index);
      L existing = existingRef == null ? null : existingRef.get();
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
      while (!locks.compareAndSet(index, existingRef, newRef)) {
        // we raced, we need to re-read and try again
        existingRef = locks.get(index);
        existing = existingRef == null ? null : existingRef.get();
        if (existing != null) {
          return existing;
        }
      }
      drainQueue();
      return created;
    }

    // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    // in the array. We could skip this if we decide we don't care about holding on to Reference
    // objects indefinitely.
    private void drainQueue() {
      Reference<? extends L> ref;
      while ((ref = queue.poll()) != null) {
        // We only ever register ArrayReferences with the queue so this is always safe.
        ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
        // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
        // arrayRef will be out of the array after this step.
        locks.compareAndSet(arrayRef.index, arrayRef, null);
      }
    }

    @Override
    public int size() {
      return size;
    }

    private static final class ArrayReference<L> extends WeakReference<L> {
      final int index;

      ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
        super(referent, queue);
        this.index = index;
      }
    }
  }

  /**
   * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
   * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
   * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
   */
  @VisibleForTesting
  static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    final ConcurrentMap<Integer, L> locks;
    final Supplier<L> supplier;
    final int size;

    LargeLazyStriped(int stripes, Supplier<L> supplier) {
      super(stripes);
      this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
      this.supplier = supplier;
      this.locks = new MapMaker().weakValues().makeMap();
    }

    @Override
    public L getAt(int index) {
      if (size != Integer.MAX_VALUE) {
        Preconditions.checkElementIndex(index, size());
      } // else no check necessary, all index values are valid
      L existing = locks.get(index);
      if (existing != null) {
        return existing;
      }
      L created = supplier.get();
      existing = locks.putIfAbsent(index, created);
      return MoreObjects.firstNonNull(existing, created);
    }

    @Override
    public int size() {
      return size;
    }
  }

  /** A bit mask where all bits are set. */
  private static final int ALL_SET = ~0;

  private static int ceilToPowerOfTwo(int x) {
    return 1 << IntMath.log2(x, RoundingMode.CEILING);
  }

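  // For example, ceilToPowerOfTwo(10) returns 16 (2^4): Striped.lock(10) thus creates 16 stripes,
  // and indexFor() reduces to a cheap bit mask, smear(hash) & 15.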
  /*
   * This method was written by Doug Lea with assistance from members of JCP JSR-166 Expert Group
   * and released to the public domain, as explained at
   * http://creativecommons.org/licenses/publicdomain
   *
   * As of 2010/06/11, this method is identical to the (package private) hash method in OpenJDK 7's
   * java.util.HashMap class.
   */
  // Copied from java/com/google/common/collect/Hashing.java
  private static int smear(int hashCode) {
    hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
  }

  private static class PaddedLock extends ReentrantLock {
    /*
     * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add a fourth
     * long here, to minimize chance of interference between consecutive locks, but I couldn't
     * observe any benefit from that.
     */
    long unused1;
    long unused2;
    long unused3;

    PaddedLock() {
      super(false);
    }
  }

  private static class PaddedSemaphore extends Semaphore {
    // See PaddedLock comment
    long unused1;
    long unused2;
    long unused3;

    PaddedSemaphore(int permits) {
      super(permits, false);
    }
  }
}