Skip to content

Commit

Permalink
add test case
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes committed Sep 24, 2024
1 parent b51c94e commit 6457f6c
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class NettyRemotingServer extends AbstractNettyRemotingServer {

private final AtomicBoolean initialized = new AtomicBoolean(false);

private ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
private final ThreadPoolExecutor branchResultMessageExecutor = new ThreadPoolExecutor(NettyServerConfig.getMinBranchResultPoolSize(),
NettyServerConfig.getMaxBranchResultPoolSize(), NettyServerConfig.getKeepAliveTime(), TimeUnit.SECONDS,
new LinkedBlockingQueue<>(NettyServerConfig.getMaxTaskQueueSize()),
new NamedThreadFactory("BranchResultHandlerThread", NettyServerConfig.getMaxBranchResultPoolSize()), new ThreadPoolExecutor.CallerRunsPolicy());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.seata.common.util.StringUtils.isNotBlank;

/**
* The rm netty client.
*
Expand Down Expand Up @@ -187,7 +189,7 @@ public void init() {
registerProcessor();
if (initialized.compareAndSet(false, true)) {
super.init();
if (org.apache.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
if (isNotBlank(transactionServiceGroup)) {
initConnection();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seata.core.rpc.netty;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.seata.common.ConfigurationKeys;
import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.common.XID;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
import org.apache.seata.rm.tcc.TCCResourceManager;
import org.apache.seata.saga.engine.db.AbstractServerTest;
import org.apache.seata.server.coordinator.DefaultCoordinator;
import org.apache.seata.server.session.SessionHolder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;

public class RmNettyClientTest extends AbstractServerTest {

private static final Logger LOGGER = LoggerFactory.getLogger(RmNettyClientTest.class);

@BeforeAll
public static void init(){
ConfigurationTestHelper.putConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL, "8091");
}
@AfterAll
public static void after() {
ConfigurationTestHelper.removeConfig(ConfigurationKeys.SERVER_SERVICE_PORT_CAMEL);
}

public static ThreadPoolExecutor initMessageExecutor() {
return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(20000), new ThreadPoolExecutor.CallerRunsPolicy());
}

@Test
public void testMergeMsg() throws Exception {
ThreadPoolExecutor workingThreads = initMessageExecutor();
NettyRemotingServer nettyRemotingServer = new NettyRemotingServer(workingThreads);
new Thread(() -> {
SessionHolder.init(null);
nettyRemotingServer.setHandler(DefaultCoordinator.getInstance(nettyRemotingServer));
// set registry
XID.setIpAddress(NetUtil.getLocalIp());
XID.setPort(8091);
// init snowflake for transactionId, branchId
UUIDGenerator.init(1L);
nettyRemotingServer.init();
}).start();
Thread.sleep(3000);

String applicationId = "app 1";
String transactionServiceGroup = "default_tx_group";
RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
rmNettyRemotingClient.setResourceManager(new TCCResourceManager());
rmNettyRemotingClient.init();
rmNettyRemotingClient.getClientChannelManager().initReconnect(transactionServiceGroup, true);
String serverAddress = "0.0.0.0:8091";
Channel channel = RmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);
Assertions.assertNotNull(channel);

CountDownLatch latch = new CountDownLatch(3);
for (int i = 0; i < 3; i++) {
CompletableFuture.runAsync(()->{
BranchRegisterRequest request = new BranchRegisterRequest();
request.setXid("127.0.0.1:8091:1249853");
request.setLockKey("lock key testSendMsgWithResponse");
request.setResourceId("resoutceId1");
BranchRegisterResponse branchRegisterResponse = null;
try {
branchRegisterResponse = (BranchRegisterResponse) rmNettyRemotingClient.sendSyncRequest(request);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
Assertions.assertNotNull(branchRegisterResponse);
Assertions.assertEquals(ResultCode.Failed, branchRegisterResponse.getResultCode());
Assertions.assertEquals("TransactionException[Could not found global transaction xid = 127.0.0.1:8091:1249853, may be has finished.]",
branchRegisterResponse.getMsg());
latch.countDown();
});
}
latch.await(10,TimeUnit.SECONDS);
nettyRemotingServer.destroy();
rmNettyRemotingClient.destroy();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,9 @@
import org.apache.seata.common.ConfigurationTestHelper;
import org.apache.seata.common.XID;
import org.apache.seata.common.util.NetUtil;
import org.apache.seata.core.protocol.ResultCode;
import org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
import org.apache.seata.mockserver.MockServer;
import org.apache.seata.core.model.GlobalStatus;
import org.apache.seata.core.protocol.transaction.GlobalCommitRequest;
import org.apache.seata.core.protocol.transaction.GlobalCommitResponse;
import org.apache.seata.saga.engine.db.AbstractServerTest;
import org.apache.seata.common.util.UUIDGenerator;
import org.apache.seata.server.coordinator.DefaultCoordinator;
Expand All @@ -40,6 +39,7 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

/**
Expand All @@ -58,7 +58,7 @@ public static void after() {
}

public static ThreadPoolExecutor initMessageExecutor() {
return new ThreadPoolExecutor(100, 500, 500, TimeUnit.SECONDS,
return new ThreadPoolExecutor(5, 5, 500, TimeUnit.SECONDS,
new LinkedBlockingQueue(20000), new ThreadPoolExecutor.CallerRunsPolicy());
}

Expand Down Expand Up @@ -176,16 +176,16 @@ public void testSendMsgWithResponse() throws Exception {
String serverAddress = "0.0.0.0:8091";
Channel channel = TmNettyRemotingClient.getInstance().getClientChannelManager().acquireChannel(serverAddress);
Assertions.assertNotNull(channel);

BranchRegisterRequest request = new BranchRegisterRequest();
GlobalCommitRequest request = new GlobalCommitRequest();
request.setXid("127.0.0.1:8091:1249853");
request.setLockKey("lock key testSendMsgWithResponse");
request.setResourceId("resoutceId1");
BranchRegisterResponse branchRegisterResponse = (BranchRegisterResponse) tmNettyRemotingClient.sendSyncRequest(request);
Assertions.assertNotNull(branchRegisterResponse);
Assertions.assertEquals(ResultCode.Failed, branchRegisterResponse.getResultCode());
Assertions.assertEquals("TransactionException[Could not found global transaction xid = 127.0.0.1:8091:1249853, may be has finished.]",
branchRegisterResponse.getMsg());
GlobalCommitResponse globalCommitResponse = null;
try {
globalCommitResponse = (GlobalCommitResponse)tmNettyRemotingClient.sendSyncRequest(request);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
Assertions.assertNotNull(globalCommitResponse);
Assertions.assertEquals(GlobalStatus.Finished, globalCommitResponse.getGlobalStatus());
nettyRemotingServer.destroy();
tmNettyRemotingClient.destroy();
}
Expand Down

0 comments on commit 6457f6c

Please sign in to comment.