使用p-limit?限制并發(fā)數(shù)源碼解析
前言
并發(fā)是指在同一時間內(nèi)處理的任務(wù)數(shù)量。例如,在一臺服務(wù)器上同時運(yùn)行多個 Web 服務(wù)器線程,可以同時處理多個客戶端的請求。有時為了程序的穩(wěn)定運(yùn)行,我們需要限制并發(fā)的數(shù)量,p-limit 就是一個用js實(shí)現(xiàn)的控制并發(fā)數(shù)的庫。
源碼地址:sindresorhus/p-limit
使用
下面是官方提供的使用示例:
import pLimit from 'p-limit';
const limit = pLimit(1);
const input = [
limit(() => fetchSomething('foo')),
limit(() => fetchSomething('bar')),
limit(() => doSomething())
];
// Only one promise is run at once
const result = await Promise.all(input);
console.log(result);
在代碼的第一行,使用了 pLimit(1) 來創(chuàng)建一個 p-limit 實(shí)例,并將并發(fā)限制設(shè)為 1。這意味著,在任意時刻,只能有一個 Promise 在運(yùn)行。
在第四行,使用了 limit(() => fetchSomething('foo')) 來包裝一個異步函數(shù),并返回一個 Promise。同樣的方式,在第五、六行也包裝了其他兩個異步函數(shù)。
最后,使用 Promise.all 方法來等待所有 Promise 的完成,并在完成后將結(jié)果輸出到控制臺。由于 p-limit 的限制,這些 Promise 只會按順序一個一個地運(yùn)行,保證了并發(fā)的數(shù)量不會超過 1。
源碼分析
import Queue from 'yocto-queue';
export default function pLimit(concurrency) {
if (!((Number.isInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency > 0)) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
const queue = new Queue();
let activeCount = 0;
}
yocto-queue 是一種允許高效存儲和檢索數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)。前邊的章節(jié)分析過它的源碼,詳情參見: 源碼共讀|yocto-queue 隊(duì)列 鏈表
pLimit 函數(shù)接受一個參數(shù),并發(fā)數(shù),首先函數(shù)判斷參數(shù)是否是數(shù)組類型,或者是否能夠轉(zhuǎn)換成數(shù)字類型,如果不能,拋出一個錯誤。
之后定義了一個隊(duì)列來存儲待執(zhí)行的函數(shù),并使用一個計(jì)數(shù)器來記錄當(dāng)前正在運(yùn)行的函數(shù)的數(shù)量。
const next = () => {
activeCount--;
if (queue.size > 0) {
queue.dequeue()();
}
};
const run = async (fn, resolve, args) => {
activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {}
next();
};
在代碼的 next 函數(shù)中,如果隊(duì)列不為空,則從隊(duì)列中取出一個函數(shù)并執(zhí)行。這個函數(shù)的執(zhí)行會導(dǎo)致計(jì)數(shù)器的值減 1。
在代碼的 run 函數(shù)中,使用了 async/await 語法來執(zhí)行傳入的函數(shù) fn。它還使用了 resolve 函數(shù)將函數(shù)的返回值包裝成一個 Promise,并將這個 Promise 返回給調(diào)用者。在函數(shù)執(zhí)行完成后,調(diào)用 next 函數(shù)來執(zhí)行下一個函數(shù)。
const enqueue = (fn, resolve, args) => {
queue.enqueue(run.bind(undefined, fn, resolve, args));
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
if (activeCount < concurrency && queue.size > 0) {
queue.dequeue()();
}
})();
};
在代碼的 enqueue 函數(shù)中,使用了 queue.enqueue 方法將傳入的函數(shù) fn 加入隊(duì)列。然后,它使用了 async/await 語法來在下一個微任務(wù)中檢查當(dāng)前的并發(fā)數(shù)量是否小于設(shè)定的并發(fā)限制。如果是,則從隊(duì)列中取出一個函數(shù)并執(zhí)行。
const generator = (fn, ...args) => new Promise(resolve => {
enqueue(fn, resolve, args);
});
Object.defineProperties(generator, {
activeCount: {
get: () => activeCount,
},
pendingCount: {
get: () => queue.size,
},
clearQueue: {
value: () => {
queue.clear();
},
},
});
return generator;
在代碼的 generator 函數(shù)中,使用了 new Promise 語法來生成一個新的 Promise,并在其中調(diào)用了 enqueue 函數(shù)。這樣,每次調(diào)用生成的函數(shù)時,都會生成一個新的 Promise,并將函數(shù)加入隊(duì)列。
最后,使用了 Object.defineProperties 方法來為生成的函數(shù)添加屬性。這些屬性可以用來查詢當(dāng)前的并發(fā)數(shù)量和等待隊(duì)列的大小,以及清空等待隊(duì)列。
總結(jié)
控制并發(fā)的實(shí)現(xiàn)方式有很多種。例如,可以使用信號量或隊(duì)列來控制并發(fā)請求的數(shù)量。也可以使用計(jì)數(shù)器或令牌桶算法來限制請求的頻率。還可以使用拒絕服務(wù)(DoS)防護(hù)系統(tǒng)來檢測異常請求流量并進(jìn)行限制
以上就是使用p-limit 限制并發(fā)數(shù)源碼解析的詳細(xì)內(nèi)容,更多關(guān)于p-limit限制并發(fā)數(shù)的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
微信小程序 <swiper-item>標(biāo)簽傳入數(shù)據(jù)
這篇文章主要介紹了微信小程序 <swiper-item>標(biāo)簽傳入數(shù)據(jù)的相關(guān)資料,需要的朋友可以參考下2017-05-05
Web應(yīng)用開發(fā)TypeScript使用詳解
這篇文章主要為大家介紹了Web應(yīng)用開發(fā)TypeScript的使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-05-05

