詳解Java中CountDownLatch異步轉(zhuǎn)同步工具類
使用場(chǎng)景
由于公司業(yè)務(wù)需求,需要對(duì)接socket、MQTT等消息隊(duì)列。
眾所周知 socket 是雙向通信,socket的回復(fù)是人為定義的,客戶端推送消息給服務(wù)端,服務(wù)端的回復(fù)是兩條線。無法像http請(qǐng)求有回復(fù)。
下發(fā)指令給硬件時(shí),需要校驗(yàn)此次數(shù)據(jù)下發(fā)是否成功。
用戶體驗(yàn)而言,點(diǎn)擊按鈕就要知道此次的下發(fā)成功或失敗。

如上圖模型,
第一種方案使用Tread.sleep
優(yōu)點(diǎn):占用資源小,放棄當(dāng)前cpu資源
缺點(diǎn): 回復(fù)速度快,休眠時(shí)間過長(zhǎng),仍然需要等待休眠結(jié)束才能返回,響應(yīng)速度是固定的,無法及時(shí)響應(yīng)第二種方案使用CountDownLatch
package com.lzy.demo.delay;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class CountDownLatchPool {
//countDonw池
private final static Map<Integer, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
//延遲隊(duì)列
private final static DelayQueue<MessageDelayQueueUtil> delayQueue = new DelayQueue<>();
private volatile static boolean flag =false;
//單線程池
private final static ExecutorService t = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(1));
public static void addCountDownLatch(Integer messageId) {
CountDownLatch countDownLatch = countDownLatchMap.putIfAbsent(messageId,new CountDownLatch(1) );
if(countDownLatch == null){
countDownLatch = countDownLatchMap.get(messageId);
}
try {
addDelayQueue(messageId);
countDownLatch.await(3L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("阻塞等待結(jié)束~~~~~~");
}
public static void removeCountDownLatch(Integer messageId){
CountDownLatch countDownLatch = countDownLatchMap.get(messageId);
if(countDownLatch == null)
return;
countDownLatch.countDown();
countDownLatchMap.remove(messageId);
System.out.println("清除Map數(shù)據(jù)"+countDownLatchMap);
}
private static void addDelayQueue(Integer messageId){
delayQueue.add(new MessageDelayQueueUtil(messageId));
clearMessageId();
}
private static void clearMessageId(){
synchronized (CountDownLatchPool.class){
if(flag){
return;
}
flag = true;
}
t.execute(()->{
while (delayQueue.size() > 0){
System.out.println("進(jìn)入線程并開始執(zhí)行");
try {
MessageDelayQueueUtil take = delayQueue.take();
Integer messageId1 = take.getMessageId();
removeCountDownLatch(messageId1);
System.out.println("清除隊(duì)列數(shù)據(jù)"+messageId1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
flag = false;
System.out.println("結(jié)束end----");
});
}
public static void main(String[] args) throws InterruptedException {
/*
測(cè)試超時(shí)清空map
new Thread(()->addCountDownLatch(1)).start();
new Thread(()->addCountDownLatch(2)).start();
new Thread(()->addCountDownLatch(3)).start();
*/
//提前創(chuàng)建線程,清空countdown
new Thread(()->{
try {
Thread.sleep(500L);
removeCountDownLatch(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
//開始阻塞
addCountDownLatch(1);
//通過調(diào)整上面的sleep我們發(fā)現(xiàn)阻塞市場(chǎng)取決于countDownLatch.countDown()執(zhí)行時(shí)間
System.out.println("阻塞結(jié)束----");
}
}
class MessageDelayQueueUtil implements Delayed {
private Integer messageId;
private long avaibleTime;
public Integer getMessageId() {
return messageId;
}
public void setMessageId(Integer messageId) {
this.messageId = messageId;
}
public long getAvaibleTime() {
return avaibleTime;
}
public void setAvaibleTime(long avaibleTime) {
this.avaibleTime = avaibleTime;
}
public MessageDelayQueueUtil(Integer messageId){
this.messageId = messageId;
//avaibleTime = 當(dāng)前時(shí)間+ delayTime
//重試3次,每次3秒+1秒的延遲
this.avaibleTime=3000*3+1000 + System.currentTimeMillis();
}
@Override
public long getDelay(TimeUnit unit) {
long diffTime= avaibleTime- System.currentTimeMillis();
return unit.convert(diffTime,TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
//compareTo用在DelayedUser的排序
return (int)(this.avaibleTime - ((MessageDelayQueueUtil) o).getAvaibleTime());
}
}
由于socket并不確定每次都會(huì)有數(shù)據(jù)返回,所以map的數(shù)據(jù)會(huì)越來越大,最終導(dǎo)致內(nèi)存溢出
需定時(shí)清除map內(nèi)的無效數(shù)據(jù)。
可以使用DelayedQuene延遲隊(duì)列來處理,相當(dāng)于給對(duì)象添加一個(gè)過期時(shí)間
使用方法 addCountDownLatch 等待消息,異步回調(diào)消息清空removeCountDownLatch
到此這篇關(guān)于詳解Java中CountDownLatch異步轉(zhuǎn)同步工具類的文章就介紹到這了,更多相關(guān)CountDownLatch異步轉(zhuǎn)同步工具類內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java請(qǐng)求調(diào)用參數(shù)格式為form-data類型的接口代碼示例
這篇文章主要給大家介紹了關(guān)于Java請(qǐng)求調(diào)用參數(shù)格式為form-data類型的接口的相關(guān)資料,文中給出了詳細(xì)的代碼示例,對(duì)大家的學(xué)習(xí)或者工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-08-08
親身體驗(yàn)Intellij?Idea從卡頓到順暢全過程
這篇文章主要介紹了親身體驗(yàn)Intellij?Idea從卡頓到順暢全過程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-09-09
詳解APP微信支付(java后臺(tái)_統(tǒng)一下單和回調(diào))
這篇文章主要介紹了APP微信支付(java后臺(tái)_統(tǒng)一下單和回調(diào)),文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05
Java實(shí)現(xiàn)在線語(yǔ)音識(shí)別
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)在線語(yǔ)音識(shí)別功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-08-08
Java必備知識(shí)之位運(yùn)算及常見進(jìn)制解讀
從現(xiàn)代計(jì)算機(jī)中所有的數(shù)據(jù)二進(jìn)制的形式存儲(chǔ)在設(shè)備中。即 0、1 兩種狀態(tài),計(jì)算機(jī)對(duì)二進(jìn)制數(shù)據(jù)進(jìn)行的運(yùn)算(+、-、*、/)都是叫位運(yùn)算,即將符號(hào)位共同參與運(yùn)算的運(yùn)算2021-10-10
springboot CompletableFuture并行計(jì)算及使用方法
CompletableFuture基于 Future 和 CompletionStage 接口,利用線程池、回調(diào)函數(shù)、異常處理、組合操作等機(jī)制,提供了強(qiáng)大而靈活的異步編程功能,這篇文章主要介紹了springboot CompletableFuture并行計(jì)算及使用方法,需要的朋友可以參考下2024-05-05
SpringBoot + Spring Security 基本使用及個(gè)性化登錄配置詳解
這篇文章主要介紹了SpringBoot + Spring Security 基本使用及個(gè)性化登錄配置詳解,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-05-05

