• java.util.concurrent.locks包下的ReentrantReadWriteLock的用法
  • 发布于 2个月前
  • 158 热度
    0 评论

今天来介绍一下java.util.concurrent.locks包下的ReentrantReadWriteLock。顾名思义,它是读写锁的一种,同一时间,读操作可以有多个线程,但是写操 作只能有一个线程,并且读和写不能同时进行;读锁被占用,那么写锁就不能获取,反过来也一样。之前学过的互斥锁,比如ReentrantLock、synchronized,在任何时候都只允许一个线程访问共享资源,这在读操作远多于写操作的场景下,显得效率很低,因为即使多个读操作之间并不冲突,它们也必须排队等待。


ReentrantReadWriteLock的诞生,正好可以解决这个问题。它通过分离读锁和写锁,使得并发读成为可能。下面通过阅读源码的方式,来看看大佬是如何设计这个读写锁的。


读写状态的设计

我们知道,基于 AQS 的锁实现,内部都是通过一个int类型的state变量,来维护锁的状态。Java 中,int有 32 位,而ReentrantReadWriteLock就是利用这些位来分别表示读锁和写锁的持有情况,这种设计被称为按位切割使用。读写锁将变量切分为两个部分,高16位表示读,低16位表示写,通过位运算来快速确定,读和写各自的状态。


写锁的获取和释放
写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁,则增加写状态;如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程, 则当前线程进入等待状态。
看tryAcquire这个方法:
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // 堆代码 duidaima.com
        //存在读锁,或者当前线程不是已经获取写锁的线程
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

这里之所以要判断是否存在读锁,因为读写锁要确保写锁的操作对读锁可见。如果允许读锁在已被获取的情况下,还获取写锁,那么正在运行的其他读线程,就没办法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问都会被阻塞。


至于写锁的释放,跟ReentrantLock的释放过程基本类似,每次释放都会减少写状态,当写状态为0 时,表示写锁已被释放,等待的读写线程能够继续访问读写锁,同时之前写线程的修改,对后续读写线程可见。


读锁的获取
读锁的获取通过tryAcquireShared()方法执行,与写锁不同,只要没有写锁被持有,就可以允许多个读锁同时存在。该方法会检查当前状态,确保没有写锁且读锁计数未达到最大限制(防止整型溢出)。成功获取后,会增加读锁的重入计数。
读锁的获取方法,代码如下:
protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
                rh.count++;
            }
        return 1;
    }
    return fullTryAcquireShared(current);
}

这个方法有个很奇怪的点,入参并没有被使用,资料说是占位符。获取读锁的主要逻辑:如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态;如果当前线程获取了写锁,或者写锁未被获取,则当前线程增加读状态(CAS 操作),成功获取读锁。


读锁的释放
直接看tryReleaseShared方法的代码:
protected final boolean tryReleaseShared(int unused) {
    // 堆代码 duidaima.com
    Thread current = Thread.currentThread();
    if (firstReader == current) {
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove();
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
根据当前线程是否首次获取读锁,分别减少读锁的获取计数;然后通过 CAS 操作来释放读锁,因为可能会有多个线程同时释放读锁。
今天的分享到这里结束了。
用户评论