Java使用pulsar-flink-connector讀取pulsar catalog元數(shù)據(jù)代碼剖析
簡介
通過 pulsar-flink-connector 讀取到 Apache pulsar 中的namespaces、topics的元數(shù)據(jù)信息。
pulsar-flink-connector 的 github: https://github.com/streamnative/pulsar-flink
Maven
<dependency>
<groupId>io.streamnative.connectors</groupId>
<artifactId>pulsar-flink-connector-2.11-1.12</artifactId>
<version>2.7.3</version>
</dependency>
<!-- JAR repositories -->
<repositories>
<repository>
<id>central</id>
<layout>default</layout>
<url>https://repo1.maven.org/maven2</url>
</repository>
<repository>
<id>bintray-streamnative-maven</id>
<name>bintray</name>
<url>https://dl.bintray.com/streamnative/maven</url>
</repository>
</repositories>
CODE
使用PulsarMetadataReader獲取元數(shù)據(jù)
package com.levi.demo;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarMetadataReader;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.impl.auth.AuthenticationToken;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Test.
*
* @author levi
* @version 1.0
**/
public class Test {
public static void main(String[] args) {
final ClientConfigurationData configurationData = new ClientConfigurationData();
configurationData.setServiceUrl("pulsar://127.0.0.1:6650");
//Your Pulsar Token
final AuthenticationToken token =
new AuthenticationToken(
"eyJxxxxxxxxxxx.eyxxxxxxxxxxxxx.xxxxxxxxxxx");
configurationData.setAuthentication(token);
try (final PulsarMetadataReader reader =
new PulsarMetadataReader("http://127.0.0.1:8443",
configurationData,
"",
new HashMap(),
-1,
-1)) {
//獲取namespaces
final List<String> namespaces = reader.listNamespaces();
System.out.println("namespaces: " + namespaces.toString());
for (final String namespace : namespaces) {
//獲取Topics
final List<String> topics = reader.getTopics(namespace);
System.out.println("topic: " + topics.toString());
for (String topic : topics) {
//獲取字段SchemaInfo
final SchemaInfo schemaInfo = reader.getPulsarSchema(topic);
final String name = schemaInfo.getName();
System.out.println("SchemaName:" + name); //topicName
final SchemaType type = schemaInfo.getType();
System.out.println("SchemaType:" + type.toString());// "JSON"...
final Map<String, String> properties = schemaInfo.getProperties();
System.out.println(properties);
final String schemaDefinition = schemaInfo.getSchemaDefinition();
System.out.println(schemaDefinition); // Field info.
}
}
} catch (IOException | PulsarAdminException e) {
e.printStackTrace();
}
}
}
到此這篇關(guān)于Java使用pulsar-flink-connector讀取pulsar catalog元數(shù)據(jù)的文章就介紹到這了,更多相關(guān)Java讀取pulsar catalog元數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java的字符讀寫類CharArrayReader和CharArrayWriter使用示例
這篇文章主要介紹了Java的字符讀寫類CharArrayReader和CharArrayWriter使用示例,兩個類分別繼承于Reader和Writer,需要的朋友可以參考下2016-06-06
Mybatis中如何設(shè)置sqlSession自動提交
在MyBatis中,默認情況下,獲取的SqlSession對象不會自動提交事務(wù),這意味著在進行更新、刪除或插入等操作后,需要顯式調(diào)用commit方法來提交事務(wù),但是,可以在獲取SqlSession時通過將openSession方法的參數(shù)設(shè)置為true2024-09-09
SpringBoot中實現(xiàn)定時任務(wù)的幾種方式
定時任務(wù)在我們項目開發(fā)中也是很重要的,對于某些場景必須要用定時任務(wù)?,如定時發(fā)送郵件啊,定時統(tǒng)計數(shù)據(jù)等,這篇文章主要講講項目中實現(xiàn)定時任務(wù)的幾種方式,需要的朋友可以參考下2023-05-05
Java類之間的關(guān)系圖_動力節(jié)點Java學(xué)院整理
在Java以及其他的面向?qū)ο笤O(shè)計模式中,類與類之間主要有6種關(guān)系,他們分別是:依賴、關(guān)聯(lián)、聚合、組合、繼承、實現(xiàn)。他們的耦合度依次增強,有興趣的可以了解一下2017-08-08
maven項目編譯后沒有生成target/class文件問題
文章介紹了在Maven項目中,`target/classes`目錄用于存放編譯后的字節(jié)碼文件,如果在項目編譯啟動后卻沒有看到這個文件夾,可以通過取消勾選`javaoutputfolders`選項來解決,以便顯示編譯后的Java類文件2024-11-11
Java格式化小數(shù)并保留兩位小數(shù)的四種方法
Java中格式化小數(shù)并保留兩位小數(shù)的四種方法:使用DecimalFormat、String.format()、BigDecimal和NumberFormat,每種方法都有其適用場景和特點,文章通過代碼示例介紹的非常詳細,需要的朋友可以參考下2025-03-03

