Skip to content

Commit

Permalink
[Test][Autoscaler] Run Autoscaler e2e tests on buildkite and share th…
Browse files Browse the repository at this point in the history
…e support functions

Signed-off-by: Rueian <[email protected]>
  • Loading branch information
rueian committed Jun 27, 2024
1 parent 4879ee1 commit 3aff0be
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 184 deletions.
4 changes: 2 additions & 2 deletions ray-operator/test/e2e/rayjob_cluster_selector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ func TestRayJobWithClusterSelector(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)

// RayCluster
rayClusterAC := rayv1ac.RayCluster("raycluster", namespace.Name).
WithSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))
WithSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs")))

rayCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), rayClusterAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down
10 changes: 5 additions & 5 deletions ray-operator/test/e2e/rayjob_lightweight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestRayJobLightWeightMode(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py", "stop.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand All @@ -49,8 +49,8 @@ env_vars:
"num-cpus": "4",
"resources": `'{"R1": 4}'`,
}).
WithTemplate(podTemplateSpecApplyConfiguration(headPodTemplateApplyConfiguration(),
mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs"))))))
WithTemplate(podTemplateSpecApplyConfiguration(HeadPodTemplateApplyConfiguration(),
MountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](jobs, "/home/ray/jobs"))))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -83,7 +83,7 @@ env_vars:
WithSubmissionMode(rayv1.HTTPMode).
WithEntrypoint("python /home/ray/jobs/fail.py").
WithShutdownAfterJobFinishes(false).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -114,7 +114,7 @@ env_vars:
WithSpec(rayv1ac.RayJobSpec().
WithSubmissionMode(rayv1.HTTPMode).
WithEntrypoint("python /home/ray/jobs/stop.py").
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/test/e2e/rayjob_suspend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestRayJobSuspend(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "long_running.py", "counter.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "long_running.py", "counter.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand All @@ -31,7 +31,7 @@ func TestRayJobSuspend(t *testing.T) {
// RayJob
rayJobAC := rayv1ac.RayJob("long-running", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/long_running.py").
WithShutdownAfterJobFinishes(true).
WithTTLSecondsAfterFinished(600).
Expand Down Expand Up @@ -91,7 +91,7 @@ env_vars:
`).
WithShutdownAfterJobFinishes(true).
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down
14 changes: 7 additions & 7 deletions ray-operator/test/e2e/rayjob_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRayJob(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Job scripts
jobsAC := newConfigMap(namespace.Name, "jobs", files(test, "counter.py", "fail.py", "stop.py", "long_running.py"))
jobsAC := NewConfigMap(namespace.Name, "jobs", Files(test, _files, "counter.py", "fail.py", "stop.py", "long_running.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)
Expand All @@ -30,7 +30,7 @@ func TestRayJob(t *testing.T) {
// RayJob
rayJobAC := rayv1ac.RayJob("counter", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/counter.py").
WithRuntimeEnvYAML(`
env_vars:
Expand Down Expand Up @@ -87,7 +87,7 @@ env_vars:
// RayJob
rayJobAC := rayv1ac.RayJob("fail", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/fail.py").
WithShutdownAfterJobFinishes(false).
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()))
Expand Down Expand Up @@ -132,7 +132,7 @@ env_vars:
// RayJob
rayJobAC := rayv1ac.RayJob("fail-k8s-job", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("The command will be overridden by the submitter Job").
WithShutdownAfterJobFinishes(true).
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()))
Expand Down Expand Up @@ -176,7 +176,7 @@ env_vars:
WithSpec(rayv1ac.RayJobSpec().
WithEntrypoint("python /home/ray/jobs/stop.py").
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -205,7 +205,7 @@ env_vars:
WithSpec(rayv1ac.RayJobSpec().
WithEntrypoint("python /home/ray/jobs/counter.py").
WithRuntimeEnvYAML(`invalid_yaml_string`).
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand All @@ -219,7 +219,7 @@ env_vars:
test.T().Run("RayJob has passed ActiveDeadlineSeconds", func(_ *testing.T) {
rayJobAC := rayv1ac.RayJob("long-running", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithRayClusterSpec(NewRayClusterSpec(MountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/long_running.py").
WithShutdownAfterJobFinishes(true).
WithTTLSecondsAfterFinished(600).
Expand Down
6 changes: 3 additions & 3 deletions ray-operator/test/e2e/rayservice_ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestRayService(t *testing.T) {
test.StreamKubeRayOperatorLogs()

// Scripts for creating and terminating detached actors to trigger autoscaling
scriptsAC := newConfigMap(namespace.Name, "scripts", files(test, "locustfile.py", "locust_runner.py"))
scriptsAC := NewConfigMap(namespace.Name, "scripts", Files(test, _files, "locustfile.py", "locust_runner.py"))
scripts, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), scriptsAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", scripts.Namespace, scripts.Name)
Expand All @@ -46,7 +46,7 @@ applications:
ray_actor_options:
num_cpus: 1
`).
WithRayClusterSpec(newRayClusterSpec()))
WithRayClusterSpec(NewRayClusterSpec()))

rayService, err := test.Client().Ray().RayV1().RayServices(namespace.Name).Apply(test.Ctx(), rayServiceAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
Expand All @@ -61,7 +61,7 @@ applications:
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}).
WithTemplate(apply(headPodTemplateApplyConfiguration(), mountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))))
WithTemplate(Apply(HeadPodTemplateApplyConfiguration(), MountConfigMap[corev1ac.PodTemplateSpecApplyConfiguration](scripts, "/home/ray/test_scripts")))))
locustCluster, err := test.Client().Ray().RayV1().RayClusters(namespace.Name).Apply(test.Ctx(), locustClusterAC, TestApplyOptions)
test.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created Locust RayCluster %s/%s successfully", locustCluster.Namespace, locustCluster.Name)
Expand Down
53 changes: 26 additions & 27 deletions ray-operator/test/e2e/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"embed"

"github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
corev1ac "k8s.io/client-go/applyconfigurations/core/v1"
Expand All @@ -16,23 +15,23 @@ import (
//go:embed *.py
var _files embed.FS

func ReadFile(t Test, fileName string) []byte {
func ReadFile(t Test, fs embed.FS, fileName string) []byte {
t.T().Helper()
file, err := _files.ReadFile(fileName)
file, err := fs.ReadFile(fileName)
t.Expect(err).NotTo(gomega.HaveOccurred())
return file
}

type option[T any] func(t *T) *T
type ApplyOption[T any] func(t *T) *T

func apply[T any](t *T, options ...option[T]) *T {
func Apply[T any](t *T, options ...ApplyOption[T]) *T {
for _, opt := range options {
t = opt(t)
}
return t
}

func options[T any](options ...option[T]) option[T] {
func Options[T any](options ...ApplyOption[T]) ApplyOption[T] {
return func(t *T) *T {
for _, opt := range options {
t = opt(t)
Expand All @@ -41,42 +40,42 @@ func options[T any](options ...option[T]) option[T] {
}
}

func newConfigMap(namespace, name string, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
func NewConfigMap(namespace, name string, options ...ApplyOption[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
cmAC := corev1ac.ConfigMap(name, namespace).
WithBinaryData(map[string][]byte{}).
WithImmutable(true)

return configMapWith(cmAC, options...)
return ConfigMapWith(cmAC, options...)
}

func configMapWith(configMapAC *corev1ac.ConfigMapApplyConfiguration, options ...option[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
return apply(configMapAC, options...)
func ConfigMapWith(configMapAC *corev1ac.ConfigMapApplyConfiguration, options ...ApplyOption[corev1ac.ConfigMapApplyConfiguration]) *corev1ac.ConfigMapApplyConfiguration {
return Apply(configMapAC, options...)
}

func file(t Test, fileName string) option[corev1ac.ConfigMapApplyConfiguration] {
func File(t Test, fs embed.FS, fileName string) ApplyOption[corev1ac.ConfigMapApplyConfiguration] {
return func(cmAC *corev1ac.ConfigMapApplyConfiguration) *corev1ac.ConfigMapApplyConfiguration {
cmAC.WithBinaryData(map[string][]byte{fileName: ReadFile(t, fileName)})
cmAC.WithBinaryData(map[string][]byte{fileName: ReadFile(t, fs, fileName)})
return cmAC
}
}

func files(t Test, fileNames ...string) option[corev1ac.ConfigMapApplyConfiguration] {
var files []option[corev1ac.ConfigMapApplyConfiguration]
func Files(t Test, fs embed.FS, fileNames ...string) ApplyOption[corev1ac.ConfigMapApplyConfiguration] {
var files []ApplyOption[corev1ac.ConfigMapApplyConfiguration]
for _, fileName := range fileNames {
files = append(files, file(t, fileName))
files = append(files, File(t, fs, fileName))
}
return options(files...)
return Options(files...)
}

func newRayClusterSpec(options ...option[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return rayClusterSpecWith(rayClusterSpec(), options...)
func NewRayClusterSpec(options ...ApplyOption[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return RayClusterSpecWith(rayClusterSpec(), options...)
}

func rayClusterSpecWith(spec *rayv1ac.RayClusterSpecApplyConfiguration, options ...option[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return apply(spec, options...)
func RayClusterSpecWith(spec *rayv1ac.RayClusterSpecApplyConfiguration, options ...ApplyOption[rayv1ac.RayClusterSpecApplyConfiguration]) *rayv1ac.RayClusterSpecApplyConfiguration {
return Apply(spec, options...)
}

func mountConfigMap[T rayv1ac.RayClusterSpecApplyConfiguration | corev1ac.PodTemplateSpecApplyConfiguration](configMap *corev1.ConfigMap, mountPath string) option[T] {
func MountConfigMap[T rayv1ac.RayClusterSpecApplyConfiguration | corev1ac.PodTemplateSpecApplyConfiguration](configMap *corev1.ConfigMap, mountPath string) ApplyOption[T] {
return func(t *T) *T {
switch obj := (interface{})(t).(type) {
case *rayv1ac.RayClusterSpecApplyConfiguration:
Expand Down Expand Up @@ -104,21 +103,21 @@ func rayClusterSpec() *rayv1ac.RayClusterSpecApplyConfiguration {
WithRayVersion(GetRayVersion()).
WithHeadGroupSpec(rayv1ac.HeadGroupSpec().
WithRayStartParams(map[string]string{"dashboard-host": "0.0.0.0"}).
WithTemplate(headPodTemplateApplyConfiguration())).
WithTemplate(HeadPodTemplateApplyConfiguration())).
WithWorkerGroupSpecs(rayv1ac.WorkerGroupSpec().
WithReplicas(1).
WithMinReplicas(1).
WithMaxReplicas(1).
WithGroupName("small-group").
WithRayStartParams(map[string]string{"num-cpus": "1"}).
WithTemplate(workerPodTemplateApplyConfiguration()))
WithTemplate(WorkerPodTemplateApplyConfiguration()))
}

func podTemplateSpecApplyConfiguration(template *corev1ac.PodTemplateSpecApplyConfiguration, options ...option[corev1ac.PodTemplateSpecApplyConfiguration]) *corev1ac.PodTemplateSpecApplyConfiguration {
return apply(template, options...)
func podTemplateSpecApplyConfiguration(template *corev1ac.PodTemplateSpecApplyConfiguration, options ...ApplyOption[corev1ac.PodTemplateSpecApplyConfiguration]) *corev1ac.PodTemplateSpecApplyConfiguration {
return Apply(template, options...)
}

func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
func HeadPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
return corev1ac.PodTemplateSpec().
WithSpec(corev1ac.PodSpec().
WithContainers(corev1ac.Container().
Expand All @@ -141,7 +140,7 @@ func headPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfigura
}))))
}

func workerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
func WorkerPodTemplateApplyConfiguration() *corev1ac.PodTemplateSpecApplyConfiguration {
return corev1ac.PodTemplateSpec().
WithSpec(corev1ac.PodSpec().
WithContainers(corev1ac.Container().
Expand Down
Loading

0 comments on commit 3aff0be

Please sign in to comment.