springboot項(xiàng)目配置多個kafka的示例代碼
1.spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.3.5.RELEASE</version> </dependency>
2.配置文件相關(guān)信息
kafka.bootstrap-servers=localhost:9092
kafka.consumer.group.id=20230321
#可以并發(fā)消費(fèi)的線程數(shù) (通常與partition數(shù)量一致)
kafka.consumer.concurrency=10
kafka.consumer.enable.auto.commit=false
kafka.bootstrap-servers.pic=localhost:29092
kafka.consumer.group.id.pic=20230322_pic
kafka.consumer.concurrency.pic=10
kafka.consumer.enable.auto.commit.pic=false3.kafka配置類
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.concurrency}")
private int concurrency;
@Value("${kafka.consumer.enable.auto.commit}")
private String autoCommit;
@Value("${kafka.bootstrap-servers}")
private String bootstrapServer;
@Value("${kafka.consumer.group.id.pic}")
private String groupIdPic;
@Value("${kafka.consumer.concurrency.pic}")
private int concurrencyPic;
@Value("${kafka.consumer.enable.auto.commit.pic}")
private String autoCommitPic;
@Value("${kafka.bootstrap-servers.pic}")
private String bootstrapServerPic;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
String bootstrapServers = bootstrapServer;
Map<String, Object> configProps = new HashMap<>(16);
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactoryPic() {
String bootstrapServers = bootstrapServerPic;
Map<String, Object> configProps = new HashMap<>(16);
configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
return new DefaultKafkaConsumerFactory<>(configProps);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryPic());
factory.setConcurrency(concurrencyPic);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
return factory;
}
}4.消費(fèi)主題消息
@KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
try {
String jsonString = message.value();
if (StringUtils.isNoneBlank(jsonString)) {
log.info("消費(fèi):{}",jsonString);
//TODO ....
}
} catch (Exception e) {
log.error(" receive topic error ", e);
} finally {
ack.acknowledge();
}
}
@KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
try {
if (StringUtils.isNoneBlank(message.value())) {
//TODO ....
}
} catch (Exception e) {
logger.error(" receive topic error ", e);
} finally {
ack.acknowledge();
}
}到此這篇關(guān)于springboot項(xiàng)目配置多個kafka的文章就介紹到這了,更多相關(guān)springboot配置多個kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot如何集成Kafka低版本和高版本
- springboot如何配置多kafka
- kafka springBoot配置的實(shí)現(xiàn)
- Springboot使用kafka的兩種方式
- springboot連接kafka集群的使用示例
- SpringBoot3集成Kafka的方法詳解
- Springboot系列之kafka操作使用詳解
- SpringBoot如何正確配置并運(yùn)行Kafka
- springboot+kafka中@KafkaListener動態(tài)指定多個topic問題
- springboot使用@KafkaListener監(jiān)聽多個kafka配置實(shí)現(xiàn)
相關(guān)文章
SpringBoot+Spring?Data?JPA整合H2數(shù)據(jù)庫的示例代碼
H2數(shù)據(jù)庫是一個開源的關(guān)系型數(shù)據(jù)庫,本文重點(diǎn)給大家介紹SpringBoot+Spring?Data?JPA整合H2數(shù)據(jù)庫的示例代碼,感興趣的朋友跟隨小編一起看看吧2022-02-02
mybatis關(guān)聯(lián)關(guān)系映射的實(shí)現(xiàn)
MyBatis的關(guān)聯(lián)關(guān)系映射在復(fù)雜數(shù)據(jù)模型中至關(guān)重要,使開發(fā)人員能夠以最靈活的方式滿足不同項(xiàng)目的需求,本文就來介紹一下mybatis關(guān)聯(lián)關(guān)系映射的實(shí)現(xiàn),感興趣的可以了解一下2023-09-09
springboot docker jenkins 自動化部署并上傳鏡像的步驟詳解
這篇文章主要介紹了springboot docker jenkins 自動化部署并上傳鏡像的相關(guān)資料,本文給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05
java在linux本地執(zhí)行shell命令的實(shí)現(xiàn)方法
本文主要介紹了java在linux本地執(zhí)行shell命令的實(shí)現(xiàn)方法,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-02-02
Java排序之冒泡排序的實(shí)現(xiàn)與優(yōu)化
冒泡排序是一種簡單的交換排序。之所以叫做冒泡排序,因?yàn)槲覀兛梢园衙總€元素當(dāng)成一個小氣泡,根據(jù)氣泡大小,一步一步移動到隊(duì)伍的一端,最后形成一定對的順序。本文將利用Java實(shí)現(xiàn)冒泡排序,并進(jìn)行一定的優(yōu)化,希望對大家有所幫助2022-11-11
SpringBoot普通類獲取spring容器中bean的操作
這篇文章主要介紹了SpringBoot普通類獲取spring容器中bean的操作,具有很好的參考價(jià)值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09
SpringBoot+JavaMailSender實(shí)現(xiàn)騰訊企業(yè)郵箱配置
這篇文章主要介紹了SpringBoot+JavaMailSender實(shí)現(xiàn)騰訊企業(yè)郵箱配置,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-04-04
spring中實(shí)現(xiàn)容器加載完成后再執(zhí)行自己的方法
這篇文章主要介紹了spring中實(shí)現(xiàn)容器加載完成后再執(zhí)行自己的方法,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-02-02

