go語(yǔ)言實(shí)現(xiàn)mqtt協(xié)議的實(shí)踐
一、什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息隊(duì)列遙測(cè)傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的“輕量級(jí)”通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上,由IBM在1999年發(fā)布。
MQTT最大優(yōu)點(diǎn)在于,可以以極少的代碼和有限的帶寬,為連接遠(yuǎn)程設(shè)備提供實(shí)時(shí)可靠的消息服務(wù)。作為一種低開銷、低帶寬占用的即時(shí)通訊協(xié)議,使其在物聯(lián)網(wǎng)、小型設(shè)備、移動(dòng)應(yīng)用等方面有較廣泛的應(yīng)用
MQTT是一個(gè)基于客戶端-服務(wù)器的消息發(fā)布/訂閱傳輸協(xié)議。MQTT協(xié)議是輕量、簡(jiǎn)單、開放和易于實(shí)現(xiàn)的,這些特點(diǎn)使它適用范圍非常廣泛。在很多情況下,包括受限的環(huán)境中,如:機(jī)器與機(jī)器(M2M)通信和物聯(lián)網(wǎng)(IoT)。其在,通過(guò)衛(wèi)星鏈路通信傳感器、偶爾撥號(hào)的醫(yī)療設(shè)備、智能家居、及一些小型化設(shè)備中已廣泛使用
MQTT還有一個(gè)特點(diǎn)就是客戶端之間不用相互通信, MQTT通信更像是郵箱服務(wù),發(fā)布者發(fā)布消息到服務(wù)器,接收者只要訂閱了其服務(wù)在線后即可收到
實(shí)現(xiàn)MQTT協(xié)議需要客戶端和服務(wù)器端通訊完成,在通訊過(guò)程中,MQTT協(xié)議中有三種身份:發(fā)布者(Publish)、代理(Broker)(服務(wù)器)、訂閱者(Subscribe)。其中,消息的發(fā)布者和訂閱者都是客戶端,消息代理是服務(wù)器,消息發(fā)布者可以同時(shí)是訂閱者。
MQTT傳輸?shù)南⒎譃椋褐黝}(Topic)和負(fù)載(payload)兩部分:
(1)Topic,可以理解為消息的類型,訂閱者訂閱(Subscribe)后,就會(huì)收到該主題的消息內(nèi)容(payload);
(2)payload,可以理解為消息的內(nèi)容,是指訂閱者具體要使用的內(nèi)容。
Topic就是消息名,payload就是消息體
MQTT會(huì)構(gòu)建底層網(wǎng)絡(luò)傳輸:它將建立客戶端到服務(wù)器的連接,提供兩者之間的一個(gè)有序的、無(wú)損的、基于字節(jié)流的雙向傳輸。
當(dāng)應(yīng)用數(shù)據(jù)通過(guò)MQTT網(wǎng)絡(luò)發(fā)送時(shí),MQTT會(huì)把與之相關(guān)的**服務(wù)質(zhì)量(QoS)和主題名(Topic)**相關(guān)連。
二、Go語(yǔ)言MQTT服務(wù)器Broker的搭建
服務(wù)端用erlang編寫的一個(gè)開源項(xiàng)目:emqqtd
# 下載安裝包 wget https://github.com/emqx/emqx/releases/download/v4.0.4/emqx-ubuntu18.04-v4.0.4.zip cd mqttd/emqx . ├── bin ├── data ├── erts-10.5.2 ├── etc ├── lib ├── log └── releases # 開啟服務(wù) ./bin/emqx start # 查看狀態(tài) ./bin/emqx_ctl status # 停止服務(wù) ./bin/emqx stop
找到自己的IP,訪問(wèn)http://[你的IP]:18083/#/clients
- 用戶名:admin
- 密碼:public
即可進(jìn)入服務(wù)器的控制臺(tái)

