Spring Boot集群管理工具KafkaAdminClient使用方法解析
原理介紹
在Kafka官網(wǎng)中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準):
- 創(chuàng)建Topic:createTopics(Collection<NewTopic> newTopics)
- 刪除Topic:deleteTopics(Collection<String> topics)
- 羅列所有Topic:listTopics()
- 查詢Topic:describeTopics(Collection<String> topicNames)
- 查詢集群信息:describeCluster()
- 查詢ACL信息:describeAcls(AclBindingFilter filter)
- 創(chuàng)建ACL信息:createAcls(Collection<AclBinding> acls)
- 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
- 查詢配置信息:describeConfigs(Collection<ConfigResource> resources)
- 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
- 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
- 查詢節(jié)點的日志目錄信息:describeLogDirs(Collection<Integer> brokers)
- 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
- 增加分區(qū):createPartitions(Map<String, NewPartitions> newPartitions)
其內部原理是使用Kafka自定義的一套二進制協(xié)議來實現(xiàn),詳細可以參見Kafka協(xié)議。主要實現(xiàn)步驟:
客戶端根據(jù)方法的調用創(chuàng)建相應的協(xié)議請求,比如創(chuàng)建Topic的createTopics方法,其內部就是發(fā)送CreateTopicRequest請求。
客戶端發(fā)送請求至Kafka Broker。
Kafka Broker處理相應的請求并回執(zhí),比如與CreateTopicRequest對應的是CreateTopicResponse。
客戶端接收相應的回執(zhí)并進行解析處理。
和協(xié)議有關的請求和回執(zhí)的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執(zhí)類的兩個基本父類。
代碼如下
@Component
public class KafkaConfig{
// 配置Kafka
public Properties getProps(){
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
/* props.put("retries", 2); // 重試次數(shù)
props.put("batch.size", 16384); // 批量發(fā)送大小
props.put("buffer.memory", 33554432); // 緩存大小,根據(jù)本機內存大小配置
props.put("linger.ms", 1000); // 發(fā)送頻率,滿足任務一個條件發(fā)送*/
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
}
@RestController
public class KafkaTopicManager {
@Autowired
private KafkaConfig kafkaConfig;
@GetMapping("createTopic")
public void createTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
NewTopic newTopic = new NewTopic("test1",4, (short) 1);
Collection<NewTopic> newTopicList = new ArrayList<>();
newTopicList.add(newTopic);
adminClient.createTopics(newTopicList);
adminClient.close();
}
@GetMapping("deleteTopic")
public void deleteTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
adminClient.deleteTopics(Arrays.asList("test1"));
adminClient.close();
}
@GetMapping("listAllTopic")
public void listAllTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
ListTopicsResult result = adminClient.listTopics();
KafkaFuture<Set<String>> names = result.names();
try {
names.get().forEach((k)->{
System.out.println(k);
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
adminClient.close();
}
@GetMapping("getTopic")
public void getTopic(){
AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps());
DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test"));
Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values();
if(values.isEmpty()){
System.out.println("找不到描述信息");
}else{
for (KafkaFuture<TopicDescription> value : values) {
System.out.println(value);
}
}
adminClient.close();
}
}
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
- Spring?Boot整合Kafka教程詳解
- Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot如何獲取Kafka的Topic列表
- SpringBoot整合kafka遇到的版本不對應問題及解決
- SpringBoot+Nacos+Kafka微服務流編排的簡單實現(xiàn)
- SpringBoot集成Kafka的步驟
- Springboot集成Kafka實現(xiàn)producer和consumer的示例代碼
- Spring?Boot?基于?SCRAM?認證集成?Kafka?的過程詳解
相關文章
java使用RSA加密方式實現(xiàn)數(shù)據(jù)加密解密的代碼
這篇文章給大家分享java使用RSA加密方式實現(xiàn)數(shù)據(jù)加密解密,通過實例代碼文字相結合給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友參考下2019-11-11
JAVA使用ElasticSearch查詢in和not in的實現(xiàn)方式
今天小編就為大家分享一篇關于JAVA使用Elasticsearch查詢in和not in的實現(xiàn)方式,小編覺得內容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧2018-12-12
MybatisPlus查詢條件為空字符串或null問題及解決
這篇文章主要介紹了MybatisPlus查詢條件為空字符串或null問題及解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2022-06-06
ShardingSphere數(shù)據(jù)分片算法及測試實戰(zhàn)
這篇文章主要為大家介紹了ShardingSphere數(shù)據(jù)分片算法及測試實戰(zhàn)示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-03-03
SpringCloud Hystrix-Dashboard儀表盤的實現(xiàn)
這篇文章主要介紹了SpringCloud Hystrix-Dashboard儀表盤的實現(xiàn),文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,需要的朋友們下面隨著小編來一起學習學習吧2019-08-08

