Java Kafka實(shí)現(xiàn)延遲隊(duì)列的示例代碼
kafka作為一個(gè)使用廣泛的消息隊(duì)列,很多人都不會(huì)陌生,但當(dāng)你在網(wǎng)上搜索“kafka 延遲隊(duì)列”,出現(xiàn)的都是一些講解時(shí)間輪或者只是提供了一些思路,并沒(méi)有一份真實(shí)可用的代碼實(shí)現(xiàn),今天我們就來(lái)打破這個(gè)現(xiàn)象,提供一份可運(yùn)行的代碼,拋磚引玉,吸引更多的大神來(lái)分享。
基于kafka如何實(shí)現(xiàn)延遲隊(duì)列
想要解決一個(gè)問(wèn)題,我們需要先分解問(wèn)題。kafka作為一個(gè)高性能的消息隊(duì)列,只要消費(fèi)能力足夠,發(fā)出的消息都是會(huì)立刻收到的,因此我們需要想一個(gè)辦法,讓消息延遲發(fā)送出去。
網(wǎng)上已經(jīng)有大神給出了如下方案:
- 在發(fā)送延遲消息時(shí)不直接發(fā)送到目標(biāo)topic,而是發(fā)送到一個(gè)用于處理延遲消息的topic,例如
delay-minutes-1 - 寫一段代碼拉取
delay-minutes-1中的消息,將滿足條件的消息發(fā)送到真正的目標(biāo)主題里。
就像畫一匹馬一樣簡(jiǎn)單。

方案是好的,但是我們還需要更多細(xì)節(jié)。
完善細(xì)節(jié)
問(wèn)題出在哪里?
問(wèn)題出在延遲消息發(fā)出去之后,代碼程序就會(huì)立刻收到延遲消息,要如何處理才能讓延遲消息等待一段時(shí)間才發(fā)送到真正的topic里面。
可能有同學(xué)會(huì)覺(jué)得很簡(jiǎn)單嘛,在代碼程序收到消息之后判斷條件不滿足,就調(diào)用sleep方法,過(guò)了一段時(shí)間我再進(jìn)行下一個(gè)循環(huán)拉取消息。
真的可行嗎?
一切好像都很美好,但這是不可行的。
這是因?yàn)樵谳喸僰afka拉取消息的時(shí)候,它會(huì)返回由max.poll.records配置指定的一批消息,但是當(dāng)程序代碼不能在max.poll.interval.ms配置的期望時(shí)間內(nèi)處理這些消息的話,kafka就會(huì)認(rèn)為這個(gè)消費(fèi)者已經(jīng)掛了,會(huì)進(jìn)行rebalance,同時(shí)你這個(gè)消費(fèi)者就無(wú)法再拉取到任何消息了。
舉個(gè)例子:當(dāng)你需要一個(gè)24小時(shí)的延遲消息隊(duì)列,在代碼里面寫下了Thread.sleep(1000*60*60*24);,為了不發(fā)生rebalance,你把max.poll.interval.ms 也改成了1000*60*60*24,這個(gè)時(shí)候你或許會(huì)感覺(jué)到一絲絲的怪異,我是誰(shuí)?我在哪?我為什么要寫出來(lái)這樣的代碼?
其實(shí)我們可以更優(yōu)雅的處理這個(gè)問(wèn)題。
KafkaConsumer 提供了暫停和恢復(fù)的API函數(shù),調(diào)用消費(fèi)者的暫停方法后就無(wú)法再拉取到新的消息,同時(shí)長(zhǎng)時(shí)間不消費(fèi)kafka也不會(huì)認(rèn)為這個(gè)消費(fèi)者已經(jīng)掛掉了。另外為了能夠更加優(yōu)雅,我們會(huì)啟動(dòng)一個(gè)定時(shí)器來(lái)替換sleep。,完整流程如下圖,當(dāng)消費(fèi)者發(fā)現(xiàn)消息不滿足條件時(shí),我們就暫停消費(fèi)者,并把偏移量seek到上一次消費(fèi)的位置以便等待下一個(gè)周期再次消費(fèi)這條消息。

