SpringBoot集成Kafka并使用多個(gè)死信隊(duì)列詳解
以下是Spring Boot集成Kafka并使用多個(gè)死信隊(duì)列的完整示例,包含代碼和配置說明。
1. 添加依賴 (pom.xml)
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
2. 配置文件 (application.yml)
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
3. 自定義異常類
public class BusinessException extends RuntimeException {
public BusinessException(String message) {
super(message);
}
}
4. Kafka配置類
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
// Kafka生產(chǎn)者配置
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(config);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// Kafka消費(fèi)者配置
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(config);
}
// 自定義錯(cuò)誤處理器(支持多個(gè)死信隊(duì)列)
@Bean
public CommonErrorHandler errorHandler(KafkaTemplate<String, String> kafkaTemplate) {
// 重試策略:3次重試,間隔1秒
FixedBackOff backOff = new FixedBackOff(1000L, 3);
DefaultErrorHandler errorHandler = new DefaultErrorHandler((record, exception) -> {
String dlqTopic = determineDlqTopic(exception);
kafkaTemplate.send(dlqTopic, record.key(), record.value());
System.out.println("消息發(fā)送到死信隊(duì)列: " + dlqTopic);
}, backOff);
// 配置需要重試的異常類型
errorHandler.addRetryableExceptions(BusinessException.class);
errorHandler.addNotRetryableExceptions(SerializationException.class);
return errorHandler;
}
// 根據(jù)異常類型選擇死信隊(duì)列
private String determineDlqTopic(Throwable exception) {
if (exception.getCause() instanceof SerializationException) {
return "serialization-error-dlq";
} else if (exception.getCause() instanceof BusinessException) {
return "business-error-dlq";
} else {
return "general-error-dlq";
}
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setCommonErrorHandler(errorHandler(kafkaTemplate()));
return factory;
}
}
5. Kafka消費(fèi)者服務(wù)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumerService {
@KafkaListener(topics = "main-topic")
public void consume(String message) {
try {
if (message.contains("invalid-format")) {
throw new SerializationException("消息格式錯(cuò)誤");
} else if (message.contains("business-error")) {
throw new BusinessException("業(yè)務(wù)處理失敗");
}
System.out.println("成功處理消息: " + message);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
6. 啟動(dòng)類
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
}
7. 測(cè)試步驟
1.創(chuàng)建Kafka主題:
kafka-topics --create --bootstrap-server localhost:9092 --topic main-topic
kafka-topics --create --bootstrap-server localhost:9092 --topic serialization-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic business-error-dlq
kafka-topics --create --bootstrap-server localhost:9092 --topic general-error-dlq
2.發(fā)送測(cè)試消息:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendTestMessages() {
kafkaTemplate.send("main-topic", "valid-message");
kafkaTemplate.send("main-topic", "invalid-format");
kafkaTemplate.send("main-topic", "business-error");
}
3.觀察死信隊(duì)列:
- 格式錯(cuò)誤的消息會(huì)進(jìn)入 serialization-error-dlq
- 業(yè)務(wù)異常的消息會(huì)進(jìn)入 business-error-dlq
- 其他異常進(jìn)入 general-error-dlq
關(guān)鍵點(diǎn)說明
錯(cuò)誤路由邏輯:通過determineDlqTopic方法根據(jù)異常類型選擇不同的死信隊(duì)列。
重試機(jī)制:通過FixedBackOff配置重試策略(最多重試3次,間隔1秒)。
異常分類:
- SerializationException(序列化問題)直接進(jìn)入死信隊(duì)列,不重試。
- BusinessException(業(yè)務(wù)異常)會(huì)觸發(fā)重試,最終失敗后進(jìn)入死信隊(duì)列。
到此這篇關(guān)于SpringBoot集成Kafka并使用多個(gè)死信隊(duì)列詳解的文章就介紹到這了,更多相關(guān)SpringBoot Kafka使用死信隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Spring Boot實(shí)現(xiàn)操作數(shù)據(jù)庫的接口的過程
本文給大家分享使用Spring Boot實(shí)現(xiàn)操作數(shù)據(jù)庫的接口的過程,包括springboot原理解析及實(shí)例代碼詳解,感興趣的朋友跟隨小編一起看看吧2021-07-07
Security 登錄認(rèn)證流程詳細(xì)分析詳解
本文Security登錄認(rèn)證流程詳細(xì)分析詳解,文中通過示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01
java實(shí)現(xiàn)去除ArrayList重復(fù)字符串
本文主要介紹了java實(shí)現(xiàn)去除ArrayList重復(fù)字符串,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2024-09-09
SpringBoot+Swagger-ui自動(dòng)生成API文檔
今天小編就為大家分享一篇關(guān)于SpringBoot+Swagger-ui自動(dòng)生成API文檔,小編覺得內(nèi)容挺不錯(cuò)的,現(xiàn)在分享給大家,具有很好的參考價(jià)值,需要的朋友一起跟隨小編來看看吧2019-03-03
Java中的異常處理(try,catch,finally,throw,throws)
本文主要介紹了Java中的異常處理,文章主要介紹的異常處理包括5個(gè)關(guān)鍵字try,catch,finally,throw,throws,更多詳細(xì)內(nèi)容需要的朋友可以參考一下2022-06-06
SpringBoot?mybatis-plus使用json字段實(shí)戰(zhàn)指南
在現(xiàn)代應(yīng)用開發(fā)中經(jīng)常會(huì)使用JSON格式存儲(chǔ)和傳輸數(shù)據(jù),為了便捷地處理數(shù)據(jù)庫中的JSON字段,MyBatis-Plus提供了強(qiáng)大的JSON處理器,這篇文章主要給大家介紹了關(guān)于SpringBoot?mybatis-plus使用json字段的相關(guān)資料,需要的朋友可以參考下2024-01-01
Java Integer.valueOf()和Integer.parseInt()的區(qū)別說明
這篇文章主要介紹了Java Integer.valueOf()和Integer.parseInt()的區(qū)別說明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2020-08-08
Java使用EasyExcel進(jìn)行單元格合并的問題詳解
項(xiàng)目中需要導(dǎo)出并合并指定的單元格,下面這篇文章主要給大家介紹了關(guān)于java評(píng)論、回復(fù)功能設(shè)計(jì)與實(shí)現(xiàn)的相關(guān)資料,文中通過實(shí)例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-06-06
java中靜態(tài)導(dǎo)入機(jī)制用法實(shí)例詳解
這篇文章主要介紹了java中靜態(tài)導(dǎo)入機(jī)制用法實(shí)例詳解的相關(guān)資料,需要的朋友可以參考下2017-07-07

