diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/cache/TimeBoundCache.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/cache/TimeBoundCache.java index f59c64502e..1689b528da 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/cache/TimeBoundCache.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/cache/TimeBoundCache.java @@ -29,6 +29,10 @@ public T getCurrentData() { return this.data; } + public T getData() { + return getData(false); + } + public T getData(boolean disableCache) { if (!disableCache && null != data && expiredAt > System.currentTimeMillis()) { return data; @@ -37,7 +41,10 @@ public T getData(boolean disableCache) { synchronized (this) { if (!disableCache && null != data && expiredAt > System.currentTimeMillis()) return data; this.data = dataSupplier.get(); - this.expiredAt = System.currentTimeMillis() + timeoutMillSupplier.getAsLong(); + long timeout = timeoutMillSupplier.getAsLong(); + this.expiredAt = System.currentTimeMillis() + timeout; + // expiredAt exceeds max long + if (this.expiredAt < timeout) this.expiredAt = Long.MAX_VALUE; return this.data; } } diff --git a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/TestMetaCache.java b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/TestMetaCache.java index fdca6dbd3e..66240033f8 100644 --- a/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/TestMetaCache.java +++ b/redis/redis-checker/src/main/java/com/ctrip/xpipe/redis/checker/impl/TestMetaCache.java @@ -36,6 +36,11 @@ public XpipeMeta getDividedXpipeMeta(int partIndex) { return xpipeMeta; } + @Override + public String getXmlFormatDividedXpipeMeta(int partIndex) { + return xpipeMeta.toString(); + } + @Override public boolean inBackupDc(HostPort hostPort) { return true; diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java index 180133fd65..87c94df20c 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/controller/api/checker/ConsoleCheckerController.java @@ -84,9 +84,8 @@ public class ConsoleCheckerController extends AbstractConsoleController { @GetMapping(ConsoleCheckerPath.PATH_GET_META) public String getDividedMeta(@PathVariable int index, @RequestParam(value="format", required = false) String format) { if (index < 0) throw new IllegalArgumentException("illegal index " + index); - XpipeMeta xpipeMeta = metaCache.getDividedXpipeMeta(index); - - return (format != null && format.equals("xml"))? xpipeMeta.toString() : coder.encode(xpipeMeta); + if (format != null && format.equals("xml")) return metaCache.getXmlFormatDividedXpipeMeta(index); + return coder.encode(metaCache.getDividedXpipeMeta(index)); } @GetMapping(ConsoleCheckerPath.PATH_GET_ALL_META) diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/ProxyModel.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/ProxyModel.java index dc75963bee..aaf3a8ab86 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/ProxyModel.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/model/ProxyModel.java @@ -106,6 +106,18 @@ public int hashCode() { return Objects.hash(uri, dcName, id, active); } + @Override + public ProxyModel clone() { + ProxyModel clone = new ProxyModel(); + clone.uri = this.uri; + clone.dcName = this.dcName; + clone.id = this.id; + clone.active = this.active; + clone.monitorActive = this.monitorActive; + if (null != this.hostPort) clone.hostPort = new HostPort(this.hostPort.getHost(), this.hostPort.getPort()); + return clone; + } + @Override public String toString() { return String.format("ProxyModel[uri: %s, active: %b, dc-name: %s, id: %d]", uri, active, dcName, id); diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChain.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChain.java index 8ef46041ed..39738c34bf 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChain.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChain.java @@ -4,6 +4,7 @@ import com.ctrip.xpipe.redis.checker.model.TunnelStatsInfo; import com.ctrip.xpipe.redis.console.proxy.ProxyChain; import com.ctrip.xpipe.redis.core.proxy.monitor.TunnelStatsResult; +import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -107,6 +108,25 @@ public ProxyTunnelInfo buildProxyTunnelInfo() { return proxyTunnelInfo; } + @Override + public DefaultProxyChain clone() { + DefaultProxyChain clone = new DefaultProxyChain(); + clone.peerDcId = this.peerDcId; + clone.backupDcId = this.backupDcId; + clone.clusterId = this.clusterId; + clone.shardId = this.shardId; + + if (null != this.tunnelInfos) { + List cloneTunnelInfos = Lists.newArrayList(); + for (DefaultTunnelInfo tunnelInfo: this.tunnelInfos) { + cloneTunnelInfos.add(tunnelInfo.clone()); + } + clone.tunnelInfos = cloneTunnelInfos; + } + + return clone; + } + @Override public String toString() { return "DefaultProxyChain{" + diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainAnalyzer.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainAnalyzer.java index 726aeae2a8..3ebb1fb5bf 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainAnalyzer.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainAnalyzer.java @@ -20,7 +20,6 @@ import com.ctrip.xpipe.utils.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.commons.lang3.SerializationUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -75,7 +74,7 @@ public class DefaultProxyChainAnalyzer extends AbstractStartStoppable implements @Override public Map getClusterShardChainMap() { - return chains.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> SerializationUtils.clone(e.getValue()))); + return chains.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> ((DefaultProxyChain)e.getValue()).clone())); } @Override diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java index f04151ddb4..9723bc8d3d 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollector.java @@ -13,6 +13,8 @@ import com.ctrip.xpipe.redis.console.reporter.DefaultHttpService; import com.ctrip.xpipe.spring.AbstractProfile; import com.ctrip.xpipe.utils.VisibleForTesting; +import com.ctrip.xpipe.utils.XpipeThreadFactory; +import com.ctrip.xpipe.utils.job.DynamicDelayPeriodTask; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; @@ -24,34 +26,28 @@ import org.springframework.stereotype.Component; import org.springframework.web.client.RestOperations; -import javax.annotation.Resource; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; import static java.lang.String.format; @Component @Profile(AbstractProfile.PROFILE_NAME_PRODUCTION) public class DefaultProxyChainCollector extends AbstractStartStoppable implements ProxyChainCollector { - @Resource(name = SCHEDULED_EXECUTOR) - private ScheduledExecutorService scheduled; - - @Autowired private ProxyChainAnalyzer proxyChainAnalyzer; - @Autowired private ConsoleConfig consoleConfig; private DefaultHttpService httpService = new DefaultHttpService(); - private ScheduledFuture future; + private DynamicDelayPeriodTask collectTask; + + private ScheduledExecutorService scheduled; private AtomicBoolean taskTrigger = new AtomicBoolean(false); @@ -63,6 +59,15 @@ public class DefaultProxyChainCollector extends AbstractStartStoppable implement private Map> dcProxyChainMap = Maps.newConcurrentMap(); + @Autowired + public DefaultProxyChainCollector(ProxyChainAnalyzer proxyChainAnalyzer, ConsoleConfig consoleConfig) { + this.proxyChainAnalyzer = proxyChainAnalyzer; + this.consoleConfig = consoleConfig; + this.scheduled = Executors.newScheduledThreadPool(1, XpipeThreadFactory.create(getClass().getSimpleName())); + this.collectTask = new DynamicDelayPeriodTask(getClass().getSimpleName(), this::fetchAllDcProxyChains, + consoleConfig::getProxyInfoCollectInterval, scheduled); + } + @Override public void isleader() { taskTrigger.set(true); @@ -115,16 +120,20 @@ public Map getTunnelClusterShardMap() { @Override protected void doStart() throws Exception { - future = scheduled.scheduleWithFixedDelay(() -> { - if (!taskTrigger.get()) { - return; - } - logger.debug("proxy chain collector started"); - fetchAllDcProxyChains(); - }, getStartTime(), getPeriodic(), TimeUnit.MILLISECONDS); + collectTask.start(); + } + + @Override + protected void doStop() throws Exception { + collectTask.stop(); + clear(); } protected void fetchAllDcProxyChains() { + if (!taskTrigger.get()) { + return; + } + ParallelCommandChain commandChain = new ParallelCommandChain(MoreExecutors.directExecutor(), false); consoleConfig.getConsoleDomains().forEach((dc, domain)->{ logger.debug("begin to get proxy chain from dc {} {}", dc, domain); @@ -172,9 +181,8 @@ void updateShardProxyChainMap() { } @VisibleForTesting - DefaultProxyChainCollector setDcProxyChainMap(Map> dcProxyChainMap) { - this.dcProxyChainMap = dcProxyChainMap; - return this; + protected void setTaskTrigger(boolean trigger) { + this.taskTrigger.set(trigger); } @VisibleForTesting @@ -188,29 +196,12 @@ public Map getShardProxyChainMap() { return shardProxyChainMap; } - protected int getStartTime() { - return 2 * 1000; - } - - protected int getPeriodic() { - return 1000; - } - protected void clear() { shardProxyChainMap.clear(); tunnelClusterShardMap.clear(); dcProxyChainMap.clear(); } - @Override - protected void doStop() throws Exception { - if(future != null) { - future.cancel(true); - future = null; - } - clear(); - } - @Override public Map getAllProxyChains() { return shardProxyChainMap; diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultTunnelInfo.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultTunnelInfo.java index 69555a1974..d19c7da023 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultTunnelInfo.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultTunnelInfo.java @@ -106,6 +106,31 @@ public int hashCode() { return Objects.hash(tunnelId); } + @Override + protected DefaultTunnelInfo clone() { + DefaultTunnelInfo clone = new DefaultTunnelInfo(); + clone.tunnelId = this.tunnelId; + clone.tunnelDcId = this.tunnelDcId; + + if (null != this.proxyModel) { + clone.proxyModel = this.proxyModel.clone(); + } + + if (null != this.tunnelSocketStatsResult) { + clone.tunnelSocketStatsResult = this.tunnelSocketStatsResult.clone(); + } + + if (null != this.tunnelStatsResult) { + clone.tunnelStatsResult = this.tunnelStatsResult.clone(); + } + + if (null != this.tunnelTrafficResult) { + clone.tunnelTrafficResult = this.tunnelTrafficResult.clone(); + } + + return clone; + } + @Override public String toString() { return "DefaultTunnelInfo{" + diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/AbstractMetaCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/AbstractMetaCache.java index 0528f19bb2..0d1dfc1eff 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/AbstractMetaCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/AbstractMetaCache.java @@ -68,6 +68,11 @@ public XpipeMeta getDividedXpipeMeta(int partIndex) { throw new UnsupportedOperationException(); } + @Override + public String getXmlFormatDividedXpipeMeta(int partIndex) { + throw new UnsupportedOperationException(); + } + protected void refreshMeta(XpipeMeta xpipeMeta) { Pair meta = new Pair<>(xpipeMeta, new DefaultXpipeMetaManager(xpipeMeta)); AbstractMetaCache.this.meta = meta; diff --git a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java index 8b4f53338d..f7ba92cf29 100644 --- a/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java +++ b/redis/redis-console/src/main/java/com/ctrip/xpipe/redis/console/resources/DefaultMetaCache.java @@ -3,6 +3,7 @@ import com.ctrip.xpipe.api.monitor.Task; import com.ctrip.xpipe.api.monitor.TransactionMonitor; import com.ctrip.xpipe.concurrent.AbstractExceptionLogTask; +import com.ctrip.xpipe.redis.checker.cache.TimeBoundCache; import com.ctrip.xpipe.redis.console.cluster.ConsoleLeaderAware; import com.ctrip.xpipe.redis.console.config.ConsoleConfig; import com.ctrip.xpipe.redis.console.exception.DataNotFoundException; @@ -30,6 +31,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; /** * @author wenchao.meng @@ -64,6 +66,8 @@ public class DefaultMetaCache extends AbstractMetaCache implements MetaCache, Co private List> keeperContainerParts; + private List> xmlFormatXPipeMetaParts = null; + private ScheduledExecutorService scheduled = Executors.newScheduledThreadPool(1); private ScheduledFuture future; @@ -144,8 +148,7 @@ public void go() throws Exception { } synchronized (this) { - refreshClusterParts(); - refreshKeeperContainerParts(); + refreshMetaParts(); XpipeMeta xpipeMeta = createXpipeMeta(dcMetas, redisCheckRuleMetas); refreshMeta(xpipeMeta); } @@ -158,7 +161,7 @@ public Map getData() { }); } - private void refreshClusterParts() { + private void refreshMetaParts() { try { int parts = Math.max(1, consoleConfig.getClusterDividedParts()); logger.debug("[refreshClusterParts] start parts {}", parts); @@ -168,18 +171,6 @@ private void refreshClusterParts() { logger.info("[refreshClusterParts] skip for parts miss, expect {}, actual {}", parts, newClusterParts.size()); return; } - - this.clusterParts = newClusterParts; - } catch (Throwable th) { - logger.warn("[refreshClusterParts] fail", th); - } - } - - private void refreshKeeperContainerParts() { - try { - int parts = Math.max(1, consoleConfig.getClusterDividedParts()); - logger.debug("[refreshKeeperContainerParts] start parts {}", parts); - List> newKeeperContainerParts = keeperContainerService.divideKeeperContainers(parts); if (newKeeperContainerParts.size() < parts) { logger.info("[refreshKeeperContainerParts] skip for parts miss, expect {}, actual {}", @@ -187,9 +178,17 @@ private void refreshKeeperContainerParts() { return; } + this.clusterParts = newClusterParts; this.keeperContainerParts = newKeeperContainerParts; + + List> localXPipeMetaParts = new ArrayList<>(); + IntStream.range(0, consoleConfig.getClusterDividedParts()).forEach(i -> { + // using as lazy-load cache + localXPipeMetaParts.add(new TimeBoundCache<>(() -> Long.MAX_VALUE, () -> getDividedXpipeMeta(i).toString())); + }); + this.xmlFormatXPipeMetaParts = localXPipeMetaParts; } catch (Throwable th) { - logger.warn("[refreshKeeperContainerParts] fail", th); + logger.warn("[refreshClusterParts] fail", th); } } @@ -235,6 +234,17 @@ public synchronized XpipeMeta getDividedXpipeMeta(int partIndex) { return createDividedMeta(xpipeMeta, requestClusters, requestKeeperContainers); } + @Override + public String getXmlFormatDividedXpipeMeta(int partIndex) { + List> localParts = this.xmlFormatXPipeMetaParts; + if (null == localParts) throw new DataNotFoundException("data not ready"); + if (partIndex >= localParts.size()) { + throw new DataNotFoundException("no part " + partIndex); + } + + return localParts.get(partIndex).getData(); + } + @VisibleForTesting ScheduledFuture getFuture() { return future; diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java index c4271ab999..535395fe25 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainCollectorTest.java @@ -1,39 +1,30 @@ package com.ctrip.xpipe.redis.console.proxy.impl; -import com.ctrip.xpipe.api.foundation.FoundationService; import com.ctrip.xpipe.redis.checker.model.DcClusterShardPeer; import com.ctrip.xpipe.redis.console.AbstractConsoleTest; import com.ctrip.xpipe.redis.console.config.ConsoleConfig; import com.ctrip.xpipe.redis.console.model.ProxyModel; import com.ctrip.xpipe.redis.console.proxy.ProxyChain; import com.ctrip.xpipe.redis.console.proxy.ProxyChainAnalyzer; -import com.ctrip.xpipe.redis.console.proxy.ProxyChainCollector; import com.ctrip.xpipe.redis.console.reporter.DefaultHttpService; import org.apache.http.HttpException; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.InjectMocks; import org.mockito.Mock; -import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpMethod; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.client.RestOperations; -import javax.annotation.Resource; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledExecutorService; import java.util.stream.IntStream; -import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.SCHEDULED_EXECUTOR; import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.when; @@ -54,7 +45,6 @@ public class DefaultProxyChainCollectorTest extends AbstractConsoleTest { @Mock private RestOperations restTemplate; - @InjectMocks DefaultProxyChainCollector collector; private Map consoles = new HashMap() {{ @@ -64,6 +54,7 @@ public class DefaultProxyChainCollectorTest extends AbstractConsoleTest { @Before public void setupDefaultProxyChainCollectorTest() { + this.collector = new DefaultProxyChainCollector(proxyChainAnalyzer, consoleConfig); when(httpService.getRestTemplate()).thenReturn(restTemplate); this.collector.setHttpService(httpService); when(consoleConfig.getConsoleDomains()).thenReturn(consoles); @@ -85,15 +76,14 @@ private Map generateProxyChains(int cnt) @Test public void remoteDcDown_noMemLeak() { ResponseEntity> resp = new ResponseEntity(generateProxyChains(100), HttpStatus.OK); + collector.setTaskTrigger(true); when(restTemplate.exchange(anyString(), any(), any(), any(ParameterizedTypeReference.class), anyString())) .thenReturn(resp); IntStream.range(0, 10).forEach(i -> collector.fetchAllDcProxyChains()); - Map> dcProxyChainMap = collector.getDcProxyChainMap(); - for (Map proxyChainMap: dcProxyChainMap.values()) { - for (ProxyChain proxyChain: proxyChainMap.values()) { - Assert.assertEquals(2, proxyChain.getTunnelInfos().size()); - } + List proxyChains = collector.getProxyChains(); + for (ProxyChain proxyChain: proxyChains) { + Assert.assertEquals(2, proxyChain.getTunnelInfos().size()); } doAnswer(inov -> { @@ -101,14 +91,12 @@ public void remoteDcDown_noMemLeak() { String host = consoles.values().iterator().next(); if (uri.startsWith(host)) throw new HttpException("mock"); else return resp; - }).when(restTemplate).exchange(anyString(), any(), any(), any(ParameterizedTypeReference.class)); + }).when(restTemplate).exchange(anyString(), any(), any(), any(ParameterizedTypeReference.class), anyString()); IntStream.range(0, 10).forEach(i -> collector.fetchAllDcProxyChains()); - dcProxyChainMap = collector.getDcProxyChainMap(); - for (Map proxyChainMap: dcProxyChainMap.values()) { - for (ProxyChain proxyChain: proxyChainMap.values()) { - Assert.assertEquals(2, proxyChain.getTunnelInfos().size()); - } + proxyChains = collector.getProxyChains(); + for (ProxyChain proxyChain: proxyChains) { + Assert.assertEquals(2, proxyChain.getTunnelInfos().size()); } } diff --git a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainTest.java b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainTest.java index 00dcbb969f..ab702eefc0 100644 --- a/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainTest.java +++ b/redis/redis-console/src/test/java/com/ctrip/xpipe/redis/console/proxy/impl/DefaultProxyChainTest.java @@ -1,5 +1,6 @@ package com.ctrip.xpipe.redis.console.proxy.impl; +import com.ctrip.xpipe.api.codec.Codec; import com.ctrip.xpipe.endpoint.HostPort; import com.ctrip.xpipe.redis.checker.model.ProxyTunnelInfo; import com.ctrip.xpipe.redis.console.AbstractConsoleTest; @@ -38,4 +39,13 @@ public void testTunnelMissStatsResult() { Assert.assertEquals("test-tunnel2", proxyTunnelInfo.getTunnelStatsInfos().get(0).getTunnelId()); } + @Test + public void testClone() { + DefaultProxyChain proxyChain = Codec.DEFAULT.decode("{\"backupDcId\":\"SIN-AWS\",\"clusterId\":\"GSPoiCache2\",\"shardId\":\"GSPoiCache2_v20220519_2\",\"peerDcId\":\"SHARB\",\"tunnelInfos\":[{\"tunnelDcId\":\"SHARB\",\"tunnelId\":\"10.129.68.65:48722-R(10.60.204.185:48459)-L(10.57.195.124:38060)->R(10.99.132.31:6454)-TCP://10.99.132.31:6454\",\"proxyModel\":{\"uri\":\"PROXYTCP://10.57.195.124:80\",\"dcName\":\"SHARB\",\"id\":325,\"active\":true,\"monitorActive\":true,\"hostPort\":{\"port\":80,\"host\":\"10.57.195.124\"}},\"tunnelStatsResult\":{\"tunnelId\":\"10.129.68.65:48722-R(10.60.204.185:48459)-L(10.57.195.124:38060)->R(10.99.132.31:6454)-TCP://10.99.132.31:6454\",\"tunnelState\":\"Tunnel-Established\",\"frontend\":{\"port\":443,\"host\":\"10.57.195.124\"},\"backend\":{\"port\":38060,\"host\":\"10.57.195.124\"},\"protocolRecvTime\":1703576919127,\"protocolSndTime\":1703576919127,\"closeTime\":-1,\"closeFrom\":\"Not Yet\"},\"tunnelSocketStatsResult\":{\"tunnelId\":\"10.129.68.65:48722-R(10.60.204.185:48459)-L(10.57.195.124:38060)->R(10.99.132.31:6454)-TCP://10.99.132.31:6454\",\"frontendSocketStats\":{\"result\":[\"ESTAB 0 105 ::ffff:10.57.195.124:443 ::ffff:10.60.204.185:48459 \",\" skmem:(r0,rb374400,t0,tb2624256,f1792,w2304,o0,bl0,d145) bbr wscale:10,10 rto:274 rtt:73.701/0.448 ato:40 mss:1460 rcvmss:1036 advmss:1460 cwnd:34 bytes_acked:5385278559 bytes_received:17669224 segs_out:29874924 segs_in:24061408 data_segs_out:29813172 data_segs_in:242106 bbr:(bw:6.8Mbps,mrtt:73.342,pacing_gain:1.25,cwnd_gain:2) send 5.4Mbps lastsnd:51 lastrcv:25 lastack:171 pacing_rate 7.0Mbps delivery_rate 6.8Mbps app_limited unacked:1 retrans:0/106570 reordering:4 rcv_rtt:380909 rcv_space:29358 minrtt:73.336\"],\"timestamp\":1703818983031},\"backendSocketStats\":{\"result\":[\"ESTAB 0 27 ::ffff:10.57.195.124:38060 ::ffff:10.99.132.31:6454 \",\" skmem:(r0,rb1890616,t0,tb130560,f1792,w2304,o0,bl0,d2463) bbr wscale:7,10 rto:217 rtt:16.18/11.05 ato:42 mss:1448 rcvmss:1448 advmss:1448 cwnd:10 bytes_acked:6534735 bytes_received:6051294796 segs_out:23121506 segs_in:32093607 data_segs_out:242026 data_segs_in:32031120 bbr:(bw:1158.4Mbps,mrtt:0.07,pacing_gain:2.88672,cwnd_gain:2.88672) send 7.2Mbps lastsnd:24 lastrcv:50 lastack:312 pacing_rate 3464.1Mbps delivery_rate 1158.4Mbps app_limited unacked:1 rcv_rtt:27.75 rcv_space:221184 minrtt:0.06\"],\"timestamp\":1703818983031}},\"tunnelTrafficResult\":{\"tunnelId\":\"10.129.68.65:48722-R(10.60.204.185:48459)-L(10.57.195.124:38060)->R(10.99.132.31:6454)-TCP://10.99.132.31:6454\",\"frontend\":{\"timestamp\":1703818983419,\"inputBytes\":10649220,\"outputBytes\":4509204672,\"inputRates\":55,\"outputRates\":7271},\"backend\":{\"timestamp\":1703818983419,\"inputBytes\":6051298860,\"outputBytes\":6534761,\"inputRates\":9732,\"outputRates\":33}}}]}", + DefaultProxyChain.class); + DefaultProxyChain cloneProxyChain = proxyChain.clone(); + Assert.assertEquals(Codec.DEFAULT.encode(proxyChain), Codec.DEFAULT.encode(cloneProxyChain)); + Assert.assertEquals(proxyChain.toString(), cloneProxyChain.toString()); + } + } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaCache.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaCache.java index 2941ad5c20..4632c538ff 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaCache.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/meta/MetaCache.java @@ -23,6 +23,8 @@ public interface MetaCache { XpipeMeta getDividedXpipeMeta(int partIndex); + String getXmlFormatDividedXpipeMeta(int partIndex); + boolean inBackupDc(HostPort hostPort); HostPort findMasterInSameShard(HostPort hostPort); diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SessionTrafficResult.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SessionTrafficResult.java index 5b60591e0f..20499b009f 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SessionTrafficResult.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SessionTrafficResult.java @@ -111,4 +111,15 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(timestamp, inputBytes, outputBytes); } + + @Override + protected SessionTrafficResult clone() { + SessionTrafficResult clone = new SessionTrafficResult(); + clone.timestamp = this.timestamp; + clone.inputBytes = this.inputBytes; + clone.outputBytes = this.outputBytes; + clone.inputRates = this.inputRates; + clone.outputRates = this.outputRates; + return clone; + } } diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SocketStatsResult.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SocketStatsResult.java index 0fbff90412..f45e308feb 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SocketStatsResult.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/SocketStatsResult.java @@ -8,6 +8,7 @@ import io.netty.buffer.ByteBuf; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -100,6 +101,14 @@ public int hashCode() { return Objects.hash(result, timestamp); } + @Override + public SocketStatsResult clone() { + SocketStatsResult clone = new SocketStatsResult(); + clone.timestamp = this.timestamp; + if (null != this.result) clone.result = new ArrayList<>(this.result); + return clone; + } + @Override public String toString() { return "SocketStatsResult{" + diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelSocketStatsResult.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelSocketStatsResult.java index 6d6323eebe..d397a2a6ab 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelSocketStatsResult.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelSocketStatsResult.java @@ -92,6 +92,15 @@ public int hashCode() { return Objects.hash(tunnelId, frontendSocketStats, backendSocketStats); } + @Override + public TunnelSocketStatsResult clone() { + TunnelSocketStatsResult clone = new TunnelSocketStatsResult(); + clone.tunnelId = this.tunnelId; + if (null != this.frontendSocketStats) clone.frontendSocketStats = this.frontendSocketStats.clone(); + if (null != this.backendSocketStats) clone.backendSocketStats = this.backendSocketStats.clone(); + return clone; + } + @Override public String toString() { return "TunnelSocketStatsResult{" + diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelStatsResult.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelStatsResult.java index de095030ee..73aff77827 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelStatsResult.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelStatsResult.java @@ -162,6 +162,20 @@ public int hashCode() { return Objects.hash(tunnelId, tunnelState, protocolRecvTime, protocolSndTime, closeTime, closeFrom); } + @Override + public TunnelStatsResult clone() { + TunnelStatsResult clone = new TunnelStatsResult(); + clone.tunnelId = this.tunnelId; + clone.tunnelState = this.tunnelState; + clone.protocolRecvTime = this.protocolRecvTime; + clone.protocolSndTime = this.protocolSndTime; + clone.closeFrom = this.closeFrom; + clone.closeTime = this.closeTime; + if (null != this.frontend) clone.frontend = new HostPort(this.frontend.getHost(), this.frontend.getPort()); + if (null != this.backend) clone.backend = new HostPort(this.backend.getHost(), this.backend.getPort()); + return clone; + } + @Override public String toString() { return "TunnelStatsResult{" + diff --git a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelTrafficResult.java b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelTrafficResult.java index beae9fc160..cdc70b0643 100644 --- a/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelTrafficResult.java +++ b/redis/redis-core/src/main/java/com/ctrip/xpipe/redis/core/proxy/monitor/TunnelTrafficResult.java @@ -89,4 +89,13 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(tunnelId, frontend, backend); } + + @Override + public TunnelTrafficResult clone() { + TunnelTrafficResult clone = new TunnelTrafficResult(); + clone.tunnelId = this.tunnelId; + if (null != this.frontend) clone.frontend = this.frontend.clone(); + if (null != this.backend) clone.backend = this.backend.clone(); + return clone; + } }