Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add redis client name #908

Merged
merged 2 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,21 @@ protected void doSendRequest(final NettyClient nettyClient, ByteBuf byteBuf) {
}

if(getCommandTimeoutMilli() > 0 && scheduled != null){
int commandTimeoutMilli = getCommandTimeoutMilli();
if (nettyClient instanceof RedisNettyClient && !((RedisNettyClient) nettyClient).getDoAfterConnectedOver()) {
commandTimeoutMilli += ((RedisNettyClient) nettyClient).getAfterConnectCommandTimeoutMill();
}

getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, getCommandTimeoutMilli());
getLogger().debug("[doSendRequest][schedule timeout]{}, {}", this, commandTimeoutMilli);
int finalCommandTimeoutMilli = commandTimeoutMilli;
timeoutFuture = scheduled.schedule(new AbstractExceptionLogTask() {

@Override
public void doRun() {
getLogger().info("[{}][run][timeout]{}", AbstractNettyRequestResponseCommand.this, nettyClient);
future().setFailure(new CommandTimeoutException("timeout " + + getCommandTimeoutMilli()));
future().setFailure(new CommandTimeoutException("timeout " + finalCommandTimeoutMilli));
}
}, getCommandTimeoutMilli(), TimeUnit.MILLISECONDS);
}, commandTimeoutMilli, TimeUnit.MILLISECONDS);

