Skip to content

Commit

Permalink
ping instead of checking pod readiness
Browse files Browse the repository at this point in the history
  • Loading branch information
com6056 committed Aug 2, 2024
1 parent e3655af commit d1e81b1
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 8 deletions.
4 changes: 2 additions & 2 deletions controllers/rediscluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

if r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") {
if r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name+"-leader", instance.Namespace) {
// Mark the cluster status as initializing if there are no follower nodes
if (instance.Status.ReadyLeaderReplicas == 0 && instance.Status.ReadyFollowerReplicas == 0) ||
instance.Status.ReadyFollowerReplicas != followerReplicas {
Expand All @@ -147,7 +147,7 @@ func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request
}
}

if !(r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-leader") && r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name+"-follower")) {
if !(r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name+"-leader", instance.Namespace) && r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name+"-follower", instance.Namespace)) {
return intctrlutil.Reconciled()
}

Expand Down
2 changes: 1 addition & 1 deletion controllers/redisreplication_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (r *RedisReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Req
if err != nil {
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
if !r.IsStatefulSetReady(ctx, instance.Namespace, instance.Name) {
if !r.IsStatefulSetReady(ctx, r.K8sClient, instance, instance.Name, instance.Namespace) {
return intctrlutil.Reconciled()
}

Expand Down
26 changes: 26 additions & 0 deletions k8sutils/cluster-scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
"github.com/go-logr/logr"
redis "github.com/redis/go-redis/v9"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -106,6 +107,31 @@ func getRedisClusterSlots(ctx context.Context, redisClient *redis.Client, logger
return strconv.Itoa(totalSlots)
}

// pingRedisNode will ping the redis node to check if it is up and running
func pingRedisNode(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr runtime.Object, pod RedisDetails) bool {
var redisClient *redis.Client

switch cr := cr.(type) {
case *redisv1beta2.RedisCluster:
redisClient = configureRedisClient(client, logger, cr, pod.PodName)
case *redisv1beta2.RedisReplication:
redisClient = configureRedisReplicationClient(client, logger, cr, pod.PodName)
default:
logger.Error(nil, "Unknown CR type")
return false
}

defer redisClient.Close()

pong, err := redisClient.Ping(ctx).Result()
if err != nil || pong != "PONG" {
logger.Error(err, "Failed to ping Redis server")
return false
}

return true
}

// getRedisNodeID would return nodeID of a redis node by passing pod
func getRedisNodeID(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, pod RedisDetails) string {
redisClient := configureRedisClient(client, logger, cr, pod.PodName)
Expand Down
20 changes: 15 additions & 5 deletions k8sutils/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/env"
"k8s.io/utils/ptr"
)

type StatefulSet interface {
IsStatefulSetReady(ctx context.Context, namespace, name string) bool
IsStatefulSetReady(ctx context.Context, client kubernetes.Interface, cr runtime.Object, name, namespace string) bool
}

type StatefulSetService struct {
Expand All @@ -41,7 +42,7 @@ func NewStatefulSetService(kubeClient kubernetes.Interface, log logr.Logger) *St
}
}

func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, namespace, name string) bool {
func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, client kubernetes.Interface, cr runtime.Object, namespace string, name string) bool {
var (
partition = 0
replicas = 1
Expand Down Expand Up @@ -74,10 +75,19 @@ func (s *StatefulSetService) IsStatefulSetReady(ctx context.Context, namespace,
logger.V(1).Info("StatefulSet is not ready", "Status.ObservedGeneration", sts.Status.ObservedGeneration, "ObjectMeta.Generation", sts.ObjectMeta.Generation)
return false
}
if int(sts.Status.ReadyReplicas) != replicas {
logger.V(1).Info("StatefulSet is not ready", "Status.ReadyReplicas", sts.Status.ReadyReplicas, "Replicas", replicas)
return false

for i := 0; i < replicas; i++ {
pod := RedisDetails{
PodName: fmt.Sprintf("%s-%d", name, i),
Namespace: namespace,
}

if !pingRedisNode(ctx, client, logger, cr, pod) {
logger.V(1).Info("StatefulSet is not ready", "PingRedisNode", pod.PodName)
return false
}
}

return true
}

Expand Down

0 comments on commit d1e81b1

Please sign in to comment.