Java自定義線程池的實(shí)現(xiàn)示例
一、Java語(yǔ)言本身也是多線程,回顧Java創(chuàng)建線程方式如下:
1、繼承Thread類,(Thread類實(shí)現(xiàn)Runnable接口),來(lái)個(gè)類圖加深印象。

2、實(shí)現(xiàn)Runnable接口實(shí)現(xiàn)無(wú)返回值、實(shí)現(xiàn)run()方法,啥時(shí)候run,黑話了。
3、實(shí)現(xiàn)Callable接口重寫call()+FutureTask獲取.
public class CustomThread {
public static void main(String[] args) {
// 自定義線程
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("Custom Run");
System.out.println(Thread.currentThread().getName());
}
},"custom-thread-1").start();
}
}
4、基于線程池集中管理創(chuàng)建線程系列周期.【本篇文章重點(diǎn)介紹】
二、JDK線程池工具類.
1、Executors工具類,是JDK中Doug Lea大佬實(shí)現(xiàn)供開發(fā)者使用。

隨著JDK版本迭代逐漸加入了基于工作竊取算法的線程池了,阿里編碼規(guī)范也推薦開發(fā)者自定義線程池,禁止生產(chǎn)直接使用Executos線程池工具類,因此很有可能造成OOM異常。同時(shí)在某些類型的線程池里面,使用無(wú)界隊(duì)列還會(huì)導(dǎo)致maxinumPoolSize、keepAliveTime、handler等參數(shù)失效。因此目前在大廠的開發(fā)規(guī)范中會(huì)強(qiáng)調(diào)禁止使用Executors來(lái)創(chuàng)建線程池。這里說(shuō)道阻塞隊(duì)列。LinkedBlockingQueue。

2、自定義線程池工具類基于ThreadPoolExecutor實(shí)現(xiàn),那個(gè)JDK封裝的線程池工具類也是基于這個(gè)ThreadPoolExecutor實(shí)現(xiàn)的。
public class ConstomThreadPool extends ThreadPoolExecutor{
/**
*
* @param corePoolSize 核心線程池
* @param maximumPoolSize 線程池最大數(shù)量
* @param keepAliveTime 線程存活時(shí)間
* @param unit TimeUnit
* @param workQueue 工作隊(duì)列,自定義大小
* @param poolName 線程工廠自定義線程名稱
*/
public ConstomThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
setThreadFactory(new CustomThreadFactory(poolName, false));
}
}自定義線程工廠類,這樣線程命名有開發(fā)者控制實(shí)現(xiàn)了,這樣參數(shù)可以做到可配置化,生產(chǎn)環(huán)境可以供不同業(yè)務(wù)模塊使用,如果系統(tǒng)配置值不生效,就給一個(gè)默認(rèn)值,更加滿足業(yè)務(wù)需要.
/**
* 自定義線程工廠
*/
public class CustomThreadFactory implements ThreadFactory {
/**
* 線程前綴,采用AtomicInteger實(shí)現(xiàn)線程編號(hào)線程安全自增
*/
private final AtomicInteger atomicInteger = new AtomicInteger(1);
/**
* 線程命名前綴
*/
private final String namePrefix;
/**
* 線程工廠創(chuàng)建的線程是否是守護(hù)線程
*/
private final boolean isDaemon;
public CustomThreadFactory(String prefix, boolean daemin) {
if (StringUtils.isNoneBlank(prefix)) {
this.namePrefix = prefix;
} else {
this.namePrefix = "thread_pool";
}
// 是否是守護(hù)線程
isDaemon = daemin;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, namePrefix + "-" + atomicInteger.getAndIncrement());
thread.setDaemon(isDaemon);
// 設(shè)置線程優(yōu)先級(jí)
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
}這里Spring框架提供的自定義線程池工廠類,當(dāng)然了一些開源包也會(huì)提供這樣的輪子,這個(gè)比較簡(jiǎn)單了.
@SuppressWarnings("serial")
public class CustomizableThreadFactory extends CustomizableThreadCreator implements ThreadFactory {
/**
* Create a new CustomizableThreadFactory with default thread name prefix.
*/
public CustomizableThreadFactory() {
super();
}
/**
* Create a new CustomizableThreadFactory with the given thread name prefix.
* @param threadNamePrefix the prefix to use for the names of newly created threads
*/
public CustomizableThreadFactory(String threadNamePrefix) {
super(threadNamePrefix);
}
@Override
public Thread newThread(Runnable runnable) {
return createThread(runnable);
}
}3、SpringBoot框架提供的自定義線程池,基于異步注解@Async名稱和一些業(yè)務(wù)自定義配置項(xiàng),很好的實(shí)現(xiàn)了業(yè)務(wù)間線程池的隔離。
@Configuration
public class ThreadPoolConfig {
/**
*
* @return ThreadPoolTaskExecutor
*/
@Bean("serviceTaskA")
public ThreadPoolTaskExecutor serviceTaskA() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("service-a");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
/**
*
* @return ThreadPoolTaskExecutor
*/
@Bean("serviceTaskB")
public ThreadPoolTaskExecutor serviceTaskB() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(10);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("service-b");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}整體來(lái)看是Spring框架對(duì)JDK的線程池做了封裝,公開發(fā)者使用,畢竟框架嘛,肯定是把方便留給開發(fā)者。

4、并發(fā)流線程池。
List<String> list = new ArrayList<>(4);
list.add("A");
list.add("B");
list.add("C");
list.add("D");
list.parallelStream().forEach(string -> {
string = string + "paralleStream";
System.out.println(Thread.currentThread().getName()+":-> "+string);
});運(yùn)行實(shí)例:

說(shuō)明:并發(fā)流默認(rèn)使用系統(tǒng)公共的線程池ForkJoinWorkerThread,供整個(gè)程序使用。

