Kafka消費(fèi)客戶端協(xié)調(diào)器GroupCoordinator詳解
協(xié)調(diào)器的生命周期
- 什么是協(xié)調(diào)器
- 協(xié)調(diào)器工作原理
- 協(xié)調(diào)器的Rebalance機(jī)制
GroupCoordinator的創(chuàng)建
在Kafka啟動(dòng)的時(shí)候, 會(huì)自動(dòng)創(chuàng)建并啟動(dòng)GroupCoordinator

這個(gè)GroupCoordinator對(duì)象創(chuàng)建的時(shí)候傳入的幾個(gè)屬性需要介紹一下
new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
offsetConfig相關(guān)配置
private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
maxMetadataSize = config.offsetMetadataMaxSize,
loadBufferSize = config.offsetsLoadBufferSize,
offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
offsetsTopicNumPartitions = config.offsetsTopicPartitions,
offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
)
| 屬性 | 介紹 | 默認(rèn)值 |
|---|---|---|
| offset.metadata.max.bytes | ||
| offsets.load.buffer.size | ||
| offsets.retention.minutes | ||
| offsets.retention.check.interval.ms | ||
| offsets.topic.num.partitions | ||
| offsets.commit.timeout.ms | ||
| offsets.topic.segment.bytes | ||
| offsets.topic.replication.factor | ||
| offsets.topic.compression.codec | ||
| offsets.commit.timeout.ms | ||
| offsets.commit.required.acks |
groupConfig相關(guān)配置
| 屬性 | 介紹 | 默認(rèn)值 |
|---|---|---|
| group.min.session.timeout.ms | ||
| group.max.session.timeout.ms | ||
| group.initial.rebalance.delay.ms | ||
| group.max.size | ||
| group.initial.rebalance.delay.ms |
groupMetadataManager
組元信息管理類
heartbeatPurgatory
心跳監(jiān)測操作,每一秒執(zhí)行一次
joinPurgatory
GroupCoordinator的啟動(dòng)
def startup(enableMetadataExpiration: Boolean = true): Unit = {
info("Starting up.")
groupManager.startup(enableMetadataExpiration)
isActive.set(true)
info("Startup complete.")
}
這個(gè)啟動(dòng)對(duì)于GroupCoordinator來說只是給屬性isActive標(biāo)記為了true, 但是同時(shí)呢也調(diào)用了GroupMetadataManager.startup
定時(shí)清理Group元信息
這個(gè)Group元信息管理類呢啟動(dòng)了一個(gè)定時(shí)任務(wù), 名字為:delete-expired-group-metadata
每隔600000ms的時(shí)候就執(zhí)行一下 清理過期組元信息的操作, 這個(gè)600000ms時(shí)間是代碼寫死的。
TODO:GroupMetadataManager#cleanupGroupMetadata
GroupCoordinator OnElection
當(dāng)內(nèi)部topic __consumer_offsets 有分區(qū)的Leader變更的時(shí)候,比如觸發(fā)了 LeaderAndIsr的請(qǐng)求, 發(fā)現(xiàn)分區(qū)Leader進(jìn)行了切換。
那么就會(huì)執(zhí)行 GroupCoordinator#OnElection 的接口, 這個(gè)接口會(huì)把任務(wù)丟個(gè)一個(gè)單線程的調(diào)度程序, 專門處理offset元數(shù)據(jù)緩存加載和卸載的。線程名稱前綴為group-metadata-manager- ,一個(gè)分區(qū)一個(gè)任務(wù)
最終執(zhí)行的任務(wù)內(nèi)容是:GroupMetadataManager#doLoadGroupsAndOffsets
__consumer_offsets 的key有兩種消息類型
key version 0: 消費(fèi)組消費(fèi)偏移量信息 -> value version 0: [offset, metadata, timestamp]
key version 1: 消費(fèi)組消費(fèi)偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
key version 2: 消費(fèi)組的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

例如 version:3 的schemaForGroupValue
Version-0
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
members: ARRAY({
member_id: STRING,
client_id: STRING,
client_host: STRING,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Version-1
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
members: ARRAY({
member_id: STRING,
client_id: STRING,
client_host: STRING,
rebalance_timeout: INT32,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Version-2
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
current_state_timestamp: INT64,
members: ARRAY({
member_id: STRING,
client_id: STRING,
client_host: STRING,
rebalance_timeout: INT32,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Version-3
{
protocol_type: STRING,
generation: INT32,
protocol: NULLABLE_STRING,
leader: NULLABLE_STRING,
current_state_timestamp: INT64,
members: ARRAY({
member_id: STRING,
group_instance_id: NULLABLE_STRING,
client_id: STRING,
client_host: STRING,
rebalance_timeout: INT32,
session_timeout: INT32,
subscription: BYTES,
assignment: BYTES
})
}
Value每個(gè)版本的 Scheme如下
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
new Field(PROTOCOL_TYPE_KEY, STRING),
new Field(GENERATION_KEY, INT32),
new Field(PROTOCOL_KEY, NULLABLE_STRING),
new Field(LEADER_KEY, NULLABLE_STRING),
new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
GroupCoordinator onResignation

以上就是Kafka消費(fèi)客戶端協(xié)調(diào)器GroupCoordinator詳解的詳細(xì)內(nèi)容,更多關(guān)于Kafka GroupCoordinator的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Thymeleaf對(duì)象的使用之基本對(duì)象實(shí)例解析
這篇文章主要介紹了Thymeleaf對(duì)象的使用之基本對(duì)象實(shí)例解析,文中通過示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2020-04-04
完美解決idea moudle沒有藍(lán)色的小方塊的問題
這篇文章主要介紹了完美解決idea moudle沒有藍(lán)色的小方塊的問題,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。一起跟隨小編過來看看吧2021-02-02
Spring靜態(tài)代理和動(dòng)態(tài)代理代碼詳解
這篇文章主要介紹了Spring靜態(tài)代理和動(dòng)態(tài)代理代碼詳解,具有一定參考價(jià)值,需要的朋友可以了解下。2017-11-11
Java程序運(yùn)行時(shí)出現(xiàn)亂碼問題的排查與解決方法
本文主要介紹了Java程序運(yùn)行時(shí)出現(xiàn)亂碼問題的排查與解決方法,包括檢查Java源文件編碼、檢查編譯時(shí)的編碼設(shè)置、檢查運(yùn)行時(shí)的編碼設(shè)置、檢查命令提示符的代碼頁、檢查命令提示符的字體、檢查 Java 程序的輸出代碼以及檢查環(huán)境變量,需要的朋友可以參考下2025-03-03
從零開始Java實(shí)現(xiàn)Parser?Combinator
這篇文章主要為大家介紹了從零開始Java實(shí)現(xiàn)Parser?Combinator過程及原理詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2023-05-05
使用Java實(shí)現(xiàn)查看線程的運(yùn)行狀態(tài)(附源碼)
在現(xiàn)代 Java 應(yīng)用中,線程的運(yùn)行狀態(tài)對(duì)于排查問題和優(yōu)化性能具有至關(guān)重要的作用,本文將使用Java編寫一個(gè)查看線程運(yùn)行狀態(tài)的工具,有需要的可以了解下2025-03-03
MyBatis如何通過xml方式實(shí)現(xiàn)SaveOrUpdate
這篇文章主要講如何通過xml方式實(shí)現(xiàn)SaveOrUpdate,但是仍然建議在Service中實(shí)現(xiàn),本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友參考下吧2023-06-06

