SpringBoot整合MQTT協(xié)議實(shí)現(xiàn)消息訂閱與發(fā)布功能
1、相關(guān)依賴 pom.xml文件
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>2、配置文件 application.yml
這里的訂閱主題可不要,我這里用于啟動(dòng)的時(shí)候就訂閱固定主題。適用主題固定的場景。
# MQTT服務(wù)地址,端口默認(rèn)1883 mqtt-broker-url: tcp://127.0.0.1:1883 # 用戶名 mqtt-username: admin # 密碼 mqtt-password: public # 訂閱主題(可以多個(gè)) mqtt-default-topic: mqtt/topic_test # 客戶端Id mqtt-clientId: can
3、MQTT配置類
用于配置項(xiàng)目啟動(dòng)時(shí)就連接MQTT。
@Component
public class MqttConfig {
@Resource
private MqttPushClient mqttPushClient;
@Resource
private MqttSubClient mqttSubClient;
/**
* 用戶名
*/
@Value("${mqtt-username}")
private String username;
/**
* 密碼
*/
@Value("${mqtt-password}")
private String password;
/**
* 連接地址
*/
@Value("${mqtt-broker-url}")
private String hostUrl;
/**
* 客戶Id
*/
@Value("${mqtt-clientId}")
private String clientId;
/**
* 默認(rèn)連接話題,多個(gè)的話用逗號(hào)隔開
*/
@Value("${mqtt-default-topic}")
private String defaultTopic;
/**
* 超時(shí)時(shí)間
*/
private int timeout = 100;
/**
* 保持連接數(shù)
*/
private int keepalive = 60;
/**
* 連接至mqtt服務(wù)器,獲取mqtt連接
*
* @return MqttPushClient
*/
@Bean
public MqttPushClient getMqttPushClient() {
// 連接至mqtt服務(wù)器,獲取mqtt連接
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
// 訂閱默認(rèn)主題
mqttSubClient.subScribeDataPublishTopic(defaultTopic);
return mqttPushClient;
}
}4、發(fā)布連接類
連接MQTT的方法、發(fā)布消息的方法。
@Slf4j
@Component
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
@Getter
private static MqttClient client;
public static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
/**
* 連接
* @param host mqtt://127.0.0.1:1883
* @param clientId can
* @param username admin
* @param password password
* @param timeout 100
* @param keepalive 60
*/
public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
MqttClient client;
try {
client = new MqttClient(host, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
// automaticReconnect 為 true 表示斷線自動(dòng)重連,但僅僅只是重新連接,并不訂閱主題;在 connectComplete 回調(diào)函數(shù)重新訂閱
options.setAutomaticReconnect(true);
MqttPushClient.setClient(client);
try {
//設(shè)置回調(diào)類
client.setCallback(pushCallback);
IMqttToken iMqttToken = client.connectWithResult(options);
boolean complete = iMqttToken.isComplete();
log.error("MQTT連接{}", complete ? "成功" : "失敗");
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
/**
* 關(guān)閉MQTT連接
*/
public void close() throws MqttException {
client.disconnect();
client.close();
}
/**
* 發(fā)布,默認(rèn)qos為0,非持久化
*
* @param topic 主題名
* @param pushMessage 消息
*/
public void publish(String topic, String pushMessage) {
publish(0, false, topic, pushMessage);
}
/**
* 發(fā)布
* QoS 0:消息最多傳送一次。如果當(dāng)前客戶端不可用,它將丟失這條消息。
* QoS 1:消息至少傳送一次。
* QoS 2:消息只傳送一次。
* @param qos
* @param retained
* @param topic
* @param pushMessage
*/
public void publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
// MQTT主題不存在
if (null == mTopic) return;
try {
mTopic.publish(message);
} catch (Exception e) {
log.error("MQTT發(fā)送消息異常:", e);
e.printStackTrace();
}
}
}5、訂閱類
用于訂閱某個(gè)或多個(gè)主題、取消訂閱某個(gè)或者多個(gè)主題。
@Slf4j
@Component
public class MqttSubClient {
private static final Logger logger = LoggerFactory.getLogger(MqttSubClient.class);
// 訂閱多個(gè)主題以逗號(hào)分開
public void subScribeDataPublishTopic(String defaultTopic) {
//訂閱test_queue主題
String[] mqttTopic = defaultTopic.split(",");
for (String s : mqttTopic) {
//訂閱主題
subscribe(s, 0);
}
}
/**
* 訂閱某個(gè)主題,qos默認(rèn)為0
*
* @param topic 主題
*/
public void subscribe(String topic) {
subscribe(topic, 0);
}
/**
* 訂閱某個(gè)主題
*
* @param topic 主題名
* @param qos qos
*/
public void subscribe(String topic, int qos) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null) {
return;
}
client.subscribe(topic, qos);
log.error("MQTT訂閱主題:{}", topic);
} catch (MqttException e) {
logger.error(e.getMessage());
e.printStackTrace();
}
}
/**
* 取消訂閱某個(gè)主題
* @param topic 要取消訂閱的主題名
*/
public void unsubscribe(String topic) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null || !client.isConnected()) {
return;
}
client.unsubscribe(topic); // 取消訂閱
log.error("MQTT取消訂閱主題: {}", topic);
} catch (MqttException e) {
log.error("取消訂閱失敗: {}", e.getMessage());
e.printStackTrace();
}
}
/**
* 批量取消訂閱多個(gè)主題
* @param topics 主題數(shù)組
*/
public void unsubscribe(String[] topics) {
try {
MqttClient client = MqttPushClient.getClient();
if (client == null || !client.isConnected()) {
return;
}
client.unsubscribe(topics); // 取消訂閱多個(gè)主題
log.error("MQTT取消訂閱主題: {}", Arrays.toString(topics));
} catch (MqttException e) {
log.error("取消訂閱失敗: {}", e.getMessage());
e.printStackTrace();
}
}
}
6、回調(diào)類
處理MQTT連接斷開重連、訂閱主題接收的消息處理。
@Slf4j
@Component
public class PushCallback implements MqttCallback {
@Resource
@Lazy
private MqttPushClient mqttPushClient;
/**
* 連接丟失后,一般在這里面進(jìn)行重連(重連的邏輯需要自己處理)
* @param cause .
*/
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT連接斷開,正在重連:" + cause);
}
/**
* 發(fā)送消息,消息到達(dá)后處理方法
* @param token .
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
log.error("deliveryComplete---------{}", token.isComplete());
}
/**
* 訂閱主題接收到消息處理方法
* @param topic 主題
* @param message 消息
*/
@Override
public void messageArrived(String topic, MqttMessage message) {
// 訂閱主題后得到的消息會(huì)執(zhí)行到這里面,這里在控制臺(tái)有輸出
log.error("MQTT接收消息主題 : {}", topic);
log.error("MQTT接收消息Qos : {}", message.getQos());
log.error("MQTT接收消息內(nèi)容 : {}", message);
}
}7、啟動(dòng)后,進(jìn)入EMQX管理頁面
程序允許打印連接成功,去EMQX管理頁面查看。

