Java實(shí)現(xiàn)自定義線程池拒絕策略的完整代碼
自定義拒絕策略的核心是實(shí)現(xiàn)java.util.concurrent.RejectedExecutionHandler接口,重寫唯一的rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法:
- 參數(shù)
r:被線程池拒絕的任務(wù)實(shí)例; - 參數(shù)
executor:當(dāng)前線程池實(shí)例,可通過它獲取實(shí)時(shí)運(yùn)行狀態(tài)(活躍線程數(shù)、隊(duì)列任務(wù)數(shù)、完成任務(wù)數(shù)等),用于問題排查。
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 自定義拒絕策略Demo:詳細(xì)日志+線程池狀態(tài)監(jiān)控
*/
public class CustomRejectPolicyDemo {
// 1. 實(shí)現(xiàn)RejectedExecutionHandler,自定義拒絕策略
static class MyCustomRejectPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// 核心:打印詳細(xì)拒絕日志 + 線程池實(shí)時(shí)狀態(tài)(生產(chǎn)級(jí)排查必備)
String rejectLog = String.format(
"===== 任務(wù)被拒絕 =====\n" +
"拒絕時(shí)間:%s\n" +
"被拒任務(wù):%s\n" +
"線程池狀態(tài):運(yùn)行中=%s | 核心線程數(shù)=%d | 最大線程數(shù)=%d\n" +
"當(dāng)前狀態(tài):活躍線程數(shù)=%d | 隊(duì)列任務(wù)數(shù)=%d | 已完成任務(wù)數(shù)=%d | 總?cè)蝿?wù)數(shù)=%d",
System.currentTimeMillis(),
r.toString(),
executor.isRunning(), // 線程池是否處于RUNNING狀態(tài)
executor.getCorePoolSize(),
executor.getMaximumPoolSize(),
executor.getActiveCount(), // 實(shí)時(shí)活躍線程數(shù)
executor.getQueue().size(), // 隊(duì)列中等待的任務(wù)數(shù)
executor.getCompletedTaskCount(), // 已完成的任務(wù)總數(shù)
executor.getTaskCount() // 提交的總?cè)蝿?wù)數(shù)(完成+執(zhí)行中+隊(duì)列中)
);
// 實(shí)際生產(chǎn)中可替換為SLF4J/Logback日志框架
System.err.println(rejectLog);
// 可選:根據(jù)業(yè)務(wù)需求擴(kuò)展(如任務(wù)持久化到MQ/Redis、觸發(fā)限流告警、主線程兜底執(zhí)行等)
}
}
// 2. 自定義線程工廠(可選,用于設(shè)置線程名,便于排查任務(wù)執(zhí)行線程)
static class MyThreadFactory implements ThreadFactory {
private final AtomicInteger threadId = new AtomicInteger(1);
private final String threadPrefix = "biz-worker-thread-";
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, threadPrefix + threadId.getAndIncrement());
thread.setDaemon(false); // 非守護(hù)線程,保證任務(wù)執(zhí)行完成
thread.setPriority(Thread.NORM_PRIORITY); // 正常優(yōu)先級(jí)
return thread;
}
}
// 主程序:測試自定義拒絕策略
public static void main(String[] args) throws InterruptedException {
// 線程池核心參數(shù):故意設(shè)置小參數(shù),方便觸發(fā)拒絕策略
int corePoolSize = 2; // 核心線程數(shù)
int maximumPoolSize = 4; // 最大線程數(shù)
long keepAliveTime = 1; // 非核心線程空閑超時(shí)時(shí)間
int queueCapacity = 2; // 任務(wù)隊(duì)列容量(有限隊(duì)列,必選,否則無法觸發(fā)拒絕策略)
// 3. 創(chuàng)建線程池,指定【自定義拒絕策略】
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(queueCapacity), // 有限容量隊(duì)列,觸發(fā)拒絕策略的關(guān)鍵
new MyThreadFactory(), // 自定義線程工廠(可選)
new MyCustomRejectPolicy() // 核心:指定自定義拒絕策略
);
try {
// 提交7個(gè)任務(wù),觸發(fā)拒絕策略(2核心+2隊(duì)列+2非核心=6,第7個(gè)任務(wù)被拒絕)
for (int i = 1; i <= 7; i++) {
int taskId = i;
executor.submit(() -> {
try {
// 模擬任務(wù)執(zhí)行耗時(shí)1秒
TimeUnit.SECONDS.sleep(1);
System.out.printf("任務(wù)%d執(zhí)行成功,執(zhí)行線程:%s%n", taskId, Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.printf("任務(wù)%d被中斷%n", taskId);
}
});
System.out.printf("任務(wù)%d已提交%n", taskId);
}
} finally {
// 4. 優(yōu)雅關(guān)閉線程池(必須,避免資源泄漏)
executor.shutdown();
// 等待線程池終止,超時(shí)則強(qiáng)制關(guān)閉
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
}
}
}
本次線程池參數(shù)設(shè)置為核心2+最大4+隊(duì)列2,總處理能力 = 6(2 核心線程執(zhí)行 + 2 隊(duì)列等待 + 2 非核心線程執(zhí)行),提交第 7 個(gè)任務(wù)時(shí),滿足任務(wù)隊(duì)列已滿 + 線程數(shù)達(dá)到 maximumPoolSize,觸發(fā)自定義拒絕策略。
任務(wù)1已提交
任務(wù)2已提交
任務(wù)3已提交
任務(wù)4已提交
任務(wù)5已提交
任務(wù)6已提交
任務(wù)7已提交
===== 任務(wù)被拒絕 =====
拒絕時(shí)間:1738456200000
被拒任務(wù):java.util.concurrent.FutureTask@1b6d3586
線程池狀態(tài):運(yùn)行中=true | 核心線程數(shù)=2 | 最大線程數(shù)=4
當(dāng)前狀態(tài):活躍線程數(shù)=4 | 隊(duì)列任務(wù)數(shù)=2 | 已完成任務(wù)數(shù)=0 | 總?cè)蝿?wù)數(shù)=7
任務(wù)1執(zhí)行成功,執(zhí)行線程:biz-worker-thread-1
任務(wù)2執(zhí)行成功,執(zhí)行線程:biz-worker-thread-2
任務(wù)3執(zhí)行成功,執(zhí)行線程:biz-worker-thread-3
任務(wù)4執(zhí)行成功,執(zhí)行線程:biz-worker-thread-4
任務(wù)5執(zhí)行成功,執(zhí)行線程:biz-worker-thread-1
任務(wù)6執(zhí)行成功,執(zhí)行線程:biz-worker-thread-2
- 自定義拒絕策略必須實(shí)現(xiàn)
RejectedExecutionHandler接口,重寫rejectedExecution方法; - 觸發(fā)拒絕策略的唯一條件:任務(wù)隊(duì)列已滿 + 線程池當(dāng)前線程數(shù)達(dá)到
maximumPoolSize; - 通過
ThreadPoolExecutor實(shí)例可獲取全量運(yùn)行狀態(tài),是生產(chǎn)排查任務(wù)拒絕問題的關(guān)鍵; - 線程池必須使用有限容量隊(duì)列,否則任務(wù)會(huì)無限入隊(duì),拒絕策略永遠(yuǎn)不會(huì)生效;
- 業(yè)務(wù)結(jié)束后必須優(yōu)雅關(guān)閉線程池(
shutdown()+awaitTermination()),避免資源泄漏。
到此這篇關(guān)于Java實(shí)現(xiàn)自定義線程池拒絕策略的完整代碼的文章就介紹到這了,更多相關(guān)Java拒絕策略內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Disconf實(shí)現(xiàn)分布式配置管理的原理與設(shè)計(jì)
這篇文章主要為大家介紹了Disconf實(shí)現(xiàn)分布式配置管理的原理與設(shè)計(jì)分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步2022-03-03
SpringBoot從2.7.x 升級(jí)到3.3注意事項(xiàng)
從SpringBoot 2.7.x升級(jí)到3.3涉及多個(gè)重要變更,特別是因?yàn)?nbsp;Spring Boot 3.x 系列基于 Jakarta EE 9,而不再使用 Java EE,本文就來詳細(xì)的介紹一下,感興趣的可以了解一下2024-09-09
實(shí)例講解Java設(shè)計(jì)模式編程中如何運(yùn)用代理模式
這篇文章主要介紹了Java設(shè)計(jì)模式編程中如何運(yùn)用代理模式,文中舉了普通代理和強(qiáng)制代理的例子作為代理模式的擴(kuò)展內(nèi)容,需要的朋友可以參考下2016-02-02
Springboot居然可以設(shè)置動(dòng)態(tài)的Banner(推薦)
這篇文章主要介紹了Springboot居然可以設(shè)置動(dòng)態(tài)的Banner,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-03-03
使用spring boot 整合kafka,延遲啟動(dòng)消費(fèi)者
這篇文章主要介紹了使用spring boot 整合kafka,延遲啟動(dòng)消費(fèi)者的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-08-08
SpringBoot+OCR實(shí)現(xiàn)PDF內(nèi)容識(shí)別的示例代碼
在SpringBoot中,您可以結(jié)合OCR庫來實(shí)現(xiàn)對(duì)PDF文件內(nèi)容的識(shí)別和提取,本文就來介紹一下如何使用 Tesseract 和 pdf2image 對(duì) PDF 文件進(jìn)行OCR識(shí)別和提取,具有一定的參考價(jià)值,感興趣的可以了解一下2023-12-12

