Node.js + Redis Sorted Set實(shí)現(xiàn)任務(wù)隊(duì)列
需求:功能 A 需要調(diào)用第三方 API 獲取數(shù)據(jù),而第三方 API 自身是異步處理方式,在調(diào)用后會(huì)返回?cái)?shù)據(jù)與狀態(tài) { data: "查詢結(jié)果", "status": "正在異步處理中" } ,這樣就需要間隔一段時(shí)間后再去調(diào)用第三方 API 獲取數(shù)據(jù)。為了用戶在使用功能 A 時(shí)不會(huì)因?yàn)榈谌?API 正在異步處理中而必須等待,將用戶請(qǐng)求加入任務(wù)隊(duì)列中,返回部分?jǐn)?shù)據(jù)并關(guān)閉請(qǐng)求。然后定時(shí)從任務(wù)隊(duì)列里中取出任務(wù)調(diào)用第三方 API,若返回狀態(tài)為”異步處理中“,將該任務(wù)再次加入任務(wù)隊(duì)列,若返回狀態(tài)為”已處理完畢“,將返回?cái)?shù)據(jù)入庫(kù)。
根據(jù)以上問(wèn)題,想到使用 Node.js + Redis sorted set 來(lái)實(shí)現(xiàn)任務(wù)隊(duì)列。Node.js 實(shí)現(xiàn)自身應(yīng)用 API 用來(lái)接受用戶請(qǐng)求,合并數(shù)據(jù)庫(kù)已存數(shù)據(jù)與 API 返回的部分?jǐn)?shù)據(jù)返回給用戶,并將任務(wù)加入到任務(wù)隊(duì)列中。利用 Node.js child process 與 cron 定時(shí)從任務(wù)隊(duì)列中取出任務(wù)執(zhí)行。
在設(shè)計(jì)任務(wù)隊(duì)列的過(guò)程中需要考慮到的幾個(gè)問(wèn)題
- 并行執(zhí)行多個(gè)任務(wù)
- 任務(wù)唯一性
- 任務(wù)成功或失敗后的處理
針對(duì)以上問(wèn)題的解決方案
- 并行執(zhí)行多個(gè)任務(wù)利用 Promise.all 來(lái)實(shí)現(xiàn)
- 任務(wù)唯一性利用 Redis sorted set 來(lái)實(shí)現(xiàn)。使用時(shí)間戳作為分值可以實(shí)現(xiàn)將 sorted set 作為 list 來(lái)使用,在加入任務(wù)時(shí)判斷任務(wù)是否已經(jīng)存在,在取出任務(wù)執(zhí)行時(shí)將該任務(wù)分值設(shè)置為 0,每次取出分值大于 0 的任務(wù)來(lái)執(zhí)行,可以避免重復(fù)執(zhí)行任務(wù)。
- 執(zhí)行任務(wù)成功后刪除任務(wù),執(zhí)行任務(wù)失敗后將任務(wù)分值更新為當(dāng)前時(shí)間時(shí)間戳,這樣就可以將失敗的任務(wù)重新加入任務(wù)隊(duì)列尾部
示例代碼
// remote_api.js 模擬第三方 API
'use strict';
const app = require('express')();
app.get('/', (req, res) => {
setTimeout(() => {
let arr = [200, 300]; // 200 代表成功,300 代表失敗需要重新請(qǐng)求
res.status(200).send({ 'status': arr[parseInt(Math.random() * 2)] });
}, 3000);
});
app.listen('9001', () => {
console.log('API 服務(wù)監(jiān)聽端口:9001');
});
// producer.js 自身應(yīng)用 API,用來(lái)接受用戶請(qǐng)求并將任務(wù)加入任務(wù)隊(duì)列
'use strict';
const app = require('express')();
const redisClient = require('redis').createClient();
const QUEUE_NAME = 'queue:example';
function addTaskToQueue(taskName, callback) {
// 先判斷任務(wù)是否已經(jīng)存在,存在:跳過(guò),不存在:加入任務(wù)隊(duì)列
redisClient.zscore(QUEUE_NAME, taskName, (error, task) => {
if (error) {
console.log(error);
} else {
if (task) {
console.log('任務(wù)已存在,不新增相同任務(wù)');
callback(null, task);
} else {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}
}
});
}
app.get('/', (req, res) => {
let taskName = req.query['task-name'];
addTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
res.status(200).send('正在查詢中......');
}
});
});
app.listen(9002, () => {
console.log('生產(chǎn)者服務(wù)監(jiān)聽端口:9002');
});
// consumer.js 定時(shí)獲取任務(wù)并執(zhí)行
'use strict';
const redisClient = require('redis').createClient();
const request = require('request');
const schedule = require('node-schedule');
const QUEUE_NAME = 'queue:expmple';
const PARALLEL_TASK_NUMBER = 2; // 并行執(zhí)行任務(wù)數(shù)量
function getTasksFromQueue(callback) {
// 獲取多個(gè)任務(wù)
redisClient.zrangebyscore([QUEUE_NAME, 1, new Date().getTime(), 'LIMIT', 0, PARALLEL_TASK_NUMBER], (error, tasks) => {
if (error) {
callback(error);
} else {
// 將任務(wù)分值設(shè)置為 0,表示正在處理
if (tasks.length > 0) {
let tmp = [];
tasks.forEach((task) => {
tmp.push(0);
tmp.push(task);
});
redisClient.zadd([QUEUE_NAME].concat(tmp), (error, result) => {
if (error) {
callback(error);
} else {
callback(null, tasks)
}
});
}
}
});
}
function addFailedTaskToQueue(taskName, callback) {
redisClient.zadd(QUEUE_NAME, new Date().getTime(), taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
});
}
function removeSucceedTaskFromQueue(taskName, callback) {
redisClient.zrem(QUEUE_NAME, taskName, (error, result) => {
if (error) {
callback(error);
} else {
callback(null, result);
}
})
}
function execTask(taskName) {
return new Promise((resolve, reject) => {
let requestOptions = {
'url': 'http://127.0.0.1:9001',
'method': 'GET',
'timeout': 5000
};
request(requestOptions, (error, response, body) => {
if (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error) => {
if (error) {
console.log(error);
} else {
}
});
} else {
try {
body = typeof body !== 'object' ? JSON.parse(body) : body;
} catch (error) {
resolve('failed');
console.log(error);
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
return;
}
if (body.status !== 200) {
resolve('failed');
addFailedTaskToQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
} else {
resolve('succeed');
removeSucceedTaskFromQueue(taskName, (error, result) => {
if (error) {
console.log(error);
} else {
}
});
}
}
});
});
}
// 定時(shí),每隔 5 秒獲取新的任務(wù)來(lái)執(zhí)行
let job = schedule.scheduleJob('*/5 * * * * *', () => {
console.log('獲取新任務(wù)');
getTasksFromQueue((error, tasks) => {
if (error) {
console.log(error);
} else {
if (tasks.length > 0) {
console.log(tasks);
Promise.all(tasks.map(execTask))
.then((results) => {
console.log(results);
})
.catch((error) => {
console.log(error);
});
}
}
});
});
- 在Node.js應(yīng)用中讀寫Redis數(shù)據(jù)庫(kù)的簡(jiǎn)單方法
- Node.js開發(fā)之訪問(wèn)Redis數(shù)據(jù)庫(kù)教程
- Node.js操作redis實(shí)現(xiàn)添加查詢功能
- node.js利用redis數(shù)據(jù)庫(kù)緩存數(shù)據(jù)的方法
- 在Node.js應(yīng)用中使用Redis的方法簡(jiǎn)介
- Node.js與Sails redis組件的使用教程
- node.js使用redis儲(chǔ)存session的方法
- 提升node.js中使用redis的性能遇到的問(wèn)題及解決方法
- node.js中 redis 的安裝和基本操作示例
相關(guān)文章
Node.js服務(wù)器環(huán)境下使用Mock.js攔截AJAX請(qǐng)求的教程
Mock.js這個(gè)JavaScript庫(kù)最常見的用法便是被用來(lái)攔截AJAX請(qǐng)求,well,這里我們就來(lái)看一下Node.js服務(wù)器環(huán)境下使用Mock.js攔截AJAX請(qǐng)求的教程:2016-05-05
Node.js學(xué)習(xí)之內(nèi)置模塊fs用法示例
這篇文章主要介紹了Node.js學(xué)習(xí)之內(nèi)置模塊fs用法,結(jié)合實(shí)例形式詳細(xì)分析了node.js內(nèi)置模塊fs的基本功能、用法與相關(guān)操作注意事項(xiàng),需要的朋友可以參考下2020-01-01
node將geojson轉(zhuǎn)shp返回給前端的實(shí)現(xiàn)方法
這篇文章主要介紹了node將geojson轉(zhuǎn)shp返回給前端的實(shí)現(xiàn)方法,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-05-05
用C/C++來(lái)實(shí)現(xiàn) Node.js 的模塊(二)
上篇文章的主要內(nèi)容講訴了用C/C++來(lái)實(shí)現(xiàn) Node.js 的模塊,本文更深一步繼續(xù)探討這個(gè)問(wèn)題,有需要的朋友可以參考下2014-09-09
node.js中的fs.readlinkSync方法使用說(shuō)明
這篇文章主要介紹了node.js中的fs.readlinkSync方法使用說(shuō)明,本文介紹了fs.readlinkSync方法說(shuō)明、語(yǔ)法、接收參數(shù)、使用實(shí)例和實(shí)現(xiàn)源碼,需要的朋友可以參考下2014-12-12
node爬取新型冠狀病毒的疫情實(shí)時(shí)動(dòng)態(tài)
這篇文章主要介紹了node爬取新型冠狀病毒的疫情實(shí)時(shí)動(dòng)態(tài),非常不錯(cuò),本文通過(guò)實(shí)例代碼給大家講解的非常詳細(xì),需要的朋友可以參考下2020-02-02
Node.js使用Express創(chuàng)建Web項(xiàng)目詳細(xì)教程
如果需要入門使用node.js進(jìn)行web開發(fā),正在學(xué)習(xí) nodejs web開發(fā)指南 的和想快速了解node.js web開發(fā)模式的朋友,相信本文是有一定幫助意義的。2017-03-03

