流式圖表拒絕增刪改查之kafka核心消費(fèi)邏輯下篇
前篇回顧
kafka消費(fèi)者線程
突擊檢查八股文,實(shí)現(xiàn)線程的方法有哪些?嗯?沒(méi)復(fù)習(xí)是吧,行沒(méi)關(guān)系,那感謝參加本次面試哈。
常用的幾種方式分別是:
- 繼承Thread類,重寫(xiě)run方法
- 實(shí)現(xiàn)Runbale接口,重寫(xiě)run方法
- 實(shí)現(xiàn)Callable接口,重寫(xiě)call方法
這里我們直接創(chuàng)捷出一個(gè)任務(wù)類實(shí)現(xiàn)Runable方法,重寫(xiě)run方法,一個(gè)線程當(dāng)作一個(gè)kafka client,所以要在任務(wù)類中聲明一個(gè)KafkaConsumer的成員變量,另外創(chuàng)建任務(wù)需要指定當(dāng)前任務(wù)的名稱也就是線程名,還有要監(jiān)聽(tīng)的topic主題。
private KafkaConsumer<String, String> consumer; private String topic; private String threadName;
name和topic通過(guò)構(gòu)造方法傳進(jìn)來(lái),同時(shí)在構(gòu)造方法里完成對(duì)client的初始化操作。
/**
* 封裝必要信息
* @param bootServer 生產(chǎn)者ip
* @param groupId 分組信息
* @param topic 訂閱主題
*/
public KafkaConsumerRunnable(String bootServer, String groupId, String topic) {
this.topic = topic;
Properties props = new Properties();
props.put("bootstrap.servers", bootServer);
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("max.poll.records", 5);
props.put("session.timeout.ms", "60000");
props.put("max.poll.interval.ms", 300000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //鍵反序列化方式
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
}
這里封裝kafka client的必要信息,入?yún)ootServer為kafka集群ip,groupId為threadName,我們規(guī)定一個(gè)線程為一個(gè)kafka消費(fèi)鏈接,消費(fèi)一個(gè)topic。
上一篇線程池保證了任務(wù)不會(huì)輕易掛掉,就算掛掉了也會(huì)重新提交,所以為了節(jié)省資源不做所謂的同groupId的負(fù)載操作。session.timeout.ms和max.poll.interval.ms可以根據(jù)當(dāng)前的kafka資源靈活配置,不然可能會(huì)引發(fā)一些reblance。
enable.auto.commit設(shè)置為false,手動(dòng)提交offset,auto.offset.reset這塊由于業(yè)務(wù)特殊,本來(lái)就是流式圖表瞬時(shí)的展示,如果真的出現(xiàn)了數(shù)據(jù)丟失那就丟了吧,從最新的數(shù)據(jù)讀取。
接下來(lái)只需要處理下消費(fèi)邏輯,consumer.subscribe(Collections.singletonList(this.topic))開(kāi)始訂閱監(jiān)聽(tīng)kafka數(shù)據(jù),搞一個(gè)while true不斷的消費(fèi)數(shù)據(jù),try catch只需要對(duì)WakeupException做處理,kafka客戶端會(huì)在關(guān)閉的時(shí)候拋出WakeupException異常。
finally里提交offset,無(wú)論這條offset對(duì)應(yīng)的數(shù)據(jù)消費(fèi)成功還是失敗都是消費(fèi)過(guò)了,失敗了就過(guò)去了。
@Override
public void run() {
consumer.subscribe(Collections.singletonList(this.topic));
String key = "stream_chart:" + this.name;
Thread.currentThread().setName(key);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
// 如果隊(duì)列中沒(méi)有消息 等待KAFKA_TIME_OUT后調(diào)用poll,如果有消息立即消費(fèi)
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
log.info("線程 {} 消費(fèi)kafka數(shù)據(jù) -> {} \n", Thread.currentThread().getName(), value);
RedisConfig.getRedisTemplate().opsForZSet().add(key, value, Instant.now().getEpochSecond() * 1000);
}
}
} catch (WakeupException e) {
log.info("ignore for shutdown", e);
} finally {
consumer.commitAsync();
}
}
我們消費(fèi)到數(shù)據(jù)直接放到redis的zset結(jié)構(gòu)里,當(dāng)前的時(shí)間戳作為score,最后留一個(gè)關(guān)閉客戶端的后門(mén)
// 退出后關(guān)掉客戶端
public void shutDown() {
consumer.wakeup();
}
任務(wù)提交
任務(wù)提交這塊只需要在業(yè)務(wù)service中注入線程池,創(chuàng)建對(duì)應(yīng)的KafkaRunable任務(wù)封裝對(duì)應(yīng)的信息,執(zhí)行execute即可。
這里有個(gè)坑需要注意下,第二次突擊檢查八股文,線程池提交方法submit與execute的區(qū)別說(shuō)一下。不知道的立刻去熟讀并背誦。
public class TestTheadPool {
public static void main(String[] args) {
ExecutorService executorService= Executors.newFixedThreadPool(1);
executorService.submit(new task("submit"));
executorService.execute(new task("execute"));
}
}
class task implements Runnable{
private String name;
public task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println(this.name + " start task");
int i=1/0;
}
}
熟悉的同學(xué)通過(guò)示例代碼可以看出來(lái),submit提交的線程不會(huì)拋出異常代碼,只有獲取Future返回值并執(zhí)行g(shù)et方法才會(huì)捕獲到異常。這塊涉及到異步的東西不再贅述
try {
Future<?> submit = executorService.submit(new task("submit"));
submit.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
所以我們要使用execute執(zhí)行,不然kafka消費(fèi)線程里消費(fèi)失敗了攔截不到就不會(huì)被重新提交,導(dǎo)致線程掛掉。
以上就是流式圖表拒絕增刪改查之kafka核心消費(fèi)邏輯下篇的詳細(xì)內(nèi)容,更多關(guān)于kafka消費(fèi)流式圖表的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
一文帶你入門(mén)JDK8新特性——Lambda表達(dá)式
這篇文章主要介紹了JDK8新特性——Lambda表達(dá)式的相關(guān)資料,幫助大家更好的理解和學(xué)習(xí)JAVA開(kāi)發(fā),感興趣的朋友可以了解下2020-08-08
Java實(shí)現(xiàn)微信掃碼登入的實(shí)例代碼
這篇文章主要介紹了java實(shí)現(xiàn)微信掃碼登入功能,本文通過(guò)實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-06-06
Java實(shí)現(xiàn)簡(jiǎn)單的萬(wàn)年歷
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)簡(jiǎn)單的萬(wàn)年歷,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-04-04
java簡(jiǎn)單實(shí)現(xiàn)斗地主發(fā)牌功能
這篇文章主要為大家詳細(xì)介紹了java簡(jiǎn)單實(shí)現(xiàn)斗地主發(fā)牌功能,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-06-06
idea項(xiàng)目實(shí)現(xiàn)移除和添加git
本文指導(dǎo)讀者如何從官網(wǎng)下載并安裝Git,以及在IDEA中配置Git的詳細(xì)步驟,首先,用戶需訪問(wèn)Git官方網(wǎng)站下載適合自己操作系統(tǒng)的Git版本并完成安裝,接著,在IDEA中通過(guò)設(shè)置找到git.exe文件以配置Gi2024-10-10

