java如何多線程批量更新10萬(wàn)級(jí)的數(shù)據(jù)
java多線程批量更新10萬(wàn)級(jí)的數(shù)據(jù)
好久沒(méi)有寫(xiě)文章,今天剛好沒(méi)啥事,就動(dòng)手記錄一下,好記性不如爛筆頭!言歸正傳,我最近接到的一個(gè)工作任務(wù)大概內(nèi)容是,有一張數(shù)據(jù)量在十萬(wàn)+級(jí)別的表,需要新增一個(gè)字段,并且要寫(xiě)入初始化值。
業(yè)務(wù)其實(shí)非常的簡(jiǎn)單,全部查詢出來(lái)一個(gè)列表,然后用mybatis的updateBatch批量更新,其實(shí)在我的實(shí)踐過(guò)程中也沒(méi)什么問(wèn)題,但是執(zhí)行的效率是很低的,而且一旦數(shù)據(jù)量過(guò)大,如果機(jī)器配置不太行的話,很可能會(huì)直接OOM,如果在正式環(huán)境出現(xiàn)這個(gè)問(wèn)題,那完?duì)僮?,?zhǔn)備刪庫(kù)跑路!
所以呢,我就想了一個(gè)比較保險(xiǎn)但是比較低級(jí)的辦法,每次查詢出5000條數(shù)據(jù),去做批量更新,確保內(nèi)存不會(huì)溢出導(dǎo)致服務(wù)崩盤(pán),這當(dāng)然也是可以解決問(wèn)題,但是就是修復(fù)數(shù)據(jù)需要執(zhí)行很多次,顯得比較愚蠢一點(diǎn)。
如何用比較方便并高效的方式來(lái)修復(fù)大數(shù)量的數(shù)據(jù)
第一反應(yīng)肯定是多線程啦,方案是:
- 1.查詢出全部的數(shù)據(jù)(10萬(wàn)條)
- 2.對(duì)數(shù)據(jù)進(jìn)行分批,每批5000條,
- 3.多線程同時(shí)處理多批數(shù)據(jù)
- 4.等待執(zhí)行完成,返回成功
直接上核心代碼
寫(xiě)一個(gè)通用的分批工具類,把一個(gè)List集合,拆分成多個(gè)小的List集合
/**
* 拆分集合
*
* @param <T> 泛型對(duì)象
* @param resList 需要拆分的集合
* @param subListLength 每個(gè)子集合的元素個(gè)數(shù)
* @return 返回拆分后的各個(gè)集合組成的列表
**/
public static <T> List<List<T>> splitList(List<T> resList, int subListLength) {
if (CollectionUtils.isEmpty(resList) || subListLength <= 0) {
return new ArrayList<>();
}
List<List<T>> ret = new ArrayList<>();
int size = resList.size();
if (size <= subListLength) {
// 數(shù)據(jù)量不足 subListLength 指定的大小
ret.add(resList);
} else {
int pre = size / subListLength;
int last = size % subListLength;
// 前面pre個(gè)集合,每個(gè)大小都是 subListLength 個(gè)元素
for (int i = 0; i < pre; i++) {
List<T> itemList = new ArrayList<>(subListLength);
for (int j = 0; j < subListLength; j++) {
itemList.add(resList.get(i * subListLength + j));
}
ret.add(itemList);
}
// last的進(jìn)行處理
if (last > 0) {
List<T> itemList = new ArrayList<>(last);
for (int i = 0; i < last; i++) {
itemList.add(resList.get(pre * subListLength + i));
}
ret.add(itemList);
}
}
return ret;
}然后就是用多線程業(yè)務(wù)處理了,代碼如下
public static void doThreadBusiness(List<String> totalList) {
Long startTime = System.currentTimeMillis();
System.out.println("本次更新任務(wù)開(kāi)始");
System.out.println("本機(jī)CPU核心數(shù):"+Runtime.getRuntime().availableProcessors());
List<String> updateList = new ArrayList();
// 初始化線程池, 參數(shù)一定要一定要一定要調(diào)好?。。?!
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 50,
4, TimeUnit.SECONDS, new ArrayBlockingQueue(10), new ThreadPoolExecutor.DiscardPolicy());
// 大集合拆分成N個(gè)小集合,然后用多線程去處理數(shù)據(jù),確保不會(huì)因?yàn)閿?shù)據(jù)量過(guò)大導(dǎo)致執(zhí)行過(guò)慢
List<List<String>> splitNList = SplitListUtils.splitList(totalList, 5000);
// 記錄單個(gè)任務(wù)的執(zhí)行次數(shù)
CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
// 對(duì)拆分的集合進(jìn)行批量處理, 先拆分的集合, 再多線程執(zhí)行
for (List<String> singleList : splitNList) {
// 線程池執(zhí)行
threadPool.execute(new Thread(new Runnable(){
@Override
public void run() {
//模擬執(zhí)行時(shí)間
System.out.println("當(dāng)前線程:"+Thread.currentThread().getName());
List<String> batchUpdateVipList = new ArrayList<>();
for (String str : singleList) {
//組裝要執(zhí)行的批量更新數(shù)據(jù)
batchUpdateVipList.add(str);
}
// 這里模擬執(zhí)行數(shù)據(jù)庫(kù)批量更新操作
System.out.println("本次批量更新數(shù)據(jù)量:"+ batchUpdateVipList.size());
// 任務(wù)個(gè)數(shù) - 1, 直至為0時(shí)喚醒a(bǔ)wait()
countDownLatch.countDown();
}
}));
}
try {
// 讓當(dāng)前線程處于阻塞狀態(tài),直到鎖存器計(jì)數(shù)為零
countDownLatch.await();
} catch (Exception e) {
System.out.println("系統(tǒng)出現(xiàn)異常");
}
Long endTime = System.currentTimeMillis();
Long useTime = endTime - startTime;
System.out.println("本次更新任務(wù)結(jié)束,共計(jì)用時(shí)"+useTime+"毫秒");
}代碼很 簡(jiǎn)單一看就懂了,這里要說(shuō)一下CountDownLatch的使用,其實(shí)開(kāi)發(fā)中并不常用,但是面試卻很常用,這里蠻寫(xiě)一下,我使用CountDownLatch來(lái)阻塞主線程,等待多線程執(zhí)行完畢后,再繼續(xù)主線程,返回更新的結(jié)果,這個(gè)場(chǎng)景其實(shí)很經(jīng)常使用到。
如果不用CountDownLatch,主線程會(huì)馬上返回,如果是數(shù)據(jù)量大的情況下,往往會(huì)執(zhí)行蠻久的,但是結(jié)果秒返回,就會(huì)給人一種錯(cuò)覺(jué)。
CountDownLatch countDownLatch = new CountDownLatch(splitNList.size());
在線程執(zhí)行完畢后,需要調(diào)用一下countDown,
// 任務(wù)個(gè)數(shù) - 1, 直至為0時(shí)喚醒a(bǔ)wait() countDownLatch.countDown();
在主線程用await()進(jìn)行阻塞等待,這樣主線程就會(huì)一直等到所有的子線程都執(zhí)行完成了,繼續(xù)執(zhí)行主線程的后續(xù)代碼
countDownLatch.await();
其實(shí)這里面還有一個(gè)非常重要的面試點(diǎn),就是多線程的七大參數(shù)如何設(shè)置,
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(10, 50, 4, TimeUnit.SECONDS, new ArrayBlockingQueue(10), new ThreadPoolExecutor.DiscardPolicy() );
有興趣了解具體是設(shè)置方法可以另行查詢資料,這也是面試必問(wèn)的考點(diǎn)。
最后貼一下返回的打印結(jié)果吧,如下圖所示:

