kafka提交偏移量失敗導(dǎo)致重復(fù)消費(fèi)的解決
問題詳情
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:92)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1365)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1063)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1116)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:983)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2311)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2306)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2292)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2106)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1097)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1031)
... 3 common frames omitted
解決思路
kafka的好多配置,在spring-kafka中沒有明確的配置對應(yīng),但是預(yù)留了一個(gè)properties屬性,可以設(shè)置所有的kafka配置
spring.kafka.properties.session.timeout.ms=10000 // 單位:毫秒 spring.kafka.properties.max.poll.interval.ms=300000 // 單位:毫秒
kafka會(huì)有一個(gè)心跳線程來同步服務(wù)端,告訴服務(wù)端自己是正常可用的,默認(rèn)是3秒發(fā)送一次心跳,超過session.timeout.ms(默認(rèn)10秒)服務(wù)端沒有收到心跳就會(huì)認(rèn)為當(dāng)前消費(fèi)者失效。max.poll.interval.ms決定了獲取消息后提交偏移量的最大時(shí)間,超過設(shè)定的時(shí)間(默認(rèn)5分鐘),服務(wù)端也會(huì)認(rèn)為該消費(fèi)者失效。
Kafka配置max.poll.interval.ms參數(shù)
max.poll.interval.ms默認(rèn)值是5分鐘,如果需要加大時(shí)長就需要給這個(gè)參數(shù)重新賦值
這里解釋下自己為什么要修改這個(gè)參數(shù):因?yàn)榈谝淮谓邮誯afka數(shù)據(jù),需要加載一堆基礎(chǔ)數(shù)據(jù),大概執(zhí)行時(shí)間要8分鐘,而5分鐘后,kafka認(rèn)為我沒消費(fèi),又重新發(fā)送,導(dǎo)致我這邊收到許多重復(fù)數(shù)據(jù),所以我需要調(diào)大這個(gè)值,避免接收重復(fù)數(shù)據(jù)
大部分文章都是如下配置:
public static KafkaConsumer<String, String> createConsumer() {
Properties properties = new Properties();
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 10000);
return new KafkaConsumer<>(properties);
}
或是:
max.poll.interval.ms = 300000
如果需要在yml文件中配置,應(yīng)該怎么寫呢?
spring:
kafka:
consumer:
max-poll-records: 500
properties:
max.poll.interval.ms: 600000
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot整合Redis實(shí)現(xiàn)token緩存
于token通常會(huì)被多次使用,我們需要把它保存到緩存中,以減少頻繁地訪問數(shù)據(jù)庫,本文主要介紹了SpringBoot整合Redis實(shí)現(xiàn)token緩存,感興趣的可以了解一下2024-02-02
spring cloud gateway 限流的實(shí)現(xiàn)與原理
這篇文章主要介紹了spring cloud gateway 限流的實(shí)現(xiàn)與原理,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-12-12
解決java.util.RandomAccessSubList cannot be cas
當(dāng)你嘗試將RandomAccessSubList強(qiáng)制轉(zhuǎn)換為ArrayList時(shí),會(huì)拋出ClassCastException異常,解決這個(gè)問題,你可以使用List接口進(jìn)行操作,或者使用ArrayList的構(gòu)造函數(shù)創(chuàng)建新的ArrayList對象來處理子列表2025-11-11
Spring如何基于aop實(shí)現(xiàn)操作日志功能
這篇文章主要介紹了Spring如何基于aop實(shí)現(xiàn)操作日志功能,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-11-11
SpringBoot使用MyBatis實(shí)現(xiàn)數(shù)據(jù)的CRUD
MyBatis是一個(gè)輕量級(jí)的對象關(guān)系映射(Object-Relational Mapping,ORM)框架,它允許開發(fā)者通過編寫SQL動(dòng)態(tài)查詢數(shù)據(jù)庫,而無需顯式地操作JDBC,對于增刪改查操作,MyBatis提供了一種基于XML或注解的方式來進(jìn)行,本文介紹了SpringBoot使用MyBatis實(shí)現(xiàn)數(shù)據(jù)的CRUD2024-11-11
Spring?@Transactional事務(wù)失效的原因分析
一個(gè)程序中不可能沒有事務(wù),Spring中,事務(wù)的實(shí)現(xiàn)方式分為兩種:編程式事務(wù)和聲明式事務(wù)。日常項(xiàng)目中,我們都會(huì)使用聲明式事務(wù)?@Transactional來實(shí)現(xiàn)事務(wù),本文來和大家聊聊什么情況會(huì)導(dǎo)致@Transactional事務(wù)失效2022-09-09
springboot數(shù)據(jù)庫密碼加密的配置方法
這篇文章主要給大家介紹了關(guān)于springboot數(shù)據(jù)庫密碼加密的配置方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
SpringBoot API增加version版本號(hào)方式
這篇文章主要介紹了SpringBoot API增加version版本號(hào)方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-10-10

