J~杰's Blog

人生就一条路,走一步有一步的景观

0%

JVM内存结构

根据《Java虚拟机规范(Java SE 7版)》规定,Java虚拟机内存结构可划分为以下区域:

  • 程序计数器

    • 程序计数器是一块较小的内存空间,可看作是当前线程所执行的字节码的行号指示器。在虚拟机概念模型里,字节码解释器工作时就是通过改变该计数器的值来选取下一条需要执行的字节码指令,分支、循环、跳转、异常处理、线程恢复等基础功能都需要依赖该计数器来完成。
    • JVM中多线程是通过线程轮流切换并分配处理器执行时间的方式来实现的。即在任何时刻,CPU只会执行一条线程中的指令,因此,为了线程切换后能恢复到正确的执行位置,每条线程都需要有一个独立的程序计数器,各线程之间计数器互不影响,独立存储,该内存为线程私有。
    • 如果线程正在执行一个Java方法,则PC记录的是正在执行的虚拟机字节码指令的地址,如果正在执行的是Native方法,则PC值为Undefined,该内存区域是唯一一个没用OOM的区域。
  • 虚拟机栈

    • Java虚拟机栈也是线程私有的,它的生命周期与线程相同。虚拟机栈描述的是Java方法执行的内存模型:每个方法在执行的时候会创建一个栈帧,用来存储局部变量表、操作数栈、动态链接、方法出口等信息。每个方法从调用至执行完成的过程对应着一个栈帧在虚拟机栈中入栈到出栈的过程.
    • 局部变量表:存放编译期可知的各种基本数据类型(如Boolean、byte、char、short、int、float、long、double)、对象引用类型(如:引用指针、句柄等)。局部变量表所需内存空间在编译期间完成分配,即进入一个方法时,这个方法需要在帧中分配多大的局部变量空间是确定的,在方法运行期间不会改变局部变量表的大小。
    • 异常情况:
      • StackOverflowError异常:线程请求的栈深度大于虚拟机所允许的深度时,会抛出栈上溢异常
      • OutOfMemoryError异常:虚拟机栈动态扩展时无法申请到足够的内存,会抛出内存溢出异常
  • 本地方法栈

    • 发挥的作用与虚拟机栈类似,区别在于虚拟机栈为虚拟机执行Java方法(字节码)服务,而本地方法栈则是为虚拟机使用到的Native方法服务,如Java访问C语言的方法、汇编程序等。
    • 异常情况:与虚拟机栈一样。
    • 堆是Java所管理的内存中最大的一块,是被所有线程共享的一块内存区域,在虚拟机启动时创建,用于存放对象实例,也是垃圾收集器管理的主要区域。
    • 根据GC分代收集算法,堆可细分为:新生代和老年代;新生代又分为Eden区、Survivor区(from,to)从内存分配的角度看,线程共享的Java堆可划分出多个线程私有的分配缓冲区(TLAB:Thread Local Allocation Buffer)
    • 堆内存仅要逻辑上连续即可,物理上不连续也可以,如果在堆中没有内存完成实例分配。并且堆也无法再扩展时,则会抛出OOM异常
  • 方法区

    • 与堆一样,方法区是各线程共享的,用于存储已被虚拟机 加载的类信息、常量、静态变量、即时编译器编译后的代码等数据
    • 对于开发者来说,该区又称为“永久代”Permanent Generation,当方法区无法满足内存分配时,将抛出OOM异常
    Read more »

Otter 跨机房数据同步方案

由于项目需求要把整个线上产品迁移上云,从自建IDC机房的所有服务都迁移至腾讯云。原先考虑采用腾讯云的DTS服务,但是发现他们产品属于公测阶段,在加上腾讯云RDS服务居然没有提供外网访问的tls安全传输保证,这样如果一旦开启外网地址,可能导致数据库被入侵或者攻击。基于以上考虑点,在加上我们同机房数据迁移采用otter的定制版,最终我们决定改造otter,支持跨机房安全传输。otter的改造点详细请查看文章

Otter工作原理

otter工作原理

说明:

  1. 基于Canal开源产品,获取数据库增量日志数据。
  2. 典型管理系统架构,manager(web管理)+node(工作节点)
  3. manager运行时推送同步配置到node节点
  4. node节点将同步状态反馈到manager上
  5. 基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作。

跨机房原理

