詳解kafka中的消息分區(qū)分配算法
背景
kafka有分區(qū)機(jī)制,一個(gè)主題topic在創(chuàng)建的時(shí)候,會(huì)設(shè)置分區(qū)。如果只有一個(gè)分區(qū),那所有的消費(fèi)者都訂閱的是這一個(gè)分區(qū)消息;如果有多個(gè)分區(qū)的話(huà),那消費(fèi)者之間又是如何分配的呢?
分配算法
RangeAssignor
定義
Kafka默認(rèn)采?RangeAssignor的分配算法。
RangeAssignor策略的原理是按照消費(fèi)者總數(shù)和分區(qū)總數(shù)進(jìn)?整除運(yùn)算來(lái)獲得?個(gè)跨度,然 后將分區(qū)按照跨度進(jìn)?平均分配,以保證分區(qū)盡可能均勻地分配給所有的消費(fèi)者。對(duì)于每?個(gè) Topic,RangeAssignor策略會(huì)將消費(fèi)組內(nèi)所有訂閱這個(gè)Topic的消費(fèi)者按照名稱(chēng)的字典序排序,然 后為每個(gè)消費(fèi)者劃分固定的分區(qū)范圍,如果不夠平均分配,那么字典序靠前的消費(fèi)者會(huì)被多分配 ?個(gè)分區(qū)。
這種分配?式明顯的?個(gè)問(wèn)題是隨著消費(fèi)者訂閱的Topic的數(shù)量的增加,不均衡的問(wèn)題會(huì)越來(lái) 越嚴(yán)重,?如上圖中4個(gè)分區(qū)3個(gè)消費(fèi)者的場(chǎng)景,C0會(huì)多分配?個(gè)分區(qū)。如果此時(shí)再訂閱?個(gè)分區(qū) 數(shù)為4的Topic,那么C0?會(huì)?C1、C2多分配?個(gè)分區(qū),這樣C0總共就?C1、C2多分配兩個(gè)分區(qū) 了,?且隨著Topic的增加,這個(gè)情況會(huì)越來(lái)越嚴(yán)重。
源碼分析
public class RangeAssignor extends AbstractPartitionAssignor {
....
@Override
public Map> assign(Map partitionsPerTopic, Map subscriptions) {
// 1. 獲取每個(gè)topic被多少個(gè)consumer訂閱了
Map<String,List<String>> consumersPerTopic = consumersPerTopic(subscriptions);
// 2. 存儲(chǔ)最終的分配?案
Map<String,List<String>> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet())
assignment.put(memberId, new ArrayList());
for (Map.Entry> topicEntry : consumersPerTopic.entrySet()) {
String topic = topicEntry.getKey();
List consumersForTopic = topicEntry.getValue();
// 3. 每個(gè)topic的partition數(shù)量
Integer numPartitionsForTopic = partitionsPerTopic.get(topic);
if (numPartitionsForTopic == null)
continue;
Collections.sort(consumersForTopic);
// 4. 表示平均每個(gè)consumer會(huì)分配到多少個(gè)partition
int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();
// 5. 平均分配后還剩下多少個(gè)partition未被分配
int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();
List partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);
// 6. 這?是關(guān)鍵點(diǎn),分配原則是將未能被平均分配的partition分配到前 consumersWithExtraPartition個(gè)consumer
for (int i = 0, n = consumersForTopic.size(); i < n; i++) {
int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);
int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1); assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));
}
}
return assignment;
}
}場(chǎng)景
可以完全平均分配

無(wú)法完全平均分配,排序靠前分的更多

消費(fèi)者數(shù)量大于分區(qū)數(shù)量,排名靠前先分得,排名靠后未分得分區(qū)

