SpringBoot集成Kafka開發(fā)超詳細(xì)過程解析
4.SpringBoot集成Kafka開發(fā)
4.1 創(chuàng)建項(xiàng)目


4.2 配置文件
application.yml
spring:
application:
name: spring-boot-01-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
4.3 創(chuàng)建生產(chǎn)者
package com.zzc.producer;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent(){
kafkaTemplate.send("hello-topic", "hello kafka");
}
}4.4 測(cè)試
package com.zzc.producer;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent(){
kafkaTemplate.send("hello-topic", "hello kafka");
}
}hello-topic中已存放一個(gè)消息

4.5 創(chuàng)建消費(fèi)者
package com.zzc.cosumer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class EventConsumer {
// 采用監(jiān)聽的方式接收事件(消息、數(shù)據(jù))
@KafkaListener(topics = {"hello-topic"}, groupId = "hello-group")
public void onEvent(String event){
System.out.printf("讀取到的事件:" + event);
}
}啟動(dòng)springboot,發(fā)現(xiàn)并沒有讀取到之前的消息

此時(shí)使用測(cè)試類調(diào)用生成者再發(fā)送一個(gè)消息,此時(shí)消費(fèi)者成功監(jiān)聽到剛生產(chǎn)的消息

4.6 Kafka的幾個(gè)概念


- 默認(rèn)情況下,當(dāng)啟動(dòng)一個(gè)新的消費(fèi)者組時(shí),它會(huì)從每個(gè)分區(qū)的最新偏移量(即該分區(qū)中最后一條消息的下一個(gè)位置)開始消費(fèi)。如果希望從第一條消息開始消費(fèi),需要將消費(fèi)者的 auto.offset.reset 設(shè)置為 earliest ;
- 注意: 如果之前已經(jīng)用相同的消費(fèi)者組 ID 消費(fèi)過該主題,并且 Kafka 已經(jīng)保存了該消費(fèi)者組的偏移量,那么即使你設(shè)置了 auto.offset.reset=earliest ,該設(shè)置也不會(huì)生效,因?yàn)?Kafka 只會(huì)在找不到偏移量時(shí)使用這個(gè)配置。在這種情況下,你需要手動(dòng)重置偏移量或使用一個(gè)新的消費(fèi)者組 ID ;
4.7 消息消費(fèi)時(shí)偏移量策略的配置
spring: kafka: consumer: auto-offset-reset: earliest
- 取值: earliest 、 latest 、 none 、 exception
- earliest :自動(dòng)將偏移量重置為最早的偏移量;
- latest :自動(dòng)將偏移量重置為最新偏移量;
- none :如果沒有為消費(fèi)者組找到以前的偏移量,則向消費(fèi)者拋出異常;
- exception :向消費(fèi)者拋出異常;( spring-kafka 不支持)
4.7.1 測(cè)試修改配置后能否消費(fèi)之前的消息
修改配置重啟服務(wù)后,并沒有消費(fèi)之前的消息

修改消費(fèi)者組ID,再次重啟服務(wù)進(jìn)行測(cè)試
@Component
public class EventConsumer {
// 采用監(jiān)聽的方式接收事件(消息、數(shù)據(jù))
@KafkaListener(topics = {"hello-topic"}, groupId = "hello-group-02")
public void onEvent(String event){
System.out.println("讀取到的事件:" + event);
}
}成功讀取到之前的消息

4.7.2 手動(dòng)重置偏移量
修改為讀取最早的消息 ./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute 修改為讀取最新的消息 ./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute
執(zhí)行命令
./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group hello-group-02 --topic hello-topic --reset-offsets --to-earliest --execute

報(bào)錯(cuò):提示我們不能在活躍的情況下進(jìn)行修改偏移量,需要先停止服務(wù)
再次執(zhí)行命令,已經(jīng)重置偏移量成功

此時(shí)啟動(dòng)服務(wù),讀取到之前的消息了

4.8 生產(chǎn)者發(fā)送消息參數(shù)(生產(chǎn)者客戶端向Kafka的主題topic中寫入事件)

4.8.1 message對(duì)象參數(shù)
/**
* 使用message對(duì)象發(fā)送消息
*/
public void sendEvent02(){
// 通過構(gòu)建器模式創(chuàng)建Message對(duì)象
Message<String> message = MessageBuilder.withPayload("hello kafka")
// 在header中放置topic的名字
.setHeader(KafkaHeaders.TOPIC, "test-topic-02")
.build();
kafkaTemplate.send(message);
}測(cè)試是否發(fā)送消息到topic中
@Test
public void test02(){
eventProducer.sendEvent02();
}
成功發(fā)送消息到test-topic-02中

4.8.2 producerRecord對(duì)象參數(shù)
/**
* 使用ProducerRecord對(duì)象發(fā)送消息
*/
public void sendEvent03(){
// Headers里面是放一些信息(信息是key-value鍵值對(duì)),到時(shí)候消費(fèi)者接收到該消息后,可以拿到這個(gè)Headers里面放的信息
Headers headers = new RecordHeaders();
headers.add("phone", "13698001234".getBytes(StandardCharsets.UTF_8));
headers.add("orderId", "12473289472846178242873".getBytes(StandardCharsets.UTF_8));
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
"test-topic-02",
0,
System.currentTimeMillis(),
"k1",
"hello kafka",
headers
);
kafkaTemplate.send(producerRecord);
}測(cè)試
@Test
public void test03(){
eventProducer.sendEvent03();
}
成功向test-topic-02中發(fā)送一條消息

4.8.3 send最多參數(shù)構(gòu)造方法
public void sendEvent04() {
// String topic, Integer partition, Long timestamp, K key, @Nullable V data
kafkaTemplate.send(
"test-topic-02",
0,
System.currentTimeMillis(),
"k2",
"hello kafka"
);
}測(cè)試
@Test
public void test04(){
eventProducer.sendEvent04();
}
成功向test-topic-02中發(fā)送一條消息

4.8.4 sendDefault最多參數(shù)構(gòu)造方法
public void sendEvent05(){
kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");
}
測(cè)試
@Test
public void test04(){
eventProducer.sendEvent04();
}
執(zhí)行測(cè)試方法,報(bào)錯(cuò)提示 topic不能為空

需要在配置文件中添加配置
spring:
application:
name: spring-boot-01-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
consumer:
auto-offset-reset: earliest
# 配置模板默認(rèn)的主題topic名稱
template:
default-topic: default-topic再次執(zhí)行測(cè)試方法,成功向default-topic中發(fā)送消息

