怎樣給Kafka新增分區(qū)
給Kafka新增分區(qū)
數(shù)據(jù)量猛增的時(shí)候,需要給 kafka 的 topic 新增分區(qū),增大處理的數(shù)據(jù)量,可以通過(guò)以下步驟
1、修改 topic 的分區(qū)
kafka-topics --zookeeper hadoop004:2181 --alter --topic flink-test-04 --partitions 3
2、遷移數(shù)據(jù)
生成遷移計(jì)劃,手動(dòng)新建一個(gè) json 文件
{
"topics": [
{"topic": "flink-test-03"}
],
"version": 1
}生成遷移計(jì)劃
kafka-reassign-partitions --zookeeper hadoop004:2181 --topics-to-move-json-file topic.json --broker-list “120,121,122” --generate
Current partition replica assignment:
{"version":1,"partitions":[{"topic":"flink-test-02","partition":5,"replicas":[120]},{"topic":"flink-test-02","partition":0,"replicas":[121]},{"topic":"flink-test-02","partition":2,"replicas":[120]},{"topic":"flink-test-02","partition":1,"replicas":[122]},{"topic":"flink-test-02","partition":4,"replicas":[122]},{"topic":"flink-test-02","partition":3,"replicas":[121]}]}新建一個(gè)文件reassignment.json,保存上邊這些信息
3、遷移
kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --execute
4、驗(yàn)證
kafka-reassign-partitions --zookeeper hadoop004:2181 --reassignment-json-file reassignment.json --verify
Kafka分區(qū)原理機(jī)制
分區(qū)結(jié)構(gòu)
kafka的消息總共是三層結(jié)構(gòu)
Topic(第一層結(jié)構(gòu),表示一個(gè)主題)-> Partition(分區(qū),每個(gè)消息可以有多個(gè)分區(qū)) -> 消息實(shí)例(具體的消息文本等等,一個(gè)消息實(shí)例只可能在一個(gè)分區(qū)里面,不會(huì)出現(xiàn)在多個(gè)分區(qū)中)

分區(qū)優(yōu)點(diǎn)
分區(qū)其實(shí)是一個(gè)負(fù)載均衡的思想。如此設(shè)計(jì)能使每一個(gè)分區(qū)獨(dú)自處理單獨(dú)的讀寫請(qǐng)求,提高吞吐量。
分區(qū)策略
- 輪詢策略Round-robin(未指定key新版本默認(rèn)策略)
- 隨機(jī)策略Randomness(老版本默認(rèn)策略)
- 消息鍵排序策略Key-ordering(指定了key,則使用該策略)
- 根據(jù)地理位置進(jìn)行分區(qū)
- 自定義分區(qū) 需要在生產(chǎn)者端實(shí)現(xiàn)org.apache.kafka.clients.producer.Partitioner接口,并配置一下實(shí)現(xiàn)類的全限定名
根據(jù)分區(qū)策略實(shí)現(xiàn)消息的順序消費(fèi)
可以只設(shè)置一個(gè)分區(qū),這樣子消息都是放在一個(gè)partition,肯定是先進(jìn)先出進(jìn)行消費(fèi),然而這種場(chǎng)景無(wú)法利用kafka多分區(qū)的高吞吐量以及負(fù)載均衡的優(yōu)勢(shì)。
將需要順序消費(fèi)的消息設(shè)置key,這個(gè)時(shí)候根據(jù)默認(rèn)的分區(qū)策略,kafka會(huì)將所有的相同的key放在一個(gè)partition上面,這樣既可以使用kafka的partition又可以實(shí)現(xiàn)順序消費(fèi)。
默認(rèn)分區(qū)策略源碼
/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a partition based on a hash of the key
* <li>If no partition or key is present choose a partition in a round-robin fashion
*/
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();
public void configure(Map<String, ?> configs) {}
/**
* Compute the partition for the given record.
*
* @param topic The topic name
* @param key The key to partition on (or null if no key)
* @param keyBytes serialized key to partition on (or null if no key)
* @param value The value to partition on or null
* @param valueBytes serialized value to partition on or null
* @param cluster The current cluster metadata
*/
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
從類注釋當(dāng)中已經(jīng)很明顯的看出來(lái)分區(qū)邏輯
3. 如果指定了分區(qū),則使用指定分區(qū)
4. 如果沒(méi)有指定分區(qū),但是有key,則使用hash過(guò)的key放置消息
5. 如果沒(méi)有指定分區(qū),也沒(méi)有key,則使用輪詢
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
mybatis中foreach報(bào)錯(cuò):_frch_item_0 not found的解決方法
這篇文章主要給大家介紹了mybatis中foreach報(bào)錯(cuò):_frch_item_0 not found的解決方法,文章通過(guò)示例代碼介紹了詳細(xì)的解決方法,對(duì)大家具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面來(lái)一起看看吧。2017-06-06
聊聊@value注解和@ConfigurationProperties注解的使用
這篇文章主要介紹了@value注解和@ConfigurationProperties注解的使用,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09
Java實(shí)現(xiàn)迅雷地址轉(zhuǎn)成普通地址實(shí)例代碼
本篇文章主要介紹了Java實(shí)現(xiàn)迅雷地址轉(zhuǎn)成普通地址實(shí)例代碼,非常具有實(shí)用價(jià)值,有興趣的可以了解一下。2017-03-03
IDEA2020.1同步系統(tǒng)設(shè)置到GitHub的方法
這篇文章主要介紹了IDEA2020.1同步系統(tǒng)設(shè)置到GitHub的方法,本文通過(guò)圖文并茂的形式給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-05-05

