Kafka的生產(chǎn)者和消費者機制使用方式
Kafka提供了兩套客戶端API,HighLevel API和LowLevel API。
HighLevel API封裝了kafka的運行細(xì)節(jié),使用起來比較簡單,是企業(yè)開發(fā)過程中最常用的客戶端API。 而LowLevel API則需要客戶端自己管理Kafka的運行細(xì)節(jié),Partition,Offset這些數(shù)據(jù)都由客戶端自行管理。
1.基礎(chǔ)的客戶端
1.1消息發(fā)送者的主流程
- 設(shè)置Producer核心屬性
- 構(gòu)建消息:Kafka的消息是一個Key-Value結(jié)構(gòu)的消息。其中,key和value都可以是任意對象類型。其中,key主要是用來進(jìn)行Partition分區(qū)的,業(yè)務(wù)上更關(guān)心的是value。
- 使用Producer發(fā)送消息:通常用到的就是單向發(fā)送、同步發(fā)送和異步發(fā)送三種發(fā)送方式。
注意:
- 單向發(fā)送:不關(guān)心服務(wù)端的應(yīng)答;
- 同步發(fā)送:獲取到服務(wù)端應(yīng)答消息前,會阻塞當(dāng)前線程;
- 異步發(fā)送:消息發(fā)送后不阻塞,服務(wù)端有應(yīng)答后會觸發(fā)回調(diào)函數(shù)。
1.2消息消費者主流程
- 設(shè)置Consumer核心屬性
- 拉取消息:Kafka采用Consumer主動拉取消息的Pull模式。consumer主動從Broker上拉取一批感興趣的消息。
- 處理消息,提交位點:消費者將消息拉取完成后,就可以交由業(yè)務(wù)自行處理對應(yīng)的這一批消息了。只是消費者需要向Broker提交偏移量offset。如果不提交Offset,Broker會認(rèn)為消費者端消息處理失敗了,還會重復(fù)進(jìn)行推送。
2.客戶端工作機制
2.1消費者分組消費機制

offset偏移量表示每個消費者組在每個Partiton中已經(jīng)消費處理的進(jìn)度。在Kafka中,可以看到消費者組的Offset記錄情況。

這個Offset偏移量,需要消費者處理完成后主動向Kafka的Broker提交。提交完成后,Broker就會更新消費進(jìn)度,表示這個消息已經(jīng)被這個消費者組處理完了。但是如果消費者沒有提交Offset,Broker就會認(rèn)為這個消息還沒有被處理過,就會重新往對應(yīng)的消費者組進(jìn)行推送,不過這次,一般會盡量推送給同一個消費者組當(dāng)中的其他消費者實例。
在示例當(dāng)中,是通過業(yè)務(wù)端主動調(diào)用Consumer的commitAsync(異步)方法或者commitSync(同步)方法主動提交的,Kafka中自然也提供了自動提交Offset的方式。使用自動提交,只需要在Comsumer中配置ENABLE_AUTO_COMMIT_CONFIG屬性即可。
(1)Offset是Kafka進(jìn)行消息推送控制的關(guān)鍵之處
- Offset是根據(jù)Group、Partition分開記錄的。消費者如果一個Partition對應(yīng)多個Consumer消費者實例,那么每個Consumer實例都會往Broker提交同一個Partition的不同Offset,這時候Broker要聽誰的?所以一個Partition最多只能同時被一個Consumer消費。也就是說,示例中四個Partition的Topic,那么同一個消費者組中最多就只能配置四個消費者實例。
- 這么關(guān)鍵的Offset數(shù)據(jù),保存在Broker端,但是卻是由"不靠譜"的消費者主導(dǎo)推進(jìn),這顯然是不夠安全的。那么應(yīng)該如何提高Offset數(shù)據(jù)的安全性呢?如果你有興趣自己觀察,會發(fā)現(xiàn)在Consumer中,實際上也提供了AUTO_OFFSET_RESET_CONFIG參數(shù),來指定消費者組在服務(wù)端的Offset不存在時如何進(jìn)行后續(xù)消費。(有可能服務(wù)端初始化Consumer Group的Offset失敗,也有可能Consumer Group當(dāng)前的Offset對應(yīng)的數(shù)據(jù)文件被過期刪除了。)這就相當(dāng)于服務(wù)端做的兜底保障。

