Java并發(fā)讀寫(xiě)鎖ReentrantReadWriteLock 使用場(chǎng)景
ReentrantReadWriteLock使用場(chǎng)景
ReentrantReadWriteLock 是 Java 的一種讀寫(xiě)鎖,它允許多個(gè)讀線(xiàn)程同時(shí)訪問(wèn),但只允許一個(gè)寫(xiě)線(xiàn)程訪問(wèn)(會(huì)阻塞所有的讀寫(xiě)線(xiàn)程)。這種鎖的設(shè)計(jì)可以提高性能,特別是在讀操作的數(shù)量遠(yuǎn)遠(yuǎn)超過(guò)寫(xiě)操作的情況下。
在并發(fā)場(chǎng)景中,為了解決線(xiàn)程安全問(wèn)題,我們通常會(huì)使用關(guān)鍵字 synchronized 或者 JUC 包中實(shí)現(xiàn)了 Lock 接口的 ReentrantLock。但它們都是獨(dú)占式獲取鎖,也就是在同一時(shí)刻只有一個(gè)線(xiàn)程能夠獲取鎖。
而在一些業(yè)務(wù)場(chǎng)景中,大部分只是讀數(shù)據(jù),寫(xiě)數(shù)據(jù)很少,如果僅僅是讀數(shù)據(jù)的話(huà)并不會(huì)影響數(shù)據(jù)正確性,而如果在這種業(yè)務(wù)場(chǎng)景下,依然使用獨(dú)占鎖的話(huà),很顯然會(huì)出現(xiàn)性能瓶頸。針對(duì)這種讀多寫(xiě)少的情況,Java 提供了另外一個(gè)實(shí)現(xiàn) Lock 接口的 ReentrantReadWriteLock——讀寫(xiě)鎖。
ReentrantReadWriteLock其實(shí)就是 讀讀并發(fā)、讀寫(xiě)互斥、寫(xiě)寫(xiě)互斥。如果一個(gè)對(duì)象并發(fā)讀的場(chǎng)景大于并發(fā)寫(xiě)的場(chǎng)景,那就可以使用 ReentrantReadWriteLock來(lái)達(dá)到保證線(xiàn)程安全的前提下提高并發(fā)效率。首先,我們先了解一下Doug Lea為我們準(zhǔn)備的兩個(gè)demo。
CachedData
一個(gè)緩存對(duì)象的使用案例,緩存對(duì)象在使用時(shí),一般并發(fā)讀的場(chǎng)景遠(yuǎn)遠(yuǎn)大于并發(fā)寫(xiě)的場(chǎng)景,所以緩存對(duì)象是非常適合使用ReentrantReadWriteLock來(lái)做控制的
class CachedData {
//被緩存的具體對(duì)象
Object data;
//當(dāng)前對(duì)象是否可用,使用volatile來(lái)保證可見(jiàn)性
volatile boolean cacheValid;
//ReentrantReadWriteLock
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//業(yè)務(wù)處理邏輯
void processCachedData() {
//要讀取數(shù)據(jù)時(shí),先加讀鎖,如果加成功,說(shuō)明此時(shí)沒(méi)有人在并發(fā)寫(xiě)
rwl.readLock().lock();
//拿到讀鎖后,判斷當(dāng)前對(duì)象是否有效
if (!cacheValid) {
// Must release read lock before acquiring write lock
//這里的處理非常經(jīng)典,當(dāng)你持有讀鎖之后,不能直接獲取寫(xiě)鎖,
//因?yàn)閷?xiě)鎖是獨(dú)占鎖,如果直接獲取寫(xiě)鎖,那代碼就在這里死鎖了
//所以必須要先釋放讀鎖,然后手動(dòng)獲取寫(xiě)鎖
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
//經(jīng)典處理之二,在獨(dú)占鎖內(nèi)部要處理數(shù)據(jù)時(shí),一定要做二次校驗(yàn)
//因?yàn)榭赡芡瑫r(shí)有多個(gè)線(xiàn)程全都在獲取寫(xiě)鎖,
//當(dāng)時(shí)線(xiàn)程1釋放寫(xiě)鎖之后,線(xiàn)程2馬上獲取到寫(xiě)鎖,此時(shí)如果不做二次校驗(yàn)?zāi)强赡芫蛯?dǎo)致某些操作做了多次
if (!cacheValid) {
data = ...
//當(dāng)緩存對(duì)象更新成功后,重置標(biāo)記為true
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
//這里有一個(gè)非常神奇的鎖降級(jí)操作,所謂降級(jí)是說(shuō)當(dāng)你持有寫(xiě)鎖后,可以再次獲取讀鎖
//這里之所以要獲取一次寫(xiě)鎖是為了防止當(dāng)前線(xiàn)程釋放寫(xiě)鎖之后,其他線(xiàn)程馬上獲取到寫(xiě)鎖,改變緩存對(duì)象
//因?yàn)樽x寫(xiě)互斥,所以有了這個(gè)讀鎖之后,在讀鎖釋放之前,別的線(xiàn)程是無(wú)法修改緩存對(duì)象的
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}RWDictionary
Doug Lea給出的第二個(gè)demo,一個(gè)并發(fā)容器的demo。并發(fā)容器我們一般都是直接使用ConcurrentHashMap的,但是我們可以使用非并發(fā)安全的容器+ReentrantReadWriteLock來(lái)組合出一個(gè)并發(fā)容器。如果這個(gè)并發(fā)容器的讀的頻率>寫(xiě)的頻率,那這個(gè)效率還是不錯(cuò)的
class RWDictionary {
//原來(lái)非并發(fā)安全的容器
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();
public Data get(String key) {
//讀數(shù)據(jù),上讀鎖
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
//讀數(shù)據(jù),上讀鎖
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
//寫(xiě)數(shù)據(jù),上寫(xiě)鎖
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
//寫(xiě)數(shù)據(jù),上寫(xiě)鎖
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}ReentrantReadWriteLock的特性
讀寫(xiě)鎖允許同一時(shí)刻被多個(gè)讀線(xiàn)程訪問(wèn),但是在寫(xiě)線(xiàn)程訪問(wèn)時(shí),所有的讀線(xiàn)程和其他的寫(xiě)線(xiàn)程都會(huì)被阻塞。
在分析 WirteLock 和 ReadLock 的互斥性時(shí),我們可以按照 WriteLock 與 WriteLock,WriteLock 與 ReadLock 以及 ReadLock 與 ReadLock 進(jìn)行對(duì)比分析。
這里總結(jié)一下讀寫(xiě)鎖的特性:
- 公平性選擇:支持非公平性(默認(rèn))和公平的鎖獲取方式,非公平的吞吐量?jī)?yōu)于公平;
- 重入性:支持重入,讀鎖獲取后能再次獲取,寫(xiě)鎖獲取之后能夠再次獲取寫(xiě)鎖,同時(shí)也能夠獲取讀鎖;
- 鎖降級(jí):寫(xiě)鎖降級(jí)是一種允許寫(xiě)鎖轉(zhuǎn)換為讀鎖的過(guò)程。通常的順序是:
- 獲取寫(xiě)鎖:線(xiàn)程首先獲取寫(xiě)鎖,確保在修改數(shù)據(jù)時(shí)排它訪問(wèn)。
- 獲取讀鎖:在寫(xiě)鎖保持的同時(shí),線(xiàn)程可以再次獲取讀鎖。
- 釋放寫(xiě)鎖:線(xiàn)程保持讀鎖的同時(shí)釋放寫(xiě)鎖。
- 釋放讀鎖:最后線(xiàn)程釋放讀鎖。
這樣,寫(xiě)鎖就降級(jí)為讀鎖,允許其他線(xiàn)程進(jìn)行并發(fā)讀取,但仍然排除其他線(xiàn)程的寫(xiě)操作。
接下來(lái)額外說(shuō)一下鎖降級(jí)
- 鎖降級(jí)
鎖降級(jí)指的是寫(xiě)鎖降級(jí)成為讀鎖。如果當(dāng)前線(xiàn)程擁有寫(xiě)鎖,然后將其釋放,最后再獲取讀鎖,這種分段完成的過(guò)程不能稱(chēng)之為鎖降級(jí)。鎖降級(jí)是指把持住(當(dāng)前擁有的)寫(xiě)鎖,再獲取到讀鎖,隨后釋放(先前擁有的)寫(xiě)鎖的過(guò)程。
接下來(lái)看一個(gè)鎖降級(jí)的示例。因?yàn)閿?shù)據(jù)不常變化,所以多個(gè)線(xiàn)程可以并發(fā)地進(jìn)行數(shù)據(jù)處理,當(dāng)數(shù)據(jù)變更后,如果當(dāng)前線(xiàn)程感知到數(shù)據(jù)變化,則進(jìn)行數(shù)據(jù)的準(zhǔn)備工作,同時(shí)其他處理線(xiàn)程被阻塞,直到當(dāng)前線(xiàn)程完成數(shù)據(jù)的準(zhǔn)備工作,如代碼如下所示:
public void processData() {
readLock.lock();
if (!update) {
// 必須先釋放讀鎖
readLock.unlock();
// 鎖降級(jí)從寫(xiě)鎖獲取到開(kāi)始
writeLock.lock();
try {
if (!update) {
// 準(zhǔn)備數(shù)據(jù)的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 鎖降級(jí)完成,寫(xiě)鎖降級(jí)為讀鎖
}
try {
// 使用數(shù)據(jù)的流程(略)
} finally {
readLock.unlock();
}
}上述示例中,當(dāng)數(shù)據(jù)發(fā)生變更后,update變量(布爾類(lèi)型且volatile修飾)被設(shè)置為false,此時(shí)所有訪問(wèn)processData()方法的線(xiàn)程都能夠感知到變化,但只有一個(gè)線(xiàn)程能夠獲取到寫(xiě)鎖,其他線(xiàn)程會(huì)被阻塞在讀鎖和寫(xiě)鎖的lock()方法上。當(dāng)前線(xiàn)程獲取寫(xiě)鎖完成數(shù)據(jù)準(zhǔn)備之后,再獲取讀鎖,隨后釋放寫(xiě)鎖,完成鎖降級(jí)。
鎖降級(jí)中讀鎖的獲取是否必要呢? 答案是必要的。主要是為了保證數(shù)據(jù)的可見(jiàn)性,如果當(dāng)前線(xiàn)程不獲取讀鎖而是直接釋放寫(xiě)鎖,假設(shè)此刻另一個(gè)線(xiàn)程(記作線(xiàn)程T)獲取了寫(xiě)鎖并修改了數(shù)據(jù),那么當(dāng)前線(xiàn)程無(wú)法感知線(xiàn)程T的數(shù)據(jù)更新。如果當(dāng)前線(xiàn)程獲取讀鎖,即遵循鎖降級(jí)的步驟,則線(xiàn)程T將會(huì)被阻塞,直到當(dāng)前線(xiàn)程使用數(shù)據(jù)并釋放讀鎖之后,線(xiàn)程T才能獲取寫(xiě)鎖進(jìn)行數(shù)據(jù)更新。
RentrantReadWriteLock不支持鎖升級(jí)(把持讀鎖、獲取寫(xiě)鎖,最后釋放讀鎖的過(guò)程)。目的也是保證數(shù)據(jù)可見(jiàn)性,如果讀鎖已被多個(gè)線(xiàn)程獲取,其中任意線(xiàn)程成功獲取了寫(xiě)鎖并更新了數(shù)據(jù),則其更新對(duì)其他獲取到讀鎖的線(xiàn)程是不可見(jiàn)的。
ReentrantReadWriteLock源碼分析
類(lèi)的繼承關(guān)系
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {}說(shuō)明: 可以看到,ReentrantReadWriteLock實(shí)現(xiàn)了ReadWriteLock接口,ReadWriteLock接口定義了獲取讀鎖和寫(xiě)鎖的規(guī)范,具體需要實(shí)現(xiàn)類(lèi)去實(shí)現(xiàn);同時(shí)其還實(shí)現(xiàn)了Serializable接口,表示可以進(jìn)行序列化,在源代碼中可以看到ReentrantReadWriteLock實(shí)現(xiàn)了自己的序列化邏輯。
類(lèi)的內(nèi)部類(lèi)
ReentrantReadWriteLock有五個(gè)內(nèi)部類(lèi),五個(gè)內(nèi)部類(lèi)之間也是相互關(guān)聯(lián)的。內(nèi)部類(lèi)的關(guān)系如下圖所示。

說(shuō)明: 如上圖所示,Sync繼承自AQS、NonfairSync繼承自Sync類(lèi)、FairSync繼承自Sync類(lèi);ReadLock實(shí)現(xiàn)了Lock接口、WriteLock也實(shí)現(xiàn)了Lock接口。
內(nèi)部類(lèi) -類(lèi)Sync
- Sync類(lèi)的繼承關(guān)系
abstract static class Sync extends AbstractQueuedSynchronizer {}說(shuō)明: Sync抽象類(lèi)繼承自AQS抽象類(lèi),Sync類(lèi)提供了對(duì)ReentrantReadWriteLock的支持。
- Sync類(lèi)的內(nèi)部類(lèi)
Sync類(lèi)內(nèi)部存在兩個(gè)內(nèi)部類(lèi),分別為HoldCounter和ThreadLocalHoldCounter,其中HoldCounter主要與讀鎖配套使用,其中,HoldCounter源碼如下。
// 計(jì)數(shù)器
static final class HoldCounter {
// 計(jì)數(shù)
int count = 0;
// Use id, not reference, to avoid garbage retention
// 獲取當(dāng)前線(xiàn)程的TID屬性的值
final long tid = getThreadId(Thread.currentThread());
}說(shuō)明: HoldCounter主要有兩個(gè)屬性,count和tid,其中count表示某個(gè)讀線(xiàn)程重入的次數(shù),tid表示該線(xiàn)程的tid字段的值,該字段可以用來(lái)唯一標(biāo)識(shí)一個(gè)線(xiàn)程。ThreadLocalHoldCounter的源碼如下
// 本地線(xiàn)程計(jì)數(shù)器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重寫(xiě)初始化方法,在沒(méi)有進(jìn)行set的情況下,獲取的都是該HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}說(shuō)明: ThreadLocalHoldCounter重寫(xiě)了ThreadLocal的initialValue方法,ThreadLocal類(lèi)可以將線(xiàn)程與對(duì)象相關(guān)聯(lián)。在沒(méi)有進(jìn)行set的情況下,get到的均是initialValue方法里面生成的那個(gè)HolderCounter對(duì)象。
- Sync類(lèi)的屬性
abstract static class Sync extends AbstractQueuedSynchronizer {
// 版本序列號(hào)
private static final long serialVersionUID = 6317671515068378041L;
// 高16位為讀鎖,低16位為寫(xiě)鎖
static final int SHARED_SHIFT = 16;
// 讀鎖單位
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 讀鎖最大數(shù)量
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 寫(xiě)鎖最大數(shù)量
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 本地線(xiàn)程計(jì)數(shù)器
private transient ThreadLocalHoldCounter readHolds;
// 緩存的計(jì)數(shù)器
private transient HoldCounter cachedHoldCounter;
// 第一個(gè)讀線(xiàn)程
private transient Thread firstReader = null;
// 第一個(gè)讀線(xiàn)程的計(jì)數(shù)
private transient int firstReaderHoldCount;
}說(shuō)明: 該屬性中包括了讀鎖、寫(xiě)鎖線(xiàn)程的最大量。本地線(xiàn)程計(jì)數(shù)器等。
- Sync類(lèi)的構(gòu)造函數(shù)
// 構(gòu)造函數(shù)
Sync() {
// 本地線(xiàn)程計(jì)數(shù)器
readHolds = new ThreadLocalHoldCounter();
// 設(shè)置AQS的狀態(tài)
setState(getState()); // ensures visibility of readHolds
}說(shuō)明:在Sync的構(gòu)造函數(shù)中設(shè)置了本地線(xiàn)程計(jì)數(shù)器和AQS的狀態(tài)state。
類(lèi)的屬性
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
// 版本序列號(hào)
private static final long serialVersionUID = -6992448646407690164L;
// 讀鎖
private final ReentrantReadWriteLock.ReadLock readerLock;
// 寫(xiě)鎖
private final ReentrantReadWriteLock.WriteLock writerLock;
// 同步隊(duì)列
final Sync sync;
private static final sun.misc.Unsafe UNSAFE;
// 線(xiàn)程ID的偏移地址
private static final long TID_OFFSET;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> tk = Thread.class;
// 獲取線(xiàn)程的tid字段的內(nèi)存地址
TID_OFFSET = UNSAFE.objectFieldOffset
(tk.getDeclaredField("tid"));
} catch (Exception e) {
throw new Error(e);
}
}
}說(shuō)明: 可以看到ReentrantReadWriteLock屬性包括了一個(gè)ReentrantReadWriteLock.ReadLock對(duì)象,表示讀鎖;一個(gè)ReentrantReadWriteLock.WriteLock對(duì)象,表示寫(xiě)鎖;一個(gè)Sync對(duì)象,表示同步隊(duì)列。
類(lèi)的構(gòu)造函數(shù)
- ReentrantReadWriteLock()型構(gòu)造函數(shù)
public ReentrantReadWriteLock() {
this(false);
}說(shuō)明: 此構(gòu)造函數(shù)會(huì)調(diào)用另外一個(gè)有參構(gòu)造函數(shù)。
- ReentrantReadWriteLock(boolean)型構(gòu)造函數(shù)
public ReentrantReadWriteLock(boolean fair) {
// 公平策略或者是非公平策略
sync = fair ? new FairSync() : new NonfairSync();
// 讀鎖
readerLock = new ReadLock(this);
// 寫(xiě)鎖
writerLock = new WriteLock(this);
}說(shuō)明: 可以指定設(shè)置公平策略或者非公平策略,并且該構(gòu)造函數(shù)中生成了讀鎖與寫(xiě)鎖兩個(gè)對(duì)象。
內(nèi)部類(lèi) - Sync核心函數(shù)分析
對(duì)ReentrantReadWriteLock對(duì)象的操作絕大多數(shù)都轉(zhuǎn)發(fā)至Sync對(duì)象進(jìn)行處理。下面對(duì)Sync類(lèi)中的重點(diǎn)函數(shù)進(jìn)行分析
- sharedCount函數(shù)
表示占有讀鎖的線(xiàn)程數(shù)量,源碼如下
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }說(shuō)明::直接將state右移16位,就可以得到讀鎖的線(xiàn)程數(shù)量,因?yàn)閟tate的高16位表示讀鎖,對(duì)應(yīng)的低十六位表示寫(xiě)鎖數(shù)量。
- exclusiveCount函數(shù)
表示占有寫(xiě)鎖的線(xiàn)程數(shù)量,源碼如下
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }說(shuō)明:
EXCLUSIVE_MASK為:
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
EXCLUSIVE_MASK 為 1 左移 16 位然后減 1,即為 0x0000FFFF。而 exclusiveCount 方法是將同步狀態(tài)(state 為 int 類(lèi)型)與 0x0000FFFF 相與,即取同步狀態(tài)的低 16 位。
那么低 16 位代表什么呢?根據(jù) exclusiveCount 方法的注釋為獨(dú)占式獲取的次數(shù)即寫(xiě)鎖被獲取的次數(shù),現(xiàn)在就可以得出來(lái)一個(gè)結(jié)論同步狀態(tài)的低 16 位用來(lái)表示寫(xiě)鎖的獲取次數(shù)。
寫(xiě)鎖的獲取
同一時(shí)刻,ReentrantReadWriteLock 的寫(xiě)鎖是不能被多個(gè)線(xiàn)程獲取的,很顯然 ReentrantReadWriteLock 的寫(xiě)鎖是獨(dú)占式鎖,而實(shí)現(xiàn)寫(xiě)鎖的同步語(yǔ)義是通過(guò)重寫(xiě) AQS 中的 tryAcquire 方法實(shí)現(xiàn)的。
- tryAcquire函數(shù)
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
// 獲取當(dāng)前線(xiàn)程
Thread current = Thread.currentThread();
// 獲取狀態(tài)
int c = getState();
// 寫(xiě)線(xiàn)程數(shù)量
int w = exclusiveCount(c);
if (c != 0) { // 狀態(tài)不為0
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 寫(xiě)線(xiàn)程數(shù)量為0或者當(dāng)前線(xiàn)程沒(méi)有占有獨(dú)占資源
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 判斷是否超過(guò)最高寫(xiě)線(xiàn)程數(shù)量
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 設(shè)置AQS狀態(tài)
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) // 寫(xiě)線(xiàn)程是否應(yīng)該被阻塞
return false;
// 設(shè)置獨(dú)占線(xiàn)程
setExclusiveOwnerThread(current);
return true;
}說(shuō)明: 此函數(shù)用于獲取寫(xiě)鎖:首先會(huì)獲取state,判斷是否為0;
1. 若為0,表示此時(shí)沒(méi)有讀鎖線(xiàn)程,再判斷寫(xiě)線(xiàn)程是否應(yīng)該被阻塞,而在非公平策略下總是不會(huì)被阻塞,在公平策略下會(huì)進(jìn)行判斷(判斷同步隊(duì)列中是否有等待時(shí)間更長(zhǎng)的線(xiàn)程;若存在,則需要被阻塞,否則,無(wú)需阻塞),之后在設(shè)置狀態(tài)state,然后返回true。
2. 若state不為0,則表示此時(shí)存在讀鎖或?qū)戞i線(xiàn)程,若寫(xiě)鎖線(xiàn)程數(shù)量為0或者當(dāng)前線(xiàn)程為獨(dú)占鎖線(xiàn)程,則返回false,表示不成功,否則,判斷寫(xiě)鎖線(xiàn)程的重入次數(shù)是否大于了最大值,若是,則拋出異常,否則,設(shè)置狀態(tài)state,返回true,表示成功。其函數(shù)流程圖如下

