Skip to content

Commit

Permalink
[Refactor][RayCluster] Unify status update to single place
Browse files: browse the repository at this point in the history
Resolves: ray-project#2235
Signed-off-by: Chi-Sheng Liu <[email protected]>
  • Loading branch information
MortalHappiness committed Jul 16, 2024
1 parent 9831375 commit 4cd9bb1
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 93 deletions.
175 changes: 85 additions & 90 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// reconcileFunc is the common signature of a single reconcile step for a
// RayCluster: it receives the request context and the cluster instance and
// reports failure through its error result. rayClusterReconcile collects
// such steps in a slice and runs them in order, stopping at the first one
// that returns a non-nil error.
type reconcileFunc func(ctx context.Context, instance *rayv1.RayCluster) error

var (
DefaultRequeueDuration = 2 * time.Second
EnableBatchScheduler bool
Expand Down Expand Up @@ -194,6 +196,7 @@ func (r *RayClusterReconciler) deleteAllPods(ctx context.Context, filters common
}

func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request ctrl.Request, instance *rayv1.RayCluster) (ctrl.Result, error) {
var reconcileErr error
logger := ctrl.LoggerFrom(ctx)

// Please do NOT modify `originalRayClusterInstance` in the following code.
Expand Down Expand Up @@ -296,79 +299,66 @@ func (r *RayClusterReconciler) rayClusterReconcile(ctx context.Context, request
return ctrl.Result{}, nil
}

if err := r.reconcileAutoscalerServiceAccount(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

if err := r.reconcileAutoscalerRole(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileAutoscalerRoleBinding(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileIngress(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileHeadService(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
if err := r.reconcileHeadlessService(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
// Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
if enableServeServiceValue, exist := instance.Annotations[utils.EnableServeServiceKey]; exist && enableServeServiceValue == utils.EnableServeServiceTrue {
if err := r.reconcileServeService(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
reconcileFuncs := []reconcileFunc{
r.reconcileAutoscalerServiceAccount,
r.reconcileAutoscalerRole,
r.reconcileAutoscalerRoleBinding,
r.reconcileIngress,
r.reconcileHeadService,
r.reconcileHeadlessService,
// reconcileServeService
func(ctx context.Context, instance *rayv1.RayCluster) error {
// Only reconcile the K8s service for Ray Serve when the "ray.io/enable-serve-service" annotation is set to true.
if enableServeServiceValue, exist := instance.Annotations[utils.EnableServeServiceKey]; exist && enableServeServiceValue == utils.EnableServeServiceTrue {
return r.reconcileServeService(ctx, instance)
}
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}
return nil
},
// reconcilePods
func(ctx context.Context, instance *rayv1.RayCluster) error {
err := r.reconcilePods(ctx, instance)
if err != nil {
r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1.PodReconciliationError), err.Error())
}
return err
},
}
if err := r.reconcilePods(ctx, instance); err != nil {
if updateErr := r.updateClusterState(ctx, instance, rayv1.Failed); updateErr != nil {
logger.Error(updateErr, "RayCluster update state error", "cluster name", request.Name)
}
if updateErr := r.updateClusterReason(ctx, instance, err.Error()); updateErr != nil {
logger.Error(updateErr, "RayCluster update reason error", "cluster name", request.Name)

for _, fn := range reconcileFuncs {
if reconcileErr = fn(ctx, instance); reconcileErr != nil {
break
}
r.Recorder.Event(instance, corev1.EventTypeWarning, string(rayv1.PodReconciliationError), err.Error())
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

// Calculate the new status for the RayCluster. Note that the function will deep copy `instance` instead of mutating it.
newInstance, err := r.calculateStatus(ctx, instance)
if err != nil {
logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", err)
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
newInstance, calculateErr := r.calculateStatus(ctx, instance, reconcileErr)
if calculateErr != nil {
logger.Info("Got error when calculating new status", "cluster name", request.Name, "error", calculateErr)
}

// Check if need to update the status.
if r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
// Check if the status needs to be updated.
var updateErr error
if calculateErr == nil && r.inconsistentRayClusterStatus(ctx, originalRayClusterInstance.Status, newInstance.Status) {
logger.Info("rayClusterReconcile", "Update CR status", request.Name, "status", newInstance.Status)
if err := r.Status().Update(ctx, newInstance); err != nil {
logger.Info("Got error when updating status", "cluster name", request.Name, "error", err, "RayCluster", newInstance)
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
updateErr = r.Status().Update(ctx, newInstance)
if updateErr != nil {
logger.Info("Got error when updating status", "cluster name", request.Name, "error", updateErr, "RayCluster", newInstance)
}
}

// Return error based on order.
var err error
if reconcileErr != nil {
err = reconcileErr
} else if calculateErr != nil {
err = calculateErr
} else {
err = updateErr
}
if err != nil {
return ctrl.Result{RequeueAfter: DefaultRequeueDuration}, err
}

// Unconditionally requeue after the number of seconds specified in the
// environment variable RAYCLUSTER_DEFAULT_REQUEUE_SECONDS_ENV. If the
// environment variable is not set, requeue after the default value.
Expand Down Expand Up @@ -1168,45 +1158,50 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
Complete(r)
}

func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster) (*rayv1.RayCluster, error) {
func (r *RayClusterReconciler) calculateStatus(ctx context.Context, instance *rayv1.RayCluster, reconcileErr error) (*rayv1.RayCluster, error) {
// Deep copy the instance, so we don't mutate the original object.
newInstance := instance.DeepCopy()

// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation
if reconcileErr != nil {
newInstance.Status.State = rayv1.Failed
newInstance.Status.Reason = reconcileErr.Error()
} else {
// TODO (kevin85421): ObservedGeneration should be used to determine whether to update this CR or not.
newInstance.Status.ObservedGeneration = newInstance.ObjectMeta.Generation

runtimePods := corev1.PodList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: newInstance.Name}
if err := r.List(ctx, &runtimePods, client.InNamespace(newInstance.Namespace), filterLabels); err != nil {
return nil, err
}
runtimePods := corev1.PodList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: newInstance.Name}
if err := r.List(ctx, &runtimePods, client.InNamespace(newInstance.Namespace), filterLabels); err != nil {
return nil, err
}

newInstance.Status.ReadyWorkerReplicas = utils.CalculateReadyReplicas(runtimePods)
newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(ctx, newInstance)
newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
newInstance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(newInstance)
newInstance.Status.ReadyWorkerReplicas = utils.CalculateReadyReplicas(runtimePods)
newInstance.Status.AvailableWorkerReplicas = utils.CalculateAvailableReplicas(runtimePods)
newInstance.Status.DesiredWorkerReplicas = utils.CalculateDesiredReplicas(ctx, newInstance)
newInstance.Status.MinWorkerReplicas = utils.CalculateMinReplicas(newInstance)
newInstance.Status.MaxWorkerReplicas = utils.CalculateMaxReplicas(newInstance)

totalResources := utils.CalculateDesiredResources(newInstance)
newInstance.Status.DesiredCPU = totalResources[corev1.ResourceCPU]
newInstance.Status.DesiredMemory = totalResources[corev1.ResourceMemory]
newInstance.Status.DesiredGPU = sumGPUs(totalResources)
newInstance.Status.DesiredTPU = totalResources[corev1.ResourceName("google.com/tpu")]
totalResources := utils.CalculateDesiredResources(newInstance)
newInstance.Status.DesiredCPU = totalResources[corev1.ResourceCPU]
newInstance.Status.DesiredMemory = totalResources[corev1.ResourceMemory]
newInstance.Status.DesiredGPU = sumGPUs(totalResources)
newInstance.Status.DesiredTPU = totalResources[corev1.ResourceName("google.com/tpu")]

if utils.CheckAllPodsRunning(ctx, runtimePods) {
newInstance.Status.State = rayv1.Ready
}
if utils.CheckAllPodsRunning(ctx, runtimePods) {
newInstance.Status.State = rayv1.Ready
}

if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
newInstance.Status.State = rayv1.Suspended
}
if newInstance.Spec.Suspend != nil && *newInstance.Spec.Suspend && len(runtimePods.Items) == 0 {
newInstance.Status.State = rayv1.Suspended
}

if err := r.updateEndpoints(ctx, newInstance); err != nil {
return nil, err
}
if err := r.updateEndpoints(ctx, newInstance); err != nil {
return nil, err
}

if err := r.updateHeadInfo(ctx, newInstance); err != nil {
return nil, err
if err := r.updateHeadInfo(ctx, newInstance); err != nil {
return nil, err
}
}

timeNow := metav1.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1496,7 +1496,7 @@ func TestUpdateStatusObservedGeneration(t *testing.T) {
}

// Compare the values of `Generation` and `ObservedGeneration` to check if they match.
newInstance, err := testRayClusterReconciler.calculateStatus(ctx, testRayCluster)
newInstance, err := testRayClusterReconciler.calculateStatus(ctx, testRayCluster, nil)
assert.Nil(t, err)
err = fakeClient.Get(ctx, namespacedName, &cluster)
assert.Nil(t, err)
Expand Down Expand Up @@ -1676,7 +1676,7 @@ func TestCalculateStatus(t *testing.T) {
}

// Test head information
newInstance, err := r.calculateStatus(ctx, testRayCluster)
newInstance, err := r.calculateStatus(ctx, testRayCluster, nil)
assert.Nil(t, err)
assert.Equal(t, headNodeIP, newInstance.Status.Head.PodIP)
assert.Equal(t, headServiceIP, newInstance.Status.Head.ServiceIP)
Expand Down Expand Up @@ -1729,7 +1729,7 @@ func TestStateTransitionTimes_NoStateChange(t *testing.T) {
preUpdateTime := metav1.Now()
testRayCluster.Status.State = rayv1.Ready
testRayCluster.Status.StateTransitionTimes = map[rayv1.ClusterState]*metav1.Time{rayv1.Ready: &preUpdateTime}
newInstance, err := r.calculateStatus(ctx, testRayCluster)
newInstance, err := r.calculateStatus(ctx, testRayCluster, nil)
assert.Nil(t, err)
assert.Equal(t, preUpdateTime, *newInstance.Status.StateTransitionTimes[rayv1.Ready], "Cluster state transition timestamp should not be updated")
}
Expand Down

0 comments on commit 4cd9bb1

Please sign in to comment.