Ali Canal
文章目录
Ali Canal
背景需求
- 背景:分布式系统中会存在许多中间件,比如:Redis、Kafka、大数据存储,但是生产环境核心数据肯定是存在mysql的
- 需求:如何将mysql的数据和中间件的数据进行同步,并且确保数据的一致性、及时性,以及做到代码的无侵入式
数据同步解决方案
常规思路
举例:比如将mysql数据同步到ElasticSearch
- 代码实现
- 做法:对mysql进行增删改操作的时,同时进行ES的增删改
- 好处:技术难度低,实时性高
- 弊端:代码侵入性强
- mybatis实现
- 做法:mybatis的plugin进行实现,针对insert、update、delete的语句进行处理
- 弊端:单条数据很好处理,批量数据就处理不了;并且有一定的技术难度
- AOP实现
- 做法:根据相应的规则、范式、注解等进行切面处理
- 好处:技术难度低,半侵入式
- 弊端:单条数据很好处理,批量数据就处理不了
- logstash
- 做法:利用logstash提供的文件和数据同步功能,能完成数据的同步,配置也简单
- 好处:技术难度低,无侵入性,无需开发
- 弊端:logstash原理是每秒进行一次增量数据查询,将结果同步到ES,实时性要求特别高,而且此方案性能也不咋地,会造成资源浪费
高效思路Canal
组件介绍
- 阿里巴巴开源的Canal:github地址—https://github.com/alibaba/canal
- 好处:能对mysql的binlog进行同步,实时性强,无侵入性,性能也好,不会造成资源浪费
- mysql主备复制原理
- ① master将改变记录到二进制日志(binary log)中(可以通过show binlog events进行查看)
- ② slave将master的binary log events拷贝到它的中继日志(relay log)
- ③ slave重做中继日志中的事件,将改变反映它自己的数据
- canal原理
- ① canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
- ② mysql master收到dump请求,开始推送binary log给slave(也就是canal)
- ③ canal解析binary log对象(原始为byte流)
- ④ binlog中的内容:每一个增删改的脚本以及修改前和修改后的数据
- ⑤ 注意:binlog必须设置成Row模式
- binlog的数据结构如下:
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [binlog里记录变更发生的时间戳,精确到秒]
schemaName
tableName
eventType [insert/update/delete类型]
entryType [事务头BEGIN/事务尾END/数据ROWDATA]
storeValue [byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl [是否是ddl变更操作,比如create table/drop table]
sql [具体的ddl sql]
rowDatas [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组,变更前的数据字段]
afterColumns [Column类型的数组,变更后的数据字段]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否为主键]
updated [是否发生过变更]
isNull [值是否为null]
value [具体的内容,注意为string文本]
- 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全可以提供ddl的变更语句
- insert只有after columns, delete只有before columns,而update则会有before / after columns数据.
使用
canal下载地址
- https://github.com/alibaba/canal/releases
- 客户端ClientAPI地址(很重要):https://github.com/alibaba/canal/wiki/ClientAPI?spm=a2c4e.11153940.blogcont686081.19.117a316cST1LQY
canal在IDEA或者Eclipse中的使用示例
- 创建mvn标准工程
mvn archetype:create -DgroupId=com.alibaba.otter -DartifactId=canal.sample
- 或者使用generate生成项目
mvn archetype:generate -DgroupId=com.alibaba.otter -DartifactId=canal.sample
- pow.xml导入依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.0</version>
</dependency>
- ClientSample代码
package com.alibaba.otter.canal.sample;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
public class SimpleCanalClientExample {
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
- 运行Client
- 参考:github.com/alibaba/canal/wiki/QuickStart
- 启动Canal Client后,可以从控制台从看到类似消息:表示当前数据库无变更
empty count : 1
empty count : 2
empty count : 3
empty count : 4
- 触发数据库变更
mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
-> `ID` int(11) NOT NULL AUTO_INCREMENT,
-> `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-> PRIMARY KEY (`ID`)
-> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)
从控制台中看到:
empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4 update=true
X : 2013-02-05 23:29:46 update=true
linux上配置使用Canal—QuickStart
- 地址:https://github.com/alibaba/canal/wiki/QuickStart
- Canal Kafka RocketMQ QuickStart
- canal Adapter
- 官方做的独立的组件Adapter,可以将canal server端获取的数据转换成几个常用的中间件数据源,canal1.1后支持:kafka、rocketmq、hbase、elasticsearch
- 做法:直接配置就行,不用开发
常见问题
- 无法接收到数据,程序也没有报错?
- 解决:一定要确保mysql的binlog模式为row模式,canal原理是解析Binlog文件,并且直接中文件中获取数据的。
- Adapter 使用无法同步数据?
- 解决:按照官方文档,检查配置项,如sql的大小写,字段的大小写可能都会有影响,如果还无法搞定,可以自己获取代码调试下,Adapter的代码还是比较容易看懂的
- canal Adapter elasticsearch 改造
- 原因:一个全新的elasticsearch无法使用,因为没有创建elasticsearch index和mapping,增加了对应的功能
- 做法:elasticsearch配置文件mapping节点增加两个参数:
enablefieldmap: true
fieldmap:
id: "text"
BuildingId: "text"
HouseNum: "text"
Floors: "text"
IdProjectInfo: "text"
HouseDigitNum: "text"
BuildingNum: "text"
BuildingName: "text"
Name: "text"
projectid: "text"
bIdProjectInfo: "text"
cinitid: "text"
pCommunityId: "text"
- 解释:enablefieldmap 是否需要自动生成fieldmap,默认为false,如果需要启动的时候就生成这设置为true,并且设置
fieldmap,类似elasticsearch mapping中每个字段的类型
- esconfig bug处理
- 处理:代码中获取binlog的日志处理时,必须要获取数据库名,但是当获取binlog为type query时,是无法获取