類圖如下,基于分治法,雙端竊取算法實(shí)現(xiàn)的一種線程池。

ForkJoin實(shí)現(xiàn)的了自己的線程工廠命名。

也可以自定義并發(fā)流線程,然后提交任務(wù),一般并發(fā)流適用于短暫耗時(shí)業(yè)務(wù),避免拖垮整個(gè)線程池業(yè)務(wù).
5、實(shí)現(xiàn)一個(gè)基于系統(tǒng)公用線程池工具類,運(yùn)行這個(gè)系統(tǒng)中的異步業(yè)務(wù).
public final class CustomExecutors {
/**
* 核心線程數(shù)大小
*/
private static final int CORE_POOL_SIZE=5;
/**
* 核心線程池大小
*/
private static final int MAX_POOL_SIZE=10;
/**
* 線程存活時(shí)間
*/
private static final int KEEP_ALIVE_TIME=60;
/**
* 工作隊(duì)列大小
*/
private static final LinkedBlockingQueue queue=new LinkedBlockingQueue(100);
/**
* 自定義線程池名前綴
*/
private static final String POOL_PREFIX_NAME="Custom-Common-Pool";
private CustomExecutors(){
//throw new XXXXException("un support create pool!");
}
private static ConstomThreadPool constomThreadPool;
/**
* 靜態(tài)塊初始化只執(zhí)行一次,不關(guān)閉,整個(gè)系統(tǒng)公用一個(gè)線程池
*/
static {
constomThreadPool=new ConstomThreadPool(CORE_POOL_SIZE,MAX_POOL_SIZE,KEEP_ALIVE_TIME,TimeUnit.SECONDS,queue,POOL_PREFIX_NAME);
}
/**
* 單例模式獲取線程池
* @return ExecutorService
*/
private static ExecutorService getInstance(){
return constomThreadPool;
}
private static Future<?> submit(Runnable task){
return constomThreadPool.submit(task);
}
private static <T> Future<T> submit(Runnable task, T result){
return constomThreadPool.submit(task,result);
}
private static <T> Future<T> submit(Callable<T> task){
return constomThreadPool.submit(task);
}
private static void execute(Runnable task){
constomThreadPool.execute(task);
}
}三、業(yè)界知名自定義線程池?cái)U(kuò)展使用.
1、org.apache.tomcat.util.threads;【Tomcat線程池】

2、XXL-JOB分布式任務(wù)調(diào)度框架的快慢線程池,線程池任務(wù)隔離.
public class JobTriggerPoolHelper {
private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class);
// ---------------------- trigger pool ----------------------
// fast/slow thread pool
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
public void start(){
fastTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(1000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
}
});
slowTriggerPool = new ThreadPoolExecutor(
10,
XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
}
});
}
public void stop() {
//triggerPool.shutdown();
fastTriggerPool.shutdownNow();
slowTriggerPool.shutdownNow();
logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success.");
}
// job timeout count
private volatile long minTim = System.currentTimeMillis()/60000; // ms > min
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<>();
/**
* add trigger
*/
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// do trigger
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
// check timeout-count-map
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
minTim = minTim_now;
jobTimeoutCountMap.clear();
}
// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
}
}
});
}
// ---------------------- helper ----------------------
private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper();
public static void toStart() {
helper.start();
}
public static void toStop() {
helper.stop();
}
/**
* @param jobId
* @param triggerType
* @param failRetryCount
* >=0: use this param
* <0: use param from job info config
* @param executorShardingParam
* @param executorParam
* null: use job param
* not null: cover job param
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
}①、定義兩個(gè)線程池,一個(gè)是fastTriggerPool,另一個(gè)是slowTriggerPool。
②、定義一個(gè)容器ConcurrentMap,存放每個(gè)任務(wù)的執(zhí)行慢次數(shù),60秒后自動(dòng)清空該容器。
③、在線程的run()方法中計(jì)算每個(gè)任務(wù)的耗時(shí),如果大于500ms,則任務(wù)的慢執(zhí)行次數(shù)+1。

3、基于線程池動(dòng)態(tài)監(jiān)控動(dòng)態(tài)線程池

引用圖片,線程池常見(jiàn)問(wèn)題

還有比較多啦,例如ES的基于JDK的線程池,Dubbo中等。
到此這篇關(guān)于Java自定義線程池的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)Java自定義線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java基礎(chǔ)知識(shí)之FileInputStream流的使用
這篇文章主要介紹了java基礎(chǔ)知識(shí)之FileInputStream流的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
IDEA中添加xml配置文件時(shí),顯示file問(wèn)題
這篇文章主要介紹了IDEA中添加xml配置文件時(shí),顯示file問(wèn)題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-12-12
MyBatis找不到mapper文件的實(shí)現(xiàn)
這篇文章主要介紹了MyBatis找不到mapper文件的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
解決IDEA?2022?Translation?翻譯文檔失敗:?未知錯(cuò)誤的問(wèn)題
這篇文章主要介紹了IDEA?2022?Translation?翻譯文檔失敗:?未知錯(cuò)誤,本文較詳細(xì)的給大家介紹了IDEA?2022?Translation未知錯(cuò)誤翻譯文檔失敗的解決方法,需要的朋友可以參考下2022-04-04
java BASE64Encoder詳細(xì)介紹及簡(jiǎn)單實(shí)例
這篇文章主要介紹了java BASE64Encoder詳細(xì)介紹及簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-01-01
Java實(shí)現(xiàn)動(dòng)態(tài)獲取圖片驗(yàn)證碼的示例代碼
這篇文章主要介紹了Java實(shí)現(xiàn)動(dòng)態(tài)獲取圖片驗(yàn)證碼的示例代碼,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-08-08

