Kafka調(diào)試技巧及心得分享
基礎(chǔ)理念
對于我們有a,b,c,3臺機(jī)器,那么我們的消息會(huì)被消費(fèi)3次?
- 可能會(huì),也可能不會(huì),這取決于你的配置和策略。
- 消費(fèi)者組機(jī)制:Kafka 使用消費(fèi)組(Consumer Group)來確保每個(gè)消息只會(huì)被每個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)一次。
- 分區(qū)分配:Kafka 主題可以分為多個(gè)分區(qū)(Partitions),每個(gè)分區(qū)只能由一個(gè)消費(fèi)者組中的一個(gè)消費(fèi)者消費(fèi)。
如果新增1臺機(jī)器,那么他的偏移量從0開始?
在kafka中,消費(fèi)組都會(huì)維護(hù)自己的偏移量(offset),以此來記錄消費(fèi)的消息位置,而當(dāng)新增1臺機(jī)器時(shí),會(huì)根據(jù)分區(qū)分配策略,比如:范圍分配、輪詢分配,這就有2種情況,可能加入舊分區(qū),也可能加入新分區(qū),具體可以配置下策略參數(shù)auto.offset.reset,對應(yīng)的值如下:
- earliest: automatically reset the offset to the earliest offset
- latest: automatically reset the offset to the latest offset
- none: throw exception to the consumer if no previous offset is found for the consumer’s group
- anything else: throw exception to the consumer.
本地調(diào)試
所以,當(dāng)我們在調(diào)試kafka消費(fèi)邏輯的時(shí)候,可能由于消費(fèi)邏輯寫的不對,改完代碼需要重新測,重新去造1條消息?還是你會(huì)怎么去做,了解了前面的原理,我們改消費(fèi)組是不可行的,他獲取的是最新的偏移量,無法實(shí)現(xiàn)復(fù)用之前造的某條數(shù)據(jù),特別是我們有不同邏輯,把每種類型的消息都重新推一波,這不僅麻煩,而且也容易出錯(cuò),是否可以直接復(fù)用之前的消息,準(zhǔn)確處理?
我們可以先了解下@KafkaListener里面的一些配置參數(shù),具體如下:
@KafkaListener(topicPartitions = {@TopicPartition(topic = "yourTopic", partitionOffsets = {@PartitionOffset(partition = "指定分區(qū)", initialOffset = "初始偏移量")})})
public void forlanConsumer(ConsumerRecord<String, String> record) {
String messageStr = record.value();
log.info(“測試觸達(dá)記錄消費(fèi):offset = {}, res = {}”,record.offset(), messageStr);
}
上面只是指定了我們從什么偏移量開始消費(fèi),如果要限制范圍,可以在代碼里面加限制
@KafkaListener(topicPartitions = {@TopicPartition(topic = "yourTopic", partitionOffsets = {@PartitionOffset(partition = "指定分區(qū)", initialOffset = "初始偏移量")})})
public void forlanConsumer(ConsumerRecord<String, String> record) {
if (record.offset() > 結(jié)束偏移量) return;
String messageStr = record.value();
log.info(“測試觸達(dá)記錄消費(fèi):offset = {}, res = {}”,record.offset(), messageStr);
}
測試或正式環(huán)境調(diào)試
上面只適合本地場景,如果是線上環(huán)境,我們本地一般是沒有權(quán)限連接監(jiān)聽的,那么可以怎么做?其實(shí)也能做,只不過需要通過接口去拉取處理
@Autowired
private KafkaConfig kafkaConfig;
public void reconsumeMessage(String topic, int partition, long offset) {
ConsumerFactory<Integer, String> consumerFactory = kafkaConfig.consumerFactory();
Map<String, Object> configurationProperties = consumerFactory.getConfigurationProperties();
Map<String, Object> customProps = new HashMap<>();
customProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reconsume-temp-group-" + UUID.randomUUID());
customProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");// 防止 offset 不存在時(shí)報(bào)錯(cuò)
customProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
customProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");// 控制每次 poll 只拉取一條
// 復(fù)用 kafkaConfig 中的基礎(chǔ)配置
Map<String, Object> props = new HashMap<>(configurationProperties);
props.putAll(customProps);
try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
TopicPartition topicPartition = new TopicPartition(topic, partition);
// 分配分區(qū)并定位偏移量
consumer.assign(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, offset);
// 拉取消息(設(shè)置超時(shí)時(shí)間)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
for (ConsumerRecord<String, String> record : records) {
if (record.offset() == offset) {
if (Objects.equals(topic, KafkaTopic.Forlan_MESSAGE_NOTIFY)) {
// 調(diào)用@KafkaListener的方法執(zhí)行邏輯
forlanConsumer.userConsumer(record);
}
break;
}
}
}
}
項(xiàng)目中如果沒有配置kafkaConfig,也可以自定義一個(gè),只要能拿到連接就行
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap();
props.put("bootstrap.servers", this.bootstrapServers);
props.put("group.id", this.groupid);
props.put("enable.auto.commit", this.autoCommit);
props.put("auto.commit.interval.ms", this.interval);
props.put("session.timeout.ms", this.timeout);
props.put("key.deserializer", this.keyDeserializer);
props.put("value.deserializer", this.valueDeserializer);
props.put("auto.offset.reset", this.offsetReset);
props.put("max.poll.records", this.maxPollRecords);
return props;
}
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
RabbitMQ交換機(jī)與Springboot整合的簡單實(shí)現(xiàn)
這篇文章主要介紹了RabbitMQ交換機(jī)與Springboot整合的簡單實(shí)現(xiàn),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-07-07
SpringBoot整合Sharding-JDBC實(shí)現(xiàn)MySQL8讀寫分離
本文是一個(gè)基于SpringBoot整合Sharding-JDBC實(shí)現(xiàn)讀寫分離的極簡教程,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的可以了解一下2021-07-07
教你Java中的Lock鎖底層AQS到底是如何實(shí)現(xiàn)的
本文是基于ReentrantLock來講解,ReentrantLock加鎖只是對AQS的api的調(diào)用,底層的鎖的狀態(tài)(state)和其他線程等待(Node雙向鏈表)的過程其實(shí)是由AQS來維護(hù)的,對Java?Lock鎖AQS實(shí)現(xiàn)過程感興趣的朋友一起看看吧2022-05-05
Java實(shí)戰(zhàn)之用Swing實(shí)現(xiàn)通訊錄管理系統(tǒng)
今天給大家?guī)淼氖荍ava實(shí)戰(zhàn)的相關(guān)知識,文章圍繞著Swing實(shí)現(xiàn)通訊錄管理系統(tǒng)展開,文中有非常詳細(xì)的代碼示例,需要的朋友可以參考下2021-06-06
m1 Mac設(shè)置多jdk版本并動(dòng)態(tài)切換的實(shí)現(xiàn)
本文主要介紹 Mac 下如何安裝 JDK 并且多版本如何切換,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
淺析Java如何實(shí)現(xiàn)動(dòng)態(tài)線程池的任務(wù)編排
動(dòng)態(tài)線程池是在程序運(yùn)行期間,動(dòng)態(tài)調(diào)整線程池參數(shù)而無需重啟程序的技術(shù),那么如何在動(dòng)態(tài)線程池中進(jìn)行任務(wù)編排呢,下面小編就來和大家詳細(xì)介紹一下吧2025-09-09

