Adding RocketMQ Output Support to otter

Background

Recently a business team needed to sync MySQL data to Elasticsearch in real time. We initially planned to build the feature directly on canal-adapter, but since other teams are likely to share the same need to subscribe to binlog events, we ultimately decided to extend otter instead and add RocketMQ output support.

The requirements we gathered are as follows:

  • Support writing to both the in-house RocketMQ platform and cloud-hosted RocketMQ services;

  • Allow business teams to subscribe to specific tables, and optionally filter each table by event type (Insert, Update, Delete);

  • Support migrating full table data directly into a RocketMQ topic;

  • Support cross-datacenter delivery to a remote RocketMQ cluster.

Architecture Diagram

Modification Points

  1. Define a RocketMQ entity, i.e. a new RocketMQ target data media.
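
    A minimal sketch of what this entity could look like. Only the getNamespace(), getName() and getSource() accessors are actually required by the loader below; the class layout, the generic DataMedia base, and the RocketMqMediaSource type sketched under the next item are assumptions, not otter's real code:

    public class RocketMqDataMedia extends DataMedia<RocketMqMediaSource> {
        // namespace (inherited from DataMedia) is reused as the RocketMQ topic,
        // name as the tag; the attached source carries the connection settings
    }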

  2. Extend the data source model to carry RocketMQ configuration.
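
    The connection settings could live on a dedicated media source. A rough sketch follows; every field name here is an assumption, since the loader only requires that RocketMqDataMedia.getSource() returns whatever RocketMqSender needs to build a producer:

    public class RocketMqMediaSource extends DataMediaSource {

        private String nameServerAddress; // name server list, e.g. "host1:9876;host2:9876"
        private String producerGroup;     // producer group name
        private String accessKey;         // optional, for ACL-enabled / cloud-hosted RocketMQ
        private String secretKey;

        // getters and setters omitted for brevity
    }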

  3. Modify the Transformer module to convert the data into the format RocketMQ needs.

    public class RocketMQTransformer extends AbstractOtterTransformer<EventData, EventData> {

        @Override
        public EventData transform(EventData data, OtterTransformerContext context) {
            // no structural conversion is needed here: just tag the event with its
            // data-media-pair id (used later for load statistics); the actual
            // message-body conversion happens in the Load step
            data.setPairId(context.getDataMediaPair().getId());
            return data;
        }
    }
  4. Modify the Load module to send the data to RocketMQ.

    public class RocketMQLoader implements OtterLoader<DbBatch, List<LoadContext>> {

        private static final Logger logger = LoggerFactory.getLogger(RocketMQLoader.class);
        private ExecutorService executorService;
        private static ConfigClientService configClientService;
        private LoadStatsTracker loadStatsTracker;
        /**
         * Cache of RocketMQ sender instances, one per pipeline.
         */
        private static Map<Long, RocketMqSender> rocketMqSenderMap = new ConcurrentHashMap<>();

        /**
         * Get (or lazily create) the RocketMQ producer of a pipeline.
         *
         * @param pipelineId
         * @return
         */
        protected RocketMqSender getRocketMqSender(Long pipelineId) {
            RocketMqSender rocketMqSender = rocketMqSenderMap.get(pipelineId);
            if (null == rocketMqSender) {
                synchronized (RocketMqSender.class) {
                    // double-checked: another thread may have created it in the meantime
                    rocketMqSender = rocketMqSenderMap.get(pipelineId);
                    if (null == rocketMqSender) {
                        Pipeline pipeline = configClientService.findPipeline(pipelineId);
                        DataMediaPair dataMediaPair = pipeline.getPairs().get(0);
                        RocketMqDataMedia rocketMqDataMedia = (RocketMqDataMedia) dataMediaPair.getTarget();
                        rocketMqSender = new RocketMqSender(pipelineId, rocketMqDataMedia.getSource());
                        try {
                            rocketMqSender.start();
                        } catch (MQClientException e) {
                            throw new RuntimeException("mq start fail, error: " + e.getErrorMessage());
                        }
                        rocketMqSenderMap.put(pipelineId, rocketMqSender);
                    }
                    return rocketMqSender;
                }
            }
            if (null == rocketMqSender) {
                throw new RuntimeException("failed to obtain RocketMQ producer");
            }
            return rocketMqSender;
        }

        /**
         * Remove and shut down the RocketMQ producer of a pipeline.
         *
         * @param pipelineId
         */
        protected static void removeRocketMqSenderByPipeline(Long pipelineId) {
            RocketMqSender rocketMqSender = rocketMqSenderMap.remove(pipelineId);
            if (null == rocketMqSender) {
                return;
            }
            rocketMqSender.shutdown();
            logger.info("pipeline id:{} rocket mq shutdown", pipelineId);
        }

        @Override
        public List<LoadContext> load(DbBatch data) {
            final RowBatch rowBatch = data.getRowBatch();
            boolean existRowBatch = (rowBatch != null && !CollectionUtils.isEmpty(rowBatch.getDatas()));
            List<Future> futures = new ArrayList<Future>();
            ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
            if (existRowBatch) {
                submitRowBatch(futures, completionService, rowBatch);
            }

            // collect the async results first, remembering the index where an error occurred
            List<LoadContext> processedContexts = new ArrayList<LoadContext>();
            int index = 0;
            LoadException exception = null;
            while (index < futures.size()) {
                try {
                    Future future = completionService.take();// may itself be interrupted
                    future.get();
                } catch (InterruptedException e) {
                    exception = new LoadException(e);
                    logger.error("load thread interrupted", e);
                    break;
                } catch (ExecutionException e) {
                    exception = new LoadException(e);
                    logger.error("load execution error", e);
                    break;
                }
                index++;
            }

            // if any task came back with an exception, abort the whole dispatch
            if (index < futures.size()) {
                // index < size means an error occurred: cancel the unfinished tasks;
                // the finished results are kept so duplicate loads can be filtered on retry
                for (int errorIndex = 0; errorIndex < futures.size(); errorIndex++) {
                    Future future = futures.get(errorIndex);
                    if (!future.isDone()) {
                        future.cancel(true); // cancel tasks that have not completed
                    }
                }
            } else {
                // collect the successfully processed results
                for (int i = 0; i < futures.size(); i++) {
                    Future future = futures.get(i);
                    try {
                        LoadContext loadContext = (LoadContext) future.get();
                        processedContexts.add((RocketMqLoadContext) loadContext);
                    } catch (InterruptedException e) {
                        // ignore
                    } catch (ExecutionException e) {
                        // ignore
                    }
                }
                // statistics
                for (EventData eventData : rowBatch.getDatas()) {
                    processStat(eventData, rowBatch.getIdentity());
                }
            }

            if (exception != null) {
                throw exception;
            } else {
                return processedContexts;
            }
        }

        private void submitRowBatch(List<Future> futures, ExecutorCompletionService completionService,
                                    final RowBatch rowBatch) {
            // group by schema + table name, then send each group asynchronously
            Map<String, List<EventData>> mapList = new HashMap<>();
            for (EventData eventData : rowBatch.getDatas()) {
                String key = eventData.getSchemaName() + "_" + eventData.getTableName();
                List<EventData> list = mapList.get(key);
                if (null == list) {
                    List<EventData> temp = new ArrayList<>();
                    temp.add(eventData);
                    mapList.put(key, temp);
                } else {
                    list.add(eventData);
                }
            }
            Long pipelineId = rowBatch.getIdentity().getPipelineId();
            Pipeline pipeline = configClientService.findPipeline(pipelineId);
            DataMediaPair dataMediaPair = pipeline.getPairs().get(0);
            RocketMqDataMedia rocketMqDataMedia = (RocketMqDataMedia) dataMediaPair.getTarget();
            RocketMqSender rocketMqSender = getRocketMqSender(pipelineId);
            RocketMqLoadContext context = new RocketMqLoadContext();
            for (Map.Entry<String, List<EventData>> temp : mapList.entrySet()) {
                String topic = rocketMqDataMedia.getNamespace();
                String tag = rocketMqDataMedia.getName();
                String key = "";
                String orderKey = temp.getKey();
                List<EventData> eventDatas = temp.getValue();
                futures.add(completionService.submit(new Callable<RocketMqLoadContext>() {

                    public RocketMqLoadContext call() throws Exception {
                        for (EventData eventData : eventDatas) {
                            MqMessage mqMessage = converMqMessage(eventData);
                            if (null == mqMessage) {
                                continue;
                            }
                            String message = JSON.toJSONString(mqMessage);
                            rocketMqSender.send(topic, tag, key, message, orderKey);
                        }
                        return context;
                    }
                }));
            }
        }

        private void processStat(EventData data, Identity identity) {
            LoadThroughput throughput = loadStatsTracker.getStat(identity);
            LoadCounter counter = throughput.getStat(data.getPairId());
            EventType type = data.getEventType();
            if (type.isInsert()) {
                counter.getInsertCount().incrementAndGet();
            } else if (type.isUpdate()) {
                counter.getUpdateCount().incrementAndGet();
            } else if (type.isDelete()) {
                counter.getDeleteCount().incrementAndGet();
            }
            counter.getRowCount().incrementAndGet();
            counter.getRowSize().addAndGet(data.getSize());
        }

        public void setExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
        }

        public void setConfigClientService(ConfigClientService configClientService) {
            this.configClientService = configClientService;
        }

        /**
         * Convert an EventData row change into the MQ message body.
         *
         * @param eventData
         */
        private MqMessage converMqMessage(EventData eventData) {
            // filter out unsupported event types
            if (eventData.getEventType() != EventType.INSERT && eventData.getEventType() != EventType.UPDATE
                && eventData.getEventType() != EventType.DELETE) {
                return null;
            }
            MqMessage mqMessage = new MqMessage();
            mqMessage.setDatabaseName(eventData.getSchemaName());
            mqMessage.setTableName(eventData.getTableName());
            mqMessage.setEventType(eventData.getEventType().toString());
            mqMessage.setExecuteTime(eventData.getExecuteTime());

            // primary key column names
            List<String> pkNames = new ArrayList<>();
            // only meaningful for update events: the primary key values before the change;
            // for insert/delete the key values before and after the change are identical
            List<Map<String, String>> oldKeys = new ArrayList<>();
            // values of all columns
            List<Map<String, String>> datas = new ArrayList<>();
            // column types, see java.sql.Types
            List<Map<String, Integer>> columnTypes = new ArrayList<>();
            if (null != eventData.getKeys() && eventData.getKeys().size() > 0) {
                for (EventColumn column : eventData.getKeys()) {
                    pkNames.add(column.getColumnName());
                    Map<String, String> tempData = new HashMap<String, String>();
                    tempData.put(column.getColumnName(), column.getColumnValue());
                    datas.add(tempData);
                    Map<String, Integer> columnType = new HashMap<>();
                    columnType.put(column.getColumnName(), column.getColumnType());
                    columnTypes.add(columnType);
                }
            }
            if (null != eventData.getColumns() && eventData.getColumns().size() > 0) {
                for (EventColumn column : eventData.getColumns()) {
                    Map<String, String> tempData = new HashMap<String, String>();
                    tempData.put(column.getColumnName(), column.getColumnValue());
                    datas.add(tempData);
                    Map<String, Integer> columnType = new HashMap<>();
                    columnType.put(column.getColumnName(), column.getColumnType());
                    columnTypes.add(columnType);
                }
            }
            if (null != eventData.getOldKeys() && eventData.getOldKeys().size() > 0) {
                for (EventColumn column : eventData.getOldKeys()) {
                    Map<String, String> tempData = new HashMap<String, String>();
                    tempData.put(column.getColumnName(), column.getColumnValue());
                    oldKeys.add(tempData);
                }
            }
            mqMessage.setPkNames(pkNames);
            mqMessage.setOldKeys(oldKeys);
            mqMessage.setDatas(datas);
            mqMessage.setColumnTypes(columnTypes);
            return mqMessage;
        }

        /**
         * Destroy the RocketMQ producer of a pipeline.
         *
         * @param pipelineId
         */
        public static void stopRocketMqSender(Long pipelineId) {
            removeRocketMqSenderByPipeline(pipelineId);
        }

        public LoadStatsTracker getLoadStatsTracker() {
            return loadStatsTracker;
        }

        public void setLoadStatsTracker(LoadStatsTracker loadStatsTracker) {
            this.loadStatsTracker = loadStatsTracker;
        }
    }
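
Two helper classes referenced above are not shown in this post. First, RocketMqSender wraps the MQ producer. The sketch below assumes the open-source Apache RocketMQ client; the constructor and the send(topic, tag, key, body, orderKey) signature follow how RocketMQLoader calls them, while the group name and the fields read from RocketMqMediaSource (see item 2) are assumptions. Hashing the orderKey (schemaName_tableName) onto a message queue keeps all changes of one table in binlog order.

    import java.nio.charset.StandardCharsets;

    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.selector.SelectMessageQueueByHash;
    import org.apache.rocketmq.common.message.Message;

    public class RocketMqSender {

        private final DefaultMQProducer producer;

        public RocketMqSender(Long pipelineId, RocketMqMediaSource source) {
            // one producer per pipeline; the group name here is only an example
            this.producer = new DefaultMQProducer("otter_loader_" + pipelineId);
            this.producer.setNamesrvAddr(source.getNameServerAddress());
        }

        public void start() throws MQClientException {
            producer.start();
        }

        public void shutdown() {
            producer.shutdown();
        }

        public void send(String topic, String tag, String key, String body, String orderKey) throws Exception {
            Message message = new Message(topic, tag, key, body.getBytes(StandardCharsets.UTF_8));
            // route by hash of the orderKey so that events of the same schema_table
            // land on the same queue and keep their order
            producer.send(message, new SelectMessageQueueByHash(), orderKey);
        }
    }

Second, MqMessage is the JSON payload built by converMqMessage. Reconstructed from the setters used above, it is a plain POJO along these lines (the field types are inferred from the loader code, so treat them as assumptions):

    import java.util.List;
    import java.util.Map;

    public class MqMessage {

        private String databaseName;                    // schema of the changed row
        private String tableName;
        private String eventType;                       // INSERT / UPDATE / DELETE
        private long executeTime;                       // binlog execution time
        private List<String> pkNames;                   // primary key column names
        private List<Map<String, String>> oldKeys;      // pre-change primary key values (update only)
        private List<Map<String, String>> datas;        // one column-name -> value map per column
        private List<Map<String, Integer>> columnTypes; // column name -> java.sql.Types code

        // getters and setters omitted for brevity
    }

After fastjson serialization, consumers therefore receive a message body roughly of the form {"databaseName":..., "tableName":..., "eventType":"UPDATE", "pkNames":[...], "datas":[...], "oldKeys":[...], "columnTypes":[...], "executeTime":...}.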

Summary

While adding RocketMQ support we found that otter's codebase already provides an extensibility layer for outputting to heterogeneous targets; supporting a new medium only requires implementing the corresponding extensions (data media, transformer, and loader).