Java利用多線程和分塊實現(xiàn)快速讀取文件
背景
在工作中經(jīng)常會有接收文件并且讀取落庫的需求,讀取方式都是串行讀取,即一行行的讀取,如果文件小還可以,但是如果文件比較大,類似于全量文件的話,這樣的讀取就會非常效率低。
本文主要介紹的是如何正確的將文件分塊,多線程的實現(xiàn)方式有多種,這里用的是CompletableFuture
方法
因為我們文件里面的每條數(shù)據(jù)之間沒有任何依賴關(guān)系也不存在順序要求。如何提高讀取速度,第一個想到當然就是并行讀取文件,并行讀取的前提就是要給文件分塊,讓每個線程只讀取對應分塊的數(shù)據(jù),先看看我們的文件格式

可以看見我們的文件格式每一行的長度不一,同時文件也無法像TCP通過指定數(shù)據(jù)體的長度來讀取數(shù)據(jù),所以如何能正確的分塊是整個方法的關(guān)鍵,如果分多或者分少了就會導致數(shù)據(jù)讀取錯誤的可能。

可以看見錯誤的分塊就會導致我們讀取的數(shù)據(jù)會被截取掉一部分,截取掉多少都是隨機的。這里我們用的方法是用填充來讓每個分塊都是正確的。具體來說就是我們在分塊的時候,判斷一下當前的分塊位置會不會導致數(shù)據(jù)被截取,因為我們的數(shù)據(jù)是一行行的,所以最好的分塊位置都是分在了行尾。如果說當前的分塊位置是在一行的中間的話,那我們就要移動這個分塊的位置到這行的行尾去
private static int THREAD_NUM = 5; long total = file.length(); long chunkSize = total < THREAD_NUM ? total : total / THREAD_NUM;
先確定要用幾個線程并行讀取,然后根據(jù)線程數(shù)和文件的大小來確定每一塊的大小,接下來就進行判斷是否需要填充
private static long padding(long start, long chunkSize, File file) {
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")){
randomAccessFile.seek(start + chunkSize);
boolean eol = false;
//判斷當前的位置是否需要填充,如果當前沒有數(shù)據(jù)或者是行尾,則不需要填充
switch (randomAccessFile.read()) {
case -1:
case '\n':
eol = true;
break;
case '\r':
eol = true;
break;
default:
break;
}
//如果符合填充條件,對其進行填充,首先是讀取一行數(shù)據(jù),然后計算出這行數(shù)據(jù)的長度,然后將這行數(shù)據(jù)的長度加上前面讀取了一字節(jié),然后將這些長度加到chunkSize上
if (!eol){
String readLine = randomAccessFile.readLine();
chunkSize += readLine.getBytes().length;
//加上前面讀取的一字節(jié)
chunkSize += 1;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return chunkSize;
}如何填充的可以看看代碼注釋,是參照了readline的實現(xiàn)思路。 經(jīng)過填充后就可以確保每塊的分塊的最后一個位置都是在行尾。將每個分塊的起始位置和分塊大小儲存起來再結(jié)合上CompletableFuture就可以多線程分塊讀取文件了
Map<Long, Long> chunkMap = new HashMap<>();
for (int i = 0; i < THREAD_NUM; i++) {
chunkSize = padding(start, chunkSize, file);
chunkMap.put(start, chunkSize);
start += chunkSize;
}
CompletableFuture.allOf(chunkMap.entrySet().stream().map(entry -> CompletableFuture.runAsync( () -> handlerReportTreeBaseData(entry.getKey(), entry.getValue()))).toArray(CompletableFuture[]::new))
.exceptionally(throwable -> {
System.out.println(throwable.getMessage());
return null;
}).join();完整實現(xiàn)
接下來給出整個的實現(xiàn)代碼,歡迎大家看看有沒有什么我沒有考慮到的,有可能的隱藏BUG和還能優(yōu)化改善的地方,歡迎討論
public class SpiltFIle {
private static int THREAD_NUM = 5;
private static void splitChunks() {
File file = new File("test.txt");
long total = file.length();
long chunkSize = total < THREAD_NUM ? total : total / THREAD_NUM;
long start = 0;
Map<Long, Long> chunkMap = new HashMap<>();
for (int i = 0; i < THREAD_NUM; i++) {
chunkSize = padding(start, chunkSize, file);
handlerReportTreeBaseData(start, chunkSize);
chunkMap.put(start, chunkSize);
start += chunkSize;
}
CompletableFuture.allOf(chunkMap.entrySet().stream().map(entry -> CompletableFuture.runAsync( () -> handlerReportTreeBaseData(entry.getKey(), entry.getValue()))).toArray(CompletableFuture[]::new))
.exceptionally(throwable -> {
System.out.println(throwable.getMessage());
return null;
}).join();
}
private static long padding(long start, long chunkSize, File file) {
try (RandomAccessFile randomAccessFile = new RandomAccessFile(file, "r")){
randomAccessFile.seek(start + chunkSize);
boolean eol = false;
//判斷當前的位置是否需要填充,如果當前沒有數(shù)據(jù)或者是行尾,則不需要填充
switch (randomAccessFile.read()) {
case -1:
case '\n':
eol = true;
break;
case '\r':
eol = true;
break;
default:
break;
}
//如果符合填充條件,對其進行填充,首先是讀取一行數(shù)據(jù),然后計算出這行數(shù)據(jù)的長度,然后將這行數(shù)據(jù)的長度加上前面讀取了一字節(jié),然后將這些長度加到chunkSize上
if (!eol){
String readLine = randomAccessFile.readLine();
chunkSize += readLine.getBytes().length;
chunkSize += 1; //加上前面讀取的一字節(jié)
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return chunkSize;
}
private static void handlerReportTreeBaseData(long start, long chunkSize) {
try (RandomAccessFile randomAccessFile = new RandomAccessFile("test.txt", "r")) {
randomAccessFile.seek(start);
long currentCount = 0L;
String line;
while (currentCount < chunkSize && (line = randomAccessFile.readLine()) != null){
if (!line.isEmpty()){
currentCount += line.getBytes().length + System.lineSeparator().getBytes().length;
System.out.println(line);
}
}
}catch (Exception ignored){
}
}
public static void main(String[] args) throws IOException {
SpiltFIle.splitChunks();
}
}最后也是能正常的讀取完文件

以上就是Java利用多線程和分塊實現(xiàn)快速讀取文件的詳細內(nèi)容,更多關(guān)于Java讀取文件的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
關(guān)于maven依賴 ${xxx.version}報錯問題
這篇文章主要介紹了關(guān)于maven依賴 ${xxx.version}報錯問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
Spring boot整合Springfox生成restful的在線api文檔
這篇文章主要為大家介紹了Spring boot整合Springfox生成restful在線api文檔,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步2022-03-03
JAVA JSP頁面技術(shù)之EL表達式整理歸納總結(jié)
這篇文章主要介紹了java中JSP頁面技術(shù)之EL表達式概念作用以及語法等的使用,需要的朋友可以參考2017-04-04
聊聊Spring循環(huán)依賴三級緩存是否可以減少為二級緩存的情況
這篇文章主要介紹了聊聊Spring循環(huán)依賴三級緩存是否可以減少為二級緩存的情況,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02

