golang實(shí)現(xiàn)延遲隊(duì)列(delay queue)的兩種實(shí)現(xiàn)
1 延遲隊(duì)列:郵件提醒、訂單自動(dòng)取消
延遲隊(duì)列:處理需要在未來(lái)某個(gè)特定時(shí)間執(zhí)行的任務(wù)。這些任務(wù)被添加到隊(duì)列中,并且指定了一個(gè)執(zhí)行時(shí)間,只有達(dá)到指定的時(shí)間點(diǎn)時(shí)才能從隊(duì)列中取出并執(zhí)行。
應(yīng)用場(chǎng)景:
- 郵件提醒
- 訂單自動(dòng)取消(超過(guò)多少時(shí)間未支付,就取消訂單)
- 對(duì)超時(shí)任務(wù)的處理等
由于任務(wù)的執(zhí)行是在未來(lái)的某個(gè)時(shí)間點(diǎn),因此這些任務(wù)不會(huì)立即執(zhí)行,而是存儲(chǔ)在隊(duì)列中,直到它的預(yù)定執(zhí)行時(shí)間才會(huì)被執(zhí)行。
2 實(shí)現(xiàn)
2.1 simple簡(jiǎn)單版:go自帶的time包實(shí)現(xiàn)
思路:
定義Task結(jié)構(gòu)體,包含
- ExecuteTime time.Time
- Job func()
定義DelayQueue
- TaskQueue []Task
- func AddTask
- func RemoveTask
- ExecuteTask
這種方案存在的問(wèn)題:
Go程序重啟時(shí),存儲(chǔ)在slice中的延遲處理任務(wù)將全部丟失
完整代碼:
package main
import (
"fmt"
"time"
)
/*
基于go實(shí)現(xiàn)延遲隊(duì)列
*/
type Task struct {
ExecuteTime time.Time
Job func()
}
type DelayQueue struct {
Tasks []*Task
}
func (d *DelayQueue) AddTask(t *Task) {
d.Tasks = append(d.Tasks, t)
}
func (d *DelayQueue) RemoveTask() {
//FIFO: remove the first task to enqueue
d.Tasks = d.Tasks[1:]
}
func (d *DelayQueue) ExecuteTask() {
for len(d.Tasks) > 0 {
//dequeue a task
currentTask := d.Tasks[0]
if time.Now().Before(currentTask.ExecuteTime) {
//if the task execution time is not up, wait
time.Sleep(currentTask.ExecuteTime.Sub(time.Now()))
}
//execute the task
currentTask.Job()
//remove task who has been executed
d.RemoveTask()
}
}
func main() {
fmt.Println("start delayQueue")
delayQueue := &DelayQueue{}
firstTask := &Task{
ExecuteTime: time.Now().Add(time.Second * 1),
Job: func() {
fmt.Println("executed task 1 after delay")
},
}
delayQueue.AddTask(firstTask)
secondTask := &Task{
ExecuteTime: time.Now().Add(time.Second * 7),
Job: func() {
fmt.Println("executed task 2 after delay")
},
}
delayQueue.AddTask(secondTask)
delayQueue.ExecuteTask()
fmt.Println("all tasks have been done!!!")
}
效果:

2.2 complex持久版:go+redis
為了防止Go重啟后存儲(chǔ)到delayQueue的數(shù)據(jù)丟失,我們可以將任務(wù)持久化到redis中。
思路:
初始化redis連接
延遲隊(duì)列采用redis的zset(有序集合)實(shí)現(xiàn)
前置準(zhǔn)備:
# 安裝docker
yum install -y yum-utils
yum-config-manager \
--add-repo \
https://download.docker.com/linux/centos/docker-ce.repo
yum install docker
systemctl start docker
# docker搭建redis
mkdir -p /Users/ziyi2/docker-home/redis
docker run -d --name redis -v /Users/ziyi2/docker-home/redis:/data -p 6379:6379 redis
完整代碼:
package main
import (
"fmt"
"github.com/go-redis/redis"
log "github.com/ziyifast/log"
"time"
)
/*
基于redis zset實(shí)現(xiàn)延遲隊(duì)列
*/
var redisdb *redis.Client
var DelayQueueKey = "delay-queue"
func initClient() (err error) {
redisdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
Password: "", // not set password
DB: 0, //use default db
})
_, err = redisdb.Ping().Result()
if err != nil {
log.Errorf("%v", err)
return err
}
return nil
}
func main() {
err := initClient()
if err != nil {
log.Errorf("init redis client err: %v", err)
return
}
addTaskToQueue("task1", time.Now().Add(time.Second*3).Unix())
addTaskToQueue("task2", time.Now().Add(time.Second*8).Unix())
//執(zhí)行隊(duì)列中的任務(wù)
getAndExecuteTask()
}
// executeTime為unix時(shí)間戳,作為zset中的score。允許redis按照task應(yīng)該執(zhí)行時(shí)間來(lái)進(jìn)行排序
func addTaskToQueue(task string, executeTime int64) {
err := redisdb.ZAdd(DelayQueueKey, redis.Z{
Score: float64(executeTime),
Member: task,
}).Err()
if err != nil {
panic(err)
}
}
// 從redis中取一個(gè)task并執(zhí)行
func getAndExecuteTask() {
for {
tasks, err := redisdb.ZRangeByScore(DelayQueueKey, redis.ZRangeBy{
Min: "-inf",
Max: fmt.Sprintf("%d", time.Now().Unix()),
Offset: 0,
Count: 1,
}).Result()
if err != nil {
time.Sleep(time.Second * 1)
continue
}
//處理任務(wù)
for _, task := range tasks {
fmt.Println("Execute task: ", task)
//執(zhí)行完任務(wù)之后用 ZREM 移除該任務(wù)
redisdb.ZRem(DelayQueueKey, task)
}
time.Sleep(time.Second * 1)
}
}
效果:
redis一直從延遲隊(duì)列中取數(shù)據(jù),如果處理完一批則睡眠1s
- 具體根據(jù)大家的業(yè)務(wù)調(diào)整,此處主要介紹思路

到此這篇關(guān)于golang實(shí)現(xiàn)延遲隊(duì)列(delay queue)的示例代碼的文章就介紹到這了,更多相關(guān)golang 延遲隊(duì)列(delay queue)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang使用sort接口實(shí)現(xiàn)排序示例
這篇文章主要介紹了golang使用sort接口實(shí)現(xiàn)排序的方法,簡(jiǎn)單分析了sort接口的功能并實(shí)例演示了基于sort接口的排序?qū)崿F(xiàn)方法,需要的朋友可以參考下2016-07-07
go mutex互斥鎖使用Lock和Unlock方法占有釋放資源
Go號(hào)稱是為了高并發(fā)而生的,在高并發(fā)場(chǎng)景下,勢(shì)必會(huì)涉及到對(duì)公共資源的競(jìng)爭(zhēng),當(dāng)對(duì)應(yīng)場(chǎng)景發(fā)生時(shí),我們經(jīng)常會(huì)使用 mutex 的 Lock() 和 Unlock() 方法來(lái)占有或釋放資源,雖然調(diào)用簡(jiǎn)單,但 mutex 的內(nèi)部卻涉及挺多的,本文來(lái)好好研究一下2023-09-09
Sublime Text3安裝Go語(yǔ)言相關(guān)插件gosublime時(shí)搜不到gosublime的解決方法
本文主要介紹了Sublime Text3安裝Go語(yǔ)言相關(guān)插件gosublime時(shí)搜不到gosublime的解決方法,具有一定的參考價(jià)值,感興趣的可以了解一下2022-01-01