其主要邏輯為:當(dāng)讀鎖已經(jīng)被讀線(xiàn)程獲取或者寫(xiě)鎖已經(jīng)被其他寫(xiě)線(xiàn)程獲取,則寫(xiě)鎖獲取失??;否則,獲取成功并支持重入,增加寫(xiě)狀態(tài)。
寫(xiě)鎖的釋放
寫(xiě)鎖釋放通過(guò)重寫(xiě) AQS 的 tryRelease 方法,源碼為:
- tryRelease函數(shù)
/*
* Note that tryRelease and tryAcquire can be called by
* Conditions. So it is possible that their arguments contain
* both read and write holds that are all released during a
* condition wait and re-established in tryAcquire.
*/
protected final boolean tryRelease(int releases) {
// 判斷是否偽獨(dú)占線(xiàn)程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 計(jì)算釋放資源后的寫(xiě)鎖的數(shù)量
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0; // 是否釋放成功
if (free)
setExclusiveOwnerThread(null); // 設(shè)置獨(dú)占線(xiàn)程為空
setState(nextc); // 設(shè)置狀態(tài)
return free;
}說(shuō)明: 此函數(shù)用于釋放寫(xiě)鎖資源,首先會(huì)判斷該線(xiàn)程是否為獨(dú)占線(xiàn)程,若不為獨(dú)占線(xiàn)程,則拋出異常,否則,計(jì)算釋放資源后的寫(xiě)鎖的數(shù)量,若為0,表示成功釋放,資源不將被占用,否則,表示資源還被占用。其函數(shù)流程圖如下。

