Springboot詳解線程池與多線程及阻塞隊(duì)列的應(yīng)用詳解
版本:Spring Boot 2.6.3
一、案例場景
1>web端接收restful請求生成任務(wù)A,并把任務(wù)放入隊(duì)列Queue_A。
2>線程池A的任務(wù)線程從隊(duì)列Queue_A取出任務(wù),處理完成后放入Queue_B。
3>線程池B的任務(wù)線程從Queue_B取出任務(wù),處理完成后入庫。
本例就使用兩個(gè)任務(wù)步驟,按需擴(kuò)展延長任務(wù)鏈。
二、使用類
java.util.LinkedHashMap,雙向鏈表。
java.util.concurrent.BlockingQueue,阻塞隊(duì)列接口。
java.util.concurrent.LinkedBlockingQueue,阻塞隊(duì)列實(shí)現(xiàn)類。
java.util.concurrent.CountDownLatch,線程計(jì)數(shù)器。
java.util.concurrent.locks.ReentrantLock,可重入鎖。
三、本例說明
1.接收web請求
OrderController接收web請求,業(yè)務(wù)數(shù)據(jù)封裝成任務(wù)對象,并寫入隊(duì)列QUEUE_A。Web請求結(jié)束,立即返回。
2.后臺(tái)任務(wù)處理
FlowStarter流程啟動(dòng)器
管理FlowManager,創(chuàng)建流程管理器和啟動(dòng)流程管理器。創(chuàng)建線程池容器StepContainer,指定隊(duì)列、線程池線程數(shù)量,以及業(yè)務(wù)處理Handler。
FlowManager流程管理器
管理線程池容器StepContainer。創(chuàng)建線程池容器,啟動(dòng)線程池容器,關(guān)閉線程池容器,線程池容器之間數(shù)據(jù)傳遞。使用LinkedHashMap維護(hù)一個(gè)流程中的多個(gè)線程池容器。
StepContainer線程池容器
創(chuàng)建線程池,啟動(dòng)線程執(zhí)行器(Executor),初始化業(yè)務(wù)處理Handler,讀寫隊(duì)列。使用LinkedHashMap維護(hù)一個(gè)流程中的多個(gè)StepExecutor。
StepExecutor線程執(zhí)行器
執(zhí)行抽象公用業(yè)務(wù)邏輯。實(shí)現(xiàn)線程Runnable接口。調(diào)用StepHandler的實(shí)現(xiàn)類的execute執(zhí)行具體業(yè)務(wù)邏輯。
StepHandler業(yè)務(wù)處理器handler
具體業(yè)務(wù)在StepHandler的實(shí)現(xiàn)類的execute中實(shí)現(xiàn)。
任務(wù)模型對象StepModel和執(zhí)行結(jié)果對象StepResult
每個(gè)具體業(yè)務(wù)數(shù)據(jù)必須包裝成任務(wù)模型對象StepModel,執(zhí)行結(jié)果包裝成執(zhí)行結(jié)果對象StepResult,才能在線程池和隊(duì)列中流轉(zhuǎn)。
3.關(guān)系說明
一個(gè)FlowStarter可以啟動(dòng)一個(gè)或者多個(gè)FlowManager。支持一對多和一對一,按需擴(kuò)展。
一個(gè)FlowManager對應(yīng)一個(gè)業(yè)務(wù)流程。一個(gè)業(yè)務(wù)流程可以拆分為多個(gè)步驟。一個(gè)步驟對應(yīng)一個(gè)線程池容器StepContainer。一個(gè)線程池容器StepContainer,啟動(dòng)多個(gè)線程執(zhí)行器StepExecutor。效果就是并發(fā)執(zhí)行任務(wù)。
一個(gè)業(yè)務(wù)流程拆分成若干個(gè)步驟,每個(gè)步驟之間數(shù)據(jù)流轉(zhuǎn),使用任務(wù)模型StepModel中的狀態(tài)標(biāo)識(shí)isFinished,isPutInQueueAgain,isPutInQueueNext 字段來分析任務(wù)流向。使用StepModel的StepResult的 nextStepName字段來識(shí)別具體流向的線程池容器。
四、代碼
1.OrderController
OrderController,接收請求、封裝任務(wù)、寫隊(duì)列。
@Slf4j
@RestController
@RequestMapping("/order")
public class OrderController {
@PostMapping("/f1")
public Object f1(@RequestBody Object obj) {
log.info("OrderController->f1,接收參數(shù),obj = " + obj.toString());
Map objMap = (Map) obj;
OrderInfo orderInfo = new OrderInfo();
orderInfo.setUserName((String) objMap.get("userName"));
orderInfo.setTradeName((String) objMap.get("tradeName"));
orderInfo.setOrderTime(System.currentTimeMillis());
LinkedBlockingQueue<StepModel> queueA = FlowQueue.getBlockingQueue("QUEUE_A");
QueueUtils.putStepPutInQueue(queueA,orderInfo);
log.info("OrderController->f1,返回." );
return ResultObj.builder().code("200").message("成功").build();
}
}2.FlowStarter流程啟動(dòng)器
FlowStarter,后臺(tái)任務(wù)線程池和線程啟動(dòng)。實(shí)現(xiàn)InitializingBean了接口。那么在spring初始化化bean完成后,就能觸發(fā)啟動(dòng)線程池和線程。
@Slf4j
@Service
public class FlowStarter implements InitializingBean {
@Override
public void afterPropertiesSet() throws Exception {
log.info("FlowWorker創(chuàng)建流程.");
FlowManager flowManager = new FlowManager();
flowManager.buildContainer(ConstantUtils.STEP_01,5,
FlowQueue.getBlockingQueue("QUEUE_A"), Step01Handler.class
);
flowManager.buildContainer(ConstantUtils.STEP_02,5,
FlowQueue.getBlockingQueue("QUEUE_B"), Step02Handler.class
);
flowManager.startContainers();
log.info("FlowWorker啟動(dòng)流程完成.");
}
}3.FlowManager流程管理器
一個(gè)FlowManager流程管理器,維護(hù)多個(gè)線程池容器StepContainer,共同完成一個(gè)流程的多個(gè)步驟。
public class FlowManager {
// 管理器名稱
private String name;
// 管理線程池容器
private Map<String, StepContainer> stepContainerMap = new LinkedHashMap<>();
public FlowManager() {}
// 創(chuàng)建線程池容器
public void buildContainer(String name, int poolSize, BlockingQueue<StepModel> queue,
Class<? extends StepHandler> handlerClazz) {
StepContainer stepWorker = new StepContainer();
stepWorker.createThreadPool(poolSize, queue, handlerClazz);
stepWorker.setName(name);
stepWorker.setFlowManager(this);
this.stepContainerMap.put(name, stepWorker);
}
// 啟動(dòng)線程池容器
public void startContainers() {
for (StepContainer stepContainer : this.stepContainerMap.values()) {
stepContainer.startRunExecutor();
}
}
// 關(guān)閉線程池容器
public void stopContainers() {
for (StepContainer stepContainer : this.stepContainerMap.values()) {
stepContainer.stopRunExecutor();
}
this.stepContainerMap.clear();
}
// 任務(wù)放入下一個(gè)線程池
public boolean sendToNextContainer(String nextStepName, Object obj) {
if (nextStepName != null && !StringUtils.equals(nextStepName, "")) {
if (this.stepContainerMap.containsKey(nextStepName)) {
this.stepContainerMap.get(nextStepName).putStepInQueue(obj);
return true;
} else {
return false;
}
} else {
return false;
}
}
public String getName() {
return name;
}
}4.StepContainer線程池容器
StepContainer線程池容器,維護(hù)多個(gè)線程執(zhí)行器StepExecutor,實(shí)現(xiàn)多線程異步完成每個(gè)獨(dú)立任務(wù)。
@Slf4j
public class StepContainer {
// 線程池名稱
private String name;
// 線程池
private ExecutorService threadPool;
// 線程數(shù)目
private int nThreads = 0;
// 線程處理業(yè)務(wù)handler類
private Class handlerClazz;
// 線程處理業(yè)務(wù)隊(duì)列
private BlockingQueue<StepModel> queue = null;
// 線程池內(nèi)線程管理
private Map<String, StepExecutor> stepExecutorMap = new LinkedHashMap<>();
// 線程池運(yùn)行狀態(tài)
private boolean isRun = false;
// 線程池管理器
private FlowManager flowManager = null;
// 構(gòu)造函數(shù)
public StepContainer() {}
// 創(chuàng)建線程池
public boolean createThreadPool(int nThreads, BlockingQueue<StepModel> queue,
Class<? extends StepHandler> handlerClazz) {
try {
this.nThreads = nThreads;
this.queue = queue;
this.handlerClazz = handlerClazz;
this.threadPool = Executors.newFixedThreadPool(this.nThreads, new ThreadFactory() {
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable);
}
});
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
// 啟動(dòng)線程
public void startRunExecutor() {
if (!this.isRun) {
if (this.handlerClazz != null) {
log.info("線程池: " + this.name + ",啟動(dòng),加載線程Executor.");
StepExecutor stepExecutor;
String executorName = "";
for (int num = 0; num < this.nThreads; num++) {
try {
executorName = this.name + "_" + (num + 1);
StepHandler stepHandler = (StepHandler) createStepHandler(this.handlerClazz);
stepExecutor = new StepExecutor(executorName, this.queue, stepHandler, this);
this.threadPool.execute(stepExecutor);
this.stepExecutorMap.put(executorName, stepExecutor);
} catch (Exception e) {
e.printStackTrace();
}
}
this.isRun = true;
}
}
}
// 關(guān)閉線程
public void stopRunExecutor() {
if (isRun) {
Iterator iterator = this.stepExecutorMap.values().iterator();
while (iterator.hasNext()) {
StepExecutor stepExecutor = (StepExecutor) iterator.next();
stepExecutor.stop();
}
this.stepExecutorMap.clear();
this.isRun = false;
}
}
// 從隊(duì)列獲取任務(wù)
public StepModel getStepFromQueue() {
StepModel stepModel = null;
synchronized (this.queue) {
try {
if (this.queue.size() > 0) {
stepModel = this.queue.take();
}
} catch (Exception e) {
log.info("從隊(duì)列獲取任務(wù)異常.");
e.printStackTrace();
}
}
return stepModel;
}
// 任務(wù)放入隊(duì)列
public void putStepInQueue(Object obj) {
try {
StepModel stepModel = new StepModel(obj);
stepModel.setPutInQueueTime(System.currentTimeMillis());
this.queue.put(stepModel);
} catch (InterruptedException e) {
log.info("任務(wù)放入隊(duì)列異常.");
e.printStackTrace();
}
}
// 重新放入
public void putStepInQueueAgain(StepModel stepModel) {
stepModel.setFinished(false);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(false);
try {
this.queue.put(stepModel);
} catch (InterruptedException e) {
log.info("任務(wù)重新放入隊(duì)列異常.");
e.printStackTrace();
}
}
// 清空隊(duì)列
public void clearQueue() {
if (this.queue != null) {
this.queue.clear();
}
}
// 初始化實(shí)例對象
public Object createStepHandler(Class clazz)
throws InstantiationException, IllegalAccessException {
Object object = clazz.newInstance();
return object;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public FlowManager getFlowManager() {
return flowManager;
}
public void setFlowManager(FlowManager flowManager) {
this.flowManager = flowManager;
}
}5.StepExecutor線程執(zhí)行器
StepExecutor線程執(zhí)行器,實(shí)現(xiàn)Runnable接口。線程執(zhí)行單元通用邏輯,具體業(yè)務(wù)邏輯通過調(diào)用StepHandler的execute方法實(shí)現(xiàn)。
@Slf4j
public class StepExecutor implements Runnable {
// 執(zhí)行器名稱
private String name;
// 線程執(zhí)行的任務(wù)
private StepModel stepModel;
// 線程執(zhí)行的隊(duì)列
private BlockingQueue<StepModel> queue;
// 線程執(zhí)行的業(yè)務(wù)處理邏輯
private Object stepHandler;
// 線程運(yùn)行狀態(tài)
private volatile boolean isRun = false;
// 線程開啟(True)和關(guān)閉(False)
private volatile boolean isClose = false;
// 線程隸屬容器
private StepContainer stepContainer;
// 線程計(jì)數(shù)器(關(guān)閉線程使用)
private CountDownLatch countDownLatch = null;
public StepExecutor() {}
public StepExecutor(String name, BlockingQueue<StepModel> queue,
StepHandler stepHandler, StepContainer stepContainer) {
this.name = name;
this.queue = queue;
this.stepHandler = stepHandler;
this.stepContainer = stepContainer;
}
@Override
public void run() {
this.isRun = true;
this.countDownLatch = new CountDownLatch(1);
// 沒收到關(guān)閉信號(hào),則循環(huán)運(yùn)行
while (!this.isClose) {
this.stepModel = null;
String threadName = "【線程池:" + this.stepContainer.getName()
+ ",線程:" + Thread.currentThread().getName() + "】";
// 循環(huán)運(yùn)行,為防止中斷和卡主,需捕獲異常
try {
StepHandler stepHandler = (StepHandler) this.stepHandler;
this.stepModel = this.stepContainer.getStepFromQueue();
if (this.stepModel != null) {
log.info(threadName + ",處理任務(wù).");
this.stepModel.getStepResultList().clear();
stepHandler.execute(this.stepModel);
// 執(zhí)行完成后結(jié)果數(shù)據(jù)
List<StepResult> stepResultList = this.stepModel.getStepResultList();
boolean isFinished = this.stepModel.isFinished();
boolean isPutInQueueAgain = this.stepModel.isPutInQueueAgain();
boolean isPutInQueueNext = this.stepModel.isPutInQueueNext();
if (isFinished && !isPutInQueueAgain && !isPutInQueueNext) {
log.info(threadName + ",任務(wù)結(jié)束.");
}
if (!isFinished && isPutInQueueAgain && !isPutInQueueNext) {
log.info(threadName + ",任務(wù)在本步驟未完成,重新放隊(duì)列.");
this.stepContainer.putStepInQueueAgain(this.stepModel);
}
if (!isFinished && !isPutInQueueAgain && isPutInQueueNext) {
int resultNum = stepResultList.size();
if (resultNum > 0) {
for (StepResult stepResult : stepResultList) {
log.info(threadName + ",任務(wù)在本步驟已經(jīng)完成,發(fā)送給下一個(gè)線程池: "
+ stepResult.getNextStepName() + ",執(zhí)行.");
this.stepContainer.getFlowManager().sendToNextContainer(
stepResult.getNextStepName(),
stepResult.getResult());
}
}
}
} else {
threadToSleep(1000 * 3L);
}
} catch (Exception e) {
log.info("執(zhí)行器異常.");
e.printStackTrace();
this.stepContainer.putStepInQueueAgain(this.stepModel);
}
}
// 跳出循環(huán)后,線程計(jì)數(shù)減1
this.countDownLatch.countDown();
this.isRun = false;
}
public void stop() {
this.isClose = true;
if (this.countDownLatch != null) {
while (this.countDownLatch.getCount() > 0L) {
try {
this.countDownLatch.await();
} catch (InterruptedException e) {
log.info("線程關(guān)閉異常.");
e.printStackTrace();
}
}
}
this.isClose = false;
}
public void threadToSleep(long time) {
try {
Thread.sleep(time);
} catch (Exception e) {
log.info("線程休眠異常.");
e.printStackTrace();
}
}
}6.StepHandler業(yè)務(wù)處理handler
StepHandler是StepExecutor線程執(zhí)行器,具體執(zhí)行業(yè)務(wù)邏輯的入口。
StepHandler抽象類
每個(gè)具體的實(shí)現(xiàn)類都繼承抽象的StepHandler。
public abstract class StepHandler {
public StepHandler() {}
public abstract void execute(StepModel stepModel);
}Step01Handler
Step01Handler是StepHandler實(shí)現(xiàn)類,從隊(duì)列中取任務(wù)執(zhí)行,執(zhí)行完成后放入下一個(gè)業(yè)務(wù)處理器Step02Handler。
@Slf4j
public class Step01Handler extends StepHandler {
@Override
public void execute(StepModel stepModel) {
log.info("Step01Handler執(zhí)行開始,stepModel: " + stepModel.toString());
OrderInfo orderInfo = (OrderInfo) stepModel.getObj();
List<StepResult> stepResultList = stepModel.getStepResultList();
try {
log.info("Step01Handler執(zhí)行,處理訂單.");
String orderNo = UUID.randomUUID().toString()
.replace("-", "").toUpperCase();
orderInfo.setOrderNo(orderNo);
orderInfo.setPlatformType("線上");
orderInfo.setOrderSource("Web");
stepModel.setFinished(false);
stepModel.setPutInQueueNext(true);
stepModel.setPutInQueueAgain(false);
stepResultList.add(new StepResult(ConstantUtils.STEP_02, orderInfo));
} catch (Exception e) {
stepModel.setFinished(false);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(true);
stepResultList.add(new StepResult(ConstantUtils.STEP_01, orderInfo));
}
log.info("Step01Handler執(zhí)行完成,stepModel: " + stepModel.toString());
}
}Step02Handler
Step02Handler是StepHandler實(shí)現(xiàn)類,從隊(duì)列中取任務(wù)執(zhí)行。
@Slf4j
public class Step02Handler extends StepHandler{
@Override
public void execute(StepModel stepModel) {
log.info("Step02Handler執(zhí)行開始,stepModel: " + stepModel.toString());
OrderInfo orderInfo = (OrderInfo) stepModel.getObj();
List<StepResult> stepResultList = stepModel.getStepResultList();
try {
orderInfo.setEndTime(System.currentTimeMillis());
stepModel.setFinished(true);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(false);
log.info("Step02Handler執(zhí)行,入庫.");
} catch (Exception e) {
stepModel.setFinished(true);
stepModel.setPutInQueueNext(false);
stepModel.setPutInQueueAgain(false);
}
log.info("Step02Handler執(zhí)行完成,stepModel: " + stepModel.toString());
}
}7.阻塞隊(duì)列
BlockingQueue是線程安全的阻塞隊(duì)列。
7.1 FlowQueue
FlowQueue,管理本例使用的兩個(gè)阻塞隊(duì)列。
public class FlowQueue {
private static final LinkedBlockingQueue<StepModel> queueA = new LinkedBlockingQueue<StepModel>();
private static final LinkedBlockingQueue<StepModel> queueB = new LinkedBlockingQueue<StepModel>();
public static LinkedBlockingQueue<StepModel> getBlockingQueue(String queueName) {
LinkedBlockingQueue<StepModel> queue = null;
switch (queueName) {
case "QUEUE_A":
queue = queueA;
break;
case "QUEUE_B":
queue = queueB;
break;
}
return queue;
}
}7.2 QueueUtils
QueueUtils,隊(duì)列簡易工具。
@Slf4j
public class QueueUtils {
public static StepModel getStepFromQueue(
LinkedBlockingQueue<StepModel> queue) {
StepModel stepModel = null;
try {
if (queue.size() > 0) {
stepModel = queue.take();
}
} catch (Exception e) {
log.info("讀隊(duì)列異常.");
e.printStackTrace();
}
return stepModel;
}
public static void putStepPutInQueue(
LinkedBlockingQueue<StepModel> queue, Object obj) {
try {
StepModel stepModel = new StepModel(obj);
stepModel.setPutInQueueTime(System.currentTimeMillis());
queue.put(stepModel);
} catch (Exception e) {
log.info("寫隊(duì)列異常.");
e.printStackTrace();
}
}
public static int getQueueSize(
LinkedBlockingQueue<StepModel> queue) {
int size = 0;
try {
size = queue.size();
} catch (Exception e) {
log.info("獲取隊(duì)列Size異常.");
e.printStackTrace();
}
return size;
}
}7.3 ConstantUtils
ConstantUtils,管理常量,即線程池名稱。
public class ConstantUtils {
public static final String STEP_01 = "STEP_01_THREAD_POOL";
public static final String STEP_02 = "STEP_02_THREAD_POOL";
}8.任務(wù)模型
任務(wù)模型,即具體需要處理對象,封裝成線程使用的任務(wù)模型,這樣可以把業(yè)務(wù)和流程框架解耦。
8.1 StepModel
StepModel,任務(wù)模型封裝。
@Data
public class StepModel {
// 任務(wù)對象
private Object obj;
// 任務(wù)執(zhí)行結(jié)果
private List<StepResult> stepResultList;
// 任務(wù)接收時(shí)間
private long putInQueueTime;
// 任務(wù)完成標(biāo)識(shí)
private boolean isFinished = false;
// 任務(wù)重新放入隊(duì)列標(biāo)識(shí)
private boolean isPutInQueueAgain = false;
// 任務(wù)放入下一個(gè)隊(duì)列標(biāo)識(shí)
private boolean isPutInQueueNext = false;
public StepModel(Object object) {
this.obj = object;
this.stepResultList = new ArrayList<>();
}
}8.2 StepResult
StepResult,執(zhí)行結(jié)果模型封裝。
@Data
public class StepResult {
// 目標(biāo)線程池名
private String nextStepName;
// 執(zhí)行結(jié)果
private Object result;
public StepResult(String nextStepName,Object result){
this.nextStepName = nextStepName;
this.result = result;
}
}9.業(yè)務(wù)數(shù)據(jù)模型
業(yè)務(wù)數(shù)據(jù)模型,即生成具體需要處理的數(shù)據(jù),在傳入給線程池的線程執(zhí)行前,需要封裝成任務(wù)模型。
9.1 OrderInfo
OrderInfo,本例要處理的業(yè)務(wù)數(shù)據(jù)模型。
@Data
@NoArgsConstructor
public class OrderInfo {
private String userName;
private String orderNo;
private String tradeName;
private String platformType;
private String orderSource;
private long orderTime;
private long endTime;
}9.2 ResultObj
ResultObj,web請求返回的統(tǒng)一封裝對象。
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class ResultObj {
private String code;
private String message;
}10.測試
包括web請求和后臺(tái)任務(wù)
10.1 web請求
請求URL: http://127.0.0.1:8080/server/order/f1
入?yún)ⅲ?/p>
{
"userName": "HangZhou0614",
"tradeName": "Vue進(jìn)階教程"
}
返回值:
{
"code": "200",
"message": "成功"
}
10.2 后臺(tái)任務(wù)日志
日志輸出:

