SpringBoot整合Canal全過程
前言
canal 是阿里巴巴旗下的一款開源項目,純Java開發(fā)。
基于數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。

canal [k?’næl],譯意為水道/管道/溝渠,主要用途是基于 MySQL 數(shù)據(jù)庫增量日志解析,提供增量數(shù)據(jù)訂閱和消費。
基于日志增量訂閱和消費的業(yè)務包括
- 數(shù)據(jù)庫鏡像
- 數(shù)據(jù)庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業(yè)務 cache 刷新
- 帶業(yè)務邏輯的增量數(shù)據(jù)處理
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
Canal工作原理

Canal工作原理
- canal 模擬 MySQL slave 的交互協(xié)議,偽裝自己為 MySQL slave ,向 MySQL master 發(fā)送 dump協(xié)議。
- MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )。
- canal 解析 binary log 對象(原始為 byte 流)。
Canal架構

server代表一個 canal 運行實例,對應于一個 jvminstance對應于一個數(shù)據(jù)隊列 (1個 canal server 對應 1…n 個 instance )
instance 下的子模塊:
eventParser:數(shù)據(jù)源接入,模擬 slave 協(xié)議和 master 進行交互,協(xié)議解析eventSink:Parser 和 Store 鏈接器,進行數(shù)據(jù)過濾,加工,分發(fā)的工作eventStore:數(shù)據(jù)存儲metaManager:增量訂閱 & 消費信息管理器
EventParser在向MySQL發(fā)送dump命令之前會先從Log Position中獲取上次解析成功的位置(如果是第一次啟動,則獲取初始指定位置或者當前數(shù)據(jù)段binlog位點)。
mysql接受到dump命令后,由EventParser從mysql上pull binlog數(shù)據(jù)進行解析并傳遞給EventSink(傳遞給EventSink模塊進行數(shù)據(jù)存儲,是一個阻塞操作,直到存儲成功 ),傳送成功之后更新Log Position。
- 流程圖如下:

EventSink起到一個類似channel的功能,可以對數(shù)據(jù)進行過濾、分發(fā)/路由(1:n)、歸并(n:1)和加工。EventSink是連接EventParser和EventStore的橋梁。EventStore實現(xiàn)模式是內存模式,內存結構為環(huán)形隊列,由三個指針(Put、Get和Ack)標識數(shù)據(jù)存儲和讀取的位置。MetaManager是增量訂閱&消費信息管理器,增量訂閱和消費之間的協(xié)議包括get/ack/rollback,分別為:
Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內容為:batch id[唯一標識]和entries[具體的數(shù)據(jù)對象] void rollback(long batchId),顧名思義,回滾上次的get請求,重新獲取數(shù)據(jù)。基于get獲取的batchId進行提交,避免誤操作 void ack(long batchId),顧名思議,確認已經(jīng)消費成功,通知server刪除數(shù)據(jù)。基于get獲取的batchId進行提交,避免誤操作
Canal環(huán)境搭建
準備
- 檢查binlog功能是否有開啟:show variables like ‘log_bin’,如果on則已開啟,顯示off則未開啟。
- 未開啟需要先開啟 Binlog 寫入功能,配置 binlog-format 為ROW 模式,my.cnf (win下為my.ini)中配置如下
log-bin=mysql-bin #binlog文件名 binlog_format=ROW #選擇row模式 server_id=1 #mysql實例id,不能和canal的slaveId重復
注意:針對阿里云 RDS for MySQL , 默認打開了 binlog , 并且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步
MySQL的binLog:
STATEMENT記錄的是執(zhí)行的sql語句ROW記錄的是真實的行數(shù)據(jù)記錄MIXED記錄的是1+2,優(yōu)先按照1的模式記錄
授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
啟動
1、下載 canal, 訪問 release 頁面 , 選擇需要的包下載, 如以 1.0.17 版本為例

