SpringBoot集成Kafka2.4.0的全過程
1、前提條件
前提是需要安裝Kafka及能正常啟動(dòng),正常啟動(dòng)后記錄下Kafka的ip及端口號,例如我的:127.0.0.1:9092。
由于用的Kafka還是需要zookeeper的,因此之前也要安裝啟動(dòng)好zookeeper,zookeeper的安裝啟動(dòng)見這篇文章:
2、啟動(dòng)Kafka
2.1 windows啟動(dòng)
1、到Kafka安裝目錄下的bin目錄下的windows下:

2、執(zhí)行命令:kafka-server-start.bat -daemon D:\java\kafka_2.11-2.4.0\config\server.properties
2.2 linux啟動(dòng)
3、引入pom
這里由于我系統(tǒng)安裝的是2.4.0版本的Kafka,因此pom引入最好用相同版本的,避免出現(xiàn)其他問題。還有一點(diǎn)要注意的是,如果最新的spring boot引入這個(gè)版本zookeeper可能有問題,因此需要注意。我用的是2.3.7.RELEASE版本的spring boot。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>4、編寫配置文件
這里一些相應(yīng)的配置后續(xù)可以專門更新一篇文章,目前的目的是先跑起來體驗(yàn)一下。搭建起來。
spring:
application:
name: xuydkafka
kafka:
bootstrap-servers: 127.0.0.1:9092 #kafka地址
producer: # 生產(chǎn)者
retries: 3 # 設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送
batch-size: 16384
buffer-memory: 33554432
acks: 1
# 指定消息key和消息體的編解碼方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: default-group
enable-auto-commit: false
auto-offset-reset: earliest # 從消息頭開始poll()數(shù)據(jù)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 500 # 一次poll最大的消息數(shù)量
listener:
# 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
# RECORD
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后提交
# BATCH
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,距離上次提交的時(shí)間大于time時(shí)提交
# TIME
# 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后,被處理的record數(shù)量大于等于count時(shí)提交
# COUNT
# TIME | COUNT 有?個(gè)條件滿?時(shí)提交
# COUNT_TIME
# 當(dāng)每?批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽器(ListenerConsumer)處理之后, ?動(dòng)調(diào)#?Acknowledgment.acknowledge()后提交
# MANUAL
# ?動(dòng)調(diào)?Acknowledgment.acknowledge()后?即提交,?般使?這種
# MANUAL_IMMEDIATE
ack-mode: MANUAL_IMMEDIATE
redis:
host: 127.0.0.1
server:
port: 90005、編寫消息生產(chǎn)者代碼
package com.xuyd.kafka.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/msg")
public class KafkaController {
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@RequestMapping("/send")
public String sendMessage(){
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message");
return "send success";
}
}6、編寫消息消費(fèi)者相關(guān)代碼
需要注意的是,一般生產(chǎn)者和消費(fèi)者不會(huì)在同一個(gè)項(xiàng)目。這里只是體驗(yàn),所以臨時(shí)這么處理。
package com.xuyd.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
@KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")
public void listenGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
// 由于配置的提交方式是MANUAL_IMMEDIATE 因此這里需要手動(dòng)提交代碼
ack.acknowledge();
}
}7、啟動(dòng)并發(fā)送消息
這里啟動(dòng)時(shí)候可能會(huì)遇到各種問題,99%是因?yàn)閟pringcloud或者springboot與kafka的版本不匹配造成的,大家先臨時(shí)委曲求全一下,用我的版本跑起來再去完善吧~~
消息生產(chǎn)者發(fā)生消息。

消費(fèi)者收到消息打?。?/p>

到此這篇關(guān)于SpringBoot集成Kafka2.4.0的全過程的文章就介紹到這了,更多相關(guān)SpringBoot集成Kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
- SpringBoot集成Kafka并使用多個(gè)死信隊(duì)列詳解
- Java中Springboot集成Kafka實(shí)現(xiàn)消息發(fā)送和接收功能
- SpringBoot集成Kafka的實(shí)現(xiàn)示例
- 如何使用SpringBoot集成Kafka實(shí)現(xiàn)用戶數(shù)據(jù)變更后發(fā)送消息
- SpringBoot如何集成Kafka低版本和高版本
- SpringBoot3集成Kafka的方法詳解
- SpringBoot集成Kafka 配置工具類的詳細(xì)代碼
- Springboot集成Kafka進(jìn)行批量消費(fèi)及踩坑點(diǎn)
- SpringBoot集成kafka全面實(shí)戰(zhàn)記錄
- SpringBoot集成Kafka的步驟
- Springboot集成Kafka實(shí)現(xiàn)producer和consumer的示例代碼
相關(guān)文章
MySQL啟動(dòng)方式之systemctl與mysqld的對比詳解
MySQL 是當(dāng)今最流行的開源關(guān)系型數(shù)據(jù)庫之一,其性能、可靠性和易用性讓它廣泛應(yīng)用于各種場景,如何正確啟動(dòng) MySQL 服務(wù)可能并不是一件簡單的事情,本文將聚焦兩種常用的 MySQL 啟動(dòng)方式:通過 systemctl 啟動(dòng)和直接使用 mysqld 啟動(dòng),需要的朋友可以參考下2024-11-11
用SQL語句解決mysql導(dǎo)入大數(shù)據(jù)文件的問題
今天的這篇文章用來討論如何解決導(dǎo)入mysql大數(shù)據(jù)文件的問題,其實(shí)說的簡單了就是一條SQL語句,而如果你是一名SQL高手,那完全可以略過此文。2010-08-08
一文帶你學(xué)會(huì)MySQL的select語句
在MySQL中可以使用SELECT語句來查詢數(shù)據(jù),查詢數(shù)據(jù)是指從數(shù)據(jù)庫中根據(jù)需求,使用不同的查詢方式來獲取不同的數(shù)據(jù),是使用頻率最高、最重要的操作,下面這篇文章主要給大家介紹了關(guān)于MySQL中select語句的相關(guān)資料,需要的朋友可以參考下2022-11-11
MySQL實(shí)戰(zhàn)之Insert語句的使用心得
這篇文章主要給大家介紹了關(guān)于MySQL實(shí)戰(zhàn)之Insert語句的使用心得的相關(guān)資料,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-10-10
mysql 讓一個(gè)存儲(chǔ)過程定時(shí)作業(yè)的代碼
以下例子主要是實(shí)現(xiàn)簡單的mysq 定時(shí)作業(yè),需要的朋友可以參考下。2011-05-05
Mysql BinLog存儲(chǔ)機(jī)制與數(shù)據(jù)恢復(fù)方式
這篇文章主要介紹了Mysql BinLog存儲(chǔ)機(jī)制與數(shù)據(jù)恢復(fù)方式,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-06-06

