背景 前几周给insgin团队业做跨机房数据迁移过程中,出现了一系列的数据,主要问题如下:
部分表未同步/增量丢失问题 ;
那有些人可能会问为什么不是全量迁移导致数据不一致问题?那是因为我们的数据迁移操作流程,是增量数据和全量数据同时开启更新的,当全量迁移完成时候,我们通过增量通道的偏移量追赶到最新binlog偏移量时候,会触发全量数据校验过程,在校验完成一致后,我们会修复不一致的数据,直到数据准实时一致。数据修复完成后,接下来就是每天数据校验,我们会每天晚上自动开启数据校验流程,然后持续观察数据是否一致。我们出现的现象就是某几张表一致增量丢失现象。
跨机房传输RPC是采用Http协议多线程下载问题,下载过程中出现pb反序列化问题和下载502问题;
pb反序列化问题主要是2.6版本的pb反序列化有大小64m限制,由于某些表存在大字段问题,批量读取binlog可能导致序列化生成的文件过大导致反序列化问题。下载502报错采用多线程下载器aria2c在下载文件比较大的情况下经常出现下载失败问题。
数据同步更新时间不一致问题;
发现更新频繁的表经常出现更新时间不一致问题。
数据校验出现超时校验导致校验失败问题;
某一些表,数据量在50w左右,但是出现了校验超时失败问题。
原因 增量丢失问题 针对2张表增量丢失更新问题,我们差不多定位了1周左右,中间连周末都去加班找问题了。由于线下环境一直模拟不出来问题,线上由于otter定于的表差不多400多张,同步量还是挺大的,debug日志也不好开启。所以最终我们只能通过针对问题表的日志输出具体canal解析到的binlog信息。从SelectTask开始加日志,到MessageParse加日志,最终定位到,出问题的binlog信息没有rowchange信息,导致无法解析到数据变更前和变更后的信息,然后通过binlog的偏移量去查mysql的binlog信息,发现给binlog信息居然真的只有sql语句,没有变更前和变更后的数据。
com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 private List<EventData> internParse (Pipeline pipeline, Entry entry) { RowChange rowChange = null ; try { rowChange = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new SelectException("parser of canal-event has an error , data:" + entry.toString(), e); } if (rowChange == null ) { return null ; } String schemaName = entry.getHeader().getSchemaName(); String tableName = entry.getHeader().getTableName(); EventType eventType = EventType.valueOf(rowChange.getEventType().name()); if (eventType.isQuery()) { return null ; } if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemSchema(), schemaName)) { if (eventType.isDdl()) { return null ; } if (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemDualTable(), tableName)) { return null ; } } else { if (eventType.isDdl()) { boolean notExistReturnNull = false ; if (eventType.isRename()) { notExistReturnNull = true ; } DataMedia dataMedia = ConfigHelper.findSourceDataMedia(pipeline, schemaName, tableName, notExistReturnNull); if (dataMedia != null && (eventType.isCreate() || eventType.isAlter() || eventType.isRename())) { DbDialect dbDialect = dbDialectFactory.getDbDialect(pipeline.getId(), (DbMediaSource) dataMedia.getSource()); dbDialect.reloadTable(schemaName, tableName); } boolean ddlSync = pipeline.getParameters().getDdlSync(); if (ddlSync) { EventData eventData = new EventData(); eventData.setSchemaName(schemaName); eventData.setTableName(tableName); eventData.setEventType(eventType); eventData.setExecuteTime(entry.getHeader().getExecuteTime()); eventData.setSql(rowChange.getSql()); eventData.setDdlSchemaName(rowChange.getDdlSchemaName()); eventData.setTableId(dataMedia.getId()); return Arrays.asList(eventData); } else { return null ; } } } List<EventData> eventDatas = new ArrayList<>(); for (RowData rowData : rowChange.getRowDatasList()) { EventData eventData = internParse(pipeline, entry, rowChange, rowData); if (eventData != null ) { eventDatas.add(eventData); } } return eventDatas; }
这下开始茫然了,DBA那边查看binlog format确实是row模式,但是只有几张表出现这个问题。然后通过分析DBA修改binlog-format的操作过程,发现DBA原来是修改global session方式去修改配置,这种操作理论上是需要业务方重启业务的,然后咨询了业务放是否有无重启业务,发现数据迁移的数据库有好几个应用在用,业务方均没有主动重启。这才确定了应该是没有重启导致的问题。但是为什么只有几张表才出现这个现象呢,涉及数据迁移的几个库涉及到了好几个业务应用,php应用的数据库连接池每天会重连,但是java应用是有连接池的,数据库连接没有重连导致部分连接format未生效。
pb反序列化失败问题 通过阅读pb的源码,发现新版本已经调整了这个限制大小,所以我们通过升级pb即可解决。
aria2c下载502问题 针对数据传输失败问题,查看了aria2c的github的issue,发现有些开发者也出现了这个问题,推荐调整多线程下载的线程数和重试次数可以降低报错。最终我们通过优化参数和调整canal的读取binlog的读取时间和读取大小限制来降低生成文件的大小,但是由于canal调整的参数如果读取大小设置为10,并不能精准的限制大小,只能大概可以减低读取的binlog数量。所以后期我们计划搞一个可以动态根据吞吐量来自动调整canal限制。
增量同步更新时间不一致问题 该问题定位到是由于数据更新频率太快的原因导致的,比如源库在同一时间新增一条数据,又马上对该数据更新,导致canal解析到更新时间字段为没有update,然后同步的时候去除了该更新时间字段,但是该字段又配置了自动更新,导致目标库更新的时候数据库主动将更新时间字段更新了,最终出现的更新时间不一致的问题。虽然otter本身是为了节省同步的数据量,但是由于我们公司需要对数据强一致要求,我们这边就改造了判断时间类型,强制同步。
com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry, com.alibaba.otter.canal.protocol.CanalEntry.RowChange, com.alibaba.otter.canal.protocol.CanalEntry.RowData)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 for (Column column : afterColumns) { if (isKey(tableHolder, tableName, column)) { keyColumns.put(column.getName(), copyEventColumn(column, true , tableHolder)); } else if (needAllColumns || entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE || column.getUpdated() || (!StringUtils.isEmpty(column.getMysqlType()) && "datetime" .equalsIgnoreCase(column.getMysqlType()))) { boolean isUpdate = true ; if (entry.getHeader().getSourceType() == CanalEntry.Type.MYSQL) { isUpdate = column.getUpdated(); if (!StringUtils.isEmpty(column.getMysqlType()) && "datetime" .equalsIgnoreCase(column.getMysqlType())){ isUpdate = true ; } } notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode || isUpdate, tableHolder)); } }
总结 针对这次数据迁移出现的问题,增量丢失数据闹了一个大乌龙,经过这次事件,让我们对我们的数据迁移平台更加有信心保证数据安全和高效传输。目前跨机房问题我们差不多有1-3s的同步延时问题,这块计划考虑增加grpc协议优化传输性能问题。