spring kafka框架中@KafkaListener 注解解讀和使用案例
簡介
Kafka 目前主要作為一個分布式的發(fā)布訂閱式的消息系統(tǒng)使用,也是目前最流行的消息隊(duì)列系統(tǒng)之一。因此,也越來越多的框架對 kafka 做了集成,比如本文將要說到的 spring-kafka。
Kafka 既然作為一個消息發(fā)布訂閱系統(tǒng),就包括消息生成者和消息消費(fèi)者。本文主要講述的 spring-kafka 框架的 kafkaListener 注解的深入解讀和使用案例。
解讀
源碼解讀
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
/**
* 消費(fèi)者的id,當(dāng)GroupId沒有被配置的時候,默認(rèn)id為GroupId
*/
String id() default "";
/**
* 監(jiān)聽容器工廠,當(dāng)監(jiān)聽時需要區(qū)分單數(shù)據(jù)還是多數(shù)據(jù)消費(fèi)需要配置containerFactory 屬性
*/
String containerFactory() default "";
/**
* 需要監(jiān)聽的Topic,可監(jiān)聽多個,和 topicPattern 屬性互斥
*/
String[] topics() default {};
/**
* 需要監(jiān)聽的Topic的正則表達(dá)。和 topics,topicPartitions屬性互斥
*/
String topicPattern() default "";
/**
* 可配置更加詳細(xì)的監(jiān)聽信息,必須監(jiān)聽某個Topic中的指定分區(qū),或者從offset為200的偏移量開始監(jiān)聽,可配置該參數(shù), 和 topicPattern 屬性互斥
*/
TopicPartition[] topicPartitions() default {};
/**
*偵聽器容器組
*/
String containerGroup() default "";
/**
* 監(jiān)聽異常處理器,配置BeanName
*/
String errorHandler() default "";
/**
* 消費(fèi)組ID
*/
String groupId() default "";
/**
* id是否為GroupId
*/
boolean idIsGroup() default true;
/**
* 消費(fèi)者Id前綴
*/
String clientIdPrefix() default "";
/**
* 真實(shí)監(jiān)聽容器的BeanName,需要在 BeanName前加 "__"
*/
String beanRef() default "__listener";
}使用案例
ConsumerRecord 類消費(fèi)
使用 ConsumerRecord 類接收有一定的好處,ConsumerRecord 類里面包含分區(qū)信息、消息頭、消息體等內(nèi)容,如果業(yè)務(wù)需要獲取這些參數(shù)時,使用 ConsumerRecord 會是個不錯的選擇。如果使用具體的類型接收消息體則更加方便,比如說用 String 類型去接收消息體。
這里我們編寫一個 Listener 方法,監(jiān)聽 "topic1"Topic,并把 ConsumerRecord 里面所包含的內(nèi)容打印到控制臺中:
@Component
public class Listener {
private static final Logger log = LoggerFactory.getLogger(Listener.class);
@KafkaListener(id = "consumer", topics = "topic1")
public void consumerListener(ConsumerRecord record) {
log.info("topic.quick.consumer receive : " + record.toString());
}
}批量消費(fèi)
批量消費(fèi)在現(xiàn)實(shí)業(yè)務(wù)場景中是很有實(shí)用性的。因?yàn)榕肯M(fèi)可以增大 kafka 消費(fèi)吞吐量, 提高性能。
批量消費(fèi)實(shí)現(xiàn)步驟:
1、重新創(chuàng)建一份新的消費(fèi)者配置,配置為一次拉取 10 條消息
2、創(chuàng)建一個監(jiān)聽容器工廠,命名為:batchContainerFactory,設(shè)置其為批量消費(fèi)并設(shè)置并發(fā)量為 5,這個并發(fā)量根據(jù)分區(qū)數(shù)決定,必須小于等于分區(qū)數(shù),否則會有線程一直處于空閑狀態(tài)。
3、創(chuàng)建一個分區(qū)數(shù)為 8 的 Topic。
4、創(chuàng)建監(jiān)聽方法,設(shè)置消費(fèi) id 為 “batchConsumer”,clientID 前綴為“batch”,監(jiān)聽“batch”,使用“batchContainerFactory” 工廠創(chuàng)建該監(jiān)聽容器。
@Component
public class BatchListener {
private static final Logger log= LoggerFactory.getLogger(BatchListener.class);
private Map consumerProps() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
//一次拉取消息數(shù)量
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
NumberDeserializers.IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
return props;
}
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory listenerContainer() {
ConcurrentKafkaListenerContainerFactory container
= new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
//設(shè)置并發(fā)量,小于或等于Topic的分區(qū)數(shù)
container.setConcurrency(5);
//必須 設(shè)置為批量監(jiān)聽
container.setBatchListener(true);
return container;
}
@Bean
public NewTopic batchTopic() {
return new NewTopic("topic.batch", 8, (short) 1);
}
@KafkaListener(id = "batchConsumer",clientIdPrefix = "batch"
,topics = {"topic.batch"},containerFactory = "batchContainerFactory")
public void batchListener(List data) {
log.info("topic.batch receive : ");
for (String s : data) {
log.info( s);
}
}
}監(jiān)聽 Topic 中指定的分區(qū)
使用 @KafkaListener 注解的 topicPartitions 屬性監(jiān)聽不同的 partition 分區(qū)。
@TopicPartition:topic-- 需要監(jiān)聽的 Topic 的名稱,partitions – 需要監(jiān)聽 Topic 的分區(qū) id。
partitionOffsets – 可以設(shè)置從某個偏移量開始監(jiān)聽,@PartitionOffset:partition – 分區(qū) Id,非數(shù)組,initialOffset – 初始偏移量。
@Bean
public NewTopic batchWithPartitionTopic() {
return new NewTopic("topic.batch.partition", 8, (short) 1);
}
@KafkaListener(id = "batchWithPartition",clientIdPrefix = "bwp",containerFactory = "batchContainerFactory",
topicPartitions = {
@TopicPartition(topic = "topic.batch.partition",partitions = {"1","3"}),
@TopicPartition(topic = "topic.batch.partition",partitions = {"0","4"},
partitionOffsets = @PartitionOffset(partition = "2",initialOffset = "100"))
}
)
public void batchListenerWithPartition(List data) {
log.info("topic.batch.partition receive : ");
for (String s : data) {
log.info(s);
}
}
注解方式獲取消息頭及消息體
當(dāng)你接收的消息包含請求頭,以及你監(jiān)聽方法需要獲取該消息非常多的字段時可以通過這種方式。。這里使用的是默認(rèn)的監(jiān)聽容器工廠創(chuàng)建的,如果你想使用批量消費(fèi),把對應(yīng)的類型改為 List 即可,比如 List data , List key。
@Payload:獲取的是消息的消息體,也就是發(fā)送內(nèi)容
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY):獲取發(fā)送消息的 key
@Header(KafkaHeaders.RECEIVED_PARTITION_ID):獲取當(dāng)前消息是從哪個分區(qū)中監(jiān)聽到的
@Header(KafkaHeaders.RECEIVED_TOPIC):獲取監(jiān)聽的 TopicName
@Header(KafkaHeaders.RECEIVED_TIMESTAMP):獲取時間戳
@KafkaListener(id = "params", topics = "topic.params")
public void otherListener(@Payload String data,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts) {
log.info("topic.params receive : \n"+
"data : "+data+"\n"+
"key : "+key+"\n"+
"partitionId : "+partition+"\n"+
"topic : "+topic+"\n"+
"timestamp : "+ts+"\n"
);
}使用 Ack 機(jī)制確認(rèn)消費(fèi)
Kafka 是通過最新保存偏移量進(jìn)行消息消費(fèi)的,而且確認(rèn)消費(fèi)的消息并不會立刻刪除,所以我們可以重復(fù)的消費(fèi)未被刪除的數(shù)據(jù),當(dāng)?shù)谝粭l消息未被確認(rèn),而第二條消息被確認(rèn)的時候,Kafka 會保存第二條消息的偏移量,也就是說第一條消息再也不會被監(jiān)聽器所獲取,除非是根據(jù)第一條消息的偏移量手動獲取。Kafka 的 ack 機(jī)制可以有效的確保消費(fèi)不被丟失。因?yàn)樽詣犹峤皇窃?kafka 拉取到數(shù)據(jù)之后就直接提交,這樣很容易丟失數(shù)據(jù),尤其是在需要事物控制的時候。
使用 Kafka 的 Ack 機(jī)制比較簡單,只需簡單的三步即可:
- 設(shè)置 ENABLE_AUTO_COMMIT_CONFIG=false,禁止自動提交
- 設(shè)置 AckMode=MANUAL_IMMEDIATE
- 監(jiān)聽方法加入 Acknowledgment ack 參數(shù)
4.使用 Consumer.seek 方法,可以指定到某個偏移量的位置
@Component
public class AckListener {
private static final Logger log = LoggerFactory.getLogger(AckListener.class);
private Map consumerProps() {
Map props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean("ackContainerFactory")
public ConcurrentKafkaListenerContainerFactory ackContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(consumerProps()));
return factory;
}
@KafkaListener(id = "ack", topics = "topic.ack", containerFactory = "ackContainerFactory")
public void ackListener(ConsumerRecord record, Acknowledgment ack) {
log.info("topic.quick.ack receive : " + record.value());
ack.acknowledge();
}
}
解決重復(fù)消費(fèi)
上一節(jié)中使用 ack 手動提交偏移量時,假如 consumer 掛了重啟,那它將從 committed offset 位置開始重新消費(fèi),而不是 consume offset 位置。這也就意味著有可能重復(fù)消費(fèi)。
在 0.9 客戶端中,有 3 種 ack 策略:
策略 1: 自動的,周期性的 ack。
策略 2:consumer.commitSync(),調(diào)用 commitSync,手動同步 ack。每處理完 1 條消息,commitSync 1 次。
策略 3:consumer. commitASync(),手動異步 ack。、
那么使用策略 2,提交每處理完 1 條消息,就發(fā)送一次 commitSync。那這樣是不是就可以解決 “重復(fù)消費(fèi)” 了呢?如下代碼:
while (true) {
List buffer = new ArrayList<>();
ConsumerRecords records = consumer.poll(100);
for (ConsumerRecord record : records) {
buffer.add(record);
}
insertIntoDb(buffer); //消除處理,存到db
consumer.commitSync(); //同步發(fā)送ack
buffer.clear();
}
}答案是否定的!因?yàn)樯厦娴?insertIntoDb 和 commitSync 做不到原子操作:如果在數(shù)據(jù)處理完成,commitSync 的時候掛了,服務(wù)器再次重啟,消息仍然會重復(fù)消費(fèi)。
那么如何解決重復(fù)消費(fèi)的問題呢?答案是自己保存 committed offset,而不是依賴 kafka 的集群保存 committed offset,把消息的處理和保存 offset 做成一個原子操作,并且對消息加入唯一 id, 進(jìn)行判重。
依照官方文檔, 要自己保存偏移量, 需要:
- enable.auto.commit=false, 禁用自動 ack。
- 每次取到消息,把對應(yīng)的 offset 存下來。
- 下次重啟,通過 consumer.seek 函數(shù),定位到自己保存的 offset,從那開始消費(fèi)。
- 更進(jìn)一步處理可以對消息加入唯一 id, 進(jìn)行判重。
到此這篇關(guān)于kafka @KafkaListener 注解解讀的文章就介紹到這了,更多相關(guān)@KafkaListener 注解內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Mybatis關(guān)聯(lián)查詢結(jié)果集對象嵌套的具體使用
在查詢時經(jīng)常出現(xiàn)一對多”的關(guān)系,所有會出現(xiàn)嵌套對象的情況,本文主要介紹了Mybatis關(guān)聯(lián)查詢結(jié)果集對象嵌套的具體使用,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-02-02
springboot+redis自定義注解實(shí)現(xiàn)發(fā)布訂閱的實(shí)現(xiàn)代碼
在Redis中客戶端可以通過訂閱特定的頻道來接收發(fā)送至該頻道的消息,本文主要介紹了springboot+redis自定義注解實(shí)現(xiàn)發(fā)布訂閱,具有一定的參考價值,感興趣的可以了解一下2023-08-08
Spring中TransactionSynchronizationManager的使用詳解
這篇文章主要介紹了Spring中TransactionSynchronizationManager的使用詳解,TransactionSynchronizationManager是事務(wù)同步管理器,監(jiān)聽事務(wù)的操作,來實(shí)現(xiàn)在事務(wù)前后可以添加一些指定操作,需要的朋友可以參考下2023-09-09
在SpringBoot當(dāng)中使用Thymeleaf視圖解析器的詳細(xì)教程
Thymeleaf是一款開源的模板引擎,它允許前端開發(fā)者使用HTML與XML編寫動態(tài)網(wǎng)頁,hymeleaf的主要特點(diǎn)是將表達(dá)式語言嵌入到HTML結(jié)構(gòu)中,它支持Spring框架,使得在Spring MVC應(yīng)用中集成非常方便,本文給大家介紹了在SpringBoot當(dāng)中使用Thymeleaf視圖解析器的詳細(xì)教程2024-09-09
使用Java實(shí)現(xiàn)系統(tǒng)托盤功能的介紹(附源碼以及截圖)
本篇文章介紹了,在Java中實(shí)現(xiàn)系統(tǒng)托盤功能的詳解,文中附源碼以及截圖介紹。需要的朋友參考下2013-05-05
java實(shí)現(xiàn)適用于安卓的文件下載線程類
本文給大家分享的是java實(shí)現(xiàn)適用于安卓的文件下載線程類的代碼,有需要的小伙伴可以參考下2015-07-07
Springboot使用異步請求提高系統(tǒng)的吞吐量詳解
這篇文章主要介紹了Springboot使用異步請求提高系統(tǒng)的吞吐量詳解,和同步請求相對,異步不需要等待響應(yīng),隨時可以發(fā)送下一次請求,如果是同步請求,需要將信息填寫完整,再發(fā)送請求,服務(wù)器響應(yīng)填寫是否正確,再做修改,需要的朋友可以參考下2023-08-08
解決Mybatis在IDEA中找不到mapper映射文件的問題
這篇文章主要介紹了解決Mybatis在IDEA中找不到mapper映射文件的問題,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-10-10

