Java中的Semaphore信號(hào)量使用解析
1、Semaphore 是什么
Semaphore 通常我們叫它信號(hào)量, 可以用來(lái)控制同時(shí)訪問特定資源的線程數(shù)量,通過(guò)協(xié)調(diào)各個(gè)線程,以保證合理的使用資源。
可以把它簡(jiǎn)單的理解成我們停車場(chǎng)入口立著的那個(gè)顯示屏,每有一輛車進(jìn)入停車場(chǎng)顯示屏就會(huì)顯示剩余車位減1,每有一輛車從停車場(chǎng)出去,顯示屏上顯示的剩余車輛就會(huì)加1,當(dāng)顯示屏上的剩余車位為0時(shí),停車場(chǎng)入口的欄桿就不會(huì)再打開,車輛就無(wú)法進(jìn)入停車場(chǎng)了,直到有一輛車從停車場(chǎng)出去為止。
2、使用場(chǎng)景
主要用于那些資源有明確訪問數(shù)量限制的場(chǎng)景,常用于限流 。
比如:數(shù)據(jù)庫(kù)連接池,同時(shí)進(jìn)行連接的線程有數(shù)量限制,連接不能超過(guò)一定的數(shù)量,當(dāng)連接達(dá)到了限制數(shù)量后,后面的線程只能排隊(duì)等前面的線程釋放了數(shù)據(jù)庫(kù)連接才能獲得數(shù)據(jù)庫(kù)連接。
public class TestPoolSemaphore {
public static void main(String[] args) {
Pool pool = new Pool(2);
for (int i = 0; i < 5; i++) {
new Thread(() -> {
Connection conn = pool.borrow();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
pool.free(conn);
}).start();
}
}
}
@Slf4j(topic = "c.Pool")
class Pool {
// 1. 連接池大小
private final int poolSize;
// 2. 連接對(duì)象數(shù)組
private Connection[] connections;
// 3. 連接狀態(tài)數(shù)組 0 表示空閑, 1 表示繁忙
private AtomicIntegerArray states;
private Semaphore semaphore;
// 4. 構(gòu)造方法初始化
public Pool(int poolSize) {
this.poolSize = poolSize;
// 讓許可數(shù)與資源數(shù)一致
this.semaphore = new Semaphore(poolSize);
this.connections = new Connection[poolSize];
this.states = new AtomicIntegerArray(new int[poolSize]);
for (int i = 0; i < poolSize; i++) {
connections[i] = new MockConnection("連接" + (i+1));
}
}
// 5. 借連接
public Connection borrow() {// t1, t2, t3
// 獲取許可
try {
semaphore.acquire(); // 沒有許可的線程,在此等待
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < poolSize; i++) {
// 獲取空閑連接
if(states.get(i) == 0) {
if (states.compareAndSet(i, 0, 1)) {
log.debug("borrow {}", connections[i]);
return connections[i];
}
}
}
// 不會(huì)執(zhí)行到這里
return null;
}
// 6. 歸還連接
public void free(Connection conn) {
for (int i = 0; i < poolSize; i++) {
if (connections[i] == conn) {
states.set(i, 0);
log.debug("free {}", conn);
semaphore.release();
break;
}
}
}
}
class MockConnection implements Connection {
private String name;
public MockConnection(String name) {
this.name = name;
}
@Override
public String toString() {
return "MockConnection{" +
"name='" + name + '\'' +
'}';
}
@Override
public Statement createStatement() throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql) throws SQLException {
return null;
}
@Override
public String nativeSQL(String sql) throws SQLException {
return null;
}
@Override
public void setAutoCommit(boolean autoCommit) throws SQLException {
}
@Override
public boolean getAutoCommit() throws SQLException {
return false;
}
@Override
public void commit() throws SQLException {
}
@Override
public void rollback() throws SQLException {
}
@Override
public void close() throws SQLException {
}
@Override
public boolean isClosed() throws SQLException {
return false;
}
@Override
public DatabaseMetaData getMetaData() throws SQLException {
return null;
}
@Override
public void setReadOnly(boolean readOnly) throws SQLException {
}
@Override
public boolean isReadOnly() throws SQLException {
return false;
}
@Override
public void setCatalog(String catalog) throws SQLException {
}
@Override
public String getCatalog() throws SQLException {
return null;
}
@Override
public void setTransactionIsolation(int level) throws SQLException {
}
@Override
public int getTransactionIsolation() throws SQLException {
return 0;
}
@Override
public SQLWarning getWarnings() throws SQLException {
return null;
}
@Override
public void clearWarnings() throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency) throws SQLException {
return null;
}
@Override
public Map<String, Class<?>> getTypeMap() throws SQLException {
return null;
}
@Override
public void setTypeMap(Map<String, Class<?>> map) throws SQLException {
}
@Override
public void setHoldability(int holdability) throws SQLException {
}
@Override
public int getHoldability() throws SQLException {
return 0;
}
@Override
public Savepoint setSavepoint() throws SQLException {
return null;
}
@Override
public Savepoint setSavepoint(String name) throws SQLException {
return null;
}
@Override
public void rollback(Savepoint savepoint) throws SQLException {
}
@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
}
@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public CallableStatement prepareCall(String sql, int resultSetType, int resultSetConcurrency, int resultSetHoldability) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int autoGeneratedKeys) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, int[] columnIndexes) throws SQLException {
return null;
}
@Override
public PreparedStatement prepareStatement(String sql, String[] columnNames) throws SQLException {
return null;
}
@Override
public Clob createClob() throws SQLException {
return null;
}
@Override
public Blob createBlob() throws SQLException {
return null;
}
@Override
public NClob createNClob() throws SQLException {
return null;
}
@Override
public SQLXML createSQLXML() throws SQLException {
return null;
}
@Override
public boolean isValid(int timeout) throws SQLException {
return false;
}
@Override
public void setClientInfo(String name, String value) throws SQLClientInfoException {
}
@Override
public void setClientInfo(Properties properties) throws SQLClientInfoException {
}
@Override
public String getClientInfo(String name) throws SQLException {
return null;
}
@Override
public Properties getClientInfo() throws SQLException {
return null;
}
@Override
public Array createArrayOf(String typeName, Object[] elements) throws SQLException {
return null;
}
@Override
public Struct createStruct(String typeName, Object[] attributes) throws SQLException {
return null;
}
@Override
public void setSchema(String schema) throws SQLException {
}
@Override
public String getSchema() throws SQLException {
return null;
}
@Override
public void abort(Executor executor) throws SQLException {
}
@Override
public void setNetworkTimeout(Executor executor, int milliseconds) throws SQLException {
}
@Override
public int getNetworkTimeout() throws SQLException {
return 0;
}
@Override
public <T> T unwrap(Class<T> iface) throws SQLException {
return null;
}
@Override
public boolean isWrapperFor(Class<?> iface) throws SQLException {
return false;
}
}比如:停車場(chǎng)場(chǎng)景,車位數(shù)量有限,同時(shí)只能容納多少臺(tái)車,車位滿了之后只有等里面的車離開停車場(chǎng)外面的車才可以進(jìn)入。
/**
* @author WGR
* @create 2020/12/27 -- 22:19
*/
public class Test1 {
public static void main(String[] args) {
// 1. 創(chuàng)建 semaphore 對(duì)象
Semaphore semaphore = new Semaphore(3);
// 2. 10個(gè)線程同時(shí)運(yùn)行
for (int i = 0; i < 10; i++) {
final int x = i;
new Thread(() -> {
// 3. 獲取許可
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
System.out.println(x +"占到車位。。。");
Thread.sleep(1);
System.out.println(x +"釋放車位。。。");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 4. 釋放許可
semaphore.release();
}
}).start();
}
}
}3、Semaphore實(shí)現(xiàn)原理
(1)、Semaphore初始化
Semaphore semaphore=new Semaphore(3);
1、當(dāng)調(diào)用new Semaphore(3) 方法時(shí),默認(rèn)會(huì)創(chuàng)建一個(gè)非公平的鎖的同步阻塞隊(duì)列。
2、把初始令牌數(shù)量賦值給同步隊(duì)列的state狀態(tài),state的值就代表當(dāng)前所剩余的令牌數(shù)量。
(2)獲取令牌
semaphore.acquire();
1、當(dāng)前線程會(huì)嘗試去同步隊(duì)列獲取一個(gè)令牌,獲取令牌的過(guò)程也就是使用原子的操作去修改同步隊(duì)列的state ,獲取一個(gè)令牌則修改為state=state-1。
2、 當(dāng)計(jì)算出來(lái)的state<0,則代表令牌數(shù)量不足,此時(shí)會(huì)創(chuàng)建一個(gè)Node節(jié)點(diǎn)加入阻塞隊(duì)列,掛起當(dāng)前線程。
3、當(dāng)計(jì)算出來(lái)的state>=0,則代表獲取令牌成功。
源碼:
/**
* 獲取1個(gè)令牌
*/
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}/**
* 共享模式下獲取令牌,獲取成功則返回,失敗則加入阻塞隊(duì)列,掛起線程
* @param arg
* @throws InterruptedException
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//嘗試獲取令牌,arg為獲取令牌個(gè)數(shù),當(dāng)可用令牌數(shù)減當(dāng)前令牌數(shù)結(jié)果小于0,則創(chuàng)建一個(gè)節(jié)點(diǎn)加入阻塞隊(duì)列,掛起當(dāng)前線程。
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}/**
* 1、創(chuàng)建節(jié)點(diǎn),加入阻塞隊(duì)列,
* 2、重雙向鏈表的head,tail節(jié)點(diǎn)關(guān)系,清空無(wú)效節(jié)點(diǎn)
* 3、掛起當(dāng)前節(jié)點(diǎn)線程
* @param arg
* @throws InterruptedException
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//創(chuàng)建節(jié)點(diǎn)加入阻塞隊(duì)列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲得當(dāng)前節(jié)點(diǎn)pre節(jié)點(diǎn)
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);//返回鎖的state
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//重組雙向鏈表,清空無(wú)效節(jié)點(diǎn),掛起當(dāng)前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}Semaphore 有點(diǎn)像一個(gè)停車場(chǎng),permits 就好像停車位數(shù)量,當(dāng)線程獲得了 permits 就像是獲得了停車位,然后停車場(chǎng)顯示空余車位減一剛開始,permits(state)為 3,這時(shí) 5 個(gè)線程來(lái)獲取資源

假設(shè)其中 Thread-1,Thread-2,Thread-4 cas 競(jìng)爭(zhēng)成功,而 Thread-0 和 Thread-3 競(jìng)爭(zhēng)失敗,進(jìn)入 AQS 隊(duì)列park 阻塞

(3)、釋放令牌
semaphore.release();
當(dāng)調(diào)用semaphore.release() 方法時(shí)
1、線程會(huì)嘗試釋放一個(gè)令牌,釋放令牌的過(guò)程也就是把同步隊(duì)列的state修改為state=state+1的過(guò)程
2、釋放令牌成功之后,同時(shí)會(huì)喚醒同步隊(duì)列的所有阻塞節(jié)共享節(jié)點(diǎn)線程
3、被喚醒的節(jié)點(diǎn)會(huì)重新嘗試去修改state=state-1 的操作,如果state>=0則獲取令牌成功,否則重新進(jìn)入阻塞隊(duì)列,掛起線程。
源碼:
/**
* 釋放令牌
*/
public void release() {
sync.releaseShared(1);
}/**
*釋放共享鎖,同時(shí)喚醒所有阻塞隊(duì)列共享節(jié)點(diǎn)線程
* @param arg
* @return
*/
public final boolean releaseShared(int arg) {
//釋放共享鎖
if (tryReleaseShared(arg)) {
//喚醒所有共享節(jié)點(diǎn)線程
doReleaseShared();
return true;
}
return false;
}/**
* 喚醒所有共享節(jié)點(diǎn)線程
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//是否需要喚醒后繼節(jié)點(diǎn)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改狀態(tài)為初始0
continue;
unparkSuccessor(h);//喚醒h.nex節(jié)點(diǎn)線程
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE));
}
if (h == head) // loop if head changed
break;
}
}這時(shí) Thread-4 釋放了 permits,狀態(tài)如下

接下來(lái) Thread-0 競(jìng)爭(zhēng)成功,state 再次設(shè)置為 0,設(shè)置自己為 head 節(jié)點(diǎn),斷開原來(lái)的 head 節(jié)點(diǎn),unpark 接下來(lái)的 Thread-3 節(jié)點(diǎn),但由于 state 是 0,因此 Thread-3 在嘗試不成功后再次進(jìn)入 park 狀態(tài)

到此這篇關(guān)于Java中的Semaphore信號(hào)量使用解析的文章就介紹到這了,更多相關(guān)Semaphore信號(hào)量?jī)?nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)數(shù)字轉(zhuǎn)成英文的方法
這篇文章主要介紹了Java實(shí)現(xiàn)數(shù)字轉(zhuǎn)成英文的方法,涉及java數(shù)組與字符串的相關(guān)操作技巧,需要的朋友可以參考下2015-05-05
Java微信公眾平臺(tái)開發(fā)(7) 公眾平臺(tái)測(cè)試帳號(hào)的申請(qǐng)
這篇文章主要為大家詳細(xì)介紹了Java微信公眾平臺(tái)開發(fā)第七步,微信公眾平臺(tái)測(cè)試帳號(hào)的申請(qǐng),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-04-04
Java將時(shí)間按月份分段的實(shí)現(xiàn)思路與方法
這篇文章主要給大家介紹了關(guān)于Java將時(shí)間按月份分段的實(shí)現(xiàn)思路與方法,通過(guò)文中介紹的方法可以將時(shí)間分成我們想要的時(shí)間段,文中給出了詳細(xì)的實(shí)例代碼,需要的朋友可以參考下2021-07-07
解決Unable to start embedded container&nbs
這篇文章主要介紹了解決Unable to start embedded container SpringBoot啟動(dòng)報(bào)錯(cuò)問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
IDEA:Git stash 暫存分支修改的實(shí)現(xiàn)代碼
這篇文章主要介紹了IDEA:Git stash 暫存分支修改的實(shí)現(xiàn)代碼,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-03-03
spring?boot?validation參數(shù)校驗(yàn)與分組嵌套各種類型及使用小結(jié)
參數(shù)校驗(yàn)基本上是controller必做的事情,畢竟前端傳過(guò)來(lái)的一切都不可信,validation可以簡(jiǎn)化這一操作,這篇文章主要介紹了spring?boot?validation參數(shù)校驗(yàn)分組嵌套各種類型及使用小結(jié),需要的朋友可以參考下2023-09-09

