Skip to content

Commit

Permalink
clean up some global vars
Browse files Browse the repository at this point in the history
  • Loading branch information
skonto committed Mar 17, 2024
1 parent 99c5a3a commit a0572b6
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 20 deletions.
38 changes: 29 additions & 9 deletions ray-operator/controllers/ray/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
const (
DefaultRequeueDuration = 2 * time.Second
ForcedClusterUpgrade bool
EnableBatchScheduler bool

// Definition of a index field for pod name
podUIDIndexField = "metadata.uid"
Expand Down Expand Up @@ -648,7 +646,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
if err := r.List(ctx, &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
return err
}
if EnableBatchScheduler {
if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(instance); err == nil {
if err := scheduler.DoBatchSchedulingOnSubmission(ctx, instance); err != nil {
return err
Expand Down Expand Up @@ -705,7 +703,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
}
}

if ForcedClusterUpgrade {
if cc := getClusterConfig(ctx); cc != nil && cc.ForcedClusterUpgrade {
if len(headPods.Items) == 1 {
// head node amount is exactly 1, but we need to check if it has been changed
res := utils.PodNotMatchingTemplate(headPods.Items[0], instance.Spec.HeadGroupSpec.Template)
Expand Down Expand Up @@ -1026,7 +1024,7 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1
Name: pod.Name,
Namespace: pod.Namespace,
}
if EnableBatchScheduler {
if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil {
scheduler.AddMetadataToPod(&instance, utils.RayNodeHeadGroupLabelValue, &pod)
} else {
Expand Down Expand Up @@ -1063,7 +1061,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
Name: pod.Name,
Namespace: pod.Namespace,
}
if EnableBatchScheduler {
if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil {
scheduler.AddMetadataToPod(&instance, worker.GroupName, &pod)
} else {
Expand Down Expand Up @@ -1223,7 +1221,7 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(ctx context.Context, instanc
}

// SetupWithManager builds the reconciler.
func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
func (r *RayClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, reconcileConcurrency int) error {
b := ctrl.NewControllerManagedBy(mgr).
For(&rayv1.RayCluster{}, builder.WithPredicates(predicate.Or(
predicate.GenerationChangedPredicate{},
Expand All @@ -1233,7 +1231,7 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
Owns(&corev1.Pod{}).
Owns(&corev1.Service{})

if EnableBatchScheduler {
if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
b = batchscheduler.ConfigureReconciler(b)
}

Expand Down Expand Up @@ -1570,3 +1568,25 @@ func sumGPUs(resources map[corev1.ResourceName]resource.Quantity) resource.Quant

return totalGPUs
}

type clusterConfig struct{}

// ClusterConfig holds info about the cluster configuration
type ClusterConfig struct {
ForcedClusterUpgrade bool
EnableBatchScheduler bool
}

// WithClusterConfig stores configuration about the cluster in the context.
func WithClusterConfig(ctx context.Context, cc ClusterConfig) context.Context {
return context.WithValue(ctx, clusterConfig{}, &cc)
}

// getClusterConfig extracts a configuration value, if present.
func getClusterConfig(ctx context.Context) *ClusterConfig {
r := ctx.Value(clusterConfig{})
if r == nil {
return nil
}
return r.(*ClusterConfig)
}
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ var _ = BeforeSuite(func(ctx SpecContext) {
},
},
}
err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1)
err = NewReconciler(ctx, mgr, options).SetupWithManager(ctx, mgr, 1)
Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller")

err = NewRayServiceReconciler(ctx, mgr, func() utils.RayDashboardClientInterface {
Expand Down
25 changes: 15 additions & 10 deletions ray-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func main() {
var logFileEncoder string
var logStdoutEncoder string
var configFile string
var EnableBatchScheduler bool
var ForcedClusterUpgrade bool

// TODO: remove flag-based config once Configuration API graduates to v1.
flag.BoolVar(&version, "version", false, "Show the version information.")
Expand All @@ -83,15 +85,15 @@ func main() {
"watch-namespace",
"",
"Specify a list of namespaces to watch for custom resources, separated by commas. If left empty, all namespaces will be watched.")
flag.BoolVar(&ray.ForcedClusterUpgrade, "forced-cluster-upgrade", false,
flag.BoolVar(&ForcedClusterUpgrade, "forced-cluster-upgrade", false,
"Forced cluster upgrade flag")
flag.StringVar(&logFile, "log-file-path", "",
"Synchronize logs to local file")
flag.StringVar(&logFileEncoder, "log-file-encoder", "json",
"Encoder to use for log file. Valid values are 'json' and 'console'. Defaults to 'json'")
flag.StringVar(&logStdoutEncoder, "log-stdout-encoder", "json",
"Encoder to use for logging stdout. Valid values are 'json' and 'console'. Defaults to 'json'")
flag.BoolVar(&ray.EnableBatchScheduler, "enable-batch-scheduler", false,
flag.BoolVar(&EnableBatchScheduler, "enable-batch-scheduler", false,
"Enable batch scheduler. Currently is volcano, which supports gang scheduler policy.")
flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.")

Expand All @@ -116,21 +118,20 @@ func main() {
config, err = decodeConfig(configData, scheme)
exitOnError(err, "failed to decode config file")

// TODO: remove globally-scoped variables
ray.ForcedClusterUpgrade = config.ForcedClusterUpgrade
ray.EnableBatchScheduler = config.EnableBatchScheduler
ForcedClusterUpgrade = config.ForcedClusterUpgrade
EnableBatchScheduler = config.EnableBatchScheduler
} else {
config.MetricsAddr = metricsAddr
config.ProbeAddr = probeAddr
config.EnableLeaderElection = &enableLeaderElection
config.LeaderElectionNamespace = leaderElectionNamespace
config.ReconcileConcurrency = reconcileConcurrency
config.WatchNamespace = watchNamespace
config.ForcedClusterUpgrade = ray.ForcedClusterUpgrade
config.ForcedClusterUpgrade = ForcedClusterUpgrade
config.LogFile = logFile
config.LogFileEncoder = logFileEncoder
config.LogStdoutEncoder = logStdoutEncoder
config.EnableBatchScheduler = ray.EnableBatchScheduler
config.EnableBatchScheduler = EnableBatchScheduler
}

stdoutEncoder, err := newLogEncoder(logStdoutEncoder)
Expand Down Expand Up @@ -162,10 +163,10 @@ func main() {
}

setupLog.Info("the operator", "version:", os.Getenv("OPERATOR_VERSION"))
if ray.ForcedClusterUpgrade {
if ForcedClusterUpgrade {
setupLog.Info("Feature flag forced-cluster-upgrade is enabled.")
}
if ray.EnableBatchScheduler {
if EnableBatchScheduler {
setupLog.Info("Feature flag enable-batch-scheduler is enabled.")
}

Expand Down Expand Up @@ -220,7 +221,11 @@ func main() {
WorkerSidecarContainers: config.WorkerSidecarContainers,
}
ctx := ctrl.SetupSignalHandler()
exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency),
ctx = ray.WithClusterConfig(ctx, ray.ClusterConfig{
ForcedClusterUpgrade: config.ForcedClusterUpgrade,
EnableBatchScheduler: config.EnableBatchScheduler,
})
exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(ctx, mgr, config.ReconcileConcurrency),
"unable to create controller", "controller", "RayCluster")
exitOnError(ray.NewRayServiceReconciler(ctx, mgr, utils.GetRayDashboardClient, utils.GetRayHttpProxyClient).SetupWithManager(mgr),
"unable to create controller", "controller", "RayService")
Expand Down

0 comments on commit a0572b6

Please sign in to comment.