Springboot整合mqtt實現(xiàn)軟硬件通信的示例代碼
前言
本文實現(xiàn)Springboot整合mqtt,更好地實現(xiàn)物聯(lián)網(wǎng)軟硬件通信。
一、mqtt是什么?
MQTT 是消息隊列遙測傳輸的縮寫,是一種輕量級、基于發(fā)布 / 訂閱模式的物聯(lián)網(wǎng)通信協(xié)議。
應用場景:
- 物聯(lián)網(wǎng)設備通信,比如智能家居、工業(yè)傳感器、穿戴設備。
- 遠程監(jiān)控與數(shù)據(jù)采集,例如環(huán)境監(jiān)測、設備狀態(tài)上報。
注:這里實現(xiàn)的是軟件后端層面的mqtt,是對硬件消息的接收和發(fā)送,如果要做一個完整的物聯(lián)網(wǎng)項目,也需要硬件層面連接mqtt服務器,并且訂閱、發(fā)布相關主題的信息。
二、物聯(lián)網(wǎng)項目結構圖

該圖為完整的mqtt項目結構圖。其中mqtt服務器部分可以用官方提供的公共的服務器,也可以在自己的服務器上搭建,只需要在后端配置文件添加mqtt服務器ip地址等相關信息。
注:Springboot項目不是自己內部開一個mqtt服務,所以我們只需要連接并使用相應的mqtt服務器即可。
三、實現(xiàn)代碼
在pom.xml中引入Maven坐標:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>在Springboot配置文件中配置mqtt服務器相關信息
spring:
mqtt:
url: tcp://broker.emqx.io:1883
#用戶名
username: admin
#密碼
password: 123456
#客戶端id(不能重復)
client:
id: consumer-id
#MQTT默認的消息推送主題,實際可在調用接口時指定
default:
topic: topic這里的mqtt服務器地址 tcp://broker.emqx.io:1883 是EMQX官方提供的一個公共的服務器,如果是想看一下初步效果,可以使用該服務器。
接下來我們可以放兩部分代碼,一部分就是接收消息,也就是訂閱消息的部分,另一部分就是用來發(fā)布相關消息。
訂閱消息部分:
@Configuration
public class MqttConsumerConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 在bean初始化后連接到服務器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
*/
public void connect(){
try {
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
// //設置連接用戶名
// options.setUserName(username);
// //設置連接密碼
// options.setPassword(password.toCharArray());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
//設置回調
client.setCallback(new MqttConsumerCallBack());
client.connect(options);
//訂閱主題
//消息等級,和主題數(shù)組一一對應,服務端將按照指定等級給訂閱了主題的客戶端推送消息
int[] qos = {1};
System.out.println("連接");
//主題
//String[] topics = {"data","status"};
client.subscribe(topics,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 斷開連接
*/
public void disConnect(){
try {
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
/**
* 訂閱主題
*/
public void subscribe(String topic,int qos){
try {
client.subscribe(topic,qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}@Component
public class MqttConsumerCallBack implements MqttCallback{
/**
* 客戶端斷開連接的回調
*/
@Override
public void connectionLost(Throwable throwable) {
System.out.println("與服務器斷開連接,可重連");
}
/**
* 消息到達的回調
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(String.format("接收消息主題 : %s",topic));
System.out.println(String.format("接收消息Qos : %d",message.getQos()));
System.out.println(String.format("接收消息內容 : %s",new String(message.getPayload())));
if(topic.equals("data")){
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
System.out.println(String.format("接收消息retained : %b",message.isRetained()));
}
/**
* 消息發(fā)布成功的回調
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
}接下來是發(fā)布消息部分:
@Configuration
@Slf4j
public class MqttProviderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
private String clientId = "provider_id";
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
/**
* 客戶端對象
*/
private MqttClient client;
/**
* 在bean初始化后連接到服務器
*/
@PostConstruct
public void init(){
connect();
}
/**
* 客戶端連接服務端
*/
public void connect(){
try{
// System.out.println("Connecting to MQTT server: " + hostUrl + " with client ID: " + clientId);
//創(chuàng)建MQTT客戶端對象
client = new MqttClient(hostUrl,clientId,new MemoryPersistence());
//連接設置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
//設置連接用戶名
options.setUserName(username);
//設置連接密碼
options.setPassword(password.toCharArray());
//設置超時時間,單位為秒
options.setConnectionTimeout(100);
//設置心跳時間 單位為秒,表示服務器每隔 1.5*20秒的時間向客戶端發(fā)送心跳判斷客戶端是否在線
options.setKeepAliveInterval(20);
//設置遺囑消息的話題,若客戶端和服務器之間的連接意外斷開,服務器將發(fā)布客戶端的遺囑信息
options.setWill("willTopic",(clientId + "與服務器斷開連接").getBytes(),0,false);
//設置回調
client.setCallback(new MqttProviderCallBack());
client.connect(options);
} catch(MqttException e){
e.printStackTrace();
}
}
public void publish(int qos,boolean retained,String topic,String message){
MqttMessage mqttMessage = new MqttMessage(); //創(chuàng)建消息實例
mqttMessage.setQos(qos); //設置qos
mqttMessage.setRetained(retained); //設置是否保留信息
mqttMessage.setPayload(message.getBytes());
//主題的目的地,用于發(fā)布/訂閱信息
MqttTopic mqttTopic = client.getTopic(topic);
//提供一種機制來跟蹤消息的傳遞進度
//用于在以非阻塞方式(在后臺運行)執(zhí)行發(fā)布是跟蹤消息的傳遞進度
MqttDeliveryToken token;
try {
//將指定消息發(fā)布到主題,但不等待消息傳遞完成,返回的token可用于跟蹤消息的傳遞狀態(tài)
token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
} catch (MqttException e) {
e.printStackTrace();
}
}
}@Configuration
public class MqttProviderCallBack implements MqttCallback{
@Value("${spring.mqtt.client.id}")
private String clientId;
/**
* 與服務器斷開的回調
*/
@Override
public void connectionLost(Throwable cause) {
System.out.println(clientId+"與服務器斷開連接");
}
/**
* 消息到達的回調
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
/**
* 消息發(fā)布成功的回調
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
IMqttAsyncClient client = token.getClient();
System.out.println(client.getClientId()+"發(fā)布消息成功!");
}
}接下來就是調用publish方法,傳入相應參數(shù),即可完成消息的發(fā)布。
這里有幾個參數(shù)要注意一下:
一、MQTT Topic(消息路由核心)
Topic 是客戶端發(fā)布 / 訂閱消息的 “地址標識”,是實現(xiàn) “發(fā)布 - 訂閱” 模式的基礎。
- 核心屬性:字符串格式,無預定義結構,由客戶端自定義,比如 “device/light/ 客廳”“data/sensor/ 溫度”。
- 層級與通配符:用斜杠 “/” 劃分層級,支持兩種通配符訂閱 ——“+” 匹配單個層級(如 “device/+/ 客廳” 匹配 “device/light/ 客廳”),“#” 匹配當前及所有子層級(如 “data/#” 匹配所有數(shù)據(jù)類主題)。
- 核心規(guī)則:發(fā)布者僅需指定 Topic 發(fā)送消息,訂閱者通過匹配 Topic(或通配符)接收消息,發(fā)布者與訂閱者無直接關聯(lián),實現(xiàn)解耦。
二、MQTT QoS(消息傳輸質量等級)
QoS(Quality of Service)定義了消息從發(fā)布者到訂閱者的傳輸可靠性,MQTT 3.1.1 標準規(guī)定了 3 個等級,優(yōu)先級從低到高。
- QoS 0(最多一次):消息僅發(fā)送一次,不確認、不重發(fā),可能丟失。適用于對可靠性要求低的場景,如實時溫度上報。
- QoS 1(至少一次):消息確保送達,但可能重復。發(fā)布者發(fā)送后等待確認,未收到確認則重發(fā),直到訂閱者確認接收。
- QoS 2(恰好一次):消息確保僅送達一次,無丟失、無重復。通過 “發(fā)布 - 確認 - 釋放 - 完成” 四次握手實現(xiàn),適用于金融交易、指令下發(fā)等關鍵場景。
三、MQTT Retained
Retained 消息是 Broker(服務器)為指定 Topic 保存的 “最新一條消息”,具備 “狀態(tài)快照” 屬性。
- 發(fā)布時觸發(fā):客戶端發(fā)布消息時,需顯式設置 “Retain 標志位” 為 true,Broker 才會保存該消息。
- 僅存最新:同一 Topic 后續(xù)發(fā)布的 Retained 消息會覆蓋舊消息,Broker 始終只保留該 Topic 的最新狀態(tài)。
- 在有別的客戶端訂閱該Topic時,如果其“Retain 標志位” 為 true,一旦連接,客戶端馬上會收到一條保存的最后發(fā)布的該Topic的消息。
總結
本文詳細描述了mqtt項目大致結構、Sringboot整合mqtt代碼、mqtt的幾個重要參數(shù),希望對未來的架構師們有幫助,謝謝~
到此這篇關于Springboot整合mqtt實現(xiàn)軟硬件通信的示例代碼的文章就介紹到這了,更多相關Springboot整合mqtt通信內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
使用maven方式創(chuàng)建springboot項目的方式
使用Spring Initializr創(chuàng)建spring boot項目,因為外網(wǎng)問題導致很難成功,所以只能使用maven方式,這里介紹下使用maven方式創(chuàng)建springboot項目的方法,感興趣的朋友一起看看吧2022-09-09
Mybatis-plus如何通過反射實現(xiàn)動態(tài)排序不同字段功能
這篇文章主要介紹了Mybatis-plus如何通過反射實現(xiàn)動態(tài)排序不同字段功能,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2024-02-02
Caused by: java.lang.ClassNotFoundException: org.apache.comm
這篇文章主要介紹了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type異常,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2020-07-07
JavaWeb dbutils執(zhí)行sql命令并遍歷結果集時不能查到內容的原因分析
這篇文章主要介紹了JavaWeb dbutils執(zhí)行sql命令并遍歷結果集時不能查到內容的原因分析及簡單處理方法,文中給大家介紹了javaweb中dbutils的使用,需要的朋友可以參考下2017-12-12

