Skip to content

Commit

Permalink
chore: use redis client library (#840)
Browse files Browse the repository at this point in the history
* chore: use appropriate way to calulate slots

Signed-off-by: Shubham Gupta <[email protected]>

* chore: remove unncessary conversion

Signed-off-by: Shubham Gupta <[email protected]>

* chore: improve slaveid calculation

Signed-off-by: Shubham Gupta <[email protected]>

---------

Signed-off-by: Shubham Gupta <[email protected]>
  • Loading branch information
shubham-cmyk authored Mar 24, 2024
1 parent 2ee4324 commit ff91665
Showing 1 changed file with 12 additions and 36 deletions.
48 changes: 12 additions & 36 deletions k8sutils/cluster-scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,33 +86,22 @@ func getRedisClusterSlots(ctx context.Context, client kubernetes.Interface, logg

redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
redisClusterInfo, err := redisClient.ClusterNodes(ctx).Result()

redisSlots, err := redisClient.ClusterSlots(ctx).Result()
if err != nil {
logger.Error(err, "Failed to Get Cluster Info")
logger.Error(err, "Failed to Get Cluster Slots")
return ""
}

// Split the Redis cluster info into lines
lines := strings.Split(redisClusterInfo, "\n")
// Iterate through all lines
for _, line := range lines {
if strings.Contains(line, "master") && strings.Contains(line, "connected") { // Check if this line is a master node
parts := strings.Fields(line)
if parts[0] == nodeID { // Check if this is the node we're interested in
for _, conn := range parts[8:] {
slotRange := strings.Split(conn, "-")
if len(slotRange) < 2 {
totalSlots = totalSlots + 1
} else {
start, _ := strconv.Atoi(slotRange[0])
end, _ := strconv.Atoi(slotRange[1])
totalSlots = totalSlots + end - start + 1
}
}
for _, slot := range redisSlots {
for _, node := range slot.Nodes {
if node.ID == nodeID {
// Each slot range is a continuous block managed by the node
totalSlots += slot.End - slot.Start + 1
break
}
}
}

logger.V(1).Info("Total cluster slots to be transferred from", "node", nodeID, "is", totalSlots)
return strconv.Itoa(totalSlots)
}
Expand Down Expand Up @@ -270,25 +259,12 @@ func AddRedisNodeToCluster(ctx context.Context, client kubernetes.Interface, log
func getAttachedFollowerNodeIDs(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, masterNodeID string) []string {
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
redisClusterInfo, err := redisClient.ClusterNodes(ctx).Result()

slaveIDs, err := redisClient.ClusterSlaves(ctx, masterNodeID).Result()
if err != nil {
logger.Error(err, "Failed to Get Cluster Info")
logger.Error(err, "Failed to get attached follower node IDs", "masterNodeID", masterNodeID)
return nil
}

slaveIDs := []string{}
// Split the Redis cluster info into lines
lines := strings.Split(redisClusterInfo, "\n")

for _, line := range lines {
if strings.Contains(line, "slave") && strings.Contains(line, "connected") {
parts := strings.Fields(line)
if len(parts) >= 3 && parts[3] == masterNodeID {
slaveIDs = append(slaveIDs, parts[0])
}
}
}

logger.V(1).Info("Slaves Nodes attached to", "node", masterNodeID, "are", slaveIDs)
return slaveIDs
}
Expand Down

0 comments on commit ff91665

Please sign in to comment.