Skip to content

Commit

Permalink
console cpu/gc optimize (#762)
Browse files Browse the repository at this point in the history
* cache xml format xpipeMeta

* remove refactor using in proxyChainApi && adjust proxyChain collection interval 1s -> 60s

* avoid expiredAt exceeding Long.MAX_VALUE (long overflow)

---------

Co-authored-by: lishanglin <[email protected]>
  • Loading branch information
LanternLee and lishanglin authored Dec 29, 2023
1 parent cf9ac9d commit 112a10d
Show file tree
Hide file tree
Showing 18 changed files with 205 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ public T getCurrentData() {
return this.data;
}

/**
 * Returns the data, honoring the time-bound cache.
 * Equivalent to {@code getData(false)}: a non-expired cached value is served
 * without invoking the supplier.
 */
public T getData() {
return getData(false);
}

/**
 * Returns the data, refreshing it from {@code dataSupplier} when the cache is
 * disabled, empty, or expired.
 *
 * @param disableCache when {@code true}, always refresh from the supplier
 */
public T getData(boolean disableCache) {
// Fast path: serve the cached value without locking while it is still valid.
if (!disableCache && null != data && expiredAt > System.currentTimeMillis()) {
return data;
Expand All @@ -37,7 +41,10 @@ public T getData(boolean disableCache) {
synchronized (this) {
// Double-checked under the lock: another thread may have refreshed already.
if (!disableCache && null != data && expiredAt > System.currentTimeMillis()) return data;
this.data = dataSupplier.get();
long timeout = timeoutMillSupplier.getAsLong();
this.expiredAt = System.currentTimeMillis() + timeout;
// Overflow guard: now + timeout can wrap past Long.MAX_VALUE and go negative;
// clamp to MAX_VALUE so the entry effectively never expires in that case.
if (this.expiredAt < timeout) this.expiredAt = Long.MAX_VALUE;
return this.data;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ public XpipeMeta getDividedXpipeMeta(int partIndex) {
return xpipeMeta;
}

/**
 * XML form of the divided meta. This implementation ignores {@code partIndex}
 * and serializes the single backing {@code XpipeMeta} — consistent with
 * {@code getDividedXpipeMeta} in this class, which also returns the whole meta.
 */
@Override
public String getXmlFormatDividedXpipeMeta(int partIndex) {
return xpipeMeta.toString();
}

@Override
public boolean inBackupDc(HostPort hostPort) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ public class ConsoleCheckerController extends AbstractConsoleController {
/**
 * Serves one divided meta part. The "xml" format is answered from the
 * pre-rendered XML cache; any other (or absent) format is JSON-encoded.
 *
 * @throws IllegalArgumentException if {@code index} is negative
 */
@GetMapping(ConsoleCheckerPath.PATH_GET_META)
public String getDividedMeta(@PathVariable int index, @RequestParam(value="format", required = false) String format) {
    if (index < 0) throw new IllegalArgumentException("illegal index " + index);
    // "xml".equals(format) is null-safe, matching the original null check.
    boolean wantXml = "xml".equals(format);
    if (wantXml) {
        return metaCache.getXmlFormatDividedXpipeMeta(index);
    }
    return coder.encode(metaCache.getDividedXpipeMeta(index));
}

@GetMapping(ConsoleCheckerPath.PATH_GET_ALL_META)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ public int hashCode() {
return Objects.hash(uri, dcName, id, active);
}

/**
 * Field-by-field copy of this model. The {@code HostPort} is duplicated so
 * the copy shares no mutable state with this instance.
 */
@Override
public ProxyModel clone() {
    ProxyModel copy = new ProxyModel();
    copy.uri = uri;
    copy.dcName = dcName;
    copy.id = id;
    copy.active = active;
    copy.monitorActive = monitorActive;
    HostPort hp = this.hostPort;
    if (hp != null) {
        copy.hostPort = new HostPort(hp.getHost(), hp.getPort());
    }
    return copy;
}

@Override
public String toString() {
return String.format("ProxyModel[uri: %s, active: %b, dc-name: %s, id: %d]", uri, active, dcName, id);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ctrip.xpipe.redis.checker.model.TunnelStatsInfo;
import com.ctrip.xpipe.redis.console.proxy.ProxyChain;
import com.ctrip.xpipe.redis.core.proxy.monitor.TunnelStatsResult;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -107,6 +108,25 @@ public ProxyTunnelInfo buildProxyTunnelInfo() {
return proxyTunnelInfo;
}

/**
 * Deep copy of this chain: scalar identifiers are copied directly, and each
 * tunnel info is cloned element by element so the copy is fully independent.
 */
@Override
public DefaultProxyChain clone() {
    DefaultProxyChain copy = new DefaultProxyChain();
    copy.peerDcId = peerDcId;
    copy.backupDcId = backupDcId;
    copy.clusterId = clusterId;
    copy.shardId = shardId;

    List<DefaultTunnelInfo> source = this.tunnelInfos;
    if (source != null) {
        List<DefaultTunnelInfo> copied = Lists.newArrayList();
        for (DefaultTunnelInfo info : source) {
            copied.add(info.clone());
        }
        copy.tunnelInfos = copied;
    }

    return copy;
}

@Override
public String toString() {
return "DefaultProxyChain{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down Expand Up @@ -75,7 +74,7 @@ public class DefaultProxyChainAnalyzer extends AbstractStartStoppable implements

@Override
public Map<DcClusterShardPeer, ProxyChain> getClusterShardChainMap() {
// Defensive snapshot: each chain is deep-cloned via its explicit clone()
// so callers cannot mutate analyzer-internal state.
return chains.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((DefaultProxyChain)e.getValue()).clone()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import com.ctrip.xpipe.redis.console.reporter.DefaultHttpService;
import com.ctrip.xpipe.spring.AbstractProfile;
import com.ctrip.xpipe.utils.VisibleForTesting;
import com.ctrip.xpipe.utils.XpipeThreadFactory;
import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -24,34 +26,28 @@
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestOperations;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR;
import static java.lang.String.format;

@Component
@Profile(AbstractProfile.PROFILE_NAME_PRODUCTION)
public class DefaultProxyChainCollector extends AbstractStartStoppable implements ProxyChainCollector {

@Resource(name = SCHEDULED_EXECUTOR)
private ScheduledExecutorService scheduled;

@Autowired
private ProxyChainAnalyzer proxyChainAnalyzer;

@Autowired
private ConsoleConfig consoleConfig;

private DefaultHttpService httpService = new DefaultHttpService();

private ScheduledFuture future;
private DynamicDelayPeriodTask collectTask;

private ScheduledExecutorService scheduled;

private AtomicBoolean taskTrigger = new AtomicBoolean(false);

Expand All @@ -63,6 +59,15 @@ public class DefaultProxyChainCollector extends AbstractStartStoppable implement

private Map<String, Map<DcClusterShardPeer, ProxyChain>> dcProxyChainMap = Maps.newConcurrentMap();

/**
 * Wires the collector: a dedicated single-thread scheduler plus a
 * dynamic-delay task that periodically runs {@code fetchAllDcProxyChains},
 * with the period supplied by {@code ConsoleConfig#getProxyInfoCollectInterval}
 * (re-read each cycle, so config changes take effect without restart).
 */
@Autowired
public DefaultProxyChainCollector(ProxyChainAnalyzer proxyChainAnalyzer, ConsoleConfig consoleConfig) {
this.proxyChainAnalyzer = proxyChainAnalyzer;
this.consoleConfig = consoleConfig;
this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create(getClass().getSimpleName()));
this.collectTask = new DynamicDelayPeriodTask(getClass().getSimpleName(), this::fetchAllDcProxyChains,
consoleConfig::getProxyInfoCollectInterval, scheduled);
}

@Override
public void isleader() {
taskTrigger.set(true);
Expand Down Expand Up @@ -115,16 +120,20 @@ public Map<String, DcClusterShardPeer> getTunnelClusterShardMap() {

@Override
protected void doStart() throws Exception {
// NOTE(review): two start mechanisms appear here — a fixed-delay scheduled
// task AND collectTask.start(). This looks like a diff artifact of replacing
// the former with the latter; confirm only one scheduling path remains.
future = scheduled.scheduleWithFixedDelay(() -> {
if (!taskTrigger.get()) {
return;
}
logger.debug("proxy chain collector started");
fetchAllDcProxyChains();
}, getStartTime(), getPeriodic(), TimeUnit.MILLISECONDS);
collectTask.start();
}

@Override
protected void doStop() throws Exception {
// Stop the periodic collection first, then drop all cached chain state.
collectTask.stop();
clear();
}

protected void fetchAllDcProxyChains() {
if (!taskTrigger.get()) {
return;
}

ParallelCommandChain commandChain = new ParallelCommandChain(MoreExecutors.directExecutor(), false);
consoleConfig.getConsoleDomains().forEach((dc, domain)->{
logger.debug("begin to get proxy chain from dc {} {}", dc, domain);
Expand Down Expand Up @@ -172,9 +181,8 @@ void updateShardProxyChainMap() {
}

@VisibleForTesting
DefaultProxyChainCollector setDcProxyChainMap(Map<String, Map<DcClusterShardPeer, ProxyChain>> dcProxyChainMap) {
this.dcProxyChainMap = dcProxyChainMap;
return this;
protected void setTaskTrigger(boolean trigger) {
this.taskTrigger.set(trigger);
}

@VisibleForTesting
Expand All @@ -188,29 +196,12 @@ public Map<DcClusterShardPeer, ProxyChain> getShardProxyChainMap() {
return shardProxyChainMap;
}

/** Initial delay before the first collection run, in milliseconds. */
protected int getStartTime() {
return 2 * 1000;
}

/** Interval between collection runs, in milliseconds. */
protected int getPeriodic() {
return 1000;
}

/** Drops every cached view of the proxy topology (the clears are independent). */
protected void clear() {
    dcProxyChainMap.clear();
    tunnelClusterShardMap.clear();
    shardProxyChainMap.clear();
}

@Override
protected void doStop() throws Exception {
// Cancel the periodic task (interrupting a run in progress), release the
// reference, then drop all cached chain state.
if(future != null) {
future.cancel(true);
future = null;
}
clear();
}

@Override
public Map<DcClusterShardPeer, ProxyChain> getAllProxyChains() {
return shardProxyChainMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,31 @@ public int hashCode() {
return Objects.hash(tunnelId);
}

/**
 * Deep copy of this tunnel info: identifiers are copied directly and every
 * non-null nested result is cloned so the copy shares no mutable state.
 */
@Override
protected DefaultTunnelInfo clone() {
    DefaultTunnelInfo copy = new DefaultTunnelInfo();
    copy.tunnelId = tunnelId;
    copy.tunnelDcId = tunnelDcId;

    if (this.proxyModel != null) {
        copy.proxyModel = this.proxyModel.clone();
    }
    if (this.tunnelSocketStatsResult != null) {
        copy.tunnelSocketStatsResult = this.tunnelSocketStatsResult.clone();
    }
    if (this.tunnelStatsResult != null) {
        copy.tunnelStatsResult = this.tunnelStatsResult.clone();
    }
    if (this.tunnelTrafficResult != null) {
        copy.tunnelTrafficResult = this.tunnelTrafficResult.clone();
    }

    return copy;
}

@Override
public String toString() {
return "DefaultTunnelInfo{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ public XpipeMeta getDividedXpipeMeta(int partIndex) {
throw new UnsupportedOperationException();
}

/**
 * Not supported by this base implementation; subclasses that maintain the
 * divided-meta cache override this.
 */
@Override
public String getXmlFormatDividedXpipeMeta(int partIndex) {
throw new UnsupportedOperationException();
}

protected void refreshMeta(XpipeMeta xpipeMeta) {
Pair<XpipeMeta, XpipeMetaManager> meta = new Pair<>(xpipeMeta, new DefaultXpipeMetaManager(xpipeMeta));
AbstractMetaCache.this.meta = meta;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.ctrip.xpipe.api.monitor.Task;
import com.ctrip.xpipe.api.monitor.TransactionMonitor;
import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask;
import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache;
import com.ctrip.xpipe.redis.console.cluster.ConsoleLeaderAware;
import com.ctrip.xpipe.redis.console.config.ConsoleConfig;
import com.ctrip.xpipe.redis.console.exception.DataNotFoundException;
Expand Down Expand Up @@ -30,6 +31,7 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

/**
* @author wenchao.meng
Expand Down Expand Up @@ -64,6 +66,8 @@ public class DefaultMetaCache extends AbstractMetaCache implements MetaCache, Co

private List<Set<Long>> keeperContainerParts;

private List<TimeBoundCache<String>> xmlFormatXPipeMetaParts = null;

private ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1);

private ScheduledFuture<?> future;
Expand Down Expand Up @@ -144,8 +148,7 @@ public void go() throws Exception {
}

synchronized (this) {
refreshClusterParts();
refreshKeeperContainerParts();
refreshMetaParts();
XpipeMeta xpipeMeta = createXpipeMeta(dcMetas, redisCheckRuleMetas);
refreshMeta(xpipeMeta);
}
Expand All @@ -158,7 +161,7 @@ public Map getData() {
});
}

private void refreshClusterParts() {
private void refreshMetaParts() {
try {
int parts = Math.max(1, consoleConfig.getClusterDividedParts());
logger.debug("[refreshClusterParts] start parts {}", parts);
Expand All @@ -168,28 +171,24 @@ private void refreshClusterParts() {
logger.info("[refreshClusterParts] skip for parts miss, expect {}, actual {}", parts, newClusterParts.size());
return;
}

this.clusterParts = newClusterParts;
} catch (Throwable th) {
logger.warn("[refreshClusterParts] fail", th);
}
}

private void refreshKeeperContainerParts() {
try {
int parts = Math.max(1, consoleConfig.getClusterDividedParts());
logger.debug("[refreshKeeperContainerParts] start parts {}", parts);

List<Set<Long>> newKeeperContainerParts = keeperContainerService.divideKeeperContainers(parts);
if (newKeeperContainerParts.size() < parts) {
logger.info("[refreshKeeperContainerParts] skip for parts miss, expect {}, actual {}",
parts, newKeeperContainerParts.size());
return;
}

this.clusterParts = newClusterParts;
this.keeperContainerParts = newKeeperContainerParts;

List<TimeBoundCache<String>> localXPipeMetaParts = new ArrayList<>();
IntStream.range(0, consoleConfig.getClusterDividedParts()).forEach(i -> {
// using as lazy-load cache
localXPipeMetaParts.add(new TimeBoundCache<>(() -> Long.MAX_VALUE, () -> getDividedXpipeMeta(i).toString()));
});
this.xmlFormatXPipeMetaParts = localXPipeMetaParts;
} catch (Throwable th) {
logger.warn("[refreshKeeperContainerParts] fail", th);
logger.warn("[refreshClusterParts] fail", th);
}
}

Expand Down Expand Up @@ -235,6 +234,17 @@ public synchronized XpipeMeta getDividedXpipeMeta(int partIndex) {
return createDividedMeta(xpipeMeta, requestClusters, requestKeeperContainers);
}

/**
 * Returns the cached XML serialization of the divided xpipe meta for
 * {@code partIndex}. Parts are lazily rendered and cached (built during
 * meta refresh), so repeated requests avoid re-serializing the meta.
 *
 * @throws DataNotFoundException if the parts are not initialized yet or the
 *         index is out of range
 */
@Override
public String getXmlFormatDividedXpipeMeta(int partIndex) {
// Snapshot the reference once so the size check and the access agree even
// if a refresh swaps the list concurrently.
List<TimeBoundCache<String>> localParts = this.xmlFormatXPipeMetaParts;
if (null == localParts) throw new DataNotFoundException("data not ready");
// Guard both bounds: a negative index would otherwise surface as an
// IndexOutOfBoundsException instead of the API's DataNotFoundException.
if (partIndex < 0 || partIndex >= localParts.size()) {
throw new DataNotFoundException("no part " + partIndex);
}

return localParts.get(partIndex).getData();
}

@VisibleForTesting
ScheduledFuture<?> getFuture() {
return future;
Expand Down
Loading

0 comments on commit 112a10d

Please sign in to comment.