跨机房原理

说明:

  1. 数据涉及网络传输,S/E/T/L几个阶段会分散在2个或者更多Node节点上,多个Node之间通过zookeeper进行协同工作 (一般是Select和Extract在一个机房的Node,Transform/Load落在另一个机房的Node)。
  2. node节点可以有failover / loadBalancer. (每个机房的Node节点,都可以是集群,一台或者多台机器)。
Read more »

问题描述

最近我们在性能压测seata Tcc模式中发现了一些问题,seata issue

  1. 业务方tps稳定在200tps左右,但是实际去除seata依赖,tps达到了2000左右;

  2. 性能长时间压测几个小时之后,发现tm端出现了rpc 30s超时问题,导致所有的请求都超时,业务无法响应;

    seata性能压测

    seata性能压测结果

    TM端报错

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    2020-01-16 19:00:19.956 ERROR 19765 --- [o-8873-exec-288] i.s.core.rpc.netty.AbstractRpcRemoting   : wait response error:cost 30000 ms,ip:192.168.202.137:8091,request:timeout=60000,transactionName=test(javax.servlet.http.HttpServletRequest, com.fly.seata.dto.OrderDTO)
    2020-01-16 19:00:19.960 WARN 19765 --- [o-8873-exec-288] i.s.tm.api.DefaultFailureHandlerImpl : Failed to begin transaction.

    io.seata.core.exception.TmTransactionException: RPC timeout
    at io.seata.tm.DefaultTransactionManager.syncCall(DefaultTransactionManager.java:97) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.DefaultTransactionManager.begin(DefaultTransactionManager.java:53) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.api.DefaultGlobalTransaction.begin(DefaultGlobalTransaction.java:102) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.api.TransactionalTemplate.beginTransaction(TransactionalTemplate.java:123) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.api.TransactionalTemplate.execute(TransactionalTemplate.java:58) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.spring.annotation.GlobalTransactionalInterceptor.handleGlobalTransaction(GlobalTransactionalInterceptor.java:106) [seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.spring.annotation.GlobalTransactionalInterceptor.invoke(GlobalTransactionalInterceptor.java:81) [seata-all-1.0.0.jar!/:1.0.0]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) [spring-aop-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at com.fly.seata.controller.TestController$$EnhancerBySpringCGLIB$$63e8794f.test(<generated>) [classes!/:1.0-SNAPSHOT]
    at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]

    线程栈信息

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    "ServerHandlerThread_1_500" #86 daemon prio=5 os_prio=0 tid=0x00007fe594011800 nid=0x6138 waiting on condition [0x00007fe6507c0000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #85 daemon prio=5 os_prio=0 tid=0x00007fe580013000 nid=0x6137 waiting on condition [0x00007fe650841000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #84 daemon prio=5 os_prio=0 tid=0x00007fe580011000 nid=0x6136 waiting on condition [0x00007fe6508c2000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #83 daemon prio=5 os_prio=0 tid=0x00007fe5a4080000 nid=0x6135 waiting on condition [0x00007fe650943000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #82 daemon prio=5 os_prio=0 tid=0x00007fe594010000 nid=0x6134 waiting on condition [0x00007fe6509c4000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    出现超时之后,server端和Tm端、Rm端有建立连接的

    1
    2
    3
    4
    5
    6
    7
    8
    9
    pangpeijie@mdw1:/home/dxy/seata-server/bin$ lsof -i:8091
    COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
    java 24684 pangpeijie 6u IPv4 1673279856 0t0 TCP 192.168.202.137:8091->192.168.202.138:53212 (ESTABLISHED)
    java 24684 pangpeijie 270u IPv4 1591859785 0t0 TCP 192.168.202.137:8091->192.168.202.138:44208 (ESTABLISHED)
    java 24684 pangpeijie 272u IPv4 1548003634 0t0 TCP *:8091 (LISTEN)
    java 24684 pangpeijie 275u IPv4 1571258316 0t0 TCP 192.168.202.137:8091->192.168.202.138:43734 (ESTABLISHED)
    java 24684 pangpeijie 276u IPv4 1571258317 0t0 TCP 192.168.202.137:8091->192.168.202.138:43738 (ESTABLISHED)
    java 24684 pangpeijie 278u IPv4 1571258320 0t0 TCP 192.168.202.137:8091->192.168.202.138:43806 (ESTABLISHED)
    java 24684 pangpeijie 280u IPv4 1571258321 0t0 TCP 192.168.202.137:8091->192.168.202.138:43814 (ESTABLISHED)
    1
    2
    3
    4
    5
    6
    7
    pangpeijie@mdw2:~$ netstat -ano|grep "192.168.202.137:8091"
    tcp 0 0 192.168.202.138:43814 192.168.202.137:8091 ESTABLISHED keepalive (2398.82/0/0)
    tcp 0 0 192.168.202.138:53212 192.168.202.137:8091 ESTABLISHED keepalive (4753.92/0/0)
    tcp 0 0 192.168.202.138:43738 192.168.202.137:8091 ESTABLISHED keepalive (2396.00/0/0)
    tcp 0 0 192.168.202.138:43806 192.168.202.137:8091 ESTABLISHED keepalive (2397.65/0/0)
    tcp 0 0 192.168.202.138:44208 192.168.202.137:8091 ESTABLISHED keepalive (4107.33/0/0)
    tcp 0 0 192.168.202.138:43734 192.168.202.137:8091 ESTABLISHED keepalive (2396.14/0/0)

    其他报错信息请查看github的issue

