Skip to content

Commit

Permalink
operator: Refactor config generation and properly sync desired config…
Browse files Browse the repository at this point in the history
… including rebalance

Signed-off-by: Aaron Wilson <[email protected]>
  • Loading branch information
aaronnw committed Sep 18, 2024
1 parent b9fe848 commit 35c0248
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 194 deletions.
29 changes: 15 additions & 14 deletions operator/api/v1beta1/aisconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,9 @@
package v1beta1

import (
"crypto/sha256"
"encoding/hex"

aisapc "github.com/NVIDIA/aistore/api/apc"
aiscmn "github.com/NVIDIA/aistore/cmn"
"github.com/NVIDIA/aistore/cmn/cos"
jsoniter "github.com/json-iterator/go"
)

// NOTE: `*ToUpdate` structures are duplicates of `*ToUpdate` structs from AIStore main repository.
Expand Down Expand Up @@ -215,21 +212,25 @@ type (
}
)

// UpdateRebalanceEnabled Sets the rebalance config to include the default value for `Rebalance.Enabled`
func (c *ConfigToUpdate) UpdateRebalanceEnabled(rebEnabled *bool) {
func (c *ConfigToUpdate) IsRebalanceEnabledSet() bool {
if c.Rebalance == nil {
c.Rebalance = &RebalanceConfToUpdate{}
return false
}
// Allows for other rebalance config settings to be set, but ensures enabled is always included
if c.Rebalance.Enabled == nil {
c.Rebalance.Enabled = rebEnabled
return c.Rebalance.Enabled != nil
}

// UpdateRebalanceEnabled sets the desired `Rebalance.Enabled` value on the
// config, lazily allocating the rebalance section when it is absent.
func (c *ConfigToUpdate) UpdateRebalanceEnabled(enabled *bool) {
	reb := c.Rebalance
	if reb == nil {
		reb = &RebalanceConfToUpdate{}
		c.Rebalance = reb
	}
	reb.Enabled = enabled
}

func (c *ConfigToUpdate) Hash() string {
data, _ := jsoniter.Marshal(c)
hash := sha256.Sum256(data)
return hex.EncodeToString(hash[:])
// EnableAuth turns authentication on in the desired config, lazily
// allocating the auth section when it is absent.
func (c *ConfigToUpdate) EnableAuth() {
	auth := c.Auth
	if auth == nil {
		auth = &AuthConfToUpdate{}
		c.Auth = auth
	}
	auth.Enabled = aisapc.Ptr(true)
}

func (c *ConfigToUpdate) Convert() (toUpdate *aiscmn.ConfigToSet, err error) {
Expand Down
35 changes: 11 additions & 24 deletions operator/api/v1beta1/aistore_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@ const (
ConditionCreated ClusterConditionType = "Created"
// ConditionReady indicates the cluster is fully operational and ready for use.
ConditionReady ClusterConditionType = "Ready"
// ConditionReadyRebalance indicates whether the cluster should allow rebalance as determined by spec or default config.
ConditionReadyRebalance ClusterConditionType = "ReadyRebalance"
)

// These are reasons for an AIStore's transition to a condition.
const (
ReasonUpgrading ClusterConditionReason = "Upgrading"
ReasonScaling ClusterConditionReason = "Scaling"
ReasonShutdown ClusterConditionReason = "Shutdown"
)

// Helper constants.
Expand Down Expand Up @@ -299,6 +303,8 @@ func (ais *AIStore) SetCondition(conditionType ClusterConditionType) {
msg = "Success creating AIS cluster"
case ConditionReady:
msg = "Cluster is ready"
case ConditionReadyRebalance:
msg = "Cluster is ready to rebalance"
}
ais.AddOrUpdateCondition(&metav1.Condition{
Type: string(conditionType),
Expand All @@ -308,15 +314,15 @@ func (ais *AIStore) SetCondition(conditionType ClusterConditionType) {
})
}

// UnsetConditionReady add/updates condition setting type `Ready` to `False`.
// SetConditionFalse updates the given condition's status to `False`
// - `reason` - tag why the condition is being set to `False`.
// - `message` - a human-readable message indicating details about state change.
func (ais *AIStore) UnsetConditionReady(reason ClusterConditionReason, message string) {
// - `msg` - a human-readable message indicating details about state change.
func (ais *AIStore) SetConditionFalse(conditionType ClusterConditionType, reason ClusterConditionReason, msg string) {
ais.AddOrUpdateCondition(&metav1.Condition{
Type: string(ConditionReady),
Type: string(conditionType),
Status: metav1.ConditionFalse,
Reason: string(reason),
Message: message,
Message: msg,
})
}

Expand Down Expand Up @@ -409,25 +415,6 @@ func (ais *AIStore) ShouldCleanupMetadata() bool {
return ais.Spec.CleanupMetadata != nil && *ais.Spec.CleanupMetadata
}

// UpdateDefaultRebalance Ensures the defined config from the spec includes the default value for rebalance enabled if
// one is not already set
func (ais *AIStore) UpdateDefaultRebalance(rebEnabled bool) {
if ais.Spec.ConfigToUpdate == nil {
ais.Spec.ConfigToUpdate = &ConfigToUpdate{}
}
ais.Spec.ConfigToUpdate.UpdateRebalanceEnabled(aisapc.Ptr(rebEnabled))
}

func (ais *AIStore) DisableRebalance() {
if ais.Spec.ConfigToUpdate == nil {
ais.Spec.ConfigToUpdate = &ConfigToUpdate{}
}
if ais.Spec.ConfigToUpdate.Rebalance == nil {
ais.Spec.ConfigToUpdate.Rebalance = &RebalanceConfToUpdate{}
}
ais.Spec.ConfigToUpdate.Rebalance.Enabled = aisapc.Ptr(false)
}

func (ais *AIStore) AllowTargetSharedNodes() bool {
allowSharedNodes := ais.Spec.TargetSpec.DisablePodAntiAffinity != nil && *ais.Spec.TargetSpec.DisablePodAntiAffinity
//nolint:all
Expand Down
2 changes: 1 addition & 1 deletion operator/pkg/controllers/cluster_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func (r *AIStoreReconciler) cleanup(ctx context.Context, ais *aisv1.AIStore) (up
if err != nil {
return
}
err = r.updateStatus(ctx, ais, aisv1.HostCleanup)
err = r.updateStatusWithState(ctx, ais, aisv1.HostCleanup)
if err != nil {
return
}
Expand Down
108 changes: 64 additions & 44 deletions operator/pkg/controllers/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,6 @@ func NewAISReconcilerFromMgr(mgr manager.Manager, logger logr.Logger, isExternal
func (r *AIStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := r.log.WithValues("namespace", req.Namespace, "name", req.Name)
ctx = logf.IntoContext(ctx, logger)
logger.Info("Reconciling AIStore")

ais, err := r.client.GetAIStoreCR(ctx, req.NamespacedName)
if err != nil {
if k8serrors.IsNotFound(err) {
Expand All @@ -115,6 +113,7 @@ func (r *AIStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
logger.Error(err, "Unable to fetch AIStore")
return reconcile.Result{}, err
}
logger.Info("Reconciling AIStore", "state", ais.Status.State)

if ais.HasState("") {
if err := r.initializeCR(ctx, ais); err != nil {
Expand All @@ -123,19 +122,26 @@ func (r *AIStoreReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct
}

if ais.ShouldDecommission() {
err = r.updateStatus(ctx, ais, aisv1.ClusterDecommissioning)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterDecommissioning)
if err != nil {
return reconcile.Result{}, err
}
r.recorder.Event(ais, corev1.EventTypeNormal, EventReasonDeleted, "Decommissioning...")
}

if ais.ShouldStartShutdown() {
// TODO: AIS should handle this
logger.Info("Disabling rebalance before shutting down cluster")
err = r.disableRebalance(ctx, ais, aisv1.ReasonShutdown, "Disabling rebalance before shutdown")
if err != nil {
logger.Error(err, "Failed to disable rebalance before shutdown")
return reconcile.Result{}, err
}
if err = r.attemptGracefulShutdown(ctx, ais); err != nil {
logger.Error(err, "Graceful shutdown failed")
return reconcile.Result{}, err
}
err = r.updateStatus(ctx, ais, aisv1.ClusterShuttingDown)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterShuttingDown)
if err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -184,7 +190,7 @@ func (r *AIStoreReconciler) initializeCR(ctx context.Context, ais *aisv1.AIStore

logger.Info("Updating state and setting condition", "state", aisv1.ConditionInitialized)
ais.SetCondition(aisv1.ConditionInitialized)
err = r.updateStatus(ctx, ais, aisv1.ClusterInitialized)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterInitialized)
if err != nil {
logger.Error(err, "Failed to update state", "state", aisv1.ConditionInitialized)
return err
Expand Down Expand Up @@ -218,7 +224,7 @@ func (r *AIStoreReconciler) shutdownCluster(ctx context.Context, ais *aisv1.AISt
if _, err = r.client.UpdateStatefulSetReplicas(ctx, target.StatefulSetNSName(ais), 0); err != nil {
return reconcile.Result{}, err
}
err = r.updateStatus(ctx, ais, aisv1.ClusterShutdown)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterShutdown)
if err != nil {
logger.Error(err, "Failed to update state", "state", aisv1.ClusterShutdown)
return reconcile.Result{}, err
Expand All @@ -237,7 +243,7 @@ func (r *AIStoreReconciler) decommissionCluster(ctx context.Context, ais *aisv1.
}
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}
err := r.updateStatus(ctx, ais, aisv1.ClusterCleanup)
err := r.updateStatusWithState(ctx, ais, aisv1.ClusterCleanup)
if err != nil {
logger.Error(err, "Failed to update state", "state", aisv1.ClusterCleanup)
return reconcile.Result{}, err
Expand All @@ -263,7 +269,7 @@ func (r *AIStoreReconciler) cleanupClusterRes(ctx context.Context, ais *aisv1.AI
// It is better to delay the requeue little bit since cleanup can take some time.
return reconcile.Result{RequeueAfter: 10 * time.Second}, nil
}
err = r.updateStatus(ctx, ais, aisv1.HostCleanup)
err = r.updateStatusWithState(ctx, ais, aisv1.HostCleanup)
return reconcile.Result{}, err
}

Expand All @@ -283,7 +289,7 @@ func (r *AIStoreReconciler) cleanupHost(ctx context.Context, ais *aisv1.AIStore)
return reconcile.Result{Requeue: true}, nil
}
// If all are gone, move to finalized stage
err = r.updateStatus(ctx, ais, aisv1.ClusterFinalized)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterFinalized)
if err != nil {
return reconcile.Result{}, err
}
Expand Down Expand Up @@ -378,13 +384,6 @@ func (r *AIStoreReconciler) attemptGracefulShutdown(ctx context.Context, ais *ai
if err != nil {
return err
}
// TODO AIS should handle this
logger.Info("Disabling rebalance before shutting down cluster")
err = r.disableRebalance(ctx, ais)
if err != nil {
logger.Error(err, "Failed to disable rebalance before shutdown")
return err
}
logger.Info("Attempting graceful shutdown of cluster")
err = aisapi.ShutdownCluster(*params)
if err != nil {
Expand Down Expand Up @@ -461,12 +460,12 @@ func (r *AIStoreReconciler) ensurePrereqs(ctx context.Context, ais *aisv1.AIStor
// To ensure correct behavior of cluster, we requeue the reconciler till we have all the external IPs.
if !proxyReady {
if !ais.HasState(aisv1.ClusterInitializingLBService) && !ais.HasState(aisv1.ClusterPendingLBService) {
err = r.updateStatus(ctx, ais, aisv1.ClusterInitializingLBService)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterInitializingLBService)
if err == nil {
r.recorder.Event(ais, corev1.EventTypeNormal, EventReasonInitialized, "Successfully initialized LoadBalancer service")
}
} else {
err = r.updateStatus(ctx, ais, aisv1.ClusterPendingLBService)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterPendingLBService)
if err == nil {
r.recorder.Eventf(
ais, corev1.EventTypeNormal, EventReasonWaiting,
Expand Down Expand Up @@ -505,7 +504,7 @@ func (r *AIStoreReconciler) bootstrapNew(ctx context.Context, ais *aisv1.AIStore
}

ais.SetCondition(aisv1.ConditionCreated)
err = r.updateStatus(ctx, ais, aisv1.ClusterCreated)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterCreated)
if err != nil {
return
}
Expand Down Expand Up @@ -540,6 +539,13 @@ func (r *AIStoreReconciler) handleCREvents(ctx context.Context, ais *aisv1.AISto
if err != nil {
return
}
// Enables the rebalance condition (still respects the spec desired rebalance.Enabled property)
err = r.enableRebalanceCondition(ctx, ais)
if err != nil {
logf.FromContext(ctx).Error(err, "Failed to enable rebalance condition")
return
}

if err = r.handleConfigState(ctx, ais); err != nil {
goto requeue
}
Expand All @@ -549,40 +555,39 @@ func (r *AIStoreReconciler) handleCREvents(ctx context.Context, ais *aisv1.AISto
requeue:
// We requeue till the AIStore cluster becomes ready.
if ais.IsConditionTrue(aisv1.ConditionReady) {
ais.UnsetConditionReady(aisv1.ReasonUpgrading, "Waiting for cluster to upgrade")
err = r.updateStatus(ctx, ais, aisv1.ClusterUpgrading)
ais.SetConditionFalse(aisv1.ConditionReady, aisv1.ReasonUpgrading, "Waiting for cluster to upgrade")
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterUpgrading)
}
return
}

// handleConfigState properly reconciles any changes in `.spec.configToUpdate` field.
// handleConfigState properly reconciles the AIS cluster config with the `.spec.configToUpdate` field and any other custom config changes
//
// The ConfigMap that also contains the value of this field is updated earlier, but
// this synchronizes any changes to the active loaded config in the cluster.
func (r *AIStoreReconciler) handleConfigState(ctx context.Context, ais *aisv1.AIStore) error {
// Special case for rebalance since we may disable it -- always include `Rebalance.Enabled` in the sync
ais.UpdateDefaultRebalance(cmn.DefaultAISConf(ctx, ais).GetRebalanceEnabled())
return r.syncConfig(ctx, ais)
}

// syncConfig synchronizes the active cluster config with the desired config in `ais.Spec.ConfigToUpdate`
func (r *AIStoreReconciler) syncConfig(ctx context.Context, ais *aisv1.AIStore) error {
logger := logf.FromContext(ctx)
currentHash := ais.Spec.ConfigToUpdate.Hash()
// Get the config provided in spec plus any additional ones we want to set
desiredConf, err := cmn.GenerateConfigToSet(ctx, ais)
if err != nil {
return err
}
currentHash, err := cmn.HashConfigToSet(desiredConf)
if err != nil {
logger.Error(err, "Error generating hash for desired config")
return err
}
if ais.Annotations[configHashAnnotation] == currentHash {
logger.Info("Global config hash matches last applied config")
return nil
}
// Update cluster config based on what we have in the CRD spec.
baseParams, err := r.getAPIParams(ctx, ais)
if err != nil {
return err
}
configToSet, err := ais.Spec.ConfigToUpdate.Convert()
if err != nil {
return err
}
logger.Info("Updating cluster config to match spec via API")
err = aisapi.SetClusterConfigUsingMsg(*baseParams, configToSet, false /*transient*/)
err = aisapi.SetClusterConfigUsingMsg(*baseParams, desiredConf, false /*transient*/)
if err != nil {
logger.Error(err, "Failed to update cluster config")
return err
Expand Down Expand Up @@ -635,22 +640,37 @@ func (r *AIStoreReconciler) createOrUpdateRBACResources(ctx context.Context, ais
return
}

func (r *AIStoreReconciler) disableRebalance(ctx context.Context, ais *aisv1.AIStore) error {
// Disable in spec and synchronize with cluster
ais.DisableRebalance()
return r.syncConfig(ctx, ais)
// disableRebalance marks the cluster's `ReadyRebalance` condition as `False`
// and immediately syncs the resulting config to the live cluster rather than
// waiting for the next regular config sync.
//   - `reason` - tag why rebalance is being disabled (e.g. shutdown, scaling).
//   - `msg` - human-readable detail recorded on the condition.
func (r *AIStoreReconciler) disableRebalance(ctx context.Context, ais *aisv1.AIStore, reason aisv1.ClusterConditionReason, msg string) error {
	logf.FromContext(ctx).Info("Disabling rebalance condition")
	ais.SetConditionFalse(aisv1.ConditionReadyRebalance, reason, msg)
	// Idiomatic tightly-scoped error check (was a separate err declaration).
	if err := r.patchStatus(ctx, ais); err != nil {
		return err
	}
	// Also disable in the live cluster (don't wait for config sync)
	// This function will update the annotation so future reconciliations can tell the config has been updated
	return r.handleConfigState(ctx, ais)
}

func (r *AIStoreReconciler) updateStatus(ctx context.Context, ais *aisv1.AIStore, state aisv1.ClusterState) error {
logger := logf.FromContext(ctx)
logger.Info("Updating AIS state", "state", state)
// enableRebalanceCondition flips the `ReadyRebalance` condition back to `True`
// and patches the CR status.
// Note this does not force-enable rebalance, only allows the value from spec to be used again.
func (r *AIStoreReconciler) enableRebalanceCondition(ctx context.Context, ais *aisv1.AIStore) error {
	logger := logf.FromContext(ctx)
	logger.Info("Enabling rebalance condition")
	ais.SetCondition(aisv1.ConditionReadyRebalance)
	return r.patchStatus(ctx, ais)
}

// updateStatusWithState records the given cluster state on the CR and then
// patches the status subresource via patchStatus.
func (r *AIStoreReconciler) updateStatusWithState(ctx context.Context, ais *aisv1.AIStore, state aisv1.ClusterState) error {
	logger := logf.FromContext(ctx)
	logger.Info("Updating AIS state", "state", state)
	ais.SetState(state)
	return r.patchStatus(ctx, ais)
}

func (r *AIStoreReconciler) patchStatus(ctx context.Context, ais *aisv1.AIStore) error {
patchBytes, err := json.Marshal(map[string]interface{}{
"status": ais.Status,
})
if err != nil {
logger.Error(err, "Failed to marshal AIS status")
logf.FromContext(ctx).Error(err, "Failed to marshal AIS status")
return err
}
patch := k8sclient.RawPatch(types.MergePatchType, patchBytes)
Expand Down Expand Up @@ -720,7 +740,7 @@ func (r *AIStoreReconciler) handleSuccessfulReconcile(ctx context.Context, ais *
needsUpdate = true
}
if needsUpdate {
err = r.updateStatus(ctx, ais, aisv1.ClusterReady)
err = r.updateStatusWithState(ctx, ais, aisv1.ClusterReady)
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion operator/pkg/controllers/proxy_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (r *AIStoreReconciler) handleProxyImage(ctx context.Context, ais *aisv1.AIS
logger.Error(err, "failed to set primary proxy")
return false, err
}
logger.Info("updated primary to pod " + firstPodName)
logger.Info("Updated primary to pod " + firstPodName)
ss.Spec.UpdateStrategy = appsv1.StatefulSetUpdateStrategy{
Type: appsv1.RollingUpdateStatefulSetStrategyType,
RollingUpdate: &appsv1.RollingUpdateStatefulSetStrategy{
Expand Down
Loading

0 comments on commit 35c0248

Please sign in to comment.