Springboot使用kafka的兩種方式
1、創(chuàng)建實驗項目
第一步創(chuàng)建一個Springboot項目,引入spring-kafka依賴,這是后面的基礎(chǔ)。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
kafka配置
spring:
kafka:
bootstrap-servers: kafka.tyjt.com:9092
consumer:
auto-offset-reset: earliest
group-id: sharingan-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
2、自動檔
為了方便使用kafka,Springboot提供了spring-kafka 這個包,在已開始我們已經(jīng)導(dǎo)入了,下面直接使用吧
Spring項目里引入Kafka非常方便,使用kafkaTemplate(Producer的模版)+@KafkaListener(Consumer的監(jiān)聽器)即可完成生產(chǎn)者-消費者的代碼開發(fā)
2.1 監(jiān)聽listener
為了使創(chuàng)建 kafka 監(jiān)聽器更加簡單,Spring For Kafka 提供了 @KafkaListener 注解,
@KafkaListener 注解配置方法上,凡是此注解的方法就會被標(biāo)記為是 Kafka 消息監(jiān)聽器,所以可以用
@KafkaListener 注解快速創(chuàng)建消息監(jiān)聽器。
@Configuration
@EnableKafka
public class ConsumerConfigDemo {
@KafkaListener(topics = {"test"},groupId = "group1")
public void kafkaListener(String topic,String message){
System.out.println("消息:"+message);
}
}
2.2 發(fā)布消息
發(fā)布消息通過kafkaTemplate,kafkaTemplate是spring-kafka 的封裝
@Slf4j
@Service
public class KafkaProducerService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String message) throws Exception {
kafkaTemplate.send(topic,key,message);
}
}
kafkaTemplate 有很多不同的發(fā)送方法,根據(jù)自己的需求使用,這里只記錄最簡單的狀況。
3、手動檔
3.1 手動創(chuàng)建consumer
關(guān)于consumer的主要的封裝在ConcurrentKafkaListenerContainerFactory這個里頭,
本身的KafkaConsumer是線程不安全的,無法并發(fā)操作,這里spring又在包裝了一層,根據(jù)配置的spring.kafka.listener.concurrency來生成多個并發(fā)的KafkaMessageListenerContainer實例
package com.tyjt.sharingan.kafka;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 啟動kafka consumer
*
* @author 種鑫
* @date 2023/10/18 17:26
*/
@EnableKafka
@Component
@Slf4j
public class KafkaConsumerMgr {
@Resource
ConcurrentKafkaListenerContainerFactory<String, byte[]> containerFactory;
Map<String, ConcurrentMessageListenerContainer<?, ?>> containerMap = new ConcurrentHashMap<>();
public void startListener(KafkaProtoConsumer kafkaConsumer) {
// 停止相同的
if (containerMap.containsKey(kafkaConsumer.getTopic())) {
containerMap.get(kafkaConsumer.getTopic()).stop();
}
ConcurrentMessageListenerContainer<String, byte[]> container = createListenerContainer(kafkaConsumer);
container.start();
containerMap.put(kafkaConsumer.getTopic(), container);
}
private ConcurrentMessageListenerContainer<String, byte[]> createListenerContainer(KafkaProtoConsumer consumer) {
ConcurrentMessageListenerContainer<String, byte[]> container = containerFactory.createContainer(consumer.topic());
container.setBeanName(consumer.group() + "-" + consumer.topic());
container.setConcurrency(consumer.getPartitionCount());
consumer.deployContainer(container);
// 防止被修改的配置
ContainerProperties containerProperties = container.getContainerProperties();
containerProperties.setMessageListener(new Listener<>(consumer));
containerProperties.setAsyncAcks(false);
containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
containerProperties.setGroupId(consumer.group());
return container;
}
/**
* 定義監(jiān)聽
*/
private static class Listener<T> implements AcknowledgingConsumerAwareMessageListener<String, T> {
private final KafkaConsumer<T> kafkaConsumer;
public Listener(KafkaConsumer<T> consumer) {
this.kafkaConsumer = consumer;
}
@Override
public void onMessage(ConsumerRecord<String, T> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
log.info("group【{}】接收到來自topic【{}】的消息", kafkaConsumer.group(), data.topic());
// 處理數(shù)據(jù)
kafkaConsumer.process(data.value());
// 提交offset
log.info("group【{}】提交topic【{}】的offset", kafkaConsumer.group(), data.topic());
consumer.commitSync();
}
}
}
這個可以根據(jù)需要動態(tài)的啟動消費者
3.2 手動創(chuàng)建KafkaProducer
@Bean
public KafkaProducer<String, byte[]> kafkaProducer() {
Properties props = new Properties();
// 這里可以配置幾臺broker即可,他會自動從broker去拉取元數(shù)據(jù)進行緩存
props.put("bootstrap.servers", bootstrapServers);
// 這個就是負(fù)責(zé)把發(fā)送的key從字符串序列化為字節(jié)數(shù)組
props.put("key.serializer", keySerializer);
// 這個就是負(fù)責(zé)把你發(fā)送的實際的message從字符串序列化為字節(jié)數(shù)組
props.put("value.serializer", valueSerializer);
// 默認(rèn)是32兆=33554432
props.put("buffer.memory", bufferMemory);
// 一般來說是要自己手動設(shè)置的,不是純粹依靠默認(rèn)值的,16kb
props.put("batch.size", batchSize);
// 發(fā)送一條消息出去,100ms內(nèi)還沒有湊成一個batch發(fā)送,必須立即發(fā)送出去
props.put("linger.ms", lingerMs);
// 這個是說你可以發(fā)送的最大的請求的大小 默認(rèn)是1m=1048576
// props.put("max.request.size", 10485760);
// follower有沒有同步成功你就不管了
props.put("acks", acks);
// 這個重試,一般來說,給個3次~5次就足夠了,可以cover住一般的異常場景
props.put("retries", retries);
// 每次重試間隔100ms
props.put("retry.backoff.ms", retryBackOffMs);
props.put("max.in.flight.requests.per.connection", maxInFlightRequestsPerConnection);
return new KafkaProducer<>(props);
}
4、總結(jié)
4.1 區(qū)別
KafkaProducer是Kafka-client提供的原生Java Kafka客戶端發(fā)送消息的API。
KafkaTemplate是Spring Kafka中提供的一個高級工具類,用于可以方便地發(fā)送消息到Kafka。它封裝了KafkaProducer,提供了更多的便利方法和更高級的消息發(fā)送方式。
org.apache.kafka.clients.producer.KafkaProducer
org.springframework.kafka.core.KafkaTemplate