EMQX管理頁面這里有所有的主題列表。

包括客戶端訂閱的主題。

8、通過接口給主題發(fā)送消息
@RestController
@Slf4j
@RequestMapping("/api")
public class ApiController {
@Resource
private MqttPushClient mqttPushClient;
@GetMapping("/test")
public String getVersions(@RequestParam String topic, @RequestParam String message) {
mqttPushClient.publish(topic, message);
return "ok";
}
}瀏覽器直接調(diào)用,topic:配置文件里面訂閱的主題。message:你想發(fā)送給主題的消息。

控制臺(tái)日志打印:發(fā)送成功,并且接收到了主題發(fā)送的消息。

到此這篇關(guān)于SpringBoot整合MQTT協(xié)議實(shí)現(xiàn)消息訂閱與發(fā)布功能的文章就介紹到這了,更多相關(guān)SpringBoot整合MQTT訂閱與發(fā)布內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java開發(fā)Dubbo注解Adaptive實(shí)現(xiàn)原理
這篇文章主要為大家介紹了java開發(fā)Dubbo注解Adaptive實(shí)現(xiàn)原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-09-09
SpringBoot啟動(dòng)時(shí)如何通過啟動(dòng)參數(shù)指定logback的位置
這篇文章主要介紹了SpringBoot啟動(dòng)時(shí)如何通過啟動(dòng)參數(shù)指定logback的位置,在spring boot中,使用logback配置的方式常用的有兩種,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07
MyBatis-Plus使用sl4j日志打印SQL的代碼詳解
以下是關(guān)于使用 Spring Boot 起始器替換 slf4j-api 和 logback 依賴的詳細(xì)步驟和注意事項(xiàng),包括 MyBatis-Plus 的默認(rèn)日志級別信息,需要的朋友可以參考下2024-10-10
java實(shí)現(xiàn)哈弗曼編碼與反編碼實(shí)例分享(哈弗曼算法)
本文介紹java實(shí)現(xiàn)哈弗曼編碼與反編碼實(shí)例,大家參考使用吧2014-01-01