Java代碼實(shí)現(xiàn)
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
@SpringBootTest
public class DelayQueueTest {
private KafkaConsumer<String, String> consumer;
private KafkaProducer<String, String> producer;
private volatile Boolean exit = false;
private final Object lock = new Object();
private final String servers = "";
@BeforeEach
void initConsumer() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "d");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "5000");
consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
}
@BeforeEach
void initProducer() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<>(props);
}
@Test
void testDelayQueue() throws JsonProcessingException, InterruptedException {
String topic = "delay-minutes-1";
List<String> topics = Collections.singletonList(topic);
consumer.subscribe(topics);
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
synchronized (lock) {
consumer.resume(consumer.paused());
lock.notify();
}
}
}, 0, 1000);
do {
synchronized (lock) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(200));
if (consumerRecords.isEmpty()) {
lock.wait();
continue;
}
boolean timed = false;
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
long timestamp = consumerRecord.timestamp();
TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
if (timestamp + 60 * 1000 < System.currentTimeMillis()) {
String value = consumerRecord.value();
ObjectMapper objectMapper = new ObjectMapper();
JsonNode jsonNode = objectMapper.readTree(value);
JsonNode jsonNodeTopic = jsonNode.get("topic");
String appTopic = null, appKey = null, appValue = null;
if (jsonNodeTopic != null) {
appTopic = jsonNodeTopic.asText();
}
if (appTopic == null) {
continue;
}
JsonNode jsonNodeKey = jsonNode.get("key");
if (jsonNodeKey != null) {
appKey = jsonNode.asText();
}
JsonNode jsonNodeValue = jsonNode.get("value");
if (jsonNodeValue != null) {
appValue = jsonNodeValue.asText();
}
// send to application topic
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(appTopic, appKey, appValue);
try {
producer.send(producerRecord).get();
// success. commit message
OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1);
HashMap<TopicPartition, OffsetAndMetadata> metadataHashMap = new HashMap<>();
metadataHashMap.put(topicPartition, offsetAndMetadata);
consumer.commitSync(metadataHashMap);
} catch (ExecutionException e) {
consumer.pause(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, consumerRecord.offset());
timed = true;
break;
}
} else {
consumer.pause(Collections.singletonList(topicPartition));
consumer.seek(topicPartition, consumerRecord.offset());
timed = true;
break;
}
}
if (timed) {
lock.wait();
}
}
} while (!exit);
}
}
這段程序是基于SpringBoot 2.4.4版本和 kafka-client 2.7.0版本編寫的一個(gè)單元測(cè)試,需要修改私有變量servers為kafka broker的地址。
在啟動(dòng)程序后,向Topic delay-minutes-1 發(fā)送如以下格式的json字符串?dāng)?shù)據(jù)
{
"topic": "target",
"key": "key1",
"value": "value1"
}
同時(shí)啟動(dòng)一個(gè)消費(fèi)者監(jiān)聽(tīng)topic target,在一分鐘后,將會(huì)收到一條 key="key1", value="value1"的數(shù)據(jù)。
還需要做什么
創(chuàng)建多個(gè)topic用于處理不同時(shí)間的延遲消息,例如delay-minutes-1 delay-minutes-5 delay-minutes-10 delay-minutes-15以提供指數(shù)級(jí)別的延遲時(shí)間,這樣比一個(gè)topic要好很多,畢竟在順序拉取消息的時(shí)候,有一條消息不滿足條件,后面的將全部進(jìn)行排隊(duì)。
到此這篇關(guān)于Java Kafka實(shí)現(xiàn)延遲隊(duì)列的示例代碼的文章就介紹到這了,更多相關(guān)Java Kafka延遲隊(duì)列內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java的優(yōu)先隊(duì)列PriorityQueue原理及實(shí)例分析
這篇文章主要介紹了Java的優(yōu)先隊(duì)列PriorityQueue原理解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-12-12
SpringBoot 攔截器和自定義注解判斷請(qǐng)求是否合法
這篇文章主要介紹了SpringBoot 攔截器和自定義注解判斷請(qǐng)求是否合法,幫助大家更好的理解和使用springboot框架,感興趣的朋友可以了解下2020-12-12
詳解SpringCloud微服務(wù)架構(gòu)之Hystrix斷路器
本篇文章主要介紹了詳解SpringCloud微服務(wù)架構(gòu)之Hystrix斷路器,Hystrix是一個(gè)庫(kù),通過(guò)添加延遲容差和容錯(cuò)邏輯來(lái)幫助您控制這些分布式服務(wù)之間的交互,有興趣的可以了解一下2018-01-01
SpringBoot+隨機(jī)鹽值+雙重MD5實(shí)現(xiàn)加密登錄
數(shù)據(jù)加密在很多項(xiàng)目上都可以用到,大部分都會(huì)采用MD5進(jìn)行加密,本文主要介紹了SpringBoot+隨機(jī)鹽值+雙重MD5實(shí)現(xiàn)加密登錄,具有一定的參考價(jià)值,感興趣的可以了解一下2024-02-02
spring-boot-plus V1.4.0發(fā)布 集成用戶角色權(quán)限部門管理(推薦)
這篇文章主要介紹了spring-boot-plus V1.4.0發(fā)布 集成用戶角色權(quán)限部門管理,本文給大家介紹的非常詳細(xì),具有一定的參考借鑒價(jià)值需要的朋友可以參考下2019-11-11
ShardingSphere結(jié)合MySQL實(shí)現(xiàn)分庫(kù)分表的項(xiàng)目實(shí)踐
在實(shí)際開發(fā)中,如果表的數(shù)據(jù)過(guò)大我們需要把一張表拆分成多張表,本文主要介紹了使用ShardingSphere實(shí)現(xiàn)MySQL分庫(kù)分表,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03