讀鎖的獲取
看完了寫(xiě)鎖,再來(lái)看看讀鎖,讀鎖不是獨(dú)占式鎖,即同一時(shí)刻該鎖可以被多個(gè)讀線(xiàn)程獲取,也就是一種共享式鎖。按照之前對(duì) AQS 的介紹,實(shí)現(xiàn)共享式同步組件的同步語(yǔ)義需要通過(guò)重寫(xiě) AQS 的 tryAcquireShared 方法和 tryReleaseShared 方法。讀鎖的獲取實(shí)現(xiàn)方法為:
- tryAcquireShared函數(shù)
private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}
// 共享模式下獲取資源
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
// 獲取當(dāng)前線(xiàn)程
Thread current = Thread.currentThread();
// 獲取狀態(tài)
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 寫(xiě)線(xiàn)程數(shù)不為0并且占有資源的不是當(dāng)前線(xiàn)程
return -1;
// 讀鎖數(shù)量
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 讀線(xiàn)程是否應(yīng)該被阻塞、并且小于最大值、并且比較設(shè)置成功
if (r == 0) { // 讀鎖數(shù)量為0
// 設(shè)置第一個(gè)讀線(xiàn)程
firstReader = current;
// 讀線(xiàn)程占用的資源數(shù)為1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 當(dāng)前線(xiàn)程為第一個(gè)讀線(xiàn)程
// 占用資源數(shù)加1
firstReaderHoldCount++;
} else { // 讀鎖數(shù)量不為0并且不為當(dāng)前線(xiàn)程
// 獲取計(jì)數(shù)器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 計(jì)數(shù)器為空或者計(jì)數(shù)器的tid不為當(dāng)前正在運(yùn)行的線(xiàn)程的tid
// 獲取當(dāng)前線(xiàn)程對(duì)應(yīng)的計(jì)數(shù)器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 計(jì)數(shù)為0
// 設(shè)置
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}說(shuō)明: 此函數(shù)表示讀鎖線(xiàn)程獲取讀鎖。首先判斷寫(xiě)鎖是否為0并且當(dāng)前線(xiàn)程不占有獨(dú)占鎖,直接返回;否則,判斷讀線(xiàn)程是否需要被阻塞并且讀鎖數(shù)量是否小于最大值并且比較設(shè)置狀態(tài)成功,若當(dāng)前沒(méi)有讀鎖,則設(shè)置第一個(gè)讀線(xiàn)程firstReader和firstReaderHoldCount;若當(dāng)前線(xiàn)程線(xiàn)程為第一個(gè)讀線(xiàn)程,則增加firstReaderHoldCount;否則,將設(shè)置當(dāng)前線(xiàn)程對(duì)應(yīng)的HoldCounter對(duì)象的值。流程圖如下。

