Java多線程并發(fā)之線程池任務請求攔截測試實例
一、需求
前端會傳入一個存儲編碼的list,后臺接收到編碼通過計算返回每個編碼對應的值,每個編碼計算出來的值是固定不變的。
二、設計方案
因為前端請求響應有一個時常要求,比如100ms。
而這個計算比較耗時,因此為了請求能夠快速響應,在第一個請求過來時判斷redis緩存是否存儲編碼對應的計算值,如果沒有就直接返回空,前端根據(jù)這個空值使用補償方案的默認值。
后臺通過線程池執(zhí)行計算方法,然后存入redis,這樣下次用戶帶著相同的編碼請求就可以直接從緩存獲取,不用重復計算。
這里的問題在于,當并發(fā)量高的情況下,比如50個用戶帶著相同的編碼調(diào)用計算方法,而實際上計算方法只需要調(diào)用一次就可以了。
因此我們需要在將任務提交到線程池之前判斷線程池中執(zhí)行線程的數(shù)量來決定是否要將任務提交到線程池。
另外這里千萬不能使用直接創(chuàng)建線程的方式,這會導致并發(fā)情況下突然創(chuàng)建大量線程,導致系統(tǒng)cpu飆升卡死。
三、測試
1、線程池實現(xiàn)類,提供全局唯一的線程池實例
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 固定大小的線程池
*/
public class DisCardThreadPool
{
private static DisCardThreadPool disCardThreadPool=new DisCardThreadPool();
/*
* 將構(gòu)造方法訪問修飾符設為私有,禁止任意實例化。
*/
private DisCardThreadPool() {
}
/**
* 核心線程數(shù)
*/
int corePoolSize = 1;
/**
* 最大線程數(shù)
*/
int maximumPoolSize = 1;
/**
* 空閑線程存活時間
*/
long keepAliveTime = 10;
/*
* 線程池單例創(chuàng)建方法
*/
public static DisCardThreadPool newInstance() {
return disCardThreadPool;
}
private final ThreadPoolExecutor mThreadPool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10), new ThreadPoolExecutor.DiscardPolicy());
public void execute(Runnable r){
mThreadPool.execute(r);
}
/**
* 隊列中等待執(zhí)行的任務數(shù)目
* @return
*/
public synchronized int getQueue(){
return mThreadPool.getQueue().size();
}
/*
* 獲取線程池中剩余線程數(shù)目
* 獲取的結(jié)果不準確
*/
public synchronized int getActiveCount(){
return mThreadPool.getActiveCount();
}2、測試類
這里通過CountDownLatch類同時啟動多個線程來模擬并發(fā)請求
import com.teriste.service.threadpool.DisCardThreadPool;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* 模擬并發(fā)向線程池提交任務。
* 需求:使得線程池滿之后其他請求都不執(zhí)行
*/
public class MultiThreadConcurrencyTest {
//獲取線程池實例
private static DisCardThreadPool threadPool=DisCardThreadPool.newInstance();
@Test
public void test(){
//創(chuàng)建大小20的計數(shù)器,使得20個線程同時執(zhí)行,模擬并發(fā)
CountDownLatch countDownLatch=new CountDownLatch(20);
for (int i=0;i<20;i++){
InvokeThread thread=new InvokeThread(countDownLatch);
System.out.println("創(chuàng)建線程:"+thread.getName());
thread.start();
//啟動一個線程,計數(shù)器就減一,同時在線程的run方法中阻塞線程,等待計數(shù)器喚醒
countDownLatch.countDown();
}
try {
//阻塞主線程,防止子線程還沒執(zhí)行主線程結(jié)束導致子線程無法執(zhí)行
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//線程池執(zhí)行線程
public static void invokeThread(){
//當排隊隊列有線程等待時不繼續(xù)添加線程
synchronized (MultiThreadConcurrencyTest.class){
//System.out.println("等待隊列大小:"+threadPool.getQueue());
//官方api指出getActiveCount()無法獲取準確的存獲線程數(shù)
//因為這里是根據(jù)隊列中待執(zhí)行任務數(shù)來判斷,因此如果線程池大小為1,實際上會有兩個線程被執(zhí)行,
//一個線程是進入線程池,還有一個線程判斷此時隊列待執(zhí)行線程數(shù)是0會進入待執(zhí)行隊列,因此最終執(zhí)行線程數(shù)是線程池大小+1
System.out.println("排隊隊列中的線程個數(shù):"+threadPool.getQueue());
if (threadPool.getQueue()<=0){
threadPool.execute(new WorkThread());
}
}
}
}
//調(diào)用線程池執(zhí)行任務的類,模擬外部請求實體發(fā)起請求
class InvokeThread extends Thread{
private CountDownLatch countDownLatch;
public InvokeThread(CountDownLatch countDownLatch){
this.countDownLatch=countDownLatch;
}
@Override
public void run(){
try {
//等待計數(shù)器喚醒
countDownLatch.await();
//向線程池提交線程
MultiThreadConcurrencyTest.invokeThread();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//任務類
class WorkThread implements Runnable{
@Override
public void run() {
String path="E:\\測試";
File file=new File(path);
if(!file.exists()){
file.mkdirs();//創(chuàng)建目錄
}
String fileName=Thread.currentThread().getName()+System.currentTimeMillis();
File newFile=new File(path,fileName);
try {
newFile.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
//在線程池中有可能是不同的線程使用相同的名稱
//因為線程池中上個結(jié)束的線程繼續(xù)使用來執(zhí)行下個線程
System.out.println("當前執(zhí)行的線程的名稱:"+Thread.currentThread().getName()+fileName);
}
}

3、如果放開對排隊隊列的判斷可以看到,當線程池滿了之后執(zhí)行的是ArrayBlockingQueue.offer(E e);方法:

這說明我們可以通過繼承ArrayBlockingQueue類實現(xiàn)自己的排隊隊列,當線程池滿了之后調(diào)用offer方法時,我們直接丟棄任務什么都不做,這樣就可以準確實現(xiàn)上面的方案,并且可以去掉對隊列中待執(zhí)行線程的判斷,從而不需要加鎖,提高執(zhí)行效率。
下面是自定義隊列的實現(xiàn):
import java.util.Collection;
import java.util.concurrent.ArrayBlockingQueue;
/**
* 線程池使用該類時執(zhí)行插入方法時不會向隊列中插入數(shù)據(jù),會直接丟棄或記錄日志
*/
public class EmptyArrayBlockingQueue<E> extends ArrayBlockingQueue{
public EmptyArrayBlockingQueue(int capacity) {
super(capacity);
}
public EmptyArrayBlockingQueue(int capacity, boolean fair) {
super(capacity, fair);
}
public EmptyArrayBlockingQueue(int capacity, boolean fair, Collection c) {
super(capacity, fair, c);
}
/**
* 注意這里重寫的父類方法參數(shù)是泛型參數(shù)
* 由于Java的類型擦除,在編譯時會自動變?yōu)镺bject類型
* 因此這里使用Object類型實際上就是重寫的父類方法
* @param e
* @return
*/
@Override
public boolean offer(Object e) {
/**不執(zhí)行將線程加入隊列的操作,這樣隊列永遠為空
超過線程池核心線程數(shù)的線程實際上在這里都被丟棄了
可以增加記錄日志的操作
*/
return true;
}
}下面是修改后的線程池類:
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 固定大小的線程池
*/
public class DisCardThreadPool
{
private static DisCardThreadPool disCardThreadPool=new DisCardThreadPool();
/*
* 將構(gòu)造方法訪問修飾符設為私有,禁止任意實例化。
*/
private DisCardThreadPool() {
}
/**
* 核心線程數(shù)
*/
int corePoolSize = 1;
/**
* 最大線程數(shù)
*/
int maximumPoolSize = 1;
/**
* 空閑線程存活時間
*/
long keepAliveTime = 10;
/*
* 線程池單例創(chuàng)建方法
*/
public static DisCardThreadPool newInstance() {
return disCardThreadPool;
}
private final ThreadPoolExecutor mThreadPool=new ThreadPoolExecutor(corePoolSize,maximumPoolSize,keepAliveTime, TimeUnit.MILLISECONDS,new EmptyArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardPolicy());
public void execute(Runnable r){
mThreadPool.execute(r);
}
/**
* 隊列中等待執(zhí)行的任務數(shù)目
* @return
*/
public synchronized int getQueue(){
return mThreadPool.getQueue().size();
}
/*
* 獲取線程池中剩余線程數(shù)目
* 獲取的結(jié)果不準確
*/
public synchronized int getActiveCount(){
return mThreadPool.getActiveCount();
}
}下面是測試類:
import com.teriste.service.threadpool.DisCardThreadPool;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
/**
* 模擬并發(fā)向線程池提交任務。
* 需求:使得線程池滿之后其他請求都不執(zhí)行
*/
public class MultiThreadConcurrencyTest {
//獲取線程池實例
private static DisCardThreadPool threadPool=DisCardThreadPool.newInstance();
@Test
public void test(){
//創(chuàng)建大小20的計數(shù)器,使得20個線程同時執(zhí)行,模擬并發(fā)
CountDownLatch countDownLatch=new CountDownLatch(20);
for (int i=0;i<20;i++){
InvokeThread thread=new InvokeThread(countDownLatch);
System.out.println("創(chuàng)建線程:"+thread.getName());
thread.start();
//啟動一個線程,計數(shù)器就減一,同時在線程的run方法中阻塞線程,等待計數(shù)器喚醒
countDownLatch.countDown();
}
try {
//阻塞主線程,防止子線程還沒執(zhí)行主線程結(jié)束導致子線程無法執(zhí)行
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//線程池執(zhí)行線程
public static void invokeThread(){
//當排隊隊列有線程等待時不繼續(xù)添加線程
synchronized (MultiThreadConcurrencyTest.class){
//System.out.println("等待隊列大小:"+threadPool.getQueue());
//官方api指出getActiveCount()無法獲取準確的存獲線程數(shù)
//因為這里是根據(jù)隊列中待執(zhí)行任務數(shù)來判斷,因此如果線程池大小為1,實際上會有兩個線程被執(zhí)行,
//一個線程是進入線程池,還有一個線程判斷此時隊列待執(zhí)行線程數(shù)是0會進入待執(zhí)行隊列,因此最終執(zhí)行線程數(shù)是線程池大小+1
System.out.println("排隊隊列中的線程個數(shù):"+threadPool.getQueue());
//if (threadPool.getQueue()<=0){
threadPool.execute(new WorkThread());
//}
}
}
}
//調(diào)用線程池執(zhí)行任務的類,模擬外部請求實體發(fā)起請求
class InvokeThread extends Thread{
private CountDownLatch countDownLatch;
public InvokeThread(CountDownLatch countDownLatch){
this.countDownLatch=countDownLatch;
}
@Override
public void run(){
try {
//等待計數(shù)器喚醒
countDownLatch.await();
//向線程池提交線程
MultiThreadConcurrencyTest.invokeThread();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//任務類
class WorkThread implements Runnable{
@Override
public void run() {
String path="E:\\測試";
File file=new File(path);
if(!file.exists()){
file.mkdirs();//創(chuàng)建目錄
}
String fileName=Thread.currentThread().getName()+System.currentTimeMillis();
File newFile=new File(path,fileName);
try {
newFile.createNewFile();
} catch (IOException e) {
e.printStackTrace();
}
//在線程池中有可能是不同的線程使用相同的名稱
//因為線程池中上個結(jié)束的線程繼續(xù)使用來執(zhí)行下個線程
System.out.println("當前執(zhí)行的線程的名稱:"+Thread.currentThread().getName()+fileName);
}
}測試結(jié)果:

從測試結(jié)果可以看到,隊列中永遠沒有線程被加入,即使線程池已滿,也不會導致被加入排隊隊列,實現(xiàn)了只有線程池存在空閑線程的時候才會接受新任務的需求。
到此這篇關(guān)于Java多線程并發(fā)之線程池任務請求攔截測試實例的文章就介紹到這了,更多相關(guān)Java線程池任務請求攔截測試內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java.io.File的renameTo方法移動文件失敗的解決方案
這篇文章主要介紹了java.io.File的renameTo方法移動文件失敗的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-07-07
關(guān)于Http持久連接和HttpClient連接池的深入理解
眾所周知,httpclient是java開發(fā)中非常常見的一種訪問網(wǎng)絡資源的方式了,下面這篇文章主要給大家介紹了關(guān)于Http持久連接和HttpClient連接池的相關(guān)資料,文中通過示例代碼介紹的非常詳細,需要的朋友可以參考借鑒,下面隨著小編來一起學習學習吧2018-05-05
Java 如何讀取Excel格式xls、xlsx數(shù)據(jù)工具類
這篇文章主要介紹了Java 如何讀取Excel格式xls、xlsx數(shù)據(jù)工具類的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-09-09
MyBatis-Plus+達夢數(shù)據(jù)庫實現(xiàn)高效數(shù)據(jù)持久化的示例
這篇文章主要介紹了MyBatis-Plus和達夢數(shù)據(jù)庫實現(xiàn)高效數(shù)據(jù)持久化,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2023-08-08
如何解決java.net.BindException:地址已在使用問題
當Zookeeper啟動報錯“java.net.BindException:地址已在使用”時,通常是因為指定的端口已被其他進程占用,解決這個問題需要按照以下步驟操作:首先,使用命令如lsof -i:2181找到占用該端口的進程號;其次,使用kill命令終止該進程2024-09-09

