RocketMQ消息過濾與查詢的實現(xiàn)
消息過濾
RocketMQ分布式消息隊列的消息過濾方式有別于其它MQ中間件,是在Consumer端訂閱消息時再做消息過濾的。
RocketMQ這么做是還是在于其Producer端寫入消息和Consomer端訂閱消息采用分離存儲的機制來實現(xiàn)的,Consumer端訂閱消息是需要通過ConsumeQueue這個消息消費的邏輯隊列拿到一個索引,然后再從CommitLog里面讀取真正的消息實體內(nèi)容,所以說到底也是還繞不開其存儲結(jié)構(gòu)。
其ConsumeQueue的存儲結(jié)構(gòu)如下,可以看到其中有8個字節(jié)存儲的Message Tag的哈希值,基于Tag的消息過濾正式基于這個字段值的。

主要支持如下2種的過濾方式
(1) Tag過濾方式:
Consumer端在訂閱消息時除了指定Topic還可以指定TAG,如果一個消息有多個TAG,可以用||分隔。
其中,Consumer端會將這個訂閱請求構(gòu)建成一個 SubscriptionData,發(fā)送一個Pull消息的請求給Broker端。Broker端從RocketMQ的文件存儲層—Store讀取數(shù)據(jù)之前,會用這些數(shù)據(jù)先構(gòu)建一個MessageFilter,然后傳給Store。
Store從 ConsumeQueue讀取到一條記錄后,會用它記錄的消息tag hash值去做過濾,由于在服務(wù)端只是根據(jù)hashcode進行判斷,無法精確對tag原始字符串進行過濾,故在消息消費端拉取到消息后,還需要對消息的原始tag字符串進行比對,如果不同,則丟棄該消息,不進行消息消費。
(2) SQL92的過濾方式:
這種方式的大致做法和上面的Tag過濾方式一樣,只是在Store層的具體過濾過程不太一樣,真正的 SQL expression 的構(gòu)建和執(zhí)行由rocketmq-filter模塊負責(zé)的。
每次過濾都去執(zhí)行SQL表達式會影響效率,所以RocketMQ使用了BloomFilter避免了每次都去執(zhí)行。SQL92的表達式上下文為消息的屬性。
消息查詢
RocketMQ支持按照下面兩種維度(“按照Message Id查詢消息”、“按照Message Key查詢消息”)進行消息查詢。
按照MessageId查詢消息
RocketMQ中的MessageId的長度總共有16字節(jié),其中包含了消息存儲主機地址(IP地址和端口),消息Commit Log offset。
“按照MessageId查詢消息”在RocketMQ中具體做法是:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個RPC請求后通過Remoting通信層發(fā)送(業(yè)務(wù)請求碼:VIEW_MESSAGE_BY_ID)。
Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個完整的消息返回。
按照Message Key查詢消息
“按照Message Key查詢消息”,主要是基于RocketMQ的IndexFile索引文件來實現(xiàn)的。RocketMQ的索引文件邏輯結(jié)構(gòu),類似JDK中HashMap的實現(xiàn)。
索引文件的具體結(jié)構(gòu)如下:

IndexFile索引文件為用戶提供通過“按照Message Key查詢消息”的消息索引查詢服務(wù),IndexFile文件的存儲位置是:$HOME\store\index\${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,文件大小是固定的,等于40+500W\*4+2000W\*20= 420000040個字節(jié)大小。
如果消息的properties中設(shè)置了UNIQ_KEY這個屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來做寫入操作。
如果消息設(shè)置了KEYS屬性(多個KEY以空格分隔),也會用 topic + “#” + KEY 來做索引。
其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段,一共20 Byte。
NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個字段將所有沖突的索引用鏈表的方式串起來了。
Timestamp記錄的是消息storeTimestamp之間的差,并不是一個絕對的時間。整個Index File的結(jié)構(gòu)如圖,40 Byte 的Header用于保存一些總的統(tǒng)計信息,4\*500W的 Slot Table并不保存真正的索引數(shù)據(jù),而是保存每個槽位對應(yīng)的單向鏈表的頭。
20\*2000W 是真正的索引數(shù)據(jù),即一個 Index File 可以保存 2000W個索引。
“按照Message Key查詢消息”的方式,RocketMQ的具體做法是,主要通過Broker端的QueryMessageProcessor業(yè)務(wù)處理器來查詢,讀取消息的過程就是用topic和key找到IndexFile索引文件中的一條記錄,根據(jù)其中的commitLog offset從CommitLog文件中讀取消息的實體內(nèi)容。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
spring @Scheduled注解的使用誤區(qū)及解決
這篇文章主要介紹了spring @Scheduled注解的使用誤區(qū)及解決,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-11-11
SpringBoot favicon Chrome設(shè)置問題解決方案
在本篇文章里小編給大家分享的是關(guān)于SpringBoot favicon Chrome設(shè)置問題實例內(nèi)容,小的朋友們可以參考學(xué)習(xí)下。2020-02-02
SpringBoot之RestTemplate在URL中轉(zhuǎn)義字符的問題
這篇文章主要介紹了SpringBoot之RestTemplate在URL中轉(zhuǎn)義字符的問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-06-06
Java通過PowerMockito和Mokito進行單元測試的實現(xiàn)
PowerMockito和Mockito都是Java語言中的測試框架,用于進行單元測試和集成測試,本文就來詳細的介紹一下通過PowerMockito和Mokito進行單元測試,感興趣的可以了解一下2023-08-08
新版本IntelliJ IDEA 構(gòu)建maven,并用Maven創(chuàng)建一個web項目(圖文教程)
這篇文章主要介紹了新版本IntelliJ IDEA 構(gòu)建maven,并用Maven創(chuàng)建一個web項目的圖文教程,需要的朋友可以參考下2018-01-01
mybatis如何使用Java8的日期LocalDate和LocalDateTime詳解
這篇文章主要給大家介紹了關(guān)于mybatis如何使用Java8的日期LocalDate和LocalDateTime的相關(guān)資料,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧。2017-09-09
SpringMVC接收與響應(yīng)json數(shù)據(jù)的幾種方式
這篇文章主要給大家介紹了關(guān)于SpringMVC接收與響應(yīng)json數(shù)據(jù)的幾種方式,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者使用springmvc具有一定的參考學(xué)習(xí)價值,需要的朋友們下面來一起學(xué)習(xí)學(xué)習(xí)吧2019-03-03

