Spring?Boot整合阿里開源中間件Canal實現(xiàn)數(shù)據(jù)增量同步
前言
數(shù)據(jù)同步一直是一個令人頭疼的問題。在業(yè)務量小,場景不多,數(shù)據(jù)量不大的情況下我們可能會選擇在項目中直接寫一些定時任務手動處理數(shù)據(jù),例如從多個表將數(shù)據(jù)查出來,再匯總處理,再插入到相應的地方。
但是隨著業(yè)務量增大,數(shù)據(jù)量變多以及各種復雜場景下的分庫分表的實現(xiàn),使數(shù)據(jù)同步變得越來越困難。
今天這篇文章使用阿里開源的中間件Canal解決數(shù)據(jù)增量同步的痛點。
文章目錄如下:

Canal是什么?
canal譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費。
從這句話理解到了什么?
基于MySQL,并且通過MySQL日志進行的增量解析,這也就意味著對原有的業(yè)務代碼完全是無侵入性的。
工作原理:解析MySQL的binlog日志,提供增量數(shù)據(jù)。
基于日志增量訂閱和消費的業(yè)務包括
- 數(shù)據(jù)庫鏡像
- 數(shù)據(jù)庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業(yè)務 cache 刷新
- 帶業(yè)務邏輯的增量數(shù)據(jù)處理
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
Canal數(shù)據(jù)如何傳輸?
先來一張官方圖:

Canal分為服務端和客戶端,這也是阿里常用的套路,比如前面講到的注冊中心Nacos:
- 服務端:負責解析MySQL的binlog日志,傳遞增量數(shù)據(jù)給客戶端或者消息中間件
- 客戶端:負責解析服務端傳過來的數(shù)據(jù),然后定制自己的業(yè)務處理。
目前為止支持的消息中間件很全面了,比如Kafka、RocketMQ,RabbitMQ。
數(shù)據(jù)同步還有其他中間件嗎?
有,當然有,還有一些開源的中間件也是相當不錯的,比如Bifrost。
常見的幾款中間件的區(qū)別如下:

當然要我選擇的話,首選阿里的中間件Canal。
Canal服務端安裝
服務端需要下載壓縮包,下載地址:github.com/alibaba/can…
目前最新的是v1.1.5,點擊下載:

下載完成解壓,目錄如下:

