Retrofitting otter to Output to RocketMQ
Background
A business team recently needed real-time MySQL -> ES data synchronization. At first we considered building the feature directly on canal-adapter, but since other teams are likely to share the need to subscribe to binlog changes, we ultimately decided to retrofit otter to support RocketMQ as an output target.
The requirements we collected are as follows:
Support output to both the in-house platform RocketMQ service and the cloud RocketMQ service;
Allow business teams to pick specific tables and, per table, the event types to subscribe to (Insert, Update, Delete);
Support migrating full data directly into a RocketMQ topic;
Support cross-datacenter transfer to a RocketMQ cluster in a remote region;
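To make the subscription side concrete, here is a minimal consumer sketch; it is not part of the otter change itself. It assumes the Apache RocketMQ Java client and fastjson; the consumer group, name server address, topic and tag values are placeholders, and the JSON field names (tableName, eventType, ...) simply mirror the MqMessage object built in the Load code later in this post.

import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

public class BinlogEventConsumer {

    public static void main(String[] args) throws Exception {
        // consumer group, name server address, topic and tag below are placeholder values
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_binlog_consumer");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        // topic/tag correspond to the namespace/name configured on the RocketMQ data media
        consumer.subscribe("otter_binlog_topic", "order_pipeline");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    // the body is the JSON-serialized MqMessage built by the Load module
                    JSONObject event = JSON.parseObject(new String(msg.getBody()));
                    String table = event.getString("tableName");
                    String eventType = event.getString("eventType"); // INSERT / UPDATE / DELETE
                    // the business side filters the tables and event types it cares about
                    if ("t_order".equals(table) && !"DELETE".equals(eventType)) {
                        // ... index the row into ES, etc.
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}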
Architecture Diagram
Modification Points
Define a RocketMQ entity, i.e. a new RocketMQ target data media (a hedged sketch follows this list).
Modify the data source module so that a RocketMQ target can be configured.
Modify the Transformer module to convert the data into the format RocketMQ needs.
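The first two points above amount to a new target data media plus its connection configuration. That part of the source is not shown in this post, so the following is only a minimal, hypothetical sketch: the class and field names (RocketMqDataMedia, RocketMqMediaSource, nameServerAddress, accessKey, secretKey) are assumptions, and in the real retrofit the class would hook into otter's DataMedia/DataMediaSource model. What is grounded in the Load code below is that namespace and name end up being used as the RocketMQ topic and tag.

/**
 * Sketch of the RocketMQ target "data media"; only the accessors the Load module
 * relies on are shown. Class and field names are illustrative assumptions.
 */
public class RocketMqDataMedia {

    private Long id;
    private String namespace;              // used as the RocketMQ topic
    private String name;                   // used as the RocketMQ tag
    private RocketMqMediaSource source;    // connection settings of the target cluster

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getNamespace() { return namespace; }
    public void setNamespace(String namespace) { this.namespace = namespace; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public RocketMqMediaSource getSource() { return source; }
    public void setSource(RocketMqMediaSource source) { this.source = source; }

    /** Connection configuration for a self-hosted or cloud RocketMQ cluster (assumed fields). */
    public static class RocketMqMediaSource {
        private String nameServerAddress;  // e.g. "host1:9876;host2:9876"
        private String accessKey;          // only needed for the cloud service
        private String secretKey;

        public String getNameServerAddress() { return nameServerAddress; }
        public void setNameServerAddress(String nameServerAddress) { this.nameServerAddress = nameServerAddress; }
        public String getAccessKey() { return accessKey; }
        public void setAccessKey(String accessKey) { this.accessKey = accessKey; }
        public String getSecretKey() { return secretKey; }
        public void setSecretKey(String secretKey) { this.secretKey = secretKey; }
    }
}

The Transformer change is then only a few lines: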
public class RocketMQTransformer extends AbstractOtterTransformer<EventData, EventData> {

    public EventData transform(EventData data, OtterTransformerContext context) {
        data.setPairId(context.getDataMediaPair().getId());
        return data;
    }
}
Modify the Load module to send the data to RocketMQ.
public class RocketMQLoader implements OtterLoader<DbBatch, List<LoadContext>> {

    private static final Logger logger = LoggerFactory.getLogger(RocketMQLoader.class);
    private ExecutorService executorService;
    private static ConfigClientService configClientService;
    private LoadStatsTracker loadStatsTracker;

    /**
     * RocketMQ sender (producer) cache, keyed by pipeline id
     */
    private static Map<Long, RocketMqSender> rocketMqSenderMap = new ConcurrentHashMap<>();

    /**
     * Get (or lazily create) the MQ producer of a pipeline
     * @param pipelineId
     * @return
     */
    protected RocketMqSender getRocketMqSender(Long pipelineId) {
        RocketMqSender rocketMqSender = rocketMqSenderMap.get(pipelineId);
        if (null == rocketMqSender) {
            synchronized (RocketMqSender.class) {
                rocketMqSender = rocketMqSenderMap.get(pipelineId);
                if (null == rocketMqSender) {
                    Pipeline pipeline = configClientService.findPipeline(pipelineId);
                    DataMediaPair dataMediaPair = pipeline.getPairs().get(0);
                    RocketMqDataMedia rocketMqDataMedia = (RocketMqDataMedia) dataMediaPair.getTarget();
                    rocketMqSender = new RocketMqSender(pipelineId, rocketMqDataMedia.getSource());
                    try {
                        rocketMqSender.start();
                    } catch (MQClientException e) {
                        throw new RuntimeException("mq start fail, error:" + e.getErrorMessage());
                    }
                    rocketMqSenderMap.put(pipelineId, rocketMqSender);
                }
                return rocketMqSender;
            }
        }
        if (null == rocketMqSender) {
            throw new RuntimeException("failed to get mq producer");
        }
        return rocketMqSender;
    }

    /**
     * Remove and shut down the RocketMQ producer of a pipeline
     * @param pipelineId
     */
    protected static void removeRocketMqSenderByPipeline(Long pipelineId) {
        RocketMqSender rocketMqSender = rocketMqSenderMap.remove(pipelineId);
        if (null == rocketMqSender) {
            return;
        }
        rocketMqSender.shutdown();
        logger.info("pipeline id:{} rocket mq shutdown", pipelineId);
    }
    public List<LoadContext> load(DbBatch data) {
        final RowBatch rowBatch = data.getRowBatch();
        boolean existRowBatch = (rowBatch != null && !CollectionUtils.isEmpty(rowBatch.getDatas()));
        List<Future> futures = new ArrayList<Future>();
        ExecutorCompletionService completionService = new ExecutorCompletionService(executorService);
        if (existRowBatch) {
            submitRowBatch(futures, completionService, rowBatch);
        }
        // collect the async results first and record the index at which an error occurred
        List<LoadContext> processedContexts = new ArrayList<LoadContext>();
        int index = 0;
        LoadException exception = null;
        while (index < futures.size()) {
            try {
                Future future = completionService.take(); // take() itself may be interrupted
                future.get();
            } catch (InterruptedException e) {
                exception = new LoadException(e);
                logger.error("load thread interrupted", e);
                break;
            } catch (ExecutionException e) {
                exception = new LoadException(e);
                logger.error("load execution error", e);
                break;
            }
            index++;
        }
        // if any worker returned with an exception, abort the whole schedule
        if (index < futures.size()) { // index < size means an error occurred: cancel the unfinished tasks and collect the finished results for duplicate-load filtering
            for (int errorIndex = 0; errorIndex < futures.size(); errorIndex++) {
                Future future = futures.get(errorIndex);
                if (!future.isDone()) {
                    future.cancel(true); // cancel unfinished tasks
                }
            }
        } else {
            for (int i = 0; i < futures.size(); i++) { // collect the successfully processed results
                Future future = futures.get(i);
                try {
                    LoadContext loadContext = (LoadContext) future.get();
                    processedContexts.add((RocketMqLoadContext) loadContext);
                } catch (InterruptedException e) {
                    // ignore
                } catch (ExecutionException e) {
                    // ignore
                }
            }
            // statistics
            for (EventData eventData : rowBatch.getDatas()) {
                processStat(eventData, rowBatch.getIdentity());
            }
        }
        if (exception != null) {
            throw exception;
        } else {
            return processedContexts;
        }
    }
    private void submitRowBatch(List<Future> futures, ExecutorCompletionService completionService,
                                final RowBatch rowBatch) {
        // group the rows by schema/table name, then send each group asynchronously
        Map<String, List<EventData>> mapList = new HashMap<>();
        for (EventData eventData : rowBatch.getDatas()) {
            String key = eventData.getSchemaName() + "_" + eventData.getTableName();
            List<EventData> list = mapList.get(key);
            if (null == list) {
                List<EventData> temp = new ArrayList<>();
                temp.add(eventData);
                mapList.put(key, temp);
            } else {
                list.add(eventData);
            }
        }
        Long pipelineId = rowBatch.getIdentity().getPipelineId();
        Pipeline pipeline = configClientService.findPipeline(pipelineId);
        DataMediaPair dataMediaPair = pipeline.getPairs().get(0);
        RocketMqDataMedia rocketMqDataMedia = (RocketMqDataMedia) dataMediaPair.getTarget();
        RocketMqSender rocketMqSender = getRocketMqSender(pipelineId);
        RocketMqLoadContext context = new RocketMqLoadContext();
        for (Map.Entry<String, List<EventData>> temp : mapList.entrySet()) {
            String topic = rocketMqDataMedia.getNamespace();
            String tag = rocketMqDataMedia.getName();
            String key = "";
            String orderKey = temp.getKey();
            List<EventData> eventDatas = temp.getValue();
            futures.add(completionService.submit(new Callable<RocketMqLoadContext>() {
                public RocketMqLoadContext call() throws Exception {
                    for (EventData eventData : eventDatas) {
                        MqMessage mqMessage = converMqMessage(eventData);
                        if (null == mqMessage) {
                            continue;
                        }
                        String message = JSON.toJSONString(mqMessage);
                        rocketMqSender.send(topic, tag, key, message, orderKey);
                    }
                    return context;
                }
            }));
        }
    }
    private void processStat(EventData data, Identity identity) {
        LoadThroughput throughput = loadStatsTracker.getStat(identity);
        LoadCounter counter = throughput.getStat(data.getPairId());
        EventType type = data.getEventType();
        if (type.isInsert()) {
            counter.getInsertCount().incrementAndGet();
        } else if (type.isUpdate()) {
            counter.getUpdateCount().incrementAndGet();
        } else if (type.isDelete()) {
            counter.getDeleteCount().incrementAndGet();
        }
        counter.getRowCount().incrementAndGet();
        counter.getRowSize().addAndGet(data.getSize());
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public void setConfigClientService(ConfigClientService configClientService) {
        this.configClientService = configClientService;
    }
    /**
     * Convert an EventData row change into an MQ message body
     * @param eventData
     */
    private MqMessage converMqMessage(EventData eventData) {
        // skip unsupported event types
        if (eventData.getEventType() != EventType.INSERT && eventData.getEventType() != EventType.UPDATE &&
            eventData.getEventType() != EventType.DELETE) {
            return null;
        }
        MqMessage mqMessage = new MqMessage();
        mqMessage.setDatabaseName(eventData.getSchemaName());
        mqMessage.setTableName(eventData.getTableName());
        mqMessage.setEventType(eventData.getEventType().toString());
        mqMessage.setExecuteTime(eventData.getExecuteTime());
        // primary key names
        List<String> pkNames = new ArrayList<>();
        // only meaningful for update events: the pre-update primary key values; for insert/delete the keys are the same before and after the change
        List<Map<String, String>> oldKeys = new ArrayList<>();
        // values of all columns
        List<Map<String, String>> datas = new ArrayList<>();
        // column types, see java.sql.Types
        List<Map<String, Integer>> columnTypes = new ArrayList<>();
        if (null != eventData.getKeys() && eventData.getKeys().size() > 0) {
            for (EventColumn column : eventData.getKeys()) {
                pkNames.add(column.getColumnName());
                Map<String, String> tempData = new HashMap<String, String>();
                tempData.put(column.getColumnName(), column.getColumnValue());
                datas.add(tempData);
                Map<String, Integer> columnType = new HashMap<>();
                columnType.put(column.getColumnName(), column.getColumnType());
                columnTypes.add(columnType);
            }
        }
        if (null != eventData.getColumns() && eventData.getColumns().size() > 0) {
            for (EventColumn column : eventData.getColumns()) {
                Map<String, String> tempData = new HashMap<String, String>();
                tempData.put(column.getColumnName(), column.getColumnValue());
                datas.add(tempData);
                Map<String, Integer> columnType = new HashMap<>();
                columnType.put(column.getColumnName(), column.getColumnType());
                columnTypes.add(columnType);
            }
        }
        if (null != eventData.getOldKeys() && eventData.getOldKeys().size() > 0) {
            for (EventColumn column : eventData.getOldKeys()) {
                Map<String, String> tempData = new HashMap<String, String>();
                tempData.put(column.getColumnName(), column.getColumnValue());
                oldKeys.add(tempData);
            }
        }
        mqMessage.setPkNames(pkNames);
        mqMessage.setOldKeys(oldKeys);
        mqMessage.setDatas(datas);
        mqMessage.setColumnTypes(columnTypes);
        return mqMessage;
    }
    /**
     * Destroy the RocketMQ producer of a pipeline
     * @param pipelineId
     */
    public static void stopRocketMqSender(Long pipelineId) {
        removeRocketMqSenderByPipeline(pipelineId);
    }

    public LoadStatsTracker getLoadStatsTracker() {
        return loadStatsTracker;
    }

    public void setLoadStatsTracker(LoadStatsTracker loadStatsTracker) {
        this.loadStatsTracker = loadStatsTracker;
    }
}
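The loader above depends on a RocketMqSender wrapper whose source is not included in this post. Below is only a minimal sketch of what it might look like, assuming the Apache RocketMQ Java client; the producer group name and the constructor taking a plain name-server address are assumptions (the real code passes the data media's source object). Routing by orderKey, which the loader sets to schema_table, is one way to keep changes of the same table ordered on a single queue.

import java.nio.charset.StandardCharsets;
import java.util.List;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

/**
 * Sketch of the producer wrapper used by RocketMQLoader: one producer per pipeline,
 * sends JSON payloads and keeps rows of the same table on the same queue.
 */
public class RocketMqSender {

    private final DefaultMQProducer producer;

    public RocketMqSender(Long pipelineId, String nameServerAddress) {
        // one producer group per pipeline so instances can be cached and shut down independently
        this.producer = new DefaultMQProducer("otter_load_" + pipelineId);
        this.producer.setNamesrvAddr(nameServerAddress);
    }

    public void start() throws MQClientException {
        producer.start();
    }

    public SendResult send(String topic, String tag, String key, String body, String orderKey) throws Exception {
        Message message = new Message(topic, tag, key, body.getBytes(StandardCharsets.UTF_8));
        // route by orderKey (schema_table) so changes of one table stay ordered within a queue
        return producer.send(message, new MessageQueueSelector() {
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                int index = (arg.hashCode() & 0x7fffffff) % mqs.size();
                return mqs.get(index);
            }
        }, orderKey);
    }

    public void shutdown() {
        producer.shutdown();
    }
}

For downstream consumers, the MqMessage serialized in converMqMessage carries databaseName, tableName, eventType, executeTime, pkNames, oldKeys, datas and columnTypes (types as defined in java.sql.Types), i.e. the full row image of each change.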
Summary
Looking back at this RocketMQ retrofit, the otter codebase already provides an extensibility layer for writing to heterogeneous output media, so supporting a new target mostly comes down to implementing the new extension points.