SpringBoot整合Canal實(shí)現(xiàn)數(shù)據(jù)庫實(shí)時(shí)同步的完整指南
前言
在微服務(wù)架構(gòu)盛行的今天,數(shù)據(jù)一致性成為了一個(gè)關(guān)鍵挑戰(zhàn)。當(dāng)業(yè)務(wù)數(shù)據(jù)在MySQL中發(fā)生變化時(shí),如何實(shí)時(shí)同步到其他服務(wù)或緩存中?阿里巴巴開源的Canal組件為我們提供了完美的解決方案。今天,我將帶你深入探索SpringBoot整合Canal的技術(shù)內(nèi)幕,讓你輕松掌握這一核心技術(shù)。
一、什么是Canal
Canal是阿里巴巴開源的一個(gè)基于MySQL數(shù)據(jù)庫增量日志解析的組件,它模擬MySQL主從復(fù)制的交互協(xié)議,偽裝成MySQL的從節(jié)點(diǎn),向MySQL主節(jié)點(diǎn)發(fā)送dump協(xié)議,獲取到MySQL的二進(jìn)制日志(binlog)后,再解析為便于理解和使用的數(shù)據(jù)格式。
1.1 Canal工作原理
MySQL主庫(master)
↓ (binlog日志)
Canal Server (模擬slave)
↓ (解析后的數(shù)據(jù)變更事件)
Canal Client/SpringBoot應(yīng)用
↓ (業(yè)務(wù)處理)
下游系統(tǒng)(Redis/ES/MQ等)
Canal的核心原理就是:
- 偽裝成MySQL從庫:Canal模擬MySQL slave的交互協(xié)議
- 獲取binlog:向MySQL master發(fā)送dump協(xié)議,獲取binlog日志
- 解析數(shù)據(jù)變更:解析binlog中的INSERT、UPDATE、DELETE等事件
- 推送到下游:將解析后的數(shù)據(jù)變更推送給應(yīng)用處理
二、環(huán)境準(zhǔn)備
2.1 MySQL配置
首先,確保MySQL已開啟binlog功能,并設(shè)置為ROW模式:
-- 查看binlog配置 SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'; -- 如果未開啟,需在my.cnf中添加以下配置 [mysqld] # 開啟binlog log-bin=mysql-bin # binlog格式必須為ROW binlog-format=ROW # 服務(wù)器ID,唯一標(biāo)識 server-id=1 # binlog過期時(shí)間(天) expire_logs_days=7 # 單個(gè)binlog文件大小 max_binlog_size=500M
2.2 創(chuàng)建Canal專用用戶
創(chuàng)建Canal用戶并授權(quán):
-- 創(chuàng)建canal用戶 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; -- 授予權(quán)限 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- 刷新權(quán)限 FLUSH PRIVILEGES;
注意:必須授予REPLICATION SLAVE和REPLICATION CLIENT權(quán)限,Canal才能讀取binlog。
2.3 部署Canal Server
下載Canal Server
從GitHub下載最新穩(wěn)定版(推薦1.1.7版本):
wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz # 解壓 mkdir -p /opt/canal tar -zxvf canal.deployer-1.1.7.tar.gz -C /opt/canal
配置Canal Server
修改conf/example/instance.properties配置文件:
# MySQL連接信息 canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset=UTF-8 # 監(jiān)聽的表(正則表達(dá)式) # .*\\..* 表示監(jiān)聽所有庫的所有表 # 可以指定為具體的表,如:test_db\\.user_info canal.instance.filter.regex=.*\\..* # 表黑名單 canal.instance.filter.black.regex=mysql\\.information_schema.* # slaveId(需要唯一) canal.instance.mysql.slaveId=1234
修改conf/canal.properties全局配置:
# Canal服務(wù)端口 canal.port=11111 canal.metrics.pull.port=11112 # 實(shí)例列表 canal.destinations=example # 管理端口 canal.admin.manager.port=8089
啟動Canal Server
# 啟動 sh bin/startup.sh # 查看日志 tail -f logs/canal/canal.log tail -f logs/example/example.log # 停止 sh bin/stop.sh
啟動成功后,你會看到類似以下日志:
2026-02-02 17:00:00.123 [main] INFO com.alibaba.otter.canal.instance.core.CanalInstanceWithManager - [example] init successful
三、SpringBoot集成Canal
3.1 創(chuàng)建SpringBoot項(xiàng)目
創(chuàng)建一個(gè)新的SpringBoot項(xiàng)目,添加必要的依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.18</version>
<relativePath/>
</parent>
<groupId>com.imooc</groupId>
<artifactId>canal-sync-demo</artifactId>
<version>1.0.0</version>
<properties>
<java.version>8</java.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spring Boot Web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Canal Spring Boot Starter -->
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
<!-- JPA注解支持 -->
<dependency>
<groupId>javax.persistence</groupId>
<artifactId>persistence-api</artifactId>
<version>1.0</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- MyBatis Plus -->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.5</version>
</dependency>
<!-- MySQL驅(qū)動 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<!-- fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.2 配置文件
創(chuàng)建application.yml配置文件:
server:
port: 8080
spring:
application:
name: canal-sync-demo
# MySQL配置
datasource:
url: jdbc:mysql://127.0.0.1:3306/test_db?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
# Redis配置
redis:
host: 127.0.0.1
port: 6379
password:
database: 0
# Canal配置
canal:
# Canal Server地址
server: 127.0.0.1:11111
# 實(shí)例名稱(對應(yīng)Canal Server配置的destination)
destination: example
# Canal Server用戶名和密碼(如果Canal Server配置了認(rèn)證)
user-name: canal
password: canal
# 日志配置
logging:
level:
top.javatool.canal.client: warn
注意:top.javatool.canal.client的日志級別設(shè)置為warn,避免Canal客戶端的日志過多。
3.3 創(chuàng)建實(shí)體類
假設(shè)我們有一個(gè)數(shù)據(jù)字典表data_dictionary,對應(yīng)的實(shí)體類如下:
package com.xxx.pojo;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import javax.persistence.Column;
import javax.persistence.Id;
/**
* 數(shù)據(jù)字典實(shí)體類
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@TableName("data_dictionary")
public class DataDictionary {
/**
* 主鍵ID
*/
@Id
@TableId
@Column(name = "id")
private String id;
/**
* 類型編碼
*/
@Column(name = "type_code")
private String typeCode;
/**
* 類型名稱
*/
@Column(name = "type_name")
private String typeName;
/**
* 字典項(xiàng)鍵
*/
@Column(name = "item_key")
private String itemKey;
/**
* 字典項(xiàng)值
*/
@Column(name = "item_value")
private String itemValue;
/**
* 排序
*/
@Column(name = "sort")
private Integer sort;
/**
* 圖標(biāo)
*/
@Column(name = "icon")
private String icon;
/**
* 是否啟用
*/
@Column(name = "enable")
private Boolean enable;
}
關(guān)鍵點(diǎn)說明:
@TableName("data_dictionary"):指定對應(yīng)的數(shù)據(jù)庫表名@Id和@Column:這些注解來自JPA,Canal會根據(jù)這些注解將數(shù)據(jù)庫字段映射到實(shí)體類屬性- 類名、屬性名可以與表名、字段名不同,通過注解映射
3.4 創(chuàng)建Canal監(jiān)聽處理器
這是最核心的部分!我們需要創(chuàng)建一個(gè)類來實(shí)現(xiàn)EntryHandler<T>接口:
package com.xxx.canal;
import com.alibaba.fastjson.JSON;
import com.xxx.pojo.DataDictionary;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;
import javax.annotation.Resource;
/**
* 數(shù)據(jù)字典同步處理器
* 監(jiān)聽data_dictionary表的數(shù)據(jù)變更,實(shí)時(shí)同步到Redis
*/
@Slf4j
@Component
@CanalTable("data_dictionary") // 指定監(jiān)聽的表名
public class DataDictSyncHandler implements EntryHandler<DataDictionary> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
/**
* 監(jiān)聽新增操作
* 當(dāng)MySQL中執(zhí)行INSERT時(shí),此方法會被回調(diào)
*
* @param dataDictionary 新增的數(shù)據(jù)
*/
@Override
public void insert(DataDictionary dataDictionary) {
log.info("=== Canal監(jiān)聽到新增操作 ===");
log.info("數(shù)據(jù)內(nèi)容:{}", JSON.toJSONString(dataDictionary));
// 業(yè)務(wù)邏輯:將數(shù)據(jù)同步到Redis
syncToRedis(dataDictionary);
log.info("新增操作處理完成");
}
/**
* 監(jiān)聽更新操作
* 當(dāng)MySQL中執(zhí)行UPDATE時(shí),此方法會被回調(diào)
*
* @param before 更新前的數(shù)據(jù)(老數(shù)據(jù))
* @param after 更新后的數(shù)據(jù)(新數(shù)據(jù))
*/
@Override
public void update(DataDictionary before, DataDictionary after) {
log.info("=== Canal監(jiān)聽到更新操作 ===");
log.info("更新前數(shù)據(jù):{}", JSON.toJSONString(before));
log.info("更新后數(shù)據(jù):{}", JSON.toJSONString(after));
// 業(yè)務(wù)邏輯:刪除舊緩存,寫入新緩存
deleteFromRedis(before.getId());
syncToRedis(after);
log.info("更新操作處理完成");
}
/**
* 監(jiān)聽刪除操作
* 當(dāng)MySQL中執(zhí)行DELETE時(shí),此方法會被回調(diào)
*
* @param dataDictionary 被刪除的數(shù)據(jù)
*/
@Override
public void delete(DataDictionary dataDictionary) {
log.info("=== Canal監(jiān)聽到刪除操作 ===");
log.info("被刪除數(shù)據(jù):{}", JSON.toJSONString(dataDictionary));
// 業(yè)務(wù)邏輯:從Redis中刪除對應(yīng)數(shù)據(jù)
deleteFromRedis(dataDictionary.getId());
log.info("刪除操作處理完成");
}
/**
* 將數(shù)據(jù)同步到Redis
*
* @param dataDictionary 數(shù)據(jù)字典對象
*/
private void syncToRedis(DataDictionary dataDictionary) {
try {
String key = "data_dict:" + dataDictionary.getId();
redisTemplate.opsForValue().set(key, dataDictionary);
log.info("數(shù)據(jù)已同步到Redis,key:{}", key);
} catch (Exception e) {
log.error("同步數(shù)據(jù)到Redis失敗:", e);
}
}
/**
* 從Redis中刪除數(shù)據(jù)
*
* @param id 數(shù)據(jù)ID
*/
private void deleteFromRedis(String id) {
try {
String key = "data_dict:" + id;
redisTemplate.delete(key);
log.info("已從Redis刪除數(shù)據(jù),key:{}", key);
} catch (Exception e) {
log.error("從Redis刪除數(shù)據(jù)失敗:", e);
}
}
}
核心要點(diǎn):
@CanalTable("data_dictionary"):指定要監(jiān)聽的數(shù)據(jù)庫表名implements EntryHandler<DataDictionary>:泛型指定要處理的實(shí)體類型- 三個(gè)核心方法:
insert(T t):監(jiān)聽INSERT操作update(T before, T after):監(jiān)聽UPDATE操作,可以同時(shí)拿到更新前后的數(shù)據(jù)delete(T t):監(jiān)聽DELETE操作
- 自動回調(diào)機(jī)制:Canal會根據(jù)數(shù)據(jù)庫變更類型自動調(diào)用對應(yīng)的方法,并將數(shù)據(jù)封裝成實(shí)體對象傳入
3.5 啟動類
創(chuàng)建SpringBoot啟動類:
package com.xxx;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Canal同步演示啟動類
*/
@SpringBootApplication
public class CanalSyncApplication {
public static void main(String[] args) {
SpringApplication.run(CanalSyncApplication.class, args);
}
}
四、測試驗(yàn)證
4.1 準(zhǔn)備測試數(shù)據(jù)
在MySQL中創(chuàng)建測試表并插入數(shù)據(jù):
-- 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE IF NOT EXISTS test_db DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE test_db;
-- 創(chuàng)建數(shù)據(jù)字典表
CREATE TABLE IF NOT EXISTS `data_dictionary` (
`id` varchar(64) NOT NULL COMMENT '主鍵ID',
`type_code` varchar(50) NOT NULL COMMENT '類型編碼',
`type_name` varchar(100) DEFAULT NULL COMMENT '類型名稱',
`item_key` varchar(50) DEFAULT NULL COMMENT '字典項(xiàng)鍵',
`item_value` varchar(200) DEFAULT NULL COMMENT '字典項(xiàng)值',
`sort` int(11) DEFAULT '0' COMMENT '排序',
`icon` varchar(100) DEFAULT NULL COMMENT '圖標(biāo)',
`enable` tinyint(1) DEFAULT '1' COMMENT '是否啟用',
PRIMARY KEY (`id`),
KEY `idx_type_code` (`type_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='數(shù)據(jù)字典表';
-- 插入測試數(shù)據(jù)
INSERT INTO `data_dictionary`
VALUES ('1', 'gender', '性別', 'male', '男', 1, 'icon-male', 1),
('2', 'gender', '性別', 'female', '女', 2, 'icon-female', 1);
4.2 啟動應(yīng)用
- 確保Canal Server已啟動
- 啟動SpringBoot應(yīng)用
4.3 測試數(shù)據(jù)同步
測試新增
在MySQL中執(zhí)行INSERT:
INSERT INTO `data_dictionary`
VALUES ('3', 'status', '狀態(tài)', 'active', '激活', 1, NULL, 1);
觀察應(yīng)用日志:
=== Canal監(jiān)聽到新增操作 ===
數(shù)據(jù)內(nèi)容:{"id":"3","typeCode":"status","typeName":"狀態(tài)","itemKey":"active","itemValue":"激活","sort":1,"icon":null,"enable":true}
數(shù)據(jù)已同步到Redis,key:data_dict:3
新增操作處理完成
檢查Redis中是否有對應(yīng)數(shù)據(jù):
redis-cli
127.0.0.1:6379> GET data_dict:3
{"id":"3","typeCode":"status","typeName":"狀態(tài)","itemKey":"active","itemValue":"激活","sort":1,"icon":null,"enable":true}
測試更新
在MySQL中執(zhí)行UPDATE:
UPDATE `data_dictionary` SET `item_value`='啟用中', `sort`=2 WHERE `id`='3';
觀察應(yīng)用日志:
=== Canal監(jiān)聽到更新操作 ===
更新前數(shù)據(jù):{"id":"3","typeCode":"status","typeName":"狀態(tài)","itemKey":"active","itemValue":"激活","sort":1,"icon":null,"enable":true}
更新后數(shù)據(jù):{"id":"3","typeCode":"status","typeName":"狀態(tài)","itemKey":"active","itemValue":"啟用中","sort":2,"icon":null,"enable":true}
已從Redis刪除數(shù)據(jù),key:data_dict:3
數(shù)據(jù)已同步到Redis,key:data_dict:3
更新操作處理完成
測試刪除
在MySQL中執(zhí)行DELETE:
DELETE FROM `data_dictionary` WHERE `id`='3';
觀察應(yīng)用日志:
=== Canal監(jiān)聽到刪除操作 ===
被刪除數(shù)據(jù):{"id":"3","typeCode":"status","typeName":"狀態(tài)","itemKey":"active","itemValue":"啟用中","sort":2,"icon":null,"enable":true}
已從Redis刪除數(shù)據(jù),key:data_dict:3
刪除操作處理完成
檢查Redis中數(shù)據(jù)是否已被刪除:
127.0.0.1:6379> GET data_dict:3
(nil)
五、高級應(yīng)用場景
5.1 多表監(jiān)聽
如果需要監(jiān)聽多張表,可以創(chuàng)建多個(gè)Handler:
// 用戶表監(jiān)聽器
@CanalTable("t_user")
@Component
public class UserHandler implements EntryHandler<User> {
@Override
public void insert(User user) {
// 處理用戶新增
}
@Override
public void update(User before, User after) {
// 處理用戶更新
}
@Override
public void delete(User user) {
// 處理用戶刪除
}
}
// 訂單表監(jiān)聽器
@CanalTable("t_order")
@Component
public class OrderHandler implements EntryHandler<Order> {
@Override
public void insert(Order order) {
// 處理訂單新增
}
@Override
public void update(Order before, Order after) {
// 處理訂單更新
}
@Override
public void delete(Order order) {
// 處理訂單刪除
}
}
5.2 同步到Elasticsearch
@CanalTable("t_product")
@Component
@Slf4j
public class ProductEsHandler implements EntryHandler<Product> {
@Autowired
private ElasticsearchRestTemplate esTemplate;
@Override
public void insert(Product product) {
// 同步到ES
esTemplate.save(product);
}
@Override
public void update(Product before, Product after) {
// 更新ES文檔
esTemplate.save(after);
}
@Override
public void delete(Product product) {
// 從ES刪除文檔
esTemplate.delete(product.getId(), "product");
}
}
5.3 發(fā)送消息到MQ
@CanalTable("t_order")
@Component
@Slf4j
public class OrderMqHandler implements EntryHandler<Order> {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void insert(Order order) {
// 發(fā)送訂單創(chuàng)建消息
rabbitTemplate.convertAndSend("order.exchange", "order.created", order);
}
@Override
public void update(Order before, Order after) {
// 判斷訂單狀態(tài)變化,發(fā)送相應(yīng)消息
if (!before.getStatus().equals(after.getStatus())) {
rabbitTemplate.convertAndSend("order.exchange", "order.status.changed", after);
}
}
@Override
public void delete(Order order) {
// 發(fā)送訂單刪除消息
rabbitTemplate.convertAndSend("order.exchange", "order.deleted", order);
}
}
六、生產(chǎn)環(huán)境優(yōu)化建議
6.1 性能優(yōu)化
批量處理:Canal支持批量拉取數(shù)據(jù),可以在配置中設(shè)置batch-size
canal: batch-size: 1000 # 每次拉取1000條
異步處理:在Handler中使用異步方式處理業(yè)務(wù)邏輯,避免阻塞Canal消費(fèi)線程
@Async
@Override
public void insert(DataDictionary dataDictionary) {
// 異步處理
}
精簡監(jiān)聽:只監(jiān)聽必要的表,避免監(jiān)聽所有庫表
# 在Canal Server配置中精簡監(jiān)聽表 canal.instance.filter.regex=test_db\\.data_dictionary,test_db\\.t_user
6.2 高可用方案
- Canal Server集群:部署多個(gè)Canal Server實(shí)例,通過Zookeeper進(jìn)行協(xié)調(diào)
- 主從切換:MySQL主從切換時(shí),Canal需要自動切換到新的master
- 消費(fèi)位點(diǎn)管理:確保消費(fèi)位點(diǎn)正常提交,避免重復(fù)消費(fèi)或數(shù)據(jù)丟失
6.3 監(jiān)控告警
- 監(jiān)控Canal延遲:定期檢查Canal消費(fèi)延遲,確保實(shí)時(shí)性
- 監(jiān)控Handler執(zhí)行時(shí)間:記錄Handler執(zhí)行耗時(shí),及時(shí)發(fā)現(xiàn)性能問題
- 異常告警:當(dāng)同步出現(xiàn)異常時(shí),及時(shí)發(fā)送告警通知
七、常見問題與解決方案
7.1 監(jiān)聽不到數(shù)據(jù)變更
可能原因:
- MySQL的binlog未開啟或格式不是ROW
- Canal用戶權(quán)限不足
canal.instance.filter.regex配置錯(cuò)誤- 實(shí)體類的
@Column注解配置不正確
解決方案:
-- 檢查binlog SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format'; -- 檢查用戶權(quán)限 SHOW GRANTS FOR 'canal'@'%';
7.2 字段映射失敗
問題描述:某些字段無法映射到實(shí)體類
解決方案:
- 確保
@Column注解的name屬性與數(shù)據(jù)庫字段名一致 - 對于下劃線轉(zhuǎn)駝峰的情況,Canal會自動轉(zhuǎn)換,但建議顯式指定
@Column(name = "type_code") // 顯式指定字段名 private String typeCode;
7.3 數(shù)據(jù)同步延遲
可能原因:
- Handler中的業(yè)務(wù)邏輯執(zhí)行時(shí)間過長
- Canal消費(fèi)線程阻塞
- 網(wǎng)絡(luò)延遲
解決方案:
- 將耗時(shí)的業(yè)務(wù)邏輯異步化
- 增加Canal消費(fèi)線程數(shù)
- 優(yōu)化網(wǎng)絡(luò)環(huán)境
7.4 重復(fù)消費(fèi)數(shù)據(jù)
問題描述:重啟應(yīng)用后,部分?jǐn)?shù)據(jù)被重復(fù)處理
解決方案:
- 確保業(yè)務(wù)邏輯具備冪等性
- 在Redis或其他存儲中記錄已處理的ID
- 使用數(shù)據(jù)庫唯一索引防止重復(fù)插入
@Override
public void insert(DataDictionary dataDictionary) {
// 冪等性校驗(yàn)
String key = "processed:" + dataDictionary.getId();
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
log.info("數(shù)據(jù)已處理過,跳過:{}", dataDictionary.getId());
return;
}
// 處理業(yè)務(wù)邏輯
syncToRedis(dataDictionary);
// 標(biāo)記已處理(設(shè)置過期時(shí)間)
redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
}
八、總結(jié)
通過本文的詳細(xì)介紹,相信你已經(jīng)掌握了SpringBoot整合Canal的核心技術(shù)。讓我們總結(jié)一下關(guān)鍵要點(diǎn):
核心優(yōu)勢
- 零侵入:不需要修改業(yè)務(wù)代碼,基于binlog監(jiān)聽
- 實(shí)時(shí)性強(qiáng):基于binlog解析,延遲可控制在毫秒級
- 易于集成:使用
canal-spring-boot-starter,幾行注解即可實(shí)現(xiàn) - 應(yīng)用場景廣:緩存同步、搜索索引、數(shù)據(jù)歸檔等
最佳實(shí)踐
- 合理配置binlog格式為ROW
- 為Canal創(chuàng)建專用用戶并授權(quán)
- 實(shí)體類使用
@Id和@Column注解精確映射 - Handler中的業(yè)務(wù)邏輯要保持輕量,耗時(shí)操作異步化
- 注意冪等性處理,避免數(shù)據(jù)重復(fù)
- 監(jiān)控Canal消費(fèi)延遲和Handler執(zhí)行時(shí)間
適用場景
- 緩存實(shí)時(shí)更新(Redis、Memcached)
- 搜索引擎數(shù)據(jù)同步(Elasticsearch)
- 數(shù)據(jù)歸檔與備份
- 多數(shù)據(jù)源同步
- 業(yè)務(wù)解耦(通過MQ發(fā)送變更事件)
到此這篇關(guān)于SpringBoot整合Canal實(shí)現(xiàn)數(shù)據(jù)庫實(shí)時(shí)同步的完整指南的文章就介紹到這了,更多相關(guān)SpringBoot Canal數(shù)據(jù)庫同步內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java8中stream流的collectingAndThen方法應(yīng)用實(shí)例詳解
Java8中的Stream流提供了collectingAndThen方法,用于對歸納結(jié)果進(jìn)行二次處理,文章通過User類的數(shù)據(jù)填充,演示了如何使用該方法進(jìn)行集合去重、查找最高工資員工、計(jì)算平均工資等操作,感興趣的朋友跟隨小編一起看看吧2025-03-03
Java中循環(huán)冗余校驗(yàn)(CRC32)的實(shí)現(xiàn)
CRC校驗(yàn)實(shí)用程序庫在數(shù)據(jù)存儲和數(shù)據(jù)通訊領(lǐng)域,為了保證數(shù)據(jù)的正確,就不得不采用檢錯(cuò)的手段,下面這篇文章主要給大家介紹了關(guān)于Java中循環(huán)冗余校驗(yàn)(CRC32)實(shí)現(xiàn)的相關(guān)資料,需要的朋友可以參考借鑒,下面來一起看看吧。2017-10-10
SpringBoot啟動時(shí)如何通過啟動參數(shù)指定logback的位置
這篇文章主要介紹了SpringBoot啟動時(shí)如何通過啟動參數(shù)指定logback的位置,在spring boot中,使用logback配置的方式常用的有兩種,本文給大家介紹的非常詳細(xì),需要的朋友可以參考下2023-07-07
Java通過ServerSocket與Socket實(shí)現(xiàn)通信過程
本文介紹了Java中的ServerSocket和Socket類,詳細(xì)講解了它們的構(gòu)造方法和使用場景,并通過一個(gè)簡單的通信示例展示了如何實(shí)現(xiàn)服務(wù)器和客戶端之間的通信,此外,還介紹了如何為Socket設(shè)置超時(shí)時(shí)間2025-11-11
基于hibernate框架在eclipse下的配置方法(必看篇)
下面小編就為大家?guī)硪黄趆ibernate框架在eclipse下的配置方法(必看篇)。小編覺得挺不錯(cuò)的,現(xiàn)在就分享給大家,也給大家做個(gè)參考。一起跟隨小編過來看看吧2017-09-09