4.9 KafkaTemplate.send()和KafkaTemplate.sendDefault()的區(qū)別
- 主要區(qū)別是發(fā)送消息到 Kafka 時(shí)是否每次都需要指定主題 topic;
- kafkaTemplate.send(…) 該方法需要明確地指定要發(fā)送消息的目標(biāo)主題 topic ;
- kafkaTemplate.sendDefault() 該方法不需要指定要發(fā)送消息的目標(biāo)主題 topic ;
- kafkaTemplate.send(…) 方法適用于需要根據(jù)業(yè)務(wù)邏輯或外部輸入動(dòng)態(tài)確定消息目標(biāo) topic 的場(chǎng)景;
- kafkaTemplate.sendDefault() 方法適用于總是需要將消息發(fā)送到特定默認(rèn) topic 的場(chǎng)景;
- kafkaTemplate.sendDefault() 是一個(gè)便捷方法,它使用配置中指定的默認(rèn)主題 topic 來發(fā)送消息;
- 如果應(yīng)用中所有消息都發(fā)送到同一個(gè)主題時(shí)采用該方法非常方便,可以減少代碼的重復(fù)或滿足特定的業(yè)務(wù)需求;
4.10 獲取生產(chǎn)者消息發(fā)送結(jié)果
- .send() 方法和 .sendDefault() 方法都返回 CompletableFuture<SendResult<K, V>> ;
- CompletableFuture 是 Java 8 中引入的一個(gè)類,用于異步編程,它表示一個(gè)異步計(jì)算的結(jié)果,這個(gè)特性使得調(diào)用者不必等待操作完成就能繼續(xù)執(zhí)行其他任務(wù),從而提高了應(yīng)用程序的響應(yīng)速度和吞吐量;
- 方式一:調(diào)用 CompletableFuture 的 get() 方法,同步阻塞等待發(fā)送結(jié)果;
- 方式二:使用 thenAccept(), thenApply(), thenRun() 等方法來注冊(cè)回調(diào)函數(shù),回調(diào)函數(shù)將在CompletableFuture 完成時(shí)被執(zhí)行;
4.10.1 調(diào)用 CompletableFuture 的 get() 方法,同步阻塞等待發(fā)送結(jié)果
/**
* 通過get方法同步阻塞等待發(fā)送結(jié)果
*/
public void sendEvent06(){
CompletableFuture<SendResult<String, String>> completableFuture =
kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");
try {
// 1.阻塞等待的方式拿結(jié)果
SendResult<String, String> sendResult = completableFuture.get();
if (sendResult.getRecordMetadata() != null){
// kafka服務(wù)器確認(rèn)已經(jīng)接收到了消息
System.out.println("消息發(fā)送成功:" + sendResult.getRecordMetadata().toString());
}
System.out.println("producerRecord: " + sendResult.getProducerRecord());
} catch (Exception e) {
throw new RuntimeException(e);
}
}測(cè)試,成功獲取到結(jié)果和發(fā)送的消息信息
@Test
public void test06(){
eventProducer.sendEvent06();
}

4.10.2 使用 thenAccept()方法來注冊(cè)回調(diào)函數(shù),回調(diào)函數(shù)將在CompletableFuture 完成時(shí)被執(zhí)行
/**
* 通過thenAccept方法注冊(cè)回調(diào)函數(shù)
*/
public void sendEvent07(){
CompletableFuture<SendResult<String, String>> completableFuture =
kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");
completableFuture.thenAccept(sendResult -> {
if (sendResult.getRecordMetadata() != null){
// kafka服務(wù)器確認(rèn)已經(jīng)接收到了消息
System.out.println("消息發(fā)送成功:" + sendResult.getRecordMetadata().toString());
}
System.out.println("producerRecord: " + sendResult.getProducerRecord());
}).exceptionally( throwable -> {
// 做失敗的處理
throwable.printStackTrace();
return null;
});
}
測(cè)試,成功獲取到結(jié)果和發(fā)送的消息信息
@Test
public void test07(){
eventProducer.sendEvent07();
}

4.11 生產(chǎn)者發(fā)送對(duì)象消息
4.11.1 創(chuàng)建User對(duì)象
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {
private int id;
private String phone;
private Date birthDay;
}4.11.2 注入新的kafkaTemplate對(duì)象,因?yàn)橹暗膋ey和value泛型都是String類型
/**
* 發(fā)送對(duì)象消息
*/
@Resource
private KafkaTemplate<String, Object> kafkaTemplate2;
private KafkaTemplate<String, Object> kafkaTemplate2;
public void sendEvent08(){
User user = User.builder().id(1200).phone("13698981234").birthDay(new Date()).build();
// 分區(qū)編號(hào)為 null ,交給 kafka 自己去分配
kafkaTemplate2.sendDefault(null, System.currentTimeMillis(), "k4", user);
}
4.11.3 測(cè)試發(fā)送消息
報(bào)錯(cuò) 說不能將value轉(zhuǎn)成StringSerializer

需要在配置文件中指定value的Serializer類型
producer:
# key和value都默認(rèn)是StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
再次執(zhí)行測(cè)試,執(zhí)行成功

defalut-topic中新增一條消息

4.12 Kafka的核心概念:Replica副本
- Replica :副本,為實(shí)現(xiàn)備份功能,保證集群中的某個(gè)節(jié)點(diǎn)發(fā)生故障時(shí),該節(jié)點(diǎn)上的 partition 數(shù)據(jù)不丟失,且
Kafka 仍然能夠繼續(xù)工作, Kafka 提供了副本機(jī)制,一個(gè) topic 的每個(gè)分區(qū)都有 1 個(gè)或多個(gè)副本; - Replica 副本分為 Leader Replica 和 Follower Replica :
- Leader :每個(gè)分區(qū)多個(gè)副本中的“主”副本,生產(chǎn)者發(fā)送數(shù)據(jù)以及消費(fèi)者消費(fèi)數(shù)據(jù),都是來自 leader 副本
- Follower :每個(gè)分區(qū)多個(gè)副本中的“從”副本,實(shí)時(shí)從 leader 副本中同步數(shù)據(jù),保持和 leader 副本數(shù)據(jù)的同
步, leader 副本發(fā)生故障時(shí),某個(gè) follower 副本會(huì)成為新的 leader 副本;
- 設(shè)置副本個(gè)數(shù)不能為 0 ,也不能大于節(jié)點(diǎn)個(gè)數(shù),否則將不能創(chuàng)建 Topic ;
4.12.1 指定topic的分區(qū)和副本
4.12.1.1 方式一:通過Kafka提供的命令行工具在創(chuàng)建topic時(shí)指定分區(qū)和副本
./kafka-topics.sh --create --topic myTopic --partitions 3 --replication-factor 1 --bootstrap-server 127.0.0.1:9092
創(chuàng)建成功

4.12.1.2 方式二:執(zhí)行代碼時(shí)指定分區(qū)和副本
- kafkaTemplate.send(“topic”, message);
- 直接使用 send() 方法發(fā)送消息時(shí), kafka 會(huì)幫我們自動(dòng)完成 topic 的創(chuàng)建工作,但這種情況下創(chuàng)建的 topic 默認(rèn)只有一個(gè)分區(qū),分區(qū)有 1 個(gè)副本,也就是有它自己本身的副本,沒有額外的副本備份;
- 我們可以在項(xiàng)目中新建一個(gè)配置類專門用來初始化 topic ;
@Configuration
public class KafkaConfig {
// 創(chuàng)建一個(gè)名為helloTopic的Topic并設(shè)置分區(qū)數(shù)為5,分區(qū)副本數(shù)為1
@Bean
public NewTopic newTopic(){
// 副本不能設(shè)置為0 也不能超過節(jié)點(diǎn)數(shù)
return new NewTopic("helloTopic", 5, (short) 1);
}
}創(chuàng)建成功