當(dāng)寫(xiě)鎖被其他線(xiàn)程獲取后,讀鎖獲取失敗,否則獲取成功,會(huì)利用 CAS 更新同步狀態(tài)。
另外,當(dāng)前同步狀態(tài)需要加上 SHARED_UNIT((1 << SHARED_SHIFT),即 0x00010000)的原因,我們?cè)谏厦嬉舱f(shuō)過(guò)了,同步狀態(tài)的高 16 位用來(lái)表示讀鎖被獲取的次數(shù)。
如果 CAS 失敗或者已經(jīng)獲取讀鎖的線(xiàn)程再次獲取讀鎖時(shí),是靠 fullTryAcquireShared 方法實(shí)現(xiàn)的。
- fullTryAcquireShared函數(shù)
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) { // 無(wú)限循環(huán)
// 獲取狀態(tài)
int c = getState();
if (exclusiveCount(c) != 0) { // 寫(xiě)線(xiàn)程數(shù)量不為0
if (getExclusiveOwnerThread() != current) // 不為當(dāng)前線(xiàn)程
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 寫(xiě)線(xiàn)程數(shù)量為0并且讀線(xiàn)程被阻塞
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // 當(dāng)前線(xiàn)程為第一個(gè)讀線(xiàn)程
// assert firstReaderHoldCount > 0;
} else { // 當(dāng)前線(xiàn)程不為第一個(gè)讀線(xiàn)程
if (rh == null) { // 計(jì)數(shù)器不為空
//
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) { // 計(jì)數(shù)器為空或者計(jì)數(shù)器的tid不為當(dāng)前正在運(yùn)行的線(xiàn)程的tid
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT) // 讀鎖數(shù)量為最大值,拋出異常
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 比較并且設(shè)置成功
if (sharedCount(c) == 0) { // 讀線(xiàn)程數(shù)量為0
// 設(shè)置第一個(gè)讀線(xiàn)程
firstReader = current;
//
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}說(shuō)明: 在tryAcquireShared函數(shù)中,如果下列三個(gè)條件不滿(mǎn)足(讀線(xiàn)程是否應(yīng)該被阻塞、小于最大值、比較設(shè)置成功)則會(huì)進(jìn)行fullTryAcquireShared函數(shù)中,它用來(lái)保證相關(guān)操作可以成功。其邏輯與tryAcquireShared邏輯類(lèi)似,可以繼續(xù)再往后看。
讀鎖的釋放
讀鎖釋放的實(shí)現(xiàn)主要通過(guò)方法 tryReleaseShared,源碼如下,主要邏輯請(qǐng)看注釋?zhuān)?/p>
- tryReleaseShared函數(shù)
protected final boolean tryReleaseShared(int unused) {
// 獲取當(dāng)前線(xiàn)程
Thread current = Thread.currentThread();
if (firstReader == current) { // 當(dāng)前線(xiàn)程為第一個(gè)讀線(xiàn)程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1) // 讀線(xiàn)程占用的資源數(shù)為1
firstReader = null;
else // 減少占用的資源
firstReaderHoldCount--;
} else { // 當(dāng)前線(xiàn)程不為第一個(gè)讀線(xiàn)程
// 獲取緩存的計(jì)數(shù)器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 計(jì)數(shù)器為空或者計(jì)數(shù)器的tid不為當(dāng)前正在運(yùn)行的線(xiàn)程的tid
// 獲取當(dāng)前線(xiàn)程對(duì)應(yīng)的計(jì)數(shù)器
rh = readHolds.get();
// 獲取計(jì)數(shù)
int count = rh.count;
if (count <= 1) { // 計(jì)數(shù)小于等于1
// 移除
readHolds.remove();
if (count <= 0) // 計(jì)數(shù)小于等于0,拋出異常
throw unmatchedUnlockException();
}
// 減少計(jì)數(shù)
--rh.count;
}
for (;;) { // 無(wú)限循環(huán)
// 獲取狀態(tài)
int c = getState();
// 獲取狀態(tài)
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 比較并進(jìn)行設(shè)置
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}說(shuō)明: 此函數(shù)表示讀鎖線(xiàn)程釋放鎖。首先判斷當(dāng)前線(xiàn)程是否為第一個(gè)讀線(xiàn)程firstReader,若是,則判斷第一個(gè)讀線(xiàn)程占有的資源數(shù)firstReaderHoldCount是否為1,若是,則設(shè)置第一個(gè)讀線(xiàn)程firstReader為空,否則,將第一個(gè)讀線(xiàn)程占有的資源數(shù)firstReaderHoldCount減1;若當(dāng)前線(xiàn)程不是第一個(gè)讀線(xiàn)程,那么首先會(huì)獲取緩存計(jì)數(shù)器(上一個(gè)讀鎖線(xiàn)程對(duì)應(yīng)的計(jì)數(shù)器 ),若計(jì)數(shù)器為空或者tid不等于當(dāng)前線(xiàn)程的tid值,則獲取當(dāng)前線(xiàn)程的計(jì)數(shù)器,如果計(jì)數(shù)器的計(jì)數(shù)count小于等于1,則移除當(dāng)前線(xiàn)程對(duì)應(yīng)的計(jì)數(shù)器,如果計(jì)數(shù)器的計(jì)數(shù)count小于等于0,則拋出異常,之后再減少計(jì)數(shù)即可。無(wú)論何種情況,都會(huì)進(jìn)入無(wú)限循環(huán),該循環(huán)可以確保成功設(shè)置狀態(tài)state。其流程圖如下

鎖降級(jí)
讀寫(xiě)鎖支持鎖降級(jí),遵循按照獲取寫(xiě)鎖,獲取讀鎖再釋放寫(xiě)鎖的次序,寫(xiě)鎖能夠降級(jí)成為讀鎖,不支持鎖升級(jí),關(guān)于鎖降級(jí),下面的示例代碼摘自 ReentrantWriteReadLock 源碼:
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}這里的流程可以解釋如下:
- 獲取讀鎖:首先嘗試獲取讀鎖來(lái)檢查某個(gè)緩存是否有效。
- 檢查緩存:如果緩存無(wú)效,則需要釋放讀鎖,因?yàn)樵讷@取寫(xiě)鎖之前必須釋放讀鎖。
- 獲取寫(xiě)鎖:獲取寫(xiě)鎖以便更新緩存。此時(shí),可能還需要重新檢查緩存狀態(tài),因?yàn)樵卺尫抛x鎖和獲取寫(xiě)鎖之間可能有其他線(xiàn)程修改了狀態(tài)。
- 更新緩存:如果確認(rèn)緩存無(wú)效,更新緩存并將其標(biāo)記為有效。
- 寫(xiě)鎖降級(jí)為讀鎖:在釋放寫(xiě)鎖之前,獲取讀鎖,從而實(shí)現(xiàn)寫(xiě)鎖到讀鎖的降級(jí)。這樣,在釋放寫(xiě)鎖后,其他線(xiàn)程可以并發(fā)讀取,但不能寫(xiě)入。
- 使用數(shù)據(jù):現(xiàn)在可以安全地使用緩存數(shù)據(jù)了。
- 釋放讀鎖:完成操作后釋放讀鎖。
這個(gè)流程結(jié)合了讀鎖和寫(xiě)鎖的優(yōu)點(diǎn),確保了數(shù)據(jù)的一致性和可用性,同時(shí)允許在可能的情況下進(jìn)行并發(fā)讀取。使用讀寫(xiě)鎖的代碼可能看起來(lái)比使用簡(jiǎn)單的互斥鎖更復(fù)雜,但它提供了更精細(xì)的并發(fā)控制,可能會(huì)提高多線(xiàn)程應(yīng)用程序的性能
ReadWriteLock和StampedLock
ReadWriteLock
ReadWriteLock 是Java提供的一個(gè)接口,全類(lèi)名:java.util.concurrent.locks.ReadWriteLock,上面的ReentrantReadWriteLock就是繼承自這個(gè)接口。它允許多個(gè)線(xiàn)程同時(shí)讀取共享資源,但只允許一個(gè)線(xiàn)程寫(xiě)入共享資源。這種機(jī)制可以提高讀取操作的并發(fā)性,但寫(xiě)入操作需要獨(dú)占資源。
特性
- 多個(gè)線(xiàn)程可以同時(shí)獲取讀鎖,但只有一個(gè)線(xiàn)程可以獲取寫(xiě)鎖。
- 當(dāng)一個(gè)線(xiàn)程持有寫(xiě)鎖時(shí),其他線(xiàn)程無(wú)法獲取讀鎖和寫(xiě)鎖,讀寫(xiě)互斥。
- 當(dāng)一個(gè)線(xiàn)程持有讀鎖時(shí),其他線(xiàn)程可以同時(shí)獲取讀鎖,讀讀共享。但無(wú)法獲取寫(xiě)鎖,讀寫(xiě)互斥
使用場(chǎng)景
ReadWriteLock 適用于讀多寫(xiě)少的場(chǎng)景,例如緩存系統(tǒng)、數(shù)據(jù)庫(kù)連接池等。在這些場(chǎng)景中,讀取操作占據(jù)大部分時(shí)間,而寫(xiě)入操作較少。
使用示例
使用 ReadWriteLock 的示例,實(shí)現(xiàn)了一個(gè)簡(jiǎn)單的緩存系統(tǒng):
public class Cache {
private Map<String, Object> data = new HashMap<>();
private ReadWriteLock lock = new ReentrantReadWriteLock();
public Object get(String key) {
lock.readLock().lock();
try {
return data.get(key);
} finally {
lock.readLock().unlock();
}
}
public void put(String key, Object value) {
lock.writeLock().lock();
try {
data.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
}在上述示例中,Cache 類(lèi)使用 ReadWriteLock 來(lái)實(shí)現(xiàn)對(duì) data 的并發(fā)訪問(wèn)控制。get 方法獲取讀鎖并讀取數(shù)據(jù),put 方法獲取寫(xiě)鎖并寫(xiě)入數(shù)據(jù)。
StampedLock
StampedLock 是Java 8 中引入的一種新的鎖機(jī)制,全類(lèi)名:java.util.concurrent.locks.StampedLock,它提供了一種樂(lè)觀讀的機(jī)制,可以進(jìn)一步提升讀取操作的并發(fā)性能。
特性
- 與 ReadWriteLock 類(lèi)似,StampedLock 也支持多個(gè)線(xiàn)程同時(shí)獲取讀鎖,但只允許一個(gè)線(xiàn)程獲取寫(xiě)鎖。
- 與 ReadWriteLock 不同的是,StampedLock 還提供了一個(gè)樂(lè)觀讀鎖(Optimistic Read Lock),即不阻塞其他線(xiàn)程的寫(xiě)操作,但在讀取完成后需要驗(yàn)證數(shù)據(jù)的一致性。
使用場(chǎng)景
StampedLock 適用于讀遠(yuǎn)遠(yuǎn)大于寫(xiě)的場(chǎng)景,并且對(duì)數(shù)據(jù)的一致性要求不高,例如統(tǒng)計(jì)數(shù)據(jù)、監(jiān)控系統(tǒng)等。
使用示例
使用 StampedLock 的示例,實(shí)現(xiàn)了一個(gè)計(jì)數(shù)器:
public class Counter {
private int count = 0;
private StampedLock lock = new StampedLock();
public int getCount() {
long stamp = lock.tryOptimisticRead();
int value = count;
if (!lock.validate(stamp)) {
stamp = lock.readLock();
try {
value = count;
} finally {
lock.unlockRead(stamp);
}
}
return value;
}
public void increment() {
long stamp = lock.writeLock();
try {
count++;
} finally {
lock.unlockWrite(stamp);
}
}
}在上述示例中,Counter 類(lèi)使用 StampedLock 來(lái)實(shí)現(xiàn)對(duì)計(jì)數(shù)器的并發(fā)訪問(wèn)控制。getCount 方法首先嘗試獲取樂(lè)觀讀鎖,并讀取計(jì)數(shù)器的值,然后通過(guò) validate 方法驗(yàn)證數(shù)據(jù)的一致性。如果驗(yàn)證失敗,則獲取悲觀讀鎖,并重新讀取計(jì)數(shù)器的值。increment 方法獲取寫(xiě)鎖,并對(duì)計(jì)數(shù)器進(jìn)行遞增操作。
小結(jié)
ReadWriteLock 和 StampedLock 都是Java中用于并發(fā)控制的重要機(jī)制。
- ReadWriteLock 適用于讀多寫(xiě)少的場(chǎng)景;
- StampedLock 則適用于讀遠(yuǎn)遠(yuǎn)大于寫(xiě)的場(chǎng)景,并且對(duì)數(shù)據(jù)的一致性要求不高;
在實(shí)際應(yīng)用中,我們需要根據(jù)具體場(chǎng)景來(lái)選擇合適的鎖機(jī)制。通過(guò)合理使用這些鎖機(jī)制,我們可以提高并發(fā)程序的性能和可靠性。
到此這篇關(guān)于Java并發(fā)讀寫(xiě)鎖ReentrantReadWriteLock 的文章就介紹到這了,更多相關(guān)Java并發(fā)讀寫(xiě)鎖ReentrantReadWriteLock 內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java中的讀寫(xiě)鎖ReentrantReadWriteLock源碼分析
- Java AQS中ReentrantReadWriteLock讀寫(xiě)鎖的使用
- 一文了解Java讀寫(xiě)鎖ReentrantReadWriteLock的使用
- 詳解Java?ReentrantReadWriteLock讀寫(xiě)鎖的原理與實(shí)現(xiàn)
- ReentrantReadWriteLock?讀寫(xiě)鎖分析總結(jié)
- Java多線(xiàn)程讀寫(xiě)鎖ReentrantReadWriteLock類(lèi)詳解
- Java中ReentrantReadWriteLock讀寫(xiě)鎖的實(shí)現(xiàn)
相關(guān)文章
java.net.MalformedURLException異常的解決方法
下面小編就為大家?guī)?lái)一篇java.net.MalformedURLException異常的解決方法。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-05-05
SpringCloudGateway網(wǎng)關(guān)處攔截并修改請(qǐng)求的操作方法
這篇文章主要介紹了SpringCloudGateway網(wǎng)關(guān)處攔截并修改請(qǐng)求的操作方法,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-12-12
一個(gè)applicationContext 加載錯(cuò)誤導(dǎo)致的阻塞問(wèn)題及解決方法
這篇文章主要介紹了一個(gè)applicationContext 加載錯(cuò)誤導(dǎo)致的阻塞問(wèn)題及解決方法,需要的朋友可以參考下2018-11-11
基于HttpServletResponse 相關(guān)常用方法的應(yīng)用
本篇文章小編為大家介紹,基于HttpServletResponse 相關(guān)常用方法的應(yīng)用,需要的朋友參考下2013-04-04
阿里的Easyexcel讀取Excel文件的方法(最新版本)
這篇文章主要介紹了阿里的Easyexcel讀取Excel文件(最新版本)的方法,本文通過(guò)示例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-12-12
EntityWrapper如何在and條件中嵌套o(hù)r語(yǔ)句
這篇文章主要介紹了EntityWrapper如何在and條件中嵌套o(hù)r語(yǔ)句,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
在IDEA中 實(shí)現(xiàn)給main方法附帶參數(shù)的操作
這篇文章主要介紹了在IDEA中 實(shí)現(xiàn)給main方法附帶參數(shù)的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-01-01