Read more »

问题描述

最近我们生产上的消息推送服务不断的爆出内存增长过大导致操作系统kill该进程,并且随着业务量的增长出现的频次越来越高,由去年12月底的半个月出现一次,到现在的一天出现一次。

该服务的业务逻辑就是消息推送,业务方推送消息,服务方通过rocketmq做削峰处理(之前使用kafka来左右消息队列没出现过问题),其中主要通道是个推和apns(使用pushy框架)通道,业务方的全局推送大概在5000w用户左右,每全局推送一次,内存就会增长几百mb,直到jvm进程内存达到5g之后,系统就会出现服务请求没响应或者被守护进程重启等问题。

刚开始的快速解决方案也只能重启,虽然重启能解决问题,但是也没找到根本原因!

问题排查

由于我们jvm堆内的初始内存和最大内存都是配置成2g ,但是进程的内存达到5g,那就是说堆外占了3g

-Xms2048m -Xmx2048m

运维平台监控

下面是运维监控平台的cpu、内存、线程资源数据,内存在一直在增长,没有回收释放。

oom内存和cpu

出问题的时候监控应用fullgc情况

fullgc情况

监控jvm虚拟机的线程增长情况

线程增长情况

Read more »

在使用Spring源码Debug的过程中,遇到了一些问题,如下:

问题1. cannot find symbol CoroutinesUtils

1
2
3
4
5
6
Error:(347, 51) java: cannot find symbol
symbol: variable CoroutinesUtils
location: class org.springframework.core.ReactiveAdapterRegistry.CoroutinesRegistrar
Error:(348, 51) java: cannot find symbol
symbol: variable CoroutinesUtils
location: class org.springframework.core.ReactiveAdapterRegistry.CoroutinesRegistrar

问题2. 找不到符号符号: 类 AnnotationTransactionAspect位置: 类 org.springframework.transaction.aspectj.AspectJTransactionManagementConfiguration

问题3. Unexpected AOP exception; nested exception is java.lang.IllegalStateException: Unable to load cache item

1
2
3
4
5
6
7
8
9
10
Caused by: org.springframework.aop.framework.AopConfigException: Unexpected AOP exception; nested exception is java.lang.IllegalStateException: Unable to load cache item
at org.springframework.aop.framework.CglibAopProxy.getProxy(CglibAopProxy.java:214)
at org.springframework.aop.framework.ProxyFactory.getProxy(ProxyFactory.java:110)
at org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator.createProxy(AbstractAutoProxyCreator.java:488)
at org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator.wrapIfNecessary(AbstractAutoProxyCreator.java:367)
at org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator.postProcessAfterInitialization(AbstractAutoProxyCreator.java:310)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsAfterInitialization(AbstractAutowireCapableBeanFactory.java:431)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1861)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:608)
... 10 more
Read more »

1. biglog文件被清理

