Java多線程批量數(shù)據(jù)導(dǎo)入的方法詳解
前言:
當(dāng)遇到大量數(shù)據(jù)導(dǎo)入時,為了提高處理的速度,可以選擇使用多線程來批量處理這些處理。常見的場景有:
- 大文件導(dǎo)入數(shù)據(jù)庫(這個文件不一定是標準的CSV可導(dǎo)入文件或者需要在內(nèi)存中經(jīng)過一定的處理)
- 數(shù)據(jù)同步(從第三方接口拉取數(shù)據(jù)處理后寫入自己的數(shù)據(jù)庫)
以上的場景有一個共性,這類數(shù)據(jù)導(dǎo)入的場景簡單來說就是將數(shù)據(jù)從一個數(shù)據(jù)源移動到另外一個數(shù)據(jù)源,而其中必定可以分為兩步
- 數(shù)據(jù)讀?。簭臄?shù)據(jù)源讀取數(shù)據(jù)到內(nèi)存
- 數(shù)據(jù)寫入:將內(nèi)存中的數(shù)據(jù)寫入到另外一個數(shù)據(jù)源,可能存在數(shù)據(jù)處理
而且根據(jù)讀取的速度一般會比數(shù)據(jù)寫入的速度快很多,即讀取快,寫入慢。
設(shè)計思路
由于場景的特點是讀取快,寫入慢,如果是使用多線程處理,建議是數(shù)據(jù)寫入部分改造為多線程。而數(shù)據(jù)讀取可以改造成批量讀取數(shù)據(jù)。簡單來說就是兩個要點:
- 批量讀取數(shù)據(jù)
- 多線程寫入數(shù)據(jù)
示例
多線程批量處理最簡單的方案是使用線程池來進行處理,下面會通過一個模擬批量讀取和寫入的服務(wù),以及對這個服務(wù)的多線程寫入調(diào)用作為示例,展示如何多線程批量數(shù)據(jù)導(dǎo)入。
模擬服務(wù)
import java.util.concurrent.atomic.AtomicLong;
/**
* 數(shù)據(jù)批量寫入用的模擬服務(wù)
*
* @author RJH
* create at 2019-04-01
*/
public class MockService {
/**
* 可讀取總數(shù)
*/
private long canReadTotal;
/**
* 寫入總數(shù)
*/
private AtomicLong writeTotal=new AtomicLong(0);
/**
* 寫入休眠時間(單位:毫秒)
*/
private final long sleepTime;
/**
* 構(gòu)造方法
*
* @param canReadTotal
* @param sleepTime
*/
public MockService(long canReadTotal, long sleepTime) {
this.canReadTotal = canReadTotal;
this.sleepTime = sleepTime;
}
/**
* 批量讀取數(shù)據(jù)接口
*
* @param num
* @return
*/
public synchronized long readData(int num) {
long readNum;
if (canReadTotal >= num) {
canReadTotal -= num;
readNum = num;
} else {
readNum = canReadTotal;
canReadTotal = 0;
}
//System.out.println("read data size:" + readNum);
return readNum;
}
/**
* 寫入數(shù)據(jù)接口
*/
public void writeData() {
try {
// 休眠一定時間模擬寫入速度慢
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 寫入總數(shù)自增
System.out.println("thread:" + Thread.currentThread() + " write data:" + writeTotal.incrementAndGet());
}
/**
* 獲取寫入的總數(shù)
*
* @return
*/
public long getWriteTotal() {
return writeTotal.get();
}
}
批量數(shù)據(jù)處理器
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 基于線程池的多線程批量寫入處理器
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandler {
private ExecutorService executorService;
private MockService service;
/**
* 每次批量讀取的數(shù)據(jù)量
*/
private int batch;
/**
* 線程個數(shù)
*/
private int threadNum;
public SimpleBatchHandler(MockService service, int batch,int threadNum) {
this.service = service;
this.batch = batch;
//使用固定數(shù)目的線程池
this.executorService = Executors.newFixedThreadPool(threadNum);
}
/**
* 開始處理
*/
public void startHandle() {
// 開始處理的時間
long startTime = System.currentTimeMillis();
System.out.println("start handle time:" + startTime);
long readData;
while ((readData = service.readData(batch)) != 0) {// 批量讀取數(shù)據(jù),知道讀取不到數(shù)據(jù)才停止
for (long i = 0; i < readData; i++) {
executorService.execute(() -> service.writeData());
}
}
// 關(guān)閉線程池
executorService.shutdown();
while (!executorService.isTerminated()) {//等待線程池中的線程執(zhí)行完
}
// 結(jié)束時間
long endTime = System.currentTimeMillis();
System.out.println("end handle time:" + endTime);
// 總耗時
System.out.println("total handle time:" + (endTime - startTime) + "ms");
// 寫入總數(shù)
System.out.println("total write num:" + service.getWriteTotal());
}
}
測試類
/**
* SimpleBatchHandler的測試類
* @author RJH
* create at 2019-04-01
*/
public class SimpleBatchHandlerTest {
public static void main(String[] args) {
// 總數(shù)
long total=100000;
// 休眠時間
long sleepTime=100;
// 每次拉取的數(shù)量
int batch=100;
// 線程個數(shù)
int threadNum=16;
MockService mockService=new MockService(total,sleepTime);
SimpleBatchHandler handler=new SimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();
}
}
運行結(jié)果
start handle time:1554298681755 thread:Thread[pool-1-thread-2,5,main] write data:1 thread:Thread[pool-1-thread-1,5,main] write data:2 ...省略部分輸出 thread:Thread[pool-1-thread-4,5,main] write data:100000 end handle time:1554299330202 total handle time:648447ms total write num:100000
分析
在單線程情況下的執(zhí)行時間應(yīng)該為total*sleepTime,即10000000ms,而改造為多線程后執(zhí)行時間為648447ms。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot?Vue實現(xiàn)單點登陸功能示例詳解
這篇文章主要為大家介紹了Springboot?Vue實現(xiàn)單點登陸功能示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-01-01
java拋出異常后,后續(xù)代碼是否繼續(xù)執(zhí)行詳解
這篇文章主要給大家介紹了關(guān)于java拋出異常后,后續(xù)代碼是否繼續(xù)執(zhí)行詳?shù)南嚓P(guān)資料,在Java編程中,異常是當(dāng)程序執(zhí)行時遇到問題時拋出的一種特殊情況,需要的朋友可以參考下2023-07-07
Java 中橋接模式——對象結(jié)構(gòu)型模式的實例詳解
這篇文章主要介紹了Java 中橋接模式——對象結(jié)構(gòu)型模式的實例詳解的相關(guān)資料,希望通過本文大家能掌握這部分知識,需要的朋友可以參考下2017-09-09
Springboot整合Java?DL4J實現(xiàn)交通標志識別系統(tǒng)全過程
在自動駕駛系統(tǒng)中,交通標志識別是實現(xiàn)車輛智能化的關(guān)鍵技術(shù)之一,本文介紹了利用SpringBoot和JavaDeeplearning4j構(gòu)建交通標志識別系統(tǒng)的方法,文中通過代碼介紹的非常詳細,需要的朋友可以參考下2024-10-10
如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序
HTTP/HTTPS是最常見的協(xié)議套件之一,并且隨著智能手機的成功,它的應(yīng)用也日益廣泛,因為對于任何公司來說,擁有一個可以被移動設(shè)備訪問的網(wǎng)站幾乎是必須的。下面就來看看如何開發(fā)基于Netty的HTTP/HTTPS應(yīng)用程序2021-06-06

