Skip to content

Commit

Permalink
[Feat][Association] Add GetRayClusterHeadPod
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Jul 24, 2024
1 parent 2da0db3 commit a90e3b3
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 20 deletions.
14 changes: 14 additions & 0 deletions ray-operator/controllers/ray/common/association.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package common

import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -159,3 +162,14 @@ func RayJobRayClusterNamespacedName(rayJob *rayv1.RayJob) types.NamespacedName {
Namespace: rayJob.Namespace,
}
}

// GetRayClusterHeadPod fetches the head Pod named by the RayCluster's
// .Status.Head.PodName from the RayCluster's namespace using reader.
//
// It returns a nil Pod and an error when .Status.Head.PodName is empty or when
// reader.Get fails. The reader.Get error is wrapped with %w, so the underlying
// error remains reachable through errors.Is / errors.As.
func GetRayClusterHeadPod(ctx context.Context, reader client.Reader, instance *rayv1.RayCluster) (*corev1.Pod, error) {
	if instance.Status.Head.PodName == "" {
		return nil, fmt.Errorf("RayCluster %s in the namespace %s did not contain .Status.Head.PodName", instance.Name, instance.Namespace)
	}

	key := types.NamespacedName{Namespace: instance.Namespace, Name: instance.Status.Head.PodName}
	pod := &corev1.Pod{}
	if err := reader.Get(ctx, key, pod); err != nil {
		// Add which Pod/RayCluster was being looked up; the bare client error
		// alone does not identify the owning RayCluster.
		return nil, fmt.Errorf("getting head Pod %s of RayCluster %s: %w", key, instance.Name, err)
	}
	return pod, nil
}
38 changes: 38 additions & 0 deletions ray-operator/controllers/ray/common/association_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
clientFake "sigs.k8s.io/controller-runtime/pkg/client/fake"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
Expand Down Expand Up @@ -256,3 +258,39 @@ func TestRayJobRayClusterNamespacedName(t *testing.T) {
t.Errorf("Expected %v, got %v", expected, result)
}
}

