一文帶你搞懂Node中的流
流是什么?
流,通俗來講就是數(shù)據(jù)流動(dòng),數(shù)據(jù)從一個(gè)地方緩慢的流到另一個(gè)地方。
舉個(gè)栗子,可以借助水管中的水流來輔助理解,當(dāng)打開水龍頭后,水便可以從源頭流出水龍頭;關(guān)閉水龍頭,水便不再流動(dòng)。
為什么需要流
那為什么會(huì)需要流吶?
其它介質(zhì)和內(nèi)存的數(shù)據(jù)規(guī)模不一致,例如磁盤的內(nèi)存往往遠(yuǎn)遠(yuǎn)大于內(nèi)存,因此磁盤中有可能會(huì)出現(xiàn)大于內(nèi)存的文件,此時(shí)內(nèi)存無法一次讀入該文件。這種情形可以把水庫比作磁盤,洗碗池比作內(nèi)存,如果不加限制,水庫的水量輕輕拿捏洗碗池,因此就需要水管來進(jìn)行傳輸,限制水的流量。

其他介質(zhì)和內(nèi)存的數(shù)據(jù)處理能力不一致,內(nèi)存的處理速度其他介質(zhì)很難比,內(nèi)存迅速處理數(shù)據(jù),一波流傳給硬盤,硬盤很難吃得消。

