RocketMq深入分析講解兩種削峰方式
何時(shí)需要削峰
當(dāng)上游調(diào)用下游服務(wù)速率高于下游服務(wù)接口QPS時(shí),那么如果不對(duì)調(diào)用速率進(jìn)行控制,那么會(huì)發(fā)生很多失敗請(qǐng)求
通過(guò)消息隊(duì)列的削峰方法有兩種
控制消費(fèi)者消費(fèi)速率和生產(chǎn)者投放延時(shí)消息,本質(zhì)都是控制消費(fèi)速度
通過(guò)消費(fèi)者參數(shù)控制消費(fèi)速度
先分析那些參數(shù)對(duì)控制消費(fèi)速度有作用
1.PullInterval: 設(shè)置消費(fèi)端,拉取mq消息的間隔時(shí)間。
注意:該時(shí)間算起時(shí)間是rocketMq消費(fèi)者從broker消息后算起。經(jīng)過(guò)PullInterval再次向broker拉去消息
源碼分析:
首先需要了解rocketMq的消息拉去過(guò)程
拉去消息的類(lèi)
PullMessageService
public class PullMessageService extends ServiceThread {
private final InternalLogger log = ClientLogger.getLog();
private final LinkedBlockingQueue<PullRequest> pullRequestQueue = new LinkedBlockingQueue<PullRequest>();
private final MQClientInstance mQClientFactory;
private final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "PullMessageServiceScheduledThread");
}
});
public PullMessageService(MQClientInstance mQClientFactory) {
this.mQClientFactory = mQClientFactory;
}
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
PullMessageService.this.executePullRequestImmediately(pullRequest);
}
}, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public void executePullRequestImmediately(final PullRequest pullRequest) {
try {
this.pullRequestQueue.put(pullRequest);
} catch (InterruptedException e) {
log.error("executePullRequestImmediately pullRequestQueue.put", e);
}
}
public void executeTaskLater(final Runnable r, final long timeDelay) {
if (!isStopped()) {
this.scheduledExecutorService.schedule(r, timeDelay, TimeUnit.MILLISECONDS);
} else {
log.warn("PullMessageServiceScheduledThread has shutdown");
}
}
public ScheduledExecutorService getScheduledExecutorService() {
return scheduledExecutorService;
}
private void pullMessage(final PullRequest pullRequest) {
final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
if (consumer != null) {
DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
} else {
log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
}
}
@Override
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped()) {
try {
PullRequest pullRequest = this.pullRequestQueue.take();
this.pullMessage(pullRequest);
} catch (InterruptedException ignored) {
} catch (Exception e) {
log.error("Pull Message Service Run Method exception", e);
}
}
log.info(this.getServiceName() + " service end");
}
@Override
public void shutdown(boolean interrupt) {
super.shutdown(interrupt);
ThreadUtils.shutdownGracefully(this.scheduledExecutorService, 1000, TimeUnit.MILLISECONDS);
}
@Override
public String getServiceName() {
return PullMessageService.class.getSimpleName();
}
}繼承自ServiceThread,這是一個(gè)單線程執(zhí)行的service,不斷獲取阻塞隊(duì)列中的pullRequest,進(jìn)行消息拉取。
executePullRequestLater會(huì)延時(shí)將pullrequest放入到pullRequestQueue,達(dá)到延時(shí)拉去的目的。
那么PullInterval參數(shù)就是根據(jù)這個(gè)功能發(fā)揮的作用,在消費(fèi)者拉去消息成功的回調(diào)
PullCallback pullCallback = new PullCallback() {
@Override
public void onSuccess(PullResult pullResult) {
if (pullResult != null) {
pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
subscriptionData);
switch (pullResult.getPullStatus()) {
case FOUND:
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullRT);
long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
}
if (pullResult.getNextBeginOffset() < prevRequestOffset
|| firstMsgOffset < prevRequestOffset) {
log.warn(
"[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}",
pullResult.getNextBeginOffset(),
firstMsgOffset,
prevRequestOffset);
}
break;
case NO_NEW_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case NO_MATCHED_MSG:
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
break;
case OFFSET_ILLEGAL:
log.warn("the pull request offset illegal, {} {}",
pullRequest.toString(), pullResult.toString());
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
pullRequest.getProcessQueue().setDropped(true);
DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {
@Override
public void run() {
try {
DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
pullRequest.getNextOffset(), false);
DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());
DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());
log.warn("fix the pull request offset, {}", pullRequest);
} catch (Throwable e) {
log.error("executeTaskLater Exception", e);
}
}
}, 10000);
break;
default:
break;
}
}
}
@Override
public void onException(Throwable e) {
if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
log.warn("execute the pull request exception", e);
}
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
}
};在 case found的情況下,也就是拉取到消息的q情況,在PullInterval>0的情況下,會(huì)延時(shí)投遞到pullRequestQueue中,實(shí)現(xiàn)拉取消息的間隔
if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}
2.PullBatchSize: 設(shè)置每次pull消息的數(shù)量,該參數(shù)設(shè)置是針對(duì)邏輯消息隊(duì)列,并不是每次pull消息拉到的總消息數(shù)
消費(fèi)端分配了兩個(gè)消費(fèi)隊(duì)列來(lái)監(jiān)聽(tīng)。那么PullBatchSize 設(shè)置為32,那么該消費(fèi)端每次pull到 64個(gè)消息。
消費(fèi)端每次pull到消息總數(shù)=PullBatchSize*監(jiān)聽(tīng)隊(duì)列數(shù)
源碼分析
消費(fèi)者拉取消息時(shí)
org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage中
會(huì)執(zhí)行
this.pullAPIWrapper.pullKernelImpl(
pullRequest.getMessageQueue(),
subExpression,
subscriptionData.getExpressionType(),
subscriptionData.getSubVersion(),
pullRequest.getNextOffset(),
this.defaultMQPushConsumer.getPullBatchSize(),
sysFlag,
commitOffsetValue,
BROKER_SUSPEND_MAX_TIME_MILLIS,
CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
CommunicationMode.ASYNC,
pullCallback
);
其中 this.defaultMQPushConsumer.getPullBatchSize(),就是配置的PullBatchSize,代表的是每次從broker的一個(gè)隊(duì)列上拉取的最大消息數(shù)。
3.ThreadMin和ThreadMax: 消費(fèi)端消費(fèi)pull到的消息需要的線程數(shù)量。
源碼分析:
還是在消費(fèi)者拉取消息成功時(shí)
boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
pullResult.getMsgFoundList(),
processQueue,
pullRequest.getMessageQueue(),
dispatchToConsume);
通過(guò)consumeMessageService執(zhí)行
默認(rèn)情況下是并發(fā)消費(fèi)
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService#submitConsumeRequest
@Override
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispatchToConsume) {
final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
if (msgs.size() <= consumeBatchSize) {
ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
this.submitConsumeRequestLater(consumeRequest);
}
} else {
for (int total = 0; total < msgs.size(); ) {
List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
for (int i = 0; i < consumeBatchSize; i++, total++) {
if (total < msgs.size()) {
msgThis.add(msgs.get(total));
} else {
break;
}
}
ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
try {
this.consumeExecutor.submit(consumeRequest);
} catch (RejectedExecutionException e) {
for (; total < msgs.size(); total++) {
msgThis.add(msgs.get(total));
}
this.submitConsumeRequestLater(consumeRequest);
}
}
}
}
其中consumeExecutor初始化
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(),
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
對(duì)象線程池最大和核心線程數(shù)。對(duì)于順序消費(fèi)ConsumeMessageOrderlyService也會(huì)使用最大和最小線程數(shù)這兩個(gè)參數(shù),只是消費(fèi)時(shí)會(huì)鎖定隊(duì)列。
以上三種情況:是針對(duì)參數(shù)配置,來(lái)調(diào)整消費(fèi)速度。
除了這三種情況外還有兩種服務(wù)部署情況,可以調(diào)整消費(fèi)速度:
4.rocketMq 邏輯消費(fèi)隊(duì)列配置數(shù)量 有消費(fèi)端每次pull到消息總數(shù)=PullBatchSize*監(jiān)聽(tīng)隊(duì)列數(shù)
可知rocketMq 邏輯消費(fèi)隊(duì)列配置數(shù)量即上圖中的 queue1 ,queue2,配置數(shù)量越多每次pull到的消息總數(shù)也就越多。如果下邊配置讀隊(duì)列數(shù)量:修改tocpic的邏輯隊(duì)列數(shù)量
5.消費(fèi)端節(jié)點(diǎn)部署數(shù)量 :
部署數(shù)量無(wú)論一個(gè)節(jié)點(diǎn)監(jiān)聽(tīng)所有隊(duì)列,還是多個(gè)節(jié)點(diǎn)按照分配策略分配監(jiān)聽(tīng)隊(duì)列數(shù)量,理論上每秒pull到的數(shù)量都一樣的,但是多節(jié)點(diǎn)消費(fèi)端消費(fèi)線程數(shù)量要比單節(jié)點(diǎn)消費(fèi)線程數(shù)量多,也就是多節(jié)點(diǎn)消費(fèi)速度大于單節(jié)點(diǎn)。
消費(fèi)延時(shí)控流
針對(duì)消息訂閱者的消費(fèi)延時(shí)流控的基本原理是,每次消費(fèi)時(shí)在客戶(hù)端增加一個(gè)延時(shí)來(lái)控制消費(fèi)速度,此時(shí)理論上消費(fèi)并發(fā)最快速度為:
單節(jié)點(diǎn)部署:
ConsumInterval :延時(shí)時(shí)間單位毫秒
ConcurrentThreadNumber:消費(fèi)端線程數(shù)量
MaxRate :理論每秒處理數(shù)量
MaxRate = 1 / ConsumInterval * ConcurrentThreadNumber
如果消息并發(fā)消費(fèi)線程(ConcurrentThreadNumber)為 20,延時(shí)(ConsumInterval)為 100 ms,代入上述公式可得
如果消息并發(fā)消費(fèi)線程(ConcurrentThreadNumber)為 20,延時(shí)(ConsumInterval)為 100 ms,代入上述公式可得
200 = 1 / 0.1 * 20
由上可知,理論上可以將并發(fā)消費(fèi)控制在 200 以下
如果是多個(gè)節(jié)點(diǎn)部署如兩個(gè)節(jié)點(diǎn),理論消費(fèi)速度最高為每秒處理400個(gè)消息。
如下延時(shí)流控代碼:
/**
* 測(cè)試mq 并發(fā) 接受
*/
@Component
@RocketMQMessageListener(topic = ConstantTopic.WRITING_LIKE_TOPIC,selectorExpression = ConstantTopic.WRITING_LIKE_ADD_TAG, consumerGroup = "writing_like_topic_add_group")
class ConsumerLikeSave implements RocketMQListener<LikeWritingParams>, RocketMQPushConsumerLifecycleListener{
@SneakyThrows
@Override
public void onMessage(LikeWritingParams params) {
System.out.println("睡上0.1秒");
Thread.sleep(100);
long begin = System.currentTimeMillis();
System.out.println("mq消費(fèi)速度"+Thread.currentThread().getName()+" "+DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").format(LocalDateTime.now()));
//writingLikeService.saveLike2Db(params.getUserId(),params.getWritingId());
long end = System.currentTimeMillis();
// System.out.println("消費(fèi):: " +Thread.currentThread().getName()+ "毫秒:"+(end - begin));
}
@Override
public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
defaultMQPushConsumer.setConsumeThreadMin(20); //消費(fèi)端拉去到消息以后分配線索去消費(fèi)
defaultMQPushConsumer.setConsumeThreadMax(50);//最大消費(fèi)線程,一般情況下,默認(rèn)隊(duì)列沒(méi)有塞滿(mǎn),是不會(huì)啟用新的線程的
defaultMQPushConsumer.setPullInterval(0);//消費(fèi)端多久一次去rocketMq 拉去消息
defaultMQPushConsumer.setPullBatchSize(32); //消費(fèi)端每個(gè)隊(duì)列一次拉去多少個(gè)消息,若該消費(fèi)端分賠了N個(gè)監(jiān)控隊(duì)列,那么消費(fèi)端每次去rocketMq拉去消息說(shuō)為N*1
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
defaultMQPushConsumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
defaultMQPushConsumer.setConsumeMessageBatchMaxSize(2);
}
}注釋?zhuān)喝缟舷M(fèi)端,單節(jié)點(diǎn)每秒處理速度也就是最高200個(gè)消息,實(shí)際上要小于200,業(yè)務(wù)代碼執(zhí)行也是需要時(shí)間。
但是要注意實(shí)際操作中并發(fā)流控實(shí)際是默認(rèn)存在的,
spring boot 消費(fèi)端默認(rèn)配置
this.consumeThreadMin = 20;
this.consumeThreadMax = 20;
this.pullInterval = 0L;
this.pullBatchSize = 32;
若業(yè)務(wù)邏輯執(zhí)行需要20ms,那么單節(jié)點(diǎn)處理速度就是:1/0.02*20=1000
這里默認(rèn)拉去的速度1s內(nèi)遠(yuǎn)大于1000
注意: 這里雖然pullInterval 等于0 當(dāng)時(shí)受限于每次拉去64個(gè),處理完也是需要一端時(shí)間才能回復(fù)ack,才能再次拉取,所以消費(fèi)速度應(yīng)該小于1000
所以并發(fā)流控要消費(fèi)速度大于消費(fèi)延時(shí)流控 ,那么消費(fèi)延時(shí)流控才有意義
使用rokcetMq支持的延時(shí)消息也可以實(shí)現(xiàn)消息的延時(shí)消費(fèi),通過(guò)對(duì)delayLevel對(duì)應(yīng)的時(shí)間進(jìn)行配置為我們的需求。為不同的消息設(shè)置不同delayLevel,達(dá)到延時(shí)消費(fèi)的目的。
總結(jié)
rocketMq 肖鋒流控兩種方式:
并發(fā)流控:就是根據(jù)業(yè)務(wù)流控速率要求,來(lái)調(diào)整topic 消費(fèi)隊(duì)列數(shù)量(read queue),消費(fèi)端部署節(jié)點(diǎn),消費(fèi)端拉去間隔時(shí)間,消費(fèi)端消費(fèi)線程數(shù)量等,來(lái)達(dá)到要求的速率內(nèi)
延時(shí)消費(fèi)流控:就是在消費(fèi)端延時(shí)消費(fèi)消息(sleep),具體延時(shí)多少要根據(jù)業(yè)務(wù)要求速率,和消費(fèi)端線程數(shù)量,和節(jié)點(diǎn)部署數(shù)量來(lái)控制
到此這篇關(guān)于RocketMq深入分析講解兩種削峰方式的文章就介紹到這了,更多相關(guān)RocketMq削峰內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java使用Hutool實(shí)現(xiàn)AES、DES加密解密的方法
本篇文章主要介紹了Java使用Hutool實(shí)現(xiàn)AES、DES加密解密的方法,具有一定的參考價(jià)值,有興趣的可以了解一下2017-08-08
SpringCloud OpenFeign超詳細(xì)講解模板化遠(yuǎn)程通信的實(shí)現(xiàn)
這篇文章主要介紹了SpringCloudSpringboot集成OpenFeign實(shí)現(xiàn)模板化遠(yuǎn)程通信,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2022-07-07
重寫(xiě)equals的同時(shí)為何要重寫(xiě)hashCode?
這篇文章主要給大家介紹了關(guān)于重寫(xiě)equals的同時(shí)為何要重寫(xiě)hashCode的相關(guān)資料,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
基于java ssm springboot實(shí)現(xiàn)選課推薦交流平臺(tái)系統(tǒng)
這篇文章主要介紹了選課推薦交流平臺(tái)系統(tǒng)是基于java ssm springboot來(lái)的實(shí)現(xiàn)的,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-08-08
SpringBoot修改內(nèi)置tomcat版本的操作步驟
生產(chǎn)環(huán)境使用的外部部署Tomcat還是內(nèi)置Tomcat由于版本安全漏洞,往往需要升級(jí)到指定的安全版本,本文演示一下SpringBoot升級(jí)內(nèi)置的Tomcat版本,感興趣的小伙伴跟著小編一起來(lái)看看吧2024-07-07
springboot?+rabbitmq+redis實(shí)現(xiàn)秒殺示例
本文主要介紹了springboot?+rabbitmq+redis實(shí)現(xiàn)秒殺示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07
使用JPA+querydsl如何實(shí)現(xiàn)多條件動(dòng)態(tài)查詢(xún)
這篇文章主要介紹了使用JPA+querydsl如何實(shí)現(xiàn)多條件動(dòng)態(tài)查詢(xún),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-03-03
SpringBoot自動(dòng)裝配Import示例詳解
SpringBoot中@Import注解的使用可以幫助開(kāi)發(fā)者將指定的Bean或配置類(lèi)導(dǎo)入到IOC容器中,該注解支持四種用法:導(dǎo)入Bean、導(dǎo)入配置類(lèi)、實(shí)現(xiàn)ImportSelector接口和實(shí)現(xiàn),感興趣的朋友一起看看吧2024-09-09

