Node.js Stream ondata觸發(fā)時機與順序的探索
上次寫Stream pipe細節(jié)時,在源碼中發(fā)現(xiàn)一段無用邏輯,由此引發(fā)了對Stream data事件觸發(fā)時機與順序的探索。
無用邏輯
當時研究pipe細節(jié)是基于Node.js v8.11.1的源碼,其中針對上游的ondata事件處理有如下一段代碼:
// If the user pushes more data while we're writing to dest then we'll end up
// in ondata again. However, we only want to increase awaitDrain once because
// dest will only emit one 'drain' event for the multiple writes.
// => Introduce a guard on increasing awaitDrain.
var increasedAwaitDrain = false;
src.on('data', ondata);
function ondata(chunk) {
debug('ondata');
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {
if (((state.pipesCount === 1 && state.pipes === dest) ||
(state.pipesCount > 1 && state.pipes.indexOf(dest) !== -1)) &&
!cleanedUp) {
debug('false write response, pause', src._readableState.awaitDrain);
src._readableState.awaitDrain++;
increasedAwaitDrain = true;
}
src.pause();
}
}
重點關注increasedAwaitDrain變量,理解這個變量期望達到什么目的,然后仔細閱讀代碼,會發(fā)現(xiàn)if (false === ret && !increasedAwaitDrain)語句中increasedAwaitDrain變量肯定是false,因為前一行才將該變量賦值為false,這樣一來這個變量就變得毫無意義。
increasedAwaitDrain = false;
var ret = dest.write(chunk);
if (false === ret && !increasedAwaitDrain) {}
以上就是關鍵的三行代碼,因為Node.js是單線程且dest.write(chunk)內部沒有修改變量increasedAwaitDrain的值,那么if語句中increasedAwaitDrain的值肯定還是false,即increasedAwaitDrain相關邏輯沒有達到所期望的目標。
無用代碼出現(xiàn)的原因
前段雖已經分析出increasedAwaitDrain沒起到作用,但作者為什么寫了這樣一段邏輯呢?其實在定義increasedAwaitDrain語句的上方,作者說可能存在這樣一種情況:“當我們接收到一次上游的ondata事件并嘗試將數(shù)據(jù)寫到下游時,上游可能同時又有一個data事件觸發(fā),而這兩個ondata的數(shù)據(jù)在寫入下游時可能都返回false,從而導致src._readableState.awaitDrain++執(zhí)行兩次”。
awaitDrain++執(zhí)行兩次是作者不希望看到的情況,因為下游觸發(fā)drain事件時awaitDrain相應減1,直到其值為0時才讓上游重新流動,如果awaitDrain++執(zhí)行兩次,下游卻只觸發(fā)一次drain事件,awaitDrain就不會為0,上游不重新流動也就無法繼續(xù)讀取數(shù)據(jù)。
真相的探索過程
雖然從理性上認為increasedAwaitDrain沒起到作用,但也無法肯定加絕對,自己嘗試去求助,沒有出現(xiàn)高手指點出問題所在,但一個同事聽我描述后,說可能這就是個BUG,雖心中覺得可能性不大,但還是抱著試試看的心態(tài)切換到master分支上去瞅瞅,隨即發(fā)現(xiàn)最新的代碼里并沒有與increasedAwaitDrain類似的邏輯,間接說明v8.11.1分支上increasedAwaitDrain相關邏輯的確無用。
雖然比較肯定這里存在一段無用代碼,但應該如何理解作者在increasedAwaitDrain上方的注釋呢?為了進一步揭露真相,自己繼續(xù)花時間去看了看stream.Readable相關代碼,想知道data事件的觸發(fā)時機與順序是如何決定的。
readable流的簡單原理
在進一步解釋data事件的觸發(fā)順序前,簡單講一下readable流的實現(xiàn)原理,如果需要自己實現(xiàn)一個readable流,可以使用new stream.Readable(options)方法,其中options可包含四個屬性:highWaterMark、encoding、objectMode、read。最主要的是read屬性,當流的使用者需要數(shù)據(jù)時,read方法被用來從數(shù)據(jù)源獲取數(shù)據(jù),然后通過this.push(chunk)將數(shù)據(jù)傳遞給使用者,如果沒有更多數(shù)據(jù)可供讀取時使用this.push(null)表示讀取結束。
const Readable = require('stream').Readable;
let letter = 'ABCDEFG'.split('');
let index = 0;
const rs = new Readable({
read(size) {
this.push(letter[index++] || null);
}
});
rs.on('data', chunk => {
console.log(chunk.toString());
});
// 輸出
// A
// B
// C
// ...
這里ondata雖然沒有明顯調用read方法,但內部依舊是通過調用read方法結合this.push輸出數(shù)據(jù),并且在源代碼內部可以發(fā)現(xiàn)通過參數(shù)傳遞的read方法實際上被賦值給this._read,然后在Readable.prototype.read中調用this._read獲取數(shù)據(jù)。
靈魂代碼
為了進一步說明stream.Readable的data事件觸發(fā)順序與場景,將有關官方源碼經過修改和刪減成如下:
function Readable(options) {
this._read = options.read; // 將參數(shù)傳遞的read函數(shù)賦值到this._read
}
// 使用者通過調用read方法獲取數(shù)據(jù)
Readable.prototype.read = function (size) {
var state = this._readableState;
// 模擬鎖,一次_read如果沒有返回(this.push),后續(xù)read不會繼續(xù)調用_read讀取數(shù)據(jù)
if (!state.reading) {
state.reading = true;
state.sync = true; // sync用于在push方法中指示_read內部是否同步調用了push
this._read(size);
state.sync = false;
}
// _read內部如果是同步調用push,數(shù)據(jù)會放入緩沖區(qū)
// _read內部如果是異步調用push且緩沖區(qū)沒有內容,數(shù)據(jù)可能emit data返回
// 嘗試從緩沖區(qū)(state.buffer)中獲取大小為size的數(shù)據(jù),如果獲取成功則觸發(fā)data事件
if (ret)
this.emit('data', ret);
return ret;
};
// 在this._read執(zhí)行過程中通過this.push輸出數(shù)據(jù)
Readable.prototype.push = function (chunk, encoding) {
var state = this._readableState;
// 本次_read獲取到數(shù)據(jù),打開鎖
state.reading = false;
// 流動模式 & 緩沖區(qū)沒有數(shù)據(jù) & 非同步返回,則直接觸發(fā)data事件
if (state.flowing && state.length === 0 && !state.sync) {
stream.emit('data', chunk);
stream.read(0); // 觸發(fā)下一次讀取,_read異步push的話還是會到這里,類似flow中的保持流出于流動
}
else {
// 將數(shù)據(jù)放入緩沖區(qū)
state.length += chunk.length;
state.buffer.push(chunk);
}
};
// 暫停流動
Readable.prototype.pause = function() {
if (this._readableState.flowing !== false) {
this._readableState.flowing = false;
this.emit('pause');
}
return this;
};
function flow(stream) {
const state = stream._readableState;
while (state.flowing && stream.read() !== null);
}
data事件的觸發(fā)時機與順序
時機
data的觸發(fā)只有兩處:
- 流如果處于流動模式 & 緩沖區(qū)沒有數(shù)據(jù) & 異步調用push,此時數(shù)據(jù)不經過緩沖區(qū),直接觸發(fā)data事件
- 不滿足上述情況時,push的數(shù)據(jù)會被放入緩沖區(qū),然后再嘗試從緩沖區(qū)讀取指定size的數(shù)據(jù)并觸發(fā)data事件
順序
關于data的觸發(fā)順序,實際是由emit順序決定,為討論原始問題:“increasedAwaitDrain相關邏輯為什么可以被刪除?”,將代碼簡化:
let count = 0;
src.on('data', chunk => {
let ret = dest.write(chunk);
if (!ret) {
count++;
src.pause();
}
});
當監(jiān)聽流的data事件時,流最終會通過resume并調用flow函數(shù)進入流動模式模式,即不斷的調用read方法讀取數(shù)據(jù)。接下來分析以下幾種場景,當dest.write(chunk)返回false時++count會執(zhí)行幾次,注意結合前文的靈魂代碼。
- 場景一:每次_read同步push一次數(shù)據(jù)
當發(fā)生第一次讀取,數(shù)據(jù)同步push到緩沖區(qū),緊接著從緩沖區(qū)中讀取數(shù)據(jù)并通過emit data的方式傳遞到ondata中,如果此時dest.write(chunk)返回false,count++將執(zhí)行一次,接著由于調用了stream.pause(),while條件state.flowing為false導致stream.read不再被調用,在流重新流動前,count的值不會繼續(xù)增加。
- 場景二:每次_read異步push一次數(shù)據(jù)
當發(fā)生第一次讀取,異步push的數(shù)據(jù)將直接通過emit data傳遞到ondata中,而read函數(shù)中的emit由于無法從緩沖區(qū)讀取數(shù)據(jù)從而不會觸發(fā),同時read返回null導致while循環(huán)也相應停止,此種情況下異步push觸發(fā)data事件后,緊接著的stream.read(0)會繼續(xù)保持流的流動,當dest.write(chunk)返回false,count++執(zhí)行一次并將流暫停,緊接著會繼續(xù)調用一次read,但這次數(shù)據(jù)將被放入緩沖區(qū)且不觸發(fā)data事件,count++依舊只執(zhí)行一次。
場景二流暫停一次后再次流動時,數(shù)據(jù)消耗模式與之前會有所差異,會優(yōu)先消耗緩沖區(qū)數(shù)據(jù)直至為空時回到之前的模式,但這同樣不會導致count++執(zhí)行多次。
- 場景三:每次_read多次同步push數(shù)據(jù)
與場景一類似,只是每次_read會多次往緩沖區(qū)寫入數(shù)據(jù),最終data事件還是依靠從緩沖區(qū)讀數(shù)據(jù)后觸發(fā)。
- 場景四:每次_read多次異步push數(shù)據(jù)
同場景二類似,假設在一次_read中有兩次異步push,當?shù)谝粋€異步push執(zhí)行時,data事件觸發(fā)且其中的dest.write(chunk)返回false,導致count++同時流被暫停,等第二個異步push執(zhí)行時,由于流已經暫停,數(shù)據(jù)將寫入緩沖區(qū)而不是觸發(fā)data事件,所以count++只執(zhí)行一次。
- 場景五:_read操作可能同步或異步push
不管是同步或者異步push,當一次ondata內部將流設置為暫停模式后,flow函數(shù)中while條件state.flowing為false將導致stream.read不再調用,異步的push的emit data判斷條件同樣不再滿足,即目前階段內部不會再有data事件觸發(fā)直到外部再次間接或直接調用read方法。
以上五個場景是為了分析該問題而模擬的,實際只要能理解第五個場景就能明白所有。
小結
文章最終寫出來的內容與我最開始的初衷所偏離,而且自己不知道如何評價這篇文章的好壞,但為了寫這文章花了兩天業(yè)余時間去深入理解stream.Readable卻是非常有收獲的一件事情,更堅定自己在寫文章的路途上可以走的更遠。
PS:猜測為什么有爛電影的存在,可能是因為導演長時間投入的創(chuàng)作會讓他迷失在內部而無法發(fā)現(xiàn)問題,寫文章也是,難以通過閱讀去優(yōu)化費心思寫的文章。
PS:下圖是美團博客的,也許我寫了這么多卻抵不上這張圖,說明方式很重要。

總結
以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對腳本之家的支持。如果你想了解更多相關內容請查看下面相關鏈接
相關文章
nodejs使用readline逐行讀取和寫入文件的實現(xiàn)
這篇文章給大家介紹了nodejs使用readline逐行讀取和寫入文件的實現(xiàn)方法,文中通過代碼示例給大家講解的非常詳細,對大家的學習或工作有一定的幫助,需要的朋友可以參考下2024-01-01