為了更深刻得理解流的作用,接下來我們來試一下不使用流需要如何進(jìn)行文件讀寫。
文件讀寫
首先我們來實(shí)現(xiàn)最簡單的文件拷貝功能,這個(gè)比較簡單,我們可以借助 fs 模塊的 readFile 和 writeFile 方法來實(shí)現(xiàn)。
readFile 和 writeFile 并沒有 promise 化,可以借助 util.promiseify 方法將其 promise 化,但這里并不是文章的重點(diǎn),因此依舊采用回調(diào)的方式
const fs = require("fs");
const path = require("path");
// 利用 path 上的方法組裝路徑
fs.readFile(path.resolve(__dirname, "test.txt"), (err, data) => {
if (err) return console.log("error", err);
fs.writeFile(path.resolve(__dirname, "result.txt"), data, () => {
console.log("拷貝成功");
});
});上面的代碼雖然成功實(shí)現(xiàn)了文件拷貝,但問題也很明顯,不適用于大文件,當(dāng)文件大于或接近內(nèi)存時(shí),會(huì)淹沒內(nèi)存,這也響應(yīng)了為什么需要流的第一點(diǎn)。
對(duì)于大文件,如何進(jìn)行讀寫那: 邊讀邊寫,讀一點(diǎn)寫一點(diǎn),這樣我們便可以控制文件讀寫的速率 ,也稱作分片讀寫??偟膩碚f就是邊讀邊寫。
分片讀寫
分片讀寫需要使用 fs 模塊中的 read,write,close,open 方法。
既然 fs 有方法可以實(shí)現(xiàn)邊讀邊寫,那為什么還會(huì)有流的出現(xiàn)的?這幾個(gè)方法太麻煩了,參數(shù)太多,這里只做一個(gè)演示。
首先來實(shí)現(xiàn)單個(gè)文字的讀寫。
// 創(chuàng)建一個(gè)存儲(chǔ)單位 1 的 Buffer 空間,來存儲(chǔ)中間讀取的數(shù)據(jù)
let buf = Buffer.alloc(1);
// 讀取源文件中的數(shù)據(jù)
fs.open(path.resolve(__dirname, "test.js"), "r", function (err, rfd) {
// rfd 可以理解為文字指針
// 你看到了嗎?6 個(gè)參數(shù),麻爪
// 甚至都有點(diǎn)解釋不動(dòng)
fs.read(rfd, buf, 0, 1, 0, function (err, bytesRead) {
// bytesRead讀取到的字節(jié)長度
// 讀取到的第一個(gè)數(shù)據(jù)存入 buf 中
console.log(buf); // <Buffer 31>
// 打開目標(biāo)文件。
fs.open(path.resolve(__dirname, "result.js"), "w", function (err, wfd) {
// 6 個(gè)參數(shù)
// 這里做的就是將 buf 內(nèi)容寫入 result
fs.write(wfd, buf, 0, 1, 0, function (err, bytesWritten) {
console.log("拷貝成功");
});
});
});
});上面的方法實(shí)現(xiàn)了單次數(shù)據(jù)的讀取,我們只需要重復(fù)這個(gè)過程就可以實(shí)現(xiàn)大文件的讀寫。
如何重復(fù)實(shí)現(xiàn)上述過程那?遞歸,沒錯(cuò),就是遞歸,將讀寫部分封裝成函數(shù),在寫成功的回調(diào)函數(shù)中再次調(diào)用該函數(shù)。
// source 源文件
// target 目標(biāo)文件
// cb 回調(diào)函數(shù)
// bufferSize buffer固定長度,即一次讀寫的數(shù)量
function copy(source, target, cb, bufferSize = 3) {
const SOURCE_PATH = path.resolve(__dirname, source);
const TARGET_PATH = path.resolve(__dirname, target);
let buf = Buffer.alloc(bufferSize); // 創(chuàng)建 buffer 實(shí)例
let rOffset = 0; // 讀取偏移量
let wOffset = 0; // 寫入偏移量
fs.open(SOURCE_PATH, "r", function (err, rfd) {
if (err) return cb(err);
fs.open(TARGET_PATH, "w", function (err, wfd) {
if (err) return cb(err);
// 遞歸讀寫函數(shù) next
function next() {
fs.read(rfd, buf, 0, bufferSize, rOffset, function (err, bytesRead) {
if (err) return cb(err);
// bytesRead 代表一次讀取的字節(jié)數(shù)
// 當(dāng) bytesRead 為 0 時(shí),代表文件已經(jīng)成功讀完
// 則可以停止讀寫操作,關(guān)閉文件
if (bytesRead == 0) {
let index = 0;
let done = () => {
if (++index == 2) {
cb();
}
};
fs.close(wfd, done);
fs.close(rfd, done);
return;
}
fs.write(
wfd,
buf,
0,
bytesRead,
wOffset,
function (err, bytesWritten) {
if (err) return cb(err);
// 讀取成功,并更新偏移量
rOffset += bytesRead;
wOffset += bytesWritten;
next();
}
);
});
}
next();
});
});
}
copy("test.js", "result.js", function (err) {
if (err) return console.log(err);
console.log("拷貝成功");
});這樣我們就成功地實(shí)現(xiàn)大文件分片讀寫,但可以明顯發(fā)現(xiàn):
- write/read 方法參數(shù)多,用起來非常繁瑣
- 上面的代碼有些回調(diào)地獄的傾向,不宜維護(hù)和擴(kuò)展
因此,流就出現(xiàn)了,下面一起來了解一下 nodejs 中的流。
可讀流及源碼編寫
node 中有四種流,下面我們來依次介紹一下,本文主要介紹 Readable 可讀流的使用及其源碼編寫。
Node.js 中的流同樣位于 fs 模塊
EventListener
Nodejs 中的流都繼承于 EventListener ,也就是說其工作原理都是基于發(fā)布訂閱模式。
Readable 可讀流
可讀流用于文件內(nèi)容的讀取,它主要有兩種讀取模式:
- 流動(dòng)模式: 可讀流自動(dòng)讀取數(shù)據(jù),通過
EventListener接口將數(shù)據(jù)傳遞給應(yīng)用 - 暫停模式: 這種模式下不會(huì)主動(dòng)通過
EventListener給應(yīng)用傳遞數(shù)據(jù),當(dāng)顯式調(diào)用stream.read后重啟數(shù)據(jù)流動(dòng)
通過 createReadStream 方法可以創(chuàng)建可讀流,該方法有兩個(gè)參數(shù):
- 參數(shù)一讀取文件的路徑
- 參數(shù)二是
options配置項(xiàng),該項(xiàng)有八個(gè)參數(shù),但日常我們只需要常用帶星號(hào)的幾個(gè)配置。 flags*:標(biāo)識(shí)位,默認(rèn)為 r;encoding:字符編碼,默認(rèn)為 null;fd:文件描述符,默認(rèn)為 null;mode:權(quán)限位,默認(rèn)為 0o666;autoClose:是否自動(dòng)關(guān)閉文件,默認(rèn)為 true;start:讀取文件的起始位置;end:讀取文件的(包含)結(jié)束位置;highWaterMark*:最大讀取文件的字節(jié)數(shù),默認(rèn)64 * 1024。
highWaterMark 是最值得注意的,它表示每次讀取的文件字節(jié)長度。
看起來流的參數(shù)很多,用起來會(huì)很復(fù)雜,那你就錯(cuò)了,下面來看個(gè)例子。
// 流是基于發(fā)布訂閱模式實(shí)現(xiàn)的
// 因此我們只需要訂閱對(duì)應(yīng)事件即可
const fs = require("fs");
const path = require("path");
// 返回一個(gè)可讀流
const rs = fs.createReadStream(path.resolve(__dirname, "test.txt"), {
highWaterMark: 3, // 每次讀取 3kb
});
// 文件打開的鉤子函數(shù)
rs.on("open", (fd) => {
console.log(fd); // 3
});
// 當(dāng)可讀流處于流動(dòng)模式時(shí),data 事件會(huì)不斷觸發(fā)
// 在這里我們可以獲取到讀取的數(shù)據(jù),進(jìn)行后續(xù)操作
rs.on("data", (chunk) => {
console.log(chunk);
});
rs.on("end", () => {
console.log("end"); // 結(jié)束事件
});data 事件會(huì)一直觸發(fā),也就是說在文件讀取完成前, data 會(huì)一直傳遞數(shù)據(jù),有時(shí)候我們并非需要一直讀取,例如讀取一下暫停一下,那該如何實(shí)現(xiàn)那?
// 借助 pause 和 resume 方法可以實(shí)現(xiàn)數(shù)據(jù)讀取的暫停與恢復(fù)
rs.on("data", function (data) {
// 讀取的數(shù)據(jù)為 buffer 類型
console.log(`讀取了 ${data.length} 字節(jié)數(shù)據(jù) : ${data.toString()}`);
//使流動(dòng)模式的流停止觸發(fā)'data'事件,切換出流動(dòng)模式,數(shù)據(jù)都會(huì)保留在內(nèi)部緩存中。
rs.pause();
//等待3秒后,再恢復(fù)觸發(fā)'data'事件,將流切換回流動(dòng)模式。
setTimeout(function () {
rs.resume();
}, 3000);
});下面我們來實(shí)現(xiàn)一下可讀流的源碼。
源碼實(shí)現(xiàn)
Step1: 定義可讀流
可讀流繼承于 EventListener ,因此我們首先建立 ReadStream 類繼承于 EventListener ,這樣 ReadStream 便可以使用 EventListener 類的方法。
EventListener 實(shí)現(xiàn)其實(shí)并不困難,小包前面的文章也講過 EventListener 源碼的解讀及編寫。
let fs = require("fs");
let EventEmitter = require("events");
class ReadStream extends EventEmitter {}Step2: 參數(shù)配置
可讀流有兩個(gè)參數(shù), path 路徑和 options 配置項(xiàng),我們把對(duì)應(yīng)的參數(shù)配置在類上,因此我們需要編寫一下構(gòu)造函數(shù)。
constructor(path, options = {}) {
// 使用繼承,子類必須調(diào)用 super 函數(shù)
super();
this.path = path; //指定要讀取的文件地址
this.highWaterMark = options.highWaterMark || 64 * 1024;
this.autoClose = options.autoClose || true; //是否自動(dòng)關(guān)閉文件
this.start = options.start || 0; // 從文件哪個(gè)位置開始讀取
this.end = options.end || null; // null表示沒傳遞
this.encoding = options.encoding || null;// buffer編碼
this.flags = options.flags || 'r';
}除了 ReadStream 所需的參數(shù)外,我們還需要添加幾個(gè)控制參數(shù)
pos: 記錄當(dāng)前文件讀取到的位置flowing: 當(dāng)前讀取的模式,true為流動(dòng)模式buffer: 每次讀取內(nèi)容的存儲(chǔ)位置
constructor() {
// ...
this.pos = this.start;
this.flowing = null;
this.buffer = Buffer.alloc(this.highWaterMark);
}Step3: 打開待讀文件
ReadStream 中分別使用 close、open、error 注冊(cè)事件來控制對(duì)應(yīng)行為的產(chǎn)生,當(dāng)打開文件后,觸發(fā) open 事件;打開失敗,觸發(fā) error 事件。
這里我們處理一下上面幾個(gè)事件的觸發(fā)時(shí)機(jī),使用 fs.open 方法來打開文件。
open() {
fs.open(this.path, this.flags, (err, fd) => {
if (err) {
if (this.autoClose) { // 如果需要自動(dòng)關(guān)閉則去關(guān)閉文件
this.destroy(); // 銷毀(關(guān)閉文件,觸發(fā)關(guān)閉事件)
}
this.emit('error', err); // 打開錯(cuò)誤,觸發(fā) error 事件
return;
}
this.fd = fd; // 保存文件描述符,方便后續(xù)輪詢判斷
this.emit('open', this.fd); // 文件打開,觸發(fā) open 事件
});
}Step4: 讀取文件內(nèi)容
上文提到, ReadStream 有兩種模式: 流動(dòng)模式和暫停模式,并用 flowing 屬性來標(biāo)識(shí)兩種模式。
ReadStream 通過監(jiān)聽 data 事件來啟動(dòng)文件讀取,即:
rs.on("data", (chunk) => {
console.log(chunk);
});這里實(shí)現(xiàn)有兩個(gè)難點(diǎn):
- 當(dāng)監(jiān)聽
data事件后,ReadStream才開啟數(shù)據(jù)讀取,那應(yīng)該如何監(jiān)聽data事件的注冊(cè)那? fs.open是異步讀取操作,因此有可能出現(xiàn)data事件觸發(fā)時(shí),文件還未讀取完畢,那我們應(yīng)該如何處理這種情況那?
一個(gè)問題一個(gè)問題來解決, EventListener 中提供了 newListener 事件,當(dāng)注冊(cè)新事件后,該事件的處理函數(shù)觸發(fā),因此我們可以監(jiān)聽該事件,判斷事件類型,如果為 data 事件,打開 flowing ,開始讀取
class ReadStream extends EventEmitter {
constructor(path, options) {
// 監(jiān)聽newListener事件,判斷當(dāng)前監(jiān)聽事件是否為 data 事件
// 如果為 data 事件,開啟文件讀取
this.on("newListener", (type) => {
if (type === "data") {
// 開啟流動(dòng)模式,開始讀取文件中的內(nèi)容
this.flowing = true;
this.read();
}
});
}
}
由于 data 事件的觸發(fā)可能發(fā)生在 fs.open 讀取之前,因此 read 函數(shù)中要做一個(gè) 輪詢操作 ,每次判斷是否成功讀取。
read() {
// 文件如果未打卡,fd 是沒有值的
if (typeof this.fd !== "number") {
// 如果文件未打開,觸發(fā) open 事件
return this.once("open", () => this.read());
}
}Step5: 編寫 read 方法
上面編寫完畢后,我們可以成功的監(jiān)聽到 data 事件,且可以打開文件,后續(xù)就可以進(jìn)行文件的讀取了。
文件讀取的內(nèi)容上文案例中提到過,即利用 fs.read 方法進(jìn)行讀取,下面直接在源碼上進(jìn)行解釋。
class ReadStream extends EventEmitter {
read() {
// 計(jì)算當(dāng)前讀取字節(jié)
const howManyToRead = this.end
? Math.min(this.highWaterMark, this.end - this.pos + 1)
: this.highWaterMark;
// 創(chuàng)建 buffer 實(shí)例
const buffer = Buffer.alloc(howManyToRead);
// 利用 fs.read 進(jìn)行文件內(nèi)容讀取
fs.read(
this.fd,
buffer,
0,
howManyToRead,
this.offset,
(err, bytesRead) => {
if (err) return this.destory(err);
this.pos += bytesRead;
// 可能存在最后一次的 buffer 大小 大于 實(shí)際數(shù)據(jù)大小的情況,所以使用slice來進(jìn)行截取
// 將讀取后的內(nèi)容傳遞給 data 事件
this.emit("data", buffer.slice(0, bytesRead));
}
);
}
}這樣便可以實(shí)現(xiàn)一次讀取,一次讀取完畢后,接著調(diào)用 read 方法就可以實(shí)現(xiàn)不斷讀取,即流動(dòng)模式
read() {
// ...
// 流動(dòng)模式下,循環(huán)進(jìn)行讀取
if (this.flowing) {
this.read();
}
}Step6: 流動(dòng)模式與暫停模式
ReadStream 使用 flowing 來控制可讀流的讀取與暫停,最后我們來實(shí)現(xiàn)可讀流的暫停和恢復(fù)。
pause() {
// 判斷當(dāng)前是否讀取完畢了
if (this.flowing) {
this.flowing = false;
}
}
resume() {
// 判斷當(dāng)前是否讀取完畢了
if (!this.flowing) {
this.flowing = true;
this.read();
}
}總結(jié)
本文詳細(xì)的講解了流的前因后果,流可以說是 node 的核心之一,對(duì)此我們需要完美掌握,靈活運(yùn)用。本文為了讓大家更深入的了解流,從源碼和應(yīng)用出發(fā),帶你全方位了解流??蓪懥鞯木帉懜幸馑迹梢詫W(xué)到更多東西,后續(xù)小包會(huì)繼續(xù)撰寫文章。
以上就是一文帶你搞懂Node中的流的詳細(xì)內(nèi)容,更多關(guān)于Node 流的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解nodejs微信公眾號(hào)開發(fā)——1.接入微信公眾號(hào)
本篇文章主要介紹了詳解nodejs微信公眾號(hào)開發(fā)——1.接入微信公眾號(hào),非常具有實(shí)用價(jià)值,需要的朋友可以參考下2017-04-04
淺談Node.js輕量級(jí)Web框架Express4.x使用指南
本篇文章主要介紹了淺談Node.js輕量級(jí)Web框架Express4.x使用指南,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-05-05
Node層模擬實(shí)現(xiàn)multipart表單的文件上傳示例
下面小編就為大家分享一篇Node層模擬實(shí)現(xiàn)multipart表單的文件上傳示例,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2018-01-01
基于NodeJS的前后端分離的思考與實(shí)踐(六)Nginx + Node.js + Java 的軟件棧部署實(shí)踐
關(guān)于前后端分享的思考,我們已經(jīng)有五篇文章闡述思路與設(shè)計(jì)。本文介紹淘寶網(wǎng)收藏夾將 Node.js 引入傳統(tǒng)技術(shù)棧的具體實(shí)踐。2014-09-09
用node開發(fā)并發(fā)布一個(gè)cli工具的方法步驟
這篇文章主要介紹了用node開發(fā)并發(fā)布一個(gè)cli工具的方法步驟,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2019-01-01
nodejs實(shí)現(xiàn)的一個(gè)簡單聊天室功能分享
這篇文章主要介紹了nodejs實(shí)現(xiàn)的一個(gè)簡單聊天室功能分享,本文使用了express和socket.io兩個(gè)庫結(jié)合實(shí)現(xiàn),需要的朋友可以參考下2014-12-12
詳解Node.js中exports和module.exports的區(qū)別
這篇文章主要介紹了詳解Node.js中exports和module.exports的區(qū)別,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-04-04
Nodejs學(xué)習(xí)筆記之測試驅(qū)動(dòng)
本文是本系列文章的第二篇,主要是測試針對(duì)于web后端的驅(qū)動(dòng),在開發(fā)過程中,在開發(fā)完成一段代碼后如果負(fù)責(zé)任而不是說完全把問題交給測試人員去發(fā)現(xiàn)的話,這個(gè)時(shí)候通常都會(huì)去做一些手動(dòng)的測試。2015-04-04
Sequelize中用group by進(jìn)行分組聚合查詢
大家都知道在SQL查詢中,分組查詢是較常用的一種查詢方式。分組查詢是指通過GROUP BY關(guān)鍵字,將查詢結(jié)果按照一個(gè)或多個(gè)字段進(jìn)行分組,分組時(shí)字段值相同的會(huì)被分為一組。在Node.js基于Sequelize的ORM框架中,同樣支持分組查詢,使用非常簡單方便。下面來看看詳細(xì)的介紹。2016-12-12

