Otter跨机房迁移采坑记录

背景

前几周给insgin团队业做跨机房数据迁移过程中,出现了一系列的数据,主要问题如下:

  1. 部分表未同步/增量丢失问题

    那有些人可能会问为什么不是全量迁移导致数据不一致问题?那是因为我们的数据迁移操作流程,是增量数据和全量数据同时开启更新的,当全量迁移完成时候,我们通过增量通道的偏移量追赶到最新binlog偏移量时候,会触发全量数据校验过程,在校验完成一致后,我们会修复不一致的数据,直到数据准实时一致。数据修复完成后,接下来就是每天数据校验,我们会每天晚上自动开启数据校验流程,然后持续观察数据是否一致。我们出现的现象就是某几张表一致增量丢失现象。

  2. 跨机房传输RPC是采用Http协议多线程下载问题,下载过程中出现pb反序列化问题和下载502问题;

    pb反序列化问题主要是2.6版本的pb反序列化有大小64m限制,由于某些表存在大字段问题,批量读取binlog可能导致序列化生成的文件过大导致反序列化问题。下载502报错采用多线程下载器aria2c在下载文件比较大的情况下经常出现下载失败问题。

  3. 数据同步更新时间不一致问题;

    发现更新频繁的表经常出现更新时间不一致问题。

  4. 数据校验出现超时校验导致校验失败问题;

    某一些表,数据量在50w左右,但是出现了校验超时失败问题。

原因

增量丢失问题

针对2张表增量丢失更新问题,我们差不多定位了1周左右,中间连周末都去加班找问题了。由于线下环境一直模拟不出来问题,线上由于otter定于的表差不多400多张,同步量还是挺大的,debug日志也不好开启。所以最终我们只能通过针对问题表的日志输出具体canal解析到的binlog信息。从SelectTask开始加日志,到MessageParse加日志,最终定位到,出问题的binlog信息没有rowchange信息,导致无法解析到数据变更前和变更后的信息,然后通过binlog的偏移量去查mysql的binlog信息,发现给binlog信息居然真的只有sql语句,没有变更前和变更后的数据。

com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
 private List<EventData> internParse(Pipeline pipeline, Entry entry) {
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new SelectException("parser of canal-event has an error , data:" + entry.toString(), e);
}

// //线下环境
// if(19 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
// || "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )) {
//
// parseFromDesc(entry);
// logger.warn("binlogdebug-interparse rowChange object:{}", rowChange);
// }
// //线上环境
// if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
// || "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )) {
//
// logger.warn("binlogdebug-interparse entryposition:{}", entry.getHeader().getLogfileOffset());
//
//
// if (null == rowChange.getRowDatasList() || rowChange.getRowDatasList().size() == 0) {
// logger.warn("binlogdebug-interparse storeValue parseFrom(), dataList of rowChange is null");
// logger.warn("binlogdebug-interparse rowChange object:{}", rowChange);
// parseFromDesc(entry);
// } else {
// logger.warn("binlogdebug-interparse storeValue parseFrom(), dataList of rowChange is not null, rowDatasList: {}", rowChange.getRowDatasList());
// }
// }

if (rowChange == null) {
return null;
}

String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
EventType eventType = EventType.valueOf(rowChange.getEventType().name());

// 处理下DDL操作
if (eventType.isQuery()) {
// 直接忽略query事件
return null;
}

// 首先判断是否为系统表
if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemSchema(), schemaName)) {
// do noting
if (eventType.isDdl()) {
return null;
}

if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemDualTable(), tableName)) {
// 心跳表数据直接忽略
return null;
}
} else {
if (eventType.isDdl()) {
boolean notExistReturnNull = false;
if (eventType.isRename()) {
notExistReturnNull = true;
}

DataMedia dataMedia = ConfigHelper.findSourceDataMedia(pipeline,
schemaName,
tableName,
notExistReturnNull);
// 如果EventType是CREATE/ALTER,需要reload
// DataMediaInfo;并且把CREATE/ALTER类型的事件丢弃掉.
if (dataMedia != null && (eventType.isCreate() || eventType.isAlter() || eventType.isRename())) {
DbDialect dbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),
(DbMediaSource) dataMedia.getSource());
dbDialect.reloadTable(schemaName, tableName);// 更新下meta信息
}

// if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
// || "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )) {
// logger.warn("binlogdebug-interparse ddlSync return");
// }

boolean ddlSync = pipeline.getParameters().getDdlSync();

