MySQL特定表全量、增量數(shù)據(jù)同步到消息隊(duì)列-解決方案
1、原始需求
既要同步原始全量數(shù)據(jù),也要實(shí)時(shí)同步MySQL特定庫(kù)的特定表增量數(shù)據(jù),同時(shí)對(duì)應(yīng)的修改、刪除也要對(duì)應(yīng)。
數(shù)據(jù)同步不能有侵入性:不能更改業(yè)務(wù)程序,并且不能對(duì)業(yè)務(wù)側(cè)有太大性能壓力。
應(yīng)用場(chǎng)景:數(shù)據(jù)ETL同步、降低業(yè)務(wù)服務(wù)器壓力。
2、解決方案

3、canal介紹、安裝
canal是阿里巴巴旗下的一款開源項(xiàng)目,純Java開發(fā)?;跀?shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱&消費(fèi),目前主要支持了MySQL(也支持mariaDB)。
工作原理:mysql主備復(fù)制實(shí)現(xiàn)

從上層來(lái)看,復(fù)制分成三步:
- master將改變記錄到二進(jìn)制日志(binary log)中(這些記錄叫做二進(jìn)制日志事件,binary log events,可以通過(guò)show binlog events進(jìn)行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log);
- slave重做中繼日志中的事件,將改變反映它自己的數(shù)據(jù)。
canal的工作原理

原理相對(duì)比較簡(jiǎn)單:
- canal模擬mysql slave的交互協(xié)議,偽裝自己為mysql slave,向mysql master發(fā)送dump協(xié)議
- mysql master收到dump請(qǐng)求,開始推送binary log給slave(也就是canal)
- canal解析binary log對(duì)象(原始為byte流)
架構(gòu)

說(shuō)明:
- server代表一個(gè)canal運(yùn)行實(shí)例,對(duì)應(yīng)于一個(gè)jvm
- instance對(duì)應(yīng)于一個(gè)數(shù)據(jù)隊(duì)列 (1個(gè)server對(duì)應(yīng)1..n個(gè)instance)
instance模塊:
- eventParser (數(shù)據(jù)源接入,模擬slave協(xié)議和master進(jìn)行交互,協(xié)議解析)
- eventSink (Parser和Store鏈接器,進(jìn)行數(shù)據(jù)過(guò)濾,加工,分發(fā)的工作)
- eventStore (數(shù)據(jù)存儲(chǔ))
- metaManager (增量訂閱&消費(fèi)信息管理器)
安裝
1、mysql、kafka環(huán)境準(zhǔn)備
2、canal下載:wget?https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
3、解壓:tar -zxvf canal.deployer-1.1.3.tar.gz
4、對(duì)目錄conf里文件參數(shù)配置
對(duì)canal.properties配置:


進(jìn)入conf/example里,對(duì)instance.properties配置:


5、啟動(dòng):bin/startup.sh
6、日志查看:

