被kafka-client和springkafka版本坑到自閉及解決
被kafka-client和springkafka版本坑
上周剛剛歡天喜地的在linux上部了kafka,這周打算用spring-boot框架寫個簡單demo跑一下,結(jié)果悲劇就此展開。
首先建立maven工程:pom中添加spring boot kafka依賴:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ? ?xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> ? ?<modelVersion>4.0.0</modelVersion> ? ?<parent> ? ? ? <groupId>org.springframework.boot</groupId> ? ? ? <artifactId>spring-boot-starter-parent</artifactId> ? ? ? <version>2.1.5.RELEASE</version> ? ? ? <relativePath/> <!-- lookup parent from repository --> ? ?</parent> ? ?<groupId>com.example</groupId> ? ?<artifactId>kafkaproducer</artifactId> ? ?<version>0.0.1-SNAPSHOT</version> ? ?<name>kafkaproducer</name> ? ?<description>Demo project for Spring Boot</description> ? ? ?<properties> ? ? ? <java.version>1.8</java.version> ? ?</properties> ? ? ?<dependencies> ? ? ? <dependency> ? ? ? ? ?<groupId>org.springframework.boot</groupId> ? ? ? ? ?<artifactId>spring-boot-starter-web</artifactId> ? ? ? </dependency> ? ? ? ? <dependency> ? ? ? ? ?<groupId>org.projectlombok</groupId> ? ? ? ? ?<artifactId>lombok</artifactId> ? ? ? ? ?<optional>true</optional> ? ? ? </dependency> ? ? ? <dependency> ? ? ? ? ?<groupId>org.springframework.boot</groupId> ? ? ? ? ?<artifactId>spring-boot-starter-test</artifactId> ? ? ? ? ?<scope>test</scope> ? ? ? </dependency> ? ? ? <dependency> ? ? ? ? ?<groupId>org.springframework.kafka</groupId> ? ? ? ? ?<artifactId>spring-kafka</artifactId> ? ? ? </dependency> ? ?</dependencies> ? ? ?<build> ? ? ? <plugins> ? ? ? ? ?<plugin> ? ? ? ? ? ? <groupId>org.springframework.boot</groupId> ? ? ? ? ? ? <artifactId>spring-boot-maven-plugin</artifactId> ? ? ? ? ?</plugin> ? ? ? </plugins> ? ?</build> ? ? </project>
配置文件如下:
server.port=8089 spring.kafka.bootstrap-servers=ip:port spring.kafka.producer.retries= 0 spring.kafka.producer.batch-size=16384 spring.kafka.producer.buffer-memory=33554432 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.linger.ms=1
然后新建一個Producer類
package com.example.kafkaproducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
?
@Component
public class KafkaProducer {
? ? @Autowired
? ? KafkaTemplate kafkaTemplate;
? ? public void produce(){
? ? ? ? kafkaTemplate.send("test","hello word");
? ? ? ? System.out.println("發(fā)送消息");
? ? }
}在test類中調(diào)用
package com.example.kafkaproducer;??
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;?
?
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaproducerApplicationTests {?
? ?@Autowired KafkaProducer kafkaProducer;?
? ?@Test
? ?public void contextLoads() {
? ? ? kafkaProducer.produce();
? ?}?
}然后控制臺就會打印一個莫名奇妙的錯誤,沒有打印任何堆棧信息,大概意思只是表達(dá)了連接不上。
Exception thrown when sending a message with key='null' and payload='' to topic
telnet ip+port 是可以通的
隨后發(fā)現(xiàn),xshell上啟動的kafka-server在報這樣一個錯,更詳細(xì)的沒有留存。
ERROR Closing socket for /127.0.0.1 because of error (kafka.network.Processor)
kafka.common.KafkaException: Wrong request type 18
百度了一下,很可能是Linux上的kafka版本和pom中引入的spring-kafka依賴不匹配造成的,于是查看對應(yīng)關(guān)系。
查看kafka,發(fā)現(xiàn)裝的是一個0.8.2.1 版本的kafka,該版本的kafka是2015年3月發(fā)布的版本,可以說是十分古老,真是不知道為什么當(dāng)初要選這么老的版本。
換了幾次spring-kafka的pom之后,依然在報這個問題,于是我選擇換更新的kafka的包。
換了2.2.0版本kafka的包,問題得到解決。
其中consumer的創(chuàng)建命令和老版本的不太一樣,且consumer和producer需使用相同的端口號,而不是像之前producer配置為broker的端口,consumer配置為zookeeper的端口號。
./bin/kafka-console-consumer.sh --bootstrap-server ip:9092 ?--topic test
且config文件夾下server.properties文件中的一些配置和之前不太一樣,需要注意的是,以下兩行配置原來是被注解了的,需要在這里取消掉注解,并配置自己的ip。
listeners = PLAINTEXT://your.host.name:9092 advertised.listeners=PLAINTEXT://your.host.name:9092
springboot、spring-kafka、kafka-client三者兼容性關(guān)系
spring官方描述的spring-kafka的版本和kafka-clients的版本對應(yīng)關(guān)系:
官方地址:https://spring.io/projects/spring-kafka

中間列:“Spring Integration for Apache Kafka Version 可忽略不看:
也就是說spring-kafka與spring-client是存在在一對多關(guān)系的,那是不是他所有的spring-client都可以選呢?
接著往下看(摘自官網(wǎng)):

他說啥 ?
- springboot 1.5 你應(yīng)該用的是spring-kafka 1.3.x.
- springboot2.0你應(yīng)該使用的是spring-kafka2.0.x.
- 如果用的是spring boot2.1.x,那么你必須使用spring-kafka的版本是2.2.x。否則就會出現(xiàn)noClass等等各種異常。
- spring-kafka的版本是2.1默認(rèn)使用的spring-client是1.1.x,當(dāng)你要使用另外兩個時,你就要使用如下的版本配置.
- 如果你用的是2.2.x的spring-kafka,只看第一張圖,你會以為2.1.x的kafka-clients也可以用。但是spring說了,此時默認(rèn)用的kafka-clients是2.0.x,如果你想用2.1.x,必須看文檔附錄,下圖的大概意思,必須換掉下圖所示的所有依賴版本。

也就是說并不是一對多 他默認(rèn)的還是只有一個kafka-client來給你的,你要選其他的可以的,你添加一些額外配置
例如:
Spring Boot 2.1.0.RELEASE 版本,因此 spring-kafka 的版本為 2.2.0.RELEASE,kafka-clients 的默認(rèn)版本為2.0.0,所以 kafka 的版本選用為 kafka_2.11-2.1.0 (前面的2.11代表的是Scala的版本后面為kafka的版本號)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot+Vue+axios實現(xiàn)文章收藏功能
這篇文章主要為大家詳細(xì)介紹了Springboot+Vue+axios實現(xiàn)文章收藏功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價值,感興趣的小伙伴們可以參考一下2022-08-08
詳解springboot設(shè)置cors跨域請求的兩種方式
這篇文章主要介紹了詳解springboot設(shè)置cors跨域請求的兩種方式,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-11-11

