一文詳解kafka序列化器和攔截器
介紹
本篇主要介紹kafka的攔截器和序列化器,序列化器是和數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸有關(guān),數(shù)據(jù)在網(wǎng)絡(luò)中的傳輸為字節(jié)流,所以生產(chǎn)者在發(fā)送時(shí)需要將其序列化為字節(jié)流,消費(fèi)者收到消息時(shí),需要將字節(jié)流反序列化為我們能夠識(shí)別的對(duì)象,我們不難看出,這就是RPC通信,kafka中實(shí)現(xiàn)了很多自定義協(xié)議,我們知道,在RPC通信中,只有生產(chǎn)者和消費(fèi)者的協(xié)議一樣,才能相互傳輸和解析數(shù)據(jù),在使用HTTP時(shí),我們就不用去關(guān)注協(xié)議本身,因?yàn)镠TTP是TCP的上層建筑,它自己實(shí)現(xiàn)了一套協(xié)議,我們不用去關(guān)注,但是使用RPC,我們是面向TCP編程,所以自然得約定和實(shí)現(xiàn)自己的協(xié)議,而序列化就是這過(guò)程中很重要的一部分。
攔截器是一個(gè)隨處可見(jiàn)的詞,基本上很多框架中都有攔截器機(jī)制,它的作用主要是對(duì)請(qǐng)求進(jìn)行攔截,我們可以對(duì)請(qǐng)求進(jìn)行過(guò)濾和處理,以達(dá)到業(yè)務(wù)目的,比如Spring中有HandlerInterceptor攔截器,在kafka種也有攔截器,我們可以自定義攔截器,對(duì)消息進(jìn)行攔截,比如某些異常消息我們不需要發(fā)送,那么就將其攔截下來(lái)。
序列化器
數(shù)據(jù)在網(wǎng)絡(luò)中傳輸是以字節(jié)流的形式進(jìn)行傳輸,在生產(chǎn)者端發(fā)送消息需要先進(jìn)行序列化,消費(fèi)者端進(jìn)行反序列化,序列化的方式有很多,比如jdk,json,protobuf,kryo,hessian,avro等等,在大數(shù)據(jù)量的傳輸中,序列化和反序列化的效率對(duì)吞吐量有一定的影響,kafka提供了許多序列化和反序列化器,如StringDeserializer和StringSerializer,如果我們需要自定義一個(gè)序列化和反序列化器,那么實(shí)現(xiàn)Serializer,Deserializer接口即可。
如下,kafka生產(chǎn)者在發(fā)送消息到broker之前需要序列化,消費(fèi)者從broker獲取消息后需要反序列化。

