Java實(shí)現(xiàn)手寫乞丐版線程池的示例代碼
前言
在上篇文章線程池的前世今生當(dāng)中我們介紹了實(shí)現(xiàn)線程池的原理,在這篇文章當(dāng)中我們主要介紹實(shí)現(xiàn)一個(gè)非常簡(jiǎn)易版的線程池,深入的去理解其中的原理,麻雀雖小,五臟俱全。
線程池的具體實(shí)現(xiàn)
線程池實(shí)現(xiàn)思路
任務(wù)保存到哪里?
在上篇文章線程池的前世今生當(dāng)中我們具體去介紹了線程池當(dāng)中的原理。在線程池當(dāng)中我們有很多個(gè)線程不斷的從任務(wù)池(用戶在使用線程池的時(shí)候不斷的使用execute方法將任務(wù)添加到線程池當(dāng)中)里面去拿任務(wù)然后執(zhí)行,現(xiàn)在需要思考我們應(yīng)該用什么去實(shí)現(xiàn)任務(wù)池呢?
答案是阻塞隊(duì)列,因?yàn)槲覀冃枰WC在多個(gè)線程往任務(wù)池里面加入任務(wù)的時(shí)候并發(fā)安全,JDK已經(jīng)給我們提供了這樣的數(shù)據(jù)結(jié)構(gòu)——BlockingQueue,這個(gè)是一個(gè)并發(fā)安全的阻塞隊(duì)列,他之所以叫做阻塞隊(duì)列,是因?yàn)槲覀兛梢栽O(shè)置隊(duì)列當(dāng)中可以容納數(shù)據(jù)的個(gè)數(shù),當(dāng)加入到隊(duì)列當(dāng)中的數(shù)據(jù)超過這個(gè)值的時(shí)候,試圖將數(shù)據(jù)加入到阻塞隊(duì)列當(dāng)中的線程就會(huì)被掛起。當(dāng)隊(duì)列當(dāng)中為空的時(shí)候,試圖從隊(duì)列當(dāng)中取出數(shù)據(jù)的線程也會(huì)被掛起。
線程的設(shè)計(jì)
在我們自己實(shí)現(xiàn)的線程池當(dāng)中我們定一個(gè)Worker類去不斷的從任務(wù)池當(dāng)中取出任務(wù),然后進(jìn)行執(zhí)行。在我們自己定義的worker當(dāng)中還需要有一個(gè)變量isStopped表示線程是否停止工作。同時(shí)在worker當(dāng)中還需要保存當(dāng)前是哪個(gè)線程在執(zhí)行任務(wù),因此在我們自己設(shè)計(jì)的woker類當(dāng)中還需要有一個(gè)thisThread變量,保存正在執(zhí)行任務(wù)的線程,因此worker的整體設(shè)計(jì)如下:
package cscore.concurrent.java.threadpool;
import java.util.concurrent.BlockingQueue;
public class Worker implements Runnable {
private Thread thisThread; // 表示正在執(zhí)行任務(wù)的線程
private BlockingQueue<Runnable> taskQueue; // 由線程池傳遞過來的任務(wù)隊(duì)列
private volatile boolean isStopped; // 表示 worker 是否停止工作 需要使用 volatile 保證線程之間的可見性
public Worker(BlockingQueue taskQueue) { // 這個(gè)構(gòu)造方法是在線程池的實(shí)現(xiàn)當(dāng)中會(huì)被調(diào)用
this.taskQueue = taskQueue;
}
// 線程執(zhí)行的函數(shù)
@Override
public void run() {
thisThread = Thread.currentThread(); // 獲取執(zhí)行任務(wù)的線程
while (!isStopped) { // 當(dāng)線程沒有停止的時(shí)候就不斷的去任務(wù)池當(dāng)中取出任務(wù)
try {
Runnable task = taskQueue.take(); // 從任務(wù)池當(dāng)中取出任務(wù) 當(dāng)沒有任務(wù)的時(shí)候線程會(huì)被這個(gè)方法阻塞
task.run(); // 執(zhí)行任務(wù) 任務(wù)就是一個(gè) Runnable 對(duì)象
} catch (InterruptedException e) {
// do nothing
// 這個(gè)地方很重要 你有沒有思考過一個(gè)問題當(dāng)任務(wù)池當(dāng)中沒有任務(wù)的時(shí)候 線程會(huì)被阻塞在 take 方法上
// 如果我們后面沒有任務(wù)提交拿他就會(huì)一直阻塞 那么我們?cè)撊绾螁拘阉?
// 答案就在下面的函數(shù)當(dāng)中 調(diào)用線程的 interruput 方法 那么take方法就會(huì)產(chǎn)生一個(gè)異常 然后我們
// 捕獲到一異常 然后線程退出
}
}
}
public synchronized void stopWorker() {
if (isStopped) {
throw new RuntimeException("thread has been interrupted");
}
isStopped = true;
thisThread.interrupt(); // 中斷線程產(chǎn)生異常
}
public synchronized boolean isStopped() {
return isStopped;
}
}
線程池的參數(shù)
在我們自己實(shí)現(xiàn)的線程池當(dāng)中,我們只需要定義兩個(gè)參數(shù)一個(gè)是線程的個(gè)數(shù),另外一個(gè)是阻塞隊(duì)列(任務(wù)池)當(dāng)中最大的任務(wù)個(gè)數(shù)。在我們自己實(shí)現(xiàn)的線程池當(dāng)中還需要有一個(gè)變量isStopped表示線程池是否停止工作了,因此線程池的初步設(shè)計(jì)大致如下:
private BlockingQueue taskQueue; // 任務(wù)池
private volatile boolean isStopped; //
private final List<Worker> workers = new ArrayList<>();// 保存所所有的執(zhí)行任務(wù)的線程
public ThreadPool(int numThreads, int maxTasks) {
this.taskQueue = new ArrayBlockingQueue(maxTasks);
for (int i = 0; i < numThreads; i++) {
workers.add(new Worker(this.taskQueue));
}
int i = 1;
// 這里產(chǎn)生線程 然后啟動(dòng)線程
for (Worker worker : workers) {
new Thread(worker, "ThreadPool-" + i + "-thread").start();
i++;
}
}
線程池實(shí)現(xiàn)代碼
在上文當(dāng)中我們大致設(shè)計(jì)的線程池的初步結(jié)構(gòu),從上面的結(jié)果可以看出當(dāng)我們?cè)煲粋€(gè)ThreadPool對(duì)象的時(shí)候會(huì)產(chǎn)生指定線程的數(shù)目線程并且啟動(dòng)他們?nèi)?zhí)行任務(wù),現(xiàn)在我們還需要設(shè)計(jì)的就是如果關(guān)閉線程!我們?cè)陉P(guān)閉線程的時(shí)候還需要保證所有的任務(wù)都被執(zhí)行完成然后才關(guān)閉所有的線程,再退出,我們?cè)O(shè)計(jì)這個(gè)方法為shutDown。除此之外我們還設(shè)計(jì)一個(gè)函數(shù)可以強(qiáng)制退出,不用執(zhí)行所有的任務(wù)了,就直接退出,這個(gè)方法為stop。整個(gè)線程池實(shí)現(xiàn)的代碼如下:
package cscore.concurrent.java.threadpool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ThreadPool {
private BlockingQueue taskQueue;
private volatile boolean isStopped;
private final List<Worker> workers = new ArrayList<>();
public ThreadPool(int numThreads, int maxTasks) {
this.taskQueue = new ArrayBlockingQueue(maxTasks);
for (int i = 0; i < numThreads; i++) {
workers.add(new Worker(this.taskQueue));
}
int i = 1;
for (Worker worker : workers) {
new Thread(worker, "ThreadPool-" + i + "-thread").start();
i++;
}
}
// 下面這個(gè)方法是向線程池提交任務(wù)
public void execute(Runnable runnable) throws InterruptedException {
if (isStopped) {
// 如果線程池已經(jīng)停下來了,就不在向任務(wù)隊(duì)列當(dāng)中提交任務(wù)了
System.err.println("thread pool has been stopped, so quit submitting task");
return;
}
taskQueue.put(runnable);
}
// 強(qiáng)制關(guān)閉線程池
public synchronized void stop() {
isStopped = true;
for (Worker worker : workers) {
worker.stopWorker();
}
}
public synchronized void shutDown() {
// 先表示關(guān)閉線程池 線程就不能再向線程池提交任務(wù)
isStopped = true;
// 先等待所有的任務(wù)執(zhí)行完成再關(guān)閉線程池
waitForAllTasks();
stop();
}
private void waitForAllTasks() {
// 當(dāng)線程池當(dāng)中還有任務(wù)的時(shí)候 就不退出循環(huán)
while (taskQueue.size() > 0)
Thread.yield();
}
}
線程池測(cè)試代碼
package cscore.concurrent.java.threadpool;
public class TestPool {
public static void main(String[] args) throws InterruptedException {
ThreadPool pool = new ThreadPool(3, 1024);
for (int i = 0; i < 10; i++) {
int tmp = i;
pool.execute(() -> {
System.out.println(Thread.currentThread().getName() + " say hello " + tmp);
});
}
pool.shutDown();
}
}
上面的代碼輸出結(jié)果:
ThreadPool-2-thread say hello 1
ThreadPool-2-thread say hello 3
ThreadPool-2-thread say hello 4
ThreadPool-2-thread say hello 5
ThreadPool-2-thread say hello 6
ThreadPool-2-thread say hello 7
ThreadPool-2-thread say hello 8
ThreadPool-2-thread say hello 9
ThreadPool-3-thread say hello 2
ThreadPool-1-thread say hello 0
從上面的結(jié)果來看確實(shí)實(shí)現(xiàn)了線程池的效果。
雜談
可能你會(huì)有疑問,當(dāng)我們調(diào)用 interrupt的時(shí)候是如何產(chǎn)生異常的,我們仔細(xì)看一個(gè)阻塞隊(duì)列的實(shí)現(xiàn)。在ArrayBlockingQueue當(dāng)中take方法實(shí)現(xiàn)如下:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}在這個(gè)方法當(dāng)中調(diào)用的是鎖的lock.lockInterruptibly();方法,當(dāng)調(diào)用這個(gè)方法的時(shí)候線程是可以被interrupt方法中斷的,然后會(huì)拋出InterruptedException異常。
總結(jié)
在本篇文章當(dāng)中我們主要實(shí)現(xiàn)了一個(gè)乞丐版的線程池,這個(gè)線程池離JDK給我們提供的線程池還是有一點(diǎn)距離,JDK給我們提供給的線程池還有很多其他的參數(shù),我們將在后續(xù)的幾篇文章當(dāng)中繼續(xù)向JDK給我們提供的線程池靠近,直至實(shí)現(xiàn)一個(gè)盜版的JDK的線程池。本篇文章的代碼在下面的鏈接當(dāng)中也可以訪問。
到此這篇關(guān)于Java實(shí)現(xiàn)手寫乞丐版線程池的示例代碼的文章就介紹到這了,更多相關(guān)Java線程池內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- Java實(shí)現(xiàn)手寫一個(gè)線程池的示例代碼
- 一文了解Java?線程池的正確使用姿勢(shì)
- Java手寫線程池之向JDK線程池進(jìn)發(fā)
- Java線程池源碼的深度解析
- 一篇文章帶你搞懂Java線程池實(shí)現(xiàn)原理
- 詳解Java線程池如何統(tǒng)計(jì)線程空閑時(shí)間
- Java中的異步與線程池解讀
- 詳解Java線程池隊(duì)列中的延遲隊(duì)列DelayQueue
- 一文帶你弄懂Java中線程池的原理
- java 線程池的實(shí)現(xiàn)原理、優(yōu)點(diǎn)與風(fēng)險(xiǎn)、以及4種線程池實(shí)現(xiàn)
相關(guān)文章
Spring MVC實(shí)現(xiàn)一次簡(jiǎn)單的CRUD示例
這篇文章主要介紹了Spring MVC實(shí)現(xiàn)一次簡(jiǎn)單的CRUD示例,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-08-08
SpringBoot參數(shù)校驗(yàn)及原理全面解析
文章介紹了SpringBoot中使用@Validated和@Valid注解進(jìn)行參數(shù)校驗(yàn)的方法,包括基本用法和進(jìn)階用法,如自定義驗(yàn)證注解、多屬性聯(lián)合校驗(yàn)和嵌套校驗(yàn),并簡(jiǎn)要介紹了實(shí)現(xiàn)原理2024-11-11
springboot中使用@Transactional注解事物不生效的坑
這篇文章主要介紹了springboot中使用@Transactional注解事物不生效的原因,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
Linux中使用shell腳本管理Java應(yīng)用程序
在日常開發(fā)和運(yùn)維工作中,管理基于Java的應(yīng)用程序是一項(xiàng)基礎(chǔ)且頻繁的任務(wù),本文將通過一個(gè)示例腳本,展示如何利用Shell腳本簡(jiǎn)化這一流程,實(shí)現(xiàn)Java應(yīng)用的一鍵式啟動(dòng)、停止與重啟操作,本腳本不僅提升了工作效率,還確保了操作的標(biāo)準(zhǔn)化與可靠性2024-06-06
springboot攔截器Interceptor的使用,你都了解嗎
springmvc 中的攔截器可以對(duì)請(qǐng)求進(jìn)行判別,在請(qǐng)求到達(dá)控制器之前,把非法的請(qǐng)求給攔截掉下面來說一說, 它在springboot中的使用,感興趣的朋友一起看看吧2021-07-07