到此這篇關(guān)于Springboot詳解線程池與多線程及阻塞隊(duì)列的應(yīng)用詳解的文章就介紹到這了,更多相關(guān)Springboot線程池內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
SpringBoot整合Hashids實(shí)現(xiàn)數(shù)據(jù)ID加密隱藏的全過程
這篇文章主要為大家詳細(xì)介紹了SpringBoot整合Hashids實(shí)現(xiàn)數(shù)據(jù)ID加密隱藏的全過程,文中的示例代碼講解詳細(xì),對大家的學(xué)習(xí)或工作有一定的幫助,感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2024-01-01
SpringCloud?Gateway?DispatcherHandler調(diào)用方法詳細(xì)介紹
我們第一個(gè)關(guān)注的類就是DispatcherHandler,這個(gè)類提供的handle()方法,封裝了我們之后所有的handlerMappings,這個(gè)DispatcherHandler有點(diǎn)想SpringMVC的DispatchServlet,里面也是封裝了請求和對應(yīng)的處理方法的關(guān)系2022-10-10
Maven?項(xiàng)目用Assembly打包可執(zhí)行jar包的方法
這篇文章主要介紹了Maven?項(xiàng)目用Assembly打包可執(zhí)行jar包的方法,該方法只可打包非spring項(xiàng)目的可執(zhí)行jar包,需要的朋友可以參考下2023-03-03
java項(xiàng)目中常用指標(biāo)UV?PV?QPS?TPS含義以及統(tǒng)計(jì)方法

