JAVA多線程之實現(xiàn)用戶任務(wù)排隊并預(yù)估排隊時長

實現(xiàn)流程

初始化一定數(shù)量的任務(wù)處理線程和緩存線程池,用戶每次調(diào)用接口,開啟一個線程處理。
假設(shè)初始化5個處理器,代碼執(zhí)行 BlockingQueue.take 時候,每次take都會處理器隊列就會減少一個,當處理器隊列為空時,take就是阻塞線程,當用戶處理某某任務(wù)完成時候,調(diào)用資源釋放接口,在處理器隊列put 一個處理器對象,原來阻塞的take ,就繼續(xù)執(zhí)行。
排隊論簡介
排隊論是研究系統(tǒng)隨機聚散現(xiàn)象和隨機系統(tǒng)工作工程的數(shù)學理論和方法,又稱隨機服務(wù)系統(tǒng)理論,為運籌學的一個分支。我們下面對排隊論做下簡化處理,先看下圖:

代碼具體實現(xiàn)
任務(wù)隊列初始化 TaskQueue
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 初始化隊列及線程池
* @author tarzan
*
*/
@Component
public class TaskQueue {
//處理器隊列
public static BlockingQueue<TaskProcessor> taskProcessors;
//等待任務(wù)隊列
public static BlockingQueue<CompileTask> waitTasks;
//處理任務(wù)隊列
public static BlockingQueue<CompileTask> executeTasks;
//線程池
public static ExecutorService exec;
//初始處理器數(shù)(計算機cpu可用線程數(shù))
public static Integer processorNum=Runtime.getRuntime().availableProcessors();
/**
* 初始化處理器、等待任務(wù)、處理任務(wù)隊列及線程池
*/
@PostConstruct
public static void initEquipmentAndUsersQueue(){
exec = Executors.newCachedThreadPool();
taskProcessors =new LinkedBlockingQueue<TaskProcessor>(processorNum);
//將空閑的設(shè)備放入設(shè)備隊列中
setFreeDevices(processorNum);
waitTasks =new LinkedBlockingQueue<CompileTask>();
executeTasks=new LinkedBlockingQueue<CompileTask>(processorNum);
}
/**
* 將空閑的處理器放入處理器隊列中
*/
private static void setFreeDevices(int num) {
//獲取可用的設(shè)備
for (int i = 0; i < num; i++) {
TaskProcessor dc=new TaskProcessor();
try {
taskProcessors.put(dc);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static CompileTask getWaitTask(Long clazzId) {
return get(TaskQueue.waitTasks,clazzId);
}
public static CompileTask getExecuteTask(Long clazzId) {
return get(TaskQueue.executeTasks,clazzId);
}
private static CompileTask get(BlockingQueue<CompileTask> users, Long clazzId) {
CompileTask compileTask =null;
if (CollectionUtils.isNotEmpty(users)){
Optional<CompileTask> optional=users.stream().filter(e->e.getClazzId().longValue()==clazzId.longValue()).findFirst();
if(optional.isPresent()){
compileTask = optional.get();
}
}
return compileTask;
}
public static Integer getSort(Long clazzId) {
AtomicInteger index = new AtomicInteger(-1);
BlockingQueue<CompileTask> compileTasks = TaskQueue.waitTasks;
if (CollectionUtils.isNotEmpty(compileTasks)){
compileTasks.stream()
.filter(e -> {
index.getAndIncrement();
return e.getClazzId().longValue() == clazzId.longValue();
})
.findFirst();
}
return index.get();
}
//單位秒
public static int estimatedTime(Long clazzId){
return estimatedTime(60,getSort(clazzId)+1);
}
//單位秒
public static int estimatedTime(int cellMs,int num){
int a= (num-1)/processorNum;
int b= cellMs*(a+1);
return b;
}
編譯任務(wù)類 CompileTask
import lombok.Data;
import org.springblade.core.tool.utils.SpringUtil;
import org.springblade.gis.common.enums.DataScheduleEnum;
import org.springblade.gis.dynamicds.service.DynamicDataSourceService;
import org.springblade.gis.modules.feature.schedule.service.DataScheduleService;
import java.util.Date;
@Data
public class CompileTask implements Runnable {
//當前請求的線程對象
private Long clazzId;
//用戶id
private Long userId;
//當前請求的線程對象
private Thread thread;
//綁定處理器
private TaskProcessor taskProcessor;
//任務(wù)狀態(tài)
private Integer status;
//開始時間
private Date startTime;
//結(jié)束時間
private Date endTime;
private DataScheduleService dataScheduleService= SpringUtil.getBean(DataScheduleService.class);
private DynamicDataSourceService dataSourceService= SpringUtil.getBean(DynamicDataSourceService.class);
@Override
public void run() {
compile();
}
/**
* 編譯
*/
public void compile() {
try {
//取出一個設(shè)備
TaskProcessor taskProcessor = TaskQueue.taskProcessors.take();
//取出一個任務(wù)
CompileTask compileTask = TaskQueue.waitTasks.take();
//任務(wù)和設(shè)備綁定
compileTask.setTaskProcessor(taskProcessor);
//放入
TaskQueue.executeTasks.put(compileTask);
System.out.println(DataScheduleEnum.DEAL_WITH.getName()+" "+userId);
//切換用戶數(shù)據(jù)源
dataSourceService.switchDataSource(userId);
//添加進度
dataScheduleService.addSchedule(clazzId, DataScheduleEnum.DEAL_WITH.getState());
} catch (InterruptedException e) {
System.err.println( e.getMessage());
}
}
}
任務(wù)處理器 TaskProcessor?
import lombok.Data;
import java.util.Date;
@Data
public class TaskProcessor {
/**
* 釋放
*/
public static Boolean release(CompileTask task) {
Boolean flag=false;
Thread thread=task.getThread();
synchronized (thread) {
try {
if(null!=task.getTaskProcessor()){
TaskQueue.taskProcessors.put(task.getTaskProcessor());
TaskQueue.executeTasks.remove(task);
task.setEndTime(new Date());
long intervalMilli = task.getEndTime().getTime() - task.getStartTime().getTime();
flag=true;
System.out.println("用戶"+task.getClazzId()+"耗時"+intervalMilli+"ms");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return flag;
}
}
}
Controller控制器接口實現(xiàn)
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springblade.core.tool.api.R;
import org.springblade.gis.multithread.TaskProcessor;
import org.springblade.gis.multithread.TaskQueue;
import org.springblade.gis.multithread.CompileTask;
import org.springframework.web.bind.annotation.*;
import java.util.Date;
@RestController
@RequestMapping("task")
@Api(value = "數(shù)據(jù)編譯任務(wù)", tags = "數(shù)據(jù)編譯任務(wù)")
public class CompileTaskController {
@ApiOperation(value = "添加等待請求 @author Tarzan Liu")
@PostMapping("compile/{clazzId}")
public R<Integer> compile(@PathVariable("clazzId") Long clazzId) {
CompileTask checkUser=TaskQueue.getWaitTask(clazzId);
if(checkUser!=null){
return R.fail("已經(jīng)正在排隊!");
}
checkUser=TaskQueue.getExecuteTask(clazzId);
if(checkUser!=null){
return R.fail("正在執(zhí)行編譯!");
}
//獲取當前的線程
Thread thread=Thread.currentThread();
//創(chuàng)建當前的用戶請求對象
CompileTask compileTask =new CompileTask();
compileTask.setThread(thread);
compileTask.setClazzId(clazzId);
compileTask.setStartTime(new Date());
//將當前用戶請求對象放入隊列中
try {
TaskQueue.waitTasks.put(compileTask);
} catch (InterruptedException e) {
e.printStackTrace();
}
TaskQueue.exec.execute(compileTask);
return R.data(TaskQueue.waitTasks.size()-1);
}
@ApiOperation(value = "查詢當前任務(wù)前還有多少任務(wù)等待 @author Tarzan Liu")
@PostMapping("sort/{clazzId}")
public R<Integer> sort(@PathVariable("clazzId") Long clazzId) {
return R.data(TaskQueue.getSort(clazzId));
}
@ApiOperation(value = "查詢當前任務(wù)預(yù)估時長 @author Tarzan Liu")
@PostMapping("estimate/time/{clazzId}")
public R<Integer> estimatedTime(@PathVariable("clazzId") Long clazzId) {
return R.data(TaskQueue.estimatedTime(clazzId));
}
@ApiOperation(value = "任務(wù)釋放 @author Tarzan Liu")
@PostMapping("release/{clazzId}")
public R<Boolean> release(@PathVariable("clazzId") Long clazzId) {
CompileTask task=TaskQueue.getExecuteTask(clazzId);
if(task==null){
return R.fail("資源釋放異常");
}
return R.status(TaskProcessor.release(task));
}
@ApiOperation(value = "執(zhí)行 @author Tarzan Liu")
@PostMapping("exec")
public R exec() {
Long start=System.currentTimeMillis();
for (Long i = 1L; i < 100; i++) {
compile(i);
}
System.out.println("消耗時間:"+(System.currentTimeMillis()-start)+"ms");
return R.status(true);
}
}
接口測試
根據(jù)任務(wù)id查詢該任務(wù)前還有多少個任務(wù)待執(zhí)行

根據(jù)任務(wù)id查詢該任務(wù)預(yù)估執(zhí)行完成的剩余時間,單位秒

補充知識
BlockingQueue
BlockingQueue即阻塞隊列,它是基于ReentrantLock,依據(jù)它的基本原理,我們可以實現(xiàn)Web中的長連接聊天功能,當然其最常用的還是用于實現(xiàn)生產(chǎn)者與消費者模式,大致如下圖所示:

在Java中,BlockingQueue是一個接口,它的實現(xiàn)類有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它們的區(qū)別主要體現(xiàn)在存儲結(jié)構(gòu)上或?qū)υ夭僮魃系牟煌?,但是對于take與put操作的原理,卻是類似的。
阻塞與非阻塞
入隊
offer(E e):如果隊列沒滿,立即返回true; 如果隊列滿了,立即返回false-->不阻塞
put(E e):如果隊列滿了,一直阻塞,直到隊列不滿了或者線程被中斷-->阻塞
offer(E e, long timeout, TimeUnit unit):在隊尾插入一個元素,,如果隊列已滿,則進入等待,直到出現(xiàn)以下三種情況:-->阻塞
被喚醒
等待時間超時
當前線程被中斷
出隊
poll():如果沒有元素,直接返回null;如果有元素,出隊
take():如果隊列空了,一直阻塞,直到隊列不為空或者線程被中斷-->阻塞
poll(long timeout, TimeUnit unit):如果隊列不空,出隊;如果隊列已空且已經(jīng)超時,返回null;如果隊列已空且時間未超時,則進入等待,直到出現(xiàn)以下三種情況:
被喚醒
等待時間超時
當前線程被中斷?
到此這篇關(guān)于JAVA多線程之實現(xiàn)用戶任務(wù)排隊并預(yù)估排隊時長的文章就介紹到這了,更多相關(guān)JAVA 多線程 用戶任務(wù)排隊內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
spring-boot-starter-thymeleaf加載外部html文件方式
本文介紹了在SpringMVC中使用Thymeleaf模板引擎加載外部HTML文件的方法,以及在Spring Boot中使用Thymeleaf的基本步驟,包括引入依賴、創(chuàng)建Controller、創(chuàng)建HTML文件、參數(shù)化訪問、熱加載和熱更新文件2025-02-02
mybatisplus實現(xiàn)自動創(chuàng)建/更新時間的項目實踐
Mybatis-Plus提供了自動填充功能,可以通過實現(xiàn)MetaObjectHandler接口來實現(xiàn)自動更新時間的功能,本文就來介紹一下mybatisplus實現(xiàn)自動創(chuàng)建/更新時間的項目實踐,感興趣的可以了解下2024-01-01
MyBatis-Puls插入或修改時某些字段自動填充操作示例
這篇文章主要為大家介紹了MyBatis-Puls插入或修改時某些字段自動填充操作示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12

