Java多线程并发ReentrantReadWriteLock源码分析
本篇内容主要讲解“Java多线程并发ReentrantReadWriteLock源码分析”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“Java多线程并发ReentrantReadWriteLock源码分析”吧!
ReadWriteLock
ReentrantReadWriteLock 是基于 AbstractQueuedSynchronizer 并实现了 ReadWriteLock 接口实现的一个锁机制。ReadWriteLock 定义了读写锁的特性:
public interface ReadWriteLock { /** * Returns the lock used for reading. */ Lock readLock(); /** * Returns the lock used for writing. */ Lock writeLock(); }
ReadWriteLock 中定义了获取两种锁的方式,一个用于获取读锁、一个用于获取写锁。只要没有持有写锁的线程在执行,读锁可以同时被多个尝试读操作的线程持有,而写锁是排他锁。
与互斥锁相比,读写锁在访问共享数据时允许更高级的并发特性,即每次只有一个线程可以执行写操作,并且在没有写操作时其他线程可以并发读取共享数据。从读操作的效率来看,如果是互斥锁每次只能一个线程执行读写操作,而读写锁可以多个线程读,写操作时才互斥,所以读写锁的执行效率更高。
ReentrantReadWriteLock 源码分析
前面的内容介绍了读写锁的含义和优势,接下来分析 Java 并发包中对它的实现 ReentrantReadWriteLock 。
类关系
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable { abstract static class Sync extends AbstractQueuedSynchronizer { static final class HoldCounter static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> } static final class NonfairSync extends Sync static final class FairSync extends Sync public static class ReadLock implements Lock, java.io.Serializable public static class WriteLock implements Lock, java.io.Serializable }
ReentrantReadWriteLock 实现了读写锁接口 ReadWriteLock 和序列化接口 Serializable 。
它有一个抽象静态内部类 Sync ,Sync 是 AQS 的抽象子类,Sync 有两个静态实现 NonfairSync 和 FairSync ,这部分是锁逻辑的核心内容;Sync 还有两个内部数据结构类 HoldCounter 和 ThreadLocalHoldCounter 。
ReadLock 和 WriteLock 分别对应了读锁和写锁,它们都实现了 Lock 接口和序列号接口 Serializable 。它们是 ReentrantReadWriteLock 中对不同操作的锁类型的实现,使用了装饰模式,本质上还是通过 Sync 的能力实现的。
Sync
核心逻辑是来自于 Sync 及其两个实现,Sync 继承自 AbstractQueuedSynchronizer ,自身有两个内部类 HoldCounter 和 ThreadLocalHoldCounter 。
HoldCounter
static final class HoldCounter { int count; // initially 0 // Use id, not reference, to avoid garbage retention final long tid = LockSupport.getThreadId(Thread.currentThread()); }
HoldCounter 是一个计数器,count
用来记录当前线程拥有读锁的数量,即读锁的重入次数;tid
用来记录当前线程唯一 ID 。
Sync 有一个 cachedHoldCounter
属性,用来做缓存效果,避免每次都通过 ThreadLocal 去读取数据。
ThreadLocalHoldCounter
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
ThreadLocalHoldCounter 重写了 ThreadLocal 的 initialValue()
,在 ThreadLocal 没有进行过 set 数据的情况下,默认读取到的值都来自于这个方法,也就是配合 ThreadLocal 使用,默认值返回一个新的 HoldCounter 实例。
在 Sync 中,有一个属性 readHolds
,它的类型是 ThreadLocalHoldCounter ,用来做当前线程读锁重入计数器的 ThreadLocal 包装,便于线程读取自己的读锁重入计数器。
属性
Sync 中定义的属性包括:
abstract static class Sync extends AbstractQueuedSynchronizer { // 高16位为读锁,低16位为写锁 static final int SHARED_SHIFT = 16; // 读锁单位 static final int SHARED_UNIT = (1 << SHARED_SHIFT); // 1 * 2^16 = 65536 // 读锁最大数量 static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1; // 2^16 - 1 // 写锁最大数量 static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; // 2^16 - 1 独占标记 // 当前线程读锁重入次数。当持有读锁的线程数量下降到0时删除。 private transient ThreadLocalHoldCounter readHolds; // 缓存对象,避免每次都去从 ThreadLocal 查找。 private transient HoldCounter cachedHoldCounter; // 第一个获取读锁线程 private transient Thread firstReader; // 第一个读锁线程重入读锁的计数 private transient int firstReaderHoldCount; // ... }
构造方法
Sync() { readHolds = new ThreadLocalHoldCounter(); setState(getState()); // ensures visibility of readHolds }
Sync 初始化方法创建了 ThreadLocalHoldCounter 并重新设置了 State ,为什么要重新设置呢?因为这里要读取当前线程最新的同步状态并重新设置,获取实时的同步状态。
核心方法
Sync 的关键方法包括:
abstract static class Sync extends AbstractQueuedSynchronizer { // 并发计数 static int sharedCount(int c) static int exclusiveCount(int c) // 阻塞检查 abstract boolean readerShouldBlock(); abstract boolean writerShouldBlock(); // 获取和释放写锁 @ReservedStackAccess protected final boolean tryRelease(int releases) @ReservedStackAccess protected final boolean tryAcquire(int acquires) // 获取和释放读锁 @ReservedStackAccess protected final boolean tryReleaseShared(int unused) @ReservedStackAccess protected final int tryAcquireShared(int unused) final int fullTryAcquireShared(Thread current) // 尝试加读写锁 @ReservedStackAccess final boolean tryWriteLock() @ReservedStackAccess final boolean tryReadLock() // ... }
锁的计数方法
首先是两个静态方法 sharedCount(int c)
和 exclusiveCount(int c)
:
/** 表示共享持有的数量。 */ static int sharedCount(int c) { return c >>> SHARED_SHIFT; } // 无符号右移,高位补 0 /** 表示独占持有的数量。 */ static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
参数 c 是 AQS 中的 state,根据 state 进行位运算。这两个方法可以根据锁自身的状态解析出持有读写锁的数量。
sharedCount
,表示占有读锁的线程数量。直接将 AQS 中的 state 右移 16 位,高位补 0,就可以得到读锁的线程数量,因为 state 的高十六位表示读锁,对应的低十六位表示写锁数量。exclusiveCount
,表示占有写锁的线程数量。直接将 AQS 的 state 和 (2^16 - 1) 做与运算,其等效于将 state 模上 2^16 。写锁数量由 state 的低十六位表示。
读写锁阻塞检查方法
第二组方法是 readerShouldBlock
和 writerShouldBlock
,用来检查当前的读锁/写锁是否会造成当前线程阻塞。
// 获取和释放对公平锁和非公平锁使用相同的代码,不同点在于但在队列非空时是否/如何允许碰撞。 // 如果当前线程在尝试获取读锁时,并且在其他符合条件的线程也在尝试获取读锁,由于策略其他等待线程占用了读锁,当前线程应该阻塞,则返回true。 abstract boolean readerShouldBlock(); // 如果当前线程在尝试获取写锁时,并且在其他符合条件的线程也在尝试获取写锁,由于策略其他等待线程占用了写锁,当前线程应该阻塞,则返回true。 abstract boolean writerShouldBlock();
这两个方法的实现在 Sync 的子类中 -- 公平策略实现 FairSync 和非公平策略实现 NonfairSync。
公平策略实现 FairSync 和非公平策略实现 NonfairSync
// 非公平策略 static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L; final boolean writerShouldBlock() { return false; // 正在持有写锁的线程永不阻塞 } final boolean readerShouldBlock() { return apparentlyFirstQueuedIsExclusive(); } } // 公平策略 static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L; final boolean writerShouldBlock() { return hasQueuedPredecessors(); } final boolean readerShouldBlock() { return hasQueuedPredecessors(); } }
公平锁策略和非公平锁策略的实现,本质上的不同是这两个方法的实现。
NonfairSync 非公平策略
NonfairSync 中,执行写操作的线程是否应该进入阻塞状态的判断,直接是 false ,这是因为非公平策略下,如果当前自身已经拥有了写锁,直接重入,以独占的方式继续运行(所以是不公平的)。
执行读操作的线程是否会阻塞,是通过 apparentlyFirstQueuedIsExclusive()
判断的,这个方法是 AQS 中的方法:
final boolean apparentlyFirstQueuedIsExclusive() { Node h = head, s = head.next; return h != null && s != null && !(s instanceof SharedNode) && s.waiter != null; }
这个方法的作用是,CLH 队列中的头节点和它的的 next 都存在的情况下,如果 next 节点不是 SharedNode ,且它的关联线程不为空的情况(即下一个锁不是共享锁,共享锁在读写锁里就是读锁)的情况,会导致当前执行读操作的线程进入阻塞状态,确保写操作的互斥特性。
FairSync 公平策略
FairSync 中,读写执行线程是否应该进入阻塞状态都是根据 hasQueuedPredecessors()
方法判断的:
public final boolean hasQueuedPredecessors() { Thread first = null; Node h = head, s = h.next; if (h != null && (s == null || (first = s.waiter) == null || s.prev == null)) first = getFirstQueuedThread(); // retry via getFirstQueuedThread return first != null && first != Thread.currentThread(); } public final Thread getFirstQueuedThread() { Thread first = null, w; Node h, s; if ((h = head) != null && ((s = h.next) == null || (first = s.waiter) == null || s.prev == null)) { // traverse from tail on stale reads for (Node p = tail, q; p != null && (q = p.prev) != null; p = q) if ((w = p.waiter) != null) first = w; } return first; }
hasQueuedPredecessors()
对 head 节点和它的 next 节点进行空检查,并检查下一个节点的执行线程和 prev 指针是否有值,满足条件的情况下通过 getFirstQueuedThread()
方法获取到队列中第一个节点关联的线程。最终返回的结过是检查这个线程不等于当前线程。
如果存在等待队列第一个等待执行的线程,那么就优先执行这个线程。也就是说,不管当前线程是拥有读锁还是写锁,都优先执行等待队列第一个未执行节点,这里就能体现出公平,即优先执行等待队列中头一个等待的节点所关联的线程。
Release 和 Acquire 方法组
这一组方法是整个 Sync 的核心逻辑,也是加解锁核心逻辑。
tryRelease
:
@ReservedStackAccess protected final boolean tryRelease(int releases) { if (!isHeldExclusively()) // 不是独占持有锁的情况,直接抛出异常。 throw new IllegalMonitorStateException(); int nextc = getState() - releases; // AQS 当前锁状态 - releases = 新的锁状态 boolean free = exclusiveCount(nextc) == 0; // 根据新的锁状态获取到独占写锁的数量 == 0 if (free) setExclusiveOwnerThread(null); // 持有写锁的线程数为0,更新当前独占线程引用 setState(nextc); // 无论是不是解锁了,都要更新锁状态 return free; // 最后返回锁是否已经可用了 }
tryRelease(int releases)
用来尝试释放写锁。
它的逻辑如下图:
tryAcquire
:
@ReservedStackAccess protected final boolean tryAcquire(int acquires) { /* * 工作流程: * 1. 如果写锁计数非零或所有者是不同的线程,则失败。 * 2. 如果写锁计数超过最大数量,失败(这只发生在计数非 0 的情况)。 * 3. 否则,如果这个线程是可重入的获取方式或者队列策略允许的话,它就有资格获得锁。 * 如果是,更新状态并设置 owner。 */ Thread current = Thread.currentThread(); // 当前线程 int c = getState(); // 当前锁状态 int w = exclusiveCount(c); // 计算拥有写锁的线程数量 if (c != 0) { // 0 是锁可用状态,当前状态表面锁状态为被持有。 if (w == 0 || current != getExclusiveOwnerThread()) // 对应 【1】 的情况,写线程数量为0或者当前线程没有占有独占资源 return false; if (w + exclusiveCount(acquires) > MAX_COUNT) // 对应【2】的情况, 判断是否超过最高写线程数量 throw new Error("Maximum lock count exceeded"); // 重入获取写锁 setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) // 是否应该阻塞或更新状态是否成功,失败直接 return false; return false; setExclusiveOwnerThread(current); // 设置当前为持有锁的线程。 return true; }
此函数用于获取写锁,首先会获取 state ,判断 state 是否为0。
若为0,表示此时没有读锁线程,再判断写线程是否应该被阻塞,而在非公平策略下总是不会被阻塞,在公平策略下会进行判断(判断同步队列中是否有等待时间更长的线程,若存在,则需要被阻塞,否则,无需阻塞),之后在设置状态state,然后返回true。若state不为0,则表示此时存在读锁或写锁线程,若写锁线程数量为0或者当前线程为独占锁线程,则返回false,表示不成功,否则,判断写锁线程的重入次数是否大于了最大值,若是,则抛出异常,否则,设置状态state,返回true,表示成功。
其函数流程图如下:
tryReleaseShared
:
@ReservedStackAccess protected final boolean tryReleaseShared(int unused) { Thread current = Thread.currentThread(); // 当前线程 if (firstReader == current) { // 当前线程是否是第一个读线程 // assert firstReaderHoldCount > 0; if (firstReaderHoldCount == 1) firstReader = null; // 释放线程引用 else firstReaderHoldCount--; // 当前线程重入次数自减 } else { HoldCounter rh = cachedHoldCounter; // 获取当前线程的重入读锁的次数 if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1) { readHolds.remove(); if (count <= 0) throw unmatchedUnlockException(); } --rh.count; } // 死循环直到更新状态成功 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) // Releasing the read lock has no effect on readers, // but it may allow waiting writers to proceed if // both read and write locks are now free. return nextc == 0; } }
tryAcquireShared
:
@ReservedStackAccess protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) // 当独占线程不是当前线程 return -1; int r = sharedCount(c); // 共享读锁的线程数量 // 检查读线程不应该阻塞 and 持有读锁的线程数量小于 MAX_COUNT and 更新锁状态成功 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { // 第一个尝试获取读锁的线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 第一个线程重入 firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; // 无缓存 or 当前线程不是计数器所在线程 if (rh == null || rh.tid != LockSupport.getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); // 从 ThreadLocal 中读取 else if (rh.count == 0) readHolds.set(rh); rh.count++; // 当前线程获取读锁次数 + 1 } return 1; } return fullTryAcquireShared(current); }
最后执行到了 fullTryAcquireShared
:
final int fullTryAcquireShared(Thread current) { /* * 这段代码与 tryAcquireShared 中的部分代码是冗余的,但总体上更简单,因为它不会使 * tryAcquireShared 在重试和懒加载读锁计数之间的交互复杂化。 */ HoldCounter rh = null; for (;;) { // 死循环,不断尝试 int c = getState(); if (exclusiveCount(c) != 0) { // 独占检查是否是当前线程 if (getExclusiveOwnerThread() != current) return -1; // 否则我们持有独占锁;这里的阻塞将导致死锁。 } else if (readerShouldBlock()) { // 确保我们不是重入式地获取读锁 if (firstReader == current) { // assert firstReaderHoldCount > 0; } else { // 不是重入的情况下,更新 HoldCounter if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0) readHolds.remove(); } } if (rh.count == 0) return -1; } } // 共享读锁 == 最大数量,抛出异常 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 是否能够设置成功 if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { // 第一个线程 firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { // 重入 firstReaderHoldCount++; } else { // 其他情况 if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != LockSupport.getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; // cache for release } return 1; } } }
这个方法的整体逻辑与 tryAcquireShared 基本相同。
ReadLock
public static class ReadLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -5992448646407690164L; private final Sync sync; protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquireShared(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public boolean tryLock() { return sync.tryReadLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.releaseShared(1); } public Condition newCondition() { throw new UnsupportedOperationException(); } // ... }
ReadLock 实现了 Lock 接口,代理调用到逻辑都是 Sync 中 Shared 组的核心方法。ReadLock 可以通过 readLock(): ReadLock
方法获取到。
还有一点值得注意,newCondition()
方法直接抛出了异常,这是因为读锁是一种共享锁,不会导致互斥,所以也就不支持使用 Condition 控制阻塞与唤醒。
WriteLock
public static class WriteLock implements Lock, java.io.Serializable { private static final long serialVersionUID = -4992448646407690164L; private final Sync sync; protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } public void lock() { sync.acquire(1); } public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } public boolean tryLock() { return sync.tryWriteLock(); } public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); } public void unlock() { sync.release(1); } public Condition newCondition() { return sync.newCondition(); } public String toString() { Thread o = sync.getOwner(); return super.toString() + ((o == null) ? "[Unlocked]" : "[Locked by thread " + o.getName() + "]"); } public boolean isHeldByCurrentThread() { return sync.isHeldExclusively(); } public int getHoldCount() { return sync.getWriteHoldCount(); } }
写锁本质上也是代理 Sync 中的核心方法。
读写锁降级
锁降级指的是写锁降级为读锁,如果当前线程拥有写锁,将其释放然后再获取读锁,这种操作过程不是锁降级。锁降级是指把线程当前持有写锁,再去获取读锁,随后释放写锁,这个流程称为锁降级。
public void processData() { readLock.lock(); if (!update) { // 必须先释放读锁 readLock.unlock(); // 锁降级从写锁获取到开始 writeLock.lock(); try { if (!update) { // 准备数据的流程(略) update = true; } readLock.lock(); } finally { writeLock.unlock(); } // 锁降级完成,写锁降级为读锁 } try { // 使用数据的流程(略) } finally { readLock.unlock(); } }
锁降级可以保证数据的可见性,如果再持有写锁的情况下,不先去获取读锁,直接释放写锁,再尝试获取读锁,这一系列操作中会有短暂的无锁状态,此时如果有其他线程获取了写锁并修改数据,那么当前线程就无法感知到数据更新,如果当前线程先获取了读锁,那么其他线程就会阻塞,直到当前线程释放读锁后才能获取写锁进行更新。
读写锁 ReentrantReadWriteLock 不支持锁升级,目的是保证数据的可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁,并更新了数据,那么这个更新对其他线程是不可见的,容易造成数据不一致问题。
到此,相信大家对“Java多线程并发ReentrantReadWriteLock源码分析”有了更深的了解,不妨来实际操作一番吧!这里是蜗牛博客网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:niceseo99@gmail.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
评论