Apache SeaTunnel 將 CDC 數(shù)據(jù)流轉(zhuǎn)換為 Append-Only 模式的詳細(xì)過程

RowKindExtractor 是 Apache SeaTunnel 的一個轉(zhuǎn)換插件,它能將 CDC 數(shù)據(jù)流轉(zhuǎn)為 Append-Only 模式,并提取原始 RowKind 信息為新字段。本文將介紹 RowKindExtractor 的核心功能,其在 CDC 數(shù)據(jù)同步場景下的使用方法,以及配置選項、注意事項及多種應(yīng)用示例。
RowKindExtractor
RowKindExtractor 轉(zhuǎn)換插件用于將 CDC(Change Data Capture)數(shù)據(jù)流轉(zhuǎn)換為 Append-Only(僅追加)模式,同時將原始的 RowKind 信息提取為一個新的字段。
核心功能:
- 將所有數(shù)據(jù)行的 RowKind 統(tǒng)一改為 +I(INSERT),實現(xiàn) Append-Only 模式
- 將原始的 RowKind 信息(INSERT、UPDATE_BEFORE、UPDATE_AFTER、DELETE)保存到新增的字段中
- 支持短格式和完整格式兩種輸出方式
為什么需要這個插件?
在 CDC 數(shù)據(jù)同步場景中,數(shù)據(jù)行帶有 RowKind 標(biāo)記(+I、-U、+U、-D),表示不同的變更類型。但某些下游系統(tǒng)(如數(shù)據(jù)湖、分析系統(tǒng))只支持 Append-Only 模式,不支持 UPDATE 和 DELETE 操作。此時需要:
- 將所有數(shù)據(jù)轉(zhuǎn)換為 INSERT 類型(Append-Only)
- 將原始的變更類型保存為普通字段,供后續(xù)分析使用
轉(zhuǎn)換示例:
輸入(CDC 數(shù)據(jù)): RowKind: -D (DELETE) 數(shù)據(jù): id=1, name="test1", age=20 輸出(Append-Only 數(shù)據(jù)): RowKind: +I (INSERT) 數(shù)據(jù): id=1, name="test1", age=20, row_kind="DELETE"
典型應(yīng)用場景:
- 將 CDC 數(shù)據(jù)寫入只支持 Append 的數(shù)據(jù)湖
- 需要在數(shù)據(jù)倉庫中保留完整的變更歷史記錄
- 需要對不同類型的變更進(jìn)行統(tǒng)計分析
配置選項

custom_field_name [string]
指定新增字段的名稱,該字段用于存儲原始的 RowKind 信息。
默認(rèn)值:row_kind
注意事項:
- 字段名不能與原有字段重名,否則會報錯
- 建議使用有意義的名稱,如 operation_type、change_type、cdc_op 等
示例:
custom_field_name = "operation_type" # 使用自定義字段名
transform_type [enum]
指定 RowKind 字段值的輸出格式。
可選值:

默認(rèn)值:SHORT
各值含義:

