Skip to content

Commit

Permalink
aggregate pull instance health delay
Browse files Browse the repository at this point in the history
  • Loading branch information
yifuzhou committed Oct 16, 2024
1 parent d0bb509 commit fc12b78
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@ public List<RedisMeta> getAllInstancesOfShard(String cluster, String shard) {
return null;
}

@Override
public List<RedisMeta> getAllInstanceOfDc(String cluster, String dc) {
return null;
}

@Override
public boolean isAsymmetricCluster(String clusterName) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.ctrip.xpipe.tuple.Pair;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -23,6 +24,8 @@ public interface ConsoleService extends CheckerService {

Long getInstanceDelayStatus(String ip, int port);

Map<HostPort, Long> getInstancesDelayStatus(List<HostPort> hostPorts);

Long getShardDelay(long shardId);

Long getInstanceDelayStatusFromParallelService(String ip, int port);
Expand All @@ -45,4 +48,6 @@ public interface ConsoleService extends CheckerService {

class ShardCheckerHealthCheckModels extends ArrayList<ShardCheckerHealthCheckModel> {}

class InstancesDelayStatusModels extends HashMap<HostPort, Long> {};

}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public long getDelay(String ip, int port, String activeIdc) {
return service.getInstanceDelayStatus(ip, port);
}

public Map<HostPort, Long> getDelay(List<HostPort> hostPorts, String activeIdc) {
ConsoleService service = getServiceByDc(activeIdc);
return service.getInstancesDelayStatus(hostPorts);
}

public long getShardDelay(long shardId, String activeIdc) {
ConsoleService service = getServiceByDc(activeIdc);
return service.getShardDelay(shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ public class DefaultConsoleService extends AbstractService implements ConsoleSer

private final String innerDelayStatusUrl;

private final String instancesDelayStatusUrl;

private final String delayStatusUrl;

private final String allDelayStatusUrl;
Expand Down Expand Up @@ -83,6 +85,7 @@ public DefaultConsoleService(String address){
pingStatusUrl = String.format("%s/api/redis/ping/{ip}/{port}", this.address);
innerShardDelayStatusUrl = String.format("%s/api/shard/inner/delay/{shardId}", this.address);
innerDelayStatusUrl = String.format("%s/api/redis/inner/delay/{ip}/{port}", this.address);
instancesDelayStatusUrl = String.format("%s/api/redises/inner/delay", this.address);
delayStatusUrl = String.format("%s/api/redis/delay/{ip}/{port}", this.address);
allDelayStatusUrl = String.format("%s/api/redis/inner/delay/all", this.address);
unhealthyInstanceUrl = String.format("%s/api/redis/inner/unhealthy", this.address);
Expand Down Expand Up @@ -140,6 +143,11 @@ public Long getInstanceDelayStatus(String ip, int port) {
return restTemplate.getForObject(innerDelayStatusUrl, Long.class, ip, port);
}

@Override
public Map<HostPort, Long> getInstancesDelayStatus(List<HostPort> hostPorts) {
return restTemplate.postForObject(instancesDelayStatusUrl, hostPorts, InstancesDelayStatusModels.class);
}

@Override
public Long getShardDelay(long shardId) {
return restTemplate.getForObject(innerShardDelayStatusUrl, Long.class, shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,11 @@
import com.ctrip.xpipe.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* @author wenchao.meng
Expand Down Expand Up @@ -63,6 +62,11 @@ public Long getInnerReplDelayMillis(@PathVariable String redisIp, @PathVariable
return delayService.getLocalCachedDelay(new HostPort(redisIp, redisPort));
}

@RequestMapping(value = "/redises/inner/delay", method = RequestMethod.POST)
public Map<HostPort, Long> getInnerReplDelaysMillis(@RequestBody List<HostPort> hostPorts) {
return hostPorts.stream().collect(Collectors.toMap(instance -> instance, instance -> delayService.getLocalCachedDelay(instance)));
}

@RequestMapping(value = "/redis/inner/delay/all", method = RequestMethod.GET)
public Map<HostPort, Long> getAllInnerReplDelayMills() {
return delayService.getDcCachedDelay(FoundationService.DEFAULT.getDataCenter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ public Map<String, Long> getReplDelayMillis(@PathVariable String redisIp, @PathV
return ImmutableMap.of("delay", delayService.getDelay(new HostPort(redisIp, redisPort)));
}

@RequestMapping(value = "/redis/delay/{clusterType}/{redisIp}/{redisPort}", method = RequestMethod.GET)
public Map<String, Long> getReplDelayMillis(@PathVariable String clusterType, @PathVariable String redisIp, @PathVariable int redisPort) {
ClusterType type = ClusterType.lookup(clusterType);
return ImmutableMap.of("delay", delayService.getDelay(type, new HostPort(redisIp, redisPort)));
@RequestMapping(value = "/redises/delay/{dcId}/{clusterId}", method = RequestMethod.GET)
public Map<String, Map<HostPort, Long>> getAllReplDelayMillis(@PathVariable String dcId, @PathVariable String clusterId) {
return ImmutableMap.of("delay", delayService.getDelay(dcId, clusterId));
}


@RequestMapping(value = "/cross-master/delay/{dcId}/" + CLUSTER_ID_PATH_VARIABLE + "/" + SHARD_ID_PATH_VARIABLE, method = RequestMethod.GET)
public Map<String, Pair<HostPort, Long>> getCrossMasterReplHealthStatus(@PathVariable String dcId, @PathVariable String clusterId, @PathVariable String shardId) {
return crossMasterDelayService.getPeerMasterDelayFromSourceDc(dcId, clusterId, shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ public List<RedisMeta> getAllInstancesOfShard(String cluster, String shard) {
return instances;
}

@Override
public List<RedisMeta> getAllInstanceOfDc(String cluster, String dc) {
return meta.getValue().doGetRedises(dc, cluster);
}

@Override
public String getDc(HostPort hostPort) {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.ctrip.xpipe.redis.console.service;

import com.ctrip.xpipe.cluster.ClusterType;
import com.ctrip.xpipe.endpoint.HostPort;
import com.ctrip.xpipe.redis.checker.RedisDelayManager;
import com.ctrip.xpipe.redis.console.model.consoleportal.UnhealthyInfoModel;

import java.util.List;
import java.util.Map;

/**
Expand All @@ -22,7 +22,9 @@ public interface DelayService extends RedisDelayManager {

long getDelay(HostPort hostPort);

long getDelay(ClusterType clusterType, HostPort hostPort);
Map<HostPort, Long> getDelay(List<HostPort> hostPorts);

Map<HostPort, Long> getDelay(String dcId, String clusterId);

long getLocalCachedDelay(HostPort hostPort);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.ctrip.xpipe.spring.AbstractSpringConfigContext.GLOBAL_EXECUTOR;

Expand Down Expand Up @@ -101,18 +100,7 @@ public long getShardDelay(String clusterId, String shardId, Long shardDbId) {

@Override
public long getDelay(HostPort hostPort) {
Pair<String, String> clusterShard = metaCache.findClusterShard(hostPort);
if (null == clusterShard) return -1L;

ClusterType clusterType = metaCache.getClusterType(clusterShard.getKey());
ClusterType azGroupType = metaCache.getAzGroupType(hostPort);
String dcId = null;
if (clusterType.supportSingleActiveDC() && azGroupType != ClusterType.SINGLE_DC) {
dcId = metaCache.getActiveDc(hostPort);
} else if (clusterType.supportMultiActiveDC() || azGroupType == ClusterType.SINGLE_DC) {
dcId = metaCache.getDc(hostPort);
}

String dcId = getClusterActiveDc(hostPort);
if (StringUtil.isEmpty(dcId)) {
return -1L;
}
Expand All @@ -131,13 +119,88 @@ public long getDelay(HostPort hostPort) {
}

@Override
public long getDelay(ClusterType clusterType, HostPort hostPort) {
public Map<HostPort, Long> getDelay(List<HostPort> hostPorts) {
if (hostPorts == null || hostPorts.isEmpty()) {
return Collections.emptyMap();
}
Map<HostPort, Long> result = new HashMap<>();
Map<String, List<HostPort>> dcMap = new HashMap<>();
for (HostPort hostPort : hostPorts) {
String dcId = getClusterActiveDc(hostPort);
if (dcId == null) {
result.put(hostPort, -1L);
continue;
}
if (!dcMap.containsKey(dcId)) {
dcMap.put(dcId, new ArrayList<>());
}
dcMap.get(dcId).add(hostPort);
}

if (dcMap.isEmpty()) {
return getFailDelayResult(hostPorts);
}

for (Map.Entry<String, List<HostPort>> entry : dcMap.entrySet()) {
String dcId = entry.getKey();
List<HostPort> instances = entry.getValue();
if(!foundationService.getDataCenter().equals(dcId)) {
try {
result = consoleServiceManager.getDelay(instances, dcId);
} catch (Exception e) {
result.putAll(getFailDelayResult(instances));
}
} else {
for (HostPort hostPort : hostPorts) {
result.put(hostPort, TimeUnit.NANOSECONDS.toMillis(hostPort2Delay.getOrDefault(hostPort, DelayAction.SAMPLE_LOST_AND_NO_PONG)));
}
}
}
return result;
}

@Override
public Map<HostPort, Long> getDelay(String dcId, String clusterId) {
Map<HostPort, Long> result;
List<RedisMeta> allInstancesOfDc = metaCache.getAllInstanceOfDc(clusterId, dcId);
if (allInstancesOfDc == null || allInstancesOfDc.isEmpty()) {
return Collections.emptyMap();
}
List<HostPort> hostPorts = new ArrayList<>();
for (RedisMeta redisMeta : allInstancesOfDc) {
hostPorts.add(new HostPort(redisMeta.getIp(), redisMeta.getPort()));
}
ClusterType clusterType = metaCache.getClusterType(clusterId);
if (consoleConfig.getOwnClusterType().contains(clusterType.toString())) {
return getDelay(hostPort);
try {
result = getDelay(hostPorts);
} catch (Exception e) {
result = getFailDelayResult(hostPorts);
}
} else {
return consoleServiceManager.getDelayFromParallelService(hostPort.getHost(), hostPort.getPort());
result = getFailDelayResult(hostPorts);
}

return result;
}

private Map<HostPort, Long> getFailDelayResult(List<HostPort> instances) {
return instances.stream().collect(Collectors.toMap(instance -> instance, instance -> -1L));
}

private String getClusterActiveDc(HostPort hostPort) {
Pair<String, String> clusterShard = metaCache.findClusterShard(hostPort);
if (null == clusterShard) return null;

ClusterType clusterType = metaCache.getClusterType(clusterShard.getKey());
ClusterType azGroupType = metaCache.getAzGroupType(hostPort);
String dcId = null;
if (clusterType.supportSingleActiveDC() && azGroupType != ClusterType.SINGLE_DC) {
dcId = metaCache.getActiveDc(hostPort);
} else if (clusterType.supportMultiActiveDC() || azGroupType == ClusterType.SINGLE_DC) {
dcId = metaCache.getDc(hostPort);
}
return dcId;
}

@Override
Expand Down
Loading

0 comments on commit fc12b78

Please sign in to comment.