詳解Flink同步Kafka數(shù)據(jù)到ClickHouse分布式表
引言
業(yè)務(wù)需要一種OLAP引擎,可以做到實時寫入存儲和查詢計算功能,提供高效、穩(wěn)健的實時數(shù)據(jù)服務(wù),最終決定ClickHouse
什么是ClickHouse?
ClickHouse是一個用于聯(lián)機分析(OLAP)的列式數(shù)據(jù)庫管理系統(tǒng)(DBMS)。
列式數(shù)據(jù)庫更適合于OLAP場景(對于大多數(shù)查詢而言,處理速度至少提高了100倍),下面詳細解釋了原因(通過圖片更有利于直觀理解),圖片來源于ClickHouse中文官方文檔。
行式

列式

我們使用Flink編寫程序,消費kafka里面的主題數(shù)據(jù),清洗、歸一,寫入到clickhouse里面去。
這里的關(guān)鍵點,由于第一次使用,無法分清應(yīng)該建立什么格式的clickhouse表,出現(xiàn)了一些問題,最大的問題就是程序?qū)?shù)據(jù)寫入了,查詢發(fā)現(xiàn)數(shù)據(jù)不完整,只有一部分。我也在網(wǎng)上查了一些原因,總結(jié)下來。
為什么有時看不到已經(jīng)創(chuàng)建好的表并且查詢結(jié)果一直抖動時多時少?
常見原因1:
建表流程存在問題。ClickHouse的分布式集群搭建并沒有原生的分布式DDL語義。如果您在自建ClickHouse集群時使用create table創(chuàng)建表,查詢雖然返回了成功,但實際這個表只在當前連接的Server上創(chuàng)建了。下次連接重置換一個Server,您就看不到這個表了。
解決方案:
建表時,請使用create table <table_name> on cluster default語句,on cluster default聲明會把這條語句廣播給default集群的所有節(jié)點進行執(zhí)行。示例代碼如下。 Create table test on cluster default (a UInt64) Engine = MergeTree() order by tuple(); 在test表上再創(chuàng)建一個分布式表引擎,建表語句如下。 Create table test_dis on cluster default as test Engine = Distributed(default, default, test, cityHash64(a));
常見原因2:
ReplicatedMergeTree存儲表配置有問題。ReplicatedMergeTree表引擎是對應(yīng)MergeTree表引擎的主備同步增強版,在單副本實例上限定只能創(chuàng)建MergeTree表引擎,在雙副本實例上只能創(chuàng)建ReplicatedMergeTree表引擎。
解決方案:
在雙副本實例上建表時,請使用ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)或ReplicatedMergeTree()配置ReplicatedMergeTree表引擎。其中,ReplicatedMergeTree(‘/clickhouse/tables/{database}/{table}/{shard}’, ‘{replica}’)為固定配置,無需修改。
這里引出了復(fù)制表的概念,這里介紹一下,只有 MergeTree 系列里的表可支持副本:
ReplicatedMergeTree
ReplicatedSummingMergeTree
ReplicatedReplacingMergeTree
ReplicatedAggregatingMergeTree ReplicatedCollapsingMergeTree
ReplicatedVersionedCollapsingMergeTree
ReplicatedGraphiteMergeTree
副本是表級別的,不是整個服務(wù)器級的。所以,服務(wù)器里可以同時有復(fù)制表和非復(fù)制表。副本不依賴分片。每個分片有它自己的獨立副本。
創(chuàng)建復(fù)制表
先做好準備工作,該建表的建表,然后編寫程序。在表引擎名稱上加上 Replicated 前綴。例如:ReplicatedMergeTree。
- 首先創(chuàng)建一個分布式數(shù)據(jù)庫
create database test on cluster default_cluster;
- 創(chuàng)建本地表
由于clickhouse是分布式的,創(chuàng)建本地表本來應(yīng)該在每個節(jié)點上創(chuàng)建的,但是指定on cluster關(guān)鍵字可以直接完成,建表語句如下:
CREATE TABLE test.test_data_shade on cluster default_cluster
(
`data` Map(String, String),
`uid` String,
`remote_addr` String,
`time` Datetime64,
`status` Int32,
...其它字段省略
`dt` String
)
ENGINE = ReplicatedMergeTree()
partition by dt
order by (dt, sipHash64(uid));
這里表引擎為ReplicatedMergeTree,即有副本的表,根據(jù)dt按天分區(qū),提升查詢效率,sipHash64是一個hash函數(shù),根據(jù)uid散列使得相同uid數(shù)據(jù)在同一個分片上面,如果有去重需求,速度更快,因為可以計算每個分片去重,再匯總一下即可。
- 創(chuàng)建分布式表
CREATE TABLE test.test_data_all on cluster default_cluster as test.test_data_shade ENGINE = Distributed('default_cluster', 'test', 'test_data_shade', sipHash64(uid));
在多副本分布式 ClickHouse 集群中,通常需要使用 Distributed 表寫入或讀取數(shù)據(jù),Distributed 表引擎自身不存儲任何數(shù)據(jù),它能夠作為分布式表的一層透明代理,在集群內(nèi)部自動開展數(shù)據(jù)的寫入、分發(fā)、查詢、路由等工作。
通過jdbc寫入
這個我是看的官方文檔,里面有2種選擇,感興趣的同學(xué)可以都去嘗試一下。