4.12.2 測(cè)試重啟服務(wù)會(huì)不會(huì)重置消息,先向helloTopic中發(fā)送一個(gè)消息
public void sendEvent09(){
User user = User.builder().id(1200).phone("13698981234").birthDay(new Date()).build();
kafkaTemplate2.send(
"helloTopic",
null,
System.currentTimeMillis(),
"k9",
user
);
}
測(cè)試代碼
@Test
public void test09(){
eventProducer.sendEvent09();
}
成功向helloTopic中發(fā)送一個(gè)消息

重啟服務(wù)后,并沒有重置消息

4.12.3 修改分區(qū)數(shù)
配置類中增加更新配置代碼
@Configuration
public class KafkaConfig {
// 創(chuàng)建一個(gè)名為helloTopic的Topic并設(shè)置分區(qū)數(shù)為5,分區(qū)副本數(shù)為1
@Bean
public NewTopic newTopic(){
return new NewTopic("helloTopic", 5, (short) 1);
}
// 如果要修改分區(qū)數(shù),只需修改配置值重啟項(xiàng)目即可,修改分區(qū)數(shù)并不會(huì)導(dǎo)致數(shù)據(jù)的丟失,但是分區(qū)數(shù)只能增大不能減少
@Bean
public NewTopic updateTopic(){
return new NewTopic("helloTopic", 10, (short) 1);
}
}重啟項(xiàng)目,分區(qū)數(shù)更新為10,消息的位置也沒發(fā)生變化

4.13 生產(chǎn)者發(fā)送消息的分區(qū)策略(消息發(fā)到哪個(gè)分區(qū)中?是什么策略)
- 生產(chǎn)者寫入消息到topic,Kafka將依據(jù)不同的策略將數(shù)據(jù)分配到不同的分區(qū)中
? 如果指定了分區(qū),那將發(fā)送消息到指定分區(qū)中

執(zhí)行測(cè)試代碼

看send方法源代碼可以看到

- 默認(rèn)分配策略:BuiltInPartitioner
- 有key:Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
- 沒有key:使用隨機(jī)數(shù) % numPartitions
- 輪詢分配策略:RoundRobinPartitioner(實(shí)現(xiàn)的接口:Partitioner)
- 自定義分配策略:我們自己定義
4.13.1 輪詢分配策略
yml配置文件
spring:
application:
name: spring-boot-01-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
producer:
# key和value都默認(rèn)是StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
auto-offset-reset: earliest
# 配置模板默認(rèn)的主題topic名稱
template:
default-topic: default-topic配置類
package com.zzc.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.producer.value-serializer}")
private String valueSerializer;
@Value("${spring.kafka.producer.key-serializer}")
private String keySerializer;
/**
* 生產(chǎn)者相關(guān)配置
* @return
*/
public Map<String, Object> producerConfigs(){
HashMap<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
return props;
}
public ProducerFactory<String, Object> producerFactory(){
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* KafkaTemplate 覆蓋相關(guān)配置類中的kafkaTemplate
* @return
*/
@Bean
public KafkaTemplate<String, Object> kafkaTemplate(){
return new KafkaTemplate<>(producerFactory());
}
// 創(chuàng)建一個(gè)名為helloTopic的Topic并設(shè)置分區(qū)數(shù)為5,分區(qū)副本數(shù)為1
@Bean
public NewTopic newTopic(){
return new NewTopic("helloTopic", 5, (short) 1);
}
// 如果要修改分區(qū)數(shù),只需修改配置值重啟項(xiàng)目即可,修改分區(qū)數(shù)并不會(huì)導(dǎo)致數(shù)據(jù)的丟失,但是分區(qū)數(shù)只能增大不能減少
@Bean
public NewTopic updateTopic(){
return new NewTopic("helloTopic", 10, (short) 1);
}
}執(zhí)行測(cè)試代碼
public void sendEvent09(){
User user = User.builder().id(1200).phone("13698981234").birthDay(new Date()).build();
kafkaTemplate2.send(
"helloTopic",
user
); }
@Test
public void test09(){
for (int i = 0; i < 5; i++) {
eventProducer.sendEvent09();
}
}
debug模式,是進(jìn)入到RoundRobinPartitioner類中

查看消息的分區(qū)情況,發(fā)現(xiàn)并沒有完全的輪詢,有點(diǎn)誤差

4.13.2 自定義分配策略
創(chuàng)建自定義分配策略類實(shí)現(xiàn)Partitioner接口
public class CustomerPartitioner implements Partitioner {
private AtomicInteger nextPartition = new AtomicInteger(0);
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] bytes1, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (key == null){
// 使用輪詢方式選擇分區(qū)
int next = nextPartition.getAndIncrement();
// 如果next大于分區(qū)的大小,則重置為0
if (next >= numPartitions){
nextPartition.compareAndSet(next, 0);
}
System.out.println("分區(qū)值:" + next);
return next;
}else {
// 如果key不為null,則使用默認(rèn)的分區(qū)策略
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}配置類代碼中將分配策略修改為自定義分配策略

使用debug模式執(zhí)行測(cè)試代碼,成功執(zhí)行到我們自定義的分配策略類中

執(zhí)行結(jié)果

為什么是每隔一個(gè)存一個(gè)分區(qū)呢?查看源代碼發(fā)現(xiàn)進(jìn)行了二次計(jì)算partition

4.13 生產(chǎn)者發(fā)送消息的流程

4.13.自定義攔截器攔截消息的發(fā)送
實(shí)現(xiàn)ProducerInterceptor接口,創(chuàng)建CustomerProducerInterceptor類
package com.zzc.config;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
public class CustomerProducerInterceptor implements ProducerInterceptor<String, Object> {
/**
* 發(fā)送消息時(shí),會(huì)先調(diào)用該方法,對(duì)信息進(jìn)行攔截,可以在攔截中對(duì)消息做一些處理,記錄日志等操作...
* @param producerRecord
* @return
*/
@Override
public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> producerRecord) {
System.out.println("攔截消息:" + producerRecord.toString());
return producerRecord;
}
/**
* 服務(wù)器收到消息后的一個(gè)確認(rèn)
* @param recordMetadata
* @param e
*/
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
if (recordMetadata != null){
System.out.println("服務(wù)器收到該消息:" + recordMetadata.offset());
}else {
System.out.println("消息發(fā)送失敗了,exception = " + e.getMessage());
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}配置類中添加攔截器

執(zhí)行測(cè)試,發(fā)現(xiàn)報(bào)錯(cuò)了

需要配置類中添加攔截器的名字

再次執(zhí)行測(cè)試,成功執(zhí)行了

4.14 獲取生產(chǎn)者發(fā)送的消息
之前模塊內(nèi)容比較多,重新創(chuàng)建一個(gè)模塊