總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java后端pageHelper分頁(yè)實(shí)現(xiàn)方法步驟
這篇文章主要給大家介紹了關(guān)于java后端pageHelper分頁(yè)實(shí)現(xiàn)方法的相關(guān)資料,在我們的Java項(xiàng)目中分頁(yè)是必不可少的數(shù)據(jù)展示頁(yè)面,文中通過(guò)圖文以及示例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07
Java關(guān)鍵字詳解之final static this super的用法
this用來(lái)調(diào)用目前類自身的成員變量,super多用來(lái)調(diào)用父類的成員,final多用來(lái)定義常量用的,static定義靜態(tài)變量方法用的,靜態(tài)變量方法只能被類本身調(diào)用,下文將詳細(xì)介紹,需要的朋友可以參考下2021-10-10
IDEA運(yùn)行java程序時(shí)總彈出提示配置Edit Configurations問(wèn)題
更新IDEA后,運(yùn)行Java程序時(shí)可能需要手動(dòng)配置JDK版本,通過(guò)統(tǒng)一設(shè)置默認(rèn)JDK可以解決每次彈出EditConfigurations的問(wèn)題,此操作可以簡(jiǎn)化開(kāi)發(fā)流程,提高效率2024-09-09
JAVA代碼實(shí)現(xiàn)MongoDB動(dòng)態(tài)條件之分頁(yè)查詢
這篇文章主要介紹了JAVA如何實(shí)現(xiàn)MongoDB動(dòng)態(tài)條件之分頁(yè)查詢,文中講解非常細(xì)致,代碼幫助大家更好的理解和學(xué)習(xí),感興趣的朋友可以了解下2020-07-07
Java設(shè)計(jì)模式開(kāi)發(fā)中使用觀察者模式的實(shí)例教程
這篇文章主要介紹了Java設(shè)計(jì)模式開(kāi)發(fā)中使用觀察者模式的實(shí)例教程,松耦合和邏輯清晰的消息監(jiān)聽(tīng)是觀察者模式的大特色,需要的朋友可以參考下2016-04-04