這里貼一下我的Pom依賴
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.1-patch</version>
<classifier>shaded</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
Flink主程序,消費kafka,做清洗,然后寫入clickhouse,這都是常規(guī)操作,這里貼一下關(guān)鍵代碼吧。

連接clickhouse有2種方式,8123端口的http方式,和基于9000端口的tcp方式。
這里官方推薦的是連接驅(qū)動是0.3.2:
<dependency>
<!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2-patch11</version>
<classifier>all</classifier>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
Note: ru.yandex.clickhouse.ClickHouseDriver has been deprecated and everything under ru.yandex.clickhouse will be removed in 0.3.3.

官方推薦升級到0.3.2,上面表格給出了升級方法,文檔地址:
以上就是詳解Flink同步Kafka數(shù)據(jù)到ClickHouse分布式表的詳細內(nèi)容,更多關(guān)于Flink數(shù)據(jù)同步Kafka ClickHouse的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
淺談一下數(shù)據(jù)庫系統(tǒng)的發(fā)展與組成
這篇文章主要介紹了淺談一下數(shù)據(jù)庫系統(tǒng)的發(fā)展與組成,數(shù)據(jù)庫系統(tǒng),指在計算機系統(tǒng)中引入數(shù)據(jù)庫后的系統(tǒng),一般由數(shù)據(jù)庫、數(shù)據(jù)庫管理系統(tǒng)、應(yīng)用系統(tǒng)、數(shù)據(jù)庫管理員(DBA)構(gòu)成,本文就數(shù)據(jù)庫的發(fā)展展開詳細講解2023-07-07
navicat導(dǎo)入excel文件的步驟以及可能碰到的問題
本文介紹將excel導(dǎo)入到mysql數(shù)據(jù)庫的方法,相對來說比較簡單,但也可能會碰到一些小問題,在這里做一個小的總結(jié),這里使用到的工具包括navicat,mysql數(shù)據(jù)庫以及excel,需要的朋友可以參考下2024-07-07
access mysql mssql 隨機 10條數(shù)據(jù)的sql語句
好多情況下,大家需要隨機抽取幾個數(shù)據(jù),當然數(shù)據(jù)是從數(shù)據(jù)庫來的,所以腳本之家特為大家準備了一些。2009-05-05
SQL注入篇學(xué)習(xí)之盲注/寬字節(jié)注入
盲注是注入的一種,指的是在不知道數(shù)據(jù)庫返回值的情況下對數(shù)據(jù)中的內(nèi)容進行猜測,實施SQL注入,下面這篇文章主要給大家介紹了關(guān)于SQL注入篇之盲注/寬字節(jié)注入的相關(guān)資料,需要的朋友可以參考下2022-03-03
梧桐數(shù)據(jù)庫與`mysql`及`oracle`關(guān)于交換服務(wù)器編號的`SQL`寫法分析(推薦)
本文介紹了如何通過SQL查詢實現(xiàn)服務(wù)器編號的交換操作,以優(yōu)化數(shù)據(jù)中心內(nèi)部服務(wù)器的布局,文章說明了不同數(shù)據(jù)庫(如梧桐數(shù)據(jù)庫、MySQL和Oracle)的建表語句、數(shù)據(jù)插入以及SQL實現(xiàn)思路,通過具體的SQL查詢,文章展示了如何在不同數(shù)據(jù)庫中交換服務(wù)器編號,并解釋了每個部分的功能2024-11-11