// TestGetRayClusterHeadPod checks that GetRayClusterHeadPod returns the Pod
// named by .Status.Head.PodName, and that it returns an error and a nil Pod
// when .Status.Head.PodName is empty.
func TestGetRayClusterHeadPod(t *testing.T) {
	// Create a new scheme with the Ray CRDs and the core (Pod) types.
	// Registration errors are asserted instead of being silently discarded.
	newScheme := runtime.NewScheme()
	assert.NoError(t, rayv1.AddToScheme(newScheme))
	assert.NoError(t, corev1.AddToScheme(newScheme))

	// Mock data
	cluster := rayv1.RayCluster{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "test-cluster",
			Namespace: "default",
		},
		Status: rayv1.RayClusterStatus{
			Head: rayv1.HeadInfo{
				PodName: "head-pod",
			},
		},
	}

	headPod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "head-pod",
			Namespace: cluster.ObjectMeta.Namespace,
		},
	}

	// Initialize a fake client with newScheme and runtimeObjects.
	runtimeObjects := []runtime.Object{headPod}
	fakeClient := clientFake.NewClientBuilder().WithScheme(newScheme).WithRuntimeObjects(runtimeObjects...).Build()
	ctx := context.TODO()

	// Happy path: the Pod named in the status exists.
	ret, err := GetRayClusterHeadPod(ctx, fakeClient, &cluster)
	assert.NoError(t, err)
	// testify's Equal takes (expected, actual); keep that order so failure
	// messages label the two values correctly.
	assert.Equal(t, headPod, ret)

	// Error path: a RayCluster whose status has no head Pod name yet.
	clusterWithoutHead := rayv1.RayCluster{ObjectMeta: cluster.ObjectMeta}
	ret, err = GetRayClusterHeadPod(ctx, fakeClient, &clusterWithoutHead)
	assert.Error(t, err)
	assert.Nil(t, ret)
}
6 changes: 3 additions & 3 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -1233,13 +1233,13 @@ func (r *RayClusterReconciler) getHeadPodIPAndName(ctx context.Context, instance
logger := ctrl.LoggerFrom(ctx)

runtimePods := corev1.PodList{}
filterLabels := client.MatchingLabels{utils.RayClusterLabelKey: instance.Name, utils.RayNodeTypeLabelKey: string(rayv1.HeadNode)}
if err := r.List(ctx, &runtimePods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
filterLabels := common.RayClusterHeadPodsAssociationOptions(instance)
if err := r.List(ctx, &runtimePods, filterLabels.ToListOptions()...); err != nil {
return "", "", err
}
if len(runtimePods.Items) != 1 {
logger.Info(fmt.Sprintf("Found %d head pods. cluster name %s, filter labels %v", len(runtimePods.Items), instance.Name, filterLabels))
return "", "", nil
return "", "", fmt.Errorf("unable to find the head. cluster name %s, filter labels %v", instance.Name, filterLabels)
}
return runtimePods.Items[0].Status.PodIP, runtimePods.Items[0].Name, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1326,12 +1326,12 @@ func TestGetHeadPodIPAndName(t *testing.T) {
"no error if there's no head node": {
pods: []runtime.Object{},
expectedIP: "",
returnsError: false,
returnsError: true,
},
"no error if there's more than one head node": {
pods: append(testPods, extraHeadPod),
expectedIP: "",
returnsError: false,
returnsError: true,
},
"no error if head pod ip is not yet set": {
pods: testPodsNoHeadIP,
Expand Down
17 changes: 2 additions & 15 deletions ray-operator/controllers/ray/rayservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"strings"
"time"

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/yaml"

Expand Down Expand Up @@ -1111,7 +1110,7 @@ func (r *RayServiceReconciler) reconcileServe(ctx context.Context, rayServiceIns
}

func (r *RayServiceReconciler) labelHeadPodForServeStatus(ctx context.Context, rayClusterInstance *rayv1.RayCluster) error {
headPod, err := r.getHeadPod(ctx, rayClusterInstance)
headPod, err := common.GetRayClusterHeadPod(ctx, r, rayClusterInstance)
if err != nil {
return err
}
Expand Down Expand Up @@ -1211,7 +1210,7 @@ func compareRayClusterJsonHash(spec1 rayv1.RayClusterSpec, spec2 rayv1.RayCluste

// isHeadPodRunningAndReady checks if the head pod of the RayCluster is running and ready.
func (r *RayServiceReconciler) isHeadPodRunningAndReady(ctx context.Context, instance *rayv1.RayCluster) (bool, error) {
headPod, err := r.getHeadPod(ctx, instance)
headPod, err := common.GetRayClusterHeadPod(ctx, r, instance)
if err != nil {
return false, err
}
Expand All @@ -1221,15 +1220,3 @@ func (r *RayServiceReconciler) isHeadPodRunningAndReady(ctx context.Context, ins
// isServeAppUnhealthyOrDeployedFailed reports whether appStatus equals either
// rayv1.ApplicationStatusEnum.UNHEALTHY or rayv1.ApplicationStatusEnum.DEPLOY_FAILED.
func isServeAppUnhealthyOrDeployedFailed(appStatus string) bool {
	switch appStatus {
	case rayv1.ApplicationStatusEnum.UNHEALTHY, rayv1.ApplicationStatusEnum.DEPLOY_FAILED:
		return true
	default:
		return false
	}
}

// getHeadPod looks up the Pod named by instance.Status.Head.PodName in the
// namespace of the given RayCluster. It returns an error if that name is
// empty or if the Get call fails.
//
// TODO: Move this function to util.go and always use this function to retrieve the head Pod.
func (r *RayServiceReconciler) getHeadPod(ctx context.Context, instance *rayv1.RayCluster) (*corev1.Pod, error) {
	headPodName := instance.Status.Head.PodName
	if headPodName == "" {
		return nil, fmt.Errorf("RayCluster %s in the namespace %s did not contain .Status.Head.PodName", instance.Name, instance.Namespace)
	}

	key := types.NamespacedName{Namespace: instance.Namespace, Name: headPodName}
	var headPod corev1.Pod
	err := r.Get(ctx, key, &headPod)
	if err != nil {
		return nil, err
	}
	return &headPod, nil
}

0 comments on commit a90e3b3

Please sign in to comment.