Seata client response timeout problem

Problem description

We recently ran into some problems while load-testing Seata's TCC mode (see the Seata issue):

  1. The business service's TPS stabilized at around 200, but with the Seata dependency removed it reached roughly 2000.

  2. After several hours of sustained load testing, the TM side began reporting 30s RPC timeouts; from then on every request timed out and the business could no longer respond.

    Seata load test

    Seata load test results

    TM-side error

    2020-01-16 19:00:19.956 ERROR 19765 --- [o-8873-exec-288] i.s.core.rpc.netty.AbstractRpcRemoting   : wait response error:cost 30000 ms,ip:192.168.202.137:8091,request:timeout=60000,transactionName=test(javax.servlet.http.HttpServletRequest, com.fly.seata.dto.OrderDTO)
    2020-01-16 19:00:19.960 WARN 19765 --- [o-8873-exec-288] i.s.tm.api.DefaultFailureHandlerImpl : Failed to begin transaction.

    io.seata.core.exception.TmTransactionException: RPC timeout
    at io.seata.tm.DefaultTransactionManager.syncCall(DefaultTransactionManager.java:97) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.DefaultTransactionManager.begin(DefaultTransactionManager.java:53) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.api.DefaultGlobalTransaction.begin(DefaultGlobalTransaction.java:102) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.api.TransactionalTemplate.beginTransaction(TransactionalTemplate.java:123) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.tm.api.TransactionalTemplate.execute(TransactionalTemplate.java:58) ~[seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.spring.annotation.GlobalTransactionalInterceptor.handleGlobalTransaction(GlobalTransactionalInterceptor.java:106) [seata-all-1.0.0.jar!/:1.0.0]
    at io.seata.spring.annotation.GlobalTransactionalInterceptor.invoke(GlobalTransactionalInterceptor.java:81) [seata-all-1.0.0.jar!/:1.0.0]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) [spring-aop-5.0.5.RELEASE.jar!/:5.0.5.RELEASE]
    at com.fly.seata.controller.TestController$$EnhancerBySpringCGLIB$$63e8794f.test(<generated>) [classes!/:1.0-SNAPSHOT]
    at sun.reflect.GeneratedMethodAccessor80.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]

    Thread stack information

    "ServerHandlerThread_1_500" #86 daemon prio=5 os_prio=0 tid=0x00007fe594011800 nid=0x6138 waiting on condition [0x00007fe6507c0000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #85 daemon prio=5 os_prio=0 tid=0x00007fe580013000 nid=0x6137 waiting on condition [0x00007fe650841000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #84 daemon prio=5 os_prio=0 tid=0x00007fe580011000 nid=0x6136 waiting on condition [0x00007fe6508c2000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #83 daemon prio=5 os_prio=0 tid=0x00007fe5a4080000 nid=0x6135 waiting on condition [0x00007fe650943000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    "ServerHandlerThread_1_500" #82 daemon prio=5 os_prio=0 tid=0x00007fe594010000 nid=0x6134 waiting on condition [0x00007fe6509c4000]
    java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for <0x00000000c6f363c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1074)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1134)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)

    After the timeouts appeared, the server side still had established connections to both the TM and the RM:

    pangpeijie@mdw1:/home/dxy/seata-server/bin$ lsof -i:8091
    COMMAND PID USER FD TYPE DEVICE SIZE/OFF NODE NAME
    java 24684 pangpeijie 6u IPv4 1673279856 0t0 TCP 192.168.202.137:8091->192.168.202.138:53212 (ESTABLISHED)
    java 24684 pangpeijie 270u IPv4 1591859785 0t0 TCP 192.168.202.137:8091->192.168.202.138:44208 (ESTABLISHED)
    java 24684 pangpeijie 272u IPv4 1548003634 0t0 TCP *:8091 (LISTEN)
    java 24684 pangpeijie 275u IPv4 1571258316 0t0 TCP 192.168.202.137:8091->192.168.202.138:43734 (ESTABLISHED)
    java 24684 pangpeijie 276u IPv4 1571258317 0t0 TCP 192.168.202.137:8091->192.168.202.138:43738 (ESTABLISHED)
    java 24684 pangpeijie 278u IPv4 1571258320 0t0 TCP 192.168.202.137:8091->192.168.202.138:43806 (ESTABLISHED)
    java 24684 pangpeijie 280u IPv4 1571258321 0t0 TCP 192.168.202.137:8091->192.168.202.138:43814 (ESTABLISHED)
    pangpeijie@mdw2:~$ netstat -ano|grep "192.168.202.137:8091"
    tcp 0 0 192.168.202.138:43814 192.168.202.137:8091 ESTABLISHED keepalive (2398.82/0/0)
    tcp 0 0 192.168.202.138:53212 192.168.202.137:8091 ESTABLISHED keepalive (4753.92/0/0)
    tcp 0 0 192.168.202.138:43738 192.168.202.137:8091 ESTABLISHED keepalive (2396.00/0/0)
    tcp 0 0 192.168.202.138:43806 192.168.202.137:8091 ESTABLISHED keepalive (2397.65/0/0)
    tcp 0 0 192.168.202.138:44208 192.168.202.137:8091 ESTABLISHED keepalive (4107.33/0/0)
    tcp 0 0 192.168.202.138:43734 192.168.202.137:8091 ESTABLISHED keepalive (2396.14/0/0)

    For other error messages, please see the GitHub issue.

Troubleshooting

Server-side load

At first we suspected that the seata-server machine was under too much pressure, leaving its processing capacity low, or that the load was high enough to cause frequent full GCs. It turned out that the server machine's load and GC behavior were both normal, as shown below.

Server load; the server machine is configured with 8 CPU cores and 16 GB of RAM.

Server load

Server-side GC

GC status

Database connections: the vast majority of connections were in the sleep state, so the database did not look like a bottleneck either.

Database connections

Network I/O bottleneck

After checking with the operations team: the server and the TM run on the same physical host and use virtual NICs backed by a gigabit physical NIC, so network I/O should not be the bottleneck either.

NIC information

Traffic information

Parameter tuning

Client-side timeout

We reasoned that if the bulk of server responses only time out after 30s, the whole service collapses in an avalanche, so we shortened the timeout for messages the client sends to the server from the original 30s to 2s.

public class NettyClientConfig extends NettyBaseConfig {
    // ......
    private static final int RPC_REQUEST_TIMEOUT = 30 * 1000;
    // ......
}
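The adjustment amounted to shrinking that constant; a minimal sketch of what we ended up with (same field as above, only the value changed):

public class NettyClientConfig extends NettyBaseConfig {
    // ......
    // adjusted: fail the RPC call after 2s instead of waiting the full 30s
    private static final int RPC_REQUEST_TIMEOUT = 2 * 1000;
    // ......
}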

Increase the client-side Netty worker threads

Since the TM and the RM each talk to the server over a single channel, and the worker thread pool is sized to the machine's CPU core count, we raised the client worker thread count to 100.

public class NettyClientConfig extends NettyBaseConfig {
    // ......
    private int clientWorkerThreads = WORKER_THREAD_SIZE;
    // ......
}
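The change itself is just a larger value for that field; a sketch, assuming we simply hard-code the override (WORKER_THREAD_SIZE defaults to the CPU core count, as noted above):

public class NettyClientConfig extends NettyBaseConfig {
    // ......
    // adjusted: raise the client worker thread count from the CPU core count to 100
    private int clientWorkerThreads = 100;
    // ......
}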

Increase the server-side Netty worker threads

We enlarged messageExecutor, the server-side business thread pool that dispatches incoming messages in channelRead:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof RpcMessage) {
        final RpcMessage rpcMessage = (RpcMessage) msg;
        if (rpcMessage.getMessageType() == ProtocolConstants.MSGTYPE_RESQUEST
            || rpcMessage.getMessageType() == ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
            }
            try {
                messageExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            dispatch(rpcMessage, ctx);
                        } catch (Throwable th) {
                            LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                        }
                    }
                });
            } catch (RejectedExecutionException e) {
                LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                    "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                if (allowDumpStack) {
                    String name = ManagementFactory.getRuntimeMXBean().getName();
                    String pid = name.split("@")[0];
                    int idx = new Random().nextInt(100);
                    try {
                        Runtime.getRuntime().exec("jstack " + pid + " >d:/" + idx + ".log");
                    } catch (IOException exx) {
                        LOGGER.error(exx.getMessage());
                    }
                    allowDumpStack = false;
                }
            }
        } else {
            MessageFuture messageFuture = futures.remove(rpcMessage.getId());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String
                    .format("%s msgId:%s, future :%s, body:%s", this, rpcMessage.getId(), messageFuture,
                        rpcMessage.getBody()));
            }
            if (messageFuture != null) {
                messageFuture.setResultMessage(rpcMessage.getBody());
            } else {
                try {
                    messageExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                dispatch(rpcMessage, ctx);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            }
                        }
                    });
                } catch (RejectedExecutionException e) {
                    LOGGER.error(FrameworkErrorCode.ThreadPoolFull.getErrCode(),
                        "thread pool is full, current max pool size is " + messageExecutor.getActiveCount());
                }
            }
        }
    }
}
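Conceptually the tuning is just building messageExecutor with a larger pool. A rough sketch under the assumption of a plain ThreadPoolExecutor (the sizes and rejection policy here are illustrative, not Seata's actual defaults; the thread dump above does show a 500-thread ServerHandlerThread pool):

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ServerExecutorSketch {
    // Illustrative only: a larger bounded pool for dispatching incoming RpcMessages,
    // so that bursts queue up instead of being rejected with ThreadPoolFull.
    static ThreadPoolExecutor newMessageExecutor() {
        return new ThreadPoolExecutor(
            500, 500,                                   // core / max threads
            500L, TimeUnit.SECONDS,                     // keep-alive for idle threads
            new LinkedBlockingQueue<Runnable>(20000),   // bounded task queue
            new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure instead of dropping tasks
    }
}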

Adjust client-side batch message sending

While tuning the client parameters we noticed that, with batch sending enabled, a single background thread merges the pending messages roughly every 1 ms into one request and sends it to the server (see the sketch after the config snippet below).

transport {
  # the client batch send request enable
  enableClientBatchSendRequest = true
}
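To make the behavior behind that switch concrete, here is a heavily simplified sketch of the batching idea, assuming a plain queue plus a single sender thread (this is not Seata's actual implementation, just the mechanism described above):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified sketch: callers enqueue requests, and one daemon thread drains the
// queue roughly every 1 ms, shipping everything it collected as a single merged message.
public class BatchSenderSketch {
    private final LinkedBlockingQueue<Object> basket = new LinkedBlockingQueue<>();

    public void submit(Object request) {
        basket.offer(request);
    }

    public void start() {
        Thread sender = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                List<Object> merged = new ArrayList<>();
                basket.drainTo(merged);
                if (!merged.isEmpty()) {
                    sendMerged(merged); // one network write for the whole batch
                }
                try {
                    Thread.sleep(1);    // ~1 ms merge window
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "client-batch-sender");
        sender.setDaemon(true);
        sender.start();
    }

    private void sendMerged(List<Object> merged) {
        // placeholder for the actual channel.writeAndFlush(mergedMessage)
        System.out.println("sending merged batch of " + merged.size() + " requests");
    }
}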

After adjusting all of these parameters the problem still occurred, and it could now be reproduced within ten-odd minutes of load testing; we still could not work out the cause.

Analyzing the thread dump

All of the server-side threads were WAITING, parked on a queue's condition object, which means the server had no message backlog at all: every handler thread was simply waiting for a message to consume.

Handler threads blocked

Thread blocking

Per-thread CPU usage

Per-thread CPU usage

Thread overview

Thread execution status

Root cause

Thread execution status

From that last thread monitoring view we could see that the bottleneck was entirely in the database access, so we read the server's connection pool source code. The default minConn is 1, and our server configuration only set maxConn, the maximum connection count, which was 100; in theory that should still have been fine. We therefore began to suspect the DBCP data source itself and switched to the Druid data source, leaving every other configuration item unchanged. After a full night of load testing, TPS reached 500+, roughly four times what we had before.

Load test
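For completeness, the switch itself is just selecting the Druid data source instead of the DBCP one in the server's store configuration. A hedged sketch of an equivalent Druid setup, mirroring the DBCP generator shown below (pool sizes are placeholders, not Seata's actual generator code):

import javax.sql.DataSource;
import com.alibaba.druid.pool.DruidDataSource;

// Illustrative only: a Druid data source configured with the same kind of settings
// we had been using with DBCP.
public class DruidDataSourceSketch {
    static DataSource newDataSource(String url, String user, String password) {
        DruidDataSource ds = new DruidDataSource();
        ds.setUrl(url);
        ds.setUsername(user);
        ds.setPassword(password);
        ds.setInitialSize(1);
        ds.setMinIdle(1);
        ds.setMaxActive(100);   // the maxConn we had configured on the server
        ds.setMaxWait(5000);
        ds.setValidationQuery("select 1");
        ds.setTestWhileIdle(true);
        return ds;
    }
}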

We then started to review why performance with DBCP was so poor. The DBCP version in question is considered stable and is used in production at plenty of companies, so we suspected the DBCP parameter configuration, and after digging through the source code we finally found the cause.

@LoadLevel(name = "dbcp")
public class DbcpDataSourceGenerator extends AbstractDataSourceGenerator {

    @Override
    public DataSource generateDataSource() {
        BasicDataSource ds = new BasicDataSource();
        ds.setDriverClassName(getDriverClassName());
        ds.setUrl(getUrl());
        ds.setUsername(getUser());
        ds.setPassword(getPassword());
        ds.setInitialSize(getMinConn());
        ds.setMaxActive(getMaxConn());
        // minimum idle connections and maximum idle connections
        ds.setMinIdle(getMinConn());
        ds.setMaxIdle(getMinConn());
        ds.setMaxWait(5000);
        ds.setTimeBetweenEvictionRunsMillis(120000);
        ds.setNumTestsPerEvictionRun(1);
        ds.setTestWhileIdle(true);
        ds.setValidationQuery(getValidationQuery(getDBType()));
        ds.setConnectionProperties("useUnicode=yes;characterEncoding=utf8;socketTimeout=5000;connectTimeout=500");
        return ds;
    }
}

When the DBCP data source is initialized, the minimum idle connection count is set to 1 (from minConn, the default) and the maximum connection count was configured to 50 (set manually; the default is 10). Once concurrency rises so that the number of connections in use sits between maxIdle and maxActive, a connection that finishes its work cannot be reused, because the current connection count exceeds maxIdle, and it is closed immediately; the next request cannot find an idle connection either and has to establish a brand-new one. Closing connections is slow, every close waits for resources to be released, and large numbers of threads end up queuing, which is exactly the blocked-thread picture we saw in jstack. See the referenced blog post.
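Read that way, the fix on the DBCP side would simply be to let released connections stay in the pool, i.e. keep maxIdle in line with maxActive instead of pinning it to minConn. A hedged sketch of such a configuration (values illustrative, using the same BasicDataSource API as the generator above):

import org.apache.commons.dbcp.BasicDataSource;

// Illustrative only: with maxIdle == maxActive, a connection released under load
// is kept in the pool and reused instead of being closed immediately.
public class DbcpPoolFixSketch {
    static BasicDataSource newDataSource(String url, String user, String password) {
        BasicDataSource ds = new BasicDataSource();
        ds.setUrl(url);
        ds.setUsername(user);
        ds.setPassword(password);
        ds.setInitialSize(10);
        ds.setMinIdle(10);
        ds.setMaxIdle(50);    // match maxActive so hot connections are not discarded
        ds.setMaxActive(50);
        ds.setMaxWait(5000);
        return ds;
    }
}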

DBCP connection acquisition logic

/**
 * Create (if necessary) and return a connection to the database.
 *
 * @throws SQLException if a database access error occurs
 * @return a database connection
 */
public Connection getConnection() throws SQLException {
    return createDataSource().getConnection();
}

protected synchronized DataSource createDataSource()
        throws SQLException {
    if (closed) {
        throw new SQLException("Data source is closed");
    }

    // Return the pool if we have already created it
    if (dataSource != null) {
        return (dataSource);
    }

    // create factory which returns raw physical connections
    ConnectionFactory driverConnectionFactory = createConnectionFactory();

    // create a pool for our connections
    createConnectionPool();

    // Set up statement pool, if desired
    GenericKeyedObjectPoolFactory statementPoolFactory = null;
    if (isPoolPreparedStatements()) {
        statementPoolFactory = new GenericKeyedObjectPoolFactory(null,
            -1, // unlimited maxActive (per key)
            GenericKeyedObjectPool.WHEN_EXHAUSTED_FAIL,
            0, // maxWait
            1, // maxIdle (per key)
            maxOpenPreparedStatements);
    }

    // Set up the poolable connection factory
    createPoolableConnectionFactory(driverConnectionFactory, statementPoolFactory, abandonedConfig);

    // Create and return the pooling data source to manage the connections
    createDataSourceInstance();

    try {
        for (int i = 0 ; i < initialSize ; i++) {
            connectionPool.addObject();
        }
    } catch (Exception e) {
        throw new SQLNestedException("Error preloading the connection pool", e);
    }

    return dataSource;
}


/**
 * Get a db connection from the pool.
 *
 * If removeAbandoned=true, recovers db connections which
 * have been idle > removeAbandonedTimeout and
 * getNumActive() > getMaxActive() - 3 and
 * getNumIdle() < 2
 *
 * @return Object jdbc Connection
 * @throws Exception if an exception occurs retrieving a
 * connection from the pool
 */
public Object borrowObject() throws Exception {
    if (config != null
            && config.getRemoveAbandoned()
            && (getNumIdle() < 2)
            && (getNumActive() > getMaxActive() - 3) ) {
        removeAbandoned();
    }
    Object obj = super.borrowObject();
    if (obj instanceof AbandonedTrace) {
        ((AbandonedTrace) obj).setStackTrace();
    }
    if (obj != null && config != null && config.getRemoveAbandoned()) {
        synchronized (trace) {
            trace.add(obj);
        }
    }
    return obj;
}

But that only limits the server's throughput; it should not make every single request time out. With this question in mind we set the Druid pool's minConn back to 1 and load-tested again, and after roughly half an hour the problem reappeared. We then turned to the client-side Netty decoder to check whether the client was receiving packets at all, and found that Object decoded = super.decode(ctx, in) kept returning null. Every time the problem occurred, the client did receive response packets, but decoding always produced null. Why?

io.seata.core.rpc.netty.v1.ProtocolV1Decoder#decode

@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
    Object decoded = super.decode(ctx, in);
    if (decoded instanceof ByteBuf) {
        ByteBuf frame = (ByteBuf) decoded;
        try {
            return decodeFrame(frame);
        } catch (Exception e) {
            LOGGER.error("Decode frame error!", e);
            throw e;
        } finally {
            frame.release();
        }
    }
    return decoded;
}

At first we assumed the null return was just an empty packet left over after TCP frame splitting, but after more remote debugging we found that the decoder was continuously discarding packets.

io.netty.handler.codec.LengthFieldBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf)

if (discardingTooLongFrame) {
    discardingTooLongFrame(in);
}
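For context, ProtocolV1Decoder builds on Netty's LengthFieldBasedFrameDecoder: the decoder reads the frame length out of the header, and if that length is larger than maxFrameLength it switches into discarding mode and throws bytes away until it believes the oversized frame has passed. A corrupted header can therefore poison everything that follows on the connection. A rough sketch of how such a decoder is wired up (the offsets below are illustrative, not Seata's exact protocol layout):

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

// Illustrative only: a length-field based frame decoder. If the sender writes a
// corrupted header, the decoded length can exceed maxFrameLength and the decoder
// starts discarding bytes (the discardingTooLongFrame branch above).
public class FrameDecoderSketch extends LengthFieldBasedFrameDecoder {
    public FrameDecoderSketch() {
        super(8 * 1024 * 1024, // maxFrameLength
              3,               // lengthFieldOffset (illustrative)
              4,               // lengthFieldLength (illustrative)
              -7,              // lengthAdjustment (illustrative)
              0);              // initialBytesToStrip
    }
}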

At that point we began to suspect that the packets sent by the server were malformed, and indeed we found an encoding exception on the server side:

2020-03-25 17:06:43.587 ERROR[NettyServerNIOWorker_1_16]io.seata.core.rpc.netty.v1.ProtocolV1Encoder.encode:118 -Encode request error!
java.lang.NullPointerException: null
at io.seata.serializer.seata.protocol.transaction.AbstractGlobalEndResponseCodec.encode(AbstractGlobalEndResponseCodec.java:42)
at io.seata.serializer.seata.protocol.MergeResultMessageCodec.encode(MergeResultMessageCodec.java:52)
at io.seata.serializer.seata.SeataSerializer.serialize(SeataSerializer.java:47)
at io.seata.core.rpc.netty.v1.ProtocolV1Encoder.encode(ProtocolV1Encoder.java:97)
at io.netty.handler.codec.MessageToByteEncoder.write(MessageToByteEncoder.java:107)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:106)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:462)

We then read the encoder code and found that when an encoding exception occurs it neither rethrows the exception nor closes the channel. That gave us good reason to believe that a server-side encoding exception ends up pushing a corrupted packet to the client; combined with TCP packet coalescing and splitting, the client's decoder then fails, and eventually none of the client's requests get a response.

io.seata.core.rpc.netty.v1.ProtocolV1Encoder

public class ProtocolV1Encoder extends MessageToByteEncoder {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtocolV1Encoder.class);

    @Override
    public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
        try {
            if (msg instanceof RpcMessage) {
                RpcMessage rpcMessage = (RpcMessage) msg;

                int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
                int headLength = ProtocolConstants.V1_HEAD_LENGTH;

                byte messageType = rpcMessage.getMessageType();
                out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
                out.writeByte(ProtocolConstants.VERSION);
                // full Length(4B) and head length(2B) will fix in the end.
                out.writerIndex(out.writerIndex() + 6);
                out.writeByte(messageType);
                out.writeByte(rpcMessage.getCodec());
                out.writeByte(rpcMessage.getCompressor());
                out.writeInt(rpcMessage.getId());

                // direct write head with zero-copy
                Map<String, String> headMap = rpcMessage.getHeadMap();
                if (headMap != null && !headMap.isEmpty()) {
                    int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
                    headLength += headMapBytesLength;
                    fullLength += headMapBytesLength;
                }

                byte[] bodyBytes = null;
                if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
                    && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
                    // heartbeat has no body
                    Serializer serializer = SerializerFactory.getSerializer(rpcMessage.getCodec());
                    bodyBytes = serializer.serialize(rpcMessage.getBody());
                    Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
                    bodyBytes = compressor.compress(bodyBytes);
                    fullLength += bodyBytes.length;
                }

                if (bodyBytes != null) {
                    out.writeBytes(bodyBytes);
                }

                // fix fullLength and headLength
                int writeIndex = out.writerIndex();
                // skip magic code(2B) + version(1B)
                out.writerIndex(writeIndex - fullLength + 3);
                out.writeInt(fullLength);
                out.writeShort(headLength);
                out.writerIndex(writeIndex);
            } else {
                throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
            }
        } catch (Throwable e) {
            LOGGER.error("Encode request error!", e);
        }
    }
}
}

We therefore modified the encoder to close the channel when an exception occurs, as shown below.

@Override
public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
    try {
        if (msg instanceof RpcMessage) {
            RpcMessage rpcMessage = (RpcMessage) msg;

            int fullLength = ProtocolConstants.V1_HEAD_LENGTH;
            int headLength = ProtocolConstants.V1_HEAD_LENGTH;

            byte messageType = rpcMessage.getMessageType();
            out.writeBytes(ProtocolConstants.MAGIC_CODE_BYTES);
            out.writeByte(ProtocolConstants.VERSION);
            // full Length(4B) and head length(2B) will fix in the end.
            out.writerIndex(out.writerIndex() + 6);
            out.writeByte(messageType);
            out.writeByte(rpcMessage.getCodec());
            out.writeByte(rpcMessage.getCompressor());
            out.writeInt(rpcMessage.getId());

            // direct write head with zero-copy
            Map<String, String> headMap = rpcMessage.getHeadMap();
            if (headMap != null && !headMap.isEmpty()) {
                int headMapBytesLength = HeadMapSerializer.getInstance().encode(headMap, out);
                headLength += headMapBytesLength;
                fullLength += headMapBytesLength;
            }

            byte[] bodyBytes = null;
            if (messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST
                && messageType != ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE) {
                // heartbeat has no body
                Serializer serializer = SerializerFactory.getSerializer(rpcMessage.getCodec());
                bodyBytes = serializer.serialize(rpcMessage.getBody());
                Compressor compressor = CompressorFactory.getCompressor(rpcMessage.getCompressor());
                bodyBytes = compressor.compress(bodyBytes);
                fullLength += bodyBytes.length;
            }

            if (bodyBytes != null) {
                out.writeBytes(bodyBytes);
            }

            // fix fullLength and headLength
            int writeIndex = out.writerIndex();
            // skip magic code(2B) + version(1B)
            out.writerIndex(writeIndex - fullLength + 3);
            out.writeInt(fullLength);
            out.writeShort(headLength);
            out.writerIndex(writeIndex);
        } else {
            throw new UnsupportedOperationException("Not support this class:" + msg.getClass());
        }
    } catch (Throwable e) {
        LOGGER.error("Encode request error!", e);
        final String addrRemote = RemoteUtils.parseChannelRemoteAddr(ctx.channel());
        ctx.channel().close().addListener(
            new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future)
                        throws Exception {
                    LOGGER.error("Encode request error closeChannel: close the connection to remote address[{}] result: {}", addrRemote,
                        future.isSuccess());
                }
            });
    }
}

Only after the load test ran for 24 hours without a single further exception could we be sure that the server side can indeed hit this encoding problem under high concurrency, and we filed an issue with the community.

Summary

Although we spent a lot of time chasing this performance bottleneck, finding the real cause in the end was a pleasant surprise; we never expected it to be this. We stepped into plenty of pitfalls along the way and had to learn many of the monitoring tools on the spot. We learned how to discover and track down problems, and, most importantly, we got to know Netty, a framework we had never worked with before.