if (ddlSync) {
// 处理下ddl操作
EventData eventData = new EventData();
eventData.setSchemaName(schemaName);
eventData.setTableName(tableName);
eventData.setEventType(eventType);
eventData.setExecuteTime(entry.getHeader().getExecuteTime());
eventData.setSql(rowChange.getSql());
eventData.setDdlSchemaName(rowChange.getDdlSchemaName());
eventData.setTableId(dataMedia.getId());
return Arrays.asList(eventData);
} else {
return null;
}
}
}

List<EventData> eventDatas = new ArrayList<>();
for (RowData rowData : rowChange.getRowDatasList()) {
// if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
// || "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )) {
// logger.warn("binlogdebug-interparse internParse execute start");
// }

EventData eventData = internParse(pipeline, entry, rowChange, rowData);

// if(23 == pipeline.getId() && ("global_drug_relate_indication".equalsIgnoreCase(entry.getHeader().getTableName())
// || "drug_indications_relation".equalsIgnoreCase(entry.getHeader().getTableName()) )) {
// logger.warn("binlogdebug-interparse internParse execute end, eventData: {}", eventData);
// }

if (eventData != null) {
eventDatas.add(eventData);
}
}

return eventDatas;
}

这下开始茫然了,DBA那边查看binlog format确实是row模式,但是只有几张表出现这个问题。然后通过分析DBA修改binlog-format的操作过程,发现DBA原来是修改global session方式去修改配置,这种操作理论上是需要业务方重启业务的,然后咨询了业务放是否有无重启业务,发现数据迁移的数据库有好几个应用在用,业务方均没有主动重启。这才确定了应该是没有重启导致的问题。但是为什么只有几张表才出现这个现象呢,涉及数据迁移的几个库涉及到了好几个业务应用,php应用的数据库连接池每天会重连,但是java应用是有连接池的,数据库连接没有重连导致部分连接format未生效。

pb反序列化失败问题

通过阅读pb的源码,发现新版本已经调整了这个限制大小,所以我们通过升级pb即可解决。

aria2c下载502问题

针对数据传输失败问题,查看了aria2c的github的issue,发现有些开发者也出现了这个问题,推荐调整多线程下载的线程数和重试次数可以降低报错。最终我们通过优化参数和调整canal的读取binlog的读取时间和读取大小限制来降低生成文件的大小,但是由于canal调整的参数如果读取大小设置为10,并不能精准的限制大小,只能大概可以减低读取的binlog数量。所以后期我们计划搞一个可以动态根据吞吐量来自动调整canal限制。

增量同步更新时间不一致问题

该问题定位到是由于数据更新频率太快的原因导致的,比如源库在同一时间新增一条数据,又马上对该数据更新,导致canal解析到更新时间字段为没有update,然后同步的时候去除了该更新时间字段,但是该字段又配置了自动更新,导致目标库更新的时候数据库主动将更新时间字段更新了,最终出现的更新时间不一致的问题。虽然otter本身是为了节省同步的数据量,但是由于我们公司需要对数据强一致要求,我们这边就改造了判断时间类型,强制同步。

com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry, com.alibaba.otter.canal.protocol.CanalEntry.RowChange, com.alibaba.otter.canal.protocol.CanalEntry.RowData)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
for (Column column : afterColumns) {
if (isKey(tableHolder, tableName, column)) {
// 获取变更后的主键
keyColumns.put(column.getName(), copyEventColumn(column, true, tableHolder));
} else if (needAllColumns || entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE
|| column.getUpdated() || (!StringUtils.isEmpty(column.getMysqlType()) && "datetime".equalsIgnoreCase(column.getMysqlType()))) {
// 在update操作时,oracle和mysql存放变更的非主键值的方式不同,oracle只有变更的字段;
// mysql会把变更前和变更后的字段都发出来,只需要取有变更的字段.
// 如果是oracle库,after里一定为对应的变更字段
// 增加如果字段类型是datetime,

boolean isUpdate = true;
if (entry.getHeader().getSourceType() == CanalEntry.Type.MYSQL) {
// mysql的after里部分数据为未变更,oracle里after里为变更字段
isUpdate = column.getUpdated();
if(!StringUtils.isEmpty(column.getMysqlType()) && "datetime".equalsIgnoreCase(column.getMysqlType())){
isUpdate = true;
}
}

notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode || isUpdate, tableHolder));// 如果是rowMode,所有字段都为updated
}
}

总结

针对这次数据迁移出现的问题,增量丢失数据闹了一个大乌龙,经过这次事件,让我们对我们的数据迁移平台更加有信心保证数据安全和高效传输。目前跨机房问题我们差不多有1-3s的同步延时问题,这块计划考虑增加grpc协议优化传输性能问题。