golang gin 監(jiān)聽rabbitmq隊列無限消費的案例代碼
golang gin 監(jiān)聽rabbitmq隊列無限消費
連接rabbitmq
package database
import (
"github.com/streadway/amqp"
"log"
"reflect"
"yy-data-processing/common/config"
)
var RabbitConn *amqp.Connection
var RabbitChannel *amqp.Channel
func InitRabbitmq() {
var err error
RabbitConn, err = amqp.Dial(config.Config.RabbitUrl)
if err != nil {
log.Println("連接RabbitMQ失敗")
panic(err)
}
RabbitChannel, err = RabbitConn.Channel()
if err != nil {
log.Println("獲取RabbitMQ channel失敗")
panic(err)
}
}
// 0表示channel未關(guān)閉,1表示channel已關(guān)閉
func CheckRabbitClosed(ch amqp.Channel) int64 {
d := reflect.ValueOf(ch)
i := d.FieldByName("closed").Int()
return i
}創(chuàng)建生產(chǎn)者
package service
import (
"encoding/json"
"github.com/streadway/amqp"
"log"
"yy-data-processing/common/config"
"yy-data-processing/common/database"
"yy-data-processing/model"
)
func Producer() {
// 聲明隊列,沒有則創(chuàng)建
// 隊列名稱、是否持久化、所有消費者與隊列斷開時是否自動刪除隊列、是否獨享(不同連接的channel能否使用該隊列)
declare, err := database.RabbitChannel.QueueDeclare(config.Config.HawkSaveQueueName, true, false, false, false, nil)
if err != nil {
log.Printf("聲明隊列 %v 失敗, error: %v", config.Config.HawkSaveQueueName, err)
panic(err)
}
request := model.Request{}
marshal, _ := json.Marshal(request )
// exchange、routing key、mandatory、immediate
err = database.RabbitChannel.Publish("", declare.Name, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(marshal),
})
if err != nil {
log.Printf("生產(chǎn)者發(fā)送消息失敗, error: %v", err)
} else {
log.Println("生產(chǎn)者發(fā)送消息成功")
}
}
創(chuàng)建消費者
package service
import (
"encoding/json"
"log"
"os"
"strings"
"sync"
"time"
"yy-data-processing/common/config"
"yy-data-processing/common/database"
"yy-data-processing/model"
)
func Consumer() {
// 聲明隊列,沒有則創(chuàng)建
// 隊列名稱、是否持久化、所有消費者與隊列斷開時是否自動刪除隊列、是否獨享(不同連接的channel能否使用該隊列)
_, err := database.RabbitChannel.QueueDeclare(config.Config.QueueName, true, false, false, false, nil)
if err != nil {
log.Printf("聲明隊列 %v 失敗, error: %v", config.Config.QueueName, err)
panic(err)
}
// 隊列名稱、consumer、auto-ack、是否獨享
// deliveries是一個管道,有消息到隊列,就會消費,消費者的消息只需要從deliveries這個管道獲取
deliveries, err := database.RabbitChannel.Consume(config.Config.QueueName, "", true, false, false, false, nil)
if err != nil {
log.Printf("從隊列 %v 獲取數(shù)據(jù)失敗, error: %v", config.Config.QueueName, err)
} else {
log.Println("從消費隊列獲取任務(wù)成功")
}
// 阻塞住
for {
select {
case message := <-deliveries:
closed := database.CheckRabbitClosed(*database.RabbitChannel)
if closed == 1 { // channel 已關(guān)閉,重連一下
database.InitRabbitmq()
} else {
msgData := string(message.Body)
request := model.Request{}
err := json.Unmarshal([]byte(msgData), &request)
if err != nil {
log.Printf("解析rabbitmq數(shù)據(jù) %v 失敗, error: %v", msgData, err)
} else {
// TODO...
// 處理邏輯
}
}
}
}
}
main方法協(xié)程調(diào)用
package main
import (
"log"
"yy-data-processing/common/config"
"yy-data-processing/common/database"
"yy-data-processing/router"
"yy-data-processing/service"
)
func main() {
// 初始化路由
routers := router.InitRouters()
// 初始化RabbitMQ
database.InitRabbitmq()
go service.Producer()
go service.Consumer()
port := config.Config.Port
if err := routers.Run(":" + port); err != nil {
log.Printf("啟動服務(wù)失敗: ", err)
}
}到此這篇關(guān)于golang gin 監(jiān)聽rabbitmq隊列無限消費的文章就介紹到這了,更多相關(guān)golang監(jiān)聽rabbitmq內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Golang 統(tǒng)計字符串字數(shù)的方法示例
本篇文章主要介紹了Golang 統(tǒng)計字符串字數(shù)的方法示例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05
Go?Gin框架優(yōu)雅重啟和停止實現(xiàn)方法示例
Web應(yīng)用程序中,有時需要重啟或停止服務(wù)器,無論是因為更新代碼還是進行例行維護,這時需要保證應(yīng)用程序的可用性和數(shù)據(jù)的一致性,就需要優(yōu)雅地關(guān)閉和重啟應(yīng)用程序,即不丟失正在處理的請求和不拒絕新的請求,本文將詳解如何在Go語言中使用Gin這個框架實現(xiàn)優(yōu)雅的重啟停止2024-01-01
Go語言開發(fā)kube-scheduler整體架構(gòu)深度剖析
這篇文章主要為大家介紹了Go語言開發(fā)kube-scheduler整體架構(gòu)深度剖析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-04-04
Go語言中三種容器類型的數(shù)據(jù)結(jié)構(gòu)詳解
在?Go?語言中,有三種主要的容器類型用于存儲和操作集合數(shù)據(jù)這篇文章主要為大家介紹了三者的使用與區(qū)別,感興趣的小伙伴可以跟隨小編一起學習一下2025-02-02
Golang 定時器(Timer 和 Ticker),這篇文章就夠了
這篇文章主要介紹了Golang 定時器(Timer 和 Ticker),這篇文章就夠了,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-10-10

