Kafka批量消費(fèi)&逐條消費(fèi)詳解
Kafka批量消費(fèi)&逐條消費(fèi)
消費(fèi)者配置參數(shù)
private Map<String, Object> defaultGoodsConsumerConfig() {
Map<String, Object> props = Maps.newHashMap();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:port");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer
");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer
");
props.put("listener.type", "batch");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "modify-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, “SASL_PLAINTEXT”);
props.put(SaslConfigs.SASL_MECHANISM, defaultKafkaProperties.getSaslMechanism());
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="###" password="###";
");
return props;
}
@Bean(name = "defaultListenerContainerFactory")
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> defaultListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(defaultGoodsConsumerConfig()));
factory.setConcurrency(4);
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
log.info("KafkaDefaultConsumer factory獲取實(shí)例:"+ JSON.toJSONString(factory));
return factory;
}
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory(defaultGoodsConsumerConfig()));
factory.setConcurrency(4);
//批量消費(fèi),如果不設(shè)置默認(rèn)是單條消費(fèi)
factory.setBatchListener(true);
factory.getContainerProperties().setPollTimeout(3000);
消費(fèi)者監(jiān)聽(tīng)消息
/**
* 監(jiān)聽(tīng)goods變更消息
*/
@KafkaListener(id="sync-modify-goods", topics = "${kafka.sync.goods.topic}", concurrency = "4", containerFactory = "defaultListenerContainerFactory")
public void updateListener(List<ConsumerRecord<String, String>> records){
for (ConsumerRecord<String, String> msg:records) {
GoodsChangeMsg changeMsg = null;
try {
changeMsg = JSONObject.parseObject(msg.value(), GoodsChangeMsg.class);
syncGoodsProcessor.handle(changeMsg);
}catch (Exception exception) {
log.error("解析失敗{}", msg, exception);
}
}
}
List<ConsumerRecord<String, String>> records可以是String[] message
如果是逐條消費(fèi),這里配置list,kafka會(huì)根據(jù)字符串中的逗號(hào)進(jìn)行分割,所以碰見(jiàn)該現(xiàn)象不要慌,看一下批量消費(fèi)的配置。
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
java IO流 之 輸出流 OutputString()的使用
這篇文章主要介紹了java IO流 之 輸出流 OutputString()的使用的相關(guān)資料,需要的朋友可以參考下2016-12-12
springboot集成camunda的實(shí)現(xiàn)示例
本文主要介紹了springboot集成camunda的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-10-10
Java靜態(tài)static與實(shí)例instance方法示例
這篇文章主要為大家介紹了Java靜態(tài)static與實(shí)例instance方法示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-08-08
java實(shí)現(xiàn)自定義日期選擇器的方法實(shí)例
日期選擇器是我們?nèi)粘i_(kāi)發(fā)中經(jīng)常需要用到的一個(gè)功能,下面這篇文章主要給大家介紹了關(guān)于利用java實(shí)現(xiàn)自定義日期選擇器的相關(guān)資料,文中給出了詳細(xì)的示例代碼,需要的朋友可以參考借鑒,下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧。2017-10-10

