Node.js中的流(Stream)的作用詳解
Node.js 中的流(Stream)是用來比喻數(shù)據(jù)傳輸?shù)囊环N形式,數(shù)據(jù)傳輸?shù)钠瘘c就是流的源頭,數(shù)據(jù)傳輸?shù)慕K點就是流的終點。例如在網(wǎng)頁發(fā)起一個 HTTP 請求,瀏覽器就是流的源頭,服務器就是流的終點。等服務器處理完請求,返回響應時,服務器就變成了流的源頭,瀏覽器變成了流的終點。
數(shù)據(jù)從一端連續(xù)不斷地傳輸?shù)搅硪欢耍拖袼粯訌囊欢肆鞯搅硪欢?,所以用流來比喻?shù)據(jù)的傳輸形式。只不過計算機中的流傳輸?shù)氖菙?shù)據(jù)(字節(jié)),而不是水。
在 Node.js 中,stream 模塊提供了用于實現(xiàn)流接口的 API。但是很多內置模塊都提供了關于流的 API,所以通常不需要顯式的調用 stream 模塊來使用流。
為什么要使用流
v1 版本示例程序
下面看一個簡單的示例:
const path = require('path')
printMemoryUsage()
fs.readFile(resolveFile('./test.txt'), (err, data) => {
if (err) throw err
printMemoryUsage()
fs.writeFile(resolveFile('./test2.txt'), data, err => {
if (err) throw err
console.log('done')
})
})
function resolveFile(filepath) {
return path.resolve(__dirname, filepath)
}
// 打印內存占用情況
function printMemoryUsage() {
const info = process.memoryUsage();
// heapTotal:對應v8的堆內存信息,是堆中總共申請的內存量。
// heapUsed:表示堆中使用的內存量。
// rss:是resident set size的縮寫,即常駐內存的部分。
console.log('rss=%s, heapTotal=%s, heapUsed=%s', formatMemory(info.rss), formatMemory(info.heapTotal), formatMemory(info.heapUsed));
}v1 版本的程序每次執(zhí)行時都得把整個 ./test.txt 文件讀取到內存,然后再把內容寫入到 ./test2.txt 文件。這個 ./test.txt 文件大小為 1.04 GB,下面的信息就是在拷貝過程中打印的內存占用信息。
rss=18.09MB, heapTotal=4.68MB, heapUsed=2.64MB rss=1011.52MB, heapTotal=7.18MB, heapUsed=2.36MB done
從這個信息可以看出,當程序讀取的文件越大,內存占用就越大(1011.52MB),因此會導致其他進程處理變慢以及過多的垃圾回收,甚至內存耗盡,導致程序崩潰。
v2 版本示例程序
如果用流來重寫 v1 程序,我們就可以避免內存占用過大的問題。因為流是可以一邊讀取數(shù)據(jù)一邊消費數(shù)據(jù)的,它不需要等到所有的數(shù)據(jù)都準備好。
// 可讀流
const readStream = fs.createReadStream(resolveFile('./test.txt'));
// 可寫流
const writeStream = fs.createWriteStream(resolveFile('./test2.txt'));
// 每讀取到一塊數(shù)據(jù),就會觸發(fā) data 事件
readStream.on('data', data => {
printMemoryUsage()
writeStream.write(data)
});
readStream.on('end', () => {
console.log('done')
});... rss=100.89MB, heapTotal=7.98MB, heapUsed=4.18MB rss=100.89MB, heapTotal=7.98MB, heapUsed=4.18MB rss=100.89MB, heapTotal=7.98MB, heapUsed=4.19MB done
從控制臺打印的信息來看,內存占用一直穩(wěn)定為 100.89 MB,沒有給系統(tǒng)造成太大的負擔。因此,在需要處理一些尺寸較大的文件時,使用流是最好的選擇。
v3 版本示例程序
但是 v2 程序也不完美,因為可讀流和可寫流的速率不一定相等。而 v2 程序在每次觸發(fā)可讀流的 data 事件時就向可寫流寫入數(shù)據(jù),這時可寫流的緩沖區(qū)有可能已經(jīng)滿了。如果繼續(xù)寫入更多的數(shù)據(jù),會導致內存占用越來越大,甚至內存耗盡,丟失數(shù)據(jù)。這個現(xiàn)象又叫背壓(Back pressure)。
在數(shù)據(jù)流從上游生產(chǎn)者向下游消費者傳輸?shù)倪^程中,上游生產(chǎn)速度大于下游消費速度,導致下游的 Buffer 溢出,這種現(xiàn)象就叫做 Backpressure。這句話的重點不在于「上游生產(chǎn)速度大于下游消費速度」,而在于「Buffer 溢出」。
如果出現(xiàn)這個現(xiàn)象,解決方案是什么呢?我們可以在寫入流緩沖區(qū)已經(jīng)滿載的情況下,暫??勺x流讀取數(shù)據(jù)的行為。這可以通過 write() 的返回值來判斷。
每個流在創(chuàng)建時都可以設置 highWaterMark 屬性的值(默認為16384,即 16 KB),這個值就是緩沖區(qū)閾值的大小??蓪懥鞯木彌_區(qū)如果超過了閾值,再調用 write() 寫入數(shù)據(jù)時,會返回 false;如果緩沖區(qū)未超過閾值,則返回 true。
因此我們可以把 v2 版本的程序改寫一下:
const readStream = fs.createReadStream(resolveFile('./test.txt'));
const writeStream = fs.createWriteStream(resolveFile('./test2.txt'));
readStream.on('data', data => {
printMemoryUsage()
if (!writeStream.write(data)) {
// 暫停讀取數(shù)據(jù)
readStream.pause()
// 當可寫流的緩沖區(qū)排空時,會觸發(fā) drain 事件
writeStream.once('drain', () => {
// 繼續(xù)讀取數(shù)據(jù)
readStream.resume()
});
}
});
readStream.on('end', () => {
console.log('done')
});然后看一下內存占用的信息:
... rss=84.20MB, heapTotal=7.98MB, heapUsed=4.75MB rss=84.20MB, heapTotal=7.98MB, heapUsed=4.76MB done
從上面的信息可以看出,v3 程序最大內存占用為 84.20 MB,比起上一版的內存占用更小,這就是優(yōu)化后的效果。
v4 版本示例程序
v3 版本的程序效果很好,但是要寫的代碼稍微有點多。還好流模塊提供了 pipe() 來幫我們做這件事:
const readStream = fs.createReadStream(resolveFile('../test.txt'));
const writeStream = fs.createWriteStream(resolveFile('../test2.txt'));
function resolveFile(filepath) {
return path.resolve(__dirname, filepath)
}
readStream.on('data', () => {
printMemoryUsage()
});
readStream.on('end', () => {
console.log('done')
});
readStream.pipe(writeStream)... rss=94.80MB, heapTotal=7.98MB, heapUsed=4.89MB rss=94.80MB, heapTotal=7.98MB, heapUsed=4.90MB rss=94.80MB, heapTotal=7.98MB, heapUsed=4.90MB done
pipe() 將可寫流綁定到可讀流,使其自動切換到流動模式并將其所有數(shù)據(jù)推送到綁定的可寫流。 數(shù)據(jù)流將被自動管理,以便目標可寫流不會被更快的可讀流漫過。也就是說,pipe() 將數(shù)據(jù)緩沖限制在可接受的水平,以便不同速度的來源和目標不會壓倒可用內存。
流的類型
Node.js 中有四種基本的流類型:
- Readable: 可讀流,可以從中讀取數(shù)據(jù)的流(例如,fs.createReadStream())。
- Writable: 可寫流,可以寫入數(shù)據(jù)的流(例如,fs.createWriteStream())。
- Duplex: 雙工流,Readable 和 Writable 的流(例如,net.Socket)。
- Transform: 可以在寫入和讀取數(shù)據(jù)時修改或轉換數(shù)據(jù)的 Duplex 流(例如,zlib.createDeflate())。
緩沖
Writable 和 Readable 流都將數(shù)據(jù)存儲在內部緩沖區(qū)中。
允許緩沖的數(shù)據(jù)量取決于傳給流的構造函數(shù)的 highWaterMark 選項。 對于普通的流,highWaterMark 選項指定字節(jié)的總數(shù)。
當實現(xiàn)調用 stream.push(chunk) 時,數(shù)據(jù)緩存在 Readable 流中。 如果流的消費者沒有調用 stream.read(),則數(shù)據(jù)會一直駐留在內部隊列中,直到被消費。
一旦內部讀取緩沖區(qū)的總大小達到 highWaterMark 指定的閾值,則流將暫時停止從底層資源讀取數(shù)據(jù),直到可以消費當前緩沖的數(shù)據(jù)(也就是,流將停止調用內部的用于填充讀取緩沖區(qū) readable._read() 方法)。
當重復調用 writable.write(chunk) 方法時,數(shù)據(jù)會緩存在 Writable 流中。 雖然內部的寫入緩沖區(qū)的總大小低于 highWaterMark 設置的閾值,但對 writable.write() 的調用將返回 true。 一旦內部緩沖區(qū)的大小達到或超過 highWaterMark,則將返回 false。
stream API 的一個關鍵目標,尤其是 stream.pipe() 方法,是將數(shù)據(jù)緩沖限制在可接受的水平,以便不同速度的來源和目標不會壓倒可用內存。
highWaterMark 選項是閾值,而不是限制:它規(guī)定了流在停止請求更多數(shù)據(jù)之前緩沖的數(shù)據(jù)量。 它通常不強制執(zhí)行嚴格的內存限制。 特定的流實現(xiàn)可能會選擇實施更嚴格的限制,但這樣做是可選的。
由于 Duplex 和 Transform 流都是 Readable 和 Writable,因此每個流都維護兩個獨立的內部緩沖區(qū),用于讀取和寫入,允許每一端獨立操作,同時保持適當且高效的數(shù)據(jù)流。 例如,net.Socket 實例是 Duplex 流,其 Readable 端允許消費從套接字接收的數(shù)據(jù),其 Writable 端允許將數(shù)據(jù)寫入套接字。 因為數(shù)據(jù)可能以比接收數(shù)據(jù)更快或更慢的速度寫入套接字,所以每一端都應該獨立于另一端進行操作(和緩沖)。
Readable
可讀流是對被消費的數(shù)據(jù)的來源的抽象。
Readable 流的示例包括:
- 客戶端上的 HTTP 響應
- 服務器上的 HTTP 請求
- 文件系統(tǒng)讀取流
- 壓縮流
- 加密流
- TCP 套接字
- 子進程的標準輸出和標準錯誤
- process.stdin
所有的 Readable 流都實現(xiàn)了 stream.Readable 類定義的接口。
Readable 流以兩種模式之一有效地運行:流動和暫停。在流動模式下,數(shù)據(jù)會自動從底層系統(tǒng)讀取,并通過 EventEmitter 接口使用事件盡快提供給應用程序。在暫停模式下,必須顯式調用 stream.read() 方法以從流中讀取數(shù)據(jù)塊。
所有的 Readable 流都以暫停模式開始,但可以通過以下方式之一切換到流動模式:
- 添加
data事件句柄。 - 調用
stream.resume()方法。 - 調用
stream.pipe()方法將數(shù)據(jù)發(fā)送到 Writable。
Readable 可以使用以下方法之一切換回暫停模式:
- 如果沒有管道目標,則通過調用
stream.pause()方法。 - 如果有管道目標,則刪除所有管道目標。 可以通過調用
stream.unpipe()方法刪除多個管道目標。
Writable
可寫流是數(shù)據(jù)寫入目標的抽象。
Writable 流的示例包括:
- 客戶端上的 HTTP 請求
- 服務器上的 HTTP 響應
- 文件系統(tǒng)寫入流
- 壓縮流
- 加密流
- TCP 套接字
- 子進程標準輸入
- process.stdout、process.stderr
其中一些示例實際上是實現(xiàn) Writable 接口的 Duplex 流。
所有的 Writable 流都實現(xiàn)了 stream.Writable 類定義的接口。
雖然 Writable 流的特定實例可能以各種方式不同,但所有的 Writable 流都遵循相同的基本使用模式,如下例所示:
const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data');drain 事件
如果對 stream.write(chunk) 的調用返回 false,則 drain 事件將在可以繼續(xù)將數(shù)據(jù)寫入流時觸發(fā)。
Duplex 與 Transform
雙工流是同時實現(xiàn) Readable 和 Writable 接口的流。
Duplex 流的示例包括:
- TCP 套接字
- 壓縮流
- 加密流
轉換流是可以在寫入和讀取數(shù)據(jù)時修改或轉換數(shù)據(jù)的雙工流。
Transform 流的示例包括:
- 壓縮流
- 加密流
以上就是Node.js中的流(Stream)的作用詳解的詳細內容,更多關于Node.js 流(Stream)作用的資料請關注腳本之家其它相關文章!
- node.js同步/異步文件讀寫-fs,Stream文件流操作實例詳解
- Node.js數(shù)據(jù)流Stream之Duplex流和Transform流用法
- Node.js數(shù)據(jù)流Stream之Readable流和Writable流用法
- node.js中stream流中可讀流和可寫流的實現(xiàn)與使用方法實例分析
- node.js使用stream模塊實現(xiàn)自定義流示例
- Node.js中你不可不精的Stream(流)
- Node.js中流(stream)的使用方法示例
- Node.js中的流(Stream)介紹
- Node.js 中的流Stream模塊簡介及如何使用流進行數(shù)據(jù)處理
相關文章
Bun入門學習教程吊打Node或Deno的現(xiàn)代JS運行時
這篇文章主要為大家介紹了一款吊打Node或Deno的現(xiàn)代JS運行時,Bun入門學習教程,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-07-07
Nodejs?連接?mysql時報Error:?Cannot?enqueue?Query?after?fa
這篇文章主要介紹了Nodejs?連接?mysql時報Error:?Cannot?enqueue?Query?after?fatal?error錯誤的處理辦法,需要的朋友可以參考下2023-05-05