RoundRobinAssignor
定義
RoundRobinAssignor的分配策略是將消費(fèi)組內(nèi)訂閱的所有Topic的分區(qū)及所有消費(fèi)者進(jìn)?排序后盡 量均衡的分配(RangeAssignor是針對(duì)單個(gè)Topic的分區(qū)進(jìn)?排序分配的)。如果消費(fèi)組內(nèi),消費(fèi)者訂閱 的Topic列表是相同的(每個(gè)消費(fèi)者都訂閱了相同的Topic),那么分配結(jié)果是盡量均衡的(消費(fèi)者之間 分配到的分區(qū)數(shù)的差值不會(huì)超過(guò)1)。
源碼分析
package org.apache.kafka.clients.consumer;
public class RoundRobinAssignor extends AbstractPartitionAssignor {
@Override
public Map> assign(Map partitionsPerTopic, Map subscriptions) {
<Map> assignment = new HashMap<>();
for (String memberId : subscriptions.keySet()) assignment.put(memberId, new ArrayList()); // 1. 環(huán)狀鏈表,存儲(chǔ)所有的consumer,?次迭代完之后?會(huì)回到原點(diǎn)
CircularIterator assigner = new CircularIterator<> (Utils.sorted(subscriptions.keySet())); // 2. 獲取所有訂閱的topic的partition總數(shù) for (TopicPartition partition : allPartitionsSorted(partitionsPerTopic, subscriptions)) {
final String topic = partition.topic();
while (!subscriptions.get(assigner.peek()).topics().contains(topic))
assigner.next();
assignment.get(assigner.next()).add(partition);
}
return assignment;
}
.... }場(chǎng)景
無(wú)法完全平均分配,排序靠前分的更多

StickyAssignor
定義
盡管RoundRobinAssignor已經(jīng)在RangeAssignor上做了?些優(yōu)化來(lái)更均衡的分配分區(qū),但是在?些情況下依舊會(huì)產(chǎn)?嚴(yán)重的分配偏差,從字?意義上看,Sticky是“粘性的”,可以理解為分配結(jié)果是帶“粘性的”——每?次分配變更相對(duì) 上?次分配做最少的變動(dòng)(上?次的結(jié)果是有粘性的) 其?標(biāo)有兩點(diǎn):
- 分區(qū)的分配盡量的均衡
- 每?次重分配的結(jié)果盡量與上?次分配結(jié)果保持?致
場(chǎng)景

到此這篇關(guān)于詳解kafka中的消息分區(qū)分配算法的文章就介紹到這了,更多相關(guān)kafka消息分區(qū)分配算法內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
使用Java和SNMP4J實(shí)現(xiàn)SNMP操作完整代碼
這篇文章主要介紹了如何使用Java和SNMP4J庫(kù)進(jìn)行SNMP操作,包括初始化SNMP、創(chuàng)建目標(biāo)、創(chuàng)建PDU、發(fā)送SNMP請(qǐng)求和處理響應(yīng)等內(nèi)容,通過(guò)編寫(xiě)SnmpUtil類(lèi),展示了完整的SNMP操作流程,需要的朋友可以參考下2024-12-12
Spring實(shí)例化bean過(guò)程解析及完整代碼示例
這篇文章主要介紹了Spring實(shí)例化bean過(guò)程解析及完整代碼示例,簡(jiǎn)單分析實(shí)例化bean過(guò)程并且分享了相關(guān)實(shí)例,具有一定借鑒價(jià)值,需要的朋友可以參考下2018-01-01
解決在SpringBoot中使用@Value取不到值的問(wèn)題
這篇文章主要給大家分享解決在SpringBoot中使用@Value取不到值的問(wèn)題,文中有詳細(xì)的解決代碼供大家參考,具有一定的參考價(jià)值,需要的朋友可以參考下2023-09-09
IDEA創(chuàng)建Servlet編寫(xiě)HelloWorldServlet頁(yè)面詳細(xì)教程(圖文并茂)
在學(xué)習(xí)servlet過(guò)程中參考的教程是用eclipse完成的,而我在練習(xí)的過(guò)程中是使用IDEA的,在創(chuàng)建servlet程序時(shí)遇到了挺多困難,在此記錄一下,這篇文章主要給大家介紹了關(guān)于IDEA創(chuàng)建Servlet編寫(xiě)HelloWorldServlet頁(yè)面詳細(xì)教程的相關(guān)資料,需要的朋友可以參考下2023-10-10

