spring boot 使用 Kafka的場(chǎng)景分析
一、Kafka作為消息隊(duì)列的好處

- 高吞吐量:Kafka能夠處理大規(guī)模的數(shù)據(jù)流,并支持高吞吐量的消息傳輸。
- 持久性:Kafka將消息持久化到磁盤上,保證了消息不會(huì)因?yàn)橄到y(tǒng)故障而丟失。
- 分布式:Kafka是一個(gè)分布式系統(tǒng),可以在多個(gè)節(jié)點(diǎn)上運(yùn)行,具有良好的可擴(kuò)展性和容錯(cuò)性。
- 支持多種協(xié)議:Kafka支持多種協(xié)議,如TCP、HTTP、UDP等,可以與不同的系統(tǒng)進(jìn)行集成。
- 靈活的消費(fèi)模式:Kafka支持多種消費(fèi)模式,如拉取和推送,可以根據(jù)需要選擇合適的消費(fèi)模式。
- 可配置性強(qiáng):Kafka的配置參數(shù)非常豐富,可以根據(jù)需要進(jìn)行靈活配置。
- 社區(qū)支持:Kafka作為Apache旗下的開源項(xiàng)目,擁有龐大的用戶基礎(chǔ)和活躍的社區(qū)支持,方便用戶得到及時(shí)的技術(shù)支持。
二、springboot中使用Kafka
- 添加依賴:在pom.xml文件中添加Kafka的依賴,包括spring-kafka和kafka-clients。確保版本與你的項(xiàng)目兼容。
- 創(chuàng)建生產(chǎn)者:創(chuàng)建一個(gè)Kafka生產(chǎn)者類,實(shí)現(xiàn)Producer接口,并使用KafkaTemplate發(fā)送消息。
- 配置生產(chǎn)者:在Spring Boot的配置文件中配置Kafka生產(chǎn)者的相關(guān)參數(shù),例如bootstrap服務(wù)器地址、Kafka主題等。
- 發(fā)送消息:在需要發(fā)送消息的地方,注入Kafka生產(chǎn)者,并使用其發(fā)送消息到指定的Kafka主題。
- 創(chuàng)建消費(fèi)者:創(chuàng)建一個(gè)Kafka消費(fèi)者類,實(shí)現(xiàn)Consumer接口,并使用KafkaTemplate訂閱指定的Kafka主題。
- 配置消費(fèi)者:在Spring Boot的配置文件中配置Kafka消費(fèi)者的相關(guān)參數(shù),例如group id、auto offset reset等。
- 接收消息:在需要接收消息的地方,注入Kafka消費(fèi)者,并使用其接收消息。
- 處理消息:對(duì)接收到的消息進(jìn)行處理,例如保存到數(shù)據(jù)庫或進(jìn)行其他業(yè)務(wù)邏輯處理。
三、使用Kafka
pom中填了依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>- 創(chuàng)建生產(chǎn)者:創(chuàng)建一個(gè)Kafka生產(chǎn)者類,實(shí)現(xiàn)Producer接口,并使用KafkaTemplate發(fā)送消息。
import org.apache.kafka.clients.producer.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
@Value("${kafka.bootstrap}")
private String bootstrapServers;
@Value("${kafka.topic}")
private String topic;
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
Producer<String, String> producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());
try {
producer.send(new ProducerRecord<>(topic, message));
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}- 配置生產(chǎn)者:在Spring Boot的配置文件中配置Kafka生產(chǎn)者的相關(guān)參數(shù),例如bootstrap服務(wù)器地址、Kafka主題等。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.ConsumerConfig;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.context.annotation.PropertySource;
import java.util.*;
import org.springframework.beans.factory.*;
import org.springframework.*;
import org.springframework.*;expression.*;value; @Value("${kafka}") Properties kafkaProps = new Properties(); @Bean public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf){ KafkaTemplate<String, String> template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory<String, String> producerFactory(){ DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory<String, String> consumerFactory(){ DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer<String, String> container(ConsumerFactory<String, String> consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener消費(fèi)者
import org.apache.kafka.clients.consumer.*;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@Value("${kafka.bootstrap}")
private String bootstrapServers;
@Value("${kafka.group}")
private String groupId;
@Value("${kafka.topic}")
private String topic;
private KafkaTemplate<String, String> kafkaTemplate;
public KafkaConsumer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void consume() {
Consumer<String, String> consumer = new KafkaConsumer<>(consumerConfigs());
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
private Properties consumerConfigs() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
}四、kafka與rocketMQ比較
Kafka和RocketMQ都是開源的消息隊(duì)列系統(tǒng),它們具有許多相似之處,但在一些關(guān)鍵方面也存在差異。以下是它們?cè)跀?shù)據(jù)可靠性、性能、消息傳遞方式等方面的比較:
- 數(shù)據(jù)可靠性:
- Kafka使用異步刷盤方式,而RocketMQ支持異步實(shí)時(shí)刷盤、同步刷盤、同步復(fù)制和異步復(fù)制。這使得RocketMQ在單機(jī)可靠性上比Kafka更高,因?yàn)樗粫?huì)因?yàn)椴僮飨到y(tǒng)崩潰而導(dǎo)致數(shù)據(jù)丟失。此外,RocketMQ新增的同步刷盤機(jī)制也進(jìn)一步保證了數(shù)據(jù)的可靠性。
- 性能:
- Kafka和RocketMQ在性能方面各有千秋。由于Kafka的數(shù)據(jù)以partition為單位,一個(gè)Kafka實(shí)例上可能有多達(dá)上百個(gè)partition,而一個(gè)RocketMQ實(shí)例上只有一個(gè)partition。這使得RocketMQ可以充分利用IO組的commit機(jī)制,批量傳輸數(shù)據(jù),從而在replication時(shí)具有更好的性能。然而,Kafka的異步replication性能理論上低于RocketMQ的replication,因?yàn)橥絩eplication與異步replication相比,性能上會(huì)有約20%-30%的損耗。
- 消息傳遞方式:
- Kafka和RocketMQ在消息傳遞方式上也有所不同。Kafka采用Producer發(fā)送消息后,broker馬上把消息投遞給consumer,這種方式實(shí)時(shí)性較高,但會(huì)增加broker的負(fù)載。而RocketMQ基于Pull模式和Push模式的長(zhǎng)輪詢機(jī)制,來平衡Push和Pull模式各自的優(yōu)缺點(diǎn)。RocketMQ的消息及時(shí)性較好,嚴(yán)格的消息順序得到了保證。
- 其他特性:
- Kafka在單機(jī)支持的隊(duì)列數(shù)超過64個(gè)隊(duì)列,而RocketMQ最高支持5萬個(gè)隊(duì)列。隊(duì)列越多,可以支持的業(yè)務(wù)就越多。
五、kafka使用場(chǎng)景
- 實(shí)時(shí)數(shù)據(jù)流處理:Kafka可以處理大量的實(shí)時(shí)數(shù)據(jù)流,這些數(shù)據(jù)流可以來自不同的源,如用戶行為、傳感器數(shù)據(jù)、日志文件等。通過Kafka,可以將這些數(shù)據(jù)流進(jìn)行實(shí)時(shí)的處理和分析,例如進(jìn)行實(shí)時(shí)數(shù)據(jù)分析和告警。
- 消息隊(duì)列:Kafka可以作為一個(gè)消息隊(duì)列使用,用于在分布式系統(tǒng)中傳遞消息。它能夠處理高吞吐量的消息,并保證消息的有序性和可靠性。
- 事件驅(qū)動(dòng)架構(gòu):Kafka可以作為事件驅(qū)動(dòng)架構(gòu)的核心組件,將事件數(shù)據(jù)發(fā)布到不同的消費(fèi)者,以便進(jìn)行實(shí)時(shí)處理。這種架構(gòu)可以簡(jiǎn)化應(yīng)用程序的設(shè)計(jì)和開發(fā),提高系統(tǒng)的可擴(kuò)展性和靈活性。
- 數(shù)據(jù)管道:Kafka可以用于數(shù)據(jù)管道,將數(shù)據(jù)從一個(gè)系統(tǒng)傳輸?shù)搅硪粋€(gè)系統(tǒng)。例如,可以將數(shù)據(jù)從數(shù)據(jù)庫或日志文件傳輸?shù)酱髷?shù)據(jù)平臺(tái)或數(shù)據(jù)倉庫。
- 業(yè)務(wù)事件通知:Kafka可以用于通知業(yè)務(wù)事件,例如訂單狀態(tài)變化、庫存更新等。通過訂閱Kafka主題,相關(guān)的應(yīng)用程序和服務(wù)可以實(shí)時(shí)地接收到這些事件通知,并進(jìn)行相應(yīng)的處理。
- 流數(shù)據(jù)處理框架集成:Kafka可以與流處理框架集成,如Apache Flink、Apache Spark等。通過集成,可以將流數(shù)據(jù)從Kafka中實(shí)時(shí)導(dǎo)入到流處理框架中進(jìn)行處理,實(shí)現(xiàn)流式計(jì)算和實(shí)時(shí)分析。
到此這篇關(guān)于spring boot 使用 Kafka的場(chǎng)景分析的文章就介紹到這了,更多相關(guān)spring boot 使用 Kafka內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
java super關(guān)鍵字知識(shí)點(diǎn)詳解
在本篇文章里小編給大家整理的是一篇關(guān)于java super關(guān)鍵字知識(shí)點(diǎn)詳解內(nèi)容,有興趣的朋友們可以參考下。2021-01-01
Spring將MultipartFile轉(zhuǎn)存到本地磁盤的三種方式
在Java中處理文件向來是一種不是很方便的操作,然后隨著Spring框架的崛起,使用Spring框架中的MultipartFile來處理文件也是件很方便的事了,今天就給大家介紹Spring將MultipartFile轉(zhuǎn)存到本地磁盤的方式,需要的朋友可以參考下2024-10-10
Java實(shí)現(xiàn)更新順序表中的指定元素的示例
本文主要介紹了Java實(shí)現(xiàn)更新順序表中的指定元素的示例,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-06-06
SpringBoot結(jié)合mockito測(cè)試實(shí)戰(zhàn)
與集成測(cè)試將系統(tǒng)作為一個(gè)整體測(cè)試不同,單元測(cè)試更應(yīng)該專注于某個(gè)類。所以當(dāng)被測(cè)試類與外部類有依賴的時(shí)候,尤其是與數(shù)據(jù)庫相關(guān)的這種費(fèi)時(shí)且有狀態(tài)的類,很難做單元測(cè)試。但好在可以通過“Mockito”這種仿真框架來模擬這些比較費(fèi)時(shí)的類,從而專注于測(cè)試某個(gè)類內(nèi)部的邏輯2022-11-11
詳解maven的setting配置文件中mirror和repository的區(qū)別
這篇文章主要介紹了詳解maven的setting配置文件中mirror和repository的區(qū)別,小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2018-12-12
SpringBoot將多個(gè)文件夾進(jìn)行壓縮的兩種方法(瀏覽器下載和另存為)
Spring Boot項(xiàng)目通常不會(huì)自動(dòng)對(duì)文件夾進(jìn)行壓縮,不過,在打包應(yīng)用時(shí),如果你使用了Maven或Gradle這樣的構(gòu)建工具,并且配置了相應(yīng)的插件,可以在打成jar或war包的時(shí)候?qū)⒁蕾嚨膸煳募喜⒉嚎s,本文介紹了SpringBoot將多個(gè)文件夾進(jìn)行壓縮的兩種方法2024-07-07
關(guān)于Arrays.sort()使用的注意事項(xiàng)
這篇文章主要介紹了關(guān)于Arrays.sort()使用的注意事項(xiàng),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2022-05-05
本地安裝MinIO分布式對(duì)象存儲(chǔ)服務(wù)器的詳細(xì)步驟
本地安裝MinIO非常簡(jiǎn)單,MinIO提供了獨(dú)立的二進(jìn)制文件,無需額外的依賴,本文介紹如何在本地安裝MinIO分布式對(duì)象存儲(chǔ)服務(wù)器,感興趣的朋友一起看看吧2024-01-01
Spring AI TikaDocumentReader詳解
TikaDocumentReader是SpringAI中用于從多種格式文檔中提取文本內(nèi)容的組件,支持PDF、DOC/DOCX、PPT/PPTX和HTML等格式,它在構(gòu)建知識(shí)庫、文檔處理和數(shù)據(jù)清洗等任務(wù)中非常有用2025-01-01