本文使用Canal+RabbitMQ進行數(shù)據(jù)的同步,因此下面步驟完全按照這個base進行。
1、打開MySQL的binlog日志
修改MySQL的日志文件,my.cnf 配置如下:
[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
2、設置MySQL的配置
需要設置服務端配置文件中的MySQL配置,這樣Canal才能知道需要監(jiān)聽哪個庫、哪個表的日志文件。
一個 Server 可以配置多個實例監(jiān)聽 ,Canal 功能默認自帶的有個 example 實例,本篇就用 example 實例 。如果增加實例,復制 example 文件夾內(nèi)容到同級目錄下,然后在 canal.properties 指定添加實例的名稱。
修改canal.deployer-1.1.5\conf\example\instance.properties配置文件
# url canal.instance.master.address=127.0.0.1:3306 # username/password canal.instance.dbUsername=root canal.instance.dbPassword=root # 監(jiān)聽的數(shù)據(jù)庫 canal.instance.defaultDatabaseName=test # 監(jiān)聽的表,可以指定,多個用逗號分割,這里正則是監(jiān)聽所有 canal.instance.filter.regex=.*\\..*
3、設置RabbitMQ的配置
服務端默認的傳輸方式是tcp,需要在配置文件中設置MQ的相關信息。
這里需要修改兩處配置文件,如下;
1、canal.deployer-1.1.5\conf\canal.properties
這個配置文件主要是設置MQ相關的配置,比如URL,用戶名、密碼...
# 傳輸方式:tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = rabbitMQ ################################################## ######### RabbitMQ ############# ################################################## rabbitmq.host = 127.0.0.1 rabbitmq.virtual.host =/ # exchange rabbitmq.exchange =canal.exchange # 用戶名、密碼 rabbitmq.username =guest rabbitmq.password =guest ## 是否持久化 rabbitmq.deliveryMode = 2
2、canal.deployer-1.1.5\conf\example\instance.properties
這個文件設置MQ的路由KEY,這樣才能路由到指定的隊列中,如下:
canal.mq.topic=canal.routing.key
4、RabbitMQ新建exchange和Queue
在RabbitMQ中需要新建一個canal.exchange(必須和配置中的相同)的exchange和一個名稱為 canal.queue(名稱隨意)的隊列。
其中綁定的路由KEY為:canal.routing.key(必須和配置中的相同),如下圖:

5、啟動服務端
點擊bin目錄下的腳本,windows直接雙擊startup.bat,啟動成功如下:

6、測試
在本地數(shù)據(jù)庫test中的oauth_client_details插入一條數(shù)據(jù),如下:
INSERT INTO `oauth_client_details` VALUES ('myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false');
此時查看MQ中的canal.queue已經(jīng)有了數(shù)據(jù),如下:

其實就是一串JSON數(shù)據(jù),這個JSON如下:
{
"data": [{
"client_id": "myjszl",
"resource_ids": "res1",
"client_secret": "$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W",
"scope": "all",
"authorized_grant_types": "password,refresh_token,authorization_code,client_credentials,implicit",
"web_server_redirect_uri": "http://www.baidu.com",
"authorities": null,
"access_token_validity": "1000",
"refresh_token_validity": "1000",
"additional_information": null,
"autoapprove": "false"
}],
"database": "test",
"es": 1640337532000,
"id": 7,
"isDdl": false,
"mysqlType": {
"client_id": "varchar(48)",
"resource_ids": "varchar(256)",
"client_secret": "varchar(256)",
"scope": "varchar(256)",
"authorized_grant_types": "varchar(256)",
"web_server_redirect_uri": "varchar(256)",
"authorities": "varchar(256)",
"access_token_validity": "int(11)",
"refresh_token_validity": "int(11)",
"additional_information": "varchar(4096)",
"autoapprove": "varchar(256)"
},
"old": null,
"pkNames": ["client_id"],
"sql": "",
"sqlType": {
"client_id": 12,
"resource_ids": 12,
"client_secret": 12,
"scope": 12,
"authorized_grant_types": 12,
"web_server_redirect_uri": 12,
"authorities": 12,
"access_token_validity": 4,
"refresh_token_validity": 4,
"additional_information": 12,
"autoapprove": 12
},
"table": "oauth_client_details",
"ts": 1640337532520,
"type": "INSERT"
}
每個字段的意思已經(jīng)很清楚了,有表名稱、方法、參數(shù)、參數(shù)類型、參數(shù)值.....
客戶端要做的就是監(jiān)聽MQ獲取JSON數(shù)據(jù),然后將其解析出來,處理自己的業(yè)務邏輯。
Canal客戶端搭建
客戶端很簡單實現(xiàn),要做的就是消費Canal服務端傳遞過來的消息,監(jiān)聽canal.queue這個隊列。
1、創(chuàng)建消息實體類
MQ傳遞過來的是JSON數(shù)據(jù),當然要創(chuàng)建個實體類接收數(shù)據(jù),如下:
/**
* @author 公號 碼猿技術專欄
* Canal消息接收實體類
*/
@NoArgsConstructor
@Data
public class CanalMessage<T> {
@JsonProperty("type")
private String type;
@JsonProperty("table")
private String table;
@JsonProperty("data")
private List<T> data;
@JsonProperty("database")
private String database;
@JsonProperty("es")
private Long es;
@JsonProperty("id")
private Integer id;
@JsonProperty("isDdl")
private Boolean isDdl;
@JsonProperty("old")
private List<T> old;
@JsonProperty("pkNames")
private List<String> pkNames;
@JsonProperty("sql")
private String sql;
@JsonProperty("ts")
private Long ts;
}
2、MQ消息監(jiān)聽業(yè)務
接下來就是監(jiān)聽隊列,一旦有Canal服務端有數(shù)據(jù)推送能夠及時的消費。
代碼很簡單,只是給出個接收的案例,具體的業(yè)務邏輯可以根據(jù)業(yè)務實現(xiàn),如下:
import cn.hutool.json.JSONUtil;
import cn.myjszl.middle.ware.canal.mq.rabbit.model.CanalMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 監(jiān)聽MQ獲取Canal增量的數(shù)據(jù)消息
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalRabbitMQListener {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "canal.queue", durable = "true"),
exchange = @Exchange(value = "canal.exchange"),
key = "canal.routing.key"
)
})
public void handleDataChange(String message) {
//將message轉(zhuǎn)換為CanalMessage
CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
String tableName = canalMessage.getTable();
log.info("Canal 監(jiān)聽 {} 發(fā)生變化;明細:{}", tableName, message);
//TODO 業(yè)務邏輯自己完善...............
}
}
3、測試
下面向表中插入數(shù)據(jù),看下接收的消息是什么樣的,SQL如下:
INSERT INTO `oauth_client_details` VALUES ( 'myjszl', 'res1', '$2a$10$F1tQdeb0SEMdtjlO8X/0wO6Gqybu6vPC/Xg8OmP9/TL1i4beXdK9W', 'all', 'password,refresh_token,authorization_code,client_credentials,implicit', 'http://www.baidu.com', NULL, 1000, 1000, NULL, 'false' );
客戶端轉(zhuǎn)換后的消息如下圖:

上圖可以看出所有的數(shù)據(jù)都已經(jīng)成功接收到,只需要根據(jù)數(shù)據(jù)完善自己的業(yè)務邏輯即可。
源碼地址:https://github.com/chenjiabing666/JavaFamily/tree/master/spring-security/middleware/canal-mq-boot
總結
數(shù)據(jù)增量同步的開源工具并不只有Canal一種,根據(jù)自己的業(yè)務需要選擇合適的組件。
以上就是Spring Boot整合阿里開源中間件Canal實現(xiàn)數(shù)據(jù)增量同步的詳細內(nèi)容,更多關于Spring Boot整合Canal數(shù)據(jù)同步的資料請關注腳本之家其它相關文章!
相關文章
HttpServletRequest對象簡介_動力節(jié)點Java學院整理
這篇文章主要為大家詳細介紹了HttpServletRequest對象簡介的相關資料,具有一定的參考價值,感興趣的小伙伴們可以參考一下2017-07-07
如何使用SpringBoot進行優(yōu)雅的數(shù)據(jù)驗證
這篇文章主要介紹了如何使用SpringBoot進行優(yōu)雅的數(shù)據(jù)驗證,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-11-11
Java實現(xiàn)提取HTML文件中的文本內(nèi)容
從?HTML?文件中提取文本內(nèi)容是數(shù)據(jù)抓取中的一個常見任務,本文主要和大家分享了如何使用免費?Java?API?從HTML?文件中提取文本內(nèi)容,需要的可以參考下2024-04-04
macOS上使用gperftools定位Java內(nèi)存泄漏問題及解決方案
這篇文章主要介紹了macOS上使用gperftools定位Java內(nèi)存泄漏問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下2020-07-07
java底層JDK?Logging日志模塊處理細節(jié)深入分析
這篇文章主要為大家介紹了java底層JDK?Logging日志模塊處理細節(jié)深入分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-03-03
springMVC+velocity實現(xiàn)仿Datatables局部刷新分頁方法
下面小編就為大家分享一篇springMVC+velocity實現(xiàn)仿Datatables局部刷新分頁方法,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2018-02-02

