什么是增量同步
增量同步主要针对某些只有 Insert 操作的表,随着业务增长,表内数据越来越多。如果每次都同步整表的话,消耗的时间和资源会比较多。因此需要一个增量同步的功能,每次只读取增加部分的数据。
原理解析
其实现原理实际上就是配合增量键在查询的 sql 语句中拼接过滤条件,比如 where id > ? ,将之前已经读取过的数据过滤出去。
增量同步是针对于两个及以上的同步作业来说的。对于初次执行增量同步的作业而言,实际上是整表同步,不同于其他作业的在于增量同步作业会在作业执行完成后记录一个 endLocation 指标,并将这个指标上传到 prometheus 以供后续使用。除第一次作业外,后续的所有增量同步作业都会取上一次作业的 endLocation 做为本次作业的过滤依据(startLocation)。比如第一次作业执行完后,endLocation 为 10,那么下一个作业就会构建出例如 SELECT id,name,age from table where id > 10 的 SQL 语句,达到增量读取的目的。
增量键:数据库表中增量递增的字段,比如自增 id
使用限制
- 只有 RDB 的 Reader 插件插件可以使用
- 通过构建 SQL 过滤语句实现,因此只能用于 RDB 插件
- 增量同步只关心读,不关心写,因此只与 reader 插件有关
- 增量字段只能为数值类型和时间类型
- 指标需要上传到 prometheus,而 prometheus 不支持字符串类型,因此只支持数据类型和时间类型。时间类型会转换成时间戳后上传
- 增量键的值可以重复,但必须递增
- 由于使用'>'的缘故,要求字段必须递增。
如何处理增量键重复场景
考虑可能存在这样的场景:某一次增量同步后的 endLocation 为 x,在下一次增量同步作业启动的间隙中,表内又写入了增量键的值=x 的数据。按照默认的情况,假设增量键为 id,下一次作业会拼接例如 SELECT id,name,age FROM table WHERE id > x。此时在间隙中插入的 id=x 的数据将会丢失。
为了对应上述场景,chunjun 增量同步提供了配置项 useMaxFunc(默认值为 false)。在设置 useMaxFunc=true 时,chunjun 会在增量作业启动时获取当前数据库中增量键的最大值作为本次作业的 endLocation,并且将用于 startLocation 的运算符号从'>'改为'>='。例如:
- 某一次增量启动时上次作业的 endLocation 为 10,id 最大值为 100,那么将会拼接 SQL 语句 SELECT id,name,age FROM table WHERE id >= 10 AND id < 100
- 下一次增量作业启动时 id 的最大值为 200,那么将会拼接 SQL 语句 SELECT id,name,age FROM table WHERE id >=100 AND id < 200
如何使用增量同步
环境准备
由于是使用 prometheus 收集相关指标信息,因此需要先安装 prometheus 和 pushgateway。
下载 Flink Metric Prometheus 依赖,将其放入 Flink lib 目录下
修改 Flink 配置文件,conf/flink-conf.yaml,新增 flink metric 相关配置
metrics.reporter.promgateway.host: host01 metrics.reporter.promgateway.port: 9091 metrics.reporter.promgateway.deleteOnShutdown: false metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
脚本配置
主要的配置项就是 increColumn 和 startLocation
以 mysql 为例脚本如下:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "int"
}
],
"customSql": "",
"increColumn": "id", //指定增量字段进行增量同步,增量字段必须是column存在的字段
"startLocation": "2", //第一次执行时为空,可配置字符串或不配置,后续提交的作业使用prometheus中指标值
"username": "root",
"password": "root",
"connection": [
{
"jdbcUrl": ["jdbc:mysql://localhost:3306/test?useSSL=false"],
"table": ["baserow"]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
],
"setting": {
"restore": {
"restoreColumnName": "id"
},
"speed": {
"channel": 1,
"bytes": 0
}
}
}
}
查询 prometheus
使用 flink 作业对应的 JobId 在普罗米修斯中查询 endLocation 指标值
flink_taskmanager_job_task_operator_flinkx_endlocation{job_id="xxx"}