Springboot集成mqtt客戶端詳解
1. 前言
? 這里我們使用springboot搭建一個(gè)輕量級(jí)的mqtt客戶端,連接mqtt的Broker服務(wù)。
? 連接信息寫在配置文件里application.properties
spring.mqtt.username=admin
spring.mqtt.mqpassword=admin
spring.mqtt.host-url= tcp://127.0.0.1:1883
spring.mqtt.client-id= server_client_${random.value}
spring.mqtt.default-topic= $SYS/brokers/+/clients/#
spring.mqtt.completionTimeout= 3000
spring.mqtt.keepAlive= 60
2. 引入依賴
<!--mqtt -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
3. 配置文件
? 新建MqttProperties.java文件,初始化application里的mqtt配置項(xiàng)
@ConfigurationProperties("spring.mqtt")
@Component
@Getter
@Setter
public class MqttProperties {
private String username;
private String mqpassword;
private String hostUrl;
private String clientId;
private String defaultTopic;
private String completionTimeout;
private Integer keepAlive;
}? 新建MqttConfiguration.java文件,為mqtt做初始化配置
@Configuration
@Slf4j
public class MqttConfiguration {
@Autowired
private MqttProperties mqttProperties;
/**
* 事件觸發(fā)
*/
@Autowired
private ApplicationEventPublisher eventPublisher;
@Bean
public MqttConnectOptions getMqttConnectOptions(){
MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
mqttConnectOptions.setUserName(mqttProperties.getUsername());
mqttConnectOptions.setPassword(mqttProperties.getMqpassword().toCharArray());
mqttConnectOptions.setServerURIs(new String[]{mqttProperties.getHostUrl()});
mqttConnectOptions.setKeepAliveInterval(2);
mqttConnectOptions.setKeepAliveInterval(mqttProperties.getKeepAlive());
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
/**
* 配置client,監(jiān)聽(tīng)的topic
*/
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttProperties.getClientId()+"_inbound", mqttClientFactory(),
mqttProperties.getDefaultTopic().split(","));
adapter.setCompletionTimeout(Long.valueOf(mqttProperties.getCompletionTimeout()));
adapter.setConverter(new DefaultPahoMessageConverter());
//默認(rèn)添加TopicName中所有tipic
adapter.addTopic("+/+/test");
adapter.setQos(2);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
String qos = message.getHeaders().get("mqtt_receivedQos").toString();
//觸發(fā)事件 這里不再做業(yè)務(wù)處理,包 listener中做處理
eventPublisher.publishEvent(new MqttEvent(this,topic,message.getPayload().toString()));
}
};
}
/**
* 發(fā)送消息和消費(fèi)消息Channel可以使用相同MqttPahoClientFactory
*
* @return
*/
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
// 在這里進(jìn)行mqttOutboundChannel的相關(guān)設(shè)置
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttProperties.getClientId(), mqttClientFactory());
// 如果設(shè)置成true,發(fā)送消息時(shí)將不會(huì)阻塞。
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttProperties.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}4. MQTT消息類
新建MqttEvent.java 消息類。用于發(fā)送mqtt的消息
@Getter
public class MqttEvent extends ApplicationEvent {
private String topic;
/**
* 發(fā)送的消息
*/
private String message;
public MqttEvent(Object source,String topic,String message) {
super(source);
this.topic = topic;
this.message = message;
}
}5. MQTT消息接收器
新建JobListener.java文件作為 mqtt的消息接收類
@Slf4j
@Component
public class JobListener {
@Autowired
DeviceDao deviceDao;
/**
* 監(jiān)聽(tīng)topic
* @param mqttEvent
*/
@EventListener(condition = "#mqttEvent.topic.startsWith('pay')")
public void onEmqttCall1(MqttEvent mqttEvent) throws Exception {
String topic = mqttEvent.getTopic();
//寫邏輯處理
}
/**
* 監(jiān)聽(tīng)topic
* @param mqttEvent
*/
@EventListener(condition = "#mqttEvent.topic.equals('device')")
public void onEmqttCallT(MqttEvent mqttEvent){
log.info("接收到消11111111111:"+mqttEvent.getMessage());
}
}6. MQTT消息發(fā)送器
新建MqttGateway.java 提供發(fā)送mqttt消息的接口服務(wù)
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}7. 測(cè)試MQTT發(fā)送消息
@SpringBootTest
public class Test3 {
@Autowired
MqttGateway mqttGateway;
@Test
public void mqttTest () {
mqttGateway.sendToMqtt("111//222/33","消息內(nèi)容");
}
}到此這篇關(guān)于Springboot集成mqtt客戶端詳解的文章就介紹到這了,更多相關(guān)Springboot集成mqtt內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java對(duì)象與json對(duì)象間的相互轉(zhuǎn)換的方法
本篇文章主要介紹了java對(duì)象與json對(duì)象間的相互轉(zhuǎn)換的方法,詳細(xì)介紹了json字符串和java對(duì)象相互轉(zhuǎn)換,有興趣的可以了解一下2017-01-01
Java中JMM與volatile關(guān)鍵字的學(xué)習(xí)
這篇文章主要介紹了通過(guò)實(shí)例解析JMM和Volatile關(guān)鍵字的學(xué)習(xí),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2021-09-09
使用httpclient無(wú)需證書調(diào)用https的示例(java調(diào)用https)
這篇文章主要介紹了使用httpclient無(wú)需證書調(diào)用https的示例(java調(diào)用https),需要的朋友可以參考下2014-04-04
Java中的PrintWriter 介紹_動(dòng)力節(jié)點(diǎn)Java學(xué)院整理
PrintWriter 是字符類型的打印輸出流,它繼承于Writer。接下來(lái)通過(guò)本文給大家介紹java中的 PrintWriter 相關(guān)知識(shí),感興趣的朋友一起學(xué)習(xí)吧2017-05-05
Springboot中的@ConditionalOnBean注解詳細(xì)解讀
這篇文章主要介紹了Springboot中的@ConditionalOnBean注解詳細(xì)解讀,@ConditionalOnMissingBean注解兩個(gè)類,一個(gè)Computer類,一個(gè)配置類,想要完成;如果容器中沒(méi)有Computer類,就注入備用電腦Computer類,如果有Computer就不注入,需要的朋友可以參考下2023-11-11
Springboot整合ActiveMQ實(shí)現(xiàn)消息隊(duì)列的過(guò)程淺析
昨天仔細(xì)研究了activeMQ消息隊(duì)列,也遇到了些坑,下面這篇文章主要給大家介紹了關(guān)于SpringBoot整合ActiveMQ的相關(guān)資料,文中通過(guò)實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2023-02-02
網(wǎng)易Java程序員兩輪面試 請(qǐng)問(wèn)你能答對(duì)幾個(gè)?
為大家分享網(wǎng)易Java程序員兩輪面試題,考考大家,這些問(wèn)題你能答對(duì)幾個(gè)?2017-11-11

