RocketMQ設(shè)計(jì)之故障規(guī)避機(jī)制
NameServer為了簡化和客戶端通信,發(fā)現(xiàn)Broker故障時(shí)并不會立即通知客戶端。故障規(guī)避機(jī)制就是用來解決當(dāng)Broker出現(xiàn)故障,Producer不能及時(shí)感知而導(dǎo)致消息發(fā)送失敗的問題。默認(rèn)不開啟,如果開啟,消息發(fā)送失敗的時(shí)候會將失敗的Broker暫時(shí)排除在隊(duì)列選擇列表外
MQFaultStrategy類的:
public class MQFaultStrategy {
? ? private final static InternalLogger log = ClientLogger.getLog();
? ? private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
? ? private boolean sendLatencyFaultEnable = false;
? ? private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
? ? private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
? ? public long[] getNotAvailableDuration() {
? ? ? ? return notAvailableDuration;
? ? }
? ? public void setNotAvailableDuration(final long[] notAvailableDuration) {
? ? ? ? this.notAvailableDuration = notAvailableDuration;
? ? }
? ? public long[] getLatencyMax() {
? ? ? ? return latencyMax;
? ? }
? ? public void setLatencyMax(final long[] latencyMax) {
? ? ? ? this.latencyMax = latencyMax;
? ? }
? ? public boolean isSendLatencyFaultEnable() {
? ? ? ? return sendLatencyFaultEnable;
? ? }
? ? public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
? ? ? ? this.sendLatencyFaultEnable = sendLatencyFaultEnable;
? ? }
? ? public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
? ? ? ? //是否開啟故障延遲機(jī)制
? ? ? ? if (this.sendLatencyFaultEnable) {
? ? ? ? ? ? try {
? ? ? ? ? ? ? ? int index = tpInfo.getSendWhichQueue().getAndIncrement();
? ? ? ? ? ? ? ? for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
? ? ? ? ? ? ? ? ? ? int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
? ? ? ? ? ? ? ? ? ? if (pos < 0)
? ? ? ? ? ? ? ? ? ? ? ? pos = 0;
? ? ? ? ? ? ? ? ? ? MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
? ? ? ? ? ? ? ? ? ? //判斷Queue是否可用
? ? ? ? ? ? ? ? ? ? if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
? ? ? ? ? ? ? ? ? ? ? ? if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
? ? ? ? ? ? ? ? ? ? ? ? ? ? return mq;
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
? ? ? ? ? ? ? ? int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
? ? ? ? ? ? ? ? if (writeQueueNums > 0) {
? ? ? ? ? ? ? ? ? ? final MessageQueue mq = tpInfo.selectOneMessageQueue();
? ? ? ? ? ? ? ? ? ? if (notBestBroker != null) {
? ? ? ? ? ? ? ? ? ? ? ? mq.setBrokerName(notBestBroker);
? ? ? ? ? ? ? ? ? ? ? ? mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? ? return mq;
? ? ? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? ? ? latencyFaultTolerance.remove(notBestBroker);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? log.error("Error occurred when selecting message queue", e);
? ? ? ? ? ? }
? ? ? ? ? ? return tpInfo.selectOneMessageQueue();
? ? ? ? }
? ? ? ? //默認(rèn)輪詢
? ? ? ? return tpInfo.selectOneMessageQueue(lastBrokerName);
? ? }
? ? public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
? ? ? ? if (this.sendLatencyFaultEnable) {
? ? ? ? ? ? long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
? ? ? ? ? ? this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
? ? ? ? }
? ? }
? ? private long computeNotAvailableDuration(final long currentLatency) {
? ? ? ? for (int i = latencyMax.length - 1; i >= 0; i--) {
? ? ? ? ? ? if (currentLatency >= latencyMax[i])
? ? ? ? ? ? ? ? return this.notAvailableDuration[i];
? ? ? ? }
? ? ? ? return 0;
? ? }
}在選擇查找路由時(shí),選擇消息隊(duì)列的關(guān)鍵步驟:
- 先按輪詢算法選擇一個消息隊(duì)列
- 從故障列表判斷該消息隊(duì)列是否可用
LatencyFaultToleranceImpl中判斷是否可用:
@Override
public boolean isAvailable(final String name) {
? ? final FaultItem faultItem = this.faultItemTable.get(name);
? ? if (faultItem != null) {
? ? ? ? return faultItem.isAvailable();
? ? }
? ? return true;
}
public boolean isAvailable() {
? ? ? ? ? ? return (System.currentTimeMillis() - startTimestamp) >= 0;
? ? ? ? }- 判斷是否在故障列表中,不在故障列表中代表可用。
- 在故障列表中判斷當(dāng)前時(shí)間是否大于等于故障規(guī)避的開始時(shí)間
startTimestamp
在消息發(fā)送結(jié)束后和發(fā)送出現(xiàn)異常時(shí)調(diào)用updateFaultItem()方法來更新故障列表,computeNotAvailableDuration()根據(jù)響應(yīng)時(shí)間來計(jì)算故障周期時(shí)長,響應(yīng)時(shí)間越長故障周期越長。網(wǎng)絡(luò)異常、Broker異常、客戶端異常都是固定響應(yīng)時(shí)長30s,它們故障周期時(shí)長為10分鐘。消息發(fā)送成功或線程中斷異常響應(yīng)時(shí)間在100毫秒以內(nèi),故障周期時(shí)長為0。
LatencyFaultToleranceImpl類的updateFaultItem方法:
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
? ? FaultItem old = this.faultItemTable.get(name);
? ? if (null == old) {
? ? ? ? final FaultItem faultItem = new FaultItem(name);
? ? ? ? faultItem.setCurrentLatency(currentLatency);
? ? ? ? faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
? ? ? ? //加入故障列表
? ? ? ? old = this.faultItemTable.putIfAbsent(name, faultItem);
? ? ? ? if (old != null) {
? ? ? ? ? ? old.setCurrentLatency(currentLatency);
? ? ? ? ? ? old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
? ? ? ? }
? ? } else {
? ? ? ? old.setCurrentLatency(currentLatency);
? ? ? ? old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
? ? }
}FaultItem存儲Broker名稱、響應(yīng)時(shí)長、故障規(guī)避開始時(shí)間,最重要的是故障規(guī)避開始時(shí)間,用來判斷Queue是否可用
到此這篇關(guān)于RocketMQ設(shè)計(jì)之故障規(guī)避機(jī)制的文章就介紹到這了,更多相關(guān)RocketMQ故障規(guī)避機(jī)制內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)的程序員老黃歷實(shí)例
這篇文章主要介紹了Java實(shí)現(xiàn)的程序員老黃歷實(shí)例,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-05-05
java獲取本地文件的多種方式實(shí)現(xiàn)與比較
這篇文章主要為大家詳細(xì)介紹了java獲取本地文件的多種方式實(shí)現(xiàn)與結(jié)果比較,文中的示例代碼講解詳細(xì),感興趣的小伙伴可以跟隨小編一起學(xué)習(xí)一下2023-11-11
springboot+hutool批量生成二維碼壓縮導(dǎo)出功能
這篇文章主要介紹了springboot+hutool批量生成二維碼壓縮導(dǎo)出功能,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-10-10
Java中資源加載的方法及Spring的ResourceLoader應(yīng)用小結(jié)
在Java開發(fā)中,資源加載是一個基礎(chǔ)而重要的操作,這篇文章主要介紹了深入理解Java中資源加載的方法及Spring的ResourceLoader應(yīng)用,本文通過實(shí)例代碼演示了通過ClassLoader和Class獲取資源的內(nèi)容,以及使用Spring的ResourceLoader加載多個資源的過程,需要的朋友可以參考下2024-01-01
java.math包下計(jì)算浮點(diǎn)數(shù)和整數(shù)的類的實(shí)例
這篇文章主要介紹了java.math包下計(jì)算浮點(diǎn)數(shù)和整數(shù)的類的實(shí)例代碼,本文通過使用BigDecimal進(jìn)行浮點(diǎn)數(shù)比較給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-02-02
30分鐘入門Java8之lambda表達(dá)式學(xué)習(xí)
本篇文章主要介紹了30分鐘入門Java8之lambda表達(dá)式學(xué)習(xí),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-04-04

