RabbitMq如何做到消息的可靠性投遞
如何保證消息不丟失
在使用RabbitMQ的時(shí)候,我們需要保證消息不能丟失,消息從生產(chǎn)者生產(chǎn)出來一直到消費(fèi)者消費(fèi)成功,這條鏈路是這樣的:

消息的可靠投遞分為了兩大內(nèi)容:發(fā)送端的確認(rèn)(p->broker和exchange->queue)和消費(fèi)端的確認(rèn)(queue->c)。
發(fā)送端的確認(rèn)
Rabbit提供了兩種方式來保證發(fā)送端的消息可靠性投遞:confirm 確認(rèn)模式
和return 退回模式。
confirm 確認(rèn)模式:消息從 producer 到達(dá) exchange 則會給 producer 發(fā)送一個應(yīng)答,我們需要開啟confirm模式,才能接收到這條應(yīng)答。開啟方式是將Channel.Confirm(noWait bool)參數(shù)設(shè)置為false,表示同意發(fā)送者將當(dāng)前channel信道設(shè)置為confirm模式。
return 退回模式:消息從 exchange–>queue 投遞失敗,會將消息退回給producer。
消費(fèi)端的確認(rèn)
消息從Queue發(fā)送到消費(fèi)端之后,消費(fèi)端會發(fā)送一個確認(rèn)消息:Consumer Ack,有兩種確認(rèn)方式:自動確認(rèn)和手動確認(rèn)。
在編碼中,關(guān)于消息的確認(rèn)方式,我們需要在消費(fèi)者端調(diào)用Consumer函數(shù)時(shí),設(shè)置第三個參數(shù):autoAck是false還是true(false表示手動,true表示自動)。
自動確認(rèn)是指,當(dāng)消息一旦被Consumer接收到,則自動確認(rèn)收到,并將相應(yīng) message 從 RabbitMQ 的消息緩存中移除。
但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會丟失。如果設(shè)置了手動確認(rèn)方式,則需要在業(yè)務(wù)處理成功后,調(diào)用ch.Ack(false),手動簽收,如果出現(xiàn)異常,則調(diào)用d.Reject(true)讓其自動重新發(fā)送消息。
Go 實(shí)現(xiàn)
安裝操作庫
安裝API庫
Go可以使用streadway/amqp庫來操作rabbit,使用以下命令來安裝:
go get github.com/streadway/amqp
封裝rabbitmq
接下來我們對streadway/amqp庫的內(nèi)容進(jìn)行一個二次封裝,封裝為一個rabbitmq.go文件:
package rabbitmq
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
)
// RabbitMQ RabbitMQ結(jié)構(gòu)
type RabbitMQ struct {
channel *amqp.Channel
Name string
exchange string
}
// Connect 連接服務(wù)器
func Connect(s string) *RabbitMQ {
//連接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "連接Rabbitmq服務(wù)器失敗!")
ch, e := conn.Channel()
failOnError(e, "無法打開頻道!")
mq := new(RabbitMQ)
mq.channel = ch
return mq
}
// New 初始化消息隊(duì)列
//第一個參數(shù):rabbitmq服務(wù)器的鏈接,第二個參數(shù):隊(duì)列名字
func New(s string, name string) *RabbitMQ {
//連接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "連接Rabbitmq服務(wù)器失??!")
ch, e := conn.Channel()
failOnError(e, "無法打開頻道!")
q, e := ch.QueueDeclare(
name, //隊(duì)列名
false, //是否開啟持久化
true, //不使用時(shí)刪除
false, //排他
false, //不等待
nil, //參數(shù)
)
failOnError(e, "初始化消息隊(duì)列失??!")
mq := new(RabbitMQ)
mq.channel = ch
mq.Name = q.Name
return mq
}
// QueueDeclare 聲明queue
func (q *RabbitMQ) QueueDeclare(queue string) {
_, e := q.channel.QueueDeclare(queue, false, true, false, false, nil)
failOnError(e, "聲明queue失??!")
}
// QueueDelete 刪除queue
func (q *RabbitMQ) QueueDelete(queue string) {
_, e := q.channel.QueueDelete(queue, false, true, false)
failOnError(e, "刪除queue失??!")
}
// Qos 配置queue參數(shù)
func (q *RabbitMQ) Qos() {
e := q.channel.Qos(1, 0, false)
failOnError(e, "無法設(shè)置QoS")
}
// NewExchange 初始化交換機(jī)
//第一個參數(shù):rabbitmq服務(wù)器的鏈接,第二個參數(shù):交換機(jī)名字,第三個參數(shù):交換機(jī)類型
func NewExchange(s string, name string, typename string) {
//連接rabbitmq
conn, e := amqp.Dial(s)
failOnError(e, "連接Rabbitmq服務(wù)器失??!")
ch, e := conn.Channel()
failOnError(e, "無法打開頻道!")
e = ch.ExchangeDeclare(
name, // name
typename, // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(e, "初始化交換機(jī)失敗!")
}
// ExchangeDelete 刪除交換機(jī)
func (q *RabbitMQ) ExchangeDelete(exchange string) {
e := q.channel.ExchangeDelete(exchange, false, true)
failOnError(e, "刪除交換機(jī)失??!")
}
// Bind 綁定消息隊(duì)列到exchange
func (q *RabbitMQ) Bind(exchange string, key string) {
e := q.channel.QueueBind(
q.Name,
key,
exchange,
false,
nil,
)
failOnError(e, "綁定隊(duì)列失??!")
q.exchange = exchange
}
// Send 向消息隊(duì)列發(fā)送消息
//Send方法可以往某個消息隊(duì)列發(fā)送消息
func (q *RabbitMQ) Send(queue string, body interface{}) {
str, e := json.Marshal(body)
failOnError(e, "消息序列化失敗!")
e = q.channel.Publish(
"", //交換
queue, //路由鍵
false, //必填
false, //立即
amqp.Publishing{
ReplyTo: q.Name,
Body: []byte(str),
})
msg := "向隊(duì)列:" + q.Name + "發(fā)送消息失??!"
failOnError(e, msg)
}
// Publish 向exchange發(fā)送消息
//Publish方法可以往某個exchange發(fā)送消息
func (q *RabbitMQ) Publish(exchange string, body interface{}, key string) {
str, e := json.Marshal(body)
failOnError(e, "消息序列化失??!")
e = q.channel.Publish(
exchange,
key,
false,
false,
amqp.Publishing{ReplyTo: q.Name,
Body: []byte(str)},
)
failOnError(e, "向交換機(jī)發(fā)送消息失敗!")
}
// Consume 接收某個消息隊(duì)列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
c, e := q.channel.Consume(
q.Name, //指定從哪個隊(duì)列中接收消息
"",
true,
false,
false,
false,
nil,
)
failOnError(e, "接收消息失敗!")
return c
}
// Close 關(guān)閉隊(duì)列連接
func (q *RabbitMQ) Close() {
q.channel.Close()
}
//錯誤處理函數(shù)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}發(fā)送端的確認(rèn)
首先初始化消息隊(duì)列的時(shí)候,我們要開啟confirm模式,才能接收到這條應(yīng)答。開啟方式是將Channel.Confirm(noWait bool)參數(shù)設(shè)置為false,表示同意發(fā)送者將當(dāng)前channel信道設(shè)置為confirm模式。
func New(s string, name string) *RabbitMQ {
conn, e := amqp.Dial(s)
failOnError(e, "連接Rabbitmq服務(wù)器失??!")
ch, e := conn.Channel()
failOnError(e, "無法打開頻道!")
q, e := ch.QueueDeclare(
name, //隊(duì)列名
false, //是否開啟持久化
true, //不使用時(shí)刪除
false, //排他
false, //不等待
nil, //參數(shù)
)
failOnError(e, "初始化消息隊(duì)列失??!")
mq := new(RabbitMQ)
mq.channel = ch
mq.Name = q.Name
// 設(shè)置為confirm模式
mq.channel.Confirm(false)
return mq
}
然后在封裝庫中創(chuàng)建一個函數(shù)handleConfirm()用于接收來自Borker的回復(fù):
func (q *RabbitMQ) ConfirmFromBroker(ch chan amqp.Confirmation) chan amqp.Confirmation {
return q.channel.NotifyPublish(ch)
}
生產(chǎn)者
生產(chǎn)者端在向Broker發(fā)送消息的時(shí)候,我們使用一個無緩沖的通道來接收來自Broker的回復(fù),然后創(chuàng)建一個協(xié)程監(jiān)聽這個無緩沖通道。
func main() {
producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
// 指定為topic類型
rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange1", "fanout")
confirm := producer.ConfirmFromBroker(make(chan amqp.Confirmation))
go handleConfirm(confirm)
var i int
for {
time.Sleep(time.Second)
producer.Publish("exchange1", "fanout message: "+strconv.Itoa(i), "")
i++
}
}
func handleConfirm(confirm <-chan amqp.Confirmation) {
for {
select {
case message := <-confirm:
fmt.Println("接收到來自Broker的回復(fù):", message)
}
}
}
運(yùn)行結(jié)果:
接收到來自Broker的回復(fù): {1 true}
接收到來自Broker的回復(fù): {2 true}
接收到來自Broker的回復(fù): {3 true}
接收到來自Broker的回復(fù): {4 true}
接收到來自Broker的回復(fù): {5 true}
消費(fèi)端的確認(rèn)
首先將Consume函數(shù)的第三個參數(shù)autoAck參數(shù)標(biāo)記為false:
// Consume 接收某個消息隊(duì)列的消息
func (q *RabbitMQ) Consume() <-chan amqp.Delivery {
c, e := q.channel.Consume(
q.Name,
"",
false, // 不自動確認(rèn)消息
false,
false,
false,
nil,
)
failOnError(e, "接收消息失??!")
return c
}
在消費(fèi)者端我們采用公平派遣模式,即隊(duì)列發(fā)送消息給消費(fèi)者的時(shí)候,不再采用輪詢機(jī)制,而是一個消費(fèi)者消費(fèi)完消息之后,會調(diào)用Ack(false)函數(shù)向隊(duì)列發(fā)送一個回復(fù),隊(duì)列每次會將消息優(yōu)先發(fā)送給消費(fèi)完消息的消費(fèi)者(回復(fù)過)。
消費(fèi)端限流:
實(shí)現(xiàn)公平派遣模式我們需要設(shè)置消費(fèi)者端一次只能消費(fèi)一條消息,之前我們已經(jīng)進(jìn)行了封裝,直接在消費(fèi)者端調(diào)用即可:
// Qos 配置queue參數(shù)
func (q *RabbitMQ) Qos() {
e := q.channel.Qos(1, 0, false)
failOnError(e, "無法設(shè)置QoS")
}
生產(chǎn)者
func main() {
producer := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue")
// 指定為direct類型
rabbitmq.NewExchange("amqp://guest:guest@35.76.111.125:5672/", "exchange", "direct")
i := 0
for {
time.Sleep(time.Second)
producer.Publish("exchange", "routing message: "+strconv.Itoa(i), "key1")
i = i + 1
}
}
消費(fèi)者1
消費(fèi)者2在消費(fèi)第三條消息的時(shí)候,假設(shè)發(fā)生了錯誤,我們調(diào)用d.Reject(true)函數(shù)讓隊(duì)列重新發(fā)送消息。
func main() {
//第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊(duì)列的名字
consumer1 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
// 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收
consumer1.Qos()
// 隊(duì)列綁定到exchange
consumer1.Bind("exchange", "key1")
//接收消息
msgs := consumer1.Consume()
go func() {
var i int
for d := range msgs {
time.Sleep(time.Second * 1)
log.Printf("Consumer1 received a message: %s", d.Body)
// 假設(shè)消費(fèi)第三條消息的時(shí)候出現(xiàn)了錯誤,我們就調(diào)用d.Reject(true),隊(duì)列會重新發(fā)送消息給消費(fèi)者
if i == 2 {
d.Reject(true)
} else {
// 消息消費(fèi)成功之后就回復(fù)
d.Ack(false)
}
i++
}
}()
select {}
}
消費(fèi)者2
func main() {
//第一個參數(shù)指定rabbitmq服務(wù)器的鏈接,第二個參數(shù)指定創(chuàng)建隊(duì)列的名字
consumer2 := rabbitmq.New("amqp://guest:guest@35.76.111.125:5672/", "queue1")
// 指定一次只消費(fèi)一條消息,直到消費(fèi)完才重新接收
consumer2.Qos()
// 隊(duì)列綁定到exchange
consumer2.Bind("exchange", "key1")
//接收消息
msgs := consumer2.Consume()
go func() {
for d := range msgs {
time.Sleep(time.Second * 5)
log.Printf("Consumer2 received a message: %s", d.Body)
// 消息消費(fèi)成功之后就回復(fù)
d.Ack(false)
}
}()
select {}
}
運(yùn)行結(jié)果:
# 消費(fèi)者1
2022/11/06 19:55:08 Consumer1 received a message: "routing message: 0"
2022/11/06 19:55:10 Consumer1 received a message: "routing message: 2"
2022/11/06 19:55:11 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:12 Consumer1 received a message: "routing message: 3"
2022/11/06 19:55:13 Consumer1 received a message: "routing message: 4"
2022/11/06 19:55:14 Consumer1 received a message: "routing message: 6"# 消費(fèi)者2
2022/11/06 19:55:13 Consumer2 received a message: "routing message: 1"
到此這篇關(guān)于RabbitMq如何做到消息的可靠性投遞的文章就介紹到這了,更多相關(guān)RabbitMq消息可靠性投遞內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
golang封裝一個執(zhí)行命令行的函數(shù)(return?stderr/stdout/exitcode)示例代碼
在?Go?語言中,您可以使用?os/exec?包來執(zhí)行外部命令,不通過調(diào)用?shell,并且能夠獲得進(jìn)程的退出碼、標(biāo)準(zhǔn)輸出和標(biāo)準(zhǔn)錯誤輸出,下面給大家分享golang封裝一個執(zhí)行命令行的函數(shù)(return?stderr/stdout/exitcode)的方法,感興趣的朋友跟隨小編一起看看吧2024-06-06
關(guān)于golang監(jiān)聽rabbitmq消息隊(duì)列任務(wù)斷線自動重連接的問題
這篇文章主要介紹了golang監(jiān)聽rabbitmq消息隊(duì)列任務(wù)斷線自動重連接,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2022-03-03
Go gorilla securecookie庫的安裝使用詳解
這篇文章主要介紹了Go gorilla securecookie庫的安裝使用詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-08-08
Golang中HTTP服務(wù)的分析與設(shè)計(jì)詳解
這篇文章主要介紹了Golang中HTTP服務(wù)的分析與設(shè)計(jì),HTTP服務(wù)是實(shí)現(xiàn)Web應(yīng)用程序的重要組成部分,為了實(shí)現(xiàn)高效可擴(kuò)展的Web應(yīng)用程序,需要對HTTP服務(wù)進(jìn)行分析與設(shè)計(jì),需要的朋友可以參考下2023-05-05
Go|使用Options模式和建造者模式創(chuàng)建對象實(shí)戰(zhàn)
這篇文章主要介紹了Go使用Options模式和建造者模式創(chuàng)建對象實(shí)戰(zhàn)詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
Go語言中使用flag包對命令行進(jìn)行參數(shù)解析的方法
這篇文章主要介紹了Go語言中使用flag包對命令行進(jìn)行參數(shù)解析的方法,文中舉了一個實(shí)現(xiàn)flag.Value接口來自定義flag的例子,需要的朋友可以參考下2016-04-04
詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理
這篇文章主要介紹了詳解Go多協(xié)程并發(fā)環(huán)境下的錯誤處理,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08

