Clean up some global vars #2025

Status: Open. Wants to merge 1 commit into master.
38 changes: 29 additions & 9 deletions ray-operator/controllers/ray/raycluster_controller.go
@@ -44,10 +44,8 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/reconcile"
 )

-var (
+const (
 	DefaultRequeueDuration = 2 * time.Second
-	ForcedClusterUpgrade bool
-	EnableBatchScheduler bool

 	// Definition of a index field for pod name
 	podUIDIndexField = "metadata.uid"

@@ -648,7 +646,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 	if err := r.List(ctx, &headPods, client.InNamespace(instance.Namespace), filterLabels); err != nil {
 		return err
 	}
-	if EnableBatchScheduler {
+	if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
 		if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(instance); err == nil {
 			if err := scheduler.DoBatchSchedulingOnSubmission(ctx, instance); err != nil {
 				return err
@@ -705,7 +703,7 @@ func (r *RayClusterReconciler) reconcilePods(ctx context.Context, instance *rayv
 		}
 	}

-	if ForcedClusterUpgrade {
+	if cc := getClusterConfig(ctx); cc != nil && cc.ForcedClusterUpgrade {
 		if len(headPods.Items) == 1 {
 			// head node amount is exactly 1, but we need to check if it has been changed
 			res := utils.PodNotMatchingTemplate(headPods.Items[0], instance.Spec.HeadGroupSpec.Template)
@@ -1026,7 +1024,7 @@ func (r *RayClusterReconciler) createHeadPod(ctx context.Context, instance rayv1
 		Name:      pod.Name,
 		Namespace: pod.Namespace,
 	}
-	if EnableBatchScheduler {
+	if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
 		if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil {
 			scheduler.AddMetadataToPod(&instance, utils.RayNodeHeadGroupLabelValue, &pod)
 		} else {
@@ -1063,7 +1061,7 @@ func (r *RayClusterReconciler) createWorkerPod(ctx context.Context, instance ray
 		Name:      pod.Name,
 		Namespace: pod.Namespace,
 	}
-	if EnableBatchScheduler {
+	if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
 		if scheduler, err := r.BatchSchedulerMgr.GetSchedulerForCluster(&instance); err == nil {
 			scheduler.AddMetadataToPod(&instance, worker.GroupName, &pod)
 		} else {
@@ -1223,7 +1221,7 @@ func (r *RayClusterReconciler) buildRedisCleanupJob(ctx context.Context, instanc
 }

 // SetupWithManager builds the reconciler.
-func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
+func (r *RayClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, reconcileConcurrency int) error {
 	b := ctrl.NewControllerManagedBy(mgr).
 		For(&rayv1.RayCluster{}, builder.WithPredicates(predicate.Or(
 			predicate.GenerationChangedPredicate{},
@@ -1233,7 +1231,7 @@ func (r *RayClusterReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcu
 		Owns(&corev1.Pod{}).
 		Owns(&corev1.Service{})

-	if EnableBatchScheduler {
+	if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
 		b = batchscheduler.ConfigureReconciler(b)
 	}
@@ -1570,3 +1568,25 @@ func sumGPUs(resources map[corev1.ResourceName]resource.Quantity) resource.Quant

 	return totalGPUs
 }
+
+type clusterConfig struct{}
+
+// ClusterConfig holds info about the cluster configuration
+type ClusterConfig struct {
+	ForcedClusterUpgrade bool
+	EnableBatchScheduler bool
+}
+
+// WithClusterConfig stores configuration about the cluster in the context.
+func WithClusterConfig(ctx context.Context, cc ClusterConfig) context.Context {
+	return context.WithValue(ctx, clusterConfig{}, &cc)
+}
+
+// getClusterConfig extracts a configuration value, if present.
+func getClusterConfig(ctx context.Context) *ClusterConfig {
+	r := ctx.Value(clusterConfig{})
+	if r == nil {
+		return nil
+	}
+	return r.(*ClusterConfig)
+}

Review thread on the added type clusterConfig struct{}:

Contributor: How about clarifying the purpose of this struct by adding something like CtxKey?

Suggested change:
-type clusterConfig struct{}
+type clusterConfigCtxKey struct{}

Author: Yes, it should be renamed; I kind of missed it when refactoring.
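As background for the naming discussion above, here is a minimal, self-contained sketch (not code from this PR) of the unexported-context-key pattern with the reviewer's suggested CtxKey naming applied; the main function is illustrative only:

	package main

	import (
		"context"
		"fmt"
	)

	// An unexported empty struct is the conventional collision-proof context
	// key: no other package can construct a value of this type, and struct{}
	// occupies zero bytes. The name follows the reviewer's CtxKey suggestion.
	type clusterConfigCtxKey struct{}

	type ClusterConfig struct {
		ForcedClusterUpgrade bool
		EnableBatchScheduler bool
	}

	// WithClusterConfig stores the config in a derived context.
	func WithClusterConfig(ctx context.Context, cc ClusterConfig) context.Context {
		return context.WithValue(ctx, clusterConfigCtxKey{}, &cc)
	}

	// getClusterConfig retrieves the config; the comma-ok assertion returns
	// nil (rather than panicking) when the key was never set.
	func getClusterConfig(ctx context.Context) *ClusterConfig {
		if cc, ok := ctx.Value(clusterConfigCtxKey{}).(*ClusterConfig); ok {
			return cc
		}
		return nil
	}

	func main() {
		ctx := WithClusterConfig(context.Background(), ClusterConfig{EnableBatchScheduler: true})
		if cc := getClusterConfig(ctx); cc != nil && cc.EnableBatchScheduler {
			fmt.Println("batch scheduler enabled") // mirrors the nil-guard used throughout the diff
		}
	}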
2 changes: 1 addition & 1 deletion ray-operator/controllers/ray/suite_test.go
@@ -104,7 +104,7 @@ var _ = BeforeSuite(func(ctx SpecContext) {
 			},
 		},
 	}
-	err = NewReconciler(ctx, mgr, options).SetupWithManager(mgr, 1)
+	err = NewReconciler(ctx, mgr, options).SetupWithManager(ctx, mgr, 1)
 	Expect(err).NotTo(HaveOccurred(), "failed to setup RayCluster controller")

 	err = NewRayServiceReconciler(ctx, mgr, func() utils.RayDashboardClientInterface {
25 changes: 15 additions & 10 deletions ray-operator/main.go
@@ -64,6 +64,8 @@ func main() {
 	var logFileEncoder string
 	var logStdoutEncoder string
 	var configFile string
+	var EnableBatchScheduler bool
+	var ForcedClusterUpgrade bool

 	// TODO: remove flag-based config once Configuration API graduates to v1.
 	flag.StringVar(&metricsAddr, "metrics-addr", configapi.DefaultMetricsAddr, "The address the metric endpoint binds to.")

Review comment on the added variables:

Contributor: Should the EnableBatchScheduler and ForcedClusterUpgrade variables be removed from the raycluster_controller.go file?
@@ -78,15 +80,15 @@
 		"watch-namespace",
 		"",
 		"Specify a list of namespaces to watch for custom resources, separated by commas. If left empty, all namespaces will be watched.")
-	flag.BoolVar(&ray.ForcedClusterUpgrade, "forced-cluster-upgrade", false,
+	flag.BoolVar(&ForcedClusterUpgrade, "forced-cluster-upgrade", false,
 		"Forced cluster upgrade flag")
 	flag.StringVar(&logFile, "log-file-path", "",
 		"Synchronize logs to local file")
 	flag.StringVar(&logFileEncoder, "log-file-encoder", "json",
 		"Encoder to use for log file. Valid values are 'json' and 'console'. Defaults to 'json'")
 	flag.StringVar(&logStdoutEncoder, "log-stdout-encoder", "json",
 		"Encoder to use for logging stdout. Valid values are 'json' and 'console'. Defaults to 'json'")
-	flag.BoolVar(&ray.EnableBatchScheduler, "enable-batch-scheduler", false,
+	flag.BoolVar(&EnableBatchScheduler, "enable-batch-scheduler", false,
 		"Enable batch scheduler. Currently is volcano, which supports gang scheduler policy.")
 	flag.StringVar(&configFile, "config", "", "Path to structured config file. Flags are ignored if config file is set.")

@@ -105,21 +107,20 @@
 		config, err = decodeConfig(configData, scheme)
 		exitOnError(err, "failed to decode config file")

-		// TODO: remove globally-scoped variables
-		ray.ForcedClusterUpgrade = config.ForcedClusterUpgrade
-		ray.EnableBatchScheduler = config.EnableBatchScheduler
+		ForcedClusterUpgrade = config.ForcedClusterUpgrade
+		EnableBatchScheduler = config.EnableBatchScheduler
 	} else {
 		config.MetricsAddr = metricsAddr
 		config.ProbeAddr = probeAddr
 		config.EnableLeaderElection = &enableLeaderElection
 		config.LeaderElectionNamespace = leaderElectionNamespace
 		config.ReconcileConcurrency = reconcileConcurrency
 		config.WatchNamespace = watchNamespace
-		config.ForcedClusterUpgrade = ray.ForcedClusterUpgrade
+		config.ForcedClusterUpgrade = ForcedClusterUpgrade
 		config.LogFile = logFile
 		config.LogFileEncoder = logFileEncoder
 		config.LogStdoutEncoder = logStdoutEncoder
-		config.EnableBatchScheduler = ray.EnableBatchScheduler
+		config.EnableBatchScheduler = EnableBatchScheduler
 	}

 	stdoutEncoder, err := newLogEncoder(logStdoutEncoder)
@@ -150,10 +151,10 @@
 		ctrl.SetLogger(k8szap.New(k8szap.UseFlagOptions(&opts)))
 	}

-	if ray.ForcedClusterUpgrade {
+	if ForcedClusterUpgrade {
 		setupLog.Info("Feature flag forced-cluster-upgrade is enabled.")
 	}
-	if ray.EnableBatchScheduler {
+	if EnableBatchScheduler {
 		setupLog.Info("Feature flag enable-batch-scheduler is enabled.")
 	}

@@ -208,7 +209,11 @@ func main() {
 		WorkerSidecarContainers: config.WorkerSidecarContainers,
 	}
 	ctx := ctrl.SetupSignalHandler()
-	exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(mgr, config.ReconcileConcurrency),
+	ctx = ray.WithClusterConfig(ctx, ray.ClusterConfig{
+		ForcedClusterUpgrade: config.ForcedClusterUpgrade,
+		EnableBatchScheduler: config.EnableBatchScheduler,
+	})
+	exitOnError(ray.NewReconciler(ctx, mgr, rayClusterOptions).SetupWithManager(ctx, mgr, config.ReconcileConcurrency),
 		"unable to create controller", "controller", "RayCluster")
 	exitOnError(ray.NewRayServiceReconciler(ctx, mgr, utils.GetRayDashboardClient, utils.GetRayHttpProxyClient).SetupWithManager(mgr),
 		"unable to create controller", "controller", "RayService")

Review thread on lines +212 to +216:

Contributor: Passing state by value into context.Context is considered an anti-pattern by some people, given its unstructured nature.

What about passing the configapi.Configuration struct (or its fields) instead where it is needed, which is the more idiomatic approach operators take in that case? It would also avoid the need to declare another configuration struct like ray.ClusterConfig.

Author (@skonto, Mar 26, 2024):

> Passing state by value into context.Context is considered an anti-pattern by some people given its unstructured nature.

I am not sure to what extent that is so. I come from the Knative project, where we use contexts this way to propagate values a lot. Knative follows the K8s project practices, and I have seen this pattern used in that code base too, for example.

> What about passing the configapi.Configuration

OK, I can try passing the configuration, check whether it is available where we need it, and take it from there. In general, every option has pros and cons.

Contributor: I can see why this may be debated. In essence, some people consider that contexts should be used primarily for cancellation and propagating termination signals, and only as a last resort for storing values. An example of such a discussion can be found at https://www.reddit.com/r/golang/comments/10qf73m/context_api_and_best_practices_of_using_it/.

In the particular case of this PR, I still fail to see the benefit of relying on the context over passing the configapi.Configuration object as an argument.

Author (@skonto, Mar 29, 2024):

> I still fail to see the benefit of relying on context

The generic benefit is not having to change function signatures, which makes it easier to pass data down the call chain in the future, even for values that are not a good fit for the config object. Of course there are downsides, and this is probably a premature optimization here. In any case, I will adapt the PR to use an argument.
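To make the reviewer's alternative concrete, here is a minimal sketch (assumed names, not code from this PR or KubeRay) of passing the flags as an explicit constructor argument instead of through the context:

	package main

	import (
		"context"
		"fmt"
	)

	// Config mirrors the two feature flags; the type and field names are
	// illustrative, standing in for configapi.Configuration or a subset of it.
	type Config struct {
		ForcedClusterUpgrade bool
		EnableBatchScheduler bool
	}

	// Reconciler receives the config once, at construction time, so every
	// method reads a typed field instead of performing a context lookup
	// and type assertion.
	type Reconciler struct {
		config Config
	}

	func NewReconciler(cfg Config) *Reconciler {
		return &Reconciler{config: cfg}
	}

	func (r *Reconciler) reconcilePods(ctx context.Context) error {
		if r.config.EnableBatchScheduler {
			fmt.Println("batch scheduling branch taken")
		}
		return nil
	}

	func main() {
		r := NewReconciler(Config{EnableBatchScheduler: true})
		_ = r.reconcilePods(context.Background())
	}

The trade-off matches the thread: a struct field is type-safe and discoverable at compile time, while the context approach avoids touching function signatures when new values need to flow down the call chain.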