Java Kafka分區(qū)發(fā)送及消費(fèi)實(shí)戰(zhàn)
前言
Kafka是現(xiàn)在非常熱門的分布式消息隊(duì)列,常用于微服務(wù)間異步通信,業(yè)務(wù)解耦等場景。kafka的性能非常強(qiáng)大,但是單個(gè)微服務(wù)吞吐性能是有上限的,我們就會用到分布式微服務(wù),多消費(fèi)者多生產(chǎn)者進(jìn)行數(shù)據(jù)處理,保證性能同時(shí)也能根據(jù)業(yè)務(wù)量進(jìn)行橫向拓展,對于同一個(gè)微服務(wù)的多個(gè)實(shí)例,輸入輸出的topic是同一個(gè),這時(shí)候我們就可以利用Kafka分區(qū)消費(fèi)來解決這個(gè)問題。
業(yè)務(wù)場景
我們開發(fā)的是一個(gè)物聯(lián)網(wǎng)系統(tǒng),大量設(shè)備接入到平臺實(shí)時(shí)發(fā)送數(shù)據(jù),有秒級數(shù)據(jù)和分鐘級別數(shù)據(jù)等等,處理流程包含接入、處理、存儲,這三個(gè)模塊間就是使用kafka進(jìn)行數(shù)據(jù)流轉(zhuǎn),數(shù)據(jù)處理模塊中包含多個(gè)微服務(wù),單條數(shù)據(jù)會經(jīng)歷多次處理,部分業(yè)務(wù)耗時(shí)較長,導(dǎo)致在高頻率接收到數(shù)據(jù)時(shí)候單體服務(wù)無法達(dá)到吞吐平衡,于是對于這些服務(wù)進(jìn)行了分布式部署,多個(gè)實(shí)例進(jìn)行消費(fèi)處理。
業(yè)務(wù)實(shí)現(xiàn)
不指定分區(qū)
我們在給kafka發(fā)送消息時(shí)候,如果不指定分區(qū),是不需要手動創(chuàng)建topic的,發(fā)送時(shí)沒有topic,kafka會自動創(chuàng)建一個(gè)分區(qū)為1的topic,如下:
@Service
public class ProductService {
@Autowired
private KafkaTemplate kafkaTemplate;
public void send(String msg, String topic) {
kafkaTemplate.send(topic, msg);
}
}指定分區(qū)
topic分區(qū)初始化及配置
指定分區(qū)發(fā)送時(shí)候,如果未配置topic分區(qū)數(shù),指定>0的分區(qū),會提示分區(qū)不存在,這時(shí)候我們就需要提前創(chuàng)建好topic及分區(qū)
手動創(chuàng)建,服務(wù)啟動前,使用kafka tool手動創(chuàng)建topic 不推薦 x
自動創(chuàng)建,服務(wù)啟動時(shí),使用KafkaClient創(chuàng)建 推薦 √
/**
* 初始化多分區(qū)的topic 基于springboot2
*/
@Component
public void TopicInitRunner implements ApplicationRunner {
@Autowired
private AdminClient adminClient;
@Override
public void run(ApplicationArguments args) throws Exception {
// 通過配置文件讀取自定義配置的topic名及分區(qū)數(shù) 省略...
// Key topic V 分區(qū)數(shù)
Map<String, Integer> topicPartitionMap = new HashMap<>();
for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) {
createTopic(e.getKey(), e.getValue());
}
}
public void createTopic(String topic, int partition) {
NewTopic newTopic = new NewTopic(topic, partition);
adminClient.createTopics(Lists.newArrayList(newTopic));
}
}
/**
* 配置類參考 基于springboot2
* 如果只進(jìn)行普通的單消息發(fā)送 無需添加此配置到項(xiàng)目中
*/
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String servers;
@Bean
public AdminClient adminClient() {
return AdminClient.create(kafkaAdmin().getConfig());
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> props = Maps.newHashMap();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
return new KafkaAdmin(props);
}
}生產(chǎn)者分區(qū)發(fā)送方案
上面講到如何初始化分區(qū)topic,這時(shí)候我們的kafka環(huán)境已經(jīng)準(zhǔn)備好了,我們先使用TopicInitRunner為我們創(chuàng)建一個(gè)名稱為 partition-topic 分區(qū)數(shù)為三,現(xiàn)在講一講如何均勻的講消息發(fā)送的每個(gè)分區(qū)上,如何保證多消費(fèi)者實(shí)例是負(fù)載均衡的,具體方案如下:
- 1.因?yàn)槊織l消息都是設(shè)備上傳的,都會有設(shè)備id,先給每個(gè)設(shè)備生成一個(gè)自增號,這樣1000個(gè)設(shè)備,每個(gè)設(shè)備就會有0到999的自增號,放到緩存中,每次根據(jù)消息中的設(shè)備id獲取到該設(shè)備的自增號
- 2.使用自增號對分區(qū)數(shù)進(jìn)行取模操作,代碼實(shí)現(xiàn)如下:
public class ProductService {
/**
* data為需要發(fā)送的數(shù)據(jù)
*/
public void partitionSend(String topic, int partition, JSONObject data) {
// 獲取設(shè)備id
String deviceId = data.getString("deviceId");
// 獲取自增數(shù) 如果是新設(shè)備會創(chuàng)建一個(gè)并放入緩存中
int inc = getDeviceInc(deviceId);
// 如果分區(qū)數(shù)為3 設(shè)備自增id為1 取模結(jié)果為1 就是發(fā)送到1分區(qū) 這樣1000個(gè)設(shè)備就可以保證每個(gè)分區(qū)發(fā)送數(shù)據(jù)量是1000 / 3
int targetPartition = Math.floorMod(inc, partition);
// 分區(qū)發(fā)送時(shí)候 需要指定一個(gè)唯一k 可以使用uuid或者百度提供的雪花算法獲取id 字符串即可
kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString());
}
}消費(fèi)者
我們講到消費(fèi)者使用分布式部署,一個(gè)微服務(wù)有多個(gè)實(shí)例,我們只需要按照服務(wù)監(jiān)聽的topic分區(qū)數(shù)創(chuàng)建對應(yīng)數(shù)目的服務(wù)實(shí)例即可,這樣kafka就會自動分配對應(yīng)分區(qū)的數(shù)據(jù)到每個(gè)實(shí)例。
我們采取批量消費(fèi),進(jìn)一步提高服務(wù)吞吐性能,消費(fèi)及配置代碼如下,配置文件參考springbootkafka配置即可,主要設(shè)計(jì)kafka服務(wù)配置,消費(fèi)及生產(chǎn)配置,比較核心的是
@Component
public class DataListener {
@Autowired
private MongoTemplate mongoTemplate;
/**
* 站點(diǎn)報(bào)文監(jiān)聽消費(fèi)
*
* @param records
*/
@KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory")
public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) {
}
/**
* 消費(fèi)者配置
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = Maps.newHashMap();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
/**
* 批量消費(fèi)配置
*/
@Bean
public KafkaListenerContainerFactory batchConsumerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setBatchListener(true);
return factory;
}
}到此這篇關(guān)于Java Kafka分區(qū)發(fā)送及消費(fèi)實(shí)戰(zhàn)的文章就介紹到這了,更多相關(guān)Kafka分區(qū)發(fā)送及消費(fèi)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
RestTemplate如何添加請求頭headers和請求體body
這篇文章主要介紹了RestTemplate如何添加請求頭headers和請求體body問題,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2023-07-07
Java Hutool工具實(shí)現(xiàn)驗(yàn)證碼生成及Excel文件的導(dǎo)入和導(dǎo)出
Hutool是一個(gè)小而全的Java工具類庫,通過靜態(tài)方法封裝,降低相關(guān)API的學(xué)習(xí)成本,提高工作效率,本文主要介紹了使用Hutool工具實(shí)現(xiàn)驗(yàn)證碼生成和excel文件的導(dǎo)入、導(dǎo)出,需要的朋友可參考一下2021-11-11
Springboot實(shí)現(xiàn)WebMvcConfigurer接口定制mvc配置詳解
這篇文章主要介紹了Springboot實(shí)現(xiàn)WebMvcConfigurer接口定制mvc配置詳解,spring?boot拋棄了傳統(tǒng)xml配置文件,通過配置類(標(biāo)注@Configuration的類,@Configuration配置類相當(dāng)于一個(gè)xml配置文件)以JavaBean形式進(jìn)行相關(guān)配置,需要的朋友可以參考下2023-09-09
使用SpringMVC響應(yīng)json格式返回的結(jié)果類型
這篇文章主要介紹了使用SpringMVC響應(yīng)json格式返回的結(jié)果類型,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-07-07
解決springboot配置文件組解決自動配置屬性無法注入問題
在使用Spring Boot時(shí),可能會遇到配置文件屬性注入失敗的問題,本文描述了一個(gè)案例,其中嘗試使用profile文件組指定不同環(huán)境下的配置文件,但遇到了屬性無法成功注入的情況,提供的解決辦法是將Spring Boot的版本號從2.2.0.RELEASE升級到2.4.02024-09-09
Java編程一個(gè)隨機(jī)數(shù)產(chǎn)生模塊代碼分享
這篇文章主要介紹了Java編程一個(gè)隨機(jī)數(shù)產(chǎn)生模塊代碼分享,具有一定借鑒價(jià)值,需要的朋友可以參考下。2017-12-12

