SpringBoot與Kafka整合方案
為什么 Kafka 是高吞吐場景的首選?
在當今的分布式系統(tǒng)中,消息隊列已成為不可或缺的基礎設施。面對不同的業(yè)務場景,選擇合適的消息隊列至關重要。目前主流的消息中間件中,Kafka 以其獨特的設計脫穎而出:
- 超高吞吐量:單機可輕松處理每秒數(shù)十萬條消息
- 持久化存儲:基于磁盤的高效存儲機制,支持海量消息堆積
- 水平擴展:通過分區(qū)機制實現(xiàn)無縫擴展
- 流處理能力:內(nèi)置流處理 API,支持復雜的數(shù)據(jù)轉換和處理
根據(jù) Apache Kafka 官方數(shù)據(jù),Kafka 在全球財富 100 強公司中被廣泛采用,包括 Netflix、Uber、LinkedIn 等,處理著每天 PB 級別的數(shù)據(jù)。其發(fā)布 - 訂閱模式和日志存儲特性,使其特別適合日志收集、事件溯源、實時分析等場景。
本文將帶你全面掌握 SpringBoot 與 Kafka 的整合方案,從環(huán)境搭建到高級特性,從代碼實現(xiàn)到性能調優(yōu),讓你既能理解底層原理,又能解決實際開發(fā)中的各種問題。
一、Kafka 核心概念與架構
1.1 核心概念解析
Kafka 的核心概念包括:
- Producer:消息生產(chǎn)者,負責向 Kafka 發(fā)送消息
- Consumer:消息消費者,負責從 Kafka 讀取消息
- Broker:Kafka 服務器節(jié)點,一個 Kafka 集群由多個 Broker 組成
- Topic:主題,消息的分類名稱,生產(chǎn)者向主題發(fā)送消息,消費者從主題讀取消息
- Partition:分區(qū),每個主題可以分為多個分區(qū),分區(qū)是 Kafka 并行處理的基本單位
- Replica:副本,為保證數(shù)據(jù)可靠性,每個分區(qū)可以有多個副本
- Leader:主副本,每個分區(qū)有一個主副本,負責處理讀寫請求
- Follower:從副本,同步主副本的數(shù)據(jù),主副本故障時可升級為主副本
- Consumer Group:消費者組,多個消費者可以組成一個消費者組,共同消費一個主題的消息
- Offset:偏移量,每個分區(qū)中的消息都有一個唯一的偏移量,用于標識消息在分區(qū)中的位置
1.2 架構原理
Kafka 的整體架構如圖所示:

消息流轉流程:
- 生產(chǎn)者將消息發(fā)送到指定主題
- 消息被分配到主題的一個分區(qū)中(可通過分區(qū)策略指定)
- 分區(qū)的主副本負責接收并存儲消息,同時從副本同步數(shù)據(jù)
- 消費者組中的消費者從分區(qū)讀取消息,每個分區(qū)只能被消費者組中的一個消費者消費
- 消費者通過偏移量記錄自己的消費位置
根據(jù) Kafka 官方文檔(Apache Kafka),這種架構設計使得 Kafka 具有極高的吞吐量和可靠性,能夠滿足大規(guī)模數(shù)據(jù)處理的需求。
1.3 分區(qū)與消費者組機制
分區(qū)是 Kafka 實現(xiàn)高吞吐量的關鍵機制:
- 每個分區(qū)是一個有序的、不可變的消息序列
- 消息被追加到分區(qū)的末尾,類似日志文件
- 分區(qū)可以分布在不同的 Broker 上,實現(xiàn)負載均衡
消費者組機制則實現(xiàn)了消息的并行消費:
- 每個消費者組獨立消費主題的所有消息
- 同一個消費者組中的消費者共享消費負載
- 每個分區(qū)只能被消費者組中的一個消費者消費
- 消費者數(shù)量不應超過分區(qū)數(shù)量,否則多余的消費者將處于空閑狀態(tài)
分區(qū)與消費者組的關系如圖所示:

二、環(huán)境搭建
2.1 安裝 Kafka
我們采用最新穩(wěn)定版 Kafka 3.6.1 進行安裝,步驟如下:
- 安裝 Java 環(huán)境(Kafka 依賴 Java):
# 對于Ubuntu/Debian sudo apt-get update sudo apt-get install openjdk-17-jdk # 對于CentOS/RHEL sudo yum install java-17-openjdk
- 下載并解壓 Kafka:
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.13-3.6.1.tgz tar -xzf kafka_2.13-3.6.1.tgz cd kafka_2.13-3.6.1
- 啟動 ZooKeeper(Kafka 依賴 ZooKeeper 管理元數(shù)據(jù)):
# 后臺啟動ZooKeeper bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
- 啟動 Kafka Broker:
# 后臺啟動Kafka bin/kafka-server-start.sh -daemon config/server.properties
- 創(chuàng)建測試主題:
bin/kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
- 查看主題列表:
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
2.2 安裝 Docker 方式(推薦)
使用 Docker Compose 安裝 Kafka 更加簡單快捷:
創(chuàng)建 docker-compose.yml 文件:
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- "2181:2181"
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
啟動服務:
docker-compose up -d
三、SpringBoot 集成 Kafka 基礎
3.1 創(chuàng)建項目并添加依賴
我們使用 SpringBoot 3.2.0(最新穩(wěn)定版)來創(chuàng)建項目,首先在 pom.xml 中添加必要的依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.0</version>
<relativePath/>
</parent>
<groupId>com.jam</groupId>
<artifactId>springboot-kafka-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot-kafka-demo</name>
<description>SpringBoot集成Kafka示例項目</description>
<properties>
<java.version>17</java.version>
<lombok.version>1.18.30</lombok.version>
<commons-lang3.version>3.14.0</commons-lang3.version>
<mybatis-plus.version>3.5.5</mybatis-plus.version>
<mysql-connector.version>8.2.0</mysql-connector.version>
<springdoc.version>2.1.0</springdoc.version>
<kafka.version>3.6.1</kafka.version>
</properties>
<dependencies>
<!-- SpringBoot核心依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Kafka依賴 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>${kafka.version}</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<!-- 工具類 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<!-- MyBatis-Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>${mybatis-plus.version}</version>
</dependency>
<!-- MySQL驅動 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>${mysql-connector.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Swagger3 -->
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
<version>${springdoc.version}</version>
</dependency>
<!-- 測試依賴 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<version>${kafka.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.2 配置 Kafka
在 application.yml 中添加 Kafka 的配置:
spring:
application:
name: springboot-kafka-demo
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/kafka_demo?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai
username: root
password: root
kafka:
# Kafka集群地址
bootstrap-servers: localhost:9092
# 生產(chǎn)者配置
producer:
# 消息key的序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消息value的序列化器
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 批次大小,當批次滿了之后才會發(fā)送
batch-size: 16384
# 緩沖區(qū)大小
buffer-memory: 33554432
# 消息確認機制:0-不需要確認,1-只需要leader確認,all-所有副本都需要確認
acks: all
# 重試次數(shù)
retries: 3
# 重試間隔時間
retry-backoff-ms: 1000
# 消費者配置
consumer:
# 消息key的反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消息value的反序列化器
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 消費者組ID
group-id: default-group
# 自動偏移量重置策略:earliest-從頭開始消費,latest-從最新的開始消費,none-如果沒有偏移量則拋出異常
auto-offset-reset: earliest
# 是否自動提交偏移量
enable-auto-commit: false
# 自動提交偏移量的間隔時間
auto-commit-interval: 1000
# 指定JsonDeserializer反序列化的目標類
properties:
spring:
json:
trusted:
packages: com.jam.entity
# 監(jiān)聽器配置
listener:
# 消息確認模式:manual-手動確認,auto-自動確認
ack-mode: manual_immediate
# 并發(fā)消費者數(shù)量
concurrency: 3
# 批量消費配置
batch-listener: false
# 每次拉取的記錄數(shù)
consumer:
max-poll-records: 500
# 重試配置
retry:
# 是否啟用重試
enabled: true
# 初始重試間隔時間
initial-interval: 1000
# 最大重試間隔時間
max-interval: 10000
# 重試乘數(shù)
multiplier: 2
# 最大重試次數(shù)
max-attempts: 3
mybatis-plus:
mapper-locations: classpath:mapper/*.xml
type-aliases-package: com.jam.entity
configuration:
map-underscore-to-camel-case: true
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
springdoc:
api-docs:
path: /api-docs
swagger-ui:
path: /swagger-ui.html
operationsSorter: method
server:
port: 8081
3.3 創(chuàng)建 Kafka 常量配置類
創(chuàng)建常量類,定義 Kafka 相關的常量:
package com.jam.config;
/**
* Kafka常量配置類
* 定義Kafka主題名稱、消費者組等常量
*
* @author 果醬
*/
public class KafkaConstant {
/**
* 普通消息主題
*/
public static final String NORMAL_TOPIC = "normal_topic";
/**
* 分區(qū)消息主題
*/
public static final String PARTITION_TOPIC = "partition_topic";
/**
* 事務消息主題
*/
public static final String TRANSACTIONAL_TOPIC = "transactional_topic";
/**
* 死信主題
*/
public static final String DEAD_LETTER_TOPIC = "dead_letter_topic";
/**
* 普通消費者組
*/
public static final String NORMAL_CONSUMER_GROUP = "normal_consumer_group";
/**
* 分區(qū)消費者組
*/
public static final String PARTITION_CONSUMER_GROUP = "partition_consumer_group";
/**
* 事務消費者組
*/
public static final String TRANSACTIONAL_CONSUMER_GROUP = "transactional_consumer_group";
/**
* 死信消費者組
*/
public static final String DEAD_LETTER_CONSUMER_GROUP = "dead_letter_consumer_group";
/**
* 事務ID前綴
*/
public static final String TRANSACTION_ID_PREFIX = "tx-";
}
3.4 創(chuàng)建消息實體類
創(chuàng)建一個通用的消息實體類,用于封裝發(fā)送的消息內(nèi)容:
package com.jam.entity;
import lombok.Data;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 消息實體類
* 用于封裝發(fā)送到Kafka的消息內(nèi)容
*
* @author 果醬
*/
@Data
public class MessageEntity implements Serializable {
/**
* 消息ID
*/
private String messageId;
/**
* 消息內(nèi)容
*/
private String content;
/**
* 業(yè)務類型
*/
private String businessType;
/**
* 業(yè)務ID,用于分區(qū)策略
*/
private String businessId;
/**
* 創(chuàng)建時間
*/
private LocalDateTime createTime;
/**
* 擴展字段,用于存儲額外信息
*/
private String extra;
}
3.5 創(chuàng)建 Kafka 配置類
創(chuàng)建配置類,配置 Kafka 生產(chǎn)者、消費者、分區(qū)策略等:
package com.jam.config;
import com.jam.entity.MessageEntity;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.transaction.KafkaTransactionManager;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.clients.consumer.ConsumerConfig.*;
import static org.apache.kafka.clients.producer.ProducerConfig.*;
/**
* Kafka配置類
* 配置Kafka主題、生產(chǎn)者、消費者等
*
* @author 果醬
*/
@Configuration
public class KafkaConfig {
/**
* 創(chuàng)建普通消息主題
* 3個分區(qū),1個副本
*
* @return 普通消息主題
*/
@Bean
public NewTopic normalTopic() {
// 參數(shù):主題名稱、分區(qū)數(shù)、副本數(shù)
return new NewTopic(KafkaConstant.NORMAL_TOPIC, 3, (short) 1);
}
/**
* 創(chuàng)建分區(qū)消息主題
* 5個分區(qū),1個副本
*
* @return 分區(qū)消息主題
*/
@Bean
public NewTopic partitionTopic() {
return new NewTopic(KafkaConstant.PARTITION_TOPIC, 5, (short) 1);
}
/**
* 創(chuàng)建事務消息主題
* 3個分區(qū),1個副本
*
* @return 事務消息主題
*/
@Bean
public NewTopic transactionalTopic() {
return new NewTopic(KafkaConstant.TRANSACTIONAL_TOPIC, 3, (short) 1);
}
/**
* 創(chuàng)建死信主題
* 1個分區(qū),1個副本
*
* @return 死信主題
*/
@Bean
public NewTopic deadLetterTopic() {
return new NewTopic(KafkaConstant.DEAD_LETTER_TOPIC, 1, (short) 1);
}
/**
* 配置事務生產(chǎn)者工廠
*
* @return 事務生產(chǎn)者工廠
*/
@Bean
public ProducerFactory<String, MessageEntity> transactionalProducerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
configProps.put(KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer.class);
configProps.put(VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class);
configProps.put(ACKS_CONFIG, "all");
configProps.put(RETRIES_CONFIG, 3);
configProps.put(BATCH_SIZE_CONFIG, 16384);
configProps.put(BUFFER_MEMORY_CONFIG, 33554432);
// 配置事務ID前綴
configProps.put(TRANSACTIONAL_ID_CONFIG, KafkaConstant.TRANSACTION_ID_PREFIX);
DefaultKafkaProducerFactory<String, MessageEntity> factory =
new DefaultKafkaProducerFactory<>(configProps);
// 開啟事務支持
factory.transactionCapable();
return factory;
}
/**
* 配置事務Kafka模板
*
* @return 事務Kafka模板
*/
@Bean
public KafkaTemplate<String, MessageEntity> transactionalKafkaTemplate() {
return new KafkaTemplate<>(transactionalProducerFactory());
}
/**
* 配置Kafka事務管理器
*
* @return Kafka事務管理器
*/
@Bean
public KafkaTransactionManager<String, MessageEntity> kafkaTransactionManager() {
return new KafkaTransactionManager<>(transactionalProducerFactory());
}
}
3.6 創(chuàng)建分區(qū)策略類
創(chuàng)建自定義的分區(qū)策略,根據(jù)業(yè)務 ID 將消息發(fā)送到指定分區(qū):
package com.jam.config;
import com.jam.entity.MessageEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
/**
* 自定義Kafka分區(qū)策略
* 根據(jù)業(yè)務ID將消息發(fā)送到指定分區(qū),確保相同業(yè)務ID的消息在同一分區(qū)
*
* @author 果醬
*/
public class BusinessIdPartitioner implements Partitioner {
/**
* 計算分區(qū)號
*
* @param topic 主題名稱
* @param key 消息鍵
* @param keyBytes 消息鍵的字節(jié)數(shù)組
* @param value 消息值
* @param valueBytes 消息值的字節(jié)數(shù)組
* @param cluster Kafka集群信息
* @return 分區(qū)號
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// 獲取主題的所有分區(qū)
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
// 如果消息值不是MessageEntity類型,拋出異常
if (!(value instanceof MessageEntity)) {
throw new InvalidRecordException("消息必須是MessageEntity類型");
}
MessageEntity message = (MessageEntity) value;
String businessId = message.getBusinessId();
// 如果業(yè)務ID為空,使用默認分區(qū)策略
if (StringUtils.isBlank(businessId)) {
if (keyBytes == null) {
// 使用隨機分區(qū)
return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
} else {
// 使用key計算分區(qū)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
// 根據(jù)業(yè)務ID計算分區(qū),確保相同業(yè)務ID的消息在同一分區(qū)
return Math.abs(businessId.hashCode()) % numPartitions;
}
/**
* 關閉分區(qū)器
*/
@Override
public void close() {
// 關閉資源(如果有的話)
}
/**
* 配置分區(qū)器
*
* @param configs 配置參數(shù)
*/
@Override
public void configure(Map<String, ?> configs) {
// 讀取配置參數(shù)(如果有的話)
}
}
3.7 創(chuàng)建消息生產(chǎn)者服務
創(chuàng)建消息生產(chǎn)者服務,封裝發(fā)送消息的各種方法:
package com.jam.service;
import com.jam.config.KafkaConstant;
import com.jam.entity.MessageEntity;
import com.jam.entity.MessageTrace;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.UUID;
/**
* Kafka消息生產(chǎn)者服務
* 負責向Kafka發(fā)送各種類型的消息
*
* @author 果醬
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaProducerService {
/**
* Kafka模板類,提供發(fā)送消息的各種方法
*/
private final KafkaTemplate<String, MessageEntity> kafkaTemplate;
/**
* 事務Kafka模板類,用于發(fā)送事務消息
*/
private final KafkaTemplate<String, MessageEntity> transactionalKafkaTemplate;
/**
* 消息軌跡服務
*/
private final MessageTraceService messageTraceService;
/**
* 發(fā)送普通消息
*
* @param topic 主題名稱
* @param message 消息實體
*/
public void sendMessage(String topic, MessageEntity message) {
// 參數(shù)校驗
StringUtils.hasText(topic, "主題名稱不能為空");
Objects.requireNonNull(message, "消息實體不能為空");
// 確保消息ID和創(chuàng)建時間不為空
if (StringUtils.isBlank(message.getMessageId())) {
message.setMessageId(UUID.randomUUID().toString());
}
if (message.getCreateTime() == null) {
message.setCreateTime(LocalDateTime.now());
}
// 記錄消息發(fā)送前的軌跡
messageTraceService.recordBeforeSend(message, topic);
log.info("發(fā)送Kafka消息,主題:{},消息ID:{},業(yè)務類型:{}",
topic, message.getMessageId(), message.getBusinessType());
// 發(fā)送消息
ListenableFuture<SendResult<String, MessageEntity>> future =
kafkaTemplate.send(topic, message.getMessageId(), message);
// 處理發(fā)送結果
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, MessageEntity> result) {
log.info("Kafka消息發(fā)送成功,主題:{},消息ID:{},分區(qū):{},偏移量:{}",
topic, message.getMessageId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
// 記錄消息發(fā)送成功的軌跡
messageTraceService.recordSendSuccess(message.getMessageId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("Kafka消息發(fā)送失敗,主題:{},消息ID:{}",
topic, message.getMessageId(), ex);
// 記錄消息發(fā)送失敗的軌跡
messageTraceService.recordSendFailure(message.getMessageId(), ex.getMessage());
}
});
}
/**
* 發(fā)送分區(qū)消息
*
* @param message 消息實體
*/
public void sendPartitionMessage(MessageEntity message) {
// 參數(shù)校驗
Objects.requireNonNull(message, "消息實體不能為空");
StringUtils.hasText(message.getBusinessId(), "業(yè)務ID不能為空");
// 確保消息ID和創(chuàng)建時間不為空
if (StringUtils.isBlank(message.getMessageId())) {
message.setMessageId(UUID.randomUUID().toString());
}
if (message.getCreateTime() == null) {
message.setCreateTime(LocalDateTime.now());
}
String topic = KafkaConstant.PARTITION_TOPIC;
// 記錄消息發(fā)送前的軌跡
messageTraceService.recordBeforeSend(message, topic);
log.info("發(fā)送Kafka分區(qū)消息,主題:{},消息ID:{},業(yè)務ID:{},業(yè)務類型:{}",
topic, message.getMessageId(), message.getBusinessId(), message.getBusinessType());
// 發(fā)送消息,使用業(yè)務ID作為key,配合自定義分區(qū)策略
ListenableFuture<SendResult<String, MessageEntity>> future =
kafkaTemplate.send(topic, message.getBusinessId(), message);
// 處理發(fā)送結果
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult<String, MessageEntity> result) {
log.info("Kafka分區(qū)消息發(fā)送成功,主題:{},消息ID:{},業(yè)務ID:{},分區(qū):{},偏移量:{}",
topic, message.getMessageId(), message.getBusinessId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
// 記錄消息發(fā)送成功的軌跡
messageTraceService.recordSendSuccess(message.getMessageId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
log.error("Kafka分區(qū)消息發(fā)送失敗,主題:{},消息ID:{},業(yè)務ID:{}",
topic, message.getMessageId(), message.getBusinessId(), ex);
// 記錄消息發(fā)送失敗的軌跡
messageTraceService.recordSendFailure(message.getMessageId(), ex.getMessage());
}
});
}
/**
* 發(fā)送事務消息
*
* @param message 消息實體
*/
@Transactional(rollbackFor = Exception.class)
public void sendTransactionalMessage(MessageEntity message) {
// 參數(shù)校驗
Objects.requireNonNull(message, "消息實體不能為空");
// 確保消息ID和創(chuàng)建時間不為空
if (StringUtils.isBlank(message.getMessageId())) {
message.setMessageId(UUID.randomUUID().toString());
}
if (message.getCreateTime() == null) {
message.setCreateTime(LocalDateTime.now());
}
String topic = KafkaConstant.TRANSACTIONAL_TOPIC;
// 記錄消息發(fā)送前的軌跡
messageTraceService.recordBeforeSend(message, topic);
log.info("發(fā)送Kafka事務消息,主題:{},消息ID:{},業(yè)務類型:{}",
topic, message.getMessageId(), message.getBusinessType());
// 開始事務
transactionalKafkaTemplate.executeInTransaction(kafkaOperations -> {
// 發(fā)送消息
SendResult<String, MessageEntity> result = kafkaOperations.send(topic, message.getMessageId(), message).get();
log.info("Kafka事務消息發(fā)送成功,主題:{},消息ID:{},分區(qū):{},偏移量:{}",
topic, message.getMessageId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
// 記錄消息發(fā)送成功的軌跡
messageTraceService.recordSendSuccess(message.getMessageId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
// 這里可以添加數(shù)據(jù)庫操作等其他事務操作
return result;
});
}
/**
* 創(chuàng)建消息實體
*
* @param content 消息內(nèi)容
* @param businessType 業(yè)務類型
* @param businessId 業(yè)務ID
* @param extra 額外信息
* @return 消息實體
*/
public MessageEntity createMessageEntity(String content, String businessType, String businessId, String extra) {
MessageEntity message = new MessageEntity();
message.setMessageId(UUID.randomUUID().toString());
message.setContent(content);
message.setBusinessType(businessType);
message.setBusinessId(businessId);
message.setCreateTime(LocalDateTime.now());
message.setExtra(extra);
return message;
}
}
3.8 創(chuàng)建消息消費者服務
創(chuàng)建消息消費者服務,使用 @KafkaListener 注解消費消息:
package com.jam.service;
import com.jam.config.KafkaConstant;
import com.jam.entity.MessageEntity;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Objects;
/**
* Kafka消息消費者服務
* 負責從Kafka接收并處理消息
*
* @author 果醬
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class KafkaConsumerService {
/**
* 消息軌跡服務
*/
private final MessageTraceService messageTraceService;
/**
* 消費普通消息
*
* @param record 消息記錄
* @param acknowledgment 確認對象
* @param topic 主題名稱
* @param partition 分區(qū)號
* @param offset 偏移量
*/
@KafkaListener(topics = KafkaConstant.NORMAL_TOPIC, groupId = KafkaConstant.NORMAL_CONSUMER_GROUP)
public void consumeNormalMessage(ConsumerRecord<String, MessageEntity> record,
Acknowledgment acknowledgment,
@Header("kafka_receivedTopic") String topic,
@Header("kafka_receivedPartitionId") int partition,
@Header("kafka_offset") long offset) {
MessageEntity message = record.value();
Objects.requireNonNull(message, "消息內(nèi)容不能為空");
log.info("接收到普通消息,主題:{},分區(qū):{},偏移量:{},消息ID:{},業(yè)務類型:{}",
topic, partition, offset, message.getMessageId(), message.getBusinessType());
try {
// 處理消息的業(yè)務邏輯
processMessage(message);
// 記錄消費成功軌跡
messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);
// 手動確認消息
acknowledgment.acknowledge();
log.info("普通消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());
} catch (Exception e) {
// 記錄消費失敗軌跡
messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());
log.error("普通消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);
// 手動確認消息(將失敗消息標記為已消費,避免無限重試)
// 如果需要將消息發(fā)送到死信隊列,可以不確認并配置死信轉發(fā)
acknowledgment.acknowledge();
}
}
/**
* 消費分區(qū)消息
*
* @param record 消息記錄
* @param acknowledgment 確認對象
* @param topic 主題名稱
* @param partition 分區(qū)號
* @param offset 偏移量
*/
@KafkaListener(topics = KafkaConstant.PARTITION_TOPIC, groupId = KafkaConstant.PARTITION_CONSUMER_GROUP)
public void consumePartitionMessage(ConsumerRecord<String, MessageEntity> record,
Acknowledgment acknowledgment,
@Header("kafka_receivedTopic") String topic,
@Header("kafka_receivedPartitionId") int partition,
@Header("kafka_offset") long offset) {
MessageEntity message = record.value();
Objects.requireNonNull(message, "消息內(nèi)容不能為空");
log.info("接收到分區(qū)消息,主題:{},分區(qū):{},偏移量:{},消息ID:{},業(yè)務ID:{},業(yè)務類型:{}",
topic, partition, offset, message.getMessageId(), message.getBusinessId(), message.getBusinessType());
try {
// 處理消息的業(yè)務邏輯
processMessage(message);
// 記錄消費成功軌跡
messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);
// 手動確認消息
acknowledgment.acknowledge();
log.info("分區(qū)消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());
} catch (Exception e) {
// 記錄消費失敗軌跡
messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());
log.error("分區(qū)消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);
acknowledgment.acknowledge();
}
}
/**
* 消費事務消息
*
* @param record 消息記錄
* @param acknowledgment 確認對象
* @param topic 主題名稱
* @param partition 分區(qū)號
* @param offset 偏移量
*/
@Transactional(rollbackFor = Exception.class)
@KafkaListener(topics = KafkaConstant.TRANSACTIONAL_TOPIC, groupId = KafkaConstant.TRANSACTIONAL_CONSUMER_GROUP)
public void consumeTransactionalMessage(ConsumerRecord<String, MessageEntity> record,
Acknowledgment acknowledgment,
@Header("kafka_receivedTopic") String topic,
@Header("kafka_receivedPartitionId") int partition,
@Header("kafka_offset") long offset) {
MessageEntity message = record.value();
Objects.requireNonNull(message, "消息內(nèi)容不能為空");
log.info("接收到事務消息,主題:{},分區(qū):{},偏移量:{},消息ID:{},業(yè)務類型:{}",
topic, partition, offset, message.getMessageId(), message.getBusinessType());
try {
// 處理消息的業(yè)務邏輯
processMessage(message);
// 這里可以添加數(shù)據(jù)庫操作等其他事務操作
// 記錄消費成功軌跡
messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);
// 手動確認消息
acknowledgment.acknowledge();
log.info("事務消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());
} catch (Exception e) {
// 記錄消費失敗軌跡
messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());
log.error("事務消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);
// 事務會回滾,消息不會被確認,將被重新消費
}
}
/**
* 消費死信消息
*
* @param record 消息記錄
* @param acknowledgment 確認對象
* @param topic 主題名稱
* @param partition 分區(qū)號
* @param offset 偏移量
*/
@KafkaListener(topics = KafkaConstant.DEAD_LETTER_TOPIC, groupId = KafkaConstant.DEAD_LETTER_CONSUMER_GROUP)
public void consumeDeadLetterMessage(ConsumerRecord<String, MessageEntity> record,
Acknowledgment acknowledgment,
@Header("kafka_receivedTopic") String topic,
@Header("kafka_receivedPartitionId") int partition,
@Header("kafka_offset") long offset) {
MessageEntity message = record.value();
Objects.requireNonNull(message, "消息內(nèi)容不能為空");
log.error("接收到死信消息,主題:{},分區(qū):{},偏移量:{},消息ID:{},業(yè)務類型:{}",
topic, partition, offset, message.getMessageId(), message.getBusinessType());
try {
// 處理死信消息的業(yè)務邏輯,通常需要人工干預
processDeadLetterMessage(message);
// 記錄消費成功軌跡
messageTraceService.recordConsumeSuccess(message.getMessageId(), partition, offset);
// 手動確認消息
acknowledgment.acknowledge();
log.info("死信消息處理成功并確認,主題:{},消息ID:{}", topic, message.getMessageId());
} catch (Exception e) {
// 記錄消費失敗軌跡
messageTraceService.recordConsumeFailure(message.getMessageId(), partition, offset, e.getMessage());
log.error("死信消息處理失敗,主題:{},消息ID:{}", topic, message.getMessageId(), e);
acknowledgment.acknowledge();
}
}
/**
* 處理消息的業(yè)務邏輯
*
* @param message 要處理的消息
*/
private void processMessage(MessageEntity message) {
// 根據(jù)業(yè)務類型處理不同的消息
String businessType = message.getBusinessType();
if ("ORDER_CREATE".equals(businessType)) {
// 處理訂單創(chuàng)建消息
processOrderCreateMessage(message);
} else if ("USER_REGISTER".equals(businessType)) {
// 處理用戶注冊消息
processUserRegisterMessage(message);
} else {
// 處理未知類型消息
log.warn("收到未知類型的消息,消息ID:{},業(yè)務類型:{}",
message.getMessageId(), businessType);
}
}
/**
* 處理死信消息
*
* @param message 死信消息
*/
private void processDeadLetterMessage(MessageEntity message) {
log.info("處理死信消息,消息ID:{},內(nèi)容:{}",
message.getMessageId(), message.getContent());
// 實際業(yè)務處理邏輯,如記錄到數(shù)據(jù)庫等待人工處理
}
/**
* 處理訂單創(chuàng)建消息
*
* @param message 訂單創(chuàng)建消息
*/
private void processOrderCreateMessage(MessageEntity message) {
log.info("處理訂單創(chuàng)建消息,消息ID:{},訂單信息:{}",
message.getMessageId(), message.getContent());
// 實際業(yè)務處理邏輯...
}
/**
* 處理用戶注冊消息
*
* @param message 用戶注冊消息
*/
private void processUserRegisterMessage(MessageEntity message) {
log.info("處理用戶注冊消息,消息ID:{},用戶信息:{}",
message.getMessageId(), message.getContent());
// 實際業(yè)務處理邏輯...
}
}
3.9 創(chuàng)建消息軌跡服務
為了跟蹤消息的整個生命周期,創(chuàng)建消息軌跡服務:
package com.jam.service;
import com.jam.entity.MessageEntity;
import com.jam.entity.MessageTrace;
import com.jam.mapper.MessageTraceMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* 消息軌跡服務
* 記錄消息的發(fā)送和消費軌跡
*
* @author 果醬
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class MessageTraceService {
private final MessageTraceMapper messageTraceMapper;
/**
* 記錄消息發(fā)送前的軌跡
*
* @param message 消息實體
* @param topic 主題
* @return 消息軌跡ID
*/
@Transactional(rollbackFor = Exception.class)
public Long recordBeforeSend(MessageEntity message, String topic) {
Objects.requireNonNull(message, "消息實體不能為空");
StringUtils.hasText(message.getMessageId(), "消息ID不能為空");
StringUtils.hasText(topic, "主題不能為空");
MessageTrace trace = new MessageTrace();
trace.setMessageId(message.getMessageId());
trace.setTopic(topic);
trace.setBusinessType(message.getBusinessType());
trace.setBusinessId(message.getBusinessId());
trace.setContent(message.getContent());
trace.setSendStatus(0); // 待發(fā)送
trace.setCreateTime(LocalDateTime.now());
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.insert(trace);
log.info("記錄消息發(fā)送前軌跡,消息ID:{},軌跡ID:{}", message.getMessageId(), trace.getId());
return trace.getId();
}
/**
* 記錄消息發(fā)送成功的軌跡
*
* @param messageId 消息ID
* @param partition 分區(qū)
* @param offset 偏移量
*/
@Transactional(rollbackFor = Exception.class)
public void recordSendSuccess(String messageId, int partition, long offset) {
StringUtils.hasText(messageId, "消息ID不能為空");
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace == null) {
log.warn("未找到消息軌跡,消息ID:{}", messageId);
return;
}
trace.setSendTime(LocalDateTime.now());
trace.setSendStatus(1); // 發(fā)送成功
trace.setPartition(partition);
trace.setOffset(offset);
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("記錄消息發(fā)送成功軌跡,消息ID:{}", messageId);
}
/**
* 記錄消息發(fā)送失敗的軌跡
*
* @param messageId 消息ID
* @param errorMsg 錯誤信息
*/
@Transactional(rollbackFor = Exception.class)
public void recordSendFailure(String messageId, String errorMsg) {
StringUtils.hasText(messageId, "消息ID不能為空");
StringUtils.hasText(errorMsg, "錯誤信息不能為空");
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace == null) {
log.warn("未找到消息軌跡,消息ID:{}", messageId);
return;
}
trace.setSendTime(LocalDateTime.now());
trace.setSendStatus(2); // 發(fā)送失敗
trace.setSendErrorMsg(errorMsg);
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("記錄消息發(fā)送失敗軌跡,消息ID:{}", messageId);
}
/**
* 記錄消息消費成功的軌跡
*
* @param messageId 消息ID
* @param partition 分區(qū)
* @param offset 偏移量
*/
@Transactional(rollbackFor = Exception.class)
public void recordConsumeSuccess(String messageId, int partition, long offset) {
StringUtils.hasText(messageId, "消息ID不能為空");
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace == null) {
log.warn("未找到消息軌跡,消息ID:{}", messageId);
return;
}
trace.setConsumeTime(LocalDateTime.now());
trace.setConsumeStatus(1); // 消費成功
trace.setConsumePartition(partition);
trace.setConsumeOffset(offset);
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("記錄消息消費成功軌跡,消息ID:{}", messageId);
}
/**
* 記錄消息消費失敗的軌跡
*
* @param messageId 消息ID
* @param partition 分區(qū)
* @param offset 偏移量
* @param errorMsg 錯誤信息
*/
@Transactional(rollbackFor = Exception.class)
public void recordConsumeFailure(String messageId, int partition, long offset, String errorMsg) {
StringUtils.hasText(messageId, "消息ID不能為空");
StringUtils.hasText(errorMsg, "錯誤信息不能為空");
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace == null) {
log.warn("未找到消息軌跡,消息ID:{}", messageId);
return;
}
trace.setConsumeTime(LocalDateTime.now());
trace.setConsumeStatus(2); // 消費失敗
trace.setConsumePartition(partition);
trace.setConsumeOffset(offset);
trace.setConsumeErrorMsg(errorMsg);
trace.setUpdateTime(LocalDateTime.now());
messageTraceMapper.updateById(trace);
log.info("記錄消息消費失敗軌跡,消息ID:{}", messageId);
}
}
3.10 創(chuàng)建控制器
創(chuàng)建一個控制器,用于測試消息發(fā)送功能:
package com.jam.controller;
import com.jam.config.KafkaConstant;
import com.jam.entity.MessageEntity;
import com.jam.service.KafkaProducerService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
/**
* Kafka消息測試控制器
* 提供API接口用于測試Kafka消息發(fā)送功能
*
* @author 果醬
*/
@Slf4j
@RestController
@RequestMapping("/api/kafka")
@RequiredArgsConstructor
@Tag(name = "Kafka消息測試接口", description = "用于測試Kafka消息發(fā)送的API接口")
public class KafkaMessageController {
/**
* Kafka消息生產(chǎn)者服務
*/
private final KafkaProducerService kafkaProducerService;
/**
* 發(fā)送普通消息
*
* @param content 消息內(nèi)容
* @param businessType 業(yè)務類型
* @param businessId 業(yè)務ID
* @param extra 額外信息
* @return 響應信息
*/
@PostMapping("/normal")
@Operation(summary = "發(fā)送普通消息", description = "發(fā)送到普通主題的消息")
public ResponseEntity<String> sendNormalMessage(
@Parameter(description = "消息內(nèi)容", required = true)
@RequestParam String content,
@Parameter(description = "業(yè)務類型")
@RequestParam(required = false) String businessType,
@Parameter(description = "業(yè)務ID")
@RequestParam(required = false) String businessId,
@Parameter(description = "額外信息")
@RequestParam(required = false) String extra) {
log.info("接收到發(fā)送普通消息請求");
MessageEntity message = kafkaProducerService.createMessageEntity(content, businessType, businessId, extra);
kafkaProducerService.sendMessage(KafkaConstant.NORMAL_TOPIC, message);
return ResponseEntity.ok("普通消息發(fā)送成功,消息ID:" + message.getMessageId());
}
/**
* 發(fā)送分區(qū)消息
*
* @param content 消息內(nèi)容
* @param businessType 業(yè)務類型
* @param businessId 業(yè)務ID(用于分區(qū))
* @param extra 額外信息
* @return 響應信息
*/
@PostMapping("/partition")
@Operation(summary = "發(fā)送分區(qū)消息", description = "發(fā)送到分區(qū)主題的消息,相同業(yè)務ID的消息會被發(fā)送到同一分區(qū)")
public ResponseEntity<String> sendPartitionMessage(
@Parameter(description = "消息內(nèi)容", required = true)
@RequestParam String content,
@Parameter(description = "業(yè)務類型")
@RequestParam(required = false) String businessType,
@Parameter(description = "業(yè)務ID(用于分區(qū))", required = true)
@RequestParam String businessId,
@Parameter(description = "額外信息")
@RequestParam(required = false) String extra) {
log.info("接收到發(fā)送分區(qū)消息請求,業(yè)務ID:{}", businessId);
MessageEntity message = kafkaProducerService.createMessageEntity(content, businessType, businessId, extra);
kafkaProducerService.sendPartitionMessage(message);
return ResponseEntity.ok("分區(qū)消息發(fā)送成功,消息ID:" + message.getMessageId());
}
/**
* 發(fā)送事務消息
*
* @param content 消息內(nèi)容
* @param businessType 業(yè)務類型
* @param businessId 業(yè)務ID
* @param extra 額外信息
* @return 響應信息
*/
@PostMapping("/transactional")
@Operation(summary = "發(fā)送事務消息", description = "發(fā)送到事務主題的消息,支持事務特性")
public ResponseEntity<String> sendTransactionalMessage(
@Parameter(description = "消息內(nèi)容", required = true)
@RequestParam String content,
@Parameter(description = "業(yè)務類型")
@RequestParam(required = false) String businessType,
@Parameter(description = "業(yè)務ID")
@RequestParam(required = false) String businessId,
@Parameter(description = "額外信息")
@RequestParam(required = false) String extra) {
log.info("接收到發(fā)送事務消息請求");
MessageEntity message = kafkaProducerService.createMessageEntity(content, businessType, businessId, extra);
kafkaProducerService.sendTransactionalMessage(message);
return ResponseEntity.ok("事務消息發(fā)送成功,消息ID:" + message.getMessageId());
}
}
3.11 創(chuàng)建啟動類
package com.jam;
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* SpringBoot應用啟動類
*
* @author 果醬
*/
@SpringBootApplication
@MapperScan("com.jam.mapper")
@OpenAPIDefinition(
info = @Info(
title = "SpringBoot集成Kafka示例項目",
version = "1.0",
description = "SpringBoot集成Kafka的示例項目,包含各種消息發(fā)送和消費的示例"
)
)
public class SpringbootKafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootKafkaDemoApplication.class, args);
}
}
3.12 創(chuàng)建消息軌跡相關實體和數(shù)據(jù)庫表
消息軌跡實體類:
package com.jam.entity;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.time.LocalDateTime;
/**
* 消息軌跡實體類
* 記錄Kafka消息的發(fā)送和消費情況
*
* @author 果醬
*/
@Data
@TableName("t_message_trace")
public class MessageTrace {
/**
* 主鍵ID
*/
@TableId(type = IdType.AUTO)
private Long id;
/**
* 消息ID
*/
private String messageId;
/**
* 主題
*/
private String topic;
/**
* 分區(qū)
*/
private Integer partition;
/**
* 偏移量
*/
private Long offset;
/**
* 業(yè)務類型
*/
private String businessType;
/**
* 業(yè)務ID
*/
private String businessId;
/**
* 消息內(nèi)容
*/
private String content;
/**
* 發(fā)送時間
*/
private LocalDateTime sendTime;
/**
* 發(fā)送狀態(tài):0-待發(fā)送,1-發(fā)送成功,2-發(fā)送失敗
*/
private Integer sendStatus;
/**
* 發(fā)送錯誤信息
*/
private String sendErrorMsg;
/**
* 消費時間
*/
private LocalDateTime consumeTime;
/**
* 消費分區(qū)
*/
private Integer consumePartition;
/**
* 消費偏移量
*/
private Long consumeOffset;
/**
* 消費狀態(tài):0-待消費,1-消費成功,2-消費失敗
*/
private Integer consumeStatus;
/**
* 消費錯誤信息
*/
private String consumeErrorMsg;
/**
* 創(chuàng)建時間
*/
private LocalDateTime createTime;
/**
* 更新時間
*/
private LocalDateTime updateTime;
}
消息軌跡 Mapper 接口:
package com.jam.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.jam.entity.MessageTrace;
import org.apache.ibatis.annotations.Param;
/**
* 消息軌跡Mapper
*
* @author 果醬
*/
public interface MessageTraceMapper extends BaseMapper<MessageTrace> {
/**
* 根據(jù)消息ID查詢消息軌跡
*
* @param messageId 消息ID
* @return 消息軌跡信息
*/
MessageTrace selectByMessageId(@Param("messageId") String messageId);
}
消息軌跡 Mapper XML 文件(resources/mapper/MessageTraceMapper.xml):
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.jam.mapper.MessageTraceMapper">
<select id="selectByMessageId" parameterType="java.lang.String" resultType="com.jam.entity.MessageTrace">
SELECT * FROM t_message_trace WHERE message_id = #{messageId}
</select>
</mapper>
創(chuàng)建消息軌跡表的 SQL:
CREATE TABLE `t_message_trace` ( `id` bigint NOT NULL AUTO_INCREMENT COMMENT '主鍵ID', `message_id` varchar(64) NOT NULL COMMENT '消息ID', `topic` varchar(128) NOT NULL COMMENT '主題', `partition` int DEFAULT NULL COMMENT '分區(qū)', `offset` bigint DEFAULT NULL COMMENT '偏移量', `business_type` varchar(64) DEFAULT NULL COMMENT '業(yè)務類型', `business_id` varchar(64) DEFAULT NULL COMMENT '業(yè)務ID', `content` text COMMENT '消息內(nèi)容', `send_time` datetime DEFAULT NULL COMMENT '發(fā)送時間', `send_status` tinyint DEFAULT NULL COMMENT '發(fā)送狀態(tài):0-待發(fā)送,1-發(fā)送成功,2-發(fā)送失敗', `send_error_msg` text COMMENT '發(fā)送錯誤信息', `consume_time` datetime DEFAULT NULL COMMENT '消費時間', `consume_partition` int DEFAULT NULL COMMENT '消費分區(qū)', `consume_offset` bigint DEFAULT NULL COMMENT '消費偏移量', `consume_status` tinyint DEFAULT NULL COMMENT '消費狀態(tài):0-待消費,1-消費成功,2-消費失敗', `consume_error_msg` text COMMENT '消費錯誤信息', `create_time` datetime NOT NULL COMMENT '創(chuàng)建時間', `update_time` datetime NOT NULL COMMENT '更新時間', PRIMARY KEY (`id`), UNIQUE KEY `uk_message_id` (`message_id`), KEY `idx_topic` (`topic`), KEY `idx_business_type` (`business_type`), KEY `idx_business_id` (`business_id`), KEY `idx_send_status` (`send_status`), KEY `idx_consume_status` (`consume_status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='Kafka消息軌跡表';
3.13 測試消息發(fā)送與消費
啟動應用程序后,可以通過以下方式測試消息發(fā)送與消費:
- 使用 Swagger UI 測試:訪問http://localhost:8081/swagger-ui.html,通過界面調用消息發(fā)送接口
- 使用 curl 命令測試:
# 發(fā)送普通消息 curl -X POST "http://localhost:8081/api/kafka/normal?content=Hello Kafka&businessType=TEST" # 發(fā)送分區(qū)消息 curl -X POST "http://localhost:8081/api/kafka/partition?content=Hello Partition&businessType=TEST&businessId=BUS123456" # 發(fā)送事務消息 curl -X POST "http://localhost:8081/api/kafka/transactional?content=Hello Transaction&businessType=TEST"
發(fā)送消息后,可以在控制臺看到生產(chǎn)者和消費者的日志輸出,證明消息已經(jīng)成功發(fā)送和消費。
四、Kafka 高級特性
4.1 消息確認機制
Kafka 提供了靈活的消息確認機制,確保消息的可靠傳遞。
生產(chǎn)者確認機制:
通過 acks 參數(shù)控制生產(chǎn)者需要等待的確認數(shù)量:- acks=0:生產(chǎn)者不等待任何確認,直接發(fā)送下一條消息
- acks=1:生產(chǎn)者等待 leader 分區(qū)確認收到消息
- acks=all:生產(chǎn)者等待所有同步副本確認收到消息
消費者確認機制:
通過 ack-mode 參數(shù)控制消費者何時確認消息:- auto:自動確認,消費者收到消息后立即確認
- manual:手動確認,消費者處理完消息后調用 acknowledge () 方法確認
- manual_immediate:手動確認,確認后立即提交偏移量
消息確認流程:

根據(jù) Kafka 官方文檔(Apache Kafka),對于需要高可靠性的場景,推薦使用 acks=all 和 manual 確認模式。
4.2 事務消息
Kafka 從 0.11 版本開始支持事務消息,確保消息的原子性:要么所有消息都被成功發(fā)送,要么都失敗。
事務消息的工作流程:

在前面的代碼中,我們已經(jīng)實現(xiàn)了事務消息的發(fā)送:
- 配置了事務生產(chǎn)者工廠和事務 Kafka 模板
- 使用 @Transactional 注解或 executeInTransaction 方法開啟事務
- 在事務中可以混合發(fā)送消息和數(shù)據(jù)庫操作等
4.3 死信隊列
死信隊列(Dead Letter Queue)用于存儲無法被正常消費的消息。在 Kafka 中,可以通過以下方式實現(xiàn)死信隊列:
- 配置死信主題和死信消費者
- 在消費失敗時,手動將消息發(fā)送到死信主題
- 死信消費者專門處理死信消息
死信隊列的工作流程:

實現(xiàn)死信消息轉發(fā)的代碼示例:
/**
* 轉發(fā)消息到死信隊列
*
* @param message 消息實體
* @param topic 原主題
* @param partition 原分區(qū)
* @param offset 原偏移量
* @param errorMsg 錯誤信息
*/
public void forwardToDeadLetterQueue(MessageEntity message, String topic, int partition, long offset, String errorMsg) {
Objects.requireNonNull(message, "消息實體不能為空");
StringUtils.hasText(topic, "主題不能為空");
StringUtils.hasText(errorMsg, "錯誤信息不能為空");
log.warn("將消息轉發(fā)到死信隊列,原主題:{},消息ID:{},錯誤信息:{}",
topic, message.getMessageId(), errorMsg);
// 創(chuàng)建死信消息,添加原消息的元數(shù)據(jù)
MessageEntity deadLetterMessage = new MessageEntity();
deadLetterMessage.setMessageId(UUID.randomUUID().toString());
deadLetterMessage.setContent(JSON.toJSONString(message));
deadLetterMessage.setBusinessType("DEAD_LETTER");
deadLetterMessage.setBusinessId(message.getMessageId());
deadLetterMessage.setCreateTime(LocalDateTime.now());
deadLetterMessage.setExtra(String.format("原主題:%s,原分區(qū):%d,原偏移量:%d,錯誤信息:%s",
topic, partition, offset, errorMsg));
// 發(fā)送到死信主題
kafkaTemplate.send(KafkaConstant.DEAD_LETTER_TOPIC, deadLetterMessage.getMessageId(), deadLetterMessage);
}
4.4 消息冪等性
在分布式系統(tǒng)中,消息重復消費是不可避免的問題,因此需要保證消息消費的冪等性。常用的實現(xiàn)方式有:
- 基于數(shù)據(jù)庫唯一索引:
/**
* 處理消息(冪等性保證)
*
* @param message 消息實體
*/
@Transactional(rollbackFor = Exception.class)
public void processMessageWithIdempotency(MessageEntity message) {
String messageId = message.getMessageId();
String businessType = message.getBusinessType();
// 檢查消息是否已經(jīng)處理過
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace != null && trace.getConsumeStatus() == 1) {
log.info("消息已經(jīng)處理過,消息ID:{}", messageId);
return;
}
// 根據(jù)業(yè)務類型處理不同的消息
if ("ORDER_CREATE".equals(businessType)) {
// 處理訂單創(chuàng)建消息,使用訂單號作為唯一鍵
String orderNo = message.getExtra();
// 檢查訂單是否已經(jīng)處理
Order order = orderMapper.selectByOrderNo(orderNo);
if (order != null) {
log.info("訂單已經(jīng)處理過,訂單號:{}", orderNo);
return;
}
// 處理訂單業(yè)務邏輯
// ...
} else if ("USER_REGISTER".equals(businessType)) {
// 處理用戶注冊消息,使用用戶ID作為唯一鍵
// ...
}
}
- 基于 Redis 的分布式鎖:
/**
* 使用Redis分布式鎖保證冪等性
*
* @param message 消息實體
*/
public void processMessageWithRedisLock(MessageEntity message) {
String messageId = message.getMessageId();
String lockKey = "kafka:message:process:" + messageId;
// 獲取分布式鎖,設置5分鐘過期時間
Boolean locked = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 5, TimeUnit.MINUTES);
if (Boolean.TRUE.equals(locked)) {
try {
// 檢查消息是否已經(jīng)處理過
MessageTrace trace = messageTraceMapper.selectByMessageId(messageId);
if (trace != null && trace.getConsumeStatus() == 1) {
log.info("消息已經(jīng)處理過,消息ID:{}", messageId);
return;
}
// 處理消息業(yè)務邏輯
processMessage(message);
} finally {
// 釋放鎖
redisTemplate.delete(lockKey);
}
} else {
log.info("消息正在處理中,消息ID:{}", messageId);
}
}
五、Kafka 性能調優(yōu)
為了讓 Kafka 在生產(chǎn)環(huán)境中發(fā)揮最佳性能,我們需要進行合理的調優(yōu)。以下是一些關鍵的調優(yōu)方向:
5.1 服務器調優(yōu)
JVM 參數(shù)調優(yōu):
根據(jù)服務器內(nèi)存大小合理配置 JVM 參數(shù)# 在kafka-server-start.sh中設置 export KAFKA_HEAP_OPTS="-Xms8g -Xmx8g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M"
操作系統(tǒng)調優(yōu):
- 增加文件描述符限制
# 在/etc/security/limits.conf中添加 * soft nofile 1000000 * hard nofile 1000000
- 調整網(wǎng)絡參數(shù)
# 在/etc/sysctl.conf中添加 net.core.rmem_default=134217728 net.core.rmem_max=134217728 net.core.wmem_default=134217728 net.core.wmem_max=134217728 net.ipv4.tcp_wmem=134217728 134217728 134217728 net.ipv4.tcp_rmem=134217728 134217728 134217728 net.ipv4.tcp_max_syn_backlog=8192 net.core.netdev_max_backlog=16384
Kafka 配置調優(yōu):
# server.properties # 日志刷新策略 log.flush.interval.messages=10000 log.flush.interval.ms=1000 # 日志保留策略 log.retention.hours=72 log.retention.bytes=107374182400 # 分區(qū)大小限制 log.segment.bytes=1073741824 # I/O線程數(shù) num.io.threads=8 # 網(wǎng)絡線程數(shù) num.network.threads=3 # 分區(qū)副本同步線程數(shù) num.replica.fetchers=2 # 副本滯后閾值 replica.lag.time.max.ms=30000
5.2 生產(chǎn)者調優(yōu)
批量發(fā)送:
配置合理的批次大小和 linger.ms 參數(shù),實現(xiàn)批量發(fā)送spring: kafka: producer: # 批次大小,當批次滿了之后才會發(fā)送 batch-size: 16384 # linger.ms參數(shù),即使批次未滿,達到該時間也會發(fā)送 properties: linger.ms: 5壓縮消息:
啟用消息壓縮,減少網(wǎng)絡傳輸和存儲開銷spring: kafka: producer: # 啟用消息壓縮,可選值:none, gzip, snappy, lz4, zstd properties: compression.type: lz4異步發(fā)送:
使用異步發(fā)送提高吞吐量,避免阻塞主線程自定義分區(qū)策略:
根據(jù)業(yè)務特點實現(xiàn)自定義分區(qū)策略,均衡分區(qū)負載
5.3 消費者調優(yōu)
消費線程池配置:
根據(jù)分區(qū)數(shù)量配置合理的消費者線程數(shù)spring: kafka: listener: # 并發(fā)消費者數(shù)量,建議等于分區(qū)數(shù)量 concurrency: 3 # 每次拉取的記錄數(shù) consumer: max-poll-records: 500批量消費:
開啟批量消費提高消費效率spring: kafka: listener: # 開啟批量消費 batch-listener: true consumer: # 批量消費需要設置為false enable-auto-commit: false # 每次拉取的最大記錄數(shù) properties: max.poll.records: 500批量消費代碼示例:
/** * 批量消費消息 */ @KafkaListener(topics = KafkaConstant.NORMAL_TOPIC, groupId = KafkaConstant.NORMAL_CONSUMER_GROUP) public void batchConsume(List<ConsumerRecord<String, MessageEntity>> records, Acknowledgment acknowledgment) { log.info("接收到批量消息,數(shù)量:{}", records.size()); for (ConsumerRecord<String, MessageEntity> record : records) { MessageEntity message = record.value(); if (message == null) { continue; } try { log.info("處理批量消息,主題:{},分區(qū):{},偏移量:{},消息ID:{}", record.topic(), record.partition(), record.offset(), message.getMessageId()); // 處理消息的業(yè)務邏輯 processMessage(message); // 記錄消費成功軌跡 messageTraceService.recordConsumeSuccess(message.getMessageId(), record.partition(), record.offset()); } catch (Exception e) { // 記錄消費失敗軌跡 messageTraceService.recordConsumeFailure(message.getMessageId(), record.partition(), record.offset(), e.getMessage()); log.error("批量消息處理失敗,消息ID:{}", message.getMessageId(), e); // 轉發(fā)到死信隊列 forwardToDeadLetterQueue(message, record.topic(), record.partition(), record.offset(), e.getMessage()); } } // 手動確認所有消息 acknowledgment.acknowledge(); log.info("批量消息處理完成,數(shù)量:{}", records.size()); }異步處理:
消費者接收到消息后,將消息放入線程池異步處理,快速確認消息,提高消費效率
5.4 主題和分區(qū)調優(yōu)
合理設置分區(qū)數(shù)量:
分區(qū)數(shù)量是影響 Kafka 吞吐量的關鍵因素,一般建議:- 每個主題的分區(qū)數(shù)量 = 預期吞吐量 / 單分區(qū)吞吐量
- 單分區(qū)吞吐量:生產(chǎn)者約 500-1000 條 / 秒,消費者約 1000-2000 條 / 秒
合理設置副本數(shù)量:
- 副本數(shù)量越多,可靠性越高,但會降低吞吐量
- 生產(chǎn)環(huán)境建議設置為 2-3 個副本
清理策略:
根據(jù)業(yè)務需求設置合理的日志清理策略:- 按時間清理:log.retention.hours
- 按大小清理:log.retention.bytes
六、常見問題與解決方案
6.1 消息丟失問題
消息丟失可能發(fā)生在三個階段:生產(chǎn)階段、存儲階段和消費階段。
生產(chǎn)階段丟失:
- 解決方案:設置 acks=all,確保所有副本都收到消息
spring: kafka: producer: acks: all retries: 3存儲階段丟失:
- 解決方案:設置合理的副本數(shù)量和同步策略
# server.properties # 最小同步副本數(shù),應小于等于副本數(shù) min.insync.replicas=2
消費階段丟失:
- 解決方案:使用手動確認模式,確保消息處理完成后再確認
spring: kafka: listener: ack-mode: manual_immediate
6.2 消息積壓問題
消息積壓通常是因為消費速度跟不上生產(chǎn)速度,解決方案如下:
優(yōu)化消費邏輯:
- 減少單次消息處理時間
- 異步處理非關鍵流程
增加消費者數(shù)量:
- 水平擴展消費者實例
- 確保消費者數(shù)量不超過分區(qū)數(shù)量
臨時擴容:
- 對于突發(fā)流量,可以臨時啟動更多的消費者實例
消息遷移:
- 創(chuàng)建新的主題和消費者組,將積壓的消息遷移到新主題
/** * 遷移積壓消息 */ @Scheduled(fixedRate = 60000) public void migrateBacklogMessages() { String sourceTopic = "source_topic"; String targetTopic = "backlog_topic"; String consumerGroup = "backlog_migrate_group"; log.info("開始遷移積壓消息,源主題:{},目標主題:{}", sourceTopic, targetTopic); // 創(chuàng)建臨時消費者 DefaultKafkaConsumerFactory<String, MessageEntity> consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()); try (KafkaConsumer<String, MessageEntity> consumer = (KafkaConsumer<String, MessageEntity>) consumerFactory.createConsumer( consumerGroup, new DefaultPrincipal("migrate-service"))) { // 訂閱源主題 consumer.subscribe(Collections.singleton(sourceTopic)); // 從最早的偏移量開始消費 consumer.seekToBeginning(consumer.assignment()); while (true) { ConsumerRecords<String, MessageEntity> records = consumer.poll(Duration.ofMillis(1000)); if (records.isEmpty()) { break; } // 批量發(fā)送到目標主題 List<ProducerRecord<String, MessageEntity>> producerRecords = new ArrayList<>(); for (ConsumerRecord<String, MessageEntity> record : records) { producerRecords.add(new ProducerRecord<>( targetTopic, record.key(), record.value())); } // 批量發(fā)送 kafkaTemplate.send(producerRecords); log.info("已遷移消息:{}條", producerRecords.size()); // 手動提交偏移量 consumer.commitSync(); // 控制遷移速度,避免影響正常業(yè)務 Thread.sleep(100); } } catch (Exception e) { log.error("遷移積壓消息失敗", e); } log.info("積壓消息遷移完成"); }監(jiān)控告警:
- 配置消息積壓監(jiān)控和告警,及時發(fā)現(xiàn)問題
/** * 消息積壓監(jiān)控 */ @Scheduled(fixedRate = 60000) // 每分鐘檢查一次 public void monitorMessageBacklog() { // 監(jiān)控的主題和消費者組 Map<String, String> monitorTopics = new HashMap<>(); monitorTopics.put(KafkaConstant.NORMAL_TOPIC, KafkaConstant.NORMAL_CONSUMER_GROUP); monitorTopics.put(KafkaConstant.PARTITION_TOPIC, KafkaConstant.PARTITION_CONSUMER_GROUP); // 獲取KafkaAdminClient try (AdminClient adminClient = AdminClient.create(kafkaProperties.buildAdminProperties())) { for (Map.Entry<String, String> entry : monitorTopics.entrySet()) { String topic = entry.getKey(); String consumerGroup = entry.getValue(); // 獲取消費者組的偏移量 Map<TopicPartition, OffsetAndMetadata> committedOffsets = adminClient.listConsumerGroupOffsets(consumerGroup).partitionsToOffsetAndMetadata().get(); // 獲取主題的最新偏移量 Map<TopicPartition, Long> endOffsets = adminClient.listOffsets(committedOffsets.keySet()).all().get(); // 計算每個分區(qū)的積壓數(shù)量 for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : committedOffsets.entrySet()) { TopicPartition topicPartition = offsetEntry.getKey(); long consumerOffset = offsetEntry.getValue().offset(); long endOffset = endOffsets.get(topicPartition); long backlog = endOffset - consumerOffset; log.info("主題:{},分區(qū):{},積壓消息數(shù):{}", topic, topicPartition.partition(), backlog); // 如果積壓數(shù)量超過閾值,發(fā)送告警 if (backlog > 10000) { log.warn("主題消息積壓嚴重,主題:{},分區(qū):{},積壓消息數(shù):{}", topic, topicPartition.partition(), backlog); // 發(fā)送告警通知(郵件、短信等) alertService.sendAlert("Kafka消息積壓告警", String.format("主題:%s,分區(qū):%d,積壓消息數(shù):%d", topic, topicPartition.partition(), backlog)); } } } } catch (Exception e) { log.error("消息積壓監(jiān)控失敗", e); } }
6.3 消息順序性問題
Kafka 中,單個分區(qū)的消息是有序的,但跨分區(qū)的消息無法保證順序。確保消息順序性的解決方案如下:
單分區(qū):
- 所有消息都發(fā)送到同一個分區(qū),保證全局有序
- 缺點:無法利用多分區(qū)的并行處理能力,吞吐量受限
按業(yè)務 ID 分區(qū):
- 相同業(yè)務 ID 的消息發(fā)送到同一個分區(qū),保證局部有序
- 優(yōu)點:兼顧順序性和吞吐量
// 如前面實現(xiàn)的BusinessIdPartitioner
使用狀態(tài)機:
- 對于需要全局有序的場景,可以在消費端實現(xiàn)狀態(tài)機,處理亂序消息
七、總結
本文詳細介紹了 SpringBoot 集成 Kafka 的全過程,從基礎概念到高級特性,從代碼實現(xiàn)到性能調優(yōu),涵蓋了實際開發(fā)中可能遇到的各種場景。
Kafka 作為一款高性能的分布式消息系統(tǒng),在大數(shù)據(jù)領域和實時流處理場景中有著廣泛的應用。合理使用 Kafka 可以幫助我們構建高吞吐、高可靠的分布式系統(tǒng)。
八、參考
- Kafka 核心概念與架構:參考 Kafka 官方文檔(Apache Kafka)
- SpringBoot 集成 Kafka:參考 Spring Kafka 官方文檔(Overview :: Spring Kafka)
- 消息確認機制:參考 Kafka 官方文檔的 "Producer Configs" 和 "Consumer Configs" 章節(jié)
- 事務消息:參考 Kafka 官方文檔的 "Transactions" 章節(jié)(Apache Kafka)
- 性能調優(yōu)參數(shù):參考 Kafka 官方文檔的 "Performance Tuning" 章節(jié)(Apache Kafka)
- 消息冪等性解決方案:參考 Spring 官方博客和《Kafka 權威指南》一書
- 消息丟失與積壓解決方案:參考 Kafka 官方文檔和 Confluent 博客(Confluent Blog | Tutorials, Tips, and News Updates)
到此這篇關于SpringBoot與Kafka整合方案的文章就介紹到這了,更多相關SpringBoot 整合 Kafka 內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
java高并發(fā)下CopyOnWriteArrayList替代ArrayList
這篇文章主要為大家介紹了java高并發(fā)下CopyOnWriteArrayList替代ArrayList的使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12
Java Class.forName()用法和newInstance()方法原理解析
這篇文章主要介紹了Java Class.forName()用法和newInstance()方法原理解析,文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友可以參考下2020-08-08
SpringBoot統(tǒng)一功能處理實現(xiàn)的全過程
最近在做項目時需要對異常進行全局統(tǒng)一處理,主要是一些分類入庫以及記錄日志等,下面這篇文章主要給大家介紹了關于SpringBoot統(tǒng)一功能處理實現(xiàn)的相關資料,文中通過圖文以及實例代碼介紹的非常詳細,需要的朋友可以參考下2023-01-01
Java 數(shù)據(jù)結構與算法系列精講之時間復雜度與空間復雜度
對于一個算法,其時間復雜度和空間復雜度往往是相互影響的,當追求一個較好的時間復雜度時,可能會使空間復雜度的性能變差,即可能導致占用較多的存儲空間,這篇文章主要給大家介紹了關于Java時間復雜度、空間復雜度的相關資料,需要的朋友可以參考下2022-02-02

