处理数据源结构变化
Deal with schema change of data source
处理数据源结构变化是当数据源发生结构变化时,在DataPipeline产品中处理该结构变化的处理流程。
关于此功能
对于不同类型数据源,不同类型的结构变化系统的处理方式与处理流程会有所不同。
数据源删除数据表

目前数据源删除数据表的感知和执行都是由数据源完成,当前所有的数据源处理逻辑相同。
在数据源发生结构变化时,发送源表被删除的请求变化时,支持的情况及处理方法详见:结构变化策略
如果选择暂停的处理方式,此时会暂停数据任务。并提示您源端正在同步的表被删除,由您根据业务的实际情况手动进行处理。
如果选择删除映射,并删除目的地表和数据,此时需要对目的地的表有drop权限。由系统帮您删除目的地的表和数据。并在数据链路和数据任务中删除该映射。
如果选择删除映射,并保留数据目的地表和数据。此时会在数据链路和数据任务中删除该映射。
如果选择持续扫描并保留数据目的地表和数据。此时会保留映射,继续扫描。
手动处理的流程如下:
首先根据结构策略,会在日志中提示您「schema.table name」 发现被删除, 暂停,系统机器人暂停了数据任务。
此时查看相关的数据链路,点击删除,再保存。最后再去数据任务中重新激活任务,继续同步数据任务。

数据源正在同步的表/topic新增字段或者删除正在同步的字段
目前系统中,结构变化的感知是在数据源数据采集过程中,对比采集到的数据同数据映射中源端的表的Schema来获取的。结构变化的执行是在目的地通过对比待写入的数据同目的地的表的Schema,然后根据您所设置的结构变化策略,对目的地数据结构进行自适应变更的。
对于RDBMS类型的数据源


如您在结构变化策略中选择暂停数据任务的处理方式,数据映射将会保留,数据任务将会暂停。此时需要您根据业务的实际情况,手动修改并调整数据链路中的数据映射,确保修改后的数据映射符合您的要求后,再重新激活数据任务,继续数据同步。
如您在结构变化策略中选择忽略结构变化,系统将会继续同步数据任务,并且不修改和变更数据链路和数据任务。
如您在结构变化策略中选择跟随结构变化,系统将会在按照处理数据结构变化的执行逻辑执行变更,然后继续同步数据任务。
手动处理的流程如下:
首先根据结构策略,会在日志中提示您「schema.tasble name」 新增字段 field1,field2,... 暂停。系统机器人暂停了数据任务。
此时查看数据链路会得到如下的提示

此时查看字段映射,会将新增的字段提示给您。如果希望同步新增字段,则需要根据目的地的表结构情况进行处理然后完成数据链路的映射工作。
具体情况:
RDBMS
首先要先去目的地端对表执行alter table增加该字段,然后刷新目的地端的字段,重新映射。保存配置后,重新同步数据任务。
Sequoia
Sequoia可以通过界面增加字段。确认后,完成新增字段的修改,然后保存链路后,继续同步数据任务即可。
kafka
首先要先去目的地端的kafka修改该topic的结构定义,然后刷新目的地端的字段,重新映射。保存配置后,重新同步数据任务。
文件系统
文件系统当前暂不支持结构变化策略。因为选择文件系统暂时无效果。
Redis
Redis可以通过界面的恢复字段,然后点击结构定义,确认后,完成新增数据字段的修改。然后保存链路后,继续同步任务即可

如果检测出正在同步的字段被删除。会显示如下的界面

