Springboot 2.x集成kafka 2.2.0的示例代碼
引言
kafka近幾年更新非常快,也可以看出kafka在企業(yè)中是用的頻率越來越高,在springboot中集成kafka還是比較簡單的,但是應(yīng)該注意使用的版本和kafka中基本配置,這個地方需要信心,防止進入坑中。
版本對應(yīng)地址:https://spring.io/projects/spring-kafka
基本環(huán)境
springboot版本2.1.4
kafka版本2.2.0
jdk 1.8
代碼編寫
1、基本引用pom
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>kafkademo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.0.RELEASE</version> </dependency> <dependency> <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> <version>2.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
2、基本配置
spring.kafka.bootstrap-servers=2.1.1.1:9092 spring.kafka.consumer.group-id=test-consumer-group spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #logging.level.root=debug
3、實體類
package com.example.demo.model;
import java.util.Date;
public class Messages {
private Long id;
private String msg;
private Date sendTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public Date getSendTime() {
return sendTime;
}
public void setSendTime(Date sendTime) {
this.sendTime = sendTime;
}
}4、生產(chǎn)者端
package com.example.demo.service;
import com.example.demo.model.Messages;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import java.util.Date;
import java.util.UUID;
@Service
public class KafkaSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private Gson gson = new GsonBuilder().create();
public void send() {
Messages message = new Messages();
message.setId(System.currentTimeMillis());
message.setMsg("123");
message.setSendTime(new Date());
ListenableFuture<SendResult<String, String>> test0 = kafkaTemplate.send("newtopic", gson.toJson(message));
}
}5、消費者
package com.example.demo.service;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.util.Optional;
@Service
public class KafkaReceiver {
@KafkaListener(topics = {"newtopic"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
System.out.println("record =" + record);
System.out.println("message =" + message);
}
}
}6、測試
在啟動方法中模擬消息生產(chǎn)者,向kafka中發(fā)送消息
package com.example.demo;
import com.example.demo.service.KafkaSender;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
@SpringBootApplication
public class KafkademoApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(KafkademoApplication.class, args);
KafkaSender sender = context.getBean(KafkaSender.class);
for (int i = 0; i <1000; i++) {
sender.send();
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}效果展示

命令行直接消費消息

遇到的問題
生產(chǎn)端連接kafka超時
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:119)
解決方案:
修改kafka中的server.properties中的下面配置,將原來的默認配置替換成下面ip+端口的形式,重啟kafka

到此這篇關(guān)于Springboot 2.x集成kafka 2.2.0的示例代碼的文章就介紹到這了,更多相關(guān)Springboot集成kafka內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java超詳細教你寫一個網(wǎng)絡(luò)購書系統(tǒng)案例
這篇文章主要介紹了怎么用Java來寫一個購書系統(tǒng),購買書籍主要需要每本書的編號、書名、單價、庫存屬性,能夠讓客戶通過編號來選書,感興趣的朋友跟隨文章往下看看吧2022-03-03
java中元素排序Comparable和Comparator的區(qū)別
本文主要介紹了java中元素排序Comparable和Comparator的區(qū)別,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12
MyBatis-Plus攔截器對敏感數(shù)據(jù)實現(xiàn)加密
做課程項目petstore時遇到需要加密屬性的問題,而MyBatis-Plus為開發(fā)者提供了攔截器的相關(guān)接口,本文主要介紹通過MyBatis-Plus的攔截器接口自定義一個攔截器類實現(xiàn)敏感數(shù)據(jù)如用戶密碼的加密功能,感興趣的可以了解一下2021-11-11

