From a0572b660f06eaa16f1c86505c54b86faab2ed6d Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Mon, 18 Mar 2024 00:14:31 +0200 Subject: [PATCH] clean up some global vars --- .../controllers/ray/raycluster_controller.go | 38 ++++++++++++++----- ray-operator/controllers/ray/suite_test.go | 2 +- ray-operator/main.go | 25 +++++++----- 3 files changed, 45 insertions(+), 20 deletions(-) diff --git a/ray-operator/controllers/ray/raycluster_controller.go b/ray-operator/controllers/ray/raycluster_controller.go index ca57c12557d..bd0063ea21b 100644 --- a/ray-operator/controllers/ray/raycluster_controller.go +++ b/ray-operator/controllers/ray/raycluster_controller.go @@ -44,10 +44,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -var ( +const ( DefaultRequeueDuration = 2 * time.Second - ForcedClusterUpgrade bool - EnableBatchScheduler bool // Definition of a index field for pod name podUIDIndexField = "metadata.uid" @@ -648,7 +646,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv if err := r.List(ctx, &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil { return err } - if EnableBatchScheduler { + if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler { if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(instance); err == nil { if err := scheduler.DoBatchSchedulingOnSubmission(ctx, instance); err != nil { return err @@ -705,7 +703,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv } } - if ForcedClusterUpgrade { + if cc := getClusterConfig(ctx); cc != nil && cc.ForcedClusterUpgrade { if len(headPods.Items) == 1 { // head node amount is exactly 1, but we need to check if it has been changed res := utils.PodNotMatchingTemplate(headPods.Items[0], instance.Spec.HeadGroupSpec.Template) @@ -1026,7 +1024,7 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1 Name: pod.Name, Namespace: pod.Namespace, } - if EnableBatchScheduler { + if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler { if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil { scheduler.AddMetadataToPod(&instance, utils.RayNodeHeadGroupLabelValue, &pod) } else { @@ -1063,7 +1061,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray Name: pod.Name, Namespace: pod.Namespace, } - if EnableBatchScheduler { + if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler { if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil { scheduler.AddMetadataToPod(&instance, worker.GroupName, &pod) } else { @@ -1223,7 +1221,7 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(ctx context.Context, instanc } // SetupWithManager builds the reconciler. -func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error { +func (r *RayClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, reconcileConcurrency int) error { b := ctrl.NewControllerManagedBy(mgr). For(&rayv1.RayCluster{}, builder.WithPredicates(predicate.Or( predicate.GenerationChangedPredicate{}, @@ -1233,7 +1231,7 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu Owns(&corev1.Pod{}). Owns(&corev1.Service{}) - if EnableBatchScheduler { + if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler { b = batchscheduler.ConfigureReconciler(b) } @@ -1570,3 +1568,25 @@ func sumGPUs(resources map[corev1.ResourceName]resource.Quantity) resource.Quant return totalGPUs } + +type clusterConfig struct{} + +// ClusterConfig holds info about the cluster configuration +type ClusterConfig struct { + ForcedClusterUpgrade bool + EnableBatchScheduler bool +} + +// WithClusterConfig stores configuration about the cluster in the context. +func WithClusterConfig(ctx context.Context, cc ClusterConfig) context.Context { + return context.WithValue(ctx, clusterConfig{}, &cc) +} + +// getClusterConfig extracts a configuration value, if present. +func getClusterConfig(ctx context.Context) *ClusterConfig { + r := ctx.Value(clusterConfig{}) + if r == nil { + return nil + } + return r.(*ClusterConfig) +} diff --git a/ray-operator/controllers/ray/suite_test.go b/ray-operator/controllers/ray/suite_test.go index 05b4d385f7f..0708ea1e91a 100644 --- a/ray-operator/controllers/ray/suite_test.go +++ b/ray-operator/controllers/ray/suite_test.go @@ -104,7 +104,7 @@ var _ = BeforeSuite(func(ctx SpecContext) { }, }, } - err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1) + err = NewReconciler(ctx, mgr, options).SetupWithManager(ctx, mgr, 1) Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller") err = NewRayServiceReconciler(ctx, mgr, func() utils.RayDashboardClientInterface { diff --git a/ray-operator/main.go b/ray-operator/main.go index 3db588a9365..4045fe30d0a 100644 --- a/ray-operator/main.go +++ b/ray-operator/main.go @@ -68,6 +68,8 @@ func main() { var logFileEncoder string var logStdoutEncoder string var configFile string + var EnableBatchScheduler bool + var ForcedClusterUpgrade bool // TODO: remove flag-based config once Configuration API graduates to v1. flag.BoolVar(&version, "version", false, "Show the version information.") @@ -83,7 +85,7 @@ func main() { "watch-namespace", "", "Specify a list of namespaces to watch for custom resources, separated by commas. If left empty, all namespaces will be watched.") - flag.BoolVar(&ray.ForcedClusterUpgrade, "forced-cluster-upgrade", false, + flag.BoolVar(&ForcedClusterUpgrade, "forced-cluster-upgrade", false, "Forced cluster upgrade flag") flag.StringVar(&logFile, "log-file-path", "", "Synchronize logs to local file") @@ -91,7 +93,7 @@ func main() { "Encoder to use for log file. Valid values are 'json' and 'console'. Defaults to 'json'") flag.StringVar(&logStdoutEncoder, "log-stdout-encoder", "json", "Encoder to use for logging stdout. Valid values are 'json' and 'console'. Defaults to 'json'") - flag.BoolVar(&ray.EnableBatchScheduler, "enable-batch-scheduler", false, + flag.BoolVar(&EnableBatchScheduler, "enable-batch-scheduler", false, "Enable batch scheduler. Currently is volcano, which supports gang scheduler policy.") flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.") @@ -116,9 +118,8 @@ func main() { config, err = decodeConfig(configData, scheme) exitOnError(err, "failed to decode config file") - // TODO: remove globally-scoped variables - ray.ForcedClusterUpgrade = config.ForcedClusterUpgrade - ray.EnableBatchScheduler = config.EnableBatchScheduler + ForcedClusterUpgrade = config.ForcedClusterUpgrade + EnableBatchScheduler = config.EnableBatchScheduler } else { config.MetricsAddr = metricsAddr config.ProbeAddr = probeAddr @@ -126,11 +127,11 @@ func main() { config.LeaderElectionNamespace = leaderElectionNamespace config.ReconcileConcurrency = reconcileConcurrency config.WatchNamespace = watchNamespace - config.ForcedClusterUpgrade = ray.ForcedClusterUpgrade + config.ForcedClusterUpgrade = ForcedClusterUpgrade config.LogFile = logFile config.LogFileEncoder = logFileEncoder config.LogStdoutEncoder = logStdoutEncoder - config.EnableBatchScheduler = ray.EnableBatchScheduler + config.EnableBatchScheduler = EnableBatchScheduler } stdoutEncoder, err := newLogEncoder(logStdoutEncoder) @@ -162,10 +163,10 @@ func main() { } setupLog.Info("the operator", "version:", os.Getenv("OPERATOR_VERSION")) - if ray.ForcedClusterUpgrade { + if ForcedClusterUpgrade { setupLog.Info("Feature flag forced-cluster-upgrade is enabled.") } - if ray.EnableBatchScheduler { + if EnableBatchScheduler { setupLog.Info("Feature flag enable-batch-scheduler is enabled.") } @@ -220,7 +221,11 @@ func main() { WorkerSidecarContainers: config.WorkerSidecarContainers, } ctx := ctrl.SetupSignalHandler() - exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency), + ctx = ray.WithClusterConfig(ctx, ray.ClusterConfig{ + ForcedClusterUpgrade: config.ForcedClusterUpgrade, + EnableBatchScheduler: config.EnableBatchScheduler, + }) + exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(ctx, mgr, config.ReconcileConcurrency), "unable to create controller", "controller", "RayCluster") exitOnError(ray.NewRayServiceReconciler(ctx, mgr, utils.GetRayDashboardClient, utils.GetRayHttpProxyClient).SetupWithManager(mgr), "unable to create controller", "controller", "RayService")