feat: graceful group member change
Signed-off-by: Xuhui zhang <[email protected]>
zxh326 committed Nov 18, 2024
1 parent 15326cc commit 849701c
Showing 10 changed files with 353 additions and 102 deletions.
12 changes: 8 additions & 4 deletions api/v1/cachegroup_types.go
@@ -145,17 +145,21 @@ type CacheGroupStatus struct {
Phase CacheGroupPhase `json:"phase,omitempty"`
Conditions []CacheGroupCondition `json:"conditions,omitempty"`

ReadyWorker int32 `json:"readyWorker,omitempty"`
ExpectWorker int32 `json:"expectWorker,omitempty"`
ReadyStr string `json:"readyStr,omitempty"`
CacheGroup string `json:"cacheGroup,omitempty"`
ReadyWorker int32 `json:"readyWorker,omitempty"`
BackUpWorker int32 `json:"backUpWorker,omitempty"`
WaitingDeletedWorker int32 `json:"waitingDeletedWorker,omitempty"`
ExpectWorker int32 `json:"expectWorker,omitempty"`
ReadyStr string `json:"readyStr,omitempty"`
CacheGroup string `json:"cacheGroup,omitempty"`
}

// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=cg
// +kubebuilder:printcolumn:name="Cache Group",type="string",JSONPath=".status.cacheGroup"
// +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase"
// +kubebuilder:printcolumn:name="Back up",type="string",JSONPath=".status.backUpWorker"
// +kubebuilder:printcolumn:name="Waiting Deleted",type="string",JSONPath=".status.WaitingDeletedWorker"
// +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.readyStr"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
// CacheGroup is the Schema for the cachegroups API
12 changes: 12 additions & 0 deletions config/crd/bases/juicefs.io_cachegroups.yaml
@@ -23,6 +23,12 @@ spec:
- jsonPath: .status.phase
name: Phase
type: string
- jsonPath: .status.backUpWorker
name: Back up
type: string
- jsonPath: .status.waitingDeletedWorker
name: Waiting Deleted
type: string
- jsonPath: .status.readyStr
name: Ready
type: string
@@ -2862,6 +2868,9 @@ spec:
type: object
status:
properties:
backUpWorker:
format: int32
type: integer
cacheGroup:
type: string
conditions:
@@ -2890,6 +2899,9 @@ spec:
readyWorker:
format: int32
type: integer
waitingDeletedWorker:
format: int32
type: integer
type: object
type: object
served: true
4 changes: 2 additions & 2 deletions config/manager/kustomization.yaml
@@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
newName: controller
newTag: latest
newName: registry.zzde.me/juicefs-cache-group-operator
newTag: v0.2.5
153 changes: 115 additions & 38 deletions internal/controller/cachegroup_controller.go
@@ -69,7 +69,7 @@ func (r *CacheGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err := r.Get(ctx, req.NamespacedName, cg); err != nil {
return ctrl.Result{}, client.IgnoreNotFound(err)
}
l.V(1).Info("Reconcile CacheGroup")
l.V(1).Info("start reconciler")
if cg.ObjectMeta.DeletionTimestamp.IsZero() {
if !controllerutil.ContainsFinalizer(cg, common.Finalizer) {
controllerutil.AddFinalizer(cg, common.Finalizer)
@@ -103,7 +103,13 @@ func (r *CacheGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request)
l.Error(err, "failed to sync cache group workers")
return ctrl.Result{}, err
}

l.V(1).Info("reconciler done")
if cg.Status.BackUpWorker > 0 || cg.Status.WaitingDeletedWorker > 0 {
return ctrl.Result{
Requeue: true,
RequeueAfter: 1 * time.Minute,
}, nil
}
return ctrl.Result{}, nil
}
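
The requeue added above is what drives the graceful membership changes forward: a backup worker is only promoted after a time window, and a waiting-deleted worker is only removed once its cache has drained, so the controller re-checks the group every minute while either counter is non-zero. A minimal restatement of that condition (hypothetical helper name, not part of this commit):

// needsRequeue reports whether the cache group still has members in a
// transitional state that only a later reconcile pass can resolve.
func needsRequeue(status juicefsiov1.CacheGroupStatus) bool {
	// backup workers become normal workers after roughly 10 minutes;
	// waiting-deleted workers are removed once their cache blocks are gone.
	return status.BackUpWorker > 0 || status.WaitingDeletedWorker > 0
}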

@@ -114,7 +120,6 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr
wg := sync.WaitGroup{}
errCh := make(chan error, 2*maxUnavailable)
numUnavailable := 0
log.V(1).Info("start to sync cache group workers", "updateStrategy", updateStrategyType, "maxUnavailable", maxUnavailable)
// TODO: add a webhook to validate the cache group worker template
secret := &corev1.Secret{}
if err := r.Get(ctx, client.ObjectKey{Namespace: cg.Namespace, Name: cg.Spec.SecretRef.Name}, secret); err != nil {
@@ -125,29 +130,28 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr
log.Error(err, "failed to validate secret")
return err
}
log.V(1).Info("sync worker to expect states", "expectStates", len(expectStates))
for node, expectState := range expectStates {
actualState, err := r.getActualState(ctx, cg, node)
if err != nil && !apierrors.IsNotFound(err) {
log.Error(err, "failed to get actual state", "node", node)
continue
}
podBuilder := builder.NewPodBuilder(cg, secret, node, expectState)
backUpWorker := r.asBackupWorkerOrNot(cg, actualState)
podBuilder := builder.NewPodBuilder(cg, secret, node, expectState, backUpWorker)
expectWorker := podBuilder.NewCacheGroupWorker(ctx)
hash := utils.GenHash(expectWorker)
expectWorker.Annotations[common.LabelWorkerHash] = hash

if r.actualShouldbeUpdate(updateStrategyType, expectWorker, actualState) {
if numUnavailable >= maxUnavailable {
log.V(1).Info("maxUnavailable reached, skip updating cache group worker, waiting for next reconciler", "worker", expectWorker.Name)
log.V(1).Info("maxUnavailable reached, skip updating worker, waiting for next reconciler", "worker", expectWorker.Name)
break
}
wg.Add(1)
numUnavailable++
go func() {
defer wg.Done()
log.Info("cache group worker need to be updated", "worker", expectWorker.Name)
if err := r.createOrUpdateWorker(ctx, actualState, expectWorker); err != nil {
log.Error(err, "failed to create or update cache group worker", "worker", expectWorker.Name)
log.Error(err, "failed to create or update worker", "worker", expectWorker.Name)
errCh <- err
return
}
@@ -158,6 +162,8 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr
return
}
}()
} else {
log.V(1).Info("worker is up to date", "worker", expectWorker.Name)
}
}
wg.Wait()
@@ -237,7 +243,9 @@ func (r *CacheGroupReconciler) getActualState(ctx context.Context, cg *juicefsio
}

func (r *CacheGroupReconciler) createOrUpdateWorker(ctx context.Context, actual, expect *corev1.Pod) error {
log := log.FromContext(ctx).WithValues("worker", expect.Name)
if actual == nil {
log.Info("create worker")
return r.createCacheGroupWorker(ctx, expect)
}
return r.updateCacheGroupWorker(ctx, actual, expect)
@@ -255,37 +263,41 @@ func (r *CacheGroupReconciler) createCacheGroupWorker(ctx context.Context, expec
}

func (r *CacheGroupReconciler) updateCacheGroupWorker(ctx context.Context, oldWorker, expectWorker *corev1.Pod) error {
log := log.FromContext(ctx)
worker := corev1.Pod{}
if err := r.Get(ctx, client.ObjectKey{Namespace: oldWorker.Namespace, Name: oldWorker.Name}, &worker); err != nil && !apierrors.IsNotFound(err) {
log := log.FromContext(ctx).WithValues("worker", expectWorker.Name)
log.Info("worker spec changed, delete old and create new one")
if err := r.deleteCacheGroupWorker(ctx, oldWorker, true); err != nil {
log.Error(err, "failed to delete old worker")
return err
}
err := r.Delete(ctx, &worker)
log.Info("old worker deleted, created new one")
return r.Create(ctx, expectWorker)
}

// deleteCacheGroupWorker deletes a cache group worker pod. If `waiting` is true,
// it waits until the pod is confirmed to be deleted or a timeout occurs.
func (r *CacheGroupReconciler) deleteCacheGroupWorker(ctx context.Context, worker *corev1.Pod, waiting bool) error {
err := r.Delete(ctx, worker)
if err != nil {
if !apierrors.IsNotFound(err) {
log.Error(err, "failed to delete old cache group worker", "worker", worker.Name)
return err
}
}
// wait for the old worker to be deleted
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
for {
err := r.Get(ctx, client.ObjectKey{Namespace: oldWorker.Namespace, Name: oldWorker.Name}, &worker)
if apierrors.IsNotFound(err) {
break
return nil
}
if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("timeout waiting for old cache group worker to be deleted")
return err
}
if waiting {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
for {
err := r.Get(ctx, client.ObjectKey{Namespace: worker.Namespace, Name: worker.Name}, worker)
if apierrors.IsNotFound(err) {
break
}
if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded {
return fmt.Errorf("timeout waiting for worker to be deleted")
}
time.Sleep(time.Second)
}
time.Sleep(time.Second)
}
log.Info("old cache group worker deleted, created new one", "worker", expectWorker.Name)
return r.Create(ctx, expectWorker)
}

func (r *CacheGroupReconciler) deleteCacheGroupWorker(ctx context.Context, worker *corev1.Pod) error {
return client.IgnoreNotFound(r.Delete(ctx, worker))
return nil
}
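
Because removed and added lines are interleaved in the hunk above, here is how the reworked pair of functions plausibly reads after this commit. This is a sketch reconstructed from the diff, so the exact logging and error handling may differ from the repository:

func (r *CacheGroupReconciler) updateCacheGroupWorker(ctx context.Context, oldWorker, expectWorker *corev1.Pod) error {
	log := log.FromContext(ctx).WithValues("worker", expectWorker.Name)
	log.Info("worker spec changed, delete old and create new one")
	// delete the old pod and wait for it to disappear before recreating it
	if err := r.deleteCacheGroupWorker(ctx, oldWorker, true); err != nil {
		log.Error(err, "failed to delete old worker")
		return err
	}
	log.Info("old worker deleted, creating new one")
	return r.Create(ctx, expectWorker)
}

// deleteCacheGroupWorker deletes a cache group worker pod. If waiting is true,
// it blocks until the pod is confirmed gone or a one-minute timeout expires.
func (r *CacheGroupReconciler) deleteCacheGroupWorker(ctx context.Context, worker *corev1.Pod, waiting bool) error {
	if err := r.Delete(ctx, worker); err != nil {
		return client.IgnoreNotFound(err)
	}
	if waiting {
		ctx, cancel := context.WithTimeout(ctx, time.Minute)
		defer cancel()
		for {
			err := r.Get(ctx, client.ObjectKey{Namespace: worker.Namespace, Name: worker.Name}, worker)
			if apierrors.IsNotFound(err) {
				break
			}
			if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded {
				return fmt.Errorf("timeout waiting for worker to be deleted")
			}
			time.Sleep(time.Second)
		}
	}
	return nil
}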

func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicefsiov1.CacheGroup) ([]corev1.Pod, error) {
@@ -296,23 +308,76 @@ func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicef
return workers.Items, nil
}

// removeRedundantWorkers deletes workers that are no longer expected:
// - if the worker still has cache blocks, tweak its group weight to zero and wait for data redistribution before deleting it
// - if the worker has no cache blocks, delete it directly
// - if the worker is not ready, delete it directly
func (r *CacheGroupReconciler) removeRedundantWorkers(
ctx context.Context,
expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate,
actualWorks []corev1.Pod) error {
log := log.FromContext(ctx)
for _, worker := range actualWorks {
if worker.DeletionTimestamp != nil {
continue
}
if _, ok := expectStates[worker.Spec.NodeName]; !ok {
log.Info("found redundant cache group worker, delete it", "worker", worker.Name)
if err := r.deleteCacheGroupWorker(ctx, &worker); err != nil {
log.Error(err, "failed to delete cache group worker", "worker", worker.Name)
return err
// decide how to remove the worker based on how much cache it still holds
cacheBytes, err := utils.GetWorkerCacheBlocksBytes(ctx, worker, common.MountPoint)
if err != nil {
log.Error(err, "failed to get worker cache blocks bytes", "worker", worker.Name)
continue
}
if cacheBytes > 0 {
log.Info("found redundant worker, but it still has cache blocks, tweak the group weight to zero", "worker", worker.Name)
if err := r.gracefulShutdownWorker(ctx, &worker); err != nil {
log.Error(err, "failed to graceful shutdown worker", "worker", worker)
}
} else {
log.Info("found redundant worker, delete it", "worker", worker.Name)
if err := r.deleteCacheGroupWorker(ctx, &worker, false); err != nil {
log.Error(err, "failed to delete worker", "worker", worker.Name)
return err
}
}
}
}
return nil
}
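
utils.GetWorkerCacheBlocksBytes is not part of this diff. One plausible shape for it is sketched below, assuming the operator can exec into the worker pod and that the JuiceFS mount exposes a .stats file with a block-cache byte counter; the exec approach, the metric name, and the standalone signature (explicit clientset and REST config instead of the three arguments used above) are all assumptions, not the repository's implementation:

import (
	"bytes"
	"context"
	"strconv"
	"strings"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
)

// GetWorkerCacheBlocksBytes returns how many bytes of cache blocks the worker
// still holds, by reading the JuiceFS ".stats" file inside the pod.
func GetWorkerCacheBlocksBytes(ctx context.Context, clientset kubernetes.Interface, config *rest.Config, worker corev1.Pod, mountPoint string) (int64, error) {
	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Namespace(worker.Namespace).
		Name(worker.Name).
		SubResource("exec").
		VersionedParams(&corev1.PodExecOptions{
			Container: worker.Spec.Containers[0].Name,
			Command:   []string{"cat", mountPoint + "/.stats"},
			Stdout:    true,
			Stderr:    true,
		}, scheme.ParameterCodec)

	exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL())
	if err != nil {
		return 0, err
	}
	var stdout, stderr bytes.Buffer
	if err := exec.StreamWithContext(ctx, remotecommand.StreamOptions{Stdout: &stdout, Stderr: &stderr}); err != nil {
		return 0, err
	}
	// look for a "blockcache.bytes <n>" line; the metric name is an assumption
	for _, line := range strings.Split(stdout.String(), "\n") {
		fields := strings.Fields(line)
		if len(fields) == 2 && fields[0] == "blockcache.bytes" {
			return strconv.ParseInt(fields[1], 10, 64)
		}
	}
	return 0, nil
}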

// gracefulShutdownWorker sets the pod's `group-weight` option to zero, then deletes and
// recreates the worker pod so its cache can drain before final removal
func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worker *corev1.Pod) error {
if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok {
return nil
}
if err := r.deleteCacheGroupWorker(ctx, worker, true); err != nil {
return err
}
builder.UpdateWorkerGroupWeight(worker, 0)
worker.ResourceVersion = ""
worker.Annotations[common.AnnoWaitingDeleteWorker] = time.Now().Format(time.RFC3339)
if err := r.Create(ctx, worker); err != nil {
return err
}
return nil
}
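
builder.UpdateWorkerGroupWeight is also outside this diff. A minimal sketch of what such a helper could do, assuming the worker's JuiceFS mount options are carried as container args and that the client accepts a group-weight option (as the comment above implies); this is an illustration, not the repository's implementation:

// UpdateWorkerGroupWeight rewrites the worker pod spec so the JuiceFS client
// mounts with the given group weight; weight 0 asks the cache group to stop
// placing blocks on this member and to redistribute its existing blocks.
func UpdateWorkerGroupWeight(worker *corev1.Pod, weight int) {
	opt := fmt.Sprintf("group-weight=%d", weight)
	for i, c := range worker.Spec.Containers {
		replaced := false
		args := make([]string, 0, len(c.Args)+1)
		for _, arg := range c.Args {
			// replace an existing group-weight option if one is present
			if strings.Contains(arg, "group-weight=") {
				args = append(args, opt)
				replaced = true
				continue
			}
			args = append(args, arg)
		}
		if !replaced {
			args = append(args, opt)
		}
		worker.Spec.Containers[i].Args = args
	}
}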

func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, actual *corev1.Pod) bool {
// If this is a new node and the group already has 2 or more ready workers,
// it joins as a backup worker.
if actual == nil {
return cg.Status.ReadyWorker >= 2
}
// A node keeps its backup role for 10 minutes after joining;
// once that window has passed it is treated as a normal worker.
if v, ok := actual.Annotations[common.AnnoBackupWorker]; ok {
backupAt := utils.MustParseTime(v)
// TODO: 10 minutes should be configurable
return time.Since(backupAt) < 10*time.Minute
}
return false
}
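
Both asBackupWorkerOrNot and gracefulShutdownWorker key off annotations whose definitions are not in this diff. The declarations below are hypothetical stand-ins showing roughly what the common constants and the utils.MustParseTime helper are assumed to look like; the real keys and values may differ:

// Possible shape of the annotation keys referenced above (values illustrative).
const (
	// AnnoBackupWorker records when a worker joined the group as a backup member.
	AnnoBackupWorker = "juicefs.io/backup-worker"
	// AnnoWaitingDeleteWorker records when a redundant worker started draining its cache.
	AnnoWaitingDeleteWorker = "juicefs.io/waiting-delete-worker"
)

// MustParseTime parses the RFC3339 timestamp stored in those annotations,
// matching the format gracefulShutdownWorker writes above; it panics on bad input.
func MustParseTime(v string) time.Time {
	t, err := time.Parse(time.RFC3339, v)
	if err != nil {
		panic(err)
	}
	return t
}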

func (r *CacheGroupReconciler) calculateStatus(
cg *juicefsiov1.CacheGroup,
expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate,
@@ -323,8 +388,20 @@ func (r *CacheGroupReconciler) calculateStatus(
newStatus.Phase = juicefsiov1.CacheGroupPhaseWaiting
return newStatus
}
backupWorker := 0
waitingDeletedWorker := 0
for _, worker := range actualWorks {
if _, ok := worker.Annotations[common.AnnoBackupWorker]; ok {
backupWorker++
}
if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok {
waitingDeletedWorker++
}
}
newStatus.ReadyWorker = int32(len(actualWorks))
newStatus.ExpectWorker = int32(len(expectStates))
newStatus.BackUpWorker = int32(backupWorker)
newStatus.WaitingDeletedWorker = int32(waitingDeletedWorker)
newStatus.ReadyStr = fmt.Sprintf("%d/%d", newStatus.ReadyWorker, newStatus.ExpectWorker)
if newStatus.ExpectWorker != newStatus.ReadyWorker {
newStatus.Phase = juicefsiov1.CacheGroupPhaseProgressing
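
For illustration, a group whose spec expects 3 nodes while one redundant member is still draining and one freshly added member is inside its backup window would surface roughly the following status under the logic above (values hypothetical):

status := juicefsiov1.CacheGroupStatus{
	Phase:                juicefsiov1.CacheGroupPhaseProgressing, // ExpectWorker != ReadyWorker
	ExpectWorker:         3, // nodes matched by the worker template
	ReadyWorker:          4, // every existing worker pod, including the draining one
	BackUpWorker:         1, // new member still inside its 10-minute backup window
	WaitingDeletedWorker: 1, // redundant member running with group-weight=0
	ReadyStr:             "4/3",
}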
@@ -353,7 +430,7 @@ func (r *CacheGroupReconciler) waitForWorkerReady(ctx context.Context, cg *juice
return err
}
if utils.IsPodReady(worker) && utils.IsMountPointReady(ctx, worker, common.MountPoint) {
log.Info("cache group worker is ready", "worker", worker.Name)
log.Info("worker is ready", "worker", worker.Name)
return nil
}
time.Sleep(time.Second)
(The remaining 6 changed files in this commit are not rendered here.)