4.2 場景選擇
在spring應(yīng)用中如果需要訂閱kafka消息,通常情況下我們不會直接使用kafka-client, 而是使用更方便的一層封裝spring-kafka。
不需要動態(tài)的選擇時候可以使用Spring-kafka,在需要動態(tài)創(chuàng)建時可以使用kafka-client的api進行處理
4.3 ConsumerRecord和ProducerRecord
兩者都是kafka-client的類,在Spring-kafka中依然可以使用,可以發(fā)送和接受
以上就是Springboot使用kafka的兩種方式的詳細(xì)內(nèi)容,更多關(guān)于Springboot使用kafka的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
詳解SpringBoot中添加@ResponseBody注解會發(fā)生什么
這篇文章主要介紹了詳解SpringBoot中添加@ResponseBody注解會發(fā)生什么,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-11-11
使用BeanFactory實現(xiàn)創(chuàng)建對象
這篇文章主要為大家詳細(xì)介紹了使用BeanFactory實現(xiàn)創(chuàng)建對象,具有一定的參考價值,感興趣的小伙伴們可以參考一下2018-08-08
java調(diào)用微信現(xiàn)金紅包接口的心得與體會總結(jié)
這篇文章主要介紹了java調(diào)用微信現(xiàn)金紅包接口的心得與體會總結(jié),有需要的朋友可以了解一下。2016-11-11
Springboot使用maven打包指定mainClass問題
這篇文章主要介紹了Springboot使用maven打包指定mainClass問題,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-04-04
Spring @Valid和@Validated區(qū)別和用法實例
這篇文章主要介紹了Spring @Valid和@Validated區(qū)別和用法實例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-04-04
Mybatis-Plus中的MetaObjectHandler組件的使用
MetaObjectHandler是Mybatis-Plus中一個實用組件,專門用于自動處理實體對象中的特定字段,如創(chuàng)建時間、更新時間、創(chuàng)建人和修改人等,該接口允許開發(fā)者在不修改業(yè)務(wù)代碼的情況下,實現(xiàn)自動填充功能,極大地簡化了代碼的復(fù)雜性,感興趣的可以了解一下2024-10-10
Java中Lombok @Value注解導(dǎo)致的variable not been initialized問題
本文主要介紹了Java中Lombok @Value注解導(dǎo)致的variable not been initialized問題,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-07-07

