???????Golang實(shí)現(xiàn)RabbitMQ中死信隊(duì)列幾種情況
下面這段教程針對是你已經(jīng)有一些基本的MQ的知識(shí),比如說能夠很清楚的理解queue、exchange等概念,如果你還不是很理解,我建議你先訪問官網(wǎng)查看基本的教程。
1、造成死信隊(duì)列的主要原因
- 消費(fèi)者超時(shí)未應(yīng)答
- 隊(duì)列的容量有限
- 消費(fèi)者拒絕了的消息
2、操作邏輯圖

3、代碼實(shí)戰(zhàn)
其實(shí)整體的思路就是分別創(chuàng)建一個(gè)normal_exchange、dead_exchange、normal_queue、dead_queue,然后將normal_exchange與normal_queue進(jìn)行綁定,將dead_exchange與dead_queue進(jìn)行綁定,這里比較關(guān)鍵的一個(gè)點(diǎn)在于說如何將normal_queue與dead_exchange進(jìn)行綁定,這樣才能將錯(cuò)誤的消息傳遞過來。下面就是這段代碼的關(guān)鍵。
// 聲明一個(gè)normal隊(duì)列
_, err = ch.QueueDeclare(
constant.NormalQueue,
true,
false,
false,
false,
amqp.Table{
//"x-message-ttl": 5000, // 指定過期時(shí)間
//"x-max-length": 6, // 指定長度。超過這個(gè)長度的消息會(huì)發(fā)送到dead_exchange中
"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機(jī)
"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
})
3.1 針對原因1:消費(fèi)者超出時(shí)間未應(yīng)答
consumer1.go
package day07
import (
?? ?amqp "github.com/rabbitmq/amqp091-go"
?? ?"log"
?? ?"v1/utils"
)
type Constant struct {
?? ?NormalExchange ? string
?? ?DeadExchange ? ? string
?? ?NormalQueue ? ? ?string
?? ?DeadQueue ? ? ? ?string
?? ?NormalRoutingKey string
?? ?DeadRoutingKey ? string
}
func Consumer1() {
?? ?// 獲取連接
?? ?ch := utils.GetChannel()
?? ?// 創(chuàng)建一個(gè)變量常量
?? ?constant := Constant{
?? ??? ?NormalExchange: ? "normal_exchange",
?? ??? ?DeadExchange: ? ? "dead_exchange",
?? ??? ?NormalQueue: ? ? ?"normal_queue",
?? ??? ?DeadQueue: ? ? ? ?"dead_queue",
?? ??? ?NormalRoutingKey: "normal_key",
?? ??? ?DeadRoutingKey: ? "dead_key",
?? ?}
?? ?// 聲明normal交換機(jī)
?? ?err := ch.ExchangeDeclare(
?? ??? ?constant.NormalExchange,
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil,
?? ?)
?? ?utils.FailOnError(err, "Failed to declare a normal exchange")
?? ?// 聲明一個(gè)dead交換機(jī)
?? ?err = ch.ExchangeDeclare(
?? ??? ?constant.DeadExchange,
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil,
?? ?)
?? ?utils.FailOnError(err, "Failed to declare a dead exchange")
?? ?// 聲明一個(gè)normal隊(duì)列
?? ?_, err = ch.QueueDeclare(
?? ??? ?constant.NormalQueue,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?amqp.Table{
?? ??? ??? ?"x-message-ttl": 5000, // 指定過期時(shí)間
?? ??? ??? ?//"x-max-length": ? ? ? ? ? ? ?6,
?? ??? ??? ?"x-dead-letter-exchange": ? ?constant.DeadExchange, ? // 指定死信交換機(jī)
?? ??? ??? ?"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
?? ??? ?})
?? ?utils.FailOnError(err, "Failed to declare a normal queue")
?? ?// 聲明一個(gè)dead隊(duì)列:注意不要給死信隊(duì)列設(shè)置消息時(shí)間,否者死信隊(duì)列里面的信息會(huì)再次過期
?? ?_, err = ch.QueueDeclare(
?? ??? ?constant.DeadQueue,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to declare a dead queue")
?? ?// 將normal_exchange與normal_queue進(jìn)行綁定
?? ?err = ch.QueueBind(constant.NormalQueue, constant.NormalRoutingKey, constant.NormalExchange, false, nil)
?? ?utils.FailOnError(err, "Failed to binding normal_exchange with normal_queue")
?? ?// 將dead_exchange與dead_queue進(jìn)行綁定
?? ?err = ch.QueueBind(constant.DeadQueue, constant.DeadRoutingKey, constant.DeadExchange, false, nil)
?? ?utils.FailOnError(err, "Failed to binding dead_exchange with dead_queue")
?? ?// 消費(fèi)消息
?? ?msgs, err := ch.Consume(constant.NormalQueue,
?? ??? ?"",
?? ??? ?false, // 這個(gè)地方一定要關(guān)閉自動(dòng)應(yīng)答
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to consume in Consumer1")
?? ?var forever chan struct{}
?? ?go func() {
?? ??? ?for d := range msgs {
?? ??? ??? ?if err := d.Reject(false); err != nil {
?? ??? ??? ??? ?utils.FailOnError(err, "Failed to Reject a message")
?? ??? ??? ?}
?? ??? ?}
?? ?}()
?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
?? ?<-forever
}consumer2.go
package day07
import (
?? ?amqp "github.com/rabbitmq/amqp091-go"
?? ?"log"
?? ?"v1/utils"
)
func Consumer2() {
?? ?// 拿取信道
?? ?ch := utils.GetChannel()
?? ?// 聲明一個(gè)交換機(jī)
?? ?err := ch.ExchangeDeclare(
?? ??? ?"dead_exchange",
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to Declare a exchange")
?? ?// 接收消息的應(yīng)答
?? ?msgs, err := ch.Consume("dead_queue",
?? ??? ?"",
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil,
?? ?)
?? ?var forever chan struct{}
?? ?go func() {
?? ??? ?for d := range msgs {
?? ??? ??? ?log.Printf("[x] %s", d.Body)
?? ??? ??? ?// 開啟手動(dòng)應(yīng)答?
?? ??? ??? ?d.Ack(false)
?? ??? ?}
?? ?}()
?? ?log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
?? ?<-forever
}produce.go
package day07
import (
?? ?"context"
?? ?amqp "github.com/rabbitmq/amqp091-go"
?? ?"strconv"
?? ?"time"
?? ?"v1/utils"
)
func Produce() {
?? ?// 獲取信道
?? ?ch := utils.GetChannel()
?? ?// 聲明一個(gè)交換機(jī)
?? ?err := ch.ExchangeDeclare(
?? ??? ?"normal_exchange",
?? ??? ?amqp.ExchangeDirect,
?? ??? ?true,
?? ??? ?false,
?? ??? ?false,
?? ??? ?false,
?? ??? ?nil)
?? ?utils.FailOnError(err, "Failed to declare a exchange")
?? ?ctx, cancer := context.WithTimeout(context.Background(), 5*time.Second)
?? ?defer cancer()
?? ?// 發(fā)送了10條消息
?? ?for i := 0; i < 10; i++ {
?? ??? ?msg := "Info:" + strconv.Itoa(i)
?? ??? ?ch.PublishWithContext(ctx,
?? ??? ??? ?"normal_exchange",
?? ??? ??? ?"normal_key",
?? ??? ??? ?false,
?? ??? ??? ?false,
?? ??? ??? ?amqp.Publishing{
?? ??? ??? ??? ?ContentType: "text/plain",
?? ??? ??? ??? ?Body: ? ? ? ?[]byte(msg),
?? ??? ??? ?})
?? ?}
}3.2 針對原因2:限制一定的長度
只需要改變consumer1.go中的對normal_queue的聲明
// 聲明一個(gè)normal隊(duì)列
_, err = ch.QueueDeclare(
constant.NormalQueue,
true,
false,
false,
false,
amqp.Table{
//"x-message-ttl": 5000, // 指定過期時(shí)間
"x-max-length": 6,
"x-dead-letter-exchange": constant.DeadExchange, // 指定死信交換機(jī)
"x-dead-letter-routing-key": constant.DeadRoutingKey, // 指定死信routing-key
})
3.3 針對原因3:消費(fèi)者拒絕的消息回到死信隊(duì)列中
這里需要完成兩點(diǎn)工作
工作1:需要在consumer1中作出拒絕的操作
go func() {
for d := range msgs {
if err := d.Reject(false); err != nil {
utils.FailOnError(err, "Failed to Reject a message")
}
}
}()
工作2:如果你consume的時(shí)候開啟了自動(dòng)應(yīng)答一定要關(guān)閉
// 消費(fèi)消息
msgs, err := ch.Consume(constant.NormalQueue,
"",
false, // 這個(gè)地方一定要關(guān)閉自動(dòng)應(yīng)答
false,
false,
false,
nil)
其他的部分不需要改變,按照問題1中的設(shè)計(jì)即可。
到此這篇關(guān)于Golang實(shí)現(xiàn)RabbitMQ中死信隊(duì)列幾種情況的文章就介紹到這了,更多相關(guān)???????Golang RabbitMQ死信隊(duì)列內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
go本地環(huán)境配置及vscode go插件安裝的詳細(xì)教程
這篇文章主要介紹了go本地環(huán)境配置及vscode go插件安裝的詳細(xì)教程,本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05
golang MarshalJson的實(shí)現(xiàn)
本文主要介紹了golang MarshalJson的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2025-03-03
Go?中?time.After?可能導(dǎo)致的內(nèi)存泄露問題解析
這篇文章主要介紹了Go?中?time.After?可能導(dǎo)致的內(nèi)存泄露,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-05-05
Go語言并發(fā)編程之控制并發(fā)數(shù)量實(shí)現(xiàn)實(shí)例
這篇文章主要為大家介紹了Go語言并發(fā)編程之控制并發(fā)數(shù)量實(shí)例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01
Go語言append切片添加元素的實(shí)現(xiàn)
本文主要介紹了Go語言append切片添加元素的實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-04-04
Golang 實(shí)現(xiàn)分片讀取http超大文件流和并發(fā)控制
這篇文章主要介紹了Golang 實(shí)現(xiàn)分片讀取http超大文件流和并發(fā)控制,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-12-12
Go語言for-range函數(shù)使用技巧實(shí)例探究
這篇文章主要為大家介紹了Go語言for-range函數(shù)使用技巧實(shí)例探究,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2024-01-01

