博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java多线程系列--“JUC锁”08之 共享锁和ReentrantReadWriteLock
阅读量:5745 次
发布时间:2019-06-18

本文共 15971 字,大约阅读时间需要 53 分钟。

ReadWriteLock 和 ReentrantReadWriteLock介绍

ReadWriteLock,顾名思义,是读写锁。它维护了一对相关的锁 — — “读取锁”和“写入锁”,一个用于读取操作,另一个用于写入操作。

“读取锁”用于只读操作,它是“共享锁”,能同时被多个线程获取。
“写入锁”用于写入操作,它是“独占锁”,写入锁只能被一个线程锁获取。
注意:不能同时存在读取锁和写入锁!可以"读/读",但不能"读/写"、"写/写"
ReadWriteLock是一个接口。ReentrantReadWriteLock是它的实现类,ReentrantReadWriteLock包括子类ReadLock和WriteLock。

 

ReadWriteLock 和 ReentrantReadWriteLock函数列表

ReadWriteLock函数列表

// 返回用于读取操作的锁。Lock readLock()// 返回用于写入操作的锁。Lock writeLock()

 

ReentrantReadWriteLock函数列表

// 创建一个新的 ReentrantReadWriteLock,默认是采用“非公平策略”。ReentrantReadWriteLock()// 创建一个新的 ReentrantReadWriteLock,fair是“公平策略”。fair为true,意味着公平策略;否则,意味着非公平策略。ReentrantReadWriteLock(boolean fair)// 返回当前拥有写入锁的线程,如果没有这样的线程,则返回 null。protected Thread getOwner()// 返回一个 collection,它包含可能正在等待获取读取锁的线程。protected Collection
getQueuedReaderThreads()// 返回一个 collection,它包含可能正在等待获取读取或写入锁的线程。protected Collection
getQueuedThreads()// 返回一个 collection,它包含可能正在等待获取写入锁的线程。protected Collection
getQueuedWriterThreads()// 返回等待获取读取或写入锁的线程估计数目。int getQueueLength()// 查询当前线程在此锁上保持的重入读取锁数量。int getReadHoldCount()// 查询为此锁保持的读取锁数量。int getReadLockCount()// 返回一个 collection,它包含可能正在等待与写入锁相关的给定条件的那些线程。protected Collection
getWaitingThreads(Condition condition)// 返回正等待与写入锁相关的给定条件的线程估计数目。int getWaitQueueLength(Condition condition)// 查询当前线程在此锁上保持的重入写入锁数量。int getWriteHoldCount()// 查询是否给定线程正在等待获取读取或写入锁。boolean hasQueuedThread(Thread thread)// 查询是否所有的线程正在等待获取读取或写入锁。boolean hasQueuedThreads()// 查询是否有些线程正在等待与写入锁有关的给定条件。boolean hasWaiters(Condition condition)// 如果此锁将公平性设置为 ture,则返回 true。boolean isFair()// 查询是否某个线程保持了写入锁。boolean isWriteLocked()// 查询当前线程是否保持了写入锁。boolean isWriteLockedByCurrentThread()// 返回用于读取操作的锁。ReentrantReadWriteLock.ReadLock readLock()// 返回用于写入操作的锁。ReentrantReadWriteLock.WriteLock writeLock()

 

ReentrantReadWriteLock数据结构

ReentrantReadWriteLock的UML类图如下:

从中可以看出:

(01) ReentrantReadWriteLock实现了ReadWriteLock接口。ReadWriteLock是一个读写锁的接口,提供了"获取读锁的readLock()函数" 和 "获取写锁的writeLock()函数"。

(02) ReentrantReadWriteLock中包含:sync对象,读锁readerLock和写锁writerLock。读锁ReadLock和写锁WriteLock都实现了Lock接口。读锁ReadLock和写锁WriteLock中也都分别包含了"Sync对象",它们的Sync对象和ReentrantReadWriteLock的Sync对象 是一样的,就是通过sync,读锁和写锁实现了对同一个对象的访问。
(03) 和"ReentrantLock"一样,sync是Sync类型;而且,Sync也是一个继承于AQS的抽象类。Sync也包括"公平锁"FairSync和"非公平锁"NonfairSync。sync对象是"FairSync"和"NonfairSync"中的一个,默认是"NonfairSync"。

 

参考代码(基于JDK1.7.0_40)

ReentrantReadWriteLock的完整源码

 
View Code

 

AQS的完整源码

 
View Code

 