future().addListener(new CommandFutureListener<V>() {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package com.ctrip.xpipe.netty.commands;

public interface RedisNettyClient {

boolean getDoAfterConnectedOver();

int getAfterConnectCommandTimeoutMill();

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public NettyClient borrowObject() throws BorrowObjectException {
try {
return objectPool.borrowObject();
} catch (Exception e) {
logger.error("[borrowObject]" + factory, e);
logger.error("[borrowObject] NumIdle:{}, NumActive:{}" + factory, objectPool.getNumIdle(), objectPool.getNumActive(), e);
throw new BorrowObjectException("borrow " + factory, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,6 @@ public interface CheckerConfig {

int getMarkInstanceMaxDelayMilli();

boolean getShouldDoAfterNettyClientConnected();

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class DataCenterConfigBean extends AbstractConfigBean {

public static final String KEY_HTTP_ACCEPT_ENCODING = "http.accept.encoding";

private static final String KEY_NETTY_CLIENT_DO_AFTER_CONNECTED = "netty.client.do.after.connected";

private AtomicReference<String> zkConnection = new AtomicReference<>();
private AtomicReference<String> zkNameSpace = new AtomicReference<>();

Expand Down Expand Up @@ -95,4 +97,8 @@ public String getHttpAcceptEncoding() {
return getProperty(KEY_HTTP_ACCEPT_ENCODING, null);
}

public boolean getDoAfterNettyClientConnected() {
return getBooleanProperty(KEY_NETTY_CLIENT_DO_AFTER_CONNECTED, true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

/**
Expand All @@ -49,7 +49,7 @@ public class MasterOverOneMonitor implements RedisMasterActionListener, OneWaySu
@Autowired
private AlertManager alertManager;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = REDIS_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

@Resource(name = SCHEDULED_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import java.util.stream.Collectors;

import static com.ctrip.xpipe.redis.checker.healthcheck.actions.sentinel.SentinelHelloCheckAction.LOG_TITLE;
import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.SENTINEL_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.THREAD_POOL_TIME_OUT;

/**
Expand Down Expand Up @@ -77,7 +77,7 @@ public class DefaultSentinelHelloCollector implements SentinelHelloCollector {
@Autowired
private PersistenceCache persistenceCache;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

private SentinelLeakyBucket leakyBucket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

@Component
Expand All @@ -28,7 +28,7 @@ public class ProxyConnectedChecker implements ProxyChecker {
@Autowired
private CheckerConfig checkerConfig;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

@Resource(name = SCHEDULED_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@ public class Resource {

public static final String REDIS_COMMAND_EXECUTOR = "redisCommandExecutor";

public static final String KEYED_NETTY_CLIENT_POOL = "keyedClientPool";
public static final String REDIS_KEYED_NETTY_CLIENT_POOL = "redisKeyedClientPool";

public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool";
public static final String SENTINEL_KEYED_NETTY_CLIENT_POOL = "sentinelKeyedClientPool";

public static final String KEEPER_KEYED_NETTY_CLIENT_POOL = "keeperKeyedClientPool";

public static final String MIGRATE_KEEPER_CLIENT_POOL = "migrateKeeperClientPool";
public static final String PROXY_KEYED_NETTY_CLIENT_POOL = "proxyKeyedClientPool";

public static final String REDIS_SESSION_NETTY_CLIENT_POOL = "redisSessionClientPool";

public static final String PING_DELAY_INFO_EXECUTORS = "pingDelayInfoExecutors";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.ctrip.xpipe.concurrent.DefaultExecutorFactory;
import com.ctrip.xpipe.lifecycle.LifecycleHelper;
import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory;
import com.ctrip.xpipe.redis.core.client.NettyRedisPoolClientFactory;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.cluster.GroupCheckerLeaderElector;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
Expand Down Expand Up @@ -148,17 +149,33 @@ public ScheduledExecutorService getRedisCommandExecutor() {
);
}

@Bean(name = KEYED_NETTY_CLIENT_POOL)
@Bean(name = REDIS_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8));
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(8));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = REDIS_SESSION_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(12));
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(12));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = SENTINEL_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getSentinelReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = PROXY_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getProxyReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(8));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
Expand Down Expand Up @@ -195,6 +212,10 @@ public ScheduledExecutorService getHelloCheckScheduled() {
private NettyKeyedPoolClientFactory getKeyedPoolClientFactory(int eventLoopThreads) {
return new NettyKeyedPoolClientFactory(eventLoopThreads);
}

private NettyKeyedPoolClientFactory getRedisPoolClientFactory(int eventLoopThreads) {
return new NettyRedisPoolClientFactory(eventLoopThreads, "xpipe", () -> true);
}

@Bean
public FoundationService foundationService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,4 +315,9 @@ public int getMarkInstanceMaxDelayMilli() {
return 2000;
}

@Override
public boolean getShouldDoAfterNettyClientConnected() {
return true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,11 @@ public String getHttpAcceptEncoding() {
return dataCenterConfigBean.getHttpAcceptEncoding();
}

@Override
public boolean getShouldDoAfterNettyClientConnected() {
return dataCenterConfigBean.getDoAfterNettyClientConnected();
}

@Override
public void addListener(ConfigKeyListener listener) {
this.listenersSet.add(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.PROXY_KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;

@Component
Expand All @@ -49,7 +49,7 @@ public class DefaultProxyMonitorCollectorManager extends AbstractStartStoppable
@Resource(name = SCHEDULED_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = PROXY_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

@Autowired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.ctrip.xpipe.redis.checker.resource.Resource.KEYED_NETTY_CLIENT_POOL;
import static com.ctrip.xpipe.redis.checker.resource.Resource.REDIS_COMMAND_EXECUTOR;
import static com.ctrip.xpipe.redis.checker.resource.Resource.*;

/**
* @author chen.zhu
Expand All @@ -53,7 +52,7 @@ public class DefaultSentinelManager implements SentinelManager, ShardEventHandle

private static final int LONG_SENTINEL_COMMAND_TIMEOUT = 2000;

@Resource(name = KEYED_NETTY_CLIENT_POOL)
@Resource(name = SENTINEL_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedClientPool;

@Resource(name = REDIS_COMMAND_EXECUTOR)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public class ShardModelServiceImpl implements ShardModelService{
@Resource(name = REDIS_COMMAND_EXECUTOR)
private ScheduledExecutorService scheduled;

@Resource(name = MIGRATE_KEEPER_CLIENT_POOL)
@Resource(name = KEEPER_KEYED_NETTY_CLIENT_POOL)
private XpipeNettyClientKeyedObjectPool keyedObjectPool;

private RetryCommandFactory<Object> retryCommandFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,14 @@
import com.ctrip.xpipe.concurrent.DefaultExecutorFactory;
import com.ctrip.xpipe.lifecycle.LifecycleHelper;
import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory;
import com.ctrip.xpipe.redis.core.client.NettyRedisPoolClientFactory;
import com.ctrip.xpipe.pool.XpipeNettyClientKeyedObjectPool;
import com.ctrip.xpipe.redis.checker.config.CheckerConfig;
import com.ctrip.xpipe.redis.core.spring.AbstractRedisConfigContext;
import com.ctrip.xpipe.utils.OsUtils;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.google.common.util.concurrent.MoreExecutors;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand All @@ -22,11 +25,14 @@
@Configuration
public class ResourceConfig extends AbstractRedisConfigContext {

@Autowired
private CheckerConfig consoleConfig;

private final static int REDIS_SESSION_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("REDIS_SESSION_CLIENT_POOL_SIZE", "12"));

private final static int KEYED_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("KEYED_CLIENT_POOL_SIZE", "8"));

private final static int MIGRATE_KEEPER_CLIENT_POOL_SIZE = Integer.parseInt(System.getProperty("MIGRATE_KEEPER_CLIENT_POOL_SIZE", "1"));
private final static String DEFAULT_CLIENT_NAME = "xpipe";

@Bean(name = REDIS_COMMAND_EXECUTOR)
public ScheduledExecutorService getRedisCommandExecutor() {
Expand All @@ -40,25 +46,41 @@ public ScheduledExecutorService getRedisCommandExecutor() {
);
}

@Bean(name = KEYED_NETTY_CLIENT_POOL)
@Bean(name = REDIS_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE));
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(KEYED_CLIENT_POOL_SIZE));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = REDIS_SESSION_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getRedisSessionNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(REDIS_SESSION_CLIENT_POOL_SIZE));
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getRedisPoolClientFactory(REDIS_SESSION_CLIENT_POOL_SIZE));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = MIGRATE_KEEPER_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getMigrateKeeperClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(MIGRATE_KEEPER_CLIENT_POOL_SIZE));
@Bean(name = KEEPER_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getKeeperReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = SENTINEL_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getSentinelReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
}

@Bean(name = PROXY_KEYED_NETTY_CLIENT_POOL)
public XpipeNettyClientKeyedObjectPool getProxyReqResNettyClientPool() throws Exception {
XpipeNettyClientKeyedObjectPool keyedObjectPool = new XpipeNettyClientKeyedObjectPool(getKeyedPoolClientFactory(KEYED_CLIENT_POOL_SIZE));
LifecycleHelper.initializeIfPossible(keyedObjectPool);
LifecycleHelper.startIfPossible(keyedObjectPool);
return keyedObjectPool;
Expand Down Expand Up @@ -96,4 +118,8 @@ private NettyKeyedPoolClientFactory getKeyedPoolClientFactory(int eventLoopThrea
return new NettyKeyedPoolClientFactory(eventLoopThreads);
}

private NettyKeyedPoolClientFactory getRedisPoolClientFactory(int eventLoopThreads) {
return new NettyRedisPoolClientFactory(eventLoopThreads, DEFAULT_CLIENT_NAME, () -> consoleConfig.getShouldDoAfterNettyClientConnected());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.ctrip.xpipe.redis.core.client;

public interface AsyncConnectionCondition {

boolean shouldDo();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.ctrip.xpipe.redis.core.client;

import com.ctrip.xpipe.api.endpoint.Endpoint;
import com.ctrip.xpipe.netty.commands.NettyClient;
import com.ctrip.xpipe.netty.commands.NettyClientHandler;
import com.ctrip.xpipe.netty.commands.NettyKeyedPoolClientFactory;
import io.netty.channel.ChannelFuture;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;

public class NettyRedisPoolClientFactory extends NettyKeyedPoolClientFactory {

private String clientName;

private AsyncConnectionCondition asyncConnectionCondition;

public NettyRedisPoolClientFactory(int eventLoopThreads, String clientName, AsyncConnectionCondition asyncConnectionCondition) {
super(eventLoopThreads);
this.clientName = clientName;
this.asyncConnectionCondition = asyncConnectionCondition;
}

@Override
public PooledObject<NettyClient> makeObject(Endpoint key) throws Exception {
ChannelFuture f = b.connect(key.getHost(), key.getPort());
NettyClient nettyClient = new RedisAsyncNettyClient(f, key, clientName, asyncConnectionCondition);
f.channel().attr(NettyClientHandler.KEY_CLIENT).set(nettyClient);
return new DefaultPooledObject<NettyClient>(nettyClient);
}

}
Loading
Loading