diff --git a/changes/en-us/2.x.md b/changes/en-us/2.x.md index 768ce3ebc1b..969b9578bea 100644 --- a/changes/en-us/2.x.md +++ b/changes/en-us/2.x.md @@ -31,6 +31,7 @@ Add changes here for all PR submitted to the 2.x branch. - [[#6898](https://github.com/apache/incubator-seata/pull/6898)] upgrade npmjs version in saga module - [[#6879](https://github.com/apache/incubator-seata/pull/6879)] fix log argument mismatch issue - [[#6902](https://github.com/apache/incubator-seata/pull/6900)] optimize readme docs +- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] splitting MergedWarpMessage enhances the server parallel processing capability - [[#6905](https://github.com/apache/incubator-seata/pull/6905)] remove incompatible licenses at build time - [[#6906](https://github.com/apache/incubator-seata/pull/6906)] h2 dependency adds test scope - [[#6911](https://github.com/apache/incubator-seata/pull/6911)] fix some typos in project diff --git a/changes/zh-cn/2.x.md b/changes/zh-cn/2.x.md index 7a5366ec660..873f126b64a 100644 --- a/changes/zh-cn/2.x.md +++ b/changes/zh-cn/2.x.md @@ -33,6 +33,7 @@ - [[#6879](https://github.com/apache/incubator-seata/pull/6879)] 修复日志参数不匹配问题 - [[#6898](https://github.com/apache/incubator-seata/pull/6898)] 升级 saga 模块 npmjs 版本 - [[#6902](https://github.com/apache/incubator-seata/pull/6900)] 优化 readme 文档 +- [[#6807](https://github.com/apache/incubator-seata/pull/6807)] 分离merge消息使其能完全并行处理 - [[#6905](https://github.com/apache/incubator-seata/pull/6905)] 移除构建期不兼容的 license - [[#6906](https://github.com/apache/incubator-seata/pull/6906)] h2依赖添加test scope - [[#6911](https://github.com/apache/incubator-seata/pull/6911)] 修正项目中的部分拼写错误 diff --git a/core/src/main/java/org/apache/seata/core/protocol/Version.java b/core/src/main/java/org/apache/seata/core/protocol/Version.java index 12178d8fe05..f32bb163a69 100644 --- a/core/src/main/java/org/apache/seata/core/protocol/Version.java +++ b/core/src/main/java/org/apache/seata/core/protocol/Version.java @@ -39,6 +39,7 @@ public class Version { private static final String CURRENT = VersionInfo.VERSION; private static final String VERSION_0_7_1 = "0.7.1"; private static final String VERSION_1_5_0 = "1.5.0"; + private static final String VERSION_2_3_0 = "2.3.0"; private static final int MAX_VERSION_DOT = 3; /** @@ -86,15 +87,21 @@ public static String getChannelVersion(Channel c) { * @return true: client version is above or equal version 1.5.0, false: on the contrary */ public static boolean isAboveOrEqualVersion150(String version) { - boolean isAboveOrEqualVersion150 = false; + return isAboveOrEqualVersion(version, VERSION_1_5_0); + } + + public static boolean isAboveOrEqualVersion230(String version) { + return isAboveOrEqualVersion(version, VERSION_2_3_0); + } + + public static boolean isAboveOrEqualVersion(String clientVersion, String divideVersion) { + boolean isAboveOrEqualVersion = false; try { - long clientVersion = convertVersion(version); - long divideVersion = convertVersion(VERSION_1_5_0); - isAboveOrEqualVersion150 = clientVersion >= divideVersion; + isAboveOrEqualVersion = convertVersion(clientVersion) >= convertVersion(divideVersion); } catch (Exception e) { - LOGGER.error("convert version error, clientVersion:{}", version, e); + LOGGER.error("convert version error, clientVersion:{}", clientVersion, e); } - return isAboveOrEqualVersion150; + return isAboveOrEqualVersion; } public static long convertVersion(String version) throws IncompatibleVersionException { diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java index 248e8f48f6d..bbbab50faa5 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingClient.java @@ -89,6 +89,8 @@ public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting */ protected final Map mergeMsgMap = new ConcurrentHashMap<>(); + protected final Map childToParentMap = new ConcurrentHashMap<>(); + /** * When batch sending is enabled, the message will be stored to basketMap * Send via asynchronous thread {@link AbstractNettyRemotingClient.MergedSendRunnable} @@ -203,8 +205,15 @@ public void sendAsyncRequest(Channel channel, Object msg) { RpcMessage rpcMessage = buildRequestMessage(msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_REQUEST : ProtocolConstants.MSGTYPE_RESQUEST_ONEWAY); - if (rpcMessage.getBody() instanceof MergeMessage) { - mergeMsgMap.put(rpcMessage.getId(), (MergeMessage) rpcMessage.getBody()); + Object body = rpcMessage.getBody(); + if (body instanceof MergeMessage) { + Integer parentId = rpcMessage.getId(); + mergeMsgMap.put(parentId, (MergeMessage)rpcMessage.getBody()); + if (body instanceof MergedWarpMessage) { + for (Integer msgId : ((MergedWarpMessage)rpcMessage.getBody()).msgIds) { + childToParentMap.put(msgId, parentId); + } + } } super.sendAsync(channel, rpcMessage); } @@ -370,6 +379,10 @@ public void run() { // fast fail for (Integer msgId : mergeMessage.msgIds) { MessageFuture messageFuture = futures.remove(msgId); + Integer parentId = childToParentMap.remove(msgId); + if (parentId != null) { + mergeMsgMap.remove(parentId); + } if (messageFuture != null) { messageFuture.setResultMessage( new RuntimeException(String.format("%s is unreachable", address), e)); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java index 4b79f20d95c..67df2ea8494 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/AbstractNettyRemotingServer.java @@ -29,9 +29,13 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.StringUtils; +import org.apache.seata.core.protocol.AbstractMessage; import org.apache.seata.core.protocol.HeartbeatMessage; +import org.apache.seata.core.protocol.MergedWarpMessage; import org.apache.seata.core.protocol.ProtocolConstants; import org.apache.seata.core.protocol.RpcMessage; +import org.apache.seata.core.protocol.Version; import org.apache.seata.core.rpc.RemotingServer; import org.apache.seata.core.rpc.RpcContext; import org.apache.seata.core.rpc.processor.Pair; @@ -270,4 +274,32 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep } } + + @Override + protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { + Object body = rpcMessage.getBody(); + RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); + // If the client is not version 2.3.0 or higher, splitting MergedWarpMessage will result in the client’s mergeMsgMap not being cleared + if (body instanceof MergedWarpMessage && (StringUtils.isNotBlank(rpcContext.getVersion()) + && Version.isAboveOrEqualVersion230(rpcContext.getVersion()))) { + MergedWarpMessage mergedWarpMessage = (MergedWarpMessage)body; + for (int i = 0; i < mergedWarpMessage.msgs.size(); i++) { + RpcMessage rpcMsg = + buildRequestMessage(mergedWarpMessage.msgs.get(i), rpcMessage, mergedWarpMessage.msgIds.get(i)); + super.processMessage(ctx, rpcMsg); + } + } else { + super.processMessage(ctx, rpcMessage); + } + } + + private RpcMessage buildRequestMessage(AbstractMessage msg, RpcMessage rpcMessage,int id) { + RpcMessage rpcMsg = new RpcMessage(); + rpcMsg.setId(id); + rpcMsg.setCodec(rpcMessage.getCodec()); + rpcMsg.setCompressor(rpcMessage.getCompressor()); + rpcMsg.setBody(msg); + return rpcMsg; + } + } diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java index 3e6ec63c15e..3010efc4ae2 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/NettyRemotingServer.java @@ -45,7 +45,7 @@ public class NettyRemotingServer extends AbstractNettyRemotingServer { private final AtomicBoolean initialized = new AtomicBoolean(false); - private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(), + private final ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(), NettyServerConfig.getMaxBranchResultPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS, new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()), new NamedThreadFactory("BranchResultHandlerThread", NettyServerConfig.getMaxBranchResultPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy()); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java index 92cbafd0a5d..0b9fd0e12bc 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/RmNettyRemotingClient.java @@ -323,7 +323,7 @@ private void registerProcessor() { super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor); // 4.registry TC response processor ClientOnResponseProcessor onResponseProcessor = - new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler()); + new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler()); super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null); diff --git a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java index 68ff739bbb0..28993b61f77 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java +++ b/core/src/main/java/org/apache/seata/core/rpc/netty/TmNettyRemotingClient.java @@ -46,6 +46,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.seata.common.util.StringUtils.isNotBlank; + /** * The rm netty client. * @@ -187,7 +189,7 @@ public void init() { registerProcessor(); if (initialized.compareAndSet(false, true)) { super.init(); - if (org.apache.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) { + if (isNotBlank(transactionServiceGroup)) { initConnection(); } } @@ -247,7 +249,7 @@ protected Function getPoolKeyFunction() { private void registerProcessor() { // 1.registry TC response processor ClientOnResponseProcessor onResponseProcessor = - new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler()); + new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), childToParentMap, getTransactionMessageHandler()); super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null); super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null); diff --git a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java index f7b44c2e563..bb13c664488 100644 --- a/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java +++ b/core/src/main/java/org/apache/seata/core/rpc/processor/client/ClientOnResponseProcessor.java @@ -69,7 +69,9 @@ public class ClientOnResponseProcessor implements RemotingProcessor { /** * The Merge msg map from org.apache.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMap. */ - private Map mergeMsgMap; + private final Map mergeMsgMap; + + private final Map childToParentMap; /** * The Futures from org.apache.seata.core.rpc.netty.AbstractNettyRemoting#futures @@ -82,9 +84,10 @@ public class ClientOnResponseProcessor implements RemotingProcessor { private final TransactionMessageHandler transactionMessageHandler; public ClientOnResponseProcessor(Map mergeMsgMap, - ConcurrentHashMap futures, + ConcurrentHashMap futures, Map childToParentMap, TransactionMessageHandler transactionMessageHandler) { this.mergeMsgMap = mergeMsgMap; + this.childToParentMap = childToParentMap; this.futures = futures; this.transactionMessageHandler = transactionMessageHandler; } @@ -97,6 +100,8 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc for (int i = 0; i < mergeMessage.msgs.size(); i++) { int msgId = mergeMessage.msgIds.get(i); MessageFuture future = futures.remove(msgId); + // The old version of the server will return MergeResultMessage, so it is necessary to remove the msgId from the childToParentMap. + childToParentMap.remove(msgId); if (future == null) { LOGGER.error("msg: {} is not found in futures, result message: {}", msgId,results.getMsgs()[i]); } else { @@ -104,33 +109,43 @@ public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exc } } } else if (rpcMessage.getBody() instanceof BatchResultMessage) { - try { - BatchResultMessage batchResultMessage = (BatchResultMessage) rpcMessage.getBody(); - for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) { - int msgId = batchResultMessage.getMsgIds().get(i); - MessageFuture future = futures.remove(msgId); - if (future == null) { - LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, batchResultMessage.getResultMessages().get(i)); - } else { - future.setResultMessage(batchResultMessage.getResultMessages().get(i)); - } + BatchResultMessage batchResultMessage = (BatchResultMessage)rpcMessage.getBody(); + for (int i = 0; i < batchResultMessage.getMsgIds().size(); i++) { + int msgId = batchResultMessage.getMsgIds().get(i); + MessageFuture future = futures.remove(msgId); + // The old version of the server will return BatchResultMessage, so it is necessary to remove the msgId + // from the childToParentMap. + Integer parentId = childToParentMap.remove(msgId); + if (parentId != null) { + mergeMsgMap.remove(parentId); + } + if (future == null) { + LOGGER.error("msg: {} is not found in futures, result message: {}", msgId, + batchResultMessage.getResultMessages().get(i)); + } else { + future.setResultMessage(batchResultMessage.getResultMessages().get(i)); } - } finally { - // In order to be compatible with the old version, in the batch sending of version 1.5.0, - // batch messages will also be placed in the local cache of mergeMsgMap, - // but version 1.5.0 no longer needs to obtain batch messages from mergeMsgMap - mergeMsgMap.clear(); } } else { - MessageFuture messageFuture = futures.remove(rpcMessage.getId()); - if (messageFuture != null) { - messageFuture.setResultMessage(rpcMessage.getBody()); - } else { - if (rpcMessage.getBody() instanceof AbstractResultMessage) { - if (transactionMessageHandler != null) { - transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null); + Integer id = rpcMessage.getId(); + try { + MessageFuture messageFuture = futures.remove(id); + if (messageFuture != null) { + messageFuture.setResultMessage(rpcMessage.getBody()); + } else { + if (rpcMessage.getBody() instanceof AbstractResultMessage) { + if (transactionMessageHandler != null) { + transactionMessageHandler.onResponse((AbstractResultMessage)rpcMessage.getBody(), null); + } } } + } finally { + // In version 2.3.0, the server does not return MergeResultMessage and BatchResultMessage + // so it is necessary to clear childToParentMap and mergeMsgMap here. + Integer parentId = childToParentMap.remove(id); + if (parentId != null) { + mergeMsgMap.remove(parentId); + } } } } diff --git a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java index fac04d66de1..755c10759c6 100644 --- a/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java +++ b/server/src/test/java/org/apache/seata/server/storage/redis/store/RedisVGroupMappingStoreManagerTest.java @@ -21,11 +21,13 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfSystemProperty; import org.springframework.boot.test.context.SpringBootTest; import java.util.Map; +@EnabledIfSystemProperty(named = "redisCaseEnabled", matches = "true") @SpringBootTest public class RedisVGroupMappingStoreManagerTest { private RedisVGroupMappingStoreManager redisVGroupMappingStoreManager; diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java new file mode 100644 index 00000000000..3d62a10214c --- /dev/null +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/RmNettyClientTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.seata.core.rpc.netty; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.seata.common.ConfigurationKeys; +import org.apache.seata.common.ConfigurationTestHelper; +import org.apache.seata.common.XID; +import org.apache.seata.common.util.NetUtil; +import org.apache.seata.common.util.UUIDGenerator; +import org.apache.seata.core.protocol.ResultCode; +import org.apache.seata.core.protocol.transaction.BranchRegisterRequest; +import org.apache.seata.core.protocol.transaction.BranchRegisterResponse; +import org.apache.seata.rm.tcc.TCCResourceManager; +import org.apache.seata.saga.engine.db.AbstractServerTest; +import org.apache.seata.server.coordinator.DefaultCoordinator; +import org.apache.seata.server.session.SessionHolder; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; + +public class RmNettyClientTest extends AbstractServerTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(RmNettyClientTest.class); + + @BeforeAll + public static void init(){ + ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091"); + } + @AfterAll + public static void after() { + ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL); + } + + public static ThreadPoolExecutor initMessageExecutor() { + return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy()); + } + + @Test + public void testMergeMsg() throws Exception { + ThreadPoolExecutor workingThreads = initMessageExecutor(); + NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads); + new Thread(() -> { + SessionHolder.init(null); + nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer)); + // set registry + XID.setIpAddress(NetUtil.getLocalIp()); + XID.setPort(8091); + // init snowflake for transactionId, branchId + UUIDGenerator.init(1L); + nettyRemotingServer.init(); + }).start(); + Thread.sleep(3000); + + String applicationId = "app 1"; + String transactionServiceGroup = "default_tx_group"; + RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup); + rmNettyRemotingClient.setResourceManager(new TCCResourceManager()); + rmNettyRemotingClient.init(); + rmNettyRemotingClient.getClientChannelManager().initReconnect(transactionServiceGroup, true); + String serverAddress = "0.0.0.0:8091"; + Channel channel = RmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); + Assertions.assertNotNull(channel); + + CountDownLatch latch = new CountDownLatch(3); + for (int i = 0; i < 3; i++) { + CompletableFuture.runAsync(()->{ + BranchRegisterRequest request = new BranchRegisterRequest(); + request.setXid("127.0.0.1:8091:1249853"); + request.setLockKey("lock key testSendMsgWithResponse"); + request.setResourceId("resoutceId1"); + BranchRegisterResponse branchRegisterResponse = null; + try { + branchRegisterResponse = (BranchRegisterResponse) rmNettyRemotingClient.sendSyncRequest(request); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + Assertions.assertNotNull(branchRegisterResponse); + Assertions.assertEquals(ResultCode.Failed, branchRegisterResponse.getResultCode()); + Assertions.assertEquals("TransactionException[Could not found global transaction xid = 127.0.0.1:8091:1249853, may be has finished.]", + branchRegisterResponse.getMsg()); + latch.countDown(); + }); + } + latch.await(10,TimeUnit.SECONDS); + nettyRemotingServer.destroy(); + rmNettyRemotingClient.destroy(); + } + +} diff --git a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java index 9e855645305..9a46210aee2 100644 --- a/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java +++ b/test/src/test/java/org/apache/seata/core/rpc/netty/TmNettyClientTest.java @@ -21,10 +21,9 @@ import org.apache.seata.common.ConfigurationTestHelper; import org.apache.seata.common.XID; import org.apache.seata.common.util.NetUtil; -import org.apache.seata.core.protocol.ResultCode; -import org.apache.seata.core.protocol.transaction.BranchRegisterRequest; -import org.apache.seata.core.protocol.transaction.BranchRegisterResponse; -import org.apache.seata.mockserver.MockServer; +import org.apache.seata.core.model.GlobalStatus; +import org.apache.seata.core.protocol.transaction.GlobalCommitRequest; +import org.apache.seata.core.protocol.transaction.GlobalCommitResponse; import org.apache.seata.saga.engine.db.AbstractServerTest; import org.apache.seata.common.util.UUIDGenerator; import org.apache.seata.server.coordinator.DefaultCoordinator; @@ -40,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; /** @@ -58,7 +58,7 @@ public static void after() { } public static ThreadPoolExecutor initMessageExecutor() { - return new ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS, + return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS, new LinkedBlockingQueue(20000), new ThreadPoolExecutor.CallerRunsPolicy()); } @@ -176,16 +176,16 @@ public void testSendMsgWithResponse() throws Exception { String serverAddress = "0.0.0.0:8091"; Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress); Assertions.assertNotNull(channel); - - BranchRegisterRequest request = new BranchRegisterRequest(); + GlobalCommitRequest request = new GlobalCommitRequest(); request.setXid("127.0.0.1:8091:1249853"); - request.setLockKey("lock key testSendMsgWithResponse"); - request.setResourceId("resourceId1"); - BranchRegisterResponse branchRegisterResponse = (BranchRegisterResponse) tmNettyRemotingClient.sendSyncRequest(request); - Assertions.assertNotNull(branchRegisterResponse); - Assertions.assertEquals(ResultCode.Failed, branchRegisterResponse.getResultCode()); - Assertions.assertEquals("TransactionException[Could not found global transaction xid = 127.0.0.1:8091:1249853, may be has finished.]", - branchRegisterResponse.getMsg()); + GlobalCommitResponse globalCommitResponse = null; + try { + globalCommitResponse = (GlobalCommitResponse)tmNettyRemotingClient.sendSyncRequest(request); + } catch (TimeoutException e) { + throw new RuntimeException(e); + } + Assertions.assertNotNull(globalCommitResponse); + Assertions.assertEquals(GlobalStatus.Finished, globalCommitResponse.getGlobalStatus()); nettyRemotingServer.destroy(); tmNettyRemotingClient.destroy(); }