其中,共享锁源码相关的代码如下:

public static class ReadLock implements Lock, java.io.Serializable {    private static final long serialVersionUID = -5992448646407690164L;    // ReentrantReadWriteLock的AQS对象    private final Sync sync;    protected ReadLock(ReentrantReadWriteLock lock) {        sync = lock.sync;    }    // 获取“共享锁”    public void lock() {        sync.acquireShared(1);    }    // 如果线程是中断状态,则抛出一场,否则尝试获取共享锁。    public void lockInterruptibly() throws InterruptedException {        sync.acquireSharedInterruptibly(1);    }    // 尝试获取“共享锁”    public  boolean tryLock() {        return sync.tryReadLock();    }    // 在指定时间内,尝试获取“共享锁”    public boolean tryLock(long timeout, TimeUnit unit)            throws InterruptedException {        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));    }    // 释放“共享锁”    public  void unlock() {        sync.releaseShared(1);    }    // 新建条件    public Condition newCondition() {        throw new UnsupportedOperationException();    }    public String toString() {        int r = sync.getReadLockCount();        return super.toString() +            "[Read locks = " + r + "]";    }}

说明

ReadLock中的sync是一个Sync对象,Sync继承于AQS类,即Sync就是一个锁。ReentrantReadWriteLock中也有一个Sync对象,而且ReadLock中的sync和ReentrantReadWriteLock中的sync是对应关系。即ReentrantReadWriteLock和ReadLock共享同一个AQS对象,共享同一把锁。
ReentrantReadWriteLock中Sync的定义如下:

final Sync sync;

下面,分别从“获取共享锁”和“释放共享锁”两个方面对共享锁进行说明。

 

获取共享锁

获取共享锁的思想(即lock函数的步骤),是先通过tryAcquireShared()尝试获取共享锁。尝试成功的话,则直接返回;尝试失败的话,则通过doAcquireShared()不断的循环并尝试获取锁,若有需要,则阻塞等待。doAcquireShared()在循环中每次尝试获取锁时,都是通过tryAcquireShared()来进行尝试的。下面看看“获取共享锁”的详细流程。

1. lock()

lock()在ReadLock中,源码如下:

public void lock() {    sync.acquireShared(1);}

 

2. acquireShared()

Sync继承于AQS,acquireShared()定义在AQS中。源码如下:

public final void acquireShared(int arg) {    if (tryAcquireShared(arg) < 0)        doAcquireShared(arg);}

说明:acquireShared()首先会通过tryAcquireShared()来尝试获取锁。

尝试成功的话,则不再做任何动作(因为已经成功获取到锁了)。
尝试失败的话,则通过doAcquireShared()来获取锁。doAcquireShared()会获取到锁了才返回。

 

3. tryAcquireShared()

tryAcquireShared()定义在ReentrantReadWriteLock.java的Sync中,源码如下:

protected final int tryAcquireShared(int unused) {    Thread current = Thread.currentThread();    // 获取“锁”的状态    int c = getState();    // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。    if (exclusiveCount(c) != 0 &&        getExclusiveOwnerThread() != current)        return -1;    // 获取“读取锁”的共享计数    int r = sharedCount(c);    // 如果“不需要阻塞等待”,并且“读取锁”的共享计数小于MAX_COUNT;    // 则通过CAS函数更新“锁的状态”,将“读取锁”的共享计数+1。    if (!readerShouldBlock() &&        r < MAX_COUNT &&        compareAndSetState(c, c + SHARED_UNIT)) {        // 第1次获取“读取锁”。        if (r == 0) {             firstReader = current;            firstReaderHoldCount = 1;        // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程        } else if (firstReader == current) {             firstReaderHoldCount++;        } else {            // HoldCounter是用来统计该线程获取“读取锁”的次数。            HoldCounter rh = cachedHoldCounter;            if (rh == null || rh.tid != current.getId())                cachedHoldCounter = rh = readHolds.get();            else if (rh.count == 0)                readHolds.set(rh);            // 将该线程获取“读取锁”的次数+1。            rh.count++;        }        return 1;    }    return fullTryAcquireShared(current);}

说明:tryAcquireShared()的作用是尝试获取“共享锁”。

如果在尝试获取锁时,“不需要阻塞等待”并且“读取锁的共享计数小于MAX_COUNT”,则直接通过CAS函数更新“读取锁的共享计数”,以及将“当前线程获取读取锁的次数+1”。
否则,通过fullTryAcquireShared()获取读取锁。

 

4. fullTryAcquireShared()

fullTryAcquireShared()在ReentrantReadWriteLock中定义,源码如下:

final int fullTryAcquireShared(Thread current) {    HoldCounter rh = null;    for (;;) {        // 获取“锁”的状态        int c = getState();        // 如果“锁”是“互斥锁”,并且获取锁的线程不是current线程;则返回-1。        if (exclusiveCount(c) != 0) {            if (getExclusiveOwnerThread() != current)                return -1;        // 如果“需要阻塞等待”。        // (01) 当“需要阻塞等待”的线程是第1个获取锁的线程的话,则继续往下执行。        // (02) 当“需要阻塞等待”的线程获取锁的次数=0时,则返回-1。        } else if (readerShouldBlock()) {            // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程            if (firstReader == current) {            } else {                if (rh == null) {                    rh = cachedHoldCounter;                    if (rh == null || rh.tid != current.getId()) {                        rh = readHolds.get();                        if (rh.count == 0)                            readHolds.remove();                    }                }                // 如果当前线程获取锁的计数=0,则返回-1。                if (rh.count == 0)                    return -1;            }        }        // 如果“不需要阻塞等待”,则获取“读取锁”的共享统计数;        // 如果共享统计数超过MAX_COUNT,则抛出异常。        if (sharedCount(c) == MAX_COUNT)            throw new Error("Maximum lock count exceeded");        // 将线程获取“读取锁”的次数+1。        if (compareAndSetState(c, c + SHARED_UNIT)) {            // 如果是第1次获取“读取锁”,则更新firstReader和firstReaderHoldCount。            if (sharedCount(c) == 0) {                firstReader = current;                firstReaderHoldCount = 1;            // 如果想要获取锁的线程(current)是第1个获取锁(firstReader)的线程,            // 则将firstReaderHoldCount+1。            } else if (firstReader == current) {                firstReaderHoldCount++;            } else {                if (rh == null)                    rh = cachedHoldCounter;                if (rh == null || rh.tid != current.getId())                    rh = readHolds.get();                else if (rh.count == 0)                    readHolds.set(rh);                // 更新线程的获取“读取锁”的共享计数                rh.count++;                cachedHoldCounter = rh; // cache for release            }            return 1;        }    }}

说明:fullTryAcquireShared()会根据“是否需要阻塞等待”,“读取锁的共享计数是否超过限制”等等进行处理。如果不需要阻塞等待,并且锁的共享计数没有超过限制,则通过CAS尝试获取锁,并返回1。

 

5. doAcquireShared()

doAcquireShared()定义在AQS函数中,源码如下:

private void doAcquireShared(int arg) {    // addWaiter(Node.SHARED)的作用是,创建“当前线程”对应的节点,并将该线程添加到CLH队列中。    final Node node = addWaiter(Node.SHARED);    boolean failed = true;    try {        boolean interrupted = false;        for (;;) {            // 获取“node”的前一节点            final Node p = node.predecessor();            // 如果“当前线程”是CLH队列的表头,则尝试获取共享锁。            if (p == head) {                int r = tryAcquireShared(arg);                if (r >= 0) {                    setHeadAndPropagate(node, r);                    p.next = null; // help GC                    if (interrupted)                        selfInterrupt();                    failed = false;                    return;                }            }            // 如果“当前线程”不是CLH队列的表头,则通过shouldParkAfterFailedAcquire()判断是否需要等待,            // 需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。若阻塞等待过程中,线程被中断过,则设置interrupted为true。            if (shouldParkAfterFailedAcquire(p, node) &&                parkAndCheckInterrupt())                interrupted = true;        }    } finally {        if (failed)            cancelAcquire(node);    }}

说明:doAcquireShared()的作用是获取共享锁。

它会首先创建线程对应的CLH队列的节点,然后将该节点添加到CLH队列中。CLH队列是管理获取锁的等待线程的队列。
如果“当前线程”是CLH队列的表头,则尝试获取共享锁;否则,则需要通过shouldParkAfterFailedAcquire()判断是否阻塞等待,需要的话,则通过parkAndCheckInterrupt()进行阻塞等待。
doAcquireShared()会通过for循环,不断的进行上面的操作;目的就是获取共享锁。需要注意的是:doAcquireShared()在每一次尝试获取锁时,是通过tryAcquireShared()来执行的!

shouldParkAfterFailedAcquire(), parkAndCheckInterrupt()等函数已经在“ ”中详细介绍过,这里就不再重复说明了。

 

释放共享锁

释放共享锁的思想,是先通过tryReleaseShared()尝试释放共享锁。尝试成功的话,则通过doReleaseShared()唤醒“其他等待获取共享锁的线程”,并返回true;否则的话,返回flase。

1. unlock()

public  void unlock() {    sync.releaseShared(1);}

说明:该函数实际上调用releaseShared(1)释放共享锁。

 

2. releaseShared()

releaseShared()在AQS中实现,源码如下:

public final boolean releaseShared(int arg) {    if (tryReleaseShared(arg)) {        doReleaseShared();        return true;    }    return false;}

说明:releaseShared()的目的是让当前线程释放它所持有的共享锁。

它首先会通过tryReleaseShared()去尝试释放共享锁。尝试成功,则直接返回;尝试失败,则通过doReleaseShared()去释放共享锁。

 

3. tryReleaseShared()

tryReleaseShared()定义在ReentrantReadWriteLock中,源码如下:

protected final boolean tryReleaseShared(int unused) {    // 获取当前线程,即释放共享锁的线程。    Thread current = Thread.currentThread();    // 如果想要释放锁的线程(current)是第1个获取锁(firstReader)的线程,    // 并且“第1个获取锁的线程获取锁的次数”=1,则设置firstReader为null;    // 否则,将“第1个获取锁的线程的获取次数”-1。    if (firstReader == current) {        // assert firstReaderHoldCount > 0;        if (firstReaderHoldCount == 1)            firstReader = null;        else            firstReaderHoldCount--;    // 获取rh对象,并更新“当前线程获取锁的信息”。    } else {         HoldCounter rh = cachedHoldCounter;        if (rh == null || rh.tid != current.getId())            rh = readHolds.get();        int count = rh.count;        if (count <= 1) {            readHolds.remove();            if (count <= 0)                throw unmatchedUnlockException();        }        --rh.count;    }    for (;;) {        // 获取锁的状态        int c = getState();        // 将锁的获取次数-1。        int nextc = c - SHARED_UNIT;        // 通过CAS更新锁的状态。        if (compareAndSetState(c, nextc))            return nextc == 0;    }}

说明:tryReleaseShared()的作用是尝试释放共享锁。

 

4. doReleaseShared()

doReleaseShared()定义在AQS中,源码如下:

private void doReleaseShared() {    for (;;) {        // 获取CLH队列的头节点        Node h = head;        // 如果头节点不为null,并且头节点不等于tail节点。        if (h != null && h != tail) {            // 获取头节点对应的线程的状态            int ws = h.waitStatus;            // 如果头节点对应的线程是SIGNAL状态,则意味着“头节点的下一个节点所对应的线程”需要被unpark唤醒。            if (ws == Node.SIGNAL) {                // 设置“头节点对应的线程状态”为空状态。失败的话,则继续循环。                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))                    continue;                // 唤醒“头节点的下一个节点所对应的线程”。                unparkSuccessor(h);            }            // 如果头节点对应的线程是空状态,则设置“文件点对应的线程所拥有的共享锁”为其它线程获取锁的空状态。            else if (ws == 0 &&                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))                continue;                // loop on failed CAS        }        // 如果头节点发生变化,则继续循环。否则,退出循环。        if (h == head)                   // loop if head changed            break;    }}

说明:doReleaseShared()会释放“共享锁”。它会从前往后的遍历CLH队列,依次“唤醒”然后“执行”队列中每个节点对应的线程;最终的目的是让这些线程释放它们所持有的锁。

 

公平共享锁和非公平共享锁

和互斥锁ReentrantLock一样,ReadLock也分为公平锁和非公平锁。

公平锁和非公平锁的区别,体现在判断是否需要阻塞的函数readerShouldBlock()是不同的。

公平锁的readerShouldBlock()的源码如下:

final boolean readerShouldBlock() {    return hasQueuedPredecessors();}

 

在公平共享锁中,如果在当前线程的前面有其他线程在等待获取共享锁,则返回true;否则,返回false。

非公平锁的readerShouldBlock()的源码如下:

final boolean readerShouldBlock() {    return apparentlyFirstQueuedIsExclusive();}

在非公平共享锁中,它会无视当前线程的前面是否有其他线程在等待获取共享锁。只要该非公平共享锁对应的线程不为null,则返回true。

 

ReentrantReadWriteLock示例

package lock.demo9;import java.util.concurrent.locks.ReadWriteLock;import java.util.concurrent.locks.ReentrantReadWriteLock;public class ReadWriteLockTest1 {    public static void main(String[] args) {        // 创建账户        MyCount myCount = new MyCount("4238920615242830", 10000);        // 创建用户,并指定账户        User user = new User("Tommy", myCount);        // 分别启动3个“读取账户金钱”的线程 和 3个“设置账户金钱”的线程        for (int i = 0; i < 3; i++) {            user.getCash();            user.setCash((i + 1) * 1000);        }    }}class User {    private String name; // 用户名    private MyCount myCount; // 所要操作的账户    private ReadWriteLock myLock; // 执行操作所需的锁对象    User(String name, MyCount myCount) {        this.name = name;        this.myCount = myCount;        this.myLock = new ReentrantReadWriteLock();    }    public void getCash() {        new Thread() {            public void run() {                myLock.readLock().lock();                try {                    System.out.println(Thread.currentThread().getName() + " getCash start");                    myCount.getCash();                    Thread.sleep(1);                    System.out.println(Thread.currentThread().getName() + " getCash end");                } catch (InterruptedException e) {                } finally {                    myLock.readLock().unlock();                }            }        }.start();    }    public void setCash(final int cash) {        new Thread() {            public void run() {                myLock.writeLock().lock();                try {                    System.out.println(Thread.currentThread().getName() + " setCash start");                    myCount.setCash(cash);                    Thread.sleep(1);                    System.out.println(Thread.currentThread().getName() + " setCash end");                } catch (InterruptedException e) {                } finally {                    myLock.writeLock().unlock();                }            }        }.start();    }}class MyCount {    private String id; // 账号    private int cash; // 账户余额    MyCount(String id, int cash) {        this.id = id;        this.cash = cash;    }    public String getId() {        return id;    }    public void setId(String id) {        this.id = id;    }    public int getCash() {        System.out.println(Thread.currentThread().getName() + " getCash cash=" + cash);        return cash;    }    public void setCash(int cash) {        System.out.println(Thread.currentThread().getName() + " setCash cash=" + cash);        this.cash = cash;    }}

运行结果

Thread-0 getCash startThread-2 getCash startThread-0 getCash cash=10000Thread-2 getCash cash=10000Thread-0 getCash endThread-2 getCash endThread-1 setCash startThread-1 setCash cash=1000Thread-1 setCash endThread-3 setCash startThread-3 setCash cash=2000Thread-3 setCash endThread-4 getCash startThread-4 getCash cash=2000Thread-4 getCash endThread-5 setCash startThread-5 setCash cash=3000Thread-5 setCash end

结果说明

(01) 观察Thread0和Thread-2的运行结果,我们发现,Thread-0启动并获取到“读取锁”,在它还没运行完毕的时候,Thread-2也启动了并且也成功获取到“读取锁”。
因此,“读取锁”支持被多个线程同时获取。
(02) 观察Thread-1,Thread-3,Thread-5这三个“写入锁”的线程。只要“写入锁”被某线程获取,则该线程运行完毕了,才释放该锁。
因此,“写入锁”不支持被多个线程同时获取。

转载地址:http://cxxzx.baihongyu.com/

你可能感兴趣的文章
Python3迭代器和生成器
查看>>
EF 通过DataAnnotations配置属性和类型
查看>>
梯度下降法_最速下降法
查看>>
一些linux命令
查看>>
Cocos2d-x之实现动作的反转
查看>>
Win7下的flutter环境安装配置
查看>>
怎样安装最新版的VMware workstation 14 pro虚拟机
查看>>
Linux-监控与安全运维之zabbix
查看>>
asp.net面试集合
查看>>
webpack4.5.0+vue2.5.16+vue-loader 实战组件化开发案例以及版本问题中踩的一些大坑!!!...
查看>>
Extjs 3 ipad drag drop
查看>>
NPTL LinuxThreads
查看>>
在linux下安装配置rabbitMQ详细教程
查看>>
winform应用程序更新 带进度条
查看>>
JSP 内置对象(上)
查看>>
Codeforces Round #238 (Div. 1) 解题报告
查看>>
两表更新:根据条件从一个表里面查询出符合条件的结果更新另一个表
查看>>
saltstack之(六)配置管理state
查看>>
Mac OS下新建文本文档
查看>>
Android数据存储---SQLite
查看>>