(2)消費者應(yīng)該要如何保證offset的安全性
有兩種方式:一種是異步提交。就是消費者在處理業(yè)務(wù)的同時,異步向Broker提交Offset。這樣好處是消費者的效率會比較高,但是如果消費者的消息處理失敗了,而offset又成功提交了。這就會造成消息丟失。
另一種方式是同步提交。消費者保證處理完所有業(yè)務(wù)后,再提交Offset。這樣的好處自然是消息不會因為offset丟失了。因為如果業(yè)務(wù)處理失敗,消費者就可以不去提交Offset,這樣消息還可以重試。但是壞處是消費者處理信息自然就慢了。另外還會產(chǎn)生消息重復(fù)。因為Broker端不可能一直等待消費者提交。如果消費者的業(yè)務(wù)處理時間比較長,這時在消費者正常處理消息的過程中,Broker端就已經(jīng)等不下去了,認(rèn)為這個消費者處理失敗了。這時就會往同組的其他消費者實例投遞消息,這就造成了消息重復(fù)處理。
這類問題的根源在于Offset反映的是消息的處理進(jìn)度。而消息處理進(jìn)度跟業(yè)務(wù)的處理進(jìn)度又是不同步的。所有我們可以換一種思路,將Offset從Broker端抽取出來,放到第三方存儲比如Redis里自行管理。這樣就可以自己控制用業(yè)務(wù)的處理進(jìn)度推進(jìn)Offset往前更新。
2.2生產(chǎn)者攔截器機制
攔截器機制一般用得比較少,主要用在一些統(tǒng)一添加時間等類似的業(yè)務(wù)場景。比如,用Kafka傳遞一些POJO,就可以用攔截器統(tǒng)一添加時間屬性。但是我們平常用Kafka傳遞的都是String類型的消息,POJO類型的消息,Kafka可以傳嗎?這就要用到下面的消息序列化機制。
2.3消息序列化機制
Kafka內(nèi)部發(fā)送和接收消息的時候,使用的是byte[]字節(jié)數(shù)組的方式(RPC底層也是用這種通訊格式)

在Kafka中,對于常用的一些基礎(chǔ)數(shù)據(jù)類型,都已經(jīng)提供了對應(yīng)的實現(xiàn)類。但是,如果需要使用一些自定義的消息格式,比如自己定制的POJO,就需要定制具體的序列化機制了。(需要考慮的是如何用二進(jìn)制來描述業(yè)務(wù)數(shù)據(jù))
序列化機制的實現(xiàn)方法:
如對于一個通常的POJO類型,可以將他的屬性拆分成兩種類型:一種類型是定長的基礎(chǔ)類型,比如Integer, Long,Double等。這些基礎(chǔ)類型轉(zhuǎn)化成二進(jìn)制數(shù)組都是定長的。這類屬性可以直接轉(zhuǎn)成序列化數(shù)組,在反序列化時,只要按照定長去讀取二進(jìn)制數(shù)據(jù)就可以反序列化了。另一種是不定長的浮動類型,比如String,或者基于String的JSON類型等。這種浮動類型的基礎(chǔ)數(shù)據(jù)轉(zhuǎn)化成二進(jìn)制數(shù)組,長度都是不一定的。對于這類數(shù)據(jù),通常的處理方式都是先往二進(jìn)制數(shù)組中寫入一個定長的數(shù)據(jù)的長度數(shù)據(jù)(Integer或者Long類型),然后再繼續(xù)寫入數(shù)據(jù)本身。這樣,反序列化時,就可以先讀取一個定長的長度,再按照這個長度去讀取對應(yīng)長度的二進(jìn)制數(shù)據(jù),這樣就能讀取到數(shù)據(jù)的完整二進(jìn)制內(nèi)容。
2.4消息分區(qū)路由機制
Kafka默認(rèn)提供了三種消費者的分區(qū)分配策略:
- range策略:比如一個Topic有10個Partiton(partition 0~9) 一個消費者組下有三個Consumer(consumer1~3)。Range策略就會將分區(qū)0~3分給一個Consumer,4~6給一個Consumer,7~9給一個Consumer。
- round-robin策略:輪詢分配策略,可以理解為在Consumer中一個一個輪流分配分區(qū)。比如0,3,6,9分區(qū)給一個Consumer,1,4,7分區(qū)給一個Consumer,然后2,5,8給一個Consumer。
- sticky策略:粘性策略。這個策略有兩個原則:
- 在開始分區(qū)時,盡量保持分區(qū)的分配均勻。比如按照Range策略分(這一步實際上是隨機的)。
- 分區(qū)的分配盡可能的與上一次分配的保持一致。比如在range分區(qū)的情況下,第三個Consumer的服務(wù)宕機了,那么按照sticky策略,就會保持consumer1和consumer2原有的分區(qū)分配情況。然后將consumer3分配的7~9分區(qū)盡量平均的分配到另外兩個consumer上。這種粘性策略可以很好的保持Consumer的數(shù)據(jù)穩(wěn)定性。
也可以自定義實現(xiàn)分區(qū)路由機制。
2.5生產(chǎn)者消息緩存機制
Kafka生產(chǎn)者為了避免高并發(fā)請求對服務(wù)端造成過大壓力,每次發(fā)消息時并不是一條一條發(fā)往服務(wù)端,而是增加了一個高速緩存,將消息集中到緩存后,批量進(jìn)行發(fā)送。這種緩存機制也是高并發(fā)處理時非常常用的一種機制。
Kafka的消息緩存機制涉及到KafkaProducer中的兩個關(guān)鍵組件: accumulator 和 sender