消費(fèi)者類
@Component
public class EventConsumer {
// 采用監(jiān)聽的方式接收事件(消息、數(shù)據(jù))
@KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
public void onEvent(String event){
System.out.println("讀取到的事件:" + event);
}
}生產(chǎn)者類
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent() {
kafkaTemplate.send("helloTopic", "hello kafka");
}
}配置文件
spring:
application:
name: spring-boot-02-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092測(cè)試代碼
@SpringBootTest
class KafkaBaseApplicationTests {
@Resource
private EventProducer eventProducer;
@Test
void test01(){
System.out.println(111);
eventProducer.sendEvent();
}
}啟動(dòng)服務(wù),執(zhí)行測(cè)試代碼,成功讀取到最新發(fā)送的消息

4.14.1 @Payload : 標(biāo)記該參數(shù)是消息體內(nèi)容
消費(fèi)者類參數(shù)添加@Payload注解

重啟服務(wù),執(zhí)行測(cè)試代碼 成功讀取到最新消息

4.14.2 @Header注解:標(biāo)記該參數(shù)是消息頭內(nèi)容
消費(fèi)者類參數(shù)添加@Header注解 獲取header中的topic和partition
@Component
public class EventConsumer {
// 采用監(jiān)聽的方式接收事件(消息、數(shù)據(jù))
@KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
public void onEvent(@Payload String event,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition
){
System.out.println("讀取到的事件:" + event + ", topic:" + topic + ", partition:" + partition);
}
}重啟服務(wù)類,測(cè)試代碼不變,進(jìn)行測(cè)試

4.14.3 ConsumerRecord對(duì)象
可以從ConsumerRecord對(duì)象中獲取想要的內(nèi)容
@Component
public class EventConsumer {
// 采用監(jiān)聽的方式接收事件(消息、數(shù)據(jù))
@KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
public void onEvent(@Payload String event,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
ConsumerRecord<String, String> consumerRecord
){
System.out.println("讀取到的事件:" + event + ", topic:" + topic + ", partition:" + partition);
System.out.println("讀取到的consumerRecord:" + consumerRecord.toString());
}
}重啟服務(wù)類,測(cè)試代碼不變,進(jìn)行測(cè)試
想要的內(nèi)容都可以從ConsumerRecord對(duì)象中獲取

4.14.4 獲取對(duì)象類型數(shù)據(jù)
User類代碼
package com.zzc.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {
private int id;
private String phone;
private Date birthDay;
}EventConsumer類新增onEvent2方法
@KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
public void onEvent2(User user,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
ConsumerRecord<String, String> consumerRecord
){
System.out.println("讀取到的事件:" + user + ", topic:" + topic + ", partition:" + partition);
System.out.println("讀取到的consumerRecord:" + consumerRecord.toString());
}
EventProducer類新增sendEvent2方法
@Resource
private KafkaTemplate<String, Object> kafkaTemplate2;
public void sendEvent2(){
User user = User.builder().id(213234).phone("13239407234").birthDay(new Date()).build();
kafkaTemplate2.send("helloTopic", user);
}
測(cè)試類新增test02方法
@Test
public void test02(){
eventProducer.sendEvent2();
}
執(zhí)行測(cè)試,報(bào)錯(cuò)生產(chǎn)者不能將User轉(zhuǎn)換成String類型

去配置文件中修改生產(chǎn)者和消費(fèi)者的value序列化器
spring:
application:
name: spring-boot-02-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
value-deserializer: org.springframework.kafka.support.seri重新啟動(dòng)服務(wù),依然報(bào)錯(cuò),說沒有找到j(luò)ackson的jar包

那我們?nèi)om文件中添加jackson依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>添加依賴后可以正常啟動(dòng)了

執(zhí)行測(cè)試代碼,服務(wù)一直報(bào)錯(cuò),說User類不受安全的,只有java.util, java.lang下的類才是安全的

解決方案:將對(duì)象類型轉(zhuǎn)為String類型進(jìn)行發(fā)送,讀取的時(shí)候再將String類型轉(zhuǎn)為對(duì)象類型
創(chuàng)建JSONUtils類
package com.zzc.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JSONUtils {
private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();
public static String toJSON(Object object){
try {
return OBJECTMAPPER.writeValueAsString(object);
}catch (JsonProcessingException e){
throw new RuntimeException(e);
}
}
public static <T> T toBean(String jsonStr, Class<T> clazz){
try {
return OBJECTMAPPER.readValue(jsonStr, clazz);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}修改EventProducer代碼,將原本的User類型改為String類型發(fā)送到topic中
public void sendEvent2(){
User user = User.builder().id(213234).phone("13239407234").birthDay(new Date()).build();
String userJson = JSONUtils.toJSON(user);
kafkaTemplate.send("helloTopic", userJson);
}
修改EventConsumer代碼,將原本中參數(shù)的User類型改為String類型,再轉(zhuǎn)換成User類型進(jìn)行消費(fèi)
@KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
public void onEvent2(String userStr,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
ConsumerRecord<String, String> consumerRecord
){
User user = (User) JSONUtils.toBean(userStr, User.class);
System.out.println("讀取到的事件:" + user + ", topic:" + topic + ", partition:" + partition);
System.out.println("讀取到的consumerRecord:" + consumerRecord.toString());
}
將配置文件中的消費(fèi)者和生產(chǎn)者配置都注釋掉
spring:
application:
name: spring-boot-02-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
# producer:
# value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# consumer:
# value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer重啟服務(wù),再次執(zhí)行測(cè)試代碼

4.14.5 獲取自定義配置參數(shù)的數(shù)據(jù)
自定義配置topic的name和consumer的group值,消費(fèi)者進(jìn)行讀取
spring:
application:
name: spring-boot-02-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
# producer:
# value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# consumer:
# value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
kafka:
topic:
name: helloTopic
consumer:
group: helloGroup使用${}的方式進(jìn)行讀取配置文件中的值
@KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "kafka.consumer.group")
public void onEvent3(String userStr,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
ConsumerRecord<String, String> consumerRecord
){
User user = (User) JSONUtils.toBean(userStr, User.class);
System.out.println("讀取到的事件3:" + user + ", topic:" + topic + ", partition:" + partition);
System.out.println("讀取到的consumerRecord3:" + consumerRecord.toString());
}重啟服務(wù),執(zhí)行測(cè)試代碼,能夠讀取到消息

4.14.6 ACK手動(dòng)確認(rèn)消息
? 默認(rèn)情況下, Kafka 消費(fèi)者消費(fèi)消息后會(huì)自動(dòng)發(fā)送確認(rèn)信息給 Kafka 服務(wù)器,表示消息已經(jīng)被成功消費(fèi)。但在
某些場(chǎng)景下,我們希望在消息處理成功后再發(fā)送確認(rèn),或者在消息處理失敗時(shí)選擇不發(fā)送確認(rèn),以便 Kafka 能
夠重新發(fā)送該消息;
EventConsumer類代碼
@KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "kafka.consumer.group")
public void onEvent4(String userStr,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
ConsumerRecord<String, String> consumerRecord,
Acknowledgment acknowledgment
){
User user = (User) JSONUtils.toBean(userStr, User.class);
System.out.println("讀取到的事件4:" + user + ", topic:" + topic + ", partition:" + partition);
System.out.println("讀取到的consumerRecord4:" + consumerRecord.toString());
}
配置文件中添加手動(dòng)ack模式
kafka:
bootstrap-servers: 192.168.2.118:9092
listener:
ack-mode: manual重啟服務(wù),執(zhí)行測(cè)試代碼。無論重啟多少此服務(wù),都能讀取到這條消息,因?yàn)檫€沒有確認(rèn)消費(fèi)這條消息,所以offset一直沒有變