进入到数据链路的界面,点击到字段映射,然后手动的点击确定,即完成源字段的删除。然后到数据任务的界面,继续同步数据任务。
对于IBM DB2 数据源,采用CDC的方式获取数据源结构变化
当IBM DB2的数据源发生结构变化时,感知到源的结构变化同RDBMS数据库一样。而在执行处理的时候,会支持Rename Table和Alter Table的两种方式。
为了尽量降低对生产业务系统的影响,可以通过如下操作实现结构变化策略的处理。
Rename Table方式
准备工作:
知道cdc for db2和cdc for kafka的实例名字;(本文假定cdc for db2的实例名是db2,假定cdc for kafka的实例名是kafka);
知道要变更的表所在的任务id;(假定任务的ID为104,被变更的表的任务ID为105)
知道要变更的表名;(假定原表名为DP_ABC,变更的表名为DP_ABC_BAK)
处理结构变化策略的流程:
去备端执行showbookmark(这一步为了获取当前的数据读取进度)
db2inst1@db2 bin]$./dmshowbookmark -I kafka -s DP_104 ////// 需要记录下面的bookmark
0007010000000EABF0335800001D4588B6E2455FDC699300000003000000000000000EABF0335800001D4588B6E245
暂停DP任务,选择正常暂停
暂停业务
在数据库执行表结构变更操作
db2 "rename table DP_ABC to DP_ABC_BAK"
db2 "CREATE TABLE DP_ABC(ID bigint primary key not null, NAME char(20),age char(20) DEFAULT '10')";
db2 "insert into DP_ABC(ID,NAME) select ID,NAME from DP_ABC_BAK"
source agent执行以下操作:(确保每一步返回successfully executed)
这一步的执行逻辑
更新源表的定义,并让其设置为活动状态
把更新后的表绑定到订阅上
设置该订阅下该表的复制方法为快照刷新
然后再将其复制方法设置为镜像
最后在源表上标记捕获点,从而继续增量同步
[db2inst1@db2 bin]$ cd /opt/ibm/InfoSphereDataReplication/ReplicationEngineforIBMDB2/bin //// 进入到源端Agent
[db2inst1@db2 bin]$./dmreaddtable -I db2 -t DB2INST1.DP_ABC -a
[db2inst1@db2 bin]$./dmdescribe -I db2-s DP_104
[db2inst1@db2 bin]$./dmsetreplicationmethod -I db2 -s DP_104 -t DB2INST1.DP_ABC -r
[db2inst1@db2 bin]$./dmsetreplicationmethod -I db2 -s DP_104 -t DB2INST1.DP_ABC -m
[db2inst1@db2 bin]$./dmmarktablecapturepoint -I db2 -s DP_104 -t DB2INST1.DP_ABC
恢复业务
新建DP任务(关闭全量初始化),这里在DP链路中选择BAK表,注意目的地为同一个源同一张表,创建任务并激活,然后暂停,暂停后去源端执行下面操作。DP_任务id 就是订阅名。
./dmmarktablecapturepoint -I db2 -s DP_105 -t DB2INST1.DP_ABC_BAK";///////注意:这个DP_105是针对bak新建订阅的名字
./dmsetbookmark -I db2 -b 0007010000000EABF0335800001D4588B6E2455FDC699300000003000000000000000EABF0335800001D4588B6E245 -s DP_105
DP点击启动老任务,选择【启动已存在订阅 DP_104,并激活任务】 注意:该步骤最好在上面订阅DP_105读写都为0时再去重启,尽量保证数据顺序
105任务全部同步完之后,就可以删除了
Alter方式
去备端执行showbookmark(这一步为了获取当前的数据读取进度)
db2inst1@db2 bin]$./dmshowbookmark -I kafka -s DP_104 ////// 需要记录下面的bookmark
0007010000000EABF0335800001D4588B6E2455FDC699300000003000000000000000EABF0335800001D4588B6E245
暂停DP任务,选择正常暂停
暂停业务
在数据库执行表结构变更操作
执行下面两种表结构变更中的一种
// 改变表结构增加列
db2 "alter table DP_ABC add colume xxx char(50)"
//改变表结构删除列
db2 "alter table DP_ABC drop colume xxx "
恢复业务
source agent执行以下操作:(确保每一步返回successfully executed)
[db2inst1@db2 bin]$ cd /opt/ibm/InfoSphereDataReplication/ReplicationEngineforIBMDB2/bin //// 进入到源端Agent
[db2inst1@db2 bin]$./dmreaddtable -I db2 -t DB2INST1.DP_ABC -a
[db2inst1@db2 bin]$./dmdescribe -I db2-s DP_104
[db2inst1@db2 bin]$./dmsetreplicationmethod -I db2 -s DP_104 -t DB2INST1.DP_ABC -r
[db2inst1@db2 bin]$./dmsetreplicationmethod -I db2 -s DP_104 -t DB2INST1.DP_ABC -m
[db2inst1@db2 bin]$./dmmarktablecapturepoint -I db2 -s DP_104 -t DB2INST1.DP_ABC
[db2inst1@db2 bin]$./dmsetbookmark -I db2 -b 0007010000000EABF0335800001D4588B6E2455FDC699300000003000000000000000EABF0335800001D4588B6E245 -s DP_104
DP点击启动任务,选择【启动已存在订阅 DP_104,并激活任务】
对于Alter table的方式,如果源端的变化时数据字段的类型,或者精度或者标度变化,则处理步骤需要调整为如下的步骤:
Alter方式
暂停业务
暂停DP任务,选择正常暂停
在数据库执行表结构变更操作
执行下面表结构变更中的一种
//改变表结构修改字段精度或者标度
db2 "alter table DP_ABC alter colume XXX set data type carchar (100)"
source agent执行以下操作:(确保每一步返回successfully executed)
[db2inst1@db2 bin]$ cd /opt/ibm/InfoSphereDataReplication/ReplicationEngineforIBMDB2/bin //// 进入到源端Agent
[db2inst1@db2 bin]$./dmreaddtable -I db2 -t DB2INST1.DP_ABC -a
[db2inst1@db2 bin]$./dmdescribe -I db2-s DP_104
启动DP任务
恢复业务
最后更新于
这有帮助吗?