4、驗(yàn)證
1、開發(fā)對(duì)應(yīng)的kafka消費(fèi)者
package org.kafka;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
/**
*
* Title: KafkaConsumerTest
* Description:
* kafka消費(fèi)者 demo
* Version:1.0.0
* @author pancm
* @date 2018年1月26日
*/
public class KafkaConsumerTest implements Runnable {
private final KafkaConsumer<String, String> consumer;
private ConsumerRecords<String, String> msgList;
private final String topic;
private static final String GROUPID = "groupA";
public KafkaConsumerTest(String topicName) {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.7.193:9092");
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
int messageNo = 1;
System.out.println("---------開始消費(fèi)---------");
try {
for (; ; ) {
msgList = consumer.poll(1000);
if (null != msgList && msgList.count() > 0) {
for (ConsumerRecord<String, String> record : msgList) {
//消費(fèi)100條就打印 ,但打印的數(shù)據(jù)不一定是這個(gè)規(guī)律的
System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
// String v = decodeUnicode(record.value());
// System.out.println(v);
//當(dāng)消費(fèi)了1000條就退出
if (messageNo % 1000 == 0) {
break;
}
messageNo++;
}
} else {
Thread.sleep(11);
}
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
consumer.close();
}
}
public static void main(String args[]) {
KafkaConsumerTest test1 = new KafkaConsumerTest("sample-data");
Thread thread1 = new Thread(test1);
thread1.start();
}
/*
* 中文轉(zhuǎn)unicode編碼
*/
public static String gbEncoding(final String gbString) {
char[] utfBytes = gbString.toCharArray();
String unicodeBytes = "";
for (int i = 0; i < utfBytes.length; i++) {
String hexB = Integer.toHexString(utfBytes[i]);
if (hexB.length() <= 2) {
hexB = "00" + hexB;
}
unicodeBytes = unicodeBytes + "\\u" + hexB;
}
return unicodeBytes;
}
/*
* unicode編碼轉(zhuǎn)中文
*/
public static String decodeUnicode(final String dataStr) {
int start = 0;
int end = 0;
final StringBuffer buffer = new StringBuffer();
while (start > -1) {
end = dataStr.indexOf("\\u", start + 2);
String charStr = "";
if (end == -1) {
charStr = dataStr.substring(start + 2, dataStr.length());
} else {
charStr = dataStr.substring(start + 2, end);
}
char letter = (char) Integer.parseInt(charStr, 16); // 16進(jìn)制parse整形字符串。
buffer.append(new Character(letter).toString());
start = end;
}
return buffer.toString();
}
}
2、對(duì)表bak1進(jìn)行增加數(shù)據(jù)
CREATE TABLE `bak1` ( `vin` varchar(20) NOT NULL, `p1` double DEFAULT NULL, `p2` double DEFAULT NULL, `p3` double DEFAULT NULL, `p4` double DEFAULT NULL, `p5` double DEFAULT NULL, `p6` double DEFAULT NULL, `p7` double DEFAULT NULL, `p8` double DEFAULT NULL, `p9` double DEFAULT NULL, `p0` double DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 show create table bak1; insert into bak1 select '李雷abcv', `p1` , `p2` , `p3` , `p4` , `p5` , `p6` , `p7` , `p8` , `p9` , `p0` from moci limit 10
3、查看輸出結(jié)果:

到此這篇關(guān)于MySQL特定表全量、增量數(shù)據(jù)同步到消息隊(duì)列-解決方案的文章就介紹到這了,更多相關(guān)MySQL特定表數(shù)據(jù)同步內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
MySQL常用命令與內(nèi)部組件及SQL優(yōu)化詳情
這篇文章主要介紹了MySQL常用命令與內(nèi)部組件及SQL優(yōu)化詳情,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考價(jià)值,需要的朋友可以參考一下2022-07-07
mysql-5.7.42升級(jí)到mysql-8.2.0(二進(jìn)制方式)
隨著數(shù)據(jù)量的增長(zhǎng)和業(yè)務(wù)需求的變更,我們可能需要升級(jí)MySQL,本文主要介紹了mysql-5.7.42升級(jí)到mysql-8.2.0(二進(jìn)制方式),具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03
MYSQL不能從遠(yuǎn)程連接的一個(gè)解決方法(s not allowed to connect to this MySQL s
MYSQL不能從遠(yuǎn)程連接的一個(gè)解決方法(s not allowed to connect to this MySQL server)2011-08-08
mysql查找刪除表中重復(fù)數(shù)據(jù)方法總結(jié)
在本篇文章中小編給大家整理了關(guān)于mysql查找刪除表中重復(fù)數(shù)據(jù)方法和相關(guān)知識(shí)點(diǎn),需要的朋友們參考下。2019-05-05
MySQL數(shù)據(jù)庫(kù)實(shí)現(xiàn)MMM高可用群集架構(gòu)
這篇文章主要介紹了MySQL數(shù)據(jù)庫(kù)實(shí)現(xiàn)MMM高可用群集架構(gòu),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-12-12
mysql安裝時(shí)出現(xiàn)各種常見問(wèn)題的解決方法
mysql數(shù)據(jù)庫(kù)安裝不了了!mysql最后一步安裝不上?真頭疼!這篇文章主要為大家詳細(xì)介紹了解決mysql安裝時(shí)出現(xiàn)各種經(jīng)典問(wèn)題的方法,感興趣的小伙伴們可以參考一下2016-08-08
MySQL?SQL性能分析之慢查詢?nèi)罩?、explain使用詳解
這篇文章主要介紹了MySQL?SQL性能分析?慢查詢?nèi)罩?、explain使用,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2023-04-04
linux下mysql數(shù)據(jù)庫(kù)單向同步配置方法分享
mysql數(shù)據(jù)庫(kù)單向同步又叫做主從復(fù)制,是通過(guò)二進(jìn)制日志文件完成的,注意:mysql 數(shù)據(jù)庫(kù)的版本,兩個(gè)數(shù)據(jù)庫(kù)版本要相同2012-06-06

