解決kafka消息堆積及分區(qū)不均勻的問(wèn)題
kafka消息堆積及分區(qū)不均勻的解決
我在環(huán)境中發(fā)現(xiàn)代碼里面的kafka有所延遲,查看kafka消息發(fā)現(xiàn)堆積嚴(yán)重,經(jīng)過(guò)檢查發(fā)現(xiàn)是kafka消息分區(qū)不均勻造成的,消費(fèi)速度過(guò)慢。這里由自己在虛擬機(jī)上演示相關(guān)問(wèn)題,給大家提供相應(yīng)問(wèn)題的參考思路。
這篇文章有點(diǎn)遺憾并沒(méi)重現(xiàn)分區(qū)不均衡的樣例和Warning: Consumer group ‘testGroup1' is rebalancing. 這里僅將正確的方式展示,等后續(xù)重現(xiàn)了在進(jìn)行補(bǔ)充。
主要有兩個(gè)要點(diǎn):
- 1、一個(gè)消費(fèi)者組只消費(fèi)一個(gè)topic.
- 2、factory.setConcurrency(concurrency);這里設(shè)置監(jiān)聽(tīng)并發(fā)數(shù)為 部署單元節(jié)點(diǎn)*concurrency=分區(qū)數(shù)量
1、先在kafka消息中創(chuàng)建
對(duì)應(yīng)分區(qū)數(shù)目的topic(testTopic2,testTopic3)testTopic1由代碼創(chuàng)建
./kafka-topics.sh --create --zookeeper 192.168.25.128:2181 --replication-factor 1 --partitions 2 --topic testTopic2
2、添加配置文件application.properties
kafka.test.topic1=testTopic1 kafka.test.topic2=testTopic2 kafka.test.topic3=testTopic3 kafka.broker=192.168.25.128:9092 auto.commit.interval.time=60000 #kafka.test.group=customer-test kafka.test.group1=testGroup1 kafka.test.group2=testGroup2 kafka.test.group3=testGroup3 kafka.offset=earliest kafka.auto.commit=false session.timeout.time=10000 kafka.concurrency=2
3、創(chuàng)建kafka工廠
package com.yin.customer.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @author yin
* @Date 2019/11/24 15:54
* @Method
*/
@Configuration
@Component
public class KafkaConfig {
@Value("${kafka.broker}")
private String broker;
@Value("${kafka.auto.commit}")
private String autoCommit;
// @Value("${kafka.test.group}")
//private String testGroup;
@Value("${session.timeout.time}")
private String sessionOutTime;
@Value("${auto.commit.interval.time}")
private String autoCommitTime;
@Value("${kafka.offset}")
private String offset;
@Value("${kafka.concurrency}")
private Integer concurrency;
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
//監(jiān)聽(tīng)設(shè)置兩個(gè)個(gè)分區(qū)
factory.setConcurrency(concurrency);
//打開(kāi)批量拉取數(shù)據(jù)
factory.setBatchListener(true);
//這里設(shè)置的是心跳時(shí)間也是拉的時(shí)間,也就說(shuō)每間隔max.poll.interval.ms我們就調(diào)用一次poll,kafka默認(rèn)是300s,心跳只能在poll的時(shí)候發(fā)出,如果連續(xù)兩次poll的時(shí)候超過(guò)
//max.poll.interval.ms 值就會(huì)導(dǎo)致rebalance
//心跳導(dǎo)致GroupCoordinator以為本地consumer節(jié)點(diǎn)掛掉了,引發(fā)了partition在consumerGroup里的rebalance。
// 當(dāng)rebalance后,之前該consumer擁有的分區(qū)和offset信息就失效了,同時(shí)導(dǎo)致不斷的報(bào)auto offset commit failed。
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
private ConsumerFactory<String,String> consumerFactory() {
return new DefaultKafkaConsumerFactory<String, String>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
//kafka的地址
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
//是否自動(dòng)提交 Offset
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
// enable.auto.commit 設(shè)置成 false,那么 auto.commit.interval.ms 也就不被再考慮
//默認(rèn)5秒鐘,一個(gè) Consumer 將會(huì)提交它的 Offset 給 Kafka
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
//這個(gè)值必須設(shè)置在broker configuration中的group.min.session.timeout.ms 與 group.max.session.timeout.ms之間。
//zookeeper.session.timeout.ms 默認(rèn)值:6000
//ZooKeeper的session的超時(shí)時(shí)間,如果在這段時(shí)間內(nèi)沒(méi)有收到ZK的心跳,則會(huì)被認(rèn)為該Kafka server掛掉了。
// 如果把這個(gè)值設(shè)置得過(guò)低可能被誤認(rèn)為掛掉,如果設(shè)置得過(guò)高,如果真的掛了,則需要很長(zhǎng)時(shí)間才能被server得知。
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionOutTime);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
//組與組間的消費(fèi)者是沒(méi)有關(guān)系的。
//topic中已有分組消費(fèi)數(shù)據(jù),新建其他分組ID的消費(fèi)者時(shí),之前分組提交的offset對(duì)新建的分組消費(fèi)不起作用。
//propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, testGroup);
//當(dāng)創(chuàng)建一個(gè)新分組的消費(fèi)者時(shí),auto.offset.reset值為latest時(shí),
// 表示消費(fèi)新的數(shù)據(jù)(從consumer創(chuàng)建開(kāi)始,后生產(chǎn)的數(shù)據(jù)),之前產(chǎn)生的數(shù)據(jù)不消費(fèi)。
// https://blog.csdn.net/u012129558/article/details/80427016
//earliest 當(dāng)分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi)。
// latest 當(dāng)分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)。
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, offset);
//不是指每次都拉50條數(shù)據(jù),而是一次最多拉50條數(shù)據(jù)()
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5);
return propsMap;
}
}
4、展示kafka消費(fèi)者
@Component
public class KafkaConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
@KafkaListener(topics = "${kafka.test.topic1}",groupId = "${kafka.test.group1}",containerFactory = "kafkaListenerContainerFactory")
public void listenPartition1(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
logger.info("testTopic1 recevice a message size :{}" , records.size());
try {
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
logger.info("received:{} " , record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
Thread.sleep(300);
logger.info("p1 topic is:{} received message={}",topic, message);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
@KafkaListener(topics = "${kafka.test.topic2}",groupId = "${kafka.test.group2}",containerFactory = "kafkaListenerContainerFactory")
public void listenPartition2(List<ConsumerRecord<?, ?>> records,Acknowledgment ack) {
logger.info("testTopic2 recevice a message size :{}" , records.size());
try {
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
logger.info("received:{} " , record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
Thread.sleep(300);
logger.info("p2 topic :{},received message={}",topic, message);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
@KafkaListener(topics = "${kafka.test.topic3}",groupId = "${kafka.test.group3}",containerFactory = "kafkaListenerContainerFactory")
public void listenPartition3(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
logger.info("testTopic3 recevice a message size :{}" , records.size());
try {
for (ConsumerRecord<?, ?> record : records) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
logger.info("received:{} " , record);
if (kafkaMessage.isPresent()) {
Object message = record.value();
String topic = record.topic();
logger.info("p3 topic :{},received message={}",topic, message);
Thread.sleep(300);
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
ack.acknowledge();
}
}
}
查看分區(qū)消費(fèi)情況:

kafka出現(xiàn)若干分區(qū)不消費(fèi)的現(xiàn)象
近日,有用戶反饋kafka有topic出現(xiàn)某個(gè)消費(fèi)組消費(fèi)的時(shí)候,有幾個(gè)分區(qū)一直不消費(fèi)消息,消息一直積壓(圖1)。除了一直積壓外,還有一個(gè)現(xiàn)象就是消費(fèi)組一直在重均衡,大約每5分鐘就會(huì)重均衡一次。具體表現(xiàn)為消費(fèi)分區(qū)的owner一直在改變(圖2)。

(圖1)

(圖2)
定位過(guò)程
業(yè)務(wù)側(cè)沒(méi)有報(bào)錯(cuò),同時(shí)kafka服務(wù)端日志也一切正常,同事先將消費(fèi)組的機(jī)器滾動(dòng)重啟,仍然還是那幾個(gè)分區(qū)沒(méi)有消費(fèi),之后將這幾個(gè)不消費(fèi)的分區(qū)遷移至別的broker上,依然沒(méi)有消費(fèi)。
還有一個(gè)奇怪的地方,就是每次重均衡后,不消費(fèi)的那幾個(gè)分區(qū)的消費(fèi)owner所在機(jī)器的網(wǎng)絡(luò)都有流量變化。按理說(shuō)不消費(fèi)應(yīng)該就是拉取不到分區(qū)不會(huì)有流量的。于是讓運(yùn)維去拉了下不消費(fèi)的consumer的jstack日志。一看果然發(fā)現(xiàn)了問(wèn)題所在。

從堆???,consumer已經(jīng)拉取到消息,然后就一直卡在處理消息的業(yè)務(wù)邏輯上。這說(shuō)明kafka是沒(méi)有問(wèn)題的,用戶的業(yè)務(wù)邏輯有問(wèn)題。
consumer在拉取完一批消息后,就一直在處理這批消息,但是這批消息中有若干條消息無(wú)法處理,而業(yè)務(wù)又沒(méi)有超時(shí)操作或者異常處理導(dǎo)致進(jìn)程一直處于消費(fèi)中,無(wú)法去poll下一批數(shù)據(jù)。
又由于業(yè)務(wù)采用的是autocommit的offset提交方式,而根據(jù)源碼可知,consumer只有在下一次poll中才會(huì)自動(dòng)提交上次poll的offset,所以業(yè)務(wù)一直在拉取同一批消息而無(wú)法更新offset。反映的現(xiàn)象就是該consumer對(duì)應(yīng)的分區(qū)的offset一直沒(méi)有變,所以有積壓的現(xiàn)象。
至于為什么會(huì)一直在重均衡消費(fèi)組的原因也很明了了,就是因?yàn)橛邢M(fèi)者一直卡在處理消息的業(yè)務(wù)邏輯上,超過(guò)了max.poll.interval.ms(默認(rèn)5min),消費(fèi)組就會(huì)將該消費(fèi)者踢出消費(fèi)組,從而發(fā)生重均衡。
驗(yàn)證
讓業(yè)務(wù)方去查證業(yè)務(wù)日志,驗(yàn)證了積壓的這幾個(gè)分區(qū),總是在循環(huán)的拉取同一批消息。
解決方法
臨時(shí)解決方法就是跳過(guò)有問(wèn)題的消息,將offset重置到有問(wèn)題的消息之后。本質(zhì)上還是要業(yè)務(wù)側(cè)修改業(yè)務(wù)邏輯,增加超時(shí)或者異常處理機(jī)制,最好不要采用自動(dòng)提交offset的方式,可以手動(dòng)管理。
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
JAVA的Dubbo如何實(shí)現(xiàn)各種限流算法
Dubbo是一種高性能的Java RPC框架,廣泛應(yīng)用于分布式服務(wù)架構(gòu)中,在Dubbo中實(shí)現(xiàn)限流可以幫助服務(wù)在高并發(fā)場(chǎng)景下保持穩(wěn)定性和可靠性,常見(jiàn)的限流算法包括固定窗口算法、滑動(dòng)窗口算法、令牌桶算法和漏桶算法,在Dubbo中集成限流器可以通過(guò)實(shí)現(xiàn)自定義過(guò)濾器來(lái)實(shí)現(xiàn)2025-01-01
Java實(shí)現(xiàn)Excel表單控件的添加與刪除
本文通過(guò)Java代碼示例介紹如何在Excel表格中添加表單控件,包括文本框、單選按鈕、復(fù)選框、組合框、微調(diào)按鈕等,以及如何刪除Excel中的指定表單控件,需要的可以參考一下2022-05-05
MyBatis?Generator生成的$?sql是否存在注入風(fēng)險(xiǎn)詳解
這篇文章主要介紹了MyBatis?Generator生成的$?sql是否存在注入風(fēng)險(xiǎn)詳解,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-12-12
Java堆空間爆滿導(dǎo)致宕機(jī)的問(wèn)題分析及解決
團(tuán)隊(duì)有一個(gè)服務(wù),一直運(yùn)行的好好的,突然訪問(wèn)異常了,先是請(qǐng)求超時(shí),然后直接無(wú)法訪問(wèn),本文將給大家介紹Java堆空間爆滿導(dǎo)致宕機(jī)的問(wèn)題分析及解決,需要的朋友可以參考下2024-02-02
SpringBoot的jar包如何啟動(dòng)的實(shí)現(xiàn)
本文主要介紹了SpringBoot的jar包如何啟動(dòng)的實(shí)現(xiàn),文中根據(jù)實(shí)例編碼詳細(xì)介紹的十分詳盡,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
springboot+zookeeper實(shí)現(xiàn)分布式鎖的示例代碼
本文主要介紹了springboot+zookeeper實(shí)現(xiàn)分布式鎖的示例代碼,文中根據(jù)實(shí)例編碼詳細(xì)介紹的十分詳盡,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03
SpringBoot實(shí)現(xiàn)國(guó)際化的教程
這篇文章主要介紹了SpringBoot實(shí)現(xiàn)國(guó)際化的教程,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2025-03-03
java使用freemarker模板生成html再轉(zhuǎn)為pdf
這篇文章主要為大家詳細(xì)介紹了java如何使用freemarker模板生成html,再利用iText將生成的HTML轉(zhuǎn)換為PDF文件,感興趣的小伙伴可以參考下2025-04-04

