Android開發(fā)MQTT協(xié)議的模型及通信淺析
前言
為什么要講MQTT協(xié)議?因為現(xiàn)在越來越多的領(lǐng)域會使用到這個協(xié)議,無論是做M2M,還是做Iot,或是想實現(xiàn)推送功能,MQTT都是一個不錯的選擇。
什么是MQTT協(xié)議
MQTT協(xié)議又稱為消息隊列要測傳輸協(xié)議,他是一種基于發(fā)布/訂閱范式的消息協(xié)議,并且它是一種基于TCP/IP協(xié)議族的應(yīng)用層協(xié)議。
可以看出的它的特點:輕量、簡單、基于發(fā)布/訂閱范式、基于TCP/IP、是一種應(yīng)用層協(xié)議。
如果還是不明白,我們可以簡單拿它和我們常用的http協(xié)議做個比較。
| HTTP協(xié)議 | MQTT協(xié)議 |
|---|---|
| 基于TCP或UDP | 基于TCP |
| 基于 請求/響應(yīng) 模型 | 基于 發(fā)布/訂閱 模型 |
| http1.x是傳數(shù)據(jù)包 | 傳輸二進(jìn)制數(shù)據(jù) |
MQTT協(xié)議的模型
我們得知道它是一個怎樣的模型才好去了解它的一個工作方式。比如說HTTP協(xié)議簡單分為兩個角色,一個Client代表客戶端,一個Server代表服務(wù)端。
而MQTT簡單來看分為3個角色,publisher表示發(fā)布者,subscriber表示訂閱者,它們兩個都是Client,所以任何一個Client客戶端既能充當(dāng)publisher,也能充當(dāng)subscriber。還有一個角色是broker表示代理,它是Server服務(wù)端??梢钥闯鯩QTT也是基于C/S的通信架構(gòu),只不過分為3種角色。
如果理解了這個模型之后,你就會有個疑問,發(fā)布和訂閱什么呢?這就需要引入一個新的東西叫主題topic(如果不理解主題這個概念的話也沒關(guān)系,后面用代碼就很容易理解主題是什么)

所以它的工作流程就是:
- subscriber訂閱者連接broker代理,并訂閱主題topic
- publisher發(fā)布者連接broker代理(當(dāng)然如何訂閱者和發(fā)布者是同一個Client的話就不需要重復(fù)連接),并發(fā)布消息到相應(yīng)的主題
- broker代理會把消息發(fā)給對應(yīng)訂閱的主題的subscriber訂閱者
開發(fā)MQTT通信
1. 處理客戶端和服務(wù)端
前面我們說了MQTT是繼續(xù)C/S的結(jié)構(gòu),那我們就需要有一個客戶端和一個服務(wù)端。
(1)服務(wù)端開發(fā)
很不幸我是開發(fā)前端的,后臺的開發(fā)我并不熟悉,所以這里的演示中我選擇用云服務(wù)EMQX,想嘗試的朋友可以上這個網(wǎng)頁去部署自己的云服務(wù),流程很簡單 cloud.emqx.com/ ,免費試用14天。