如果在代碼中加入確認(rèn)消費(fèi)的話,那么就只會(huì)讀取一次,offset也會(huì)發(fā)生變化

重啟服務(wù)后,不再讀取到這條消息了

平常業(yè)務(wù)中可以這么寫
@KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "kafka.consumer.group")
public void onEvent4(String userStr,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
ConsumerRecord<String, String> consumerRecord,
Acknowledgment acknowledgment
){
try {
User user = (User) JSONUtils.toBean(userStr, User.class);
System.out.println("讀取到的事件4:" + user + ", topic:" + topic + ", partition:" + partition);
System.out.println("讀取到的consumerRecord4:" + consumerRecord.toString());
int i = 1 / 0;
// 可以執(zhí)行完所有業(yè)務(wù),再進(jìn)行確認(rèn)消息。如果執(zhí)行過程中發(fā)生異常,那么可以再次消費(fèi)此消息
acknowledgment.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
4.14.7 指定 topic 、 partition 、 offset 消費(fèi)
創(chuàng)建配置類,指定生成5個(gè)分區(qū)
@Configuration
public class KafkaConfig {
// 創(chuàng)建一個(gè)名為helloTopic的Topic并設(shè)置分區(qū)數(shù)為5,分區(qū)副本數(shù)為1
@Bean
public NewTopic newTopic(){
return new NewTopic("helloTopic", 5, (short) 1);
}
}EventConsumer類中新增onEvent5方法
@KafkaListener(groupId = "${kafka.consumer.group}",
// 配置更加詳細(xì)的監(jiān)聽信息 topics和topicPartitions不能同時(shí)使用
topicPartitions = {
@TopicPartition(
topic = "${kafka.topic.name}",
// 監(jiān)聽topic的0、1、2號(hào)分區(qū)的所有消息
partitions = {"0", "1", "2"},
// 監(jiān)聽3、4號(hào)分區(qū)中offset從3開始的消息
partitionOffsets = {
@PartitionOffset(partition = "3", initialOffset = "3"),
@PartitionOffset(partition = "4", initialOffset = "3")
}
)
})
public void onEvent5(String userStr,
@Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
ConsumerRecord<String, String> consumerRecord,
Acknowledgment acknowledgment
){
try {
User user = (User) JSONUtils.toBean(userStr, User.class);
System.out.println("讀取到的事件5:" + user + ", topic:" + topic + ", partition:" + partition);
System.out.println("讀取到的consumerRecord5:" + consumerRecord.toString());
acknowledgment.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
EventProducer新增sendEvent3方法
public void sendEvent3(){
for (int i = 0; i < 25; i++) {
User user = User.builder().id(i).phone("13239407234" + i).birthDay(new Date()).build();
String userJson = JSONUtils.toJSON(user);
kafkaTemplate2.send("helloTopic", "k" + i, userJson);
}
}
重啟服務(wù),執(zhí)行測(cè)試代碼
@Test
public void test03(){
eventProducer.sendEvent3();
}
生成的25個(gè)消息已經(jīng)發(fā)送到0~4號(hào)分區(qū)里了

消費(fèi)消息,注意:需要停止服務(wù),先運(yùn)行測(cè)試代碼,再啟動(dòng)服務(wù)
發(fā)現(xiàn)只消費(fèi)了3條消息


現(xiàn)在去配置文件中修改成從最早的消息開始消費(fèi)
consumer:
# 從最早的消息開始消費(fèi)
auto-offset-reset: earliest再次重啟服務(wù)進(jìn)行消費(fèi),發(fā)現(xiàn)還是只消費(fèi)到3條消息

這是怎么回事呢?我們之前有遇到過這種情況,有兩個(gè)解決方案
- 手動(dòng)修改分區(qū)的偏移量
- 換一個(gè)消費(fèi)組id
我們?nèi)ヅ渲梦募袚Q一個(gè)groupId,由原來的helloGroup改為helloGroup1

再次重啟服務(wù),發(fā)現(xiàn)已經(jīng)讀取到19個(gè)消息了


再次重啟服務(wù)的話,發(fā)現(xiàn)又只能消費(fèi)3個(gè)消息了

4.14.8 批量消費(fèi)消息
重新創(chuàng)建一個(gè)模塊 spring-boot-03-kafka-base
配置文件進(jìn)行批量消費(fèi)配置
spring:
application:
name: spring-boot-03-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
consumer:
# 設(shè)置批量最多消費(fèi)多少條消息
max-poll-records: 20
listener:
# 設(shè)置批量消費(fèi)
type: batch創(chuàng)建EventConsumer類
package com.zzc.springboot03kafkabase.cosumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class EventConsumer {
@KafkaListener(topics = "batchTopic", groupId = "bactchGroup")
public void onEvent(List<ConsumerRecord<String, String>> records) {
System.out.println(" 批量消費(fèi), records.size() = " + records.size() + " , records = " + records);
}
}User類
package com.zzc.springboot03kafkabase.model;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.Date;
@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {
private int id;
private String phone;
private Date birthDay;
}創(chuàng)建EventProducer類
package com.zzc.springboot03kafkabase.producer;
import com.zzc.springboot03kafkabase.model.User;
import com.zzc.springboot03kafkabase.util.JSONUtils;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent(){
for (int i = 0; i < 125; i++) {
User user = User.builder().id(i).phone("13239407234" + i).birthDay(new Date()).build();
String userJson = JSONUtils.toJSON(user);
kafkaTemplate.send("batchTopic", "k" + i, userJson);
}
}
}創(chuàng)建Json字符串轉(zhuǎn)換對(duì)象工具類
package com.zzc.springboot03kafkabase.util;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
public class JSONUtils {
private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();
public static String toJSON(Object object){
try {
return OBJECTMAPPER.writeValueAsString(object);
}catch (JsonProcessingException e){
throw new RuntimeException(e);
}
}
public static <T> T toBean(String jsonStr, Class<T> clazz){
try {
return OBJECTMAPPER.readValue(jsonStr, clazz);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.5</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.zzc</groupId>
<artifactId>spring-boot-03-kafka-base</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-boot-03-kafka-base</name>
<description>spring-boot-03-kafka-base</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>先執(zhí)行測(cè)試文件,生成125個(gè)消息到batchTopic的主題中

啟動(dòng)服務(wù),發(fā)現(xiàn)一條消息也沒有消費(fèi)到

這個(gè)問題之前也遇到過,因?yàn)槟J(rèn)是最后一個(gè)偏移量+1開始消費(fèi)的。
此時(shí)我們需要先在配置文件中將消費(fèi)消息配置成從最早消息開始消費(fèi)
consumer:
# 設(shè)置批量最多消費(fèi)多少條消息
max-poll-records: 20
auto-offset-reset: earliest修改groupId,因?yàn)橹耙呀?jīng)使用這個(gè)groupId消費(fèi)過次一次了 所以要換一個(gè)groupId

重啟服務(wù),成功消費(fèi)到消息。每次最多消費(fèi)20條,總共125條消息都消費(fèi)到了。

4.15 消費(fèi)消息攔截器
? 在消息消費(fèi)之前,我們可以通過配置攔截器對(duì)消息進(jìn)行攔截,在消息被實(shí)際處理之前對(duì)其進(jìn)行一些操作,例如記錄日志、修改消息內(nèi)容或執(zhí)行一些安全檢查等;
4.15.1 創(chuàng)建新模塊spring-boot-04-kafka-base,依賴還是springboot、Lombok、kafka這三個(gè)
4.15.2 主文件中添加代碼
package com.zzc;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import java.util.Map;
@SpringBootApplication
public class SpringBoot04KafkaBaseApplication {
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(SpringBoot04KafkaBaseApplication.class, args);
Map<String, ConsumerFactory> beansOfType = context.getBeansOfType(ConsumerFactory.class);
beansOfType.forEach((k, v) -> {
System.out.println(k + " -- " + v);
});
Map<String, KafkaListenerContainerFactory> beansOfType2 = context.getBeansOfType(KafkaListenerContainerFactory.class);
beansOfType2.forEach((k, v) -> {
System.out.println(k + " -- " + v);
});
}
}啟動(dòng)服務(wù)類,發(fā)現(xiàn)容器中默認(rèn)有kafkaConsumerFactory和kafkaListenerContainerFactory類

我們需要使用自己的kafkaConsumerFactory和kafkaListenerContainerFactory,因?yàn)槲覀冃枰由蠑r截器
4.15.2 創(chuàng)建攔截器CustomConsumerInterceptor
package com.zzc.interceptor;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import java.util.Map;
public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String > {
/**
* 在消費(fèi)消息之前執(zhí)行
* @param consumerRecords
* @return
*/
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
System.out.println("onConsumer方法執(zhí)行:" + consumerRecords);
return consumerRecords;
}
/**
* 消息拿到之后,提交offset之前執(zhí)行該方法
* @param offsets
*/
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
System.out.println("onCommit方法執(zhí)行:" + offsets);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}4.15.3 創(chuàng)建配置類
package com.zzc.config;
import com.zzc.interceptor.CustomConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeSerializer;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeSerializer;
public Map<String, Object> consumerConfigs(){
HashMap<String, Object> consumer = new HashMap<>();
consumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
consumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeSerializer);
consumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeSerializer);
// 添加一個(gè)消費(fèi)攔截器
consumer.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
return consumer;
}
/**
* 消費(fèi)者創(chuàng)建工廠
* @return
*/
@Bean
public ConsumerFactory<String, String> ourConsumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
* 監(jiān)聽器容器工廠
* @param ourConsumerFactory
* @return
*/
@Bean
public KafkaListenerContainerFactory ourKafkaListenerContainerFactory(ConsumerFactory ourConsumerFactory){
ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
listenerContainerFactory.setConsumerFactory(ourConsumerFactory);
return listenerContainerFactory;
}
}4.15.4 測(cè)試spring容器默認(rèn)的和自定義的消費(fèi)者創(chuàng)建工廠和監(jiān)聽器容器工廠
重啟服務(wù),測(cè)試容器中用的已經(jīng)是我們自己創(chuàng)建的消費(fèi)者創(chuàng)建工廠和監(jiān)聽器容器工廠了