1.1 报错日志  
1
Could not find first log file name in binary log index file
1.2 现象 pipeline的mainstem状态 一直处于定位中状态 1.3 问题排查 一般出现这个报错,都是由于清空数据库binlog文件导致(自动清除biglog),我们按下述步骤确定是否由于binlog文件被清理: 首先,查看当前同步的binlog位点; 其次,登录数据库查看binlog信息(show master logs); 最后,对比位点信息就能发现biglog丢失; 1.4 处理方法 1)通道停止增量同步; 2)清空掉otter的同步信息; 3)检查canal的同步位点信息; 4)重新启动otter同步;

2. mysql大事物造成otter假死

2.1 报错日志  
    无报错日志

2.2 现象  
    channel状态正常,mainstrm状态正常,但是position信息里,position的信息一直不更新(超过半小时以上)

2.3 问题排查  
    确认是否为大事物的方法  
    1)查询数据库当前位点信息(show master logs);
    2)查询位点的详细信息(show binlog events in 'binlog名称');
    3)查看binlog日志中begin和commit的偏移量相差多大即可;
    
2.4 处理方法
    1)清空掉otter同步信息;
    2)检查canal的同步位点配置;
    3)重新启动otter同步;
    

3. node节点内存溢出

3.1 报错日志
1
java.lang.OutOfMemoryError: unable to create new native thread
3.2 现象 node节点均为运行状态,但是涉及到问题node的channel同步处于挂起状态,且无法解挂及停止; 3.3问题排查 1)根据报错log找到对应的node节点; 2)查看该node节点的日志找到该报错日志; 3.4处理方法 1)重启node节点;
Read more »

数据迁移方案

数据迁移核心思路抽象起来其实很简单,但如何稳定平滑迁移数据,我们会遇到不少问题,如:

  1. 数据如何迁移?
  2. 如何校验数据迁移过程中的正确性?如果发现数据不一致,如何修复?
  3. 我们的业务改造有问题如何回滚?

数据同步中间件选型

开源数据同步中间件主要包括canal、otter、maxwell、kettle等,下面进行简单比对说明。

  • canal:canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费,目前主要支持了MySQL。
  • otter:阿里巴巴旗下的另一款开源项目,始于中美数据同步需求,纯Java开发。可以理解为canal+ETL,对数据抽取进行了扩展,加入自由门、反查等功能,拓展了已经无法从binlog获取的数据来源。同时提供页面的ETL编辑配置功能,方便快速实现带逻辑的业务数据同步。
  • maxwell:Maxwell 是java语言编写的能够读取、解析MySQL binlog,将行更新以json格式发送到 Kafka、RabbitMQ等,有了增量的数据流,可以想象的应用场景实在太多了,如ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等等。
  • kettle:kettle可以实现从不同数据源(excel、数据库、文本文件等)获取数据,然后将数据进行整合、转换处理,可以再将数据输出到指定的位置(excel、数据库、文本文件)等;是B/S架构,多用于数仓作业。

最终结合公司需求,otter功能最强大,二次开发比较方便,最终选择用otter来作为数据同步工具。

重点原理阐述

  • 全量迁移:数据迁移首要目标如何将历史全量数据迁移到新库中,我们利用otter的自由门原理改造支持全量数据同步功能,整个过程都是查询在线库的备库,因此不影响在线业务的数据库服务,自由门原理如下:

    a. 基于otter系统表retl_buffer,插入特定的数据,包含需要同步的表名,pk信息。

    b. otter系统感知后会根据表名和pk提取对应的数据(整行记录),和正常的增量同步一起同步到目标库。

  • 增量迁移:通过模拟mysql slave获取binlog数据,然后通过canal解析增量数据,最终准实时同步数据。

    方案一:先开启全量任务,等全量数据同步完成后,在开启增量任务。由于迁移过程中业务服务一直运行,因此全量迁移完全成,并且要将全量时间点后的数据追回来,这里核心原理是同步全量时间位点后binlog日志数据来保证数据一致性,需要注意的是增量时间需要前移一小断时间(如5分钟),其主要原因是全量迁移启动的那刻会有时间差,需要增量前移来保证数据最终一致性。

    方案二:先开启增量任务,然后开启全量任务,这种方案存在当全量和增量任务同时操作同一条pk数据的时候,就可能会产生丢失更新。推荐使用该方案,操作比较简单,这种场景出现的问题是很小的概率,最终还能通过数据校验服务来找到问题数据,然后用一键修复功能即可。

    增量同步原理如下:

    a. 基于Canal获取数据库增量日志数据。

    b. 利用SETL调度模型实现调度和处理实现。

    c. 基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作。

    工作原理如下:

  • 反向同步:迁移到新库后,为了保证业务方出现业务回滚,保证源库与目标库数据一致性,需要建立从新库到老库的回流任务,原理跟增量迁移一样,只是变更一下原库及目标库。

  • 一致性校验:通过比对源库和目标库的所有同步字段的crc校验码,如果出现不一致,重新比对3次,如果最终数据不一致,则会将该条数据落库。流程图如下:

  • 一键修复:将一致性校验比对失败的数据,通过全量同步原理来触发数据修复功能。