選擇建議:
- SHORT 格式:節(jié)省存儲空間,適合對存儲敏感的場景
- FULL 格式:可讀性更好,適合需要人工查看或分析的場景
示例:
transform_type = FULL # 使用完整格式
完整示例
- 示例 1:使用默認(rèn)配置(SHORT 格式)
使用默認(rèn)配置,將 CDC 數(shù)據(jù)轉(zhuǎn)換為 Append-Only 模式,RowKind 以短格式保存。
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
MySQL-CDC {
plugin_output = "cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.users"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}
transform {
RowKindExtractor {
plugin_input = "cdc_source"
plugin_output = "append_only_data"
# 使用默認(rèn)配置:
# custom_field_name = "row_kind"
# transform_type = SHORT
}
}
sink {
Console {
plugin_input = "append_only_data"
}
}數(shù)據(jù)轉(zhuǎn)換過程:
輸入數(shù)據(jù)(CDC 格式): 1. RowKind=+I, id=1, name="張三", age=25 2. RowKind=-U, id=1, name="張三", age=25 3. RowKind=+U, id=1, name="張三", age=26 4. RowKind=-D, id=1, name="張三", age=26 輸出數(shù)據(jù)(Append-Only 格式): 1. RowKind=+I, id=1, name="張三", age=25, row_kind="+I" 2. RowKind=+I, id=1, name="張三", age=25, row_kind="-U" 3. RowKind=+I, id=1, name="張三", age=26, row_kind="+U" 4. RowKind=+I, id=1, name="張三", age=26, row_kind="-D"
- 示例 2:使用 FULL 格式和自定義字段名
使用完整格式輸出 RowKind,并自定義字段名稱。
env {
parallelism = 1
job.mode = "STREAMING"
}
source {
MySQL-CDC {
plugin_output = "cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.orders"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}
transform {
RowKindExtractor {
plugin_input = "cdc_source"
plugin_output = "append_only_data"
custom_field_name = "operation_type" # 自定義字段名
transform_type = FULL # 使用完整格式
}
}
sink {
Iceberg {
plugin_input = "append_only_data"
catalog_name = "iceberg_catalog"
database = "mydb"
table = "orders_history"
# Iceberg 表會包含 operation_type 字段,記錄每條數(shù)據(jù)的變更類型
}
}數(shù)據(jù)轉(zhuǎn)換過程: 輸入數(shù)據(jù)(CDC 格式): 1. RowKind=+I, order_id=1001, amount=100.00 2. RowKind=-U, order_id=1001, amount=100.00 3. RowKind=+U, order_id=1001, amount=150.00 4. RowKind=-D, order_id=1001, amount=150.00 輸出數(shù)據(jù)(Append-Only 格式,F(xiàn)ULL 格式): 1. RowKind=+I, order_id=1001, amount=100.00, operation_type="INSERT" 2. RowKind=+I, order_id=1001, amount=100.00, operation_type="UPDATE_BEFORE" 3. RowKind=+I, order_id=1001, amount=150.00, operation_type="UPDATE_AFTER" 4. RowKind=+I, order_id=1001, amount=150.00, operation_type="DELETE"
- 示例 3:完整的測試示例(使用 FakeSource)
使用 FakeSource 生成測試數(shù)據(jù),演示各種 RowKind 的轉(zhuǎn)換效果。
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
plugin_output = "fake_cdc_data"
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "A_updated", 95]
},
{
kind = UPDATE_BEFORE
fields = [2, "B", 100]
},
{
kind = UPDATE_AFTER
fields = [2, "B_updated", 98]
},
{
kind = DELETE
fields = [1, "A_updated", 95]
}
]
}
}
transform {
RowKindExtractor {
plugin_input = "fake_cdc_data"
plugin_output = "transformed_data"
custom_field_name = "change_type"
transform_type = FULL
}
}
sink {
Console {
plugin_input = "transformed_data"
}
}預(yù)期輸出:
+I, pk_id=1, name="A", score=100, change_type="INSERT"
+I, pk_id=2, name="B", score=100, change_type="INSERT"
+I, pk_id=1, name="A", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=1, name="A_updated", score=95, change_type="UPDATE_AFTER"
+I, pk_id=2, name="B", score=100, change_type="UPDATE_BEFORE"
+I, pk_id=2, name="B_updated", score=98, change_type="UPDATE_AFTER"
+I, pk_id=1, name="A_updated", score=95, change_type="DELETE"
到此這篇關(guān)于Apache SeaTunnel 將 CDC 數(shù)據(jù)流轉(zhuǎn)換為 Append-Only 模式的詳細(xì)過程的文章就介紹到這了,更多相關(guān)Apache SeaTunnel Append-Only 模式內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
重啟Linux服務(wù)器后數(shù)據(jù)消失問題的解決方法(重新掛載)
在使用 reboot 命令重啟服務(wù)器后,服務(wù)器內(nèi)掛載的文件全部丟失,那應(yīng)該如何重新掛載呢?所以本文小編給大家介紹了重啟Linux服務(wù)器后數(shù)據(jù)消失問題的解決方法,并通過圖文講解的非常詳細(xì),需要的朋友可以參考下2024-09-09
讓Apache 2支持.htaccess并實現(xiàn)目錄加密的方法
這篇文章主要介紹了讓Apache 2支持.htaccess并實現(xiàn)目錄加密的方法,文中給出了詳細(xì)的方法步驟,并給出了示例代碼,對大家具有一定的參考價值,需要的朋友們下面來一起看看吧。2017-02-02

