java分布式流處理組件Producer入門(mén)詳解
前言
前面兩章我們花費(fèi)了很長(zhǎng)的時(shí)間將Kafka的整體架構(gòu),包括其中涉及到的角色、每個(gè)角色所對(duì)對(duì)應(yīng)的用途進(jìn)行了整體的一個(gè)串聯(lián)。然后我們也通過(guò)Kafka所提供的腳本進(jìn)行了相對(duì)應(yīng)的操作,并且對(duì)核心參數(shù)進(jìn)行了分析。
相信大家對(duì)于Kafka的處理和消費(fèi)流程已經(jīng)有了一個(gè)比較籠統(tǒng)的概念。光是如此還是不夠的,那么接下來(lái)我們就開(kāi)始對(duì)其中的每一個(gè)角色做一個(gè)詳細(xì)的分析。
先從生產(chǎn)者開(kāi)始,我們需要對(duì)其中有如下了解:
- 了解外部數(shù)據(jù)是如何通過(guò)生產(chǎn)者,經(jīng)過(guò)層層編碼,然后進(jìn)入到了集群內(nèi)部進(jìn)行存儲(chǔ)。
- 同步和異步數(shù)據(jù)是如何操作,Broker如何處理應(yīng)答。
- 消息發(fā)送失敗后的重試機(jī)制
- ...
等等的一切,慢慢往下看吧~~~
基于Java的API
首先, 在了解生產(chǎn)者發(fā)送消息的原理之前,我們應(yīng)該先學(xué)會(huì)如何去發(fā)送消息。
Kafka為我們提供了很多項(xiàng)目可以操作的API客戶(hù)端,包括:
- C/C++
- GO
- Python
- ...
更多需要對(duì)接Kafka的項(xiàng)目可以點(diǎn)擊這里進(jìn)行查看
我本人屬于Java開(kāi)發(fā),所以我這里就通過(guò)Java項(xiàng)目來(lái)做一個(gè)QuickStart項(xiàng)目
通過(guò)官網(wǎng)查看API菜單,官方文檔上也是Java的版本。我們根據(jù)提示一步步操作即可~
先新建maven項(xiàng)目,并且引入對(duì)應(yīng)的****kafka-clients依賴(lài)
建議:Kafka-clients依賴(lài)版本,最好和安裝的kafka版本一致
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.3.1</version>
</dependency>
同步發(fā)送
Kafka生產(chǎn)者主要靠KafkaProducer來(lái)進(jìn)行操作。點(diǎn)擊到對(duì)應(yīng)的文檔頁(yè)面,我們可以看到關(guān)于KafkaProducer<K,V> 的詳細(xì)信息。
一個(gè)好的組件是非常貼心的, 甚至我們都不用去網(wǎng)上搜任何相關(guān)的資料,只需要通過(guò)查看對(duì)應(yīng)的注釋就可以知道這個(gè)東西該怎么用。
Properties config = new Properties();
// --bootstrap-server
config.setProperty(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"master:9092,node01:9092,node02:9092"
);
// key 序列化器
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化器
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try(Producer<String, String> producer = new KafkaProducer<>(config)) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"newTopic001",
"key01",
"data from " + KafkaQuickProducer.class.getName()
);
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(
MessageFormat.format("{0}\t{1}\t{2}\t{3}",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
recordMetadata.timestamp()
)
);
} catch (Exception e) {
e.printStackTrace();
}
以上代碼就是同步發(fā)送的過(guò)程,這已經(jīng)是在開(kāi)發(fā)過(guò)程中需要配置的最小單元,而其他關(guān)于生產(chǎn)者的配置,我們可以通過(guò)ProducerConfig來(lái)進(jìn)行查看
** 與命令行上的參數(shù),基本上是一模一樣的**
而關(guān)于序列化器的問(wèn)題,我們?cè)谙旅嬖淼牟糠终f(shuō)明
異步發(fā)送
我們?cè)谡{(diào)用同步send的時(shí)候,發(fā)現(xiàn)有兩個(gè)參數(shù)的方法, 而這個(gè)方法實(shí)現(xiàn)的就是****異步發(fā)送
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);
異步發(fā)送會(huì)將發(fā)送結(jié)果以事件驅(qū)動(dòng)的形式傳遞,那么這里,我們就需要注意一點(diǎn):
- 程序調(diào)用完成之后,不能讓他立即執(zhí)行,否則我們無(wú)法查看到具體的發(fā)送結(jié)果
接下來(lái)我們看具體的程序?qū)崿F(xiàn)。理論上:我們只需要改最后發(fā)送的部分
Properties config = new Properties();
// --bootstrap-server
config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,node01:9092,node02:9092");
// key 序列化器
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// value 序列化器
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
try(Producer<String, String> producer = new KafkaProducer<>(config)) {
ProducerRecord<String, String> record = new ProducerRecord<>(
"newTopic001",
"key01",
"data from " + KafkaQuickProducer.class.getName()
);
async(producer, record);
} catch (Exception e) {
e.printStackTrace();
}
// 異步發(fā)送
private static void async(Producer<String, String> producer, ProducerRecord<String, String> record) {
producer.send(record, (recordMetadata, exception) -> {
if (null != exception) {
exception.printStackTrace();
return;
}
System.out.println(
MessageFormat.format("{0}\t{1}\t{2}\t{3}",
recordMetadata.topic(),
recordMetadata.partition(),
recordMetadata.offset(),
recordMetadata.timestamp()
)
);
});
try {
// 將程序進(jìn)行阻塞,防止由于消息發(fā)送成功之后進(jìn)程停止而無(wú)法接收到事件反饋
System.in.read();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
這屬于整個(gè)生產(chǎn)者發(fā)送消息方式的最小單元,本文屬于Producer入門(mén)階段。
在ProducerConfig中還包含了非常多的配置項(xiàng),更多的配置信息我們會(huì)在優(yōu)化章節(jié)中說(shuō)明。
原理

在第一部分,我們已經(jīng)了解到,關(guān)于生產(chǎn)者最基本的使用方式,到這里,其實(shí)我想跟大家聊一聊:
- 生產(chǎn)者在發(fā)送消息的時(shí)候中間到底經(jīng)歷了什么?
大家應(yīng)該已經(jīng)看到上面的那張?jiān)韴D,我們可以從中找出答案!
主線(xiàn)程
**這里我們分為兩個(gè)線(xiàn)程塊來(lái)說(shuō)明, 第一部分是Main主線(xiàn)程, 也就是生產(chǎn)者在調(diào)用****send()**方法時(shí)所在的線(xiàn)程
在這里,我們可以看到:
- 外部數(shù)據(jù)首先被封裝為ProducerRecord**,然后調(diào)用**send()**方法。
- 在send()過(guò)程中,經(jīng)過(guò)攔截器、序列化器、分區(qū)器等處理之后進(jìn)入到RecordAccumulator中。
接下來(lái)我們仔細(xì)聊一聊攔截器、序列化器、分區(qū)器的作用
攔截器
攔截器很類(lèi)似于我們?cè)赟pringMVC中Interceptor的功能,而且在Producer中我們是可以自定義攔截器的。
我們可以在發(fā)送之前對(duì)數(shù)據(jù)進(jìn)行攔截處理,比如說(shuō):統(tǒng)計(jì)生產(chǎn)者發(fā)送數(shù)據(jù)的總量等等。
當(dāng)然目前來(lái)講,我們?nèi)绻婚_(kāi)發(fā)Kafka監(jiān)控平臺(tái)的話(huà),這里攔截器的用處并不大。我們忽略不計(jì)即可
后續(xù)如果有機(jī)會(huì)的話(huà),我們可以專(zhuān)門(mén)寫(xiě)篇文章,用來(lái)介紹如何開(kāi)發(fā)一個(gè)攔截器
序列化器
而序列化器,主要對(duì)兩個(gè)部分的數(shù)據(jù)進(jìn)行處理:
- Key
- Value
byte[] serializedKey = serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key()); byte[] serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
從本質(zhì)上來(lái)講,外部數(shù)據(jù)屬于屬于對(duì)象,而對(duì)象不能直接通過(guò)網(wǎng)絡(luò)進(jìn)行傳輸。 所以我們就需要一個(gè)序列化器,將它轉(zhuǎn)換成字節(jié)數(shù)組,進(jìn)而進(jìn)行傳輸