Read more »

otter简介

Otter的业务域是支持异构数据库实时同步,数据记录变更订阅服务。
Otter需要保障数据库的事务一致性,包括DDL(表结构变更)也可以进行同步或过滤。而DBA天生就在这个坑里,绝对不能让主备不一致、或事务不完整,哪怕只是一条数据。而且DBA迫切希望以后不用通知下游了,让DRC自动适配主备切换或拆库。

定位:基于数据库增量日志解析,准实时同步到本机房或异地机房的mysql/oracle数据库.一个分布式数据库同步系统.

Otter具备的三大特性:

  • 1)稳定性,支持HA;
  • 2)实时性;
  • 3)一致性,数据同步前后必须保证数据的一致性;

我们公司对Otter的需求场景:

  • MySQL原生复制
  • 大数据实时抽取
  • 搜索实时索引
  • 数据迁移

Otter架构

数据同步过程可以分为Select–>Extract、Transform–>Load四个过程,也就是上图中的S、E、T、L,通过将这4个步骤进行服务拆分,每个服务都具有自己的线程池。通过S、L过程的串型,保证数据的一致性,E、T过程的并行提升系统处理的性能。

原理:

  1. 基于Canal开源产品,获取数据库增量日志数据。

  2. 典型管理系统架构,manager(web管理)+node(工作节点)

    a. manager运行时推送同步配置到node节点;
    b. node节点将同步状态反馈到manager上;

  3. 基于zookeeper,解决分布式状态调度的,允许多node节点之间协同工作.

Read more »

背景

在复杂分布式系统中,往往需要对大量的数据和消息进行唯一标识。随着数据日渐增长,对数据分库分表后需要有一个唯一ID来标识一条数据或消息,数据库的自增ID显然不能满足需求,此时一个能够生成全局唯一ID的系统是非常必要的。概括下来,那业务系统对ID号的要求有哪些呢?

  1. 全局唯一性:不能出现重复的ID号,既然是唯一标识,这是最基本的要求。
  2. 趋势递增:在MySQL InnoDB引擎中使用的是聚集索引,由于多数RDBMS使用B-tree的数据结构来存储索引数据,在主键的选择上面我们应该尽量使用有序的主键保证写入性能。
  3. 单调递增:保证下一个ID一定大于上一个ID,例如事务版本号、IM增量消息、排序等特殊需求。
  4. 信息安全:如果ID是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定URL即可;如果是订单号就更危险了,竞对可以直接知道我们一天的单量。所以在一些应用场景下,会需要ID无规则、不规则。

上述123对应三类不同的场景,3和4需求还是互斥的,无法使用同一个方案满足。

Read more »

Execution相关的属性的配置:

  • hystrix.command.default.execution.isolation.strategy 隔离策略,默认是Thread, 可选Thread|Semaphore
    • thread 通过线程数量来限制并发请求数,可以提供额外的保护,但有一定的延迟。一般用于网络调用
    • semaphore 通过semaphore count来限制并发请求数,适用于无网络的高并发请求
  • hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds 命令执行超时时间,默认1000ms
  • hystrix.command.default.execution.timeout.enabled 执行是否启用超时,默认启用true
  • hystrix.command.default.execution.isolation.thread.interruptOnTimeout 发生超时是是否中断,默认true
  • hystrix.command.default.execution.isolation.semaphore.maxConcurrentRequests 最大并发请求数,默认10,该参数当使用ExecutionIsolationStrategy.SEMAPHORE策略时才有效。如果达到最大并发请求数,请求会被拒绝。理论上选择semaphore size的原则和选择thread size一致,但选用semaphore时每次执行的单元要比较小且执行速度快(ms级别),否则的话应该用thread。semaphore应该占整个容器(tomcat)的线程池的一小部分。
Read more »