Java多線程之鎖的強化學習
首先強調(diào)一點:Java多線程的鎖都是基于對象的,Java中的每一個對象都可以作為一個鎖。同時,類鎖也是對象鎖,類是Class對象
Java8鎖
核心思想
關(guān)鍵字在實例方法上,鎖為當前實例
關(guān)鍵字在靜態(tài)方法上,鎖為當前Class對象
關(guān)鍵字在代碼塊上,鎖為括號里面的對象
在進行線程執(zhí)行順序的時候,如果添加了線程睡眠,那么就要看鎖的對象是誰,同一把鎖 / 非同一把鎖是不一樣的
Synchronized
synchronized 是Java提供的關(guān)鍵字,用來保證原子性的
synchronized的作用域如下
- 作用在普通方法上,此方法為原子方法:也就是說同一個時刻只有一個線程可以進入,其他線程必須在方法外等待,此時鎖是對象
- 作用在靜態(tài)方法上,此方法為原子方法:也就是說同一個時刻只有一個線程可以進入,其他線程必須在方法外等待,此時鎖是當前的Class對象
- 作用在代碼塊上,此代碼塊是原子操作:也就是說同一個時刻只有線程可以進入,其他線程必須在方法外等待,鎖是 synchronized(XXX) 里面的 XXX
先看一段簡單的代碼
public class SynchronizedTest {
public static void main(String[] args) {
test1();
test2();
}
// 使用synchronized修飾的方法
public synchronized static void test1() {
System.out.println("SynchronizedTest.test1");
}
// 使用synchronized修飾的代碼塊
public static void test2() {
synchronized (SynchronizedTest.class) {
System.out.println("SynchronizedTest.test2");
}
}
}
執(zhí)行之后,對其進行執(zhí)行javap -v命令反編譯
// 省略啰嗦的代碼
public class cn.zq.sync.SynchronizedTest
minor version: 0
major version: 52
flags: ACC_PUBLIC, ACC_SUPER
{
// 源碼
public cn.zq.sync.SynchronizedTest();
descriptor: ()V
flags: ACC_PUBLIC
// main 方法
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: ACC_PUBLIC, ACC_STATIC
// synchronized 修飾的靜態(tài)方法 test1()
public static synchronized void test1();
descriptor: ()V
// 在這里我們可以看到 flags 中有一個 ACC_SYNCHRONIZED
// 這個就是一個標記符這是 保證原子性的關(guān)鍵
// 當方法調(diào)用的時候,調(diào)用指令將會檢查方法的 ACC_SYNCHRONIZED 訪問標記符是否被設置
// 如果設置了,線程將先獲取 monitor,獲取成功之后才會執(zhí)行方法體,方法執(zhí)行之后,釋放monitor
// 在方法執(zhí)行期間,其他任何線程都無法在獲得一個 monitor 對象,本質(zhì)上沒區(qū)別。
flags: ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
stack=2, locals=0, args_size=0
0: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
3: ldc #5 // String SynchronizedTest.test1
5: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
8: return
LineNumberTable:
line 17: 0
line 18: 8
// 代碼塊使用的 synchronized
public static void test2();
descriptor: ()V
flags: ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=2, args_size=0
0: ldc #7 // class cn/zq/sync/SynchronizedTest
2: dup
3: astore_0
// 這個 monitorenter 是一個指令
// 每個對象都有一個監(jiān)視器鎖(monitor),當monitor被占用的時候就會處于鎖定狀態(tài)
// 線程執(zhí)行monitorenter的時候,嘗試獲取monitor的鎖。過程如下
// 1.任何monitor進入數(shù)為0,則線程進入并設置為1,此線程就是monitor的擁有者
// 2.如果線程已經(jīng)占用,當前線程再次進入的時候,會將monitor的次數(shù)+1
// 3.如何其他的線程已經(jīng)占用了monitor,則線程進阻塞狀態(tài),直到monitor的進入數(shù)為0
// 4.此時其他線程才能獲取當前代碼塊的執(zhí)行權(quán)
4: monitorenter
5: getstatic #4 // Field java/lang/System.out:Ljava/io/PrintStream;
8: ldc #8 // String SynchronizedTest.test2
10: invokevirtual #6 // Method java/io/PrintStream.println:(Ljava/lang/String;)V
13: aload_0
// 執(zhí)行monitorexit這條指令的線程必須是擁有monitor的
// 執(zhí)行的之后,monitor的進入數(shù)-1.如果為0,那么線程就退出 monitor,不再是此代碼塊的執(zhí)行者
// 此時再由其他的線程獲得所有權(quán)
// 其實 wait/notify 等方法也依賴于monitor對象,
// 所以只有在同步方法或者同步代碼塊中才可以使用,否則會報錯 java.lang.IllegalMonitorstateException 異常
14: monitorexit
15: goto 23
18: astore_1
19: aload_0
20: monitorexit
21: aload_1
22: athrow
23: return
Exception table:
from to target type
5 15 18 any
18 21 18 any
LineNumberTable:
line 21: 0
line 22: 5
line 23: 13
line 24: 23
StackMapTable: number_of_entries = 2
frame_type = 255 /* full_frame */
offset_delta = 18
locals = [ class java/lang/Object ]
stack = [ class java/lang/Throwable ]
frame_type = 250 /* chop */
offset_delta = 4
}
SourceFile: "SynchronizedTest.java"總結(jié):
使用synchronized修飾的同步方法
- 通過反編譯我們可以看到,被synchronized修飾的方法,其中的 flags中有一個標記:ACC_SYNCHRONIZED
- 當線程執(zhí)行方法的時候,會先去檢查是否有這樣的一個標記,如果有的話,說明就是一個同步方法,此時會為當前線程設置 monitor ,獲取成功之后才會去執(zhí)行方法體,執(zhí)行完畢之后釋放monitor
使用synchronized修飾的代碼塊
- 通過反編譯我們看到,在代碼塊的兩側(cè)有JVM指令,在進入代碼塊之前指令是 monitorenter
- 當線程執(zhí)行到代碼塊的時候,會先拿到monitor(初始值為0),然后線程將其設置為1,此時當前線程獨占monitor
- 如果當前持有monitor的線程再次進入monitor,則monitor的值+1,當其退出的時候,monitor的次數(shù)-1
- 當線程線程退出一次monitor的時候,會執(zhí)行monitorexit指令,但是只有持有monitor的線程才能獲取并執(zhí)行monitorexit指令,當當前線程monitor為0的時候,當前線程退出持有鎖
- 此時其他線程再來爭搶
- 但是為什么要有兩個 monitorexit呢?
這個時候我們會發(fā)現(xiàn)synchronized是可重入鎖,其實現(xiàn)原理就是monitor的個數(shù)增加和減少
同時wait / notify方法的執(zhí)行也會依賴 monitor,所以wait和notify方法必須放在同步代碼塊中,否則會報錯 java.lang.IllegalMonitorstateException
因為方法區(qū)域很大,所以設置一個標記,現(xiàn)在執(zhí)行完判斷之后,就全部鎖起來,而代碼塊不確定大小,就需要細化monitor的范圍
ReentrantLock
ReentrantLock是Lock接口的一個實現(xiàn)類
在ReentrantLock內(nèi)部有一個抽象靜態(tài)內(nèi)部類Sync
其中一個是 NonfairSync(非公平鎖),另外一個是 FairSync (公平鎖),二者都實現(xiàn)了此抽象內(nèi)部類Sync,ReentrantLock默認使用的是 非公平鎖 ,我們看一下源碼:
public class ReentrantLock implements Lock, java.io.Serializable {
// 鎖的類型
private final Sync sync;
// 抽象靜態(tài)類Sync繼承了AbstractQueueSynchroniser [這個在下面進行解釋]
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 抽象加鎖方法
abstract void lock();
// 不公平的 tryLock 也就是不公平的嘗試獲取
final boolean nonfairTryAcquire(int acquires) {
// 獲取當前線程對象
final Thread current = Thread.currentThread();
// 獲取線程的狀態(tài)
int c = getState();
// 根據(jù)線程的不同狀態(tài)執(zhí)行不同的邏輯
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 獲取獨占模式的線程的當前鎖的狀態(tài)
else if (current == getExclusiveOwnerThread()) {
// 獲取新的層級大小
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 設置鎖的狀態(tài)
setState(nextc);
return true;
}
return false;
}
// 嘗試釋放方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 返回當前線程是不是獨占的
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 返回 ConditionObject 對象
final ConditionObject newCondition() {
return new ConditionObject();
}
// 獲得獨占的線程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 獲得獨占線程的狀態(tài)
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 判斷是否是加鎖的
final boolean isLocked() {
return getState() != 0;
}
// 序列化
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0);
}
}
// 非公平鎖繼承了Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加鎖操作
final void lock() {
// 判斷是不是第一次加鎖 底層調(diào)用 Unsafe的compareAndSwapInt()方法
if (compareAndSetState(0, 1))
// 設置為獨占鎖
setExclusiveOwnerThread(Thread.currentThread());
// 如果不是第一次加鎖,則調(diào)用 acquire 方法在加一層鎖
else
acquire(1);
}
// 返回嘗試加鎖是否成功
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平鎖
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
// 加鎖操作,直接設置為1
final void lock() {
acquire(1);
}
// 嘗試加鎖
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
}
Lock接口
public interface Lock {
// 加鎖
void lock();
// 不斷加鎖
void lockInterruptibly() throws InterruptedException;
// 嘗試加鎖
boolean tryLock();
// 嘗試加鎖,具有超時時間
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 釋放鎖
void unlock();
// Condition 對象
Condition newCondition();
}
Condition接口
public interface Condition {
// 等待
void await() throws InterruptedException;
// 超時等待
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 超時納秒等待
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 可中斷等待
void awaitUninterruptibly();
// 等待死亡
boolean awaitUntil(Date deadline) throws InterruptedException;
// 指定喚醒
void signal();
// 喚醒所有
void signalAll();
}為什么官方提供的是非公平鎖,因為如果是公平鎖,假如一個線程需要執(zhí)行很久,那執(zhí)行效率會大大降低
ReentrantLock的其他方法
public class ReentrantLock implements Lock, java.io.Serializable {
// 鎖的類型
private final Sync sync;
// 默認是非公平鎖
public ReentrantLock() {
sync = new NonfairSync();
}
// 有參構(gòu)造,可以設置鎖的類型
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 加鎖
public void lock() {
sync.lock();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// 解鎖 調(diào)用release() 因為是重入鎖,所以需要減少重入的層數(shù)
public void unlock() {
sync.release(1);
}
// 返回Condition對象 ,用來執(zhí)行線程的喚醒等待等操作
public Condition newCondition() {
return sync.newCondition();
}
// 獲取鎖的層數(shù)
public int getHoldCount() {
return sync.getHoldCount();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
// 是否加鎖
public boolean isLocked() {
return sync.isLocked();
}
// 是否是公平鎖
public final boolean isFair() {
return sync instanceof FairSync;
}
// 獲取獨占鎖
protected Thread getOwner() {
return sync.getOwner();
}
// 查詢是否有任何線程正在等待獲取此鎖
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
// 查詢給定線程是否正在等待獲取此鎖
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
// 獲取隊列的長度
public final int getQueueLength() {
return sync.getQueueLength();
}
// 返回一個包含可能正在等待獲取該鎖的線程的集合
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
// 判斷是否等待
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// 獲得等待隊列的長度
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// 獲取正在等待的線程集合
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
// toString()
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}
總結(jié):
1.ReentrantLock是獨占鎖
2.ReentrantLock是可重入鎖
3.底層使用AbstractQueuedSynchronizer實現(xiàn)
4.synchronized 和 ReentrantLock的區(qū)別
- synchronized是是關(guān)鍵字,可以作用在靜態(tài)方法、普通方法、靜態(tài)代碼塊,底層使用monitor實現(xiàn),synchronized是內(nèi)置鎖,是悲觀鎖,其發(fā)生異常會中斷鎖,所以不會發(fā)生死鎖。是非中斷鎖
- ReentrantLock是類,作用在方法中,其比synchronized更加靈活,但是必須手動加鎖釋放鎖,是樂觀鎖,發(fā)生異常不會中斷鎖,必須在finally中釋放鎖,是可中斷的,使用Lock的讀鎖可以提供效率
AQS
AQS:AbstractQueueSynchronizer => 抽象隊列同步器
AQS定義了一套多線程訪問共享資源的同步器框架,很多同步器的實現(xiàn)都依賴AQS。如ReentrantLock、Semaphore、CountDownLatch …
首先看一下AQS隊列的框架

它維護了一個volatile int state (代表共享資源)和一個FIFO線程等待隊列(多線程爭搶資源被阻塞的時候會先進進入此隊列),這里的volatile是核心。在下個部分進行講解~
state的訪問方式有三種
- getState()
- setState()
- compareAndSetState()
AQS定義了兩種資源共享方式:Exclusive(獨占,只有一個線程可以執(zhí)行,如ReentrantLock)和Share(共享,多個線程可同時執(zhí)行,如Semaphore、CountdownLatch)
不同的自定義同步器爭用共享資源的方式也不同。自定義的同步器在實現(xiàn)的時候只需要實現(xiàn)共享資源的獲取和釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊)AQS在頂層已經(jīng)實現(xiàn)好了。
自定義同步器時需要實現(xiàn)以下方法即可
- isHeldExclusively():該線程是否正在獨占資源。只有用的Condition才需要去實現(xiàn)它
- tryAcquire(int):獨占方式。嘗試獲取資源,成功返回true,否則返回false
- tryRelease(int):獨占方式。嘗試釋放資源,成功返回true,否則返回false
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數(shù)表示失敗,0表示成功但沒有剩余可用資源,正數(shù)表示成功,且還有剩余資源
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待節(jié)點返回true,否則返回fasle
以ReentrantLock為例,state初始化為0,表示未鎖定狀態(tài)。A線程lock()時,會調(diào)用tryAcquire()獨占該鎖,然后將state+1,此后其他線程在調(diào)用tryAcquire()就會失敗,直到A線程unlock()到state為0為止,其他線程才有機會獲取該鎖。當前在A釋放鎖之前,A線程是可以重復獲取此鎖的(state)會累加。這就是可重入,但是獲取多少次,就要釋放多少次。
再和CountdownLock為例,任務分為N個子線程去執(zhí)行,state也初始化為N(注意N要與線程的個數(shù)一致)。這N個子線程是并行執(zhí)行的,每個子線程執(zhí)行完之后countDown一次。state會CAS-1。等到所有的子線程都執(zhí)行完后(即state=0),會upark()主調(diào)用線程,然后主調(diào)用線程就會從await()函數(shù)返回,繼續(xù)剩余動作
一般來說,自定義同步器要么是獨占方法,要么是共享方式,也只需要實現(xiàn)tryAcquire - tryRelease,tryAcquireShared - tryReleaseShared 中的一組即可,但是AQS也支持自定義同步器同時實現(xiàn)獨占鎖和共享鎖兩種方式,如:ReentrantReadWriteLock
AQS的源碼
AbstractQueueSynchronizer 繼承了 AbstractOwnableSynchronizer
AbstractOwnableSynchronizer類
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
private static final long serialVersionUID = 3737899427754241961L;
protected AbstractOwnableSynchronizer() { }
// 獨占模式當前的擁有者
private transient Thread exclusiveOwnerThread;
// 設置獨占模式當前的擁有者
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
// 得到獨占模式當前的擁有者
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
AbstractQueueSynchronizer類
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
private static final long serialVersionUID = 7373984972572414691L;
protected AbstractQueuedSynchronizer() { }
// AbstractQueueSynchronizer 中的靜態(tài)內(nèi)部類 Node 節(jié)點
static final class Node {
// 指示節(jié)點正在以共享模式等待的標記
static final Node SHARED = new Node();
// 指示節(jié)點正在以獨占模式等待的標記
static final Node EXCLUSIVE = null;
// 表示線程已經(jīng)取消
static final int CANCELLED = 1;
// 表示線程之后需要釋放
static final int SIGNAL = -1;
// 表示線程正在等待條件
static final int CONDITION = -2;
// 指示下一個 acquireShared 應該無條件傳播
static final int PROPAGATE = -3;
// 狀態(tài)標記
volatile int waitStatus;
// 隊列的前一個節(jié)點
volatile Node prev;
// 隊列的后一個節(jié)點
volatile Node next;
// 線程
volatile Thread thread;
// 下一個正在等待的節(jié)點
Node nextWaiter;
// 判斷是否時共享的
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回上一個節(jié)點,不能為null,為null拋出空指針異常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 構(gòu)造
Node() { // Used to establish initial head or SHARED marker
}
// 有參構(gòu)造,用來添加線程的隊列
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 有參構(gòu)造,根據(jù)等待條件使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
// 頭節(jié)點
private transient volatile Node head;
// 尾節(jié)點
private transient volatile Node tail;
// 狀態(tài)
private volatile int state;
// 獲取當前的狀態(tài)
protected final int getState() {
return state;
}
//設置當前的狀態(tài)
protected final void setState(int newState) {
state = newState;
}
// 比較設置當前的狀態(tài)
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// 納秒數(shù),使之更快的旋轉(zhuǎn)
static final long spinForTimeoutThreshold = 1000L;
// 將節(jié)點插入隊列
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 加一個等待節(jié)點
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 設置頭節(jié)點
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 如果存在后繼節(jié)點,就喚醒
private void unparkSuccessor(Node node) {
// 獲得節(jié)點的狀態(tài)
int ws = node.waitStatus;
// 如果為負數(shù),就執(zhí)行比較并設置方法設置狀態(tài)
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 喚醒后面的節(jié)點
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// 共享模式的釋放動作,并且向后繼節(jié)點發(fā)出信號
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// 設置隊列的頭,并檢查后繼者能否在共享模式下等待,如果可以,就是否傳播設置為>0或者propagate狀態(tài)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
// 取消正在進行的嘗試
private void cancelAcquire(Node node) {
// 節(jié)點為null,直接返回
if (node == null)
return;
node.thread = null;
// 跳過已經(jīng)取消的前一個節(jié)點
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED;
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
// 還有好多方法... 其實本質(zhì)就是基于 隊列的判斷和操作,AQS提供了獨占鎖和共享鎖的設計
// 在AQS中,使用到了Unsafe類,所以AQS其實就是基于CAS算法的,
// AQS的一些方法就是直接調(diào)用 Unsafe 的方法 如下所示
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
// 比較并設置頭
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
// 比較并設置尾
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
// 比較并設置狀態(tài)
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
// 比較并設置下一個節(jié)點
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
// 除此之外 AQS 還有一個實現(xiàn)了Condition的類 如下
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 條件隊列的第一個節(jié)點
private transient Node firstWaiter;
// 條件隊列的最后一個節(jié)點
private transient Node lastWaiter;
public ConditionObject() { }
// 在等待隊列中添加一個新的節(jié)點
private Node addConditionWaiter() {
// 獲取最后一個節(jié)點
Node t = lastWaiter;
// 如果最后一個節(jié)點被取消了,就清除它
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 刪除并轉(zhuǎn)移節(jié)點直到它沒有取消或者不為null
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 刪除所有的節(jié)點
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// 取消節(jié)點的連接
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// 將等待最長的線程,喚醒
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
// 喚醒所有的等待線程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 實現(xiàn)不間斷的條件等待
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
// 模式意味著在退出等待時重新中斷
private static final int REINTERRUPT = 1;
// 模式的含義是在退出等待時拋出InterruptedException異常
private static final int THROW_IE = -1;
// 檢查中斷,如果在信號通知之前被中斷,則返回THROW_IE;
// 如果在信號通知之后,則返回REINTERRUPT;如果未被中斷,則返回 0
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 拋出InterruptedException,重新中斷當前線程,
// 或不執(zhí)行任何操作,具體取決于模式。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
// 實現(xiàn)不可中斷的條件等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
// 納秒級別的等待
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
// 絕對定時等待
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 超時等待
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
// 判斷是不是獨占的
final boolean isOwnedBy(AbstractQueuedSynchronizer sync) {
return sync == AbstractQueuedSynchronizer.this;
}
// 返回是否有正在等待的
protected final boolean hasWaiters() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
return true;
}
return false;
}
// 獲得等待隊列的長度
protected final int getWaitQueueLength() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int n = 0;
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION)
++n;
}
return n;
}
// 獲取所有正在等待的線程集合
protected final Collection<Thread> getWaitingThreads() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node w = firstWaiter; w != null; w = w.nextWaiter) {
if (w.waitStatus == Node.CONDITION) {
Thread t = w.thread;
if (t != null)
list.add(t);
}
}
return list;
}
}
}
總結(jié):
1.AQS為我們提供了很多實現(xiàn)。AQS內(nèi)部有兩個內(nèi)部類,ConditionObject和Node節(jié)點
2.和開頭說的一樣,其維護了一個state和一個隊列,也提供了獨占和共享的實現(xiàn)
3.總結(jié)一下流程
- 調(diào)用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功就直接返回
- 沒成功,則addWaiter()將該線程加入等待隊列的尾部,并標記為獨占模式
- acquireQueued()使得線程在隊列中休息,有機會(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源之后才會返回。如果在整個等待過程中被中斷過,就返回true,否則返回false
- 如果線程在等待過程中被中斷過,它不是響應的。只是獲取資源之后才再進行自我中斷selfInterrupt(),將中斷補上

4.release() 是獨占模式下線程共享資源的底層入口,它會釋放指定量的資源,如果徹底釋放了(state = 0)
5.如果獲取鎖的線程在release時異常了,沒有unpark隊列中的其他結(jié)點,這時隊列中的其他結(jié)點會怎么辦?是不是沒法再被喚醒了?
這時,隊列中等待鎖的線程將永遠處于park狀態(tài),無法再被喚醒!
6.獲取鎖的線程在什么情形下會release拋出異常呢 ?
- 線程突然死掉了?可以通過thread.stop來停止線程的執(zhí)行,但該函數(shù)的執(zhí)行條件要嚴苛的多,而且函數(shù)注明是非線程安全的,已經(jīng)標明Deprecated;
- 線程被interupt了?線程在運行態(tài)是不響應中斷的,所以也不會拋出異常;
7.acquireShared()的流程
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()并成功獲取到資源才返回。整個等待過程也是忽略中斷的。
8.releaseShared()
釋放掉資源之后,喚醒和后繼
7.不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現(xiàn)時只需要實現(xiàn)共享資源state的獲取與釋放方式即可,至于具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經(jīng)在頂層實現(xiàn)好了。自定義同步器實現(xiàn)時主要實現(xiàn)以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現(xiàn)它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數(shù)表示失??;0表示成功,但沒有剩余可用資源;正數(shù)表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,如果釋放后允許喚醒后續(xù)等待結(jié)點返回true,否則返回false。
volatile
volatile是Java提供的關(guān)鍵字,是輕量級的同步機制 JSR133提出,Java5增強了語義
volatile關(guān)鍵字有三個重要的特點
- 保證內(nèi)存可見性
- 不保證原子性
- 禁止指令重排序
提到volatile,就要提到JMM - 什么是JMM
JMM:Java Memory Model
本身就是一種抽象的概念,并不真實存在,它描述的是一組規(guī)范和規(guī)則,通過這種規(guī)則定義了程序的各個變量(包括實例字段、靜態(tài)字段、和構(gòu)造數(shù)組對象的元素)的訪問方式
JMM關(guān)于同步的規(guī)定
- 線程解鎖前,必須把共享變量的值刷新到主內(nèi)存
- 線程加鎖前,必須讀取主內(nèi)存的最新的值到自己的工作內(nèi)存
- 加鎖和解鎖必須是同一把鎖
happens-before 規(guī)則
前一個操作對下一個操作是完全可見的,如果下一個操作對下下一個操作完全可見,那么前一個操作也對下下個操作可見
重排序
JVM對指令的執(zhí)行,會進行優(yōu)化重新排序,可以發(fā)生在編譯重排序、CPU重排序
什么是內(nèi)存屏障?
內(nèi)存屏障分為2種
- 讀屏障(LoadBarrier)
- 寫屏障(Store Barrier)
內(nèi)存屏障的作用
- 阻止屏障兩側(cè)的指令重排序
- 強制把緩沖區(qū) / 高速緩存中的臟數(shù)據(jù)寫回主內(nèi)存,或者讓緩存中相應的的數(shù)據(jù)失效
編譯器生成字節(jié)碼的時候,會在指令序列中插入內(nèi)存屏障來禁止特定類型的處理器重排序。編譯器選擇了一個比較保守的JMM內(nèi)存屏障插入策略,這樣就可以保證在任何處理器平臺,任何程序中都有正確的volatile語義
- 在每個volatile寫操作之前插入一個StoreStore屏障
- 在每個volatile寫操作之后入一個StoreLoad屏障
- 在每個volatile讀操作之前插入一個LoadLoad屏障
- 在每個volatile讀操作之前插入一個LoadStore屏障
原子性
- 問:i++為什么不是線程安全的?
- 因為 i++ 不是原子操作,i++有三個操作
如何解決?
- 使用 synchronized
- 使用AtomicInteger [JUC下的原子類]
有序性
1.計算機在執(zhí)行程序的時候,為了提高性能,編譯器和處理器通常會對指令重排序,一般分為3種-
- 源代碼 -> 編譯器優(yōu)化的重排 -> 指令并行的重排 -> 內(nèi)存系統(tǒng)的重排 -> 最終執(zhí)行的指令
- 單線程環(huán)境里面確保程序最終執(zhí)行結(jié)果和代碼順序執(zhí)行的結(jié)果一致
- 處理器在執(zhí)行重排序之前必須考慮指令之間的數(shù)據(jù)依賴性
- 多線程環(huán)境種線程交替執(zhí)行,由于編譯器優(yōu)化重排序的存在,兩個線程中使用的變量能否保證一致性是無法確定的,結(jié)果無法預測
2.指令重排序
多線程環(huán)境種線程交替執(zhí)行,由于編譯器優(yōu)化重排序的存在,兩個線程中使用的變量能否保證一致性是無法確定的,結(jié)果無法預測此時使用volatile禁用指令重排序,就可以解決這個問題
volatile的使用
單例設計模式中的 安全的雙重檢查鎖
volatile的底層實現(xiàn)
根據(jù)JMM,所有線程拿到的都是主內(nèi)存的副本,然后存儲到各自線程的空間,當某一線程修改之后,立即修改主內(nèi)存,然后主內(nèi)存通知其他線程修改
Java代碼 instance = new Singleton();//instance 是 volatile 變量 匯編代碼:0x01a3de1d: movb $0x0,0x1104800(%esi);0x01a3de24: lock addl $0x0,(%esp); 有 volatile 變量修飾的共享變量進行寫操作的時候會多第二行匯編代碼,通過查 IA-32 架構(gòu)軟件開發(fā)者手冊可知,lock 前綴的指令在多核處理器下會引發(fā)了兩件事情。將當前處理器緩存行的數(shù)據(jù)會寫回到系統(tǒng)內(nèi)存。這個寫回內(nèi)存的操作會引起在其他 CPU 里緩存了該內(nèi)存地址的數(shù)據(jù)無效。
如果對聲明了volatile變量進行寫操作,JVM就會向處理器發(fā)送一條Lock前綴的指令,將這個變量所在緩存行的數(shù)據(jù)寫回到系統(tǒng)內(nèi)存。但是就算寫回到內(nèi)存,如果其他處理器緩存的值還是舊的,再執(zhí)行計算操作就會有問題,所以在多處理器下,為了保證各個處理器的緩存是一致的,就會實現(xiàn)緩存一致性協(xié)議,每個處理器通過嗅探在總線上傳播的數(shù)據(jù)來檢查自己緩存的值是不是過期了,當處理器發(fā)現(xiàn)自己緩存行對應的內(nèi)存地址被修改,就會將當前處理器的緩存行設置成無效狀態(tài),當處理器要對這個數(shù)據(jù)進行修改操作的時候,會強制重新從系統(tǒng)內(nèi)存里把數(shù)據(jù)讀到處理器緩存里。
自旋鎖 ,自旋鎖的其他種類
CAS 自旋鎖
- CAS(Compare And Swap)比較并替換,是線程并發(fā)運行時用到的一種技術(shù)
- CAS是原子操作,保證并發(fā)安全,而不能保證并發(fā)同步
- CAS是CPU的一個指令(需要JNI調(diào)用Native方法,才能調(diào)用CPU的指令)
- CAS是非阻塞的、輕量級的樂觀鎖
我們可以實現(xiàn)通過手寫代碼完成CAS自旋鎖
CAS包括三個操作數(shù)
- 內(nèi)存位置 - V
- 期望值- A
- 新值 - B
如果內(nèi)存位置的值與期望值匹配,那么處理器會自動將該位置的值設置為新值,否則不做改變。無論是哪種情況,都會在CAS指令之前返回該位置的值。
public class Demo {
volatile static int count = 0;
public static void request() throws Exception {
TimeUnit.MILLISECONDS.sleep(5);
// 表示期望值
int expectedCount;
while (!compareAndSwap(expectedCount = getCount(), expectedCount + 1)) {
}
}
public static synchronized boolean compareAndSwap(int expectedCount, int newValue) {
if (expectedCount == getCount()) {
count = newValue;
return true;
}
return false;
}
public static int getCount() {
return count;
}
public static void main(String[] args) throws Exception {
long start = System.currentTimeMillis();
int threadSize = 100;
CountDownLatch countDownLatch = new CountDownLatch(threadSize);
for (int i = 0; i < threadSize; i++) {
new Thread(() -> {
try {
for (int j = 0; j < 10; j++) {
request();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
}
countDownLatch.await();
long end = System.currentTimeMillis();
System.out.println("count :" + count + " 耗時:" + (end - start));
}
}
上述是我們自己書寫的CAS自旋鎖,但是JDK已經(jīng)提供了響應的方法
Java提供了 CAS 的支持,在 sun.misc.Unsafe 類中,如下
public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5); public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5); public final native boolean compareAndSwapLong(Object var1, long var2, long var4, long var6);
參數(shù)說明
- var1:表示要操作的對象
- var2:表示要操作對象中屬性地址的偏移量
- var4:表示需要修改數(shù)據(jù)的期望的值
- var5:表示需要修改為的新值
CAS的實現(xiàn)原理
CAS通過調(diào)用JNI的代碼實現(xiàn),JNI:Java Native Interface ,允許Java調(diào)用其他語言
而CompareAndSwapXxx系列的方法就是借助“C語言”CPU底層指令實現(xiàn)的
以常用的 Inter x86來說,最后映射到CPU的指令為“cmpxchg”,這個是一個原子指令,CPU執(zhí)行此命令的時候,實現(xiàn)比較并替換的操作
cmpxchg 如何保證多核心下的線程安全
系統(tǒng)底層進行CAS操作的時候,會判斷當前操作系統(tǒng)是否為多核心,如果是,就給“總線”加鎖,只有一個線程對總線加鎖,保證只有一個線程進行操作,加鎖之后會執(zhí)行CAS操作,也就是說CAS的原子性是平臺級別的
CAS這么強,有沒有什么問題?
高并發(fā)情況下,CAS會一直重試,會損耗性能
CAS的ABA問題
CAS需要在操作值得時候檢查下值有沒有變化,如果沒有發(fā)生變化就更新,但是如果原來一個值為A,經(jīng)過一輪的操作之后,變成了B,然后又是一輪的操作,又變成了A,此時這個位置有沒有發(fā)生改變?改變了的,因為不是一直是A,這就是ABA問題
如何解決ABA問題?
解決ABA問題就是給值增加一個修改版本號,每次值的變化,都會修改它的版本號,CAS在操作的時候都會去對比此版本號。
下面給出一個ABA的案例
public class CasAbaDemo {
public static AtomicInteger a = new AtomicInteger(1);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.get());
try {
int executedNum = a.get();
int newNum = executedNum + 1;
TimeUnit.SECONDS.sleep(3);
boolean isCasSuccess = a.compareAndSet(executedNum, newNum);
System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主線程");
Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
a.incrementAndGet();
System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.get());
a.decrementAndGet();
System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.get());
} catch (Exception e) {
e.printStackTrace();
}
}, "干擾線程");
main.start();
thread.start();
}
}
Java中ABA解決辦法(AtomicStampedReference)
AtomicStampedReference 主要包含一個引用對象以及一個自動更新的整數(shù) “stamp”的pair對象來解決ABA問題
public class AtomicStampedReference<V> {
private static class Pair<T> {
// 數(shù)據(jù)引用
final T reference;
// 版本號
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
private volatile Pair<V> pair;
/**
* 期望引用
* @param expectedReference the expected value of the reference
* 新值引用
* @param newReference the new value for the reference
* 期望引用的版本號
* @param expectedStamp the expected value of the stamp
* 新值的版本號
* @param newStamp the new value for the stamp
* @return {@code true} if successful
*/
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
// 期望引用與當前引用一致
expectedReference == current.reference &&
// 期望版本與當前版本一致
expectedStamp == current.stamp &&
// 數(shù)據(jù)一致
((newReference == current.reference &&
newStamp == current.stamp)
||
// 數(shù)據(jù)不一致
casPair(current, Pair.of(newReference, newStamp)));
}
}
修改之后完成ABA問題
public class CasAbaDemo02 {
public static AtomicStampedReference<Integer> a = new AtomicStampedReference(new Integer(1), 1);
public static void main(String[] args) {
Thread main = new Thread(() -> {
System.out.println("CasAbaDemo.main " + Thread.currentThread().getName() + ",初始值 " + a.getReference());
try {
Integer executedReference = a.getReference();
Integer newReference = executedReference + 1;
Integer expectStamp = a.getStamp();
Integer newStamp = expectStamp + 1;
TimeUnit.SECONDS.sleep(3);
boolean isCasSuccess = a.compareAndSet(executedReference, newReference, expectStamp, newStamp);
System.out.println(Thread.currentThread().getName() + ",CAS 操作:" + isCasSuccess);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "主線程");
Thread thread = new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(2);
a.compareAndSet(a.getReference(), a.getReference() + 1, a.getStamp(), a.getStamp() + 1);
System.out.println(Thread.currentThread().getName() + ",incrementAndGet,之后" + a.getReference());
a.compareAndSet(a.getReference(), a.getReference() - 1, a.getStamp(), a.getStamp() - 1);
System.out.println(Thread.currentThread().getName() + ",decrementAndGet,之后" + a.getReference());
} catch (Exception e) {
e.printStackTrace();
}
}, "干擾線程");
main.start();
thread.start();
}
}到此這篇關(guān)于Java多線程之鎖的強化學習的文章就介紹到這了,更多相關(guān)Java多線程 鎖內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Spring Boot與Kotlin定時任務的示例(Scheduling Tasks)
這篇文章主要介紹了Spring Boot與Kotlin定時任務的示例(Scheduling Tasks),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-03-03
解決mybatis-plus動態(tài)數(shù)據(jù)源切換不生效的問題
本文主要介紹了解決mybatis-plus動態(tài)數(shù)據(jù)源切換不生效的問題,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-01-01

