Java實(shí)現(xiàn)多任務(wù)執(zhí)行助手
本文實(shí)例為大家分享了Java實(shí)現(xiàn)多任務(wù)執(zhí)行助手的具體代碼,供大家參考,具體內(nèi)容如下
1.多線程執(zhí)行任務(wù)類
package com.visy.threadpool;
import com.visy.executor.ExecutorFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ThreadPoolConfig {
? ? private TheadPoolProperties theadPoolProperties;
? ? private ThreadPoolExecutor executor;
? ? private ThreadPoolExecutor executorChild;
? ? public ThreadPoolConfig(TheadPoolProperties theadPoolProperties) {
? ? ? ? this.theadPoolProperties = theadPoolProperties;
? ? ? ? this.executor = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize());
? ? ? ? this.executorChild = ExecutorFactory.getInstance().getThreadPoolExecutor("ThreadPoolConfig-service-child", theadPoolProperties.getQueueSize(), theadPoolProperties.getCoreThreadNum(), theadPoolProperties.getMaxPoolSize());
? ? }
? ? public <V> List<V> doConcurrentTask(List<Callable<V>> taskList, ThreadPoolExecutor... executorChilds) {
? ? ? ? if (taskList != null && !taskList.isEmpty()) {
? ? ? ? ? ? List<V> resultList = new ArrayList();
? ? ? ? ? ? List futureList = null;
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? if (this.executor.getQueue().size() >= this.theadPoolProperties.getQueueSize()) {
? ? ? ? ? ? ? ? ? ? throw new RuntimeException("queue size bigger than 100, now size is " + this.executor.getQueue().size());
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? if (executorChilds != null && executorChilds.length > 0 && executorChilds[0] != null) {
? ? ? ? ? ? ? ? ? ? futureList = executorChilds[0].invokeAll(taskList);
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? futureList = this.executor.invokeAll(taskList, (long)this.theadPoolProperties.getTimeOut(), TimeUnit.SECONDS);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (InterruptedException var6) {
? ? ? ? ? ? ? ? var6.printStackTrace();
? ? ? ? ? ? }
? ? ? ? ? ? this.doFutureList(resultList, futureList);
? ? ? ? ? ? return resultList;
? ? ? ? } else {
? ? ? ? ? ? return null;
? ? ? ? }
? ? }
? ? <V> void doFutureList(List<V> resultList, List<Future<V>> futureList) {
? ? ? ? if (futureList != null) {
? ? ? ? ? ? Iterator var3 = futureList.iterator();
? ? ? ? ? ? while(var3.hasNext()) {
? ? ? ? ? ? ? ? Future future = (Future)var3.next();
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? resultList.add(future.get());
? ? ? ? ? ? ? ? } catch (ExecutionException | InterruptedException var6) {
? ? ? ? ? ? ? ? ? ? var6.printStackTrace();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public <V> void doVoidConcurrentTask(List<Callable<V>> taskList) {
? ? ? ? if (taskList != null && !taskList.isEmpty()) {
? ? ? ? ? ? Iterator var2 = taskList.iterator();
? ? ? ? ? ? while(var2.hasNext()) {
? ? ? ? ? ? ? ? Callable<V> call = (Callable)var2.next();
? ? ? ? ? ? ? ? this.executor.submit(call);
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? public TheadPoolProperties getTheadPoolProperties() {
? ? ? ? return this.theadPoolProperties;
? ? }
? ? public ThreadPoolExecutor getExecutor() {
? ? ? ? return this.executor;
? ? }
? ? public ThreadPoolExecutor getExecutorChild() {
? ? ? ? return this.executorChild;
? ? }
? ? public void setTheadPoolProperties(TheadPoolProperties theadPoolProperties) {
? ? ? ? this.theadPoolProperties = theadPoolProperties;
? ? }
? ? public void setExecutor(ThreadPoolExecutor executor) {
? ? ? ? this.executor = executor;
? ? }
? ? public void setExecutorChild(ThreadPoolExecutor executorChild) {
? ? ? ? this.executorChild = executorChild;
? ? }
? ? public boolean equals(Object o) {
? ? ? ? if (o == this) {
? ? ? ? ? ? return true;
? ? ? ? } else if (!(o instanceof ThreadPoolConfig)) {
? ? ? ? ? ? return false;
? ? ? ? } else {
? ? ? ? ? ? ThreadPoolConfig other = (ThreadPoolConfig)o;
? ? ? ? ? ? if (!other.canEqual(this)) {
? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? label47: {
? ? ? ? ? ? ? ? ? ? Object this$theadPoolProperties = this.getTheadPoolProperties();
? ? ? ? ? ? ? ? ? ? Object other$theadPoolProperties = other.getTheadPoolProperties();
? ? ? ? ? ? ? ? ? ? if (this$theadPoolProperties == null) {
? ? ? ? ? ? ? ? ? ? ? ? if (other$theadPoolProperties == null) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? break label47;
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? } else if (this$theadPoolProperties.equals(other$theadPoolProperties)) {
? ? ? ? ? ? ? ? ? ? ? ? break label47;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Object this$executor = this.getExecutor();
? ? ? ? ? ? ? ? Object other$executor = other.getExecutor();
? ? ? ? ? ? ? ? if (this$executor == null) {
? ? ? ? ? ? ? ? ? ? if (other$executor != null) {
? ? ? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else if (!this$executor.equals(other$executor)) {
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Object this$executorChild = this.getExecutorChild();
? ? ? ? ? ? ? ? Object other$executorChild = other.getExecutorChild();
? ? ? ? ? ? ? ? if (this$executorChild == null) {
? ? ? ? ? ? ? ? ? ? if (other$executorChild != null) {
? ? ? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else if (!this$executorChild.equals(other$executorChild)) {
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? protected boolean canEqual(Object other) {
? ? ? ? return other instanceof ThreadPoolConfig;
? ? }
? ? public int hashCode() {
? ? ? ? int PRIME = true;
? ? ? ? int result = 1;
? ? ? ? Object $theadPoolProperties = this.getTheadPoolProperties();
? ? ? ? int result = result * 59 + ($theadPoolProperties == null ? 43 : $theadPoolProperties.hashCode());
? ? ? ? Object $executor = this.getExecutor();
? ? ? ? result = result * 59 + ($executor == null ? 43 : $executor.hashCode());
? ? ? ? Object $executorChild = this.getExecutorChild();
? ? ? ? result = result * 59 + ($executorChild == null ? 43 : $executorChild.hashCode());
? ? ? ? return result;
? ? }
? ? public String toString() {
? ? ? ? return "ThreadPoolConfig(theadPoolProperties=" + this.getTheadPoolProperties() + ", executor=" + this.getExecutor() + ", executorChild=" + this.getExecutorChild() + ")";
? ? }
}2.執(zhí)行器工廠類
package com.visy.executor;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ExecutorFactory {
? ? private static final Logger logger = LoggerFactory.getLogger(ExecutorFactory.class);
? ? private static final Map<String, ThreadPoolExecutor> threadPoolExecutorMap = new ConcurrentHashMap();
? ? private static final int DEFAULT_QUEUE_SIZE = 1000;
? ? private static final String DEFAULT_EXECUTOR_NAME = "default-executor";
? ? private static final int MAX_THREAD_NUM = 100;
? ? private static final int CORE_THREAD_NUM = 1;
? ? private static volatile ExecutorFactory instance;
? ? private ExecutorFactory() {
? ? }
? ? public static ExecutorFactory getInstance() {
? ? ? ? if (instance == null) {
? ? ? ? ? ? Class var0 = ExecutorFactory.class;
? ? ? ? ? ? synchronized(ExecutorFactory.class) {
? ? ? ? ? ? ? ? if (instance == null) {
? ? ? ? ? ? ? ? ? ? instance = new ExecutorFactory();
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return instance;
? ? }
? ? public ThreadPoolExecutor getThreadPoolExecutorByName(String name) {
? ? ? ? return (ThreadPoolExecutor)threadPoolExecutorMap.get(name);
? ? }
? ? public static Map<String, ThreadPoolExecutor> getThreadPoolExecutorMap() {
? ? ? ? return threadPoolExecutorMap;
? ? }
? ? public ThreadPoolExecutor getThreadPoolExecutor(String threadPoolExecutorName, int queueSize, int coreThreadNum, int maxPoolSize) {
? ? ? ? if (StringUtils.isBlank(threadPoolExecutorName)) {
? ? ? ? ? ? throw new IllegalArgumentException("thread name empty");
? ? ? ? } else {
? ? ? ? ? ? if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) {
? ? ? ? ? ? ? ? Class var5 = ExecutorFactory.class;
? ? ? ? ? ? ? ? synchronized(ExecutorFactory.class) {
? ? ? ? ? ? ? ? ? ? if (!threadPoolExecutorMap.containsKey(threadPoolExecutorName)) {
? ? ? ? ? ? ? ? ? ? ? ? ThreadPoolExecutor executor = (new ThreadPool(coreThreadNum, maxPoolSize, 30L, queueSize, threadPoolExecutorName)).getExecutor();
? ? ? ? ? ? ? ? ? ? ? ? threadPoolExecutorMap.put(threadPoolExecutorName, executor);
? ? ? ? ? ? ? ? ? ? ? ? logger.info("thread name: {} executor created", threadPoolExecutorName);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? return (ThreadPoolExecutor)threadPoolExecutorMap.get(threadPoolExecutorName);
? ? ? ? }
? ? }
? ? public <T extends Runnable> void submit(T t) {
? ? ? ? ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor();
? ? ? ? defaultExecutor.submit(t);
? ? }
? ? public <T extends Runnable> void submit(String poolName, T t) {
? ? ? ? ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName);
? ? ? ? if (executor == null) {
? ? ? ? ? ? logger.error("thread name: {} executor not exist.", poolName);
? ? ? ? ? ? throw new IllegalArgumentException("thread name:" + poolName + " executor not exist.");
? ? ? ? } else {
? ? ? ? ? ? executor.submit(t);
? ? ? ? }
? ? }
? ? public <T extends Callable<Object>> Future<Object> submit(T t) {
? ? ? ? ThreadPoolExecutor defaultExecutor = this.getThreadPoolExecutor();
? ? ? ? return defaultExecutor.submit(t);
? ? }
? ? public <T extends Callable<Object>> Future<Object> submit(String poolName, T t) {
? ? ? ? ThreadPoolExecutor executor = this.getThreadPoolExecutorByName(poolName);
? ? ? ? if (executor == null) {
? ? ? ? ? ? logger.error("thread poolName: {} executor not exist.", poolName);
? ? ? ? ? ? throw new IllegalArgumentException("thread poolName:" + poolName + " executor not exist.");
? ? ? ? } else {
? ? ? ? ? ? return executor.submit(t);
? ? ? ? }
? ? }
? ? public ThreadPoolExecutor getThreadPoolExecutor() {
? ? ? ? return this.getThreadPoolExecutor("default-executor", 1000, 1, 100);
? ? }
}3.多線程配置類
package com.visy.threadpool;
import javax.validation.constraints.NotNull;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
import org.springframework.validation.annotation.Validated;
@Validated
@Configuration
@ConfigurationProperties(prefix = "visy.threadpool")
public class TheadPoolProperties {
? ? // 執(zhí)行并行任務(wù)時(shí),等待多久時(shí)間超時(shí)(單位:秒)
? ? @NotNull
? ? private Integer timeOut;
? ? // 隊(duì)列大小
? ? @NotNull
? ? private Integer queueSize;?
? ? // 核心線程數(shù)量
? ? @NotNull
? ? private Integer coreThreadNum;
? ? // 線程池最大線程數(shù)量
? ? @NotNull
? ? private Integer maxPoolSize;
? ? // 并行執(zhí)行每組大小
? ? private Integer groupSize = 20;
? ? public TheadPoolProperties() {
? ? }
? ? public Integer getTimeOut() {
? ? ? ? return this.timeOut;
? ? }
? ? public Integer getQueueSize() {
? ? ? ? return this.queueSize;
? ? }
? ? public Integer getCoreThreadNum() {
? ? ? ? return this.coreThreadNum;
? ? }
? ? public Integer getMaxPoolSize() {
? ? ? ? return this.maxPoolSize;
? ? }
? ? public Integer getGroupSize() {
? ? ? ? return this.groupSize;
? ? }
? ? public void setTimeOut(Integer timeOut) {
? ? ? ? this.timeOut = timeOut;
? ? }
? ? public void setQueueSize(Integer queueSize) {
? ? ? ? this.queueSize = queueSize;
? ? }
? ? public void setCoreThreadNum(Integer coreThreadNum) {
? ? ? ? this.coreThreadNum = coreThreadNum;
? ? }
? ? public void setMaxPoolSize(Integer maxPoolSize) {
? ? ? ? this.maxPoolSize = maxPoolSize;
? ? }
? ? public void setGroupSize(Integer groupSize) {
? ? ? ? this.groupSize = groupSize;
? ? }
? ? public boolean equals(Object o) {
? ? ? ? if (o == this) {
? ? ? ? ? ? return true;
? ? ? ? } else if (!(o instanceof TheadPoolProperties)) {
? ? ? ? ? ? return false;
? ? ? ? } else {
? ? ? ? ? ? TheadPoolProperties other = (TheadPoolProperties)o;
? ? ? ? ? ? if (!other.canEqual(this)) {
? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? label71: {
? ? ? ? ? ? ? ? ? ? Object this$timeOut = this.getTimeOut();
? ? ? ? ? ? ? ? ? ? Object other$timeOut = other.getTimeOut();
? ? ? ? ? ? ? ? ? ? if (this$timeOut == null) {
? ? ? ? ? ? ? ? ? ? ? ? if (other$timeOut == null) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? break label71;
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? } else if (this$timeOut.equals(other$timeOut)) {
? ? ? ? ? ? ? ? ? ? ? ? break label71;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Object this$queueSize = this.getQueueSize();
? ? ? ? ? ? ? ? Object other$queueSize = other.getQueueSize();
? ? ? ? ? ? ? ? if (this$queueSize == null) {
? ? ? ? ? ? ? ? ? ? if (other$queueSize != null) {
? ? ? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else if (!this$queueSize.equals(other$queueSize)) {
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? label57: {
? ? ? ? ? ? ? ? ? ? Object this$coreThreadNum = this.getCoreThreadNum();
? ? ? ? ? ? ? ? ? ? Object other$coreThreadNum = other.getCoreThreadNum();
? ? ? ? ? ? ? ? ? ? if (this$coreThreadNum == null) {
? ? ? ? ? ? ? ? ? ? ? ? if (other$coreThreadNum == null) {
? ? ? ? ? ? ? ? ? ? ? ? ? ? break label57;
? ? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? } else if (this$coreThreadNum.equals(other$coreThreadNum)) {
? ? ? ? ? ? ? ? ? ? ? ? break label57;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Object this$maxPoolSize = this.getMaxPoolSize();
? ? ? ? ? ? ? ? Object other$maxPoolSize = other.getMaxPoolSize();
? ? ? ? ? ? ? ? if (this$maxPoolSize == null) {
? ? ? ? ? ? ? ? ? ? if (other$maxPoolSize != null) {
? ? ? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else if (!this$maxPoolSize.equals(other$maxPoolSize)) {
? ? ? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? Object this$groupSize = this.getGroupSize();
? ? ? ? ? ? ? ? Object other$groupSize = other.getGroupSize();
? ? ? ? ? ? ? ? if (this$groupSize == null) {
? ? ? ? ? ? ? ? ? ? if (other$groupSize == null) {
? ? ? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } else if (this$groupSize.equals(other$groupSize)) {
? ? ? ? ? ? ? ? ? ? return true;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? return false;
? ? ? ? ? ? }
? ? ? ? }
? ? }
? ? protected boolean canEqual(Object other) {
? ? ? ? return other instanceof TheadPoolProperties;
? ? }
? ? public int hashCode() {
? ? ? ? int PRIME = true;
? ? ? ? int result = 1;
? ? ? ? Object $timeOut = this.getTimeOut();
? ? ? ? int result = result * 59 + ($timeOut == null ? 43 : $timeOut.hashCode());
? ? ? ? Object $queueSize = this.getQueueSize();
? ? ? ? result = result * 59 + ($queueSize == null ? 43 : $queueSize.hashCode());
? ? ? ? Object $coreThreadNum = this.getCoreThreadNum();
? ? ? ? result = result * 59 + ($coreThreadNum == null ? 43 : $coreThreadNum.hashCode());
? ? ? ? Object $maxPoolSize = this.getMaxPoolSize();
? ? ? ? result = result * 59 + ($maxPoolSize == null ? 43 : $maxPoolSize.hashCode());
? ? ? ? Object $groupSize = this.getGroupSize();
? ? ? ? result = result * 59 + ($groupSize == null ? 43 : $groupSize.hashCode());
? ? ? ? return result;
? ? }
? ? public String toString() {
? ? ? ? return "TheadPoolProperties(timeOut=" + this.getTimeOut() + ", queueSize=" + this.getQueueSize() + ", coreThreadNum=" + this.getCoreThreadNum() + ", maxPoolSize=" + this.getMaxPoolSize() + ", groupSize=" + this.getGroupSize() + ")";
? ? }
}4.列表拆分工具類
package com.visy.utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.jar.Attributes;
/**
?* 列表或數(shù)組按指定大小分組,用于批量取一部分?jǐn)?shù)據(jù)循環(huán)處理
?*
?*/
public class ArraySplitUtil<T> {
? ? /**
? ? ?* 按指定大小對(duì)列表分組
? ? ?* @param list
? ? ?* @param splitSize
? ? ?* @return
? ? ?*/
? ? public List<List<T>> splistList(List<T> list, int splitSize) {
? ? ? ? if (null == list || list.size() == 0) {
? ? ? ? ? ? return null;
? ? ? ? }
? ? ? ? int listSize = list.size();
? ? ? ? List<List<T>> newList = new ArrayList<>();
? ? ? ? if (listSize < splitSize) {
? ? ? ? ? ? newList.add(list);
? ? ? ? ? ? return newList;
? ? ? ? }
? ? ? ? int addLength = splitSize;
? ? ? ? int times = listSize / splitSize;
? ? ? ? if (listSize % splitSize != 0) {
? ? ? ? ? ? times += 1;
? ? ? ? }
? ? ? ? int start = 0;
? ? ? ? int end = 0;
? ? ? ? int last = times - 1;
? ? ? ? for (int i = 0; i < times; i++) {
? ? ? ? ? ? start = i * splitSize;
? ? ? ? ? ? if (i < last) {
? ? ? ? ? ? ? ? end = start + addLength;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? end = listSize;
? ? ? ? ? ? }
? ? ? ? ? ? newList.add(list.subList(start, end));
? ? ? ? }
? ? ? ? return newList;
? ? }
? ? /**
? ? ?* 按指定大小對(duì)數(shù)組分組
? ? ?* @param array
? ? ?* @param splitSize
? ? ?* @return
? ? ?*/
? ? public List<T[]> splistArray(T[] array, int splitSize) {
? ? ? ? if (null == array) {
? ? ? ? ? ? return null;
? ? ? ? }
? ? ? ? int listSize = array.length;
? ? ? ? List<T[]> newList = new ArrayList<>();
? ? ? ? if (listSize < splitSize) {
? ? ? ? ? ? newList.add(array);
? ? ? ? ? ? return newList;
? ? ? ? }
? ? ? ? int addLength = splitSize;
? ? ? ? int times = listSize / splitSize;
? ? ? ? if (listSize % splitSize != 0) {
? ? ? ? ? ? times += 1;
? ? ? ? }
? ? ? ? int start = 0;
? ? ? ? int end = 0;
? ? ? ? int last = times - 1;
? ? ? ? for (int i = 0; i < times; i++) {
? ? ? ? ? ? start = i * splitSize;
? ? ? ? ? ? if (i < last) {
? ? ? ? ? ? ? ? end = start + addLength;
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? end = listSize;
? ? ? ? ? ? }
? ? ? ? ? ? newList.add(Arrays.copyOfRange(array, start, end));
? ? ? ? }
? ? ? ? return newList;
? ? }
? ? public static <E> ArraySplitUtil<E> build(){
? ? ? ? return new ArraySplitUtil<>();
? ? }
}5.多任務(wù)執(zhí)行助手類
package com.visy.helper;
import com.baomidou.mybatisplus.toolkit.CollectionUtils;
import com.google.common.collect.Lists;
import com.visy.utils.ArraySplitUtil;
import com.visy.threadpool.ThreadPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
/**
?* 多任務(wù)助手
?* @author visy.wang
?* @date 2022/5/9 14:38
?*/
@Service
public class MultiTaskHelper {
? ? @Autowired
? ? private ThreadPoolConfig threadPoolConfig;
? ? private static final Map<String,ArraySplitUtil<?>> ArraySplitUtilCache = new ConcurrentHashMap<>();
? ? public <I,O> List<List<O>> createAndRunListTask(List<I> list, Function<I,O> handler){
? ? ? ? return createAndRunListTask(list, null, handler);
? ? }
? ? public <I,O> List<List<O>> createAndRunListTaskV2(List<I> list, Function<List<I>, List<O>> handler){
? ? ? ? return createAndRunListTask(list, handler, null);
? ? }
? ? public <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<I> handler){
? ? ? ? createAndRunListTaskWithoutReturn(list, null, handler);
? ? }
? ? public <I> void createAndRunListTaskWithoutReturnV2(List<I> list, Consumer<List<I>> handler){
? ? ? ? createAndRunListTaskWithoutReturn(list, handler, null);
? ? }
? ? /**
? ? ?* 把列表按線程數(shù)分組
? ? ?* @param list 列表
? ? ?* @return 分組后的列表
? ? ?*/
? ? @SuppressWarnings("unchecked")
? ? private <T> List<List<T>> listSplit(List<T> list){
? ? ? ? String key = list.get(0).getClass().getName();
? ? ? ? int groupSize = threadPoolConfig.getTheadPoolProperties().getGroupSize();
? ? ? ? ArraySplitUtil<T> arraySplitUtil = (ArraySplitUtil<T>)ArraySplitUtilCache.get(key);
? ? ? ? if(Objects.isNull(arraySplitUtil)){
? ? ? ? ? ? arraySplitUtil = ArraySplitUtil.build();
? ? ? ? ? ? ArraySplitUtilCache.put(key, arraySplitUtil);
? ? ? ? }
? ? ? ? return arraySplitUtil.splistList(list, groupSize);
? ? }
? ? /**
? ? ?* 創(chuàng)建并運(yùn)行多任務(wù)
? ? ?* @param list 輸入數(shù)據(jù)列表
? ? ?* @param handler1 處理器1 (優(yōu)先級(jí)使用)
? ? ?* @param handler2 處理器2
? ? ?* @param <I> 輸入數(shù)據(jù)類型
? ? ?* @param <O> 輸出數(shù)據(jù)類型
? ? ?* @return 執(zhí)行結(jié)果分組列表
? ? ?*/
? ? private <I,O> List<List<O>> createAndRunListTask(List<I> list, ?Function<List<I>, List<O>> handler1, Function<I,O> handler2){
? ? ? ? List<List<I>> listGroup = listSplit(list);
? ? ? ? //設(shè)定每個(gè)組的任務(wù)
? ? ? ? List<Callable<List<O>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size());
? ? ? ? listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> {
? ? ? ? ? ? taskList.add(() -> {
? ? ? ? ? ? ? ? if(Objects.nonNull(handler1)){
? ? ? ? ? ? ? ? ? ? return handler1.apply(subList);
? ? ? ? ? ? ? ? }else if(Objects.nonNull(handler2)){
? ? ? ? ? ? ? ? ? ? return subList.stream().map(handler2).collect(Collectors.toList());
? ? ? ? ? ? ? ? }else{
? ? ? ? ? ? ? ? ? ? return null;
? ? ? ? ? ? ? ? }
? ? ? ? ? ? });
? ? ? ? });
? ? ? ? return threadPoolConfig.doConcurrentTask(taskList);
? ? }
? ? /**
? ? ?* 創(chuàng)建并運(yùn)行多任務(wù)(無(wú)返回結(jié)果)
? ? ?* @param list 輸入數(shù)據(jù)列表
? ? ?* @param handler1 處理器1 (優(yōu)先級(jí)更高)
? ? ?* @param handler2 處理器2
? ? ?* @param <I> 輸入數(shù)據(jù)類型
? ? ?*/
? ? private <I> void createAndRunListTaskWithoutReturn(List<I> list, Consumer<List<I>> handler1, Consumer<I> handler2){
? ? ? ? List<List<I>> listGroup = listSplit(list);
? ? ? ? //設(shè)定每個(gè)組的任務(wù)
? ? ? ? List<Callable<List<?>>> taskList = Lists.newArrayListWithExpectedSize(listGroup.size());
? ? ? ? listGroup.stream().filter(CollectionUtils::isNotEmpty).forEach(subList -> {
? ? ? ? ? ? taskList.add(() -> {
? ? ? ? ? ? ? ? if(Objects.nonNull(handler1)){
? ? ? ? ? ? ? ? ? ? handler1.accept(subList);
? ? ? ? ? ? ? ? }else if(Objects.nonNull(handler2)){
? ? ? ? ? ? ? ? ? ? subList.forEach(handler2);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? return null;
? ? ? ? ? ? });
? ? ? ? });
? ? ? ? threadPoolConfig.doConcurrentTask(taskList);
? ? }
}6.多任務(wù)助手使用:
@Autowired
package com.zoom.fleet.schedule.service;
import com.visy.helper.MultiTaskHelper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
?* 多任務(wù)助手使用示例
?* @author visy.wang
?* @date 2022/5/13 14:11
?*/
@Service
public class MultiTaskTest {
? ? @Autowired
? ? private MultiTaskHelper multiTaskHelper;
? ? private void test(){
? ? ? ? //待多任務(wù)執(zhí)行的數(shù)據(jù)列表
? ? ? ? List<String> idList = new ArrayList<>();
? ? ? ? //1.有返回結(jié)果的執(zhí)行方式一, 定義單個(gè)數(shù)據(jù)的處理邏輯,返回多任務(wù)執(zhí)行結(jié)果和合集
? ? ? ? List<List<Long>> resultList = multiTaskHelper.createAndRunListTask(idList, id->{
? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼
? ? ? ? ? ? return Long.valueOf(id);
? ? ? ? });
? ? ? ? //2.有返回結(jié)果的執(zhí)行方式二, 定義單個(gè)數(shù)線程的處理邏輯,返回多任務(wù)執(zhí)行結(jié)果和合集
? ? ? ? resultList = multiTaskHelper.createAndRunListTaskV2(idList, subIdList->{
? ? ? ? ? ? //每一個(gè)線程下列表操作的業(yè)務(wù)代碼
? ? ? ? ? ? return subIdList.stream().map(id->{
? ? ? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼
? ? ? ? ? ? ? ? return Long.valueOf(id);
? ? ? ? ? ? }).collect(Collectors.toList());
? ? ? ? });
? ? ? ? //3.無(wú)返回結(jié)果的執(zhí)行方式一, 定義單個(gè)數(shù)據(jù)的處理邏輯
? ? ? ? multiTaskHelper.createAndRunListTaskWithoutReturn(idList, id->{
? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼...
? ? ? ? });
? ? ? ? //3.無(wú)返回結(jié)果的執(zhí)行方式一, 定義單個(gè)數(shù)據(jù)的處理邏輯
? ? ? ? multiTaskHelper.createAndRunListTaskWithoutReturnV2(idList, subIdList->{
? ? ? ? ? ? subIdList.forEach(id->{
? ? ? ? ? ? ? ? //每一項(xiàng)數(shù)據(jù)的業(yè)務(wù)代碼...
? ? ? ? ? ? });
? ? ? ? ? ? //繼續(xù)操作subIdList...
? ? ? ? });
? ? }
}以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Maven?自動(dòng)化構(gòu)建的實(shí)現(xiàn)示例
本文主要介紹了Maven?自動(dòng)化構(gòu)建的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-05-05
swagger的請(qǐng)求參數(shù)不顯示,@Apimodel的坑點(diǎn)及解決
這篇文章主要介紹了swagger的請(qǐng)求參數(shù)不顯示,@Apimodel的坑點(diǎn)及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-11-11
SpringBoot接收J(rèn)SON類型的參數(shù)方式
這篇文章主要介紹了SpringBoot接收J(rèn)SON類型的參數(shù)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-03-03
使用cmd根據(jù)WSDL網(wǎng)址生成java客戶端代碼的實(shí)現(xiàn)
這篇文章主要介紹了使用cmd根據(jù)WSDL網(wǎng)址生成java客戶端代碼的實(shí)現(xiàn)方式,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-03-03
基于Java實(shí)現(xiàn)ssh命令登錄主機(jī)執(zhí)行shell命令過(guò)程解析
這篇文章主要介紹了基于Java實(shí)現(xiàn)ssh命令登錄主機(jī)執(zhí)行shell命令過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
spring boot 項(xiàng)目利用Jenkins實(shí)現(xiàn)自動(dòng)化部署的教程詳解
這篇文章主要介紹了spring boot 項(xiàng)目利用Jenkins實(shí)現(xiàn)自動(dòng)化部署的方法,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2018-07-07
SpringBoot配置數(shù)據(jù)庫(kù)密碼加密的實(shí)現(xiàn)
這篇文章主要介紹了SpringBoot配置數(shù)據(jù)庫(kù)密碼加密的實(shí)現(xiàn),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-03-03
maven無(wú)法依賴spring-cloud-stater-zipkin的解決方案
這篇文章主要介紹了maven無(wú)法依賴spring-cloud-stater-zipkin如何解決,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-05-05
解決Intellij IDEA覆蓋tomcat配置的問(wèn)題
分析并解決Intellij IDEA覆蓋tomcat配置的問(wèn)題/解決修改server.xml無(wú)效的問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友一起看看吧2021-02-02
springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制(踩坑經(jīng)驗(yàn))
這篇文章主要介紹了springboot + rabbitmq 如何實(shí)現(xiàn)消息確認(rèn)機(jī)制,本文給大家分享小編實(shí)際開發(fā)中的一點(diǎn)踩坑經(jīng)驗(yàn),內(nèi)容簡(jiǎn)單易懂,需要的朋友可以參考下2020-07-07

