結(jié)合線程池實現(xiàn)apache?kafka消費者組的誤區(qū)及解決方法
一個錯誤:多線程使用單一消費者
下圖顯現(xiàn)了一種錯誤的使用KafkaConsumer的方法
- 創(chuàng)建多個線程用來消費kafka數(shù)據(jù)
- 多線程使用同一個KafkaConsumer對象
- 在單線程中使用這個KafkaConsumer對象,完成數(shù)據(jù)拉取、處理、提交偏移量。

這種方式之所以錯誤的原因是:KafkaConsumer是線程不安全的,可能出現(xiàn)把同一批數(shù)據(jù)既給線程A處理,也交給線程B處理重復(fù)消費的問題。
一個誤區(qū):多線程就是消費者組
下圖中體現(xiàn)的是一種正常的KafkaConsumer使用方式
- 使用一個KafkaConsumer拉取數(shù)據(jù)
- 拉取數(shù)據(jù)后將一個批次的數(shù)據(jù)交給一個線程去處理

這個處理方式不是錯誤,但是他只是一個消費者在消費kafka消息隊列中的數(shù)據(jù),不是消費者組的方式消費數(shù)據(jù)。無法充分利用kafka分區(qū)提升消息處理的吞吐量。
常規(guī)正確做法:使用線程池實現(xiàn)消費者組
下面的方法是常規(guī)的正確實現(xiàn)方式

- 因為KafkaConsumer是線程不安全的,所以不能跨線程使用KafkaConsumer
- 每個線程持有一個KafkaConsumer對象
- 多個線程的實現(xiàn)可以使用線程池,線程池的線程數(shù)量等于消費者組內(nèi)消費者的數(shù)量
public class MyConsumerGroup {
public void groupConsumer(){
ExecutorService executorService = Executors.newFixedThreadPool(6);
for (int i = 0; i < 6; i++) {
MyConsumer myConsumer = new MyConsumer();
executorService.execute(myConsumer);
}
}
}MyConsumer方法需要實現(xiàn)Runnable接口,并在run方法中調(diào)用MyConsumer#pollData。MyConsumer的代碼參考本專欄的《消費者Java實現(xiàn)》( 集成apache kafka-clients實現(xiàn)數(shù)據(jù)消費者)
@Override
public void run() {
MyConsumer myConsumer = new MyConsumer();
myConsumer.pollData();
}到此這篇關(guān)于結(jié)合線程池實現(xiàn)apache kafka消費者組的文章就介紹到這了,更多相關(guān)apache kafka消費者組內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java語言實現(xiàn)數(shù)據(jù)結(jié)構(gòu)棧代碼詳解
這篇文章主要介紹了Java語言實現(xiàn)數(shù)據(jù)結(jié)構(gòu)棧代碼詳解,簡單介紹了棧的概念,然后分享了線性棧和鏈?zhǔn)綏5腏ava代碼,具有一定參考價值,需要的朋友可以了解下。2017-11-11
Java 實現(xiàn)分布式服務(wù)的調(diào)用鏈跟蹤
分布式服務(wù)中完成某一個業(yè)務(wù)動作,需要服務(wù)之間的相互協(xié)作才能完成,在這一次動作引起的多服務(wù)的聯(lián)動我們需要用1個唯一標(biāo)識關(guān)聯(lián)起來,關(guān)聯(lián)起來就是調(diào)用鏈的跟蹤。本文介紹了Java 實現(xiàn)分布式服務(wù)的調(diào)用鏈跟蹤的步驟2021-06-06
IDEA解決src和resource下創(chuàng)建多級目錄的操作
這篇文章主要介紹了IDEA解決src和resource下創(chuàng)建多級目錄的操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2021-02-02