我們自定義的監(jiān)聽器容器工廠的配置中可以看到有我們創(chuàng)建的攔截器對(duì)象

spring的默認(rèn)監(jiān)聽器工廠對(duì)象的配置中就沒有我們創(chuàng)建的攔截器對(duì)象

4.15.5 消費(fèi)消息
創(chuàng)建消費(fèi)者對(duì)象,KafkaListener注解加上containerFactory參數(shù)
package com.zzc.cosumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class EventConsumer {
@KafkaListener(topics = {"interTopic"}, groupId = "interGroup", containerFactory = "ourKafkaListenerContainerFactory")
public void onEvent(ConsumerRecord<String, String> records) {
System.out.println(" 消費(fèi)消息, records = " + records);
}
}創(chuàng)建生產(chǎn)者對(duì)象
package com.zzc.producer;
import com.zzc.model.User;
import com.zzc.util.JSONUtils;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent() {
User user = User.builder().id(1023).phone("13239407234").birthDay(new Date()).build();
String userJson = JSONUtils.toJSON(user);
kafkaTemplate.send("interTopic", "k", userJson);
}
}測(cè)試代碼
@Resource
private EventProducer eventProducer;
@Test
public void test(){
eventProducer.sendEvent();
}啟動(dòng)服務(wù),再執(zhí)行測(cè)試代碼,成功打印出攔截器中的消息

測(cè)試KafkaListener注解中不加containerFactory參數(shù)是否會(huì)打印攔截器的消息
@Component
public class EventConsumer {
// @KafkaListener(topics = {"interTopic"}, groupId = "interGroup", containerFactory = "ourKafkaListenerContainerFactory")
@KafkaListener(topics = {"interTopic"}, groupId = "interGroup", )
public void onEvent(ConsumerRecord<String, String> records) {
System.out.println(" 消費(fèi)消息, records = " + records);
}
}重啟服務(wù),再次執(zhí)行測(cè)試代碼,發(fā)現(xiàn)并沒有打印出攔截器的消息

4.16 消息轉(zhuǎn)發(fā)
? 消息轉(zhuǎn)發(fā)就是應(yīng)用 A 從 TopicA 接收到消息,經(jīng)過處理后轉(zhuǎn)發(fā)到 TopicB ,再由應(yīng)用 B 監(jiān)聽接收該消息,即一個(gè)應(yīng)用處理完成后將該消息轉(zhuǎn)發(fā)至其他應(yīng)用處理,這在實(shí)際開發(fā)中,是可能存在這樣的需求的;
創(chuàng)建一個(gè)新模塊spring-boot-05-kafka-base,結(jié)構(gòu)如下

consumer代碼
package com.zzc.cosumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class EventConsumer {
@KafkaListener(topics = {"topicA"}, groupId = "group1")
@SendTo("topicB") // 轉(zhuǎn)發(fā)消息給topicB
public String onEvent(ConsumerRecord<String, String> record) {
System.out.println(" 消費(fèi)消息, record = " + record);
return record.value() + "forward message";
}
@KafkaListener(topics = {"topicB"}, groupId = "group2")
public void onEvent2(List<ConsumerRecord<String, String>> records) {
System.out.println(" 消費(fèi)消息, record = " + records);
}
}producer代碼
package com.zzc.producer;
import com.zzc.model.User;
import com.zzc.util.JSONUtils;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent() {
User user = User.builder().id(1023).phone("13239407234").birthDay(new Date()).build();
String userJson = JSONUtils.toJSON(user);
kafkaTemplate.send("topicA", "k", userJson);
}
}啟動(dòng)服務(wù),執(zhí)行測(cè)試代碼