設(shè)置序列化和反序列化
生產(chǎn)者端設(shè)置序列化
//序列化
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
消費(fèi)者端設(shè)置反序列化
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
自定義序列化
/**
* 功能說(shuō)明: JSON序列化
* <p>
* Original @Author: steakliu , 2022-11-02 15:14
*/
public class JsonSerializer<T> implements Serializer<T> {
@Override
public byte[] serialize(String topic, T obj) {
try {
return obj == null ? null : JSON.toJSONBytes(obj);
}catch (Exception e){
throw new SerializationException("json serializing exception");
}
}
}
自定義反序列化
/**
* 功能說(shuō)明:JSON反序列化
* <p>
* Original @Author: steakliu-劉牌, 2022-11-11 09:38
*/
public class JsonDeserializer<T> implements Deserializer<T> {
@Override
public T deserialize(String topic, byte[] data) {
return (T) JSON.parse(data);
}
}
如上簡(jiǎn)單的使用fastjson作為序列化和反序列化工具,演示了自定義kafka的序列化和反序列化機(jī)制,我們可以根據(jù)實(shí)際情況來(lái)設(shè)計(jì)不同的序列化反序列化機(jī)制,當(dāng)然,不會(huì)是像上面這些簡(jiǎn)單,如果使用spring,那么spring提供了JSON序列化和反序列化器直接使用。
思考
雖然我們可以自定義序列化和反序列化器,但是自定義序列化和反序列化器在使用上也要保持一些一致,也就是說(shuō)生產(chǎn)者和消費(fèi)者要保持使用一種類型的序列化機(jī)制,不然會(huì)出現(xiàn)消息轉(zhuǎn)換問(wèn)題,如果我們以kafka的方式向別人提供服務(wù),那么他們就需要使用我們的制定的序列化方式,所以這可能就存在一定的耦合,如果使用Kafka的String序列化和反序列化機(jī)制,因?yàn)槭撬悄J(rèn)方式并且是字符串,通用性比較好,所以就不用去考慮序列化和反序列化,直接拿到字符串轉(zhuǎn)為對(duì)象,再進(jìn)行業(yè)務(wù)處理,使用自定義序列化的話,就直接拿到序列化后的對(duì)象,不用進(jìn)行字符串轉(zhuǎn)對(duì)象操作。
在實(shí)際場(chǎng)景中,我們可以根據(jù)自己的業(yè)務(wù)來(lái)使用何種序列化方式,沒(méi)有最好的,只有合適的。
攔截器
kafka中消費(fèi)者和生產(chǎn)者都有攔截器,分別為ConsumerInterceptor和ProducerInterceptor,只需實(shí)現(xiàn)它們即可實(shí)現(xiàn)攔截,加入攔截器后,生產(chǎn)者會(huì)在發(fā)送消息之前對(duì)消息進(jìn)行攔截處理,消費(fèi)者在收到消息之前也會(huì)經(jīng)過(guò)攔截器,那么我們就可以在攔截器中加入一些自己需要的邏輯。
如下消費(fèi)者攔截器對(duì)消息進(jìn)行攔截,如果有異常消息,則對(duì)異常消息進(jìn)行處理,只要需要對(duì)消息進(jìn)行處理,監(jiān)控等,都可以使用攔截器。
/**
* 功能說(shuō)明: 消費(fèi)者攔截器
* <p>
* Original @Author: steakliu-劉牌, 2023-03-15 10:17
*/
public class MyConsumerInterceptor implements ConsumerInterceptor<String, Message> {
@Override
public ConsumerRecords<String, Message> onConsume(ConsumerRecords<String, Message> records) {
long currentTimeMillis = System.currentTimeMillis();
records.forEach(record -> {
if ("消息異常".equals(record.value().getMessageText())) {
//處理異常消息
this.handleMsg(record);
}
});
return records;
}
private void handleMsg(ConsumerRecord<String, Message> record) {
//處理異常消息
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}
@Override
public void close() {}
@Override
public void configure(Map<String, ?> configs) { }
}
攔截器可以有多個(gè),如果設(shè)置多個(gè)攔截器,那么就形成一個(gè)攔截器鏈,一個(gè)一個(gè)地執(zhí)行。
下面是使用spring-kafka時(shí)所配置的攔截器和序列化器的基本配置。
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
consumer:
# 反序列化器
key-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
# 攔截器
interceptor:
classes: com.steakliu.kafka.interceptor.MyConsumerInterceptor
spring:
json:
trusted:
packages: '*'
producer:
key-serializer: org.springframework.kafka.support.serializer.JsonSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
properties:
# 攔截器
interceptor:
classes: com.steakliu.kafka.interceptor.MyProducerInterceptor,com.steakliu.kafka.interceptor.MyProducerInterceptor2
總結(jié)
對(duì)于攔截器和序列化器,我們上面作了簡(jiǎn)單地描述和示例,對(duì)于它們,可能我們都不怎么去去用甚至沒(méi)有用過(guò),但是還是很有必要去了解的,了解它的設(shè)計(jì)和思想,在一些特殊的場(chǎng)景可能會(huì)用到。
以上就是一文詳解kafka序列化器和攔截器的詳細(xì)內(nèi)容,更多關(guān)于kafka序列化器攔截器的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot配置主從數(shù)據(jù)庫(kù)實(shí)現(xiàn)讀寫(xiě)分離
現(xiàn)在的 Web 應(yīng)用大都是讀多寫(xiě)少,本文主要介紹了SpringBoot配置主從數(shù)據(jù)庫(kù)實(shí)現(xiàn)讀寫(xiě)分離,具有一定的參考價(jià)值,感興趣的可以了解一下2023-11-11
在MyBatis中使用 # 和 $ 書(shū)寫(xiě)占位符的區(qū)別說(shuō)明
這篇文章主要介紹了在MyBatis中使用 # 和 $ 書(shū)寫(xiě)占位符的區(qū)別說(shuō)明,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過(guò)來(lái)看看吧2020-10-10
java中ThreadPoolExecutor常識(shí)匯總
這篇文章主要介紹了java中ThreadPoolExecutor常識(shí)匯總,線程池技術(shù)在并發(fā)時(shí)經(jīng)常會(huì)使用到,java中的線程池的使用是通過(guò)調(diào)用ThreadPoolExecutor來(lái)實(shí)現(xiàn)的,需要的朋友可以參考下2019-06-06
Springmvc數(shù)據(jù)格式化原理及代碼案例
這篇文章主要介紹了Springmvc數(shù)據(jù)格式化原理及代碼案例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-10-10
SpringBoot配置SwaggerUI訪問(wèn)404錯(cuò)誤的解決方法
這篇文章主要為大家詳細(xì)介紹了SpringBoot配置SwaggerUI訪問(wèn)404錯(cuò)誤的解決方法,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2017-12-12
解決MyEclipse下啟動(dòng)項(xiàng)目時(shí)JBoss內(nèi)存溢出的問(wèn)題
下面小編就為大家?guī)?lái)一篇解決MyEclipse下啟動(dòng)項(xiàng)目時(shí)JBoss內(nèi)存溢出的問(wèn)題。小編覺(jué)得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2017-07-07
Spring?IOC中對(duì)象的創(chuàng)建、策略及銷(xiāo)毀時(shí)機(jī)和生命周期詳解
這篇文章主要介紹了Spring?IOC中對(duì)象的創(chuàng)建、策略及銷(xiāo)毀時(shí)機(jī)和生命周期詳解,Spring默認(rèn)使用類的空參構(gòu)造方法創(chuàng)建bean,假如類沒(méi)有空參構(gòu)造方法,將無(wú)法完成bean的創(chuàng)建,需要的朋友可以參考下2023-08-08
SpringBoot使用swagger生成api接口文檔的方法詳解
在之前的文章中,使用mybatis-plus生成了對(duì)應(yīng)的包,在此基礎(chǔ)上,我們針對(duì)項(xiàng)目的api接口,添加swagger配置和注解,生成swagger接口文檔,需要的可以了解一下2022-10-10

