Java kafka如何實現(xiàn)自定義分區(qū)類和攔截器
生產(chǎn)者發(fā)送到對應(yīng)的分區(qū)有以下幾種方式:
(1)指定了patition,則直接使用;(可以查閱對應(yīng)的java api, 有多種參數(shù))
(2)未指定patition但指定key,通過對key的value進行hash出一個patition;
(3)patition和key都未指定,使用輪詢選出一個patition。
但是kafka提供了,自定義分區(qū)算法的功能,由業(yè)務(wù)手動實現(xiàn)分布:
1、實現(xiàn)一個自定義分區(qū)類,CustomPartitioner實現(xiàn)Partitioner
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
/**
*
* @param topic 當(dāng)前的發(fā)送的topic
* @param key 當(dāng)前的key值
* @param keyBytes 當(dāng)前的key的字節(jié)數(shù)組
* @param value 當(dāng)前的value值
* @param valueBytes 當(dāng)前的value的字節(jié)數(shù)組
* @param cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//這邊根據(jù)返回值就是分區(qū)號, 這邊就是固定發(fā)送到三號分區(qū)
return 3;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
2、producer配置文件指定,具體的分區(qū)類
// 具體的分區(qū)類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
技巧:可以使用ProducerConfig中提供的配置ProducerConfig
kafka producer攔截器
攔截器(interceptor)是在Kafka 0.10版本被引入的。
interceptor使得用戶在消息發(fā)送前以及producer回調(diào)邏輯前有機會對消息做一些定制化需求,比如修改消息等。
許用戶指定多個interceptor按序作用于同一條消息從而形成一個攔截鏈(interceptor chain)。
所使用的類為:
org.apache.kafka.clients.producer.ProducerInterceptor
我們可以編碼測試下:
1、定義消息攔截器,實現(xiàn)消息處理(可以是加時間戳等等,unid等等。)
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.UUID;
public class MessageInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
System.out.println("這是MessageInterceptor的configure方法");
}
/**
* 這個是消息發(fā)送之前進行處理
*
* @param record
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// 創(chuàng)建一個新的record,把uuid入消息體的最前部
System.out.println("為消息添加uuid");
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
UUID.randomUUID().toString().replace("-", "") + "," + record.value());
}
/**
* 這個是生產(chǎn)者回調(diào)函數(shù)調(diào)用之前處理
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("MessageInterceptor攔截器的onAcknowledgement方法");
}
@Override
public void close() {
System.out.println("MessageInterceptor close 方法");
}
}
2、定義計數(shù)攔截器
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor<String, String>{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
System.out.println("這是CounterInterceptor的configure方法");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("CounterInterceptor計數(shù)過濾器不對消息做任何操作");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
// 統(tǒng)計成功和失敗的次數(shù)
System.out.println("CounterInterceptor過濾器執(zhí)行統(tǒng)計失敗和成功數(shù)量");
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
// 保存結(jié)果
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
3、producer客戶端:
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Producer1 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// Kafka服務(wù)端的主機名和端口號
props.put("bootstrap.servers", "localhost:9092");
// 等待所有副本節(jié)點的應(yīng)答
props.put("acks", "all");
// 消息發(fā)送最大嘗試次數(shù)
props.put("retries", 0);
// 一批消息處理大小
props.put("batch.size", 16384);
// 請求延時,可能生產(chǎn)數(shù)據(jù)太快了
props.put("linger.ms", 1);
// 發(fā)送緩存區(qū)內(nèi)存大小,數(shù)據(jù)是先放到生產(chǎn)者的緩沖區(qū)
props.put("buffer.memory", 33554432);
// key序列化
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value序列化
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 具體的分區(qū)類
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
//定義攔截器
List<String> interceptors = new ArrayList<>();
interceptors.add("kafka.MessageInterceptor");
interceptors.add("kafka.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1; i++) {
producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println("這是producer回調(diào)函數(shù)");
}
});
}
/*System.out.println("現(xiàn)在執(zhí)行關(guān)閉producer");
producer.close();*/
producer.close();
}
}
總結(jié),我們可以知道攔截器鏈各個方法的執(zhí)行順序,假如有A、B攔截器,在一個攔截器鏈中:
(1)執(zhí)行A的configure方法,執(zhí)行B的configure方法
(2)執(zhí)行A的onSend方法,B的onSend方法
(3)生產(chǎn)者發(fā)送完畢后,執(zhí)行A的onAcknowledgement方法,B的onAcknowledgement方法。
(4)執(zhí)行producer自身的callback回調(diào)函數(shù)。
(5)執(zhí)行A的close方法,B的close方法。
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot + 微信公眾號JSAPI支付功能的實現(xiàn)
這篇文章主要介紹了SpringBoot + 微信公眾號JSAPI支付功能的實現(xiàn),本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下2021-03-03
Java?ConcurrentHashMap實現(xiàn)線程安全的代碼示例
眾所周知ConcurrentHashMap是HashMap的多線程版本,HashMap?在并發(fā)操作時會有各種問題,而這些問題,只要使用ConcurrentHashMap就可以完美解決了,本文將給詳細介紹ConcurrentHashMap是如何保證線程安全的2023-05-05
Maven+Tomcat8 實現(xiàn)自動化部署的方法
本篇文章主要介紹了Maven+Tomcat8 實現(xiàn)自動化部署的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-10-10

