redis實現(xiàn)分布式延時隊列的示例代碼
延時隊列簡介
延時隊列是一種特殊的消息隊列,它允許將消息在一定的延遲時間后再進行消費。延時隊列的主要特點是可以延遲消息的處理時間,以滿足定時任務(wù)或者定時事件的需求。
總之,延時隊列通過延遲消息的消費時間,提供了一種方便、可靠的方式來處理定時任務(wù)和定時事件。它在分布式系統(tǒng)中具有重要的作用,能夠提高系統(tǒng)的可靠性和性能。
延時隊列的實現(xiàn)方式可以有多種,本文介紹一種redis實現(xiàn)的分布式延時隊列。
應(yīng)用場景
定時任務(wù):可以將需要在特定時間執(zhí)行的任務(wù)封裝為延時消息,通過延時隊列來觸發(fā)任務(wù)的執(zhí)行。
訂單超時處理:可以將訂單消息發(fā)送到延時隊列中,并設(shè)置訂單的超時時間,超過時間后,消費者從隊列中獲取到超時的訂單消息,進行相應(yīng)的處理。
消息重試機制:當某個消息處理失敗時,可以將該消息發(fā)送到延時隊列中,并設(shè)置一定的重試時間,超過時間后再次嘗試處理。
案例
12306火車票購買,搶了訂單后,45分鐘沒有支付,自動取消訂單
考慮
數(shù)據(jù)持久化:redis是支持的,可以使用rdb,也可以使用aof
有序存儲:因為只要最小的沒過期,后面的肯定就沒過期,這樣的話檢查最小的節(jié)點就行了,考慮使用redis中的zset結(jié)構(gòu)
高可用:考慮哨兵或者cluster
高伸縮:因為12306用戶量非常大,可能導(dǎo)致redis中存儲的任務(wù)空間非常大,所以考慮擴展節(jié)點,從這個角度來說,使用cluster集群模式,哨兵只有一個節(jié)點即主節(jié)點寫數(shù)據(jù)。
實現(xiàn)
整體思路
- 生產(chǎn)消費者模型:因為12306的用戶量非常大,所以考慮生產(chǎn)者和消費者有多個節(jié)點;
- 采用cluster模式實現(xiàn)高可用以及高伸縮性;
- 采用zset存儲延時任務(wù)(zadd key score member,score表示時間);
- 為了讓數(shù)據(jù)均勻分布在cluster集群中的多個主節(jié)點中:構(gòu)建多個zset,每個zset對應(yīng)一個消費者,生產(chǎn)者隨機向某個zset中生產(chǎn)數(shù)據(jù)。
具體實現(xiàn)
生產(chǎn)者
需要安裝hiredis-cluster集群,安裝編譯如下:
git clone https://github.com/Nordix/hiredis-cluster.git
cd hiredis-cluster
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -
DENABLE_SSL=ON ..
make
sudo make install
sudo ldconfig
需要安裝libevent庫,最后編譯時執(zhí)行gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl編譯生產(chǎn)者可執(zhí)行程序
#include <hiredis_cluster/adapters/libevent.h>
#include <hiredis_cluster/hircluster.h>
#include <event.h>
#include <event2/listener.h>
#include <event2/bufferevent.h>
#include <event2/buffer.h>
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <sys/time.h>
int64_t g_taskid = 0;
#define MAX_KEY 10
static int64_t hi_msec_now() {
int64_t msec;
struct timeval now;
int status;
status = gettimeofday(&now, NULL);
if (status < 0) {
return -1;
}
msec = (int64_t)now.tv_sec * 1000LL + (int64_t)(now.tv_usec / 1000LL);
return msec;
}
static int _vscnprintf(char *buf, size_t size, const char *fmt, va_list args) {
int n;
n = vsnprintf(buf, size, fmt, args);
if (n <= 0) {
return 0;
}
if (n <= (int)size) {
return n;
}
return (int)(size-1);
}
static int _scnprintf(char *buf, size_t size, const char *fmt, ...) {
va_list args;
int n;
va_start(args, fmt);
n = _vscnprintf(buf, size, fmt, args);
va_end(args);
return n;
}
void connectCallback(const redisAsyncContext *ac, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", ac->errstr);
return;
}
printf("Connected to %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}
void disconnectCallback(const redisAsyncContext *ac, int status) {
if (status != REDIS_OK) {
printf("Error: %s\n", ac->errstr);
return;
}
printf("Disconnected from %s:%d\n", ac->c.tcp.host, ac->c.tcp.port);
}
void addTaskCallback(redisClusterAsyncContext *cc, void *r, void *privdata) {
redisReply *reply = (redisReply *)r;
if (reply == NULL) {
if (cc->errstr) {
printf("errstr: %s\n", cc->errstr);
}
return;
}
int64_t now = hi_msec_now() / 10;
printf("add task success reply: %lld now=%ld\n", reply->integer, now);
}
int addTask(redisClusterAsyncContext *cc, char *desc) {
/* 轉(zhuǎn)化為厘米秒 */
int64_t now = hi_msec_now() / 10;
g_taskid++;
/* key */
char key[256] = {0};
// 為了讓數(shù)據(jù)均勻分布在cluster集群中的多個主節(jié)點中:
? // 構(gòu)建多個zset,每個zset對應(yīng)一個消費者,生產(chǎn)者隨機向某個zset中生產(chǎn)數(shù)據(jù),
// 生產(chǎn)者可以有很多個,只需要保證向task_group:0-task_group:9中均勻的生產(chǎn)數(shù)據(jù)即可
int len = _scnprintf(key, 255, "task_group:%ld", g_taskid % MAX_KEY);
key[len] = '\0';
/* member */
char mem[1024] = {0};
len = _scnprintf(mem, 1023, "task:%ld:%s", g_taskid, desc);
mem[len] = '\0';
int status;
// 為每一個任務(wù)延時5秒中去處理
status = redisClusterAsyncCommand(cc, addTaskCallback, "",
"zadd %s %ld %s", key, now+500, mem);
printf("redisClusterAsyncCommand:zadd %s %ld %s\n", key, now+500, mem);
if (status != REDIS_OK) {
printf("error: err=%d errstr=%s\n", cc->err, cc->errstr);
}
return 0;
}
void stdio_callback(struct bufferevent *bev, void *arg) {
redisClusterAsyncContext *cc = (redisClusterAsyncContext *)arg;
struct evbuffer *evbuf = bufferevent_get_input(bev);
char *msg = evbuffer_readln(evbuf, NULL, EVBUFFER_EOL_LF);
if (!msg) return;
if (strcmp(msg, "quit") == 0) {
printf("safe exit!!!\n");
exit(0);
return;
}
if (strlen(msg) > 1024-5-13-1) {
printf("[err]msg is too long, try again...\n");
return;
}
addTask(cc, msg);
printf("stdio read the data: %s\n", msg);
}
int main(int argc, char **argv) {
printf("Connecting...\n");
// 連接cluster集群,可以從cluster集群中任意一個節(jié)點出發(fā)連接集群
redisClusterAsyncContext *cc =
redisClusterAsyncConnect("127.0.0.1:7006", HIRCLUSTER_FLAG_NULL);
printf("redisClusterAsyncContext...\n");
if (cc && cc->err) {
printf("Error: %s\n", cc->errstr);
return 1;
}
struct event_base *base = event_base_new();
redisClusterLibeventAttach(cc, base);
redisClusterAsyncSetConnectCallback(cc, connectCallback);
redisClusterAsyncSetDisconnectCallback(cc, disconnectCallback);
// nodeIterator ni;
// initNodeIterator(&ni, cc->cc);
// cluster_node *node;
// while ((node = nodeNext(&ni)) != NULL) {
// printf("node %s:%d role:%d pad:%d\n", node->host, node->port, node->role, node->pad);
// }
struct bufferevent *ioev = bufferevent_socket_new(base, 0, BEV_OPT_CLOSE_ON_FREE);
bufferevent_setcb(ioev, stdio_callback, NULL, NULL, cc);
bufferevent_enable(ioev, EV_READ | EV_PERSIST);
printf("Dispatch..\n");
event_base_dispatch(base);
printf("Done..\n");
redisClusterAsyncFree(cc);
event_base_free(base);
return 0;
}
// 需要安裝 hiredis-cluster libevent
// gcc producer.c -o producer -levent -lhiredis_cluster -lhiredis -lhiredis_ssl
說明:
這里構(gòu)建了10個zset,分別是task_group:0,task_group:1,…,task_group:9作為10個zset的key,zset的數(shù)據(jù)其實就代表著消費者的數(shù)量,通常消費者的功能是一摸一樣的,生產(chǎn)者就不管你有多少個了,只需要將任務(wù)均勻的打散在不同的zset中就行了(具體實現(xiàn)可以搞一個全局的id,每一次添加任務(wù)時id++,然后再對zset個數(shù)10取模,最終可以得到0-9之間的一個數(shù),然后再與task_group拼接,這樣就可以將任務(wù)均勻的打散在不同的zset中)。
消費者
消費者是采用skynet+lua腳本實現(xiàn)的,每個消費者會不斷的去檢查redis中的任務(wù)有沒有過期,如果過期,就取出來刪除(這里只是demo,只是打印之后刪除任務(wù))
local skynet = require "skynet"
local function table_dump( object )
if type(object) == 'table' then
local s = '{ '
for k,v in pairs(object) do
if type(k) ~= 'number' then k = string.format("%q", k) end
s = s .. '['..k..'] = ' .. table_dump(v) .. ','
end
return s .. '} '
elseif type(object) == 'function' then
return tostring(object)
elseif type(object) == 'string' then
return string.format("%q", object)
else
return tostring(object)
end
end
local mode, key = ...
if mode == "slave" then
local rediscluster = require "skynet.db.redis.cluster"
local function onmessage(data,channel,pchannel)
print("onmessage",data,channel,pchannel)
end
skynet.start(function ()
local db = rediscluster.new({
{host="127.0.0.1",port=7001},
},
{read_slave=true,auth=nil,db=0,},
onmessage
)
assert(db, "redis-cluster startup error")
skynet.fork(function ()
while true do
local res = db:zrange(key, 0, 0, "withscores")
if not next(res) then
skynet.sleep(50)
else
local expire = tonumber(res[2])
local now = skynet.time()*100
if now >= expire then
print(("%s is comsumed:expire_time:%d"):format(res[1], expire))
db:zrem(key, res[1])
else
skynet.sleep(10)
end
end
end
end)
end)
else
skynet.start(function () -- // 啟動10個程序,并把"slave"傳入mode,task_group:i傳入到key中,即每個程序只消費一個
for i=0,9 do
skynet.newservice(SERVICE_NAME, "slave", "task_group:"..i)
運行結(jié)果

redis分布式延時隊列優(yōu)勢
1.Redis zset支持高性能的 score 排序。
2.Redis是在內(nèi)存上進行操作的,速度非???。
3.Redis可以搭建集群,當消息很多時候,我們可以用集群來提高消息處理的速度,提高可用性。
4.Redis具有持久化機制,當出現(xiàn)故障的時候,可以通過AOF和RDB方式來對數(shù)據(jù)進行恢復(fù),保證了數(shù)據(jù)的可靠性
redis分布式延時隊列劣勢
使用 Redis 實現(xiàn)的延時消息隊列也存在數(shù)據(jù)持久化, 消息可靠性的問題:
- 沒有重試機制 - 處理消息出現(xiàn)異常沒有重試機制, 這些需要自己去實現(xiàn), 包括重試次數(shù)的實現(xiàn)等;
- 沒有 ACK 機制 - 例如在獲取消息并已經(jīng)刪除了消息情況下, 正在處理消息的時候客戶端崩潰了, 這條正在處理的這些消息就會丟失, MQ 是需要明確的返回一個值給 MQ 才會認為這個消息是被正確的消費了。
總結(jié):如果對消息可靠性要求較高, 推薦使用 MQ 來實現(xiàn)
以上就是redis實現(xiàn)分布式延時隊列的示例代碼的詳細內(nèi)容,更多關(guān)于redis分布式延時隊列的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Redis高并發(fā)情況下并發(fā)扣減庫存項目實戰(zhàn)
本文主要介紹了Redis高并發(fā)情況下并發(fā)扣減庫存項目實戰(zhàn),文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2022-04-04
Redis數(shù)據(jù)庫中實現(xiàn)分布式鎖的方法
這篇文章主要介紹了Redis數(shù)據(jù)庫中實現(xiàn)分布式鎖的方法,Redis是一個高性能的主存式數(shù)據(jù)庫,需要的朋友可以參考下2015-06-06

