From 012be9a916e7d851571121bcff5b61a1f7792bb1 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Tue, 19 Nov 2024 15:25:28 +0800 Subject: [PATCH 1/2] add redis client name --- .../AbstractNettyRequestResponseCommand.java | 11 +- .../netty/commands/RedisNettyClient.java | 9 + .../xpipe/pool/XpipeNettyClientPool.java | 2 +- .../redis/checker/config/CheckerConfig.java | 2 + .../config/impl/DataCenterConfigBean.java | 6 + .../redismaster/MasterOverOneMonitor.java | 4 +- .../DefaultSentinelHelloCollector.java | 4 +- .../impl/ProxyConnectedChecker.java | 4 +- .../redis/checker/resource/Resource.java | 10 +- .../AbstractCheckerIntegrationTest.java | 27 ++- .../ctrip/xpipe/redis/checker/TestConfig.java | 5 + .../config/impl/DefaultConsoleConfig.java | 5 + .../DefaultProxyMonitorCollectorManager.java | 4 +- .../console/redis/DefaultSentinelManager.java | 5 +- .../model/impl/ShardModelServiceImpl.java | 2 +- .../redis/console/spring/ResourceConfig.java | 40 ++- .../core/client/AsyncConnectionCondition.java | 7 + .../client/NettyRedisPoolClientFactory.java | 31 +++ .../core/client/RedisAsyncNettyClient.java | 124 ++++++++++ .../core/client/AsyncNettyClientTest.java | 16 +- .../client/RedisAsyncNettyClientTest.java | 228 ++++++++++++++++++ 21 files changed, 512 insertions(+), 34 deletions(-) create mode 100644 core/src/main/java/com/ctrip/xpipe/netty/commands/RedisNettyClient.java create mode 100644 redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/AsyncConnectionCondition.java create mode 100644 redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/NettyRedisPoolClientFactory.java create mode 100644 redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java create mode 100644 redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java diff --git a/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java b/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java index 1f6b9c5c4b..eedde39fb2 100644 --- a/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java +++ b/core/src/main/java/com/ctrip/xpipe/netty/commands/AbstractNettyRequestResponseCommand.java @@ -56,16 +56,21 @@ protected void doSendRequest(final NettyClient nettyClient, ByteBuf byteBuf) { } if(getCommandTimeoutMilli() > 0 && scheduled != null){ + int commandTimeoutMilli = getCommandTimeoutMilli(); + if (nettyClient instanceof RedisNettyClient && !((RedisNettyClient) nettyClient).getDoAfterConnectedOver()) { + commandTimeoutMilli += ((RedisNettyClient) nettyClient).getAfterConnectCommandTimeoutMill(); + } - getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, getCommandTimeoutMilli()); + getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, commandTimeoutMilli); + int finalCommandTimeoutMilli = commandTimeoutMilli; timeoutFuture = scheduled.schedule(new AbstractExceptionLogTask() { @Override public void doRun() { getLogger().info("[{}][run][timeout]{}", AbstractNettyRequestResponseCommand.this, nettyClient); - future().setFailure(new CommandTimeoutException("timeout " + + getCommandTimeoutMilli())); + future().setFailure(new CommandTimeoutException("timeout " + finalCommandTimeoutMilli)); } - }, getCommandTimeoutMilli(), TimeUnit.MILLISECONDS); + }, commandTimeoutMilli, TimeUnit.MILLISECONDS); future().addListener(new CommandFutureListener() { diff --git a/core/src/main/java/com/ctrip/xpipe/netty/commands/RedisNettyClient.java b/core/src/main/java/com/ctrip/xpipe/netty/commands/RedisNettyClient.java new file mode 100644 index 0000000000..69a4f82195 --- /dev/null +++ b/core/src/main/java/com/ctrip/xpipe/netty/commands/RedisNettyClient.java @@ -0,0 +1,9 @@ +package com.ctrip.xpipe.netty.commands; + +public interface RedisNettyClient { + + boolean getDoAfterConnectedOver(); + + int getAfterConnectCommandTimeoutMill(); + +} diff --git a/core/src/main/java/com/ctrip/xpipe/pool/XpipeNettyClientPool.java b/core/src/main/java/com/ctrip/xpipe/pool/XpipeNettyClientPool.java index 39d3e367b1..8412777e7f 100644 --- a/core/src/main/java/com/ctrip/xpipe/pool/XpipeNettyClientPool.java +++ b/core/src/main/java/com/ctrip/xpipe/pool/XpipeNettyClientPool.java @@ -61,7 +61,7 @@ public NettyClient borrowObject() throws BorrowObjectException { try { return objectPool.borrowObject(); } catch (Exception e) { - logger.error("[borrowObject]" + factory, e); + logger.error("[borrowObject] NumIdle:{}, NumActive:{}" + factory, objectPool.getNumIdle(), objectPool.getNumActive(), e); throw new BorrowObjectException("borrow " + factory, e); } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java index 8ddad11621..9665e7ee55 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/CheckerConfig.java @@ -112,4 +112,6 @@ public interface CheckerConfig { int getMarkInstanceMaxDelayMilli(); + boolean getShouldDoAfterNettyClientConnected(); + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java index f7316e27a4..94889f48e2 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/config/impl/DataCenterConfigBean.java @@ -37,6 +37,8 @@ public class DataCenterConfigBean extends AbstractConfigBean { public static final String KEY_HTTP_ACCEPT_ENCODING = "http.accept.encoding"; + private static final String KEY_NETTY_CLIENT_DO_AFTER_CONNECTED = "netty.client.do.after.connected"; + private AtomicReference zkConnection = new AtomicReference<>(); private AtomicReference zkNameSpace = new AtomicReference<>(); @@ -95,4 +97,8 @@ public String getHttpAcceptEncoding() { return getProperty(KEY_HTTP_ACCEPT_ENCODING, null); } + public boolean getDoAfterNettyClientConnected() { + return getBooleanProperty(KEY_NETTY_CLIENT_DO_AFTER_CONNECTED, true); + } + } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java index 1750b8c8b6..3b13ef061c 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/redismaster/MasterOverOneMonitor.java @@ -33,7 +33,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; /** @@ -49,7 +49,7 @@ public class MasterOverOneMonitor implements RedisMasterActionListener, OneWaySu @Autowired private AlertManager alertManager; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = REDIS_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; @Resource(name = SCHEDULED_EXECUTOR) diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java index f38c41f619..8ab3c2d544 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/actions/sentinel/collector/DefaultSentinelHelloCollector.java @@ -46,7 +46,7 @@ import java.util.stream.Collectors; import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.SENTINEL_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.THREAD_POOL_TIME_OUT; /** @@ -77,7 +77,7 @@ public class DefaultSentinelHelloCollector implements SentinelHelloCollector { @Autowired private PersistenceCache persistenceCache; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; private SentinelLeakyBucket leakyBucket; diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java index 394184c1ab..c66d7dd782 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/healthcheck/impl/ProxyConnectedChecker.java @@ -19,7 +19,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @Component @@ -28,7 +28,7 @@ public class ProxyConnectedChecker implements ProxyChecker { @Autowired private CheckerConfig checkerConfig; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; @Resource(name = SCHEDULED_EXECUTOR) diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java index 51a3e4da40..54794672cd 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/resource/Resource.java @@ -8,11 +8,15 @@ public class Resource { public static final String REDIS_COMMAND_EXECUTOR = "redisCommandExecutor"; - public static final String KEYED_NETTY_CLIENT_POOL = "keyedClientPool"; + public static final String REDIS_KEYED_NETTY_CLIENT_POOL = "redisKeyedClientPool"; - public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool"; + public static final String SENTINEL_KEYED_NETTY_CLIENT_POOL = "sentinelKeyedClientPool"; + + public static final String KEEPER_KEYED_NETTY_CLIENT_POOL = "keeperKeyedClientPool"; - public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool"; + public static final String PROXY_KEYED_NETTY_CLIENT_POOL = "proxyKeyedClientPool"; + + public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool"; public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors"; diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java index e1df04132c..c7b37732c1 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/AbstractCheckerIntegrationTest.java @@ -5,6 +5,7 @@ import com.ctrip.xpipe.concurrent.DefaultExecutorFactory; import com.ctrip.xpipe.lifecycle.LifecycleHelper; import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory; +import com.ctrip.xpipe.redis.core.client.NettyRedisPoolClientFactory; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector; import com.ctrip.xpipe.redis.checker.config.CheckerConfig; @@ -148,9 +149,9 @@ public ScheduledExecutorService getRedisCommandExecutor() { ); } - @Bean(name = KEYED_NETTY_CLIENT_POOL) + @Bean(name = REDIS_KEYED_NETTY_CLIENT_POOL) public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8)); + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(8)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; @@ -158,7 +159,23 @@ public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Excepti @Bean(name = REDIS_SESSION_NETTY_CLIENT_POOL) public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(12)); + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(12)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + + @Bean(name = SENTINEL_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getSentinelReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + + @Bean(name = PROXY_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getProxyReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; @@ -195,6 +212,10 @@ public ScheduledExecutorService getHelloCheckScheduled() { private NettyKeyedPoolClientFactory getKeyedPoolClientFactory(int eventLoopThreads) { return new NettyKeyedPoolClientFactory(eventLoopThreads); } + + private NettyKeyedPoolClientFactory getRedisPoolClientFactory(int eventLoopThreads) { + return new NettyRedisPoolClientFactory(eventLoopThreads, "xpipe", () -> true); + } @Bean public FoundationService foundationService() { diff --git a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java index e62fc6f68e..4a41717a51 100644 --- a/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java +++ b/redis/redis-checker/src/test/java/com/ctrip/xpipe/redis/checker/TestConfig.java @@ -315,4 +315,9 @@ public int getMarkInstanceMaxDelayMilli() { return 2000; } + @Override + public boolean getShouldDoAfterNettyClientConnected() { + return true; + } + } diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java index eadb46bff3..3066c66cbb 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/config/impl/DefaultConsoleConfig.java @@ -587,6 +587,11 @@ public String getHttpAcceptEncoding() { return dataCenterConfigBean.getHttpAcceptEncoding(); } + @Override + public boolean getShouldDoAfterNettyClientConnected() { + return dataCenterConfigBean.getDoAfterNettyClientConnected(); + } + @Override public void addListener(ConfigKeyListener listener) { this.listenersSet.add(listener); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java index 080da9c581..335856633a 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyMonitorCollectorManager.java @@ -35,7 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; +import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL; import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; @Component @@ -49,7 +49,7 @@ public class DefaultProxyMonitorCollectorManager extends AbstractStartStoppable @Resource(name = SCHEDULED_EXECUTOR) private ScheduledExecutorService scheduled; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; @Autowired diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java index 37b8832309..228658ca43 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/redis/DefaultSentinelManager.java @@ -36,8 +36,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL; -import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_COMMAND_EXECUTOR; +import static com.ctrip.xpipe.redis.checker.resource.Resource.*; /** * @author chen.zhu @@ -53,7 +52,7 @@ public class DefaultSentinelManager implements SentinelManager, ShardEventHandle private static final int LONG_SENTINEL_COMMAND_TIMEOUT = 2000; - @Resource(name = KEYED_NETTY_CLIENT_POOL) + @Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedClientPool; @Resource(name = REDIS_COMMAND_EXECUTOR) diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java index 6623647f51..b024c921f2 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/service/model/impl/ShardModelServiceImpl.java @@ -69,7 +69,7 @@ public class ShardModelServiceImpl implements ShardModelService{ @Resource(name = REDIS_COMMAND_EXECUTOR) private ScheduledExecutorService scheduled; - @Resource(name = MIGRATE_KEEPER_CLIENT_POOL) + @Resource(name = KEEPER_KEYED_NETTY_CLIENT_POOL) private XpipeNettyClientKeyedObjectPool keyedObjectPool; private RetryCommandFactory retryCommandFactory; diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java index 6c802fa636..1c12168814 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/spring/ResourceConfig.java @@ -3,11 +3,14 @@ import com.ctrip.xpipe.concurrent.DefaultExecutorFactory; import com.ctrip.xpipe.lifecycle.LifecycleHelper; import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory; +import com.ctrip.xpipe.redis.core.client.NettyRedisPoolClientFactory; import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool; +import com.ctrip.xpipe.redis.checker.config.CheckerConfig; import com.ctrip.xpipe.redis.core.spring.AbstractRedisConfigContext; import com.ctrip.xpipe.utils.OsUtils; import com.ctrip.xpipe.utils.XpipeThreadFactory; import com.google.common.util.concurrent.MoreExecutors; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -22,11 +25,14 @@ @Configuration public class ResourceConfig extends AbstractRedisConfigContext { + @Autowired + private CheckerConfig consoleConfig; + private final static int REDIS_SESSION_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("REDIS_SESSION_CLIENT_POOL_SIZE", "12")); private final static int KEYED_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("KEYED_CLIENT_POOL_SIZE", "8")); - private final static int MIGRATE_KEEPER_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("MIGRATE_KEEPER_CLIENT_POOL_SIZE", "1")); + private final static String DEFAULT_CLIENT_NAME = "xpipe"; @Bean(name = REDIS_COMMAND_EXECUTOR) public ScheduledExecutorService getRedisCommandExecutor() { @@ -40,9 +46,9 @@ public ScheduledExecutorService getRedisCommandExecutor() { ); } - @Bean(name = KEYED_NETTY_CLIENT_POOL) + @Bean(name = REDIS_KEYED_NETTY_CLIENT_POOL) public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; @@ -50,15 +56,31 @@ public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Excepti @Bean(name = REDIS_SESSION_NETTY_CLIENT_POOL) public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(REDIS_SESSION_CLIENT_POOL_SIZE)); + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(REDIS_SESSION_CLIENT_POOL_SIZE)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; } - @Bean(name = MIGRATE_KEEPER_CLIENT_POOL) - public XpipeNettyClientKeyedObjectPool getMigrateKeeperClientPool() throws Exception { - XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(MIGRATE_KEEPER_CLIENT_POOL_SIZE)); + @Bean(name = KEEPER_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getKeeperReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + + @Bean(name = SENTINEL_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getSentinelReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); + LifecycleHelper.initializeIfPossible(keyedObjectPool); + LifecycleHelper.startIfPossible(keyedObjectPool); + return keyedObjectPool; + } + + @Bean(name = PROXY_KEYED_NETTY_CLIENT_POOL) + public XpipeNettyClientKeyedObjectPool getProxyReqResNettyClientPool() throws Exception { + XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE)); LifecycleHelper.initializeIfPossible(keyedObjectPool); LifecycleHelper.startIfPossible(keyedObjectPool); return keyedObjectPool; @@ -96,4 +118,8 @@ private NettyKeyedPoolClientFactory getKeyedPoolClientFactory(int eventLoopThrea return new NettyKeyedPoolClientFactory(eventLoopThreads); } + private NettyKeyedPoolClientFactory getRedisPoolClientFactory(int eventLoopThreads) { + return new NettyRedisPoolClientFactory(eventLoopThreads, DEFAULT_CLIENT_NAME, () -> consoleConfig.getShouldDoAfterNettyClientConnected()); + } + } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/AsyncConnectionCondition.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/AsyncConnectionCondition.java new file mode 100644 index 0000000000..b186cbc911 --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/AsyncConnectionCondition.java @@ -0,0 +1,7 @@ +package com.ctrip.xpipe.redis.core.client; + +public interface AsyncConnectionCondition { + + boolean shouldDo(); + +} diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/NettyRedisPoolClientFactory.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/NettyRedisPoolClientFactory.java new file mode 100644 index 0000000000..dad3f45eed --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/NettyRedisPoolClientFactory.java @@ -0,0 +1,31 @@ +package com.ctrip.xpipe.redis.core.client; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.netty.commands.NettyClientHandler; +import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory; +import io.netty.channel.ChannelFuture; +import org.apache.commons.pool2.PooledObject; +import org.apache.commons.pool2.impl.DefaultPooledObject; + +public class NettyRedisPoolClientFactory extends NettyKeyedPoolClientFactory { + + private String clientName; + + private AsyncConnectionCondition asyncConnectionCondition; + + public NettyRedisPoolClientFactory(int eventLoopThreads, String clientName, AsyncConnectionCondition asyncConnectionCondition) { + super(eventLoopThreads); + this.clientName = clientName; + this.asyncConnectionCondition = asyncConnectionCondition; + } + + @Override + public PooledObject makeObject(Endpoint key) throws Exception { + ChannelFuture f = b.connect(key.getHost(), key.getPort()); + NettyClient nettyClient = new RedisAsyncNettyClient(f, key, clientName, asyncConnectionCondition); + f.channel().attr(NettyClientHandler.KEY_CLIENT).set(nettyClient); + return new DefaultPooledObject(nettyClient); + } + +} diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java new file mode 100644 index 0000000000..43ad918695 --- /dev/null +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java @@ -0,0 +1,124 @@ +package com.ctrip.xpipe.redis.core.client; + +import com.ctrip.xpipe.api.endpoint.Endpoint; +import com.ctrip.xpipe.api.monitor.Task; +import com.ctrip.xpipe.api.monitor.TransactionMonitor; +import com.ctrip.xpipe.exception.XpipeException; +import com.ctrip.xpipe.netty.commands.AsyncNettyClient; +import com.ctrip.xpipe.netty.commands.ByteBufReceiver; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.netty.commands.RedisNettyClient; +import com.ctrip.xpipe.redis.core.protocal.RedisClientProtocol; +import com.ctrip.xpipe.redis.core.protocal.protocal.RequestStringParser; +import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; +import com.ctrip.xpipe.utils.ChannelUtil; +import com.dianping.cat.Cat; +import com.dianping.cat.message.Transaction; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicReference; + +public class RedisAsyncNettyClient extends AsyncNettyClient implements RedisNettyClient { + + protected Logger logger = LoggerFactory.getLogger(RedisAsyncNettyClient.class); + + private String clientName; + + private AsyncConnectionCondition asyncConnectionCondition; + + private static final String CLIENT_SET_NAME = "CLIENT SETNAME "; + + private static final String EXPECT_RESP = "OK"; + + private static final int DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI = 660; + + protected final AtomicReference doAfterConnectedOver = new AtomicReference<>(false); + + public RedisAsyncNettyClient(ChannelFuture future, Endpoint endpoint, String clientName, AsyncConnectionCondition asyncConnectionCondition) { + super(future, endpoint); + this.clientName = clientName; + this.asyncConnectionCondition = asyncConnectionCondition; + future.addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + if (future.isSuccess()) { + doAfterConnected(); + doAfterConnectedOver.set(true); + } + } + }); + } + + protected void doAfterConnected() { + if (asyncConnectionCondition != null && !asyncConnectionCondition.shouldDo()) { + return; + } + RequestStringParser requestString = new RequestStringParser(CLIENT_SET_NAME, clientName); + SimpleStringParser simpleStringParser = new SimpleStringParser(); + Transaction transaction = Cat.newTransaction("netty.client.setName", clientName); + RedisAsyncNettyClient.super.sendRequest(requestString.format(), new ByteBufReceiver() { + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + try{ + transaction.addData("remoteAddress", ChannelUtil.getSimpleIpport(channel.remoteAddress())); + transaction.addData("commandTimeoutMills", getAfterConnectCommandTimeoutMill()); + RedisClientProtocol payload = simpleStringParser.read(byteBuf); + String result = null; + if(payload != null){ + result = payload.getPayload(); + } + if(result == null){ + return RECEIVER_RESULT.CONTINUE; + } + if (EXPECT_RESP.equalsIgnoreCase(result)){ + transaction.setStatus(Transaction.SUCCESS); + logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result); + } else { + transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-result][%s] result:%s", desc, result))); + logger.warn("[redisAsync][clientSetName][wont-result][{}] {}", desc, result); + } + }catch(Throwable th){ + transaction.setStatus(th); + logger.error("[logTransaction]" + "netty.client.setName" + "," + clientName, th); + }finally{ + transaction.complete(); + } + return RECEIVER_RESULT.SUCCESS; + } + + @Override + public void clientClosed(NettyClient nettyClient) { + logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel()); + transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-send][%s]client closed %s", desc, nettyClient.channel()))); + transaction.complete(); + } + + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { + logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th); + transaction.setStatus(th); + transaction.complete(); + } + }); + + } + + @Override + public boolean getDoAfterConnectedOver() { + return doAfterConnectedOver.get(); + } + + @Override + public int getAfterConnectCommandTimeoutMill() { + return DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI; + } + +} diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/AsyncNettyClientTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/AsyncNettyClientTest.java index a87c542c5f..642d64f2dd 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/AsyncNettyClientTest.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/AsyncNettyClientTest.java @@ -24,7 +24,6 @@ import org.junit.Before; import org.junit.Ignore; import org.junit.Test; - import java.util.concurrent.TimeoutException; /** @@ -36,11 +35,11 @@ public class AsyncNettyClientTest extends AbstractTest { private XpipeNettyClientKeyedObjectPool pool; - private Bootstrap b = new Bootstrap(); + protected Bootstrap b = new Bootstrap(); - private Server server; + protected Server server; - private String result = ""; + protected String result = ""; @Before public void beforeXpipeNettyClientKeyedObjectPoolTest() throws Exception{ @@ -143,7 +142,11 @@ public void run() { sleep(2 * 1000); } - private void runTheTest(AsyncNettyClient client, StringBuffer sb, StringBuilder expected, int n) { + protected void runTheTest(AsyncNettyClient client, StringBuffer sb, StringBuilder expected, int n) { + this.runTheTest(client, sb, expected, n, null); + } + + protected void runTheTest(AsyncNettyClient client, StringBuffer sb, StringBuilder expected, int n, String prefix) { for(int i = 0; i < n; i++) { String message = "+" + i + "\r\n"; client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() { @@ -169,6 +172,9 @@ public void clientClosed(NettyClient nettyClient, Throwable th) { } }); + if (prefix != null) { + expected.append(prefix).append("+"); + } expected.append(i); } } diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java new file mode 100644 index 0000000000..4f88a47dca --- /dev/null +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java @@ -0,0 +1,228 @@ +package com.ctrip.xpipe.redis.core.client; + +import com.ctrip.xpipe.endpoint.DefaultEndPoint; +import com.ctrip.xpipe.netty.commands.ByteBufReceiver; +import com.ctrip.xpipe.netty.commands.NettyClient; +import com.ctrip.xpipe.netty.commands.NettyClientHandler; +import com.ctrip.xpipe.redis.core.protocal.RedisClientProtocol; +import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; +import com.ctrip.xpipe.simpleserver.AbstractIoAction; +import com.ctrip.xpipe.simpleserver.IoAction; +import com.ctrip.xpipe.simpleserver.IoActionFactory; +import com.ctrip.xpipe.simpleserver.Server; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.Arrays; +import java.util.concurrent.TimeoutException; + +public class RedisAsyncNettyClientTest extends AsyncNettyClientTest { + + private static final String prefix = "RedisAsyncNettyClientTest"; + + @Before + public void beforeRedisAsyncNettyClientTestTest() throws Exception{ + server = startEchoServer(randomPort(), prefix); + } + + @Test + public void testStickyBag() throws Exception { + server = startEchoServer(randomPort(), "+OK\r\n+OK1\r\n+OK2"); + RedisAsyncNettyClient client = new RedisAsyncNettyClient(b.connect("localhost", server.getPort()), + new DefaultEndPoint("localhost", server.getPort()), "xpipe", () -> true); + client.channel().attr(NettyClientHandler.KEY_CLIENT).set(client); + + StringBuffer sb = new StringBuffer(); + + StringBuilder expected = new StringBuilder(); + + String message = "+1" + "\r\n"; + final int[] minReadAbleBytes = {Integer.MAX_VALUE}; + client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() { + + private RedisClientProtocol parser = new SimpleStringParser(); + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + RedisClientProtocol clientProtocol = parser.read(byteBuf); + if(clientProtocol != null) { + sb.append(clientProtocol.getPayload()); + if (byteBuf.readableBytes() < minReadAbleBytes[0]) { + minReadAbleBytes[0] = byteBuf.readableBytes(); + } + return RECEIVER_RESULT.SUCCESS; + } + return RECEIVER_RESULT.CONTINUE; + } + + @Override + public void clientClosed(NettyClient nettyClient) { + + } + + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { + + } + }); + expected.append("OK1"); + waitConditionUntilTimeOut(()->client.channel().isActive(), 1000); + sleep(1000); + String str = sb.toString(); + Assert.assertTrue(minReadAbleBytes[0] > 0); + Assert.assertEquals(str, expected.toString()); + } + + @Test + public void testUnpacking() throws Exception { + RedisAsyncNettyClient client = new RedisAsyncNettyClient(b.connect("localhost", server.getPort()), + new DefaultEndPoint("localhost", server.getPort()), "xpipe", () -> true); + client.channel().attr(NettyClientHandler.KEY_CLIENT).set(client); + + StringBuffer sb = new StringBuffer(); + + StringBuilder expected = new StringBuilder(); + + int N = 100; + final int[] maxProtocolTime = {0}; + for(int i = 0; i < N; i++) { + String message = "+" + i + "\r\n"; + client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() { + + private RedisClientProtocol parser = new SimpleStringParser(); + int protocolTime = 0; + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + RedisClientProtocol clientProtocol = parser.read(byteBuf); + if(clientProtocol != null) { + sb.append(clientProtocol.getPayload()); + protocolTime++; + if (protocolTime > maxProtocolTime[0]) { + maxProtocolTime[0] = protocolTime; + } + return RECEIVER_RESULT.SUCCESS; + } + protocolTime++; + return RECEIVER_RESULT.CONTINUE; + } + + @Override + public void clientClosed(NettyClient nettyClient) { + + } + + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { + + } + }); + expected.append(prefix).append("+"); + expected.append(i); + } + waitConditionUntilTimeOut(()->client.channel().isActive(), 1000); + sleep(1000); + String str = sb.toString(); + Assert.assertEquals(str, expected.toString()); + Assert.assertTrue(maxProtocolTime[0] >= 2); + } + + @Test + public void testSendRequest() throws TimeoutException { + RedisAsyncNettyClient client = new RedisAsyncNettyClient(b.connect("localhost", server.getPort()), + new DefaultEndPoint("localhost", server.getPort()), "xpipe", () -> true); + client.channel().attr(NettyClientHandler.KEY_CLIENT).set(client); + + StringBuffer sb = new StringBuffer(); + + StringBuilder expected = new StringBuilder(); + + int N = 100; + runTheTest(client, sb, expected, N, prefix); + waitConditionUntilTimeOut(()->client.channel().isActive(), 1000); + sleep(1000); + String str = sb.toString(); + Assert.assertEquals(str, expected.toString()); + } + + @Test + public void testFutureClosed() { + RedisAsyncNettyClient client = new RedisAsyncNettyClient(b.connect("localhost", server.getPort()), + new DefaultEndPoint("localhost", server.getPort()), "xpipe", () -> true); + client.channel().attr(NettyClientHandler.KEY_CLIENT).set(client); + + StringBuffer sb = new StringBuffer(); + + StringBuilder expected = new StringBuilder(); + + int N = 100; + new Thread(new Runnable() { + @Override + public void run() { + runTheTest(client, sb, expected, N); + } + }).start(); + new Thread(new Runnable() { + @Override + public void run() { + try { + server.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }).start(); + sleep(2 * 1000); + } + + + protected Server startEchoServer(int port, String prefix) throws Exception { + return startServer(port, new IoActionFactory() { + + @Override + public IoAction createIoAction(Socket socket) { + return new AbstractIoAction(socket) { + + private String line; + + @Override + protected Object doRead(InputStream ins) throws IOException { + line = readLine(ins); + logger.debug("[doRead]{}", line); + logger.info("[doRead]{}", line == null ? null : line.length()); + return line; + } + + @Override + protected void doWrite(OutputStream ous, Object readResult) throws IOException { + + String[] sp = line.split("\\s+"); + if (sp.length >= 1) { + if (sp[0].equalsIgnoreCase("sleep")) { + int sleep = Integer.parseInt(sp[1]); + logger.info("[sleep]{}", sleep); + sleepIgnoreInterrupt(sleep); + } + } + logger.debug("[doWrite]{}", line.length()); + logger.debug("[doWrite]{}", line); + if (prefix != null) { + ous.write(prefix.getBytes()); + } + sleepIgnoreInterrupt(1); + ous.write(line.getBytes()); + ous.flush(); + } + }; + } + }); + } + + +} From 2712c515e010b0e8d21d35591c80dd3c6cca24d4 Mon Sep 17 00:00:00 2001 From: yifuzhou Date: Wed, 20 Nov 2024 14:19:42 +0800 Subject: [PATCH 2/2] add redis client name --- .../core/client/RedisAsyncNettyClient.java | 98 +++++++++++-------- .../client/RedisAsyncNettyClientTest.java | 89 +++++++++++------ 2 files changed, 115 insertions(+), 72 deletions(-) diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java index 43ad918695..9fa1c5fa72 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClient.java @@ -12,6 +12,7 @@ import com.ctrip.xpipe.redis.core.protocal.protocal.RequestStringParser; import com.ctrip.xpipe.redis.core.protocal.protocal.SimpleStringParser; import com.ctrip.xpipe.utils.ChannelUtil; +import com.ctrip.xpipe.utils.VisibleForTesting; import com.dianping.cat.Cat; import com.dianping.cat.message.Transaction; import io.netty.buffer.ByteBuf; @@ -40,6 +41,8 @@ public class RedisAsyncNettyClient extends AsyncNettyClient implements RedisNett private static final int DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI = 660; + private boolean doAfterConnectedSuccess = false; + protected final AtomicReference doAfterConnectedOver = new AtomicReference<>(false); public RedisAsyncNettyClient(ChannelFuture future, Endpoint endpoint, String clientName, AsyncConnectionCondition asyncConnectionCondition) { @@ -64,50 +67,60 @@ protected void doAfterConnected() { RequestStringParser requestString = new RequestStringParser(CLIENT_SET_NAME, clientName); SimpleStringParser simpleStringParser = new SimpleStringParser(); Transaction transaction = Cat.newTransaction("netty.client.setName", clientName); - RedisAsyncNettyClient.super.sendRequest(requestString.format(), new ByteBufReceiver() { - @Override - public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { - try{ - transaction.addData("remoteAddress", ChannelUtil.getSimpleIpport(channel.remoteAddress())); - transaction.addData("commandTimeoutMills", getAfterConnectCommandTimeoutMill()); - RedisClientProtocol payload = simpleStringParser.read(byteBuf); - String result = null; - if(payload != null){ - result = payload.getPayload(); - } - if(result == null){ - return RECEIVER_RESULT.CONTINUE; + try { + RedisAsyncNettyClient.super.sendRequest(requestString.format(), new ByteBufReceiver() { + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + try{ + transaction.addData("remoteAddress", ChannelUtil.getSimpleIpport(channel.remoteAddress())); + transaction.addData("commandTimeoutMills", getAfterConnectCommandTimeoutMill()); + RedisClientProtocol payload = simpleStringParser.read(byteBuf); + String result = null; + if(payload != null){ + result = payload.getPayload(); + } + if(result == null){ + return RECEIVER_RESULT.CONTINUE; + } + if (EXPECT_RESP.equalsIgnoreCase(result)){ + transaction.setStatus(Transaction.SUCCESS); + doAfterConnectedSuccess = true; + logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result); + } else { + doAfterConnectedSuccess = false; + transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-result][%s] result:%s", desc, result))); + logger.warn("[redisAsync][clientSetName][err-result][{}] {}", desc, result); + } + transaction.complete(); + }catch(Throwable th){ + doAfterConnectedSuccess = false; + transaction.setStatus(th); + transaction.complete(); + logger.error("[logTransaction]" + "netty.client.setName" + "," + clientName, th); + throw th; } - if (EXPECT_RESP.equalsIgnoreCase(result)){ - transaction.setStatus(Transaction.SUCCESS); - logger.info("[redisAsync][clientSetName][success][{}] {}", desc, result); - } else { - transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-result][%s] result:%s", desc, result))); - logger.warn("[redisAsync][clientSetName][wont-result][{}] {}", desc, result); - } - }catch(Throwable th){ - transaction.setStatus(th); - logger.error("[logTransaction]" + "netty.client.setName" + "," + clientName, th); - }finally{ - transaction.complete(); + return RECEIVER_RESULT.SUCCESS; } - return RECEIVER_RESULT.SUCCESS; - } - @Override - public void clientClosed(NettyClient nettyClient) { - logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel()); - transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-send][%s]client closed %s", desc, nettyClient.channel()))); - transaction.complete(); - } + @Override + public void clientClosed(NettyClient nettyClient) { + logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel()); + transaction.setStatus(new XpipeException(String.format("[redisAsync][clientSetName][wont-send][%s]client closed %s", desc, nettyClient.channel()))); + transaction.complete(); + } - @Override - public void clientClosed(NettyClient nettyClient, Throwable th) { - logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th); - transaction.setStatus(th); - transaction.complete(); - } - }); + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { + logger.warn("[redisAsync][clientSetName][wont-send][{}] {}", desc, nettyClient.channel(), th); + transaction.setStatus(th); + transaction.complete(); + } + }); + } catch (Exception e) { + transaction.setStatus(e); + transaction.complete(); + logger.error("[redisAsync][clientSetName] err", e); + } } @@ -121,4 +134,9 @@ public int getAfterConnectCommandTimeoutMill() { return DEFAULT_REDIS_COMMAND_TIME_OUT_MILLI; } + @VisibleForTesting + public boolean getDoAfterConnectedSuccess() { + return doAfterConnectedSuccess; + } + } diff --git a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java index 4f88a47dca..8394d96210 100644 --- a/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java +++ b/redis/redis-core/src/test/java/com/ctrip/xpipe/redis/core/client/RedisAsyncNettyClientTest.java @@ -77,11 +77,13 @@ public void clientClosed(NettyClient nettyClient, Throwable th) { sleep(1000); String str = sb.toString(); Assert.assertTrue(minReadAbleBytes[0] > 0); + Assert.assertTrue(client.getDoAfterConnectedSuccess()); Assert.assertEquals(str, expected.toString()); } @Test public void testUnpacking() throws Exception { + server = startUnpackingServer(randomPort(), "+O"); RedisAsyncNettyClient client = new RedisAsyncNettyClient(b.connect("localhost", server.getPort()), new DefaultEndPoint("localhost", server.getPort()), "xpipe", () -> true); client.channel().attr(NettyClientHandler.KEY_CLIENT).set(client); @@ -89,48 +91,71 @@ public void testUnpacking() throws Exception { StringBuffer sb = new StringBuffer(); StringBuilder expected = new StringBuilder(); + String message = "K\r\nOK1\r\n"; + client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() { - int N = 100; - final int[] maxProtocolTime = {0}; - for(int i = 0; i < N; i++) { - String message = "+" + i + "\r\n"; - client.sendRequest(Unpooled.copiedBuffer(message.getBytes()), new ByteBufReceiver() { - - private RedisClientProtocol parser = new SimpleStringParser(); - int protocolTime = 0; - @Override - public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { - RedisClientProtocol clientProtocol = parser.read(byteBuf); - if(clientProtocol != null) { - sb.append(clientProtocol.getPayload()); - protocolTime++; - if (protocolTime > maxProtocolTime[0]) { - maxProtocolTime[0] = protocolTime; - } - return RECEIVER_RESULT.SUCCESS; - } - protocolTime++; - return RECEIVER_RESULT.CONTINUE; + private RedisClientProtocol parser = new SimpleStringParser(); + @Override + public RECEIVER_RESULT receive(Channel channel, ByteBuf byteBuf) { + RedisClientProtocol clientProtocol = parser.read(byteBuf); + if(clientProtocol != null) { + sb.append(clientProtocol.getPayload()); + return RECEIVER_RESULT.SUCCESS; } + return RECEIVER_RESULT.CONTINUE; + } - @Override - public void clientClosed(NettyClient nettyClient) { + @Override + public void clientClosed(NettyClient nettyClient) { - } + } - @Override - public void clientClosed(NettyClient nettyClient, Throwable th) { + @Override + public void clientClosed(NettyClient nettyClient, Throwable th) { - } - }); - expected.append(prefix).append("+"); - expected.append(i); - } + } + }); + expected.append("OK1"); waitConditionUntilTimeOut(()->client.channel().isActive(), 1000); sleep(1000); String str = sb.toString(); + Assert.assertTrue(client.getDoAfterConnectedSuccess()); Assert.assertEquals(str, expected.toString()); - Assert.assertTrue(maxProtocolTime[0] >= 2); + } + + + protected Server startUnpackingServer(int port, String prefix) throws Exception { + return startServer(port, new IoActionFactory() { + + boolean sended = false; + + @Override + public IoAction createIoAction(Socket socket) { + return new AbstractIoAction(socket) { + + private String line; + + @Override + protected Object doRead(InputStream ins) throws IOException { + line = readLine(ins); + return line; + } + + @Override + protected void doWrite(OutputStream ous, Object readResult) throws IOException { + if (prefix != null && !sended) { + ous.write(prefix.getBytes()); + sended = true; + } + if (!line.contains("CLIENT")) { + ous.write(line.getBytes()); + } + sleepIgnoreInterrupt(1); + ous.flush(); + } + }; + } + }); } @Test