(2)客戶端開發(fā)
因為我是做Android開發(fā)的,所以這里我用Android來舉例子。正常來說可以在TCP的基礎(chǔ)上開發(fā),自己去封裝,但我這只是淺談,所以我用第三方框架進(jìn)行演示,用Paho的mqtt
2. 客戶端開發(fā)
先導(dǎo)入Paho的mqtt
dependencies {
......
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
}
在manifest中注冊Paho的MqttService
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/Theme.MyApplication">
<activity
android:name=".MainActivity"
android:exported="true">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<service android:name="org.eclipse.paho.android.service.MqttService"/>
<service android:name=".MqttActionService"/>
</application>
我這邊為了用一個項目來演示Mqtt通信,所有把MainActivity當(dāng)成publisher發(fā)布者,把MqttActionService當(dāng)成subscriber訂閱者。
所以整體的流程是這樣的,我們先開啟MqttActionService,然后在MqttActionService中進(jìn)行連接和訂閱。再在MainActivity進(jìn)行連接和發(fā)送消息。
先把Mqtt的Client給封裝起來(我這里防止有些朋友看不懂Kotlin,我就用了Java,后面不重要的地方我直接用Kotlin,一般也比較容易看懂)。
public class MyMqttClient {
private MqttAndroidClient mClient;
private MqttConnectOptions mOptions;
private OnMqttConnectListener mOnMqttConnectListener;
private final String mClientId;
private MqttCallbackExtended mExtended = new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
if (mOnMqttConnectListener != null){
mOnMqttConnectListener.onConnectComplete(serverURI);
}
}
@Override
public void connectionLost(Throwable cause) {
if (mOnMqttConnectListener != null){
mOnMqttConnectListener.onConnectFailure(cause);
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
};
private IMqttActionListener mConnectAction = new IMqttActionListener() {
@Override
public void onSuccess(IMqttToken asyncActionToken) {
}
@Override
public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
if (mOnMqttConnectListener != null){
mOnMqttConnectListener.onConnectFailure(exception);
}
exception.printStackTrace();
}
};
private IMqttMessageListener messageListener = new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
if (mOnMqttConnectListener != null){
mOnMqttConnectListener.onMessageArrived(topic, message);
}
}
};
public MyMqttClient(Context context){
this(context, null);
}
public MyMqttClient(Context context, String clientId){
if (!TextUtils.isEmpty(clientId)) {
this.mClientId = clientId;
}else {
this.mClientId = MqttConfig.clientId;
}
init(context);
}
public void init(Context context){
mClient = new MqttAndroidClient(context, MqttConfig.mqttUrl, mClientId);
mClient.setCallback(mExtended);
mOptions = new MqttConnectOptions();
mOptions.setConnectionTimeout(4000);
mOptions.setKeepAliveInterval(30);
mOptions.setUserName(MqttConfig.username);
mOptions.setPassword(MqttConfig.password.toCharArray());
}
public void setOnMqttConnectListener(OnMqttConnectListener onMqttConnectListener) {
this.mOnMqttConnectListener = onMqttConnectListener;
}
/**
* 連接
*/
public void connect(){
try {
if (!mClient.isConnected()){
mClient.connect(mOptions, null, mConnectAction);
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 訂閱
*/
public void subscribeToTopic(String mTopic){
this.subscribeToTopic(mTopic, 0);
}
public void subscribeToTopic(String mTopic, int qos){
try {
mClient.subscribe(mTopic, qos, null,null, messageListener);
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 發(fā)送消息
*/
public void sendMessage(String mTopic, byte[] data){
try {
MqttMessage message = new MqttMessage();
message.setPayload(data);
mClient.publish(mTopic, message);
}catch (Exception e){
e.printStackTrace();
}
}
public void onDestroy(){
try {
mClient.disconnect();
mExtended = null;
mConnectAction = null;
messageListener = null;
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 提供給外層的回調(diào),更方便進(jìn)行使用
*/
public interface OnMqttConnectListener{
void onConnectComplete(String serverURI);
void onConnectFailure(Throwable e);
void onMessageArrived(String topic, MqttMessage message);
}
}
當(dāng)中有些配置我直接抽出來
public interface MqttConfig {
String mqttUrl = "tcp://r0c36017.cn-shenzhen.emqx.cloud:11005";
String clientId = "deployment-r0c36017";
String username = "yeshuaishizhenshuai";
String password = "123456";
String oneTopic = "kylin/topic/one";
}
可以講一下這些參數(shù):
(1) mqttUrl: 連接代理的連接,可以看到我上面云服務(wù)那張截圖里面的“連接地址”和“連接端口” (2) clientId: 客戶端ID,無論是subscriber還是publisher都屬于客戶端,這個在上面說過,所以都有一個對應(yīng)的ID標(biāo)識他們是屬于哪個客戶端。我下面的Demo中MqttActionService用的ClienId是deployment-r0c36017,MainActivity用的ClienId是deployment-r0c36018,不同的,所以是兩個客戶端。 (3) username和password: 這兩個參數(shù)都是一個標(biāo)識,會和后臺記錄,如果你沒有的話,那你就連不上代理,也就是連不上服務(wù)端。 (4) oneTopic: 就是主題,你訂閱和發(fā)送消息都要對應(yīng)是哪個主題。
然后subscriber連接并訂閱主題
class MqttActionService : Service() {
private var mqttClient : MyMqttClient ?= null
override fun onCreate() {
super.onCreate()
mqttClient = MyMqttClient(this)
mqttClient?.setOnMqttConnectListener(object : MyMqttClient.OnMqttConnectListener{
override fun onConnectComplete(serverURI: String?) {
mqttClient?.subscribeToTopic(MqttConfig.oneTopic)
}
override fun onConnectFailure(e: Throwable?) {
}
override fun onMessageArrived(topic: String?, message: MqttMessage?) {
val h = Handler(Looper.getMainLooper())
h.post {
Toast.makeText(this@MqttActionService.applicationContext, message.toString(), Toast.LENGTH_SHORT).show();
}
}
})
}
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
val handler = Handler()
handler.postDelayed({ mqttClient?.connect() }, 1000)
return START_STICKY
}
override fun onBind(intent: Intent?): IBinder? {
return null
}
override fun onDestroy() {
super.onDestroy()
mqttClient?.onDestroy()
}
}
然后publisher連接并發(fā)送消息
class MainActivity : AppCompatActivity() {
private var clinet : MyMqttClient ?= null
private var isConnect = false
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
init()
val btn : Button = findViewById(R.id.btn_connect)
val send : Button = findViewById(R.id.btn_send)
val open : Button = findViewById(R.id.open)
open.setOnClickListener {
val intent = Intent()
intent.setClass(this, MqttActionService::class.java)
startService(intent)
}
btn.setOnClickListener {
clinet?.connect()
}
send.setOnClickListener {
clinet?.sendMessage(MqttConfig.oneTopic, "你干嘛啊~哎呦~".toByteArray())
}
}
private fun init(){
clinet = MyMqttClient(this, "deployment-r0c36018")
clinet?.setOnMqttConnectListener(object : MyMqttClient.OnMqttConnectListener{
override fun onConnectComplete(serverURI: String?) {
isConnect = true
}
override fun onConnectFailure(e: Throwable?) {
e?.printStackTrace()
isConnect = false
}
override fun onMessageArrived(topic: String?, message: MqttMessage?) {
}
})
}
}
我這定了3個按鈕,第一個按鈕open會跳轉(zhuǎn)Service然后subscriber連接并訂閱主題,第二個按鈕btn會連接代理,第三個按鈕send發(fā)送消息??碝qttActionService的代碼可以看出,我這里發(fā)送消息后,會彈出Toast。

Paho的mqtt的BUG
這庫我也是第一次用,我們那用的都是自己擼的(這邊肯定沒法放上來),然后我用的時候發(fā)現(xiàn)一個問題。我想給Service去開一條進(jìn)程去處理訂閱的操作的,這樣能更真實的去模擬,結(jié)果就在連接時出問題了
經(jīng)檢查,連接的context的進(jìn)程要和org.eclipse.paho.android.service.MqttService的進(jìn)程一致。我去看他源碼是怎么回事。

發(fā)現(xiàn)它內(nèi)部的Binder竟然做了強轉(zhuǎn),這里因為不是代理而會出現(xiàn)報錯。如果使用這個庫的話就小心點你要做的夸進(jìn)程的操作。
總結(jié)
今天只是淺談一些MQTT的一些原理和流程,其實還有更深的功能,比如Qos啊這些還沒說,我覺得一次說太多可能會讓第一次接觸的人混亂。先簡單的了解MQTT是什么,主要使用的場景,內(nèi)部的原理大致是怎樣的。當(dāng)了解這些之后再去深入的看,會能夠更好的去理解。
以上就是Android開發(fā)MQTT協(xié)議的模型及通信淺析的詳細(xì)內(nèi)容,更多關(guān)于Android MQTT協(xié)議模型通信的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Android studio 解決logcat無過濾工具欄的操作
這篇文章主要介紹了Android studio 解決logcat無過濾工具欄的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-04-04
Flutter加載圖片流程之ImageProvider源碼示例解析
這篇文章主要為大家介紹了Flutter加載圖片流程之ImageProvider源碼示例解析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-04-04
CDC與BG-CDC的含義電容觸控學(xué)習(xí)整理
今天小編就為大家分享一篇關(guān)于CDC與BG-CDC的含義電容觸控學(xué)習(xí)整理,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12
Android利用SurfaceView實現(xiàn)下雨的天氣動畫效果
這篇文章主要介紹了Android利用SurfaceView實現(xiàn)下雨天氣效果的相關(guān)資料,文中詳細(xì)介紹 SurfaceView 和 View 的區(qū)別,以及一些需要使用到 SurfaceView 的場景。需要的朋友可以參考借鑒,下面來一起看看吧。2017-03-03