4.17 消息消費(fèi)的分區(qū)策略
- Kafka 消費(fèi)消息時(shí)的分區(qū)策略:是指 Kafka 主題 topic 中哪些分區(qū)應(yīng)該由哪些消費(fèi)者來消費(fèi);

- Kafka 有多種分區(qū)分配策略,默認(rèn)的分區(qū)分配策略是RangeAssignor ,除了 RangeAssignor 策略外, Kafka 還有其他分區(qū)分配策略:
- RoundRobinAssignor
- StickyAssignor
- CooperativeStickyAssignor ,
- 這些策略各有特點(diǎn),可以根據(jù)實(shí)際的應(yīng)用場(chǎng)景和需求來選擇適合的分區(qū)分配策略;

4.17.1 RangeAssignor 策略
創(chuàng)建新模塊spring-boot-06-kafka-base
配置類KafkaConfig
package com.zzc.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class KafkaConfig {
// 創(chuàng)建一個(gè)名為helloTopic的Topic并設(shè)置分區(qū)數(shù)為5,分區(qū)副本數(shù)為1
@Bean
public NewTopic newTopic(){
return new NewTopic("myTopic", 10, (short) 1);
}
}消費(fèi)者類EventConsumer
package com.zzc.cosumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class EventConsumer {
// concurrency 消費(fèi)者數(shù)量
@KafkaListener(topics = {"myTopic"}, groupId = "myGroup", concurrency = "3")
public void onEvent(ConsumerRecord<String, String> records) {
System.out.println(" 消費(fèi)消息, records = " + records);
}
}生產(chǎn)者類
package com.zzc.producer;
import com.zzc.model.User;
import com.zzc.util.JSONUtils;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent() {
for (int i = 0; i < 100; i++) {
User user = User.builder().id(i).phone("13239407234" + i).birthDay(new Date()).build();
String userJson = JSONUtils.toJSON(user);
kafkaTemplate.send("myTopic", "k" + i, userJson);
}
}
}測(cè)試代碼
package com.zzc;
import com.zzc.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringBoot06KafkaBaseApplicationTests {
@Resource
private EventProducer eventProducer;
@Test
public void test(){
eventProducer.sendEvent();
}
}配置文件
spring:
application:
name: spring-boot-06-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest先執(zhí)行測(cè)試代碼,生產(chǎn)100個(gè)消息發(fā)送到10個(gè)分區(qū)中

啟動(dòng)服務(wù),進(jìn)行消費(fèi),打印出100個(gè)消息

我們來看一下最小的線程id38是否消費(fèi)4個(gè)分區(qū)


線程id38確實(shí)是消費(fèi)了0、1、2、3號(hào)共4個(gè)分區(qū)。其他兩個(gè)線程各消費(fèi)3個(gè)分區(qū)
4.17.2 RoundRobinAssignor策略
配置文件中無法修改策略,所以需要在配置類中設(shè)置
配置類代碼
package com.zzc.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.value-deserializer}")
private String valueDeSerializer;
@Value("${spring.kafka.consumer.key-deserializer}")
private String keyDeSerializer;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
public Map<String, Object> consumerConfigs(){
HashMap<String, Object> consumer = new HashMap<>();
consumer.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);
consumer.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeSerializer);
consumer.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeSerializer);
consumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
// 設(shè)置消費(fèi)者策略為輪詢模式
consumer.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
return consumer;
}
// 創(chuàng)建一個(gè)名為helloTopic的Topic并設(shè)置分區(qū)數(shù)為5,分區(qū)副本數(shù)為1
@Bean
public NewTopic newTopic(){
return new NewTopic("myTopic", 10, (short) 1);
}
/**
* 消費(fèi)者創(chuàng)建工廠
* @return
*/
@Bean
public ConsumerFactory<String, String> ourConsumerFactory(){
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/**
* 監(jiān)聽器容器工廠
* @param ourConsumerFactory
* @return
*/
@Bean
public KafkaListenerContainerFactory ourKafkaListenerContainerFactory(ConsumerFactory ourConsumerFactory){
ConcurrentKafkaListenerContainerFactory<String, String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
listenerContainerFactory.setConsumerFactory(ourConsumerFactory);
return listenerContainerFactory;
}
}消費(fèi)者代碼中設(shè)置為自定義監(jiān)聽器容器創(chuàng)建工廠
package com.zzc.cosumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class EventConsumer {
// concurrency 設(shè)置消費(fèi)者數(shù)量 containerFactory 設(shè)置監(jiān)聽器容器工廠
@KafkaListener(topics = {"myTopic"}, groupId = "myGroup4", concurrency = "3", containerFactory = "ourKafkaListenerContainerFactory")
public void onEvent(ConsumerRecord<String, String> records) {
System.out.println(Thread.currentThread().getId() + " --> 消費(fèi)消息, records = " + records);
}
}執(zhí)行測(cè)試代碼,發(fā)現(xiàn)線程id39消費(fèi)的分區(qū)變成0、3、6、9號(hào)分區(qū)了


采用 RoundRobinAssignor 策略進(jìn)行測(cè)試,得到的結(jié)果如下:
39 : 0 , 3 , 6 , 9
41 : 1 , 4 , 7
43 : 2 , 5 , 8
4.17.3 StickyAssignor 消費(fèi)分區(qū)策略
- 盡可能保持消費(fèi)者與分區(qū)之間的分配關(guān)系不變,即使消費(fèi)組的消費(fèi)者成員發(fā)生變化,減少不必要的分區(qū)重分配;
- 盡量保持現(xiàn)有的分區(qū)分配不變,僅對(duì)新加入的消費(fèi)者或離開的消費(fèi)者進(jìn)行分區(qū)調(diào)整。這樣,大多數(shù)消費(fèi)者可以
繼續(xù)消費(fèi)它們之前消費(fèi)的分區(qū),只有少數(shù)消費(fèi)者需要處理額外的分區(qū);所以叫“粘性”分配;
4.17.4 CooperativeStickyAssignor 消費(fèi)分區(qū)策略
- 與 StickyAssignor 類似,但增加了對(duì)協(xié)作式重新平衡的支持,即消費(fèi)者可以在它離開消費(fèi)者組之前通知協(xié)調(diào)
器,以便協(xié)調(diào)器可以預(yù)先計(jì)劃分區(qū)遷移,而不是在消費(fèi)者突然離開時(shí)立即進(jìn)行分區(qū)重分配;
4.18 Kafka 事件 ( 消息、數(shù)據(jù) ) 的存儲(chǔ)
kafka的所有事件(消息、數(shù)據(jù))都存儲(chǔ)在/tmp/kafka-logs目錄中,可通過log.dirs=/tmp/kafka-logs配置
Kafka的所有事件(消息、數(shù)據(jù))都是以日志文件的方式來保存
Kafka一般都是海量的消息數(shù)據(jù),為了避免日志文件過大,日志文件被存放在多個(gè)日志目錄下,日志目錄的命名規(guī)則為:<topic_name>-<partiton_id>
比如創(chuàng)建一個(gè)名為 firstTopic 的 topic ,其中有 3 個(gè) partition ,那么在 kafka 的數(shù)據(jù)目錄( /tmp/kafka-
log )中就有 3 個(gè)目錄, firstTopic-0 、 firstTopic-1 、 firstTopic-2 ;
? 進(jìn)入myTopic-0中