Kafka本身為我們提供了很多可用的序列化器,不過(guò)我們能用到最多的還是StringSerializer。
在生產(chǎn)端將消息進(jìn)行序列話(huà),那么在消費(fèi)端必然會(huì)進(jìn)行反序列化操作
分區(qū)器
我們知道Kafka是以Topic為消息發(fā)送的主體,不過(guò)由于Topic是一個(gè)虛擬的概念, 所以我們沒(méi)有辦法在實(shí)際中查看到關(guān)于Topic的相關(guān)信息。 但是前面我們也說(shuō)過(guò), 當(dāng)前Topic下的消息數(shù)據(jù)都是通過(guò)Partition進(jìn)行存儲(chǔ)的。
發(fā)送出去的消息需要存儲(chǔ)在哪個(gè)分區(qū)中就是通過(guò)分區(qū)器來(lái)進(jìn)行指定的,在我們沒(méi)有指定分區(qū)策略的情況下,生產(chǎn)者會(huì)通過(guò)默認(rèn)的分區(qū)策略指定當(dāng)前消息應(yīng)該存儲(chǔ)在哪個(gè)分區(qū)下

分區(qū)的內(nèi)容還是比較多的,我們會(huì)在下一節(jié)做詳細(xì)的說(shuō)明
RecordAccumulator
此時(shí),在主線(xiàn)程的區(qū)域中,當(dāng)消息進(jìn)入到默認(rèn)大小為32m的記錄緩沖區(qū)時(shí), 本區(qū)的工作就到此結(jié)束。
緩沖區(qū)中有多個(gè)雙端隊(duì)列,分別對(duì)應(yīng)Topic不同的分區(qū)。每一個(gè)分區(qū)就會(huì)創(chuàng)建一個(gè)雙端隊(duì)列。
此時(shí)的消息將會(huì)被按照批次的方式存放在隊(duì)列中, 默認(rèn)一批為16k大小。當(dāng)緩沖區(qū)達(dá)到指定條件之后,****sender線(xiàn)程將會(huì)被喚醒,Sender程序?qū)?huì)沖隊(duì)列中不斷拉出消息進(jìn)行下一步的發(fā)送
Sender線(xiàn)程
影響Sender線(xiàn)程喚醒的條件
想要喚醒Sender線(xiàn)程有兩個(gè)因素,但不是說(shuō)這兩個(gè)條件都必須滿(mǎn)足,他們是或的關(guān)系。
batch.size是一個(gè)條件,這也是后期針對(duì)生產(chǎn)者優(yōu)化的主要參數(shù)之一。
當(dāng)發(fā)送消息之后,生產(chǎn)者會(huì)將消息進(jìn)行整合。將其按照一批一批的方式發(fā)送給Broker,從而減少網(wǎng)絡(luò)間的傳輸請(qǐng)求次數(shù)。默認(rèn)情況下為16k。
而如果一批數(shù)據(jù)的大小累計(jì)達(dá)到了設(shè)置的batch.size之后,sender才會(huì)做發(fā)送數(shù)據(jù)的操作
這是第一個(gè)限制
下面再來(lái)介紹一個(gè)非常強(qiáng)勢(shì)的參數(shù):liner.ms。生產(chǎn)者優(yōu)化的主要參數(shù)之二。
這么說(shuō)吧,如果你設(shè)置的liner.ms=0,表示不延遲直接發(fā)送。那么batch.size就不會(huì)生效了
而liner.ms=0屬于默認(rèn)配置
如果數(shù)據(jù)一直沒(méi)有達(dá)到設(shè)置的batch.size大小,數(shù)據(jù)也不能不發(fā)對(duì)吧。所以Kafka也就為我們提供了這樣的參數(shù):
- 當(dāng)sender等待liner.ms設(shè)置的時(shí)間之后【單位ms】,不管數(shù)據(jù)如何都會(huì)將消息進(jìn)行發(fā)送
- 如未設(shè)置當(dāng)前參數(shù),表示沒(méi)有延遲,直接發(fā)送
下面舉個(gè)小例子
config.setProperty(ProducerConfig.LINGER_MS_CONFIG, "5000");

