Spring Boot集成Kafka的示例代碼
本文介紹了Spring Boot集成Kafka的示例代碼,分享給大家,也給自己留個(gè)筆記
系統(tǒng)環(huán)境
使用遠(yuǎn)程服務(wù)器上搭建的kafka服務(wù)
- Ubuntu 16.04 LTS
- kafka_2.12-0.11.0.0.tgz
- zookeeper-3.5.2-alpha.tar.gz
集成過程
1.創(chuàng)建spring boot工程,添加相關(guān)依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.laravelshao.springboot</groupId>
<artifactId>spring-boot-integration-kafka</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>spring-boot-integration-kafka</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--kafka-->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-json</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2.添加配置信息,這里使用yml文件
spring:
kafka:
bootstrap-servers:X.X.X.X:9092
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: test
auto-offset-reset: earliest
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: com.laravelshao.springboot.kafka
3.創(chuàng)建消息對象
public class Message {
private Integer id;
private String msg;
public Message() {
}
public Message(Integer id, String msg) {
this.id = id;
this.msg = msg;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", msg='" + msg + '\'' +
'}';
}
}
4.創(chuàng)建生產(chǎn)者
package com.laravelshao.springboot.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
/**
* Created by shaoqinghua on 2018/3/23.
*/
@Component
public class Producer {
private static Logger log = LoggerFactory.getLogger(Producer.class);
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String topic, Message message) {
kafkaTemplate.send(topic, message);
log.info("Producer->topic:{}, message:{}", topic, message);
}
}
5.創(chuàng)建消費(fèi)者,使用@ KafkaListener注解監(jiān)聽主題
package com.laravelshao.springboot.kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* Created by shaoqinghua on 2018/3/23.
*/
@Component
public class Consumer {
private static Logger log = LoggerFactory.getLogger(Consumer.class);
@KafkaListener(topics = "test_topic")
public void receive(ConsumerRecord<String, Message> consumerRecord) {
log.info("Consumer->topic:{}, value:{}", consumerRecord.topic(), consumerRecord.value());
}
}
6.發(fā)送消費(fèi)測試
package com.laravelshao.springboot;
import com.laravelshao.springboot.kafka.Message;
import com.laravelshao.springboot.kafka.Producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
@SpringBootApplication
public class IntegrationKafkaApplication {
public static void main(String[] args) throws InterruptedException {
ApplicationContext context = SpringApplication.run(IntegrationKafkaApplication.class, args);
Producer producer = context.getBean(Producer.class);
for (int i = 1; i < 10; i++) {
producer.send("test_topic", new Message(i, "test topic message " + i));
Thread.sleep(2000);
}
}
}
可以依次看到發(fā)送消息,消費(fèi)消息

異常問題
反序列化異常(自定義的消息對象不在kafka信任的包路徑下)?
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.719 Container exception
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition test_topic-0 at offset 9. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.laravelshao.springboot.kafka.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139)
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:191)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923)
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1100)
at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1200(Fetcher.java:949)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:570)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:531)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1103)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:667)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
解決方法:將當(dāng)前包添加到kafka信任的包路徑下
spring:
kafka:
consumer:
properties:
spring:
json:
trusted:
packages: com.laravelshao.springboot.kafka
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
詳解spring batch的使用和定時(shí)器Quart的使用
spring Batch是一個(gè)基于Spring的企業(yè)級批處理框架,它通過配合定時(shí)器Quartz來輕易實(shí)現(xiàn)大批量的數(shù)據(jù)讀取或插入,并且全程自動化,無需人員管理2017-08-08
基于javaWeb 項(xiàng)目SSM配置要點(diǎn)及可能遇到的問題和解決方法
下面小編就為大家?guī)硪黄趈avaWeb 項(xiàng)目SSM配置要點(diǎn)及可能遇到的問題和解決方法。小編覺得挺不錯的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-10-10
聊聊@RequestMapping和@GetMapping @PostMapping的區(qū)別
這篇文章主要介紹了@RequestMapping和@GetMapping及@PostMapping的區(qū)別,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2021-08-08
MyBatis-Plus如何通過注解使用TypeHandler
這篇文章主要介紹了MyBatis-Plus如何通過注解使用TypeHandler,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-01-01
SpringBoot整合Ureport2報(bào)表及常見使用方法
這篇文章主要介紹了SpringBoot整合Ureport2報(bào)表及常見使用方法,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2021-01-01
Java?遠(yuǎn)程調(diào)用失敗重試的操作方法
這篇文章主要介紹了Java?遠(yuǎn)程調(diào)用失敗重試的操作方法,今天給大家介紹了一下?Spring??的?@Retryable?注解使用,并通過幾個(gè) demo 來帶大家編寫了自己重試攔截器以及回滾方法,需要的朋友可以參考下2022-09-09