canal-adapter(canal-client)
- 相當于canal的客戶端,會從canal-server中獲取數(shù)據(jù)(需要配置為tcp方式),然后對數(shù)據(jù)進行同步,可以同步到MySQL、Elasticsearch和HBase等存儲中去。
- 相較于canal-server自帶的canal.serverMode,canal-adapter提供的下游數(shù)據(jù)接受更為廣泛。
canal-admin
- 為canal提供整體配置管理、節(jié)點運維等面向運維的功能
- 提供相對友好的WebUI操作界面,方便更多用戶快速和安全的操作
canal-deployer(canal-server)
- 可以直接監(jiān)聽MySQL的binlog,把自己偽裝成MySQL的從庫,只負責接收數(shù)據(jù),并不做處理。
- 接收到MySQL的binlog數(shù)據(jù)后可以通過配置canal.serverMode:tcp, kafka, rocketMQ, rabbitMQ連接方式發(fā)送到對應的下游。
- 其中tcp方式可以自定義canal客戶端進行接受數(shù)據(jù),較為靈活。
2、配置修改,修改conf/example/instance.properties配置文件
################################################# ## mysql serverId , v1.0.26+ will autoGen # mysql 集群配置中的serverId概念,需要保證和當前mysql集群中id唯一 (v1.1.x版本之后canal會自動生成,不需要手工指定) canal.instance.mysql.slaveId=1212 # enable gtid use true/false # 是否啟用mysql gtid的訂閱模式 canal.instance.gtidon=false # position info # mysql 主庫鏈接地址 canal.instance.master.address=127.0.0.1:3306 # mysql 主庫鏈接時起始的binlog文件 canal.instance.master.journal.name= # mysql 主庫鏈接時起始的binlog偏移量 canal.instance.master.position= # mysql 主庫鏈接時起始的binlog的時間戳 canal.instance.master.timestamp= # mysql 主庫鏈接時對應的gtid位點 canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= # aliyun rds 對應的實例id信息(如果不需要在本地binlog超過18小時被清理后自動下載oss上的binlog,可以忽略該值) canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password # mysql 數(shù)據(jù)庫帳號 canal.instance.dbUsername=canal # mysql 數(shù)據(jù)庫密碼 canal.instance.dbPassword=canal # mysql 數(shù)據(jù)解析編碼,代表數(shù)據(jù)庫的編碼方式對應到 java 中的編碼類型,比如 UTF-8,GBK , ISO-8859-1 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex # mysql 數(shù)據(jù)解析關注的表,Perl正則表達式,多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\\) # 注意:此過濾條件只針對row模式的數(shù)據(jù)有效(ps. mixed/statement因為不解析sql,所以無法準確提取tableName進行過濾) canal.instance.filter.regex=.*\\..* # table black regex # mysql 數(shù)據(jù)解析表的黑名單,表達式規(guī)則見白名單的規(guī)則 canal.instance.filter.black.regex=mysql\\.slave_.* # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=yang # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.enableDynamicQueuePartition=false #canal.mq.partitionsNum=3 #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6 #canal.mq.partitionHash=test.table:id^name,.*\\..* ################################################# #如果系統(tǒng)是1個 cpu,需要將 canal.instance.parser.parallel 設置為 false
常見的匹配規(guī)則:
- 所有表:.* or .\…
- canal schema下所有表: canal\…*
- canal下的以canal打頭的表:canal.canal.*
- canal schema下的一張表:canal.test1
- 多個規(guī)則組合使用:canal\…*,mysql.test1,mysql.test2 (逗號分隔)
- 進入bin目錄下啟動虛擬機的mysql
3、sh bin/startup.sh(win下是運行 startup.bat)
工程搭建
修改pom.xml,添加依賴
# 服務端口 server.port=10000 # 服務名 spring.application.name=canal-client # 環(huán)境設置:dev、test、prod spring.profiles.active=dev # mysql數(shù)據(jù)庫連接 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.url=jdbc:mysql://localhost:3306/yang?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=true&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=UTC spring.datasource.username=root spring.datasource.password=root # 監(jiān)聽樣例使用 # canal.client.instances.example.host=127.0.0.1 # canal.client.instances.example.port=11111
canal 依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
其他依賴(用則添加)
<dependency>
<groupId>commons-dbutils</groupId>
<artifactId>commons-dbutils</artifactId>
<version>1.7</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.17</version>
</dependency>
編寫canal自定義客戶端類(也可以使用canal-adapter)
官網(wǎng)樣例
package com.example.canal.yang;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient {
private final static int BATCH_SIZE = 1000;
/**
* @Description: canal 客戶端
* @Author: yangjj_tc
* @Date: 2022/11/11 11:38
*/
public void run() throws Exception {
// 創(chuàng)建鏈接
CanalConnector connector =
CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "canal", "canal");
try {
// 打開連接
connector.connect();
// 訂閱數(shù)據(jù)庫表,來覆蓋服務端初始化時的設置
connector.subscribe(".*\..*");
// 回滾到未進行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始拿
connector.rollback();
while (true) {
// 獲取指定數(shù)量的數(shù)據(jù)
Message message = connector.getWithoutAck(BATCH_SIZE);
// 獲取批量ID
long batchId = message.getId();
// 獲取批量的數(shù)量
int size = message.getEntries().size();
// 如果沒有數(shù)據(jù)
if (batchId == -1 || size == 0) {
try {
// 線程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
// 如果有數(shù)據(jù),處理數(shù)據(jù)
printEntry(message.getEntries());
}
// 進行 batch id 的確認
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
/**
* @Description: canal server 解析binlog獲得的實體類信息
* @Author: yangjj_tc
* @Date: 2022/11/11 11:37
*/
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == EntryType.TRANSACTIONEND) {
// 開啟/關閉事務的實體類型,跳過
continue;
}
// RowChange對象,包含了一行數(shù)據(jù)變化的所有特征
RowChange rowChage;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
// 獲取操作類型:insert/update/delete類型
EventType eventType = rowChage.getEventType();
// 打印Header信息
System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
// 判斷是否是DDL語句
if (rowChage.getIsDdl()) {
System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
}
// 獲取RowChange對象里的每一行數(shù)據(jù),打印出來
for (RowData rowData : rowChage.getRowDatasList()) {
// 如果是刪除語句
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
// 如果是新增語句
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
// 如果是更新的語句
} else {
// 變更前的數(shù)據(jù)
System.out.println("------->; before");
printColumn(rowData.getBeforeColumnsList());
// 變更后的數(shù)據(jù)
System.out.println("------->; after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
表數(shù)據(jù)同步樣例
package com.example.canal.yang;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@Component
public class CanalClient {
private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();
@Resource
private DataSource dataSource;
public void run() {
CanalConnector connector =
CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe("canal.canal_test");
connector.rollback();
try {
while (true) {
Message message = connector.getWithoutAck(batchSize);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
dataHandle(message.getEntries());
}
connector.ack(batchId);
if (SQL_QUEUE.size() >= 1) {
executeQueueSql();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
} finally {
connector.disconnect();
}
}
private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {
for (Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
if (eventType == EventType.DELETE) {
saveDeleteSql(entry);
} else if (eventType == EventType.UPDATE) {
saveUpdateSql(entry);
} else if (eventType == EventType.INSERT) {
saveInsertSql(entry);
}
}
}
}
private void saveDeleteSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getBeforeColumnsList();
StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");
for (Column column : columnList) {
if (column.getIsKey()) {
// 暫時只支持單一主鍵
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
private void saveUpdateSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> newColumnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");
for (int i = 0; i < newColumnList.size(); i++) {
sql.append(" " + newColumnList.get(i).getName() + " = '" + newColumnList.get(i).getValue() + "'");
if (i != newColumnList.size() - 1) {
sql.append(",");
}
}
sql.append(" where ");
List<Column> oldColumnList = rowData.getBeforeColumnsList();
for (Column column : oldColumnList) {
if (column.getIsKey()) {
// 暫時只支持單一主鍵
sql.append(column.getName() + "=" + column.getValue());
break;
}
}
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
private void saveInsertSql(Entry entry) {
try {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
List<RowData> rowDatasList = rowChange.getRowDatasList();
for (RowData rowData : rowDatasList) {
List<Column> columnList = rowData.getAfterColumnsList();
StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");
for (int i = 0; i < columnList.size(); i++) {
sql.append(columnList.get(i).getName());
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(") VALUES (");
for (int i = 0; i < columnList.size(); i++) {
sql.append("'" + columnList.get(i).getValue() + "'");
if (i != columnList.size() - 1) {
sql.append(",");
}
}
sql.append(")");
SQL_QUEUE.add(sql.toString());
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
public void executeQueueSql() {
int size = SQL_QUEUE.size();
for (int i = 0; i < size; i++) {
String sql = SQL_QUEUE.poll();
System.out.println("[sql]----> " + sql);
this.execute(sql.toString());
}
}
public void execute(String sql) {
Connection con = null;
try {
if (null == sql)
return;
con = dataSource.getConnection();
QueryRunner qr = new QueryRunner();
int row = qr.execute(con, sql);
System.out.println("update: " + row);
} catch (SQLException e) {
e.printStackTrace();
} finally {
DbUtils.closeQuietly(con);
}
}
}
注解監(jiān)聽樣例(依賴下載不下來用這個導入到項目)
<dependency>
<groupId>com.xpand</groupId>
<artifactId>starter-canal</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
package com.example.canal.yang;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.xpand.starter.canal.annotation.*;
@CanalEventListener
public class CanalDataEventListener {
/**
* @Description: 增加數(shù)據(jù)監(jiān)聽
* @Author: yangjj_tc
* @Date: 2022/11/11 15:16
*/
@InsertListenPoint
public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
rowData.getAfterColumnsList()
.forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
/**
* @Description: 修改數(shù)據(jù)監(jiān)聽
* @Author: yangjj_tc
* @Date: 2022/11/11 15:17
*/
@UpdateListenPoint
public void onEventUpdate(CanalEntry.RowData rowData) {
System.out.println("UpdateListenPoint");
rowData.getAfterColumnsList()
.forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
/**
* @Description: 刪除數(shù)據(jù)監(jiān)聽
* @Author: yangjj_tc
* @Date: 2022/11/11 15:17
*/
@DeleteListenPoint
public void onEventDelete(CanalEntry.EventType eventType) {
System.out.println("DeleteListenPoint");
}
/**
* @Description: 自定義數(shù)據(jù)監(jiān)聽
* @Author: yangjj_tc
* @Date: 2022/11/11 15:18
*/
@ListenPoint(destination = "example", schema = "canal", table = {"canal_test", "tb_order"},
eventType = CanalEntry.EventType.UPDATE)
public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
System.err.println("DeleteListenPoint");
rowData.getAfterColumnsList()
.forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue()));
}
@ListenPoint(destination = "example", schema = "canal", // 所要監(jiān)聽的數(shù)據(jù)庫名
table = {"canal_test"}, // 所要監(jiān)聽的數(shù)據(jù)庫表名
eventType = {CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE})
public void onEventCustomUpdateForTbUser(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
getChangeValue(eventType, rowData);
}
public static void getChangeValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
if (eventType == CanalEntry.EventType.DELETE) {
rowData.getBeforeColumnsList().forEach(column -> {
// 獲取刪除前的數(shù)據(jù)
System.out.println(column.getName() + " == " + column.getValue());
});
} else {
rowData.getBeforeColumnsList().forEach(column -> {
// 打印改變前的字段名和值
System.out.println(column.getName() + " == " + column.getValue());
});
rowData.getAfterColumnsList().forEach(column -> {
// 打印改變后的字段名和值
System.out.println(column.getName() + " == " + column.getValue());
});
}
}
}
- 觸發(fā)數(shù)據(jù)庫變更
開始測試,首先啟動MySQL、Canal Server,還有剛剛寫的Spring Boot項目。然后創(chuàng)建表:
DROP TABLE IF EXISTS `canal_test`; CREATE TABLE `canal_test` ( `id` int NOT NULL, `name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL, `age` int NOT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_0900_ai_ci ROW_FORMAT = Dynamic;
如果新增一條數(shù)據(jù)到表中:
INSERT INTO `yang`.`canal_test` (`id`, `name`, `age`) VALUES (1, '1', 1);

總結
canal的好處在于對業(yè)務代碼沒有侵入,因為是基于監(jiān)聽binlog日志去進行同步數(shù)據(jù)的。實時性也能做到準實時,其實是很多企業(yè)一種比較常見的數(shù)據(jù)同步的方案。
通過上面的學習之后,我們應該都明白canal是什么,它的原理,還有用法。實際上這僅僅只是入門,實際項目我們是配置MQ模式,配合RocketMQ或者Kafka,canal會把數(shù)據(jù)發(fā)送到MQ的topic中,然后通過消息隊列的消費者進行處理。

Canal的部署也是支持集群的,需要配合ZooKeeper進行集群管理。
Canal還有一個簡單的Web管理界面。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關文章
SpringBoot集成支付寶沙箱支付的實現(xiàn)示例
本文主要介紹了SpringBoot集成支付寶沙箱支付的實現(xiàn)示例,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下2021-12-12
Java深入淺出掌握SpringBoot之MVC自動配置原理篇
在進行項目編寫前,我們還需要知道一個東西,就是SpringBoot對我們的SpringMVC還做了哪些配置,包括如何擴展,如何定制,只有把這些都搞清楚了,我們在之后使用才會更加得心應手2021-10-10
一文詳解MySql外連接查詢在SpringBoot中的具體使用
外連接通常分為左外連接,右外連接和全外連接,這篇文章主要為大家詳細介紹了如何在SpringBoot中使用MySql的外連接查詢,需要的可以參考下2025-02-02

