關(guān)于java自定義線(xiàn)程池的原理與實(shí)現(xiàn)
這一節(jié)來(lái)自定義一個(gè)簡(jiǎn)單的線(xiàn)程池。
一、自定義阻塞隊(duì)列
生產(chǎn)者創(chuàng)建任務(wù)添加到線(xiàn)程池中,線(xiàn)程池中有若干線(xiàn)程來(lái)執(zhí)行任務(wù),如果任務(wù)數(shù)大于線(xiàn)程數(shù),線(xiàn)程池中要有一個(gè)地方來(lái)存儲(chǔ)多余的任務(wù)
線(xiàn)程池中需要一個(gè)存放任務(wù)的阻塞隊(duì)列,所以需要先定義一個(gè)阻塞隊(duì)列
class BlockingQueue<T> {
static Logger LOG = LoggerFactory.getLogger(BlockingQueue.class);
//隊(duì)列
private Deque<T> queue = new ArrayDeque<>();
//隊(duì)列的容量
private int capcity;
private ReentrantLock lock= new ReentrantLock();
//獲取元素時(shí)隊(duì)列為空就到這個(gè)Condition中等待
private Condition emptySet = lock.newCondition();
// 添加元素時(shí)如果隊(duì)列已到達(dá)最大容量就到這個(gè)condition等待
private Condition fullSet = lock.newCondition();
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
//添加元素
public void put(T t){
//queue是共享變量,多線(xiàn)程操作要加鎖
try {
lock.lock();
while(queue.size()==capcity){
//隊(duì)列中元素已達(dá)到最大容量,添加元素的線(xiàn)程等待
try {
LOG.info("隊(duì)列元素已滿(mǎn),添加元素線(xiàn)程等待");
fullSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//走到這里表示隊(duì)列中有空位了
queue.addLast(t);
LOG.info("元素添加成功");
//喚醒等待的獲取元素的線(xiàn)程
emptySet.signalAll();
} finally {
lock.unlock();
}
}
//獲取元素的方法
public T take(){
try {
lock.lock();
while (queue.size()==0){
//隊(duì)列中沒(méi)有元素
try {
LOG.info("隊(duì)列為空,獲取元素線(xiàn)程等待");
emptySet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//走到這里表示隊(duì)列中有元素了
T t = queue.removeFirst();
//叫醒添加元素的等待線(xiàn)程
fullSet.signalAll();
return t;
} finally {
lock.unlock();
}
}
// 帶超時(shí)時(shí)間的獲取元素的方法
public T poll(long time, TimeUnit timeUnit){
try {
lock.lock();
long nanos = timeUnit.toNanos(time);
while (queue.size()==0){
//隊(duì)列中沒(méi)有元素
try {
if(nanos<=0){
LOG.info("等待超時(shí)時(shí)間到,返回null");
return null;
}
LOG.info("隊(duì)列為空,獲取元素線(xiàn)程等待");
//這個(gè)方法的返回值表示剩余的等待時(shí)間,例如本來(lái)等待5s,等了三秒被叫醒了,返回值就是2
nanos = emptySet.awaitNanos(nanos);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//走到這里表示隊(duì)列中有元素了
T t = queue.removeFirst();
//叫醒添加元素的等待線(xiàn)程
fullSet.signalAll();
return t;
} finally {
lock.unlock();
}
}
}我們的線(xiàn)程池中需要一個(gè)阻塞隊(duì)列來(lái)存放任務(wù),可以使用上邊定義的這個(gè)
二、自定義線(xiàn)程池
線(xiàn)程池的代碼
class ThreadPool {
static Logger logger = LoggerFactory.getLogger(ThreadPool.class);
//用來(lái)存儲(chǔ)任務(wù)的隊(duì)列
private BlockingQueue<Runnable> taskQueue;
//核心數(shù),即線(xiàn)程中可以創(chuàng)建的最大線(xiàn)程數(shù)
private int coreSize;
//存放線(xiàn)程的集合
private HashSet<Worker> workers = new HashSet<>();
//線(xiàn)程的空閑時(shí)間,池中的一個(gè)線(xiàn)程如果在這段時(shí)間后還獲取不到任務(wù)就會(huì)自動(dòng)終止
private long time;
private TimeUnit timeUnit;
/**
*
* @param coreSize
* @param capacity 線(xiàn)程池中任務(wù)隊(duì)列的容量
* @param time
* @param timeUnit
*/
public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit) {
this.coreSize = coreSize;
this.time = time;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(capacity);
}
//提交任務(wù)的方法
public void execute(Runnable task){
synchronized (workers){
if(workers.size()<coreSize){
//線(xiàn)程數(shù)小于核心數(shù),啟動(dòng)新線(xiàn)程來(lái)執(zhí)行任務(wù)
Worker worker = new Worker(task);
workers.add(worker);
logger.info("線(xiàn)程池新增線(xiàn)程執(zhí)行任務(wù)");
worker.start();
} else{
//線(xiàn)程數(shù)已達(dá)到核心數(shù),把任務(wù)添加到隊(duì)列中
taskQueue.put(task);
logger.info("線(xiàn)程池添加任務(wù)到隊(duì)列中");
}
}
}
//用來(lái)描述線(xiàn)程池中工作線(xiàn)程的類(lèi)
class Worker extends Thread {
private Runnable task;
public Worker(Runnable task){
this.task =task;
}
@Override
public void run() {
//執(zhí)行任務(wù)的邏輯
//一個(gè)任務(wù)執(zhí)行完后繼續(xù)從任務(wù)隊(duì)列中獲取任務(wù)來(lái)執(zhí)行
while (task!=null || (task=taskQueue.poll(time,timeUnit))!=null){
try {
task.run();
} catch (Exception e) {
e.printStackTrace();
} finally {
//任務(wù)執(zhí)行完后清空
task = null;
}
}
//走到這里表示沒(méi)有從任務(wù)隊(duì)列中獲取到任務(wù),當(dāng)前線(xiàn)程將要結(jié)束
synchronized (workers){
//從線(xiàn)程集合中刪除當(dāng)前線(xiàn)程
workers.remove(this);
}
}
}
}三、測(cè)試
public class Test9 {
private static Logger LOG = LoggerFactory.getLogger(Test9.class);
public static void main(String[] args) {
ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS);
for (int i = 0; i < 4; i++) {
int a=i;
threadPool.execute(new Runnable() {
@Override
public void run() {
LOG.info("第{}個(gè)任務(wù)",a);
}
});
}
}
}四、拒絕策略
上邊的線(xiàn)程池存在一個(gè)問(wèn)題,當(dāng)有大量任務(wù)提交到線(xiàn)程池超過(guò)了任務(wù)隊(duì)列的容量時(shí),提交任務(wù)的線(xiàn)程就會(huì)一直阻塞等待,
//核心1,隊(duì)列容量1
ThreadPool threadPool = new ThreadPool(1,1,1000,TimeUnit.MILLISECONDS);
for (int i = 0; i < 4; i++) {
int a=i;
threadPool.execute(new Runnable() {
@Override
public void run() {
LOG.info("第{}個(gè)任務(wù)",a);
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}像這樣把任務(wù)時(shí)間延長(zhǎng),提交的任務(wù)就會(huì)超過(guò)隊(duì)列容量,這時(shí)主線(xiàn)程就會(huì)阻塞住
實(shí)際上應(yīng)該提供一種拒絕策略來(lái)讓提交任務(wù)的線(xiàn)程自己決定是阻塞死等還是放棄執(zhí)行任務(wù),
為了實(shí)現(xiàn)這個(gè)功能,先抽象一個(gè)接口來(lái)封裝拒絕策略
interface RejectPolicy<T> {
// 把任務(wù)隊(duì)列和當(dāng)前要提交的任務(wù)作為參數(shù)
public void applyPolicy(BlockingQueue<T> queue,T task);
}然后為了應(yīng)用拒絕策略需要在阻塞隊(duì)列BlockingQueue中添加一個(gè)tryPut方法
public void tryPut(RejectPolicy<T> rejectPolicy,T t){
try {
lock.lock();
if(queue.size()>=capacity){
//應(yīng)用拒絕測(cè)試
rejectPolicy.applyPolicy(this,t);
} else {
//還有空間正常添加任務(wù)
queue.addLast(t);
}
} finally {
lock.unlock();
}
}然后修改線(xiàn)程池,添加一個(gè)rejectPolicy屬性,在構(gòu)造方法中由任務(wù)提交者來(lái)賦值
線(xiàn)程池新增的屬性
private RejectPolicy<Runnable> rejectPolicy;
線(xiàn)程池的構(gòu)造方法
public ThreadPool(int coreSize,int capacity, long time, TimeUnit timeUnit,RejectPolicy<Runnable> rejectPolicy) {
this.coreSize = coreSize;
this.time = time;
this.timeUnit = timeUnit;
this.taskQueue = new BlockingQueue<>(capacity);
this.rejectPolicy=rejectPolicy;
}線(xiàn)程池的execute方法
public void execute(Runnable task){
synchronized (workers){
if(workers.size()<coreSize){
//線(xiàn)程數(shù)小于核心數(shù),啟動(dòng)新線(xiàn)程來(lái)執(zhí)行任務(wù)
Worker worker = new Worker(task);
workers.add(worker);
logger.info("線(xiàn)程池新增線(xiàn)程執(zhí)行任務(wù)");
worker.start();
} else{
//線(xiàn)程數(shù)已達(dá)到核心數(shù),把任務(wù)添加到隊(duì)列中,傳遞拒絕策略
taskQueue.tryPut(rejectPolicy,task);
logger.info("線(xiàn)程池添加任務(wù)到隊(duì)列中");
}
}
}可以看到,拒絕策略是有提交任務(wù)的線(xiàn)程指定,最終是由阻塞隊(duì)列來(lái)執(zhí)行,阻塞隊(duì)列不知道拒絕策略具體是什么,這也是java多態(tài)的一種體現(xiàn),面向抽象編程。
到此這篇關(guān)于關(guān)于java自定義線(xiàn)程池的文章就介紹到這了,更多相關(guān)java自定義線(xiàn)程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
elasticsearch聚合查詢(xún)實(shí)踐示例
這篇文章主要為大家介紹了elasticsearch聚合查詢(xún)實(shí)踐示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-12-12
Java數(shù)據(jù)結(jié)構(gòu)之常見(jiàn)排序算法(下)
這篇文章主要介紹了Java數(shù)據(jù)結(jié)構(gòu)之常見(jiàn)排序算法(下),與之相對(duì)有(上),想了解的朋友可以去本網(wǎng)站掃搜,在這兩篇文章里涵蓋關(guān)于八大排序算法的所有內(nèi)容,需要的朋友可以參考下2023-01-01
詳解Spring Boot實(shí)戰(zhàn)之Filter實(shí)現(xiàn)使用JWT進(jìn)行接口認(rèn)證
本篇文章主要介紹了詳解Spring Boot實(shí)戰(zhàn)之Filter實(shí)現(xiàn)使用JWT進(jìn)行接口認(rèn)證,具有一定的參考價(jià)值,有興趣的可以了解一下2017-07-07
java實(shí)現(xiàn)分段讀取文件并通過(guò)HTTP上傳的方法
這篇文章主要介紹了java實(shí)現(xiàn)分段讀取文件并通過(guò)HTTP上傳的方法,實(shí)例分析了java分段讀取文件及使用http實(shí)現(xiàn)文件傳輸?shù)南嚓P(guān)技巧,具有一定參考借鑒價(jià)值,需要的朋友可以參考下2015-07-07
Java的Synchronized關(guān)鍵字學(xué)習(xí)指南(全面 & 詳細(xì))
這篇文章主要給大家介紹了關(guān)于Java的Synchronized關(guān)鍵字的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03