三、Go客戶端訪問(wèn)簡(jiǎn)單API
客戶端用golang客戶端的庫(kù):“github.com/eclipse/paho.mqtt.golang”
# 下載依賴包 go get -u github.com/eclipse/paho.mqtt.golang
實(shí)例如下:
編寫了兩個(gè)函數(shù)一個(gè)發(fā)布一個(gè)訂閱,傳入?yún)?shù)即可服務(wù)
修改EMQServerAddress為你服務(wù)器的IP
package main
// 與后端mqtt服務(wù)交互
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"os"
"strconv"
"time"
)
const EMQServerAddress = "你的IP"
// 創(chuàng)建全局mqtt publish消息處理 handler
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("Push Message:")
fmt.Printf("TOPIC: %s\n", msg.Topic())
fmt.Printf("MSG: %s\n", msg.Payload())
}
// 創(chuàng)建全局mqtt sub消息處理 handler
var messageSubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Println("收到訂閱消息:")
fmt.Printf("Sub Client Topic : %s \n", msg.Topic())
fmt.Printf("Sub Client msg : %s \n", msg.Payload())
}
// 連接的回掉函數(shù)
var connectHandler mqtt.OnConnectHandler =func(client mqtt.Client) {
fmt.Println("新的連接!" + " Connected")
}
// 丟失連接的回掉函數(shù)
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect loss: %v\n", err)
}
func init() {
// 配置錯(cuò)誤提示
mqtt.DEBUG = log.New(os.Stdout, " [mqttDEBUG]", 0)
mqtt.ERROR = log.New(os.Stdout, " [mqttERROR]", 0)
}
/**
* @Description: 發(fā)布訂閱
* @param clientID
* @param addr
* @param topic
* @param payload
*/
func Push(topic string, qos byte, retain bool, payload string) {
// opts ClientOptions 用于設(shè)置 broker,端口,客戶端 id ,用戶名密碼等選項(xiàng)
opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("test_push")
opts.SetKeepAlive(60 * time.Second)
// Message callback handler,在沒有任何訂閱時(shí),發(fā)布端調(diào)用此函數(shù)
opts.SetDefaultPublishHandler(messagePubHandler)
opts.SetPingTimeout(1 * time.Second)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
//發(fā)布消息
// qos是服務(wù)質(zhì)量: ==1: 一次, >=1: 至少一次, <=1:最多一次
// retained: 表示mqtt服務(wù)器要保留這次推送的信息,如果有新的訂閱者出現(xiàn),就會(huì)把這消息推送給它(持久化推送)
token := client.Publish(topic, qos, retain, payload)
token.Wait()
fmt.Println("Push Data : "+topic, "Data Size is "+strconv.Itoa(len(payload)))
fmt.Println("Disconnect with broker")
client.Disconnect(250)
}
/**
* @Description: 訂閱與取消訂閱
* @param clientID
* @param addr
* @param topic
* @param isSub
*/
func Subscription(topic string, qos byte, isSub bool, handleFun func([]byte)) {
opts := mqtt.NewClientOptions().AddBroker("tcp://" + EMQServerAddress + ":1883").SetClientID("sub_test")
opts.SetKeepAlive(60 * time.Second)
opts.SetPingTimeout(1 * time.Second)
opts.OnConnect = func(client mqtt.Client) {
fmt.Println("New Subscription! Connected" + " => " + topic)
}
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
if isSub {
// 訂閱消息
if token := client.Subscribe(topic, qos, func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Receive Subscribe Message :")
fmt.Printf("Sub Client Topic : %s, Data size is %d \n", msg.Topic(), len(msg.Payload()))
if len(msg.Payload()) > 0 {
handleFun(msg.Payload())
}
}); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
} else {
// 取消訂閱
if token := client.Unsubscribe(topic); token.Wait() && token.Error() != nil {
fmt.Println(token.Error())
os.Exit(1)
}
}
}
學(xué)習(xí)資料:
到此這篇關(guān)于go語(yǔ)言實(shí)現(xiàn)mqtt協(xié)議的實(shí)踐的文章就介紹到這了,更多相關(guān)go語(yǔ)言 mqtt協(xié)議內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Goland使用Go Modules創(chuàng)建/管理項(xiàng)目的操作
這篇文章主要介紹了Goland使用Go Modules創(chuàng)建/管理項(xiàng)目的操作,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2021-05-05
Golang語(yǔ)言學(xué)習(xí)拿捏Go反射示例教程
這篇文章主要為大家介紹了Golang語(yǔ)言中Go反射示例的教程,教你拿捏Go反射,再也不用被Go反射折磨,有需要的朋友可以共同學(xué)習(xí)參考下2021-11-11
Golang操作MySql數(shù)據(jù)庫(kù)的完整步驟記錄
這篇文章主要給大家介紹了關(guān)于Golang操作MySql數(shù)據(jù)庫(kù)的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11

