SpringCloudStream中的消息分區(qū)數(shù)詳解
一、前言
本文僅針對 Kafka 來聊消息分區(qū)數(shù)相關(guān)的話題。
SpringCloudStream 中的消息分區(qū)數(shù)如何配置?
或者說消息分區(qū)數(shù)會受到哪些配置的影響。
- SpringCloudStream:Greenwich.SR2
- Kafka:kafka_2.12-2.3.0
二、影響因素
2.1 Kafka服務(wù)端
首先應(yīng)該想到的,Kafka 配置文件 server.properties 中默認(rèn)每一個 topic 的分區(qū)數(shù) num.partitions=1
# The default number of log partitions per topic. More partitions allow greater num.partitions=1
2.2 生產(chǎn)者端
從SpringCloudStream的配置中可以看到,生產(chǎn)者可以指定分區(qū)數(shù),默認(rèn)1:
spring.cloud.stream.bindings.<channelName>.partitionCount.producer=n
【說明】:當(dāng)分區(qū)功能開啟時,使用該參數(shù)來配置消息數(shù)據(jù)的分區(qū)數(shù)。
如果消息生產(chǎn)者已經(jīng)配置了分區(qū)鍵的生成策略,那么它的值必須大于1。
2.3 消費者端
SpringCloudStream 允許通過配置,使得消費者能夠自動創(chuàng)建分區(qū)。
#輸入通道消費者的并發(fā)數(shù),默認(rèn)1 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2
若想以上配置生效,還需添加如下通用配置:
#Kafka綁定器允許在需要的時候自動創(chuàng)建分區(qū)。默認(rèn)false spring.cloud.stream.kafka.binder.autoAddPartitions=true
消費者端如此配置以后,將表現(xiàn)為一個消費者服務(wù)或進程中,會有2個線程各自消費1個分區(qū),即2個消費者線程同時消費。
以下是該配置的效果驗證步驟:
消費者代碼:
1個 @StreamListener 消費自己的 topic 或自己的輸出channel:
@EnableBinding(SpiderSink.class)
@Slf4j
public class SpiderSinkReceiver {
@Autowired
private SpiderMessageService spiderMessageService;
@StreamListener(SpiderSink.INPUT)
public void receive(Object payload) {
log.info("SPIDER-SINK received: {}", payload);
}
}方式一:通過日志驗證:
通過在 log4j 日志中,打印線程名稱的方式,驗證 spring.cloud.stream.bindings.<channelName>.consumer.concurrency 的配置確確實實會新增1個消費者線程。
[INFO ] 2020-05-09 01:19:34,700 [thread: [Ljava.lang.String;@5b40de43.container-1-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50) [INFO ] 2020-05-09 01:19:35,888 [thread: [Ljava.lang.String;@5b40de43.container-0-C-1] com.cjia.spidersink.sink.SpiderSinkReceiver.receive(SpiderSinkReceiver.java:50)
方式二:直接查看分區(qū)數(shù)來驗證:
另外,也可在啟動一個生產(chǎn)者服務(wù)時,等待自動創(chuàng)建一個新 topic 后(此時默認(rèn)分區(qū)數(shù)為1),比如我們創(chuàng)建的 topic 為“topic-spider-dev”,此時通過kafka命令查看分區(qū)數(shù),此時分區(qū)數(shù)為1:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev ?PartitionCount:1 ? ? ? ?ReplicationFactor:1 ? ? Configs: ? ? ? ? Topic: topic-spider-dev Partition: 0 ? ?Leader: 1 ? ? ? Replicas: 1 ? ? Isr: 1
然后,配置消費者服務(wù)的 spring.cloud.stream.bindings.<channelName>.consumer.concurrency=2,啟動一個消費者服務(wù),再次查看分區(qū)數(shù),已經(jīng)變?yōu)?了:
[root@bi-zhaopeng01 kafka]# ./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic-spider-dev Topic:topic-spider-dev ?PartitionCount:2 ? ? ? ?ReplicationFactor:1 ? ? Configs: ? ? ? ? Topic: topic-spider-dev Partition: 0 ? ?Leader: 1 ? ? ? Replicas: 1 ? ? Isr: 1 ? ? ? ? Topic: topic-spider-dev Partition: 1 ? ?Leader: 2 ? ? ? Replicas: 2 ? ? Isr: 2
同時查看消費者端的應(yīng)用日志,看到2個消費者線程各自分配了一個分區(qū):
[INFO ] 2020-05-12 17:22:43,940 [thread: [Ljava.lang.String;@299dd381.container-0-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-0] [INFO ] 2020-05-12 17:22:44,004 [thread: [Ljava.lang.String;@299dd381.container-1-C-1] org.springframework.kafka.listener.AbstractMessageListenerContainer$1.onPartitionsAssigned(AbstractMessageListenerContainer.java:363) partitions assigned: [topic-spider-dev-1]
最終,確確實實地驗證了 concurrency 配置對消費者線程數(shù)和分區(qū)數(shù)的影響。
2.4 其他因素
比如,SpringCloudStream 中 Kafka 綁定器的配置中,也有一個相關(guān)的影響因素:
#最小分區(qū)數(shù),默認(rèn)1 spring.cloud.stream.kafka.binder.minPartitionCount=n
【說明】:該參數(shù)僅在設(shè)置了 autoCreateTopics 和 autoAddPartitions 時生效,用來設(shè)置該綁定器所使用主題的全局分區(qū)最小數(shù)量。
如果當(dāng)生產(chǎn)者的 partitionCount 參數(shù)或 instanceCount * concurrency 設(shè)置大于該參數(shù)配置時,該參數(shù)值將被覆蓋。
總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
- SpringCloudStream原理和深入使用小結(jié)
- SpringCloud中的Stream服務(wù)間消息傳遞詳解
- 使用Spring?Cloud?Stream處理事件的示例詳解
- spring-cloud-stream的手動消息確認(rèn)問題
- 關(guān)于SpringCloudStream配置問題
- Spring Cloud Stream 高級特性使用詳解
- SpringCloud微服務(wù)開發(fā)基于RocketMQ實現(xiàn)分布式事務(wù)管理詳解
- SpringCloud+RocketMQ實現(xiàn)分布式事務(wù)的實踐
- Spring Cloud Stream整合RocketMQ的搭建方法
相關(guān)文章
解決jackson反序列化失敗InvalidFormatException:Can not dese
這篇文章主要介紹了解決jackson反序列化失敗InvalidFormatException:Can not deserialize value of type java.util.Date問題,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12
String與XML互轉(zhuǎn)以及從XML取節(jié)點值并修改的方法
今天小編就為大家分享一篇String與XML互轉(zhuǎn)以及從XML取節(jié)點值并修改的方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-07-07
springboot結(jié)合全局異常處理實現(xiàn)登錄注冊驗證
這篇文章主要介紹了springboot結(jié)合全局異常處理實現(xiàn)登錄注冊驗證,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-05-05
mybatis框架之mybatis中dao層開發(fā)的兩種方法
這篇文章主要介紹了mybatis框架之mybatis中dao層開發(fā)的兩種方法,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-07-07
Java中的反射,枚舉及l(fā)ambda表達(dá)式的使用詳解
這篇文章主要為大家詳細(xì)介紹了Java的反射,枚舉及l(fā)ambda表達(dá)式,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下,希望能夠給你帶來幫助2022-03-03