查看日志信息

- 00000000000000000000.index 消息索引文件
- 00000000000000000000.log 消息數(shù)據(jù)文件
- 00000000000000000000.timeindex 消息的時(shí)間戳索引文件
- 00000000000000000006.snapshot 快照文件,生產(chǎn)者發(fā)生故障或重啟時(shí)能夠恢復(fù)并繼續(xù)之前的操作
- leader-epoch-checkpoint 記錄每個(gè)分區(qū)當(dāng)前領(lǐng)導(dǎo)者的 epoch 以及領(lǐng)導(dǎo)者開始寫入消息時(shí)的起始偏移量
- partition.metadata 存儲(chǔ)關(guān)于特定分區(qū)的元數(shù)據(jù)( metadata )信息
每次消費(fèi)一個(gè)消息并且提交以后,會(huì)保存當(dāng)前消費(fèi)到的最近的一個(gè) offset ; 在 kafka 中,有一個(gè) __consumer_offsets 的 topic , 消費(fèi)者消費(fèi)提交的 offset 信息會(huì)寫入到 該 topic 中, __consumer_offsets 保存了每個(gè) consumer group 某一時(shí)刻提交的 offset 信息 , __consumer_offsets 默認(rèn)有 50 個(gè)分區(qū); consumer_group 保存在哪個(gè)分區(qū)中的計(jì)算公式: Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ;
4.19 Offset詳解
4.19.1 生產(chǎn)者Offset
生產(chǎn)者發(fā)送一條消息到 Kafka 的 broker 的某個(gè) topic 下某個(gè) partition 中;
Kafka 內(nèi)部會(huì)為每條消息分配一個(gè)唯一的 offset ,該 offset 就是該消息在 partition 中的位置


創(chuàng)建spring-boot-07-kafka-base模塊
消費(fèi)者代碼
package com.zzc.cosumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class EventConsumer {
@KafkaListener(topics = {"offsetTopic"}, groupId = "offsetGroup")
public void onEvent(ConsumerRecord<String, String> records) {
System.out.println(Thread.currentThread().getId() + " --> 消費(fèi)消息, records = " + records);
}
}生產(chǎn)者代碼
package com.zzc.producer;
import com.zzc.model.User;
import com.zzc.util.JSONUtils;
import jakarta.annotation.Resource;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import java.util.Date;
@Component
public class EventProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void sendEvent() {
for (int i = 0; i < 2; i++) {
User user = User.builder().id(i).phone("13239407234" + i).birthDay(new Date()).build();
String userJson = JSONUtils.toJSON(user);
kafkaTemplate.send("offsetTopic", "k" + i, userJson);
}
}
}配置文件
spring:
application:
name: spring-boot-07-kafka-base
kafka:
bootstrap-servers: 192.168.2.118:9092
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer測(cè)試代碼
package com.zzc;
import com.zzc.producer.EventProducer;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringBoot07KafkaBaseApplicationTests {
@Resource
private EventProducer eventProducer;
@Test
public void test(){
eventProducer.sendEvent();
}
}執(zhí)行測(cè)試代碼

4.19.2 消費(fèi)者Offset
- 每個(gè)消費(fèi)者組啟動(dòng)開始監(jiān)聽消息,默認(rèn)從消息的最新的位置開始監(jiān)聽消息,即把最新的位置作為消費(fèi)者
offset ;- 分區(qū)中還沒有發(fā)送消息,則最新的位置就是0
- 分區(qū)中已經(jīng)發(fā)送過消息,則最新的位置就是生產(chǎn)者offset的下一個(gè)位置
- 消費(fèi)者消費(fèi)消息后,如果不提交確認(rèn)( ack ),則 offset 不更新,提交了才更新;
- 命令行命令: ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group 消費(fèi)者組名 --describe
4.19.2.1 驗(yàn)證分區(qū)中已經(jīng)發(fā)送過消息的情況
啟動(dòng)服務(wù),監(jiān)聽器并沒有消費(fèi)到消息

使用命令看一下offsetGroup的offset是在哪

我們?cè)侔l(fā)兩條消息試試,先把服務(wù)停了,執(zhí)行測(cè)試代碼發(fā)送消息
再次執(zhí)行命令 查看offsetGroup的offset是在哪

我們現(xiàn)在啟動(dòng)服務(wù),能夠消費(fèi)到消息了

消費(fèi)完消息,再次執(zhí)行命令,發(fā)現(xiàn)current-offset已經(jīng)變成4了,也沒有消息可讀了

4.19.2.2 驗(yàn)證分區(qū)中還沒有發(fā)過消息的情況
我們把offsetTopic刪除,然后重啟服務(wù),再執(zhí)行命令

然后停止服務(wù),執(zhí)行測(cè)試代碼 發(fā)送消息,在執(zhí)行命令

我們?cè)賳?dòng)服務(wù),就能夠消費(fèi)這2個(gè)消息

4.19.3 offset總結(jié)
? 消費(fèi)者從什么位置開始消費(fèi),就看消費(fèi)者的 offset 是多少,消費(fèi)者 offset 是多少,它啟動(dòng)后,可以通過上面
的命令查看;
到此這篇關(guān)于SpringBoot集成Kafka開發(fā)的文章就介紹到這了,更多相關(guān)SpringBoot Kafka開發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實(shí)現(xiàn)九宮格的簡(jiǎn)單實(shí)例
這篇文章主要介紹了 Java實(shí)現(xiàn)九宮格的簡(jiǎn)單實(shí)例的相關(guān)資料,需要的朋友可以參考下2017-06-06
IntelliJ Plugin 開發(fā)之添加第三方j(luò)ar的示例代碼
這篇文章主要介紹了IntelliJ Plugin 開發(fā)之添加第三方j(luò)ar的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09
Java排序算法之歸并排序簡(jiǎn)單實(shí)現(xiàn)
這篇文章主要介紹了Java排序算法之歸并排序簡(jiǎn)單實(shí)現(xiàn),具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12
java環(huán)境變量的配置方法圖文詳解【win10環(huán)境為例】
這篇文章主要介紹了java環(huán)境變量的配置方法,結(jié)合圖文形式詳細(xì)分析了win10環(huán)境下java環(huán)境變量的配置方法與相關(guān)操作注意事項(xiàng),需要的朋友可以參考下2020-04-04