開(kāi)始發(fā)送
將RecordAccumulator內(nèi)存儲(chǔ)的數(shù)據(jù)拉取出來(lái)之后,開(kāi)始將其創(chuàng)建為一個(gè)個(gè)的Request請(qǐng)求。這里需要注意的是:
- NetworkClient并非一股腦的將全部可發(fā)送數(shù)據(jù)進(jìn)行傳輸請(qǐng)求
正相反,為了能夠保證不同分區(qū)所對(duì)應(yīng)DQueue的數(shù)據(jù)進(jìn)入到對(duì)應(yīng)的Broker所在的分區(qū)內(nèi),Kafka將按照<BrokerId, Request>的形式對(duì)請(qǐng)求進(jìn)行傳輸。如果傳輸?shù)竭_(dá)Broker之后沒(méi)有acks應(yīng)答,那么當(dāng)前節(jié)點(diǎn)下最多能夠保存5個(gè)未響應(yīng)的請(qǐng)求。
ACKS
這里簡(jiǎn)單聊一下它的應(yīng)答方式。在ProducerConfig.ACKS_DOC下我們也可以看到相關(guān)的說(shuō)明:
- acks=0: 生產(chǎn)者不會(huì)等待Broker的應(yīng)答,直接表示消息已經(jīng)發(fā)送成功。而消息有沒(méi)有真正達(dá)到Broker,不關(guān)心。
當(dāng)然了,這種方式在性能上來(lái)講是最好的,適合一些數(shù)據(jù)不重要的場(chǎng)景
- acks=1: 生產(chǎn)者將消息發(fā)送到Broker之后,由Leader在本地將消息進(jìn)行存儲(chǔ)之后,返回發(fā)送成功的應(yīng)答。
如果Follower還沒(méi)有同步到消息,Leader就已經(jīng)掛了。那么此時(shí)就會(huì)出現(xiàn)消息丟失的情況
- acks=all:生產(chǎn)者將消息發(fā)送到Broker之后,由Leader在本地將消息進(jìn)行存儲(chǔ),并且Follower同步完消息之后才會(huì)返回發(fā)送成功的應(yīng)答。
這種方式是最能保證數(shù)據(jù)安全的情況,但是性能也是最低的~
最后:
- 當(dāng)Broker返回成功應(yīng)答之后,RecordAccumulator中的數(shù)據(jù)將會(huì)被清理
- 如果失敗,可以嘗試重試等操作
總結(jié)
而到了這里,本次關(guān)于Producer理論篇就結(jié)束了,針對(duì)API部分大家需要多練,可以先看看關(guān)于ProducerConfig內(nèi)的配置參數(shù)說(shuō)明,可以嘗試先練習(xí)練習(xí)。

很貼心的框架
后面我們還會(huì)介紹一些核心參數(shù)
Kafka的分區(qū)處理也是一個(gè)比較核心的內(nèi)容,接下來(lái)我們會(huì)著重介紹
以上就是java分布式流處理組件Producer入門(mén)詳解的詳細(xì)內(nèi)容,更多關(guān)于java分布式Producer的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
SpringBoot與velocity的結(jié)合的示例代碼
本篇文章主要介紹了SpringBoot與velocity的結(jié)合的示例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-03-03
Java?實(shí)現(xiàn)判定順序表中是否包含某個(gè)元素(思路詳解)
這篇文章主要介紹了Java?實(shí)現(xiàn)判定順序表中是否包含某個(gè)元素,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-06-06
Maven打包沒(méi)有指定主類(lèi)問(wèn)題(xxx.jar中沒(méi)有主清單屬性)
這篇文章主要介紹了Maven打包沒(méi)有指定主類(lèi)問(wèn)題(xxx.jar中沒(méi)有主清單屬性),具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-04-04
IDEA打開(kāi)項(xiàng)目所有東西都在報(bào)紅報(bào)錯(cuò)的解決方案
這篇文章主要給大家介紹了關(guān)于IDEA打開(kāi)項(xiàng)目所有東西都在報(bào)紅報(bào)錯(cuò)的三個(gè)解決方案,文中通過(guò)圖文介紹的非常詳細(xì),對(duì)大家學(xué)習(xí)或者使用idea具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2023-06-06
JAVA內(nèi)存模型和Happens-Before規(guī)則知識(shí)點(diǎn)講解
在本篇文章里小編給大家整理的是一篇關(guān)于JAVA內(nèi)存模型和Happens-Before規(guī)則知識(shí)點(diǎn)內(nèi)容,有需要的朋友們跟著學(xué)習(xí)下。2020-11-11
Java多線(xiàn)程Callable和Future接口區(qū)別
這篇文章主要介紹了Java多線(xiàn)程Callable和Future接口區(qū)別,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
Spring Boot日志技術(shù)logback原理及配置解析
這篇文章主要介紹了Spring Boot日志技術(shù)logback原理及用法解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-07-07
在SpringBoot中使用MongoDB完成數(shù)據(jù)存儲(chǔ)
本文主要介紹了在SpringBoot中如惡化使用MongoDB完成數(shù)據(jù)存儲(chǔ),接下來(lái)這篇我們將圍繞MongoDB進(jìn)行,MongoDB是一個(gè)開(kāi)源的,面向文檔的NoSQL數(shù)據(jù)庫(kù)管理系統(tǒng),使用類(lèi)似JSON的BSON(二進(jìn)制JSON)格式來(lái)存儲(chǔ)數(shù)據(jù),具有靈活的數(shù)據(jù)模型和強(qiáng)大的查詢(xún)功能,需要的朋友可以參考下2023-11-11

