golang redigo發(fā)布訂閱使用的方法
redigo 對 發(fā)布訂閱的使用
redigo 對redis 的發(fā)布訂閱機制放在pubsub.go 中
訂閱主題后 通過Receive() 函數(shù)接受發(fā)布訂閱主題的消息
// Receive returns a pushed message as a Subscription, Message, Pong or error.
// The return value is intended to be used directly in a type switch as
// illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} {
return c.receiveInternal(c.Conn.Receive())
}
返回的是一個空類型的interface{} , 由于空接口沒有方法, 因此所有的類型都實現(xiàn)了空接口, 也就是說可以返回任意類型。
具體返回的類型 在receiveInternal() 方法里面可以看到
func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interface{} {
reply, err := Values(replyArg, errArg)
if err != nil {
return err
}
var kind string
reply, err = Scan(reply, &kind)
if err != nil {
return err
}
switch kind {
case "message":
var m Message
if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "pmessage":
var m Message
if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
return err
}
return s
case "pong":
var p Pong
if _, err := Scan(reply, &p.Data); err != nil {
return err
}
return p
}
return errors.New("redigo: unknown pubsub notification")
}
目前返回 Message Subscription Pone
訂閱的主題
收到消息之后通過注冊的回調(diào)函數(shù)處理的方式, 所以代碼中多了長map存放回調(diào)函數(shù)
package main
import (
//"github.com/go-redis/redis"
"fmt"
"github.com/labstack/gommon/log"
"github.com/gomodule/redigo/redis"
"time"
//"reflect"
"unsafe"
)
type SubscribeCallback func (channel, message string)
type Subscriber struct {
client redis.PubSubConn
cbMap map[string]SubscribeCallback
}
func (c *Subscriber) Connect(ip string, port uint16) {
conn, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Error("redis dial failed.")
}
c.client = redis.PubSubConn{conn}
c.cbMap = make(map[string]SubscribeCallback)
go func() {
for {
log.Info("wait...")
switch res := c.client.Receive().(type) {
case redis.Message:
channel := (*string)(unsafe.Pointer(&res.Channel))
message := (*string)(unsafe.Pointer(&res.Data))
c.cbMap[*channel](*channel, *message)
case redis.Subscription:
fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count)
case error:
log.Error("error handle...")
continue
}
}
}()
}
func (c *Subscriber) Close() {
err := c.client.Close()
if err != nil{
log.Error("redis close error.")
}
}
func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) {
err := c.client.Subscribe(channel)
if err != nil{
log.Error("redis Subscribe error.")
}
c.cbMap[channel.(string)] = cb
}
func TestCallback1(chann, msg string){
log.Info("TestCallback1 channel : ", chann, " message : ", msg)
}
func TestCallback2(chann, msg string){
log.Info("TestCallback2 channel : ", chann, " message : ", msg)
}
func TestCallback3(chann, msg string){
log.Info("TestCallback3 channel : ", chann, " message : ", msg)
}
func main() {
log.Info("===========main start============")
var sub Subscriber
sub.Connect("127.0.0.1", 6397)
sub.Subscribe("test_chan1", TestCallback1)
sub.Subscribe("test_chan2", TestCallback2)
sub.Subscribe("test_chan3", TestCallback3)
for{ // 這段代碼的作用就是 阻止線程結束
time.Sleep(1 * time.Second)
}
}
運行main 文件 然后 看到

在redis 客戶端 執(zhí)行 發(fā)布信息

在控制臺 看到監(jiān)控結果 回調(diào)函數(shù) 執(zhí)行的結果

發(fā)布
上面是訂閱的代碼和 代碼要處理 的回調(diào)函數(shù)
發(fā)布直接使用默認的 Conn來Send Publish 就可以
redigo 的管道使用方法設計到三個函數(shù) Do 函數(shù)也是下面這個函數(shù)的合并
- c.Send()
- c.Flush()
- c.Receive()
解釋:
send() 方法吧命令寫到緩沖區(qū), flush() 把緩沖區(qū)的命令刷新到redis 服務器 receive() 函數(shù)接受redis 給予的 回應, 三個操作共同完成一套命令流程。
代碼
package main
import(
//"github.com/go-redis/redis"
"github.com/gomodule/redigo/redis"
log "github.com/astaxie/beego/logs"
)
func main() {
client, err := redis.Dial("tcp", "127.0.0.1:6379")
if err != nil {
log.Critical("redis dial failed.")
}
defer client.Close()
_, err = client.Do("Publish", "test_chan1", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
_, err = client.Do("Publish", "test_chan2", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
_, err = client.Do("Publish", "test_chan3", "hello")
if err != nil {
log.Critical("redis Publish failed.")
}
}
到此這篇關于golang redigo發(fā)布訂閱使用的方法的文章就介紹到這了,更多相關golang redigo發(fā)布訂閱內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Golang使用原生http實現(xiàn)中間件的代碼詳解
中間件(middleware):常被用來做認證校驗、審計等,家常用的Iris、Gin等web框架,都包含了中間件邏輯,但有時我們引入該框架顯得較為繁重,本文將介紹通過golang原生http來實現(xiàn)中間件操作,需要的朋友可以參考下2024-05-05
Go?處理大數(shù)組使用?for?range?和?for?循環(huán)的區(qū)別
這篇文章主要介紹了Go處理大數(shù)組使用for?range和for循環(huán)的區(qū)別,對于遍歷大數(shù)組而言,for循環(huán)能比for?range循環(huán)更高效與穩(wěn)定,這一點在數(shù)組元素為結構體類型更加明顯,下文具體分析感興趣得小伙伴可以參考一下2022-05-05

