背景 前几周给insgin团队业做跨机房数据迁移过程中,出现了一系列的数据,主要问题如下:
部分表未同步/增量丢失问题 ;
 那有些人可能会问为什么不是全量迁移导致数据不一致问题?那是因为我们的数据迁移操作流程,是增量数据和全量数据同时开启更新的,当全量迁移完成时候,我们通过增量通道的偏移量追赶到最新binlog偏移量时候,会触发全量数据校验过程,在校验完成一致后,我们会修复不一致的数据,直到数据准实时一致。数据修复完成后,接下来就是每天数据校验,我们会每天晚上自动开启数据校验流程,然后持续观察数据是否一致。我们出现的现象就是某几张表一致增量丢失现象。
 
跨机房传输RPC是采用Http协议多线程下载问题,下载过程中出现pb反序列化问题和下载502问题;
pb反序列化问题主要是2.6版本的pb反序列化有大小64m限制,由于某些表存在大字段问题,批量读取binlog可能导致序列化生成的文件过大导致反序列化问题。下载502报错采用多线程下载器aria2c在下载文件比较大的情况下经常出现下载失败问题。
 
数据同步更新时间不一致问题;
发现更新频繁的表经常出现更新时间不一致问题。
 
数据校验出现超时校验导致校验失败问题;
某一些表,数据量在50w左右,但是出现了校验超时失败问题。
 
 
原因 增量丢失问题 针对2张表增量丢失更新问题,我们差不多定位了1周左右,中间连周末都去加班找问题了。由于线下环境一直模拟不出来问题,线上由于otter定于的表差不多400多张,同步量还是挺大的,debug日志也不好开启。所以最终我们只能通过针对问题表的日志输出具体canal解析到的binlog信息。从SelectTask开始加日志,到MessageParse加日志,最终定位到,出问题的binlog信息没有rowchange信息,导致无法解析到数据变更前和变更后的信息,然后通过binlog的偏移量去查mysql的binlog信息,发现给binlog信息居然真的只有sql语句,没有变更前和变更后的数据。
com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120  private  List<EventData> internParse (Pipeline pipeline, Entry entry)           RowChange rowChange = null ;         try  {             rowChange = RowChange.parseFrom(entry.getStoreValue());         } catch  (Exception e) {             throw  new  SelectException("parser of canal-event has an error , data:"  + entry.toString(), e);         }         if  (rowChange == null ) {             return  null ;         }         String schemaName = entry.getHeader().getSchemaName();         String tableName = entry.getHeader().getTableName();         EventType eventType = EventType.valueOf(rowChange.getEventType().name());                  if  (eventType.isQuery()) {                          return  null ;         }                  if  (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemSchema(), schemaName)) {                          if  (eventType.isDdl()) {                 return  null ;             }             if  (StringUtils.equalsIgnoreCase(pipeline.getParameters().getSystemDualTable(), tableName)) {                                  return  null ;             }         } else  {             if  (eventType.isDdl()) {                 boolean  notExistReturnNull = false ;                 if  (eventType.isRename()) {                     notExistReturnNull = true ;                 }                 DataMedia dataMedia = ConfigHelper.findSourceDataMedia(pipeline,                     schemaName,                     tableName,                     notExistReturnNull);                                                   if  (dataMedia != null  && (eventType.isCreate() || eventType.isAlter() || eventType.isRename())) {                     DbDialect dbDialect = dbDialectFactory.getDbDialect(pipeline.getId(),                         (DbMediaSource) dataMedia.getSource());                     dbDialect.reloadTable(schemaName, tableName);                 }                 boolean  ddlSync = pipeline.getParameters().getDdlSync();                 if  (ddlSync) {                                          EventData eventData = new  EventData();                     eventData.setSchemaName(schemaName);                     eventData.setTableName(tableName);                     eventData.setEventType(eventType);                     eventData.setExecuteTime(entry.getHeader().getExecuteTime());                     eventData.setSql(rowChange.getSql());                     eventData.setDdlSchemaName(rowChange.getDdlSchemaName());                     eventData.setTableId(dataMedia.getId());                     return  Arrays.asList(eventData);                 } else  {                     return  null ;                 }             }         }         List<EventData> eventDatas = new  ArrayList<>();         for  (RowData rowData : rowChange.getRowDatasList()) {             EventData eventData = internParse(pipeline, entry, rowChange, rowData);             if  (eventData != null ) {                 eventDatas.add(eventData);             }         }         return  eventDatas;     } 
这下开始茫然了,DBA那边查看binlog format确实是row模式,但是只有几张表出现这个问题。然后通过分析DBA修改binlog-format的操作过程,发现DBA原来是修改global session方式去修改配置,这种操作理论上是需要业务方重启业务的,然后咨询了业务放是否有无重启业务,发现数据迁移的数据库有好几个应用在用,业务方均没有主动重启。这才确定了应该是没有重启导致的问题。但是为什么只有几张表才出现这个现象呢,涉及数据迁移的几个库涉及到了好几个业务应用,php应用的数据库连接池每天会重连,但是java应用是有连接池的,数据库连接没有重连导致部分连接format未生效。
pb反序列化失败问题 通过阅读pb的源码,发现新版本已经调整了这个限制大小,所以我们通过升级pb即可解决。
aria2c下载502问题 针对数据传输失败问题,查看了aria2c的github的issue,发现有些开发者也出现了这个问题,推荐调整多线程下载的线程数和重试次数可以降低报错。最终我们通过优化参数和调整canal的读取binlog的读取时间和读取大小限制来降低生成文件的大小,但是由于canal调整的参数如果读取大小设置为10,并不能精准的限制大小,只能大概可以减低读取的binlog数量。所以后期我们计划搞一个可以动态根据吞吐量来自动调整canal限制。
增量同步更新时间不一致问题 该问题定位到是由于数据更新频率太快的原因导致的,比如源库在同一时间新增一条数据,又马上对该数据更新,导致canal解析到更新时间字段为没有update,然后同步的时候去除了该更新时间字段,但是该字段又配置了自动更新,导致目标库更新的时候数据库主动将更新时间字段更新了,最终出现的更新时间不一致的问题。虽然otter本身是为了节省同步的数据量,但是由于我们公司需要对数据强一致要求,我们这边就改造了判断时间类型,强制同步。
com.alibaba.otter.node.etl.select.selector.MessageParser#internParse(com.alibaba.otter.shared.common.model.config.pipeline.Pipeline, com.alibaba.otter.canal.protocol.CanalEntry.Entry, com.alibaba.otter.canal.protocol.CanalEntry.RowChange, com.alibaba.otter.canal.protocol.CanalEntry.RowData)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 for  (Column column : afterColumns) {  if  (isKey(tableHolder, tableName, column)) {          keyColumns.put(column.getName(), copyEventColumn(column, true , tableHolder));   } else  if  (needAllColumns || entry.getHeader().getSourceType() == CanalEntry.Type.ORACLE              || column.getUpdated() || (!StringUtils.isEmpty(column.getMysqlType()) && "datetime" .equalsIgnoreCase(column.getMysqlType()))) {                         boolean  isUpdate = true ;     if  (entry.getHeader().getSourceType() == CanalEntry.Type.MYSQL) {              isUpdate = column.getUpdated();       if (!StringUtils.isEmpty(column.getMysqlType()) && "datetime" .equalsIgnoreCase(column.getMysqlType())){         isUpdate = true ;       }     }     notKeyColumns.put(column.getName(), copyEventColumn(column, isRowMode || isUpdate, tableHolder));   } } 
总结 针对这次数据迁移出现的问题,增量丢失数据闹了一个大乌龙,经过这次事件,让我们对我们的数据迁移平台更加有信心保证数据安全和高效传输。目前跨机房问题我们差不多有1-3s的同步延时问题,这块计划考虑增加grpc协议优化传输性能问题。