canal實現mysql數據同步的詳細過程
1、canal下載
canal實現mysql數據同步可以直接安裝canal server就可以了,但是為了方便管理(instance配置,canal server狀態(tài)管理,集群等),需要安裝canal admin,應用下載地址:Releases · alibaba/canal · GitHub
進入頁面可以選擇需要安裝的版本

下載canal.deployer-1.1.8.tar.gz和canal.admin-1.1.8.tar.gz
2、mysql同步用戶創(chuàng)建和授權
登錄mysql mysql -h 127.0.0.1 -P 3306 -u root -p 創(chuàng)建同步用戶 repl 密碼設為123456 CREATE USER 'repl'@'%' IDENTIFIED BY '123456'; 給予同步權限 GRANT REPLICATION SLAVE ON *.* to 'repl'@'%' identified by '123456'; 給予repl只讀test庫的權限,test庫是用來同步數據的 GRANT SELECT ON test.* to 'repl'@'%' identified by '123456'; canal_manager是canal admin需要的,給予repl對該庫的讀寫權限 GRANT ALL PRIVILEGES ON canal_manager.* to 'repl'@'%' identified by '123456'; mysql my.cnf配置文件增加主從配置master數據庫的配置信息 #主數據主從配置 唯一id server_id=1 #開啟logbin log-bin=mysql-bin #寫入模式 row binlog-format=ROW #需要同步的庫 binlog-do-db=test #忽略的數據庫 replicate-ignore-db=mysql replicate-ignore-db=sys replicate-ignore-db=information_schema replicate-ignore-db=performance_schema

在canal-admin解壓文件的conf中有一個canal_manager.sql,導入到master數據庫

3、canal admin安裝和啟動
把canal.admin-1.1.8.tar.gz上傳到linux
解壓 tar -zvxf canal.admin-1.1.8.tar.gz
進入conf目錄下,編輯application.yml配置文件。
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address: 127.0.0.1:3306
database: canal_manager
username: repl
password: 123456
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: 123456重點介紹以下幾個參數:
address:我們需要訂閱(也就是mysql master服務器)mysql所在的服務器IP和數據庫端口。
database:canal.admin web系統(tǒng)必須的幾張表,需要在mysql master服務器上初始化conf/canal_manager.sql文件。
sername和password就是mysql master服務器創(chuàng)建的用于復制的用戶和密碼,也就是我們在canal server中配置的repl 和 123456。
driver-class-name:mysql的驅動,默認是MYSQL5的驅動,如果你的MYSQL是8的(我的就是),要將驅動改為com.mysql.cj.jdbc.Driver。
另外,還需要在mysql連接后面加上allowPublicKeyRetrieval=true,不然啟動時,有可能報錯。
啟動canal.admin
進入bin目錄,執(zhí)行如下命令,啟動canal.admin:
./startup.sh
查看 admin 日志
2022-12-10 03:13:58.995 [main] INFO o.s.jmx.export.annotation.AnnotationMBeanExporter -
Located MBean 'dataSource': registering with JMX server as MBean [com.zaxxer.hikari:name=dataSource,type=HikariDataSource]
2022-12-10 03:13:59.015 [main] INFO org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2022-12-10 03:13:59.038 [main] INFO org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2022-12-10 03:13:59.214 [main] INFO o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2022-12-10 03:13:59.221 [main] INFO com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 14.281 seconds (JVM running for 15.894)
如果出現上述日志,說明啟動成功!
登錄admin
通過http://127.0.0.1:8089/訪問,默認密碼:admin/123456。
注意,IP和密碼需要改成你自己配置的。如果是在服務器上配置的,別忘記放開8089端口。

輸入用戶名和密碼之后,出現上述頁面說明配置成功!
如果需要修改密碼,直接通過執(zhí)行 select upper(sha1(unhex(sha1('1234567')))) 這個sql得到結果,然后復制到canal_manager庫的canal_user表的password字段中就可以了,其中1234567是明文密碼,執(zhí)行上述sql會得到一個密碼。
4、canal server安裝和啟動
把canal.deployer-1.1.8.tar.gz上傳到linux
解壓 tar -zvxf ccanal.deployer-1.1.8.tar.gz
進入conf目錄下,編輯canal.properties配置文件。
注意,如果直接編輯canal.properties,可能無法啟動,報如下錯誤:

可以通過如下方式修改
mv canal.properties canal.properties_bak cp canal_local.properties canal.properties vim canal.properties
canal.properties文件全部內容如下:
# register ip canal.register.ip = # canal admin config canalAdmin 的鏈接、端口、用戶名和MD5密碼 canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB # admin auto register canal server啟動后自動注入到canal admin管理模塊 canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
一般只需要修改下面這3個
canal.admin.manager = 127.0.0.1:8089
canal.admin.user = admin
canal.admin.passwd =6F32482BAFC60F23B7736044CEFC1799166E5CDB
啟動canal.server
進入bin目錄,執(zhí)行如下命令,啟動canal.server:
./startup.sh
查看canal日志

啟動后,canalAdmin的server管理模塊,對應創(chuàng)建的canal server會動態(tài)識別到,狀態(tài)變?yōu)閱?/p>

5、canal數據同步
5.1、java 端集成監(jiān)聽canal 同步的mysql數據
1、引入依賴
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.4</version>
</dependency>2、編寫測試代碼
package com.hy.das.config;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.List;
@Component
public class CanalClient implements InitializingBean{
private final static int BATCH_SIZE = 1000;
@Override
public void afterPropertiesSet() throws Exception {
// 創(chuàng)建鏈接 此處的11111為tcp端口 在canal admin Server管理模塊可以查看
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
"test", "", "");
try {
//打開連接
connector.connect();
//訂閱數據庫表,全部表
connector.subscribe(".*\\..*");
//回滾到未進行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始拿
connector.rollback();
while (true) {
// 獲取指定數量的數據
Message message = connector.getWithoutAck(BATCH_SIZE);
System.out.println(message.getEntries().size());
//獲取批量ID
long batchId = message.getId();
//獲取批量的數量
int size = message.getEntries().size();
//如果沒有數據
if (batchId == -1 || size == 0) {
try {
//線程休眠2秒
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println("----------------");
//如果有數據,處理數據
//遍歷entries,單條解析
for (CanalEntry.Entry entry : message.getEntries()) {
//獲取表名
String tableName = entry.getHeader().getTableName();
//獲取類型
CanalEntry.EntryType entryType = entry.getEntryType();
//獲取序列化后的數據
ByteString storeValue = entry.getStoreValue();
//判斷entry類型是否為ROWDATA類型
if (CanalEntry.EntryType.ROWDATA.equals(entryType)){
//反序列化
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//獲取當前事件操作類型
CanalEntry.EventType eventType = rowChange.getEventType();
//獲取數據集
List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();
//遍歷
for (CanalEntry.RowData rowData : rowDatasList) {
//改變前數據
JSONObject jsonObjectBefore = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
jsonObjectBefore.put(column.getName(),column.getValue());
}
//改變后數據
JSONObject jsonObjectAfter = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
jsonObjectAfter.put(column.getName(),column.getValue());
}
System.out.println("Table:"+tableName+",EventTpye:"+eventType+",Before:"+jsonObjectBefore+",After:"+jsonObjectAfter);
}
}else {
System.out.println("當前操作類型為:"+entryType);
}
}
}
//進行 batch id 的確認。確認之后,小于等于此 batchId 的 Message 都會被確認。
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
}
newSingleConnector方法里面的test是一個instance實列,定義了需要同步的master庫的信息(ip、端口、用戶名、密碼、binlog文件名稱、同步位置、需要同步的庫、不需要同步的庫等)
在canal admin web管理界面的Instance 管理模塊,點擊新建Instance進行創(chuàng)建,新建頁面的Instance名稱就是test,這個可以隨便填寫,代碼對應修改就行,所屬集群/主機,因為我這里是單機部署,直接選擇自動注入的canal server就行,點擊載入模板,獲取配置初始信息,下圖中標出的信息按照實際的修改填入就行,點擊保存后,啟動這個Instance。


3、啟動服務,對test庫的sys_user表進行數據更新,可以看到后臺已經收到變更數據

5.2、kafka同步數據
1:canal.properties配置文件增加如下配置
#數據變更發(fā)送到kafka # 設置輸出目標為 kafka canal.serverMode = kafka # Kafka 地址 canal.mq.servers = xx.xx.xx.xx:9092 # 投遞失敗的重試次數,默認0,改為2 canal.mq.retries = 2 # Kafka batch.size,即producer一個微批次的大小,默認16K,這里加倍 canal.mq.batchSize = 32768 # Kafka max.request.size,即一個請求的最大大小,默認1M,這里也加倍 canal.mq.maxRequestSize = 2097152 # Kafka linger.ms,即sender線程在檢查微批次是否就緒時的超時,默認0ms,這里改為200ms # 滿足batch.size和linger.ms其中之一,就會發(fā)送消息 canal.mq.lingerMs = 200 # Kafka buffer.memory,緩存大小,默認32M canal.mq.bufferMemory = 33554432 # 獲取binlog數據的批次大小,默認50 canal.mq.canalBatchSize = 50 # 獲取binlog數據的超時時間,默認200ms canal.mq.canalGetTimeout = 200 # 是否將binlog轉為JSON格式。如果為false,就是原生Protobuf格式 canal.mq.flatMessage = true # 壓縮類型,官方文檔沒有描述 canal.mq.compressionType = none # Kafka acks,默認all,表示分區(qū)leader會等所有follower同步完才給producer發(fā)送ack # 0表示不等待ack,1表示leader寫入完畢之后直接ack canal.mq.acks = all # Kafka消息投遞是否使用事務 # 主要針對flatMessage的異步發(fā)送和動態(tài)多topic消息投遞進行事務控制來保持和Canal binlog位置的一致性 # flatMessage模式下建議開啟 canal.mq.transaction = true

2:在canal admin web界面修改instance mq配置,增加數據同步到kakfa的topic

3:如上兩步配置完成重啟后,在kafka監(jiān)聽配置的topic就可以接收到數據了
6、java tcp同步只是其中一種方式,還可以通過kafka、rabbitmq等方式進行數據同步
注意上面需要提供對外訪問的端口需要開通安全組,比如8089、11111等端口。
參考文章:
https://zhuanlan.zhihu.com/p/590705531
到此這篇關于canal實現mysql數據同步的文章就介紹到這了,更多相關canal mysql數據同步內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
- Redis和數據庫的一致性(Canal+MQ) 的實現
- 使用Canal實現MySQL數據同步的完整指南
- 兩個windows服務器使用canal實現mysql實時同步
- Canal實現MYSQL實時數據同步的示例代碼
- Canal進行MySQL到MySQL數據庫全量+增量同步踩坑指南
- 基于Docker結合Canal實現MySQL實時增量數據傳輸功能
- MySQL數據實時同步Redis的方案全解析
- 保證MySQL與Redis數據一致性的6種實現方案
- Redis與MySQL數據一致性問題的策略模式及解決方案
- 詳解讓MySQL和Redis數據保持一致的四種策略
- Java使用Canal同步MySQL數據到Redis
- Linux寶塔面板使用Canal實現Mysql和Redis數據同步(圖文教程)
相關文章
MySQL count(*/column)查詢優(yōu)化的實現
count()是SQL中一個常用的聚合函數,其被用來統(tǒng)計記錄的總數,本文主要介紹了MySQL count(*/column)查詢優(yōu)化的實現,具有一定的參考價值,感興趣的可以了解一下2024-09-09
MySQL中On duplicate key update的實現示例
ON DUPLICATE KEY UPDATE是一種MySQL的語法,它在插入新數據時,如果遇到唯一鍵沖突,則會執(zhí)行更新操作,而不是拋出異?;蚝雎栽摋l數據,下面就具體來介紹一下如何使用2025-08-08
mysql 8.0.18 壓縮包安裝及忘記密碼重置所遇到的坑
這篇文章主要介紹了mysql 8.0.18 壓縮包安裝及忘記密碼重置所遇到的坑,本文給大家介紹的非常詳細,具有一定的參考借鑒價值,需要的朋友可以參考下2019-12-12
MySQL 8.0.13設置日期為0000-00-00 00:00:00時出現的問題解決
這篇文章主要介紹了MySQL 8.0.13設置日期為0000-00-00 00:00:00時出現的問題解決,小編覺得挺不錯的,現在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2019-01-01