RecordAccumulator,就是Kafka生產(chǎn)者的消息累加器。Kafka Producer要發(fā)送的消息都會在ReocrdAccumulator中緩存起來,然后再分批發(fā)送給kafka broker。在RecordAccumulator中,會針對每一個Partition,維護(hù)一個Deque雙端隊列,這些Deque隊列基本上是和Kafka服務(wù)端的Topic下的Partition對應(yīng)的。每個Deque里會放入若干個ProducerBatch數(shù)據(jù)。 (涉及到兩個參數(shù):BUFFER_MEMORY_CONFIG 是指RecordAccumulator緩沖區(qū)大小,BATCH_SIZE_CONFIG 是指緩沖區(qū)中每一個batch的大?。?/p>
KafkaProducer每次發(fā)送的消息,都會根據(jù)key分配到對應(yīng)的Deque隊列中。然后每個消息都會保存在這些隊列中的某一個ProducerBatch中。而消息分發(fā)的規(guī)則,就是由上面的Partitioner組件完成的。
Sender就是KafkaProducer中用來發(fā)送消息的一個單獨的線程。從這里可以看到,每個KafkaProducer對象都對應(yīng)一個sender線程。他會負(fù)責(zé)將RecordAccumulator中的消息發(fā)送給Kafka。

Sender也并不是一次就把RecordAccumulator中緩存的所有消息都發(fā)送出去,而是每次只拿一部分消息。他只獲取RecordAccumulator中緩存內(nèi)容達(dá)到BATCH_SIZE_CONFIG大小的ProducerBatch消息。當(dāng)然,如果消息比較少,ProducerBatch中的消息大小長期達(dá)不到BATCH_SIZE_CONFIG的話,Sender也不會一直等待。最多等待LINGER_MS_CONFIG時長。然后就會將ProducerBatch中的消息讀取出來。(LINGER_MS_CONFIG默認(rèn)值是0 )
然后,Sender對讀取出來的消息,會以Broker為key,緩存到一個對應(yīng)的隊列當(dāng)中。這些隊列當(dāng)中的消息就稱為InflightRequest。接下來這些Inflight就會一 一發(fā)往Kafka對應(yīng)的Broker中,直到收到Broker的響應(yīng),才會從隊列中移除。這些隊列也并不會無限緩存,最多緩存MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION(默認(rèn)值為5)個請求。
注意:生產(chǎn)者緩存機制的主要目的是將消息打包,減少網(wǎng)絡(luò)IO頻率。所以,在Sender的InflightRequest隊列中,消息也不是一條一條發(fā)送給Broker的,而是一批消息一起往Broker發(fā)送。而這就意味著這一批消息是沒有固定的先后順序的。
最后,Sender會通過其中的一個Selector組件完成與Kafka的IO請求,并接收Kafka的響應(yīng)。
補充:Kafka的生產(chǎn)者緩存機制是Kafka面對海量消息時非常重要的優(yōu)化機制。合理優(yōu)化這些參數(shù),對于Kafka集群性能提升是非常重要的。比如如果你的消息體比較大,那么應(yīng)該考慮加大batch.size,盡量提升batch的緩存效率。而如果Producer要發(fā)送的消息確實非常多,那么就需要考慮加大total.memory參數(shù),盡量避免緩存不夠造成的阻塞。如果發(fā)現(xiàn)生產(chǎn)者發(fā)送消息比較慢,那么可以考慮提升max.in.flight.requests.per.connection參數(shù),這樣能加大消息發(fā)送的吞吐量。
2.6發(fā)送應(yīng)答機制
在Producer將消息發(fā)送到Broker后,要怎么確定消息是不是成功發(fā)到Broker上了呢?
這里涉及到的,就是在Producer端一個不太起眼的屬性ACKS_CONFIG。這個屬性更大的作用在于保證消息的安全性,尤其在replica-factor備份因子比較大的Topic中,尤為重要。
- acks=0,生產(chǎn)者不關(guān)心Broker端有沒有將消息寫入到Partition,只發(fā)送消息就不管了。吞吐量是最高的,但是數(shù)據(jù)安全性是最低的。
- acks=all or -1,生產(chǎn)者需要等Broker端的所有Partiton(Leader Partition以及其對應(yīng)的Follower Partition )都寫完了才能得到返回結(jié)果,這樣數(shù)據(jù)是最安全的,但是每次發(fā)消息需要等待更長的時間,吞吐量是最低的。
- acks設(shè)置成1,則是一種相對中和的策略。Leader Partition在完成自己的消息寫入后,就向生產(chǎn)者返回結(jié)果。
應(yīng)用場景:在生產(chǎn)環(huán)境中,acks=0可靠性太差,很少使用。acks=1,一般用于傳輸日志等,允許個別數(shù)據(jù)丟失的場景。使用范圍最廣。acks=-1,一般用于傳輸敏感數(shù)據(jù),比如與錢相關(guān)的數(shù)據(jù)。
注意:如果ack設(shè)置為all或者-1 ,Kafka也并不是強制要求所有Partition都寫入數(shù)據(jù)后才響應(yīng)。在Kafka的Broker 服務(wù)端會有一個配置參數(shù)min.insync.replicas,控制Leader Partition在完成多少個Partition的消息寫入后,往Producer返回響應(yīng)。這個參數(shù)可以在broker.conf文件中進(jìn)行配置。
2.7生產(chǎn)者消息冪等性
(1)生產(chǎn)者消息冪等性介紹
當(dāng)Producer的acks設(shè)置為1或-1時,Producer每次發(fā)送消息都是需要獲取Broker端返回的RecordMetadate的。這個過程就需要兩次跨網(wǎng)絡(luò)請求。

如果要保證消息安全,那么對于每個消息,這兩次網(wǎng)絡(luò)請求就必須要求是冪等的。但是,網(wǎng)絡(luò)是不靠譜的,在高并發(fā)場景下,往往沒辦法保證這兩個請求是冪等的。Producer發(fā)送消息的過程中,如果第一步請求成功了, 但是第二步卻沒有返回。這時,Producer就會認(rèn)為消息發(fā)送失敗了。那么Producer必然會發(fā)起重試。重試次數(shù)由參數(shù)ProducerConfig.RETRIES_CONFIG,默認(rèn)值是Integer.MAX。
這時問題就來了。Producer會重復(fù)發(fā)送多條消息到Broker中。Kafka如何保證無論Producer向Broker發(fā)送多少次重復(fù)的數(shù)據(jù),Broker端都只保留一條消息,而不會重復(fù)保存多條消息呢?這就是Kafka消息生產(chǎn)者的冪等性問題。
(2)解決方案
分布式數(shù)據(jù)傳遞過程中的三個數(shù)據(jù)語義:
at-least-once:至少一次;at-most-once:最多一次;exactly-once:精確一次。
通常意義上,at-least-once可以保證數(shù)據(jù)不丟失,但是不能保證數(shù)據(jù)不重復(fù)。而at-most-once保證數(shù)據(jù)不重復(fù),但是又不能保證數(shù)據(jù)不丟失。這兩種語義雖然都有缺陷,但是實現(xiàn)起來相對來說比較簡單。但是對一些敏感的業(yè)務(wù)數(shù)據(jù),往往要求數(shù)據(jù)即不重復(fù)也不丟失,這就需要支持Exactly-once語義。而要支持Exactly-once語義,需要有非常精密的設(shè)計。
Kafka為了保證消息發(fā)送的Exactly-once語義,增加了幾個概念:
- PID:每個新的Producer在初始化的過程中就會被分配一個唯一的PID。這個PID對用戶是不可見的。
- Sequence Numer: 對于每個PID,這個Producer針對Partition會維護(hù)一個sequenceNumber。這是一個從0開始單調(diào)遞增的數(shù)字。當(dāng)Producer要往同一個Partition發(fā)送消息時,這個Sequence Number就會加1。然后會隨著消息一起發(fā)往Broker。
- Broker端則會針對每個<PID,Partition>維護(hù)一個序列號(SN),只有當(dāng)對應(yīng)的SequenceNumber = SN+1時,Broker才會接收消息,同時將SN更新為SN+1。否則,SequenceNumber過小就認(rèn)為消息已經(jīng)寫入了,不需要再重復(fù)寫入。而如果SequenceNumber過大,就會認(rèn)為中間可能有數(shù)據(jù)丟失了。對生產(chǎn)者就會拋出一個OutOfOrderSequenceException。

2.8生產(chǎn)者消息事務(wù)機制
(1)事務(wù)引入原因
通過生產(chǎn)者消息冪等性問題,能夠解決單生產(chǎn)者消息寫入單分區(qū)的的冪等性問題。
但是,如果是要寫入多個分區(qū)呢?比如像我們的示例中,就發(fā)送了五條消息,他們的key都是不同的。這批消息就有可能寫入多個Partition,而這些Partition是分布在不同Broker上的。這意味著,Producer需要對多個Broker同時保證消息的冪等性。

這時候,通過上面的生產(chǎn)者消息冪等性機制就無法保證所有消息的冪等了。這時候就需要有一個事務(wù)機制,保證這一批消息最好同時成功的保持冪等性?;蛘哌@一批消息同時失敗,這樣生產(chǎn)者就可以開始進(jìn)行整體重試,消息不至于重復(fù)。
(2)具體流程
Producer中的幾個API:

注意:
1.一個TransactionId只會對應(yīng)一個PID
如果當(dāng)前一個Producer的事務(wù)沒有提交,而另一個新的Producer保持相同的TransactionId,這時舊的生產(chǎn)者會立即失效,無法繼續(xù)發(fā)送消息。
2.跨會話事務(wù)對齊
如果某個Producer實例異常宕機了,事務(wù)沒有被正常提交。那么新的TransactionId相同的Producer實例會對舊的事務(wù)進(jìn)行補齊。保證舊事務(wù)要么提交,要么終止。這樣新的Producer實例就可以以一個正常的狀態(tài)開始工作。
3.客戶端流程總結(jié)

總結(jié)
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
解析SpringBoot中使用LoadTimeWeaving技術(shù)實現(xiàn)AOP功能
這篇文章主要介紹了SpringBoot中使用LoadTimeWeaving技術(shù)實現(xiàn)AOP功能,AOP面向切面編程,通過為目標(biāo)類織入切面的方式,實現(xiàn)對目標(biāo)類功能的增強,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2022-09-09
java網(wǎng)絡(luò)編程學(xué)習(xí)java聊天程序代碼分享
java聊天程序代碼分享,大家參考使用吧2013-12-12
Java中Runnable與Callable接口的區(qū)別詳解
這篇文章主要為大家詳細(xì)介紹了Java中Runnable與Callable接口的區(qū)別,文中的示例代碼講解詳細(xì),對我們學(xué)習(xí)Java有一定的幫助,需要的可以參考一下2023-03-03
MyBatis 數(shù)據(jù)封裝全攻略(告別空值與映射混亂問題)
本文系統(tǒng)介紹MyBatis數(shù)據(jù)封裝的常見問題及解決方案,涵蓋resultType、resultMap、駝峰轉(zhuǎn)換、嵌套處理、懶加載等核心機制,并推薦MyBatis-Plus簡化開發(fā),提升效率與可維護(hù)性,感興趣的朋友跟隨小編一起看看吧2025-09-09

