From 849701c0d652679c4d500a5fd63aa03a3fde485f Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Mon, 18 Nov 2024 12:02:35 +0800 Subject: [PATCH 01/16] feat: graceful group member change Signed-off-by: Xuhui zhang --- api/v1/cachegroup_types.go | 12 +- config/crd/bases/juicefs.io_cachegroups.yaml | 12 ++ config/manager/kustomization.yaml | 4 +- internal/controller/cachegroup_controller.go | 153 ++++++++++++++----- pkg/builder/pod.go | 58 +++++-- pkg/builder/pod_test.go | 43 ++++++ pkg/common/common.go | 3 + pkg/utils/pod.go | 89 ++++++----- pkg/utils/terminal.go | 72 +++++++++ pkg/utils/utils.go | 9 ++ 10 files changed, 353 insertions(+), 102 deletions(-) create mode 100644 pkg/utils/terminal.go diff --git a/api/v1/cachegroup_types.go b/api/v1/cachegroup_types.go index a0c142e..44f94c3 100644 --- a/api/v1/cachegroup_types.go +++ b/api/v1/cachegroup_types.go @@ -145,10 +145,12 @@ type CacheGroupStatus struct { Phase CacheGroupPhase `json:"phase,omitempty"` Conditions []CacheGroupCondition `json:"conditions,omitempty"` - ReadyWorker int32 `json:"readyWorker,omitempty"` - ExpectWorker int32 `json:"expectWorker,omitempty"` - ReadyStr string `json:"readyStr,omitempty"` - CacheGroup string `json:"cacheGroup,omitempty"` + ReadyWorker int32 `json:"readyWorker,omitempty"` + BackUpWorker int32 `json:"backUpWorker,omitempty"` + WaitingDeletedWorker int32 `json:"waitingDeletedWorker,omitempty"` + ExpectWorker int32 `json:"expectWorker,omitempty"` + ReadyStr string `json:"readyStr,omitempty"` + CacheGroup string `json:"cacheGroup,omitempty"` } // +kubebuilder:object:root=true @@ -156,6 +158,8 @@ type CacheGroupStatus struct { // +kubebuilder:resource:shortName=cg // +kubebuilder:printcolumn:name="Cache Group",type="string",JSONPath=".status.cacheGroup" // +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase" +// +kubebuilder:printcolumn:name="Back up",type="string",JSONPath=".status.backUpWorker" +// +kubebuilder:printcolumn:name="Waiting Deleted",type="string",JSONPath=".status.WaitingDeletedWorker" // +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.readyStr" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // CacheGroup is the Schema for the cachegroups API diff --git a/config/crd/bases/juicefs.io_cachegroups.yaml b/config/crd/bases/juicefs.io_cachegroups.yaml index 9086085..b7cfaab 100644 --- a/config/crd/bases/juicefs.io_cachegroups.yaml +++ b/config/crd/bases/juicefs.io_cachegroups.yaml @@ -23,6 +23,12 @@ spec: - jsonPath: .status.phase name: Phase type: string + - jsonPath: .status.backUpWorker + name: Back up + type: string + - jsonPath: .status.WaitingDeletedWorker + name: Waiting Deleted + type: string - jsonPath: .status.readyStr name: Ready type: string @@ -2862,6 +2868,9 @@ spec: type: object status: properties: + backUpWorker: + format: int32 + type: integer cacheGroup: type: string conditions: @@ -2890,6 +2899,9 @@ spec: readyWorker: format: int32 type: integer + waitingDeletedWorker: + format: int32 + type: integer type: object type: object served: true diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index ad13e96..97db9d0 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: controller - newTag: latest + newName: registry.zzde.me/juicefs-cache-group-operator + newTag: v0.2.5 diff --git 
a/internal/controller/cachegroup_controller.go b/internal/controller/cachegroup_controller.go index 3e22f47..7701933 100644 --- a/internal/controller/cachegroup_controller.go +++ b/internal/controller/cachegroup_controller.go @@ -69,7 +69,7 @@ func (r *CacheGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err := r.Get(ctx, req.NamespacedName, cg); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } - l.V(1).Info("Reconcile CacheGroup") + l.V(1).Info("start reconciler") if cg.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(cg, common.Finalizer) { controllerutil.AddFinalizer(cg, common.Finalizer) @@ -103,7 +103,13 @@ func (r *CacheGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) l.Error(err, "failed to sync cache group workers") return ctrl.Result{}, err } - + l.V(1).Info("reconciler done") + if cg.Status.BackUpWorker > 0 || cg.Status.WaitingDeletedWorker > 0 { + return ctrl.Result{ + Requeue: true, + RequeueAfter: 1 * time.Minute, + }, nil + } return ctrl.Result{}, nil } @@ -114,7 +120,6 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr wg := sync.WaitGroup{} errCh := make(chan error, 2*maxUnavailable) numUnavailable := 0 - log.V(1).Info("start to sync cache group workers", "updateStrategy", updateStrategyType, "maxUnavailable", maxUnavailable) // TODO: add a webook to validate the cache group worker template secret := &corev1.Secret{} if err := r.Get(ctx, client.ObjectKey{Namespace: cg.Namespace, Name: cg.Spec.SecretRef.Name}, secret); err != nil { @@ -125,29 +130,28 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr log.Error(err, "failed to validate secret") return err } + log.V(1).Info("sync worker to expect states", "expectStates", len(expectStates)) for node, expectState := range expectStates { actualState, err := r.getActualState(ctx, cg, node) if err != nil && !apierrors.IsNotFound(err) { log.Error(err, "failed to get actual state", "node", node) continue } - podBuilder := builder.NewPodBuilder(cg, secret, node, expectState) + backUpWorker := r.asBackupWorkerOrNot(cg, actualState) + podBuilder := builder.NewPodBuilder(cg, secret, node, expectState, backUpWorker) expectWorker := podBuilder.NewCacheGroupWorker(ctx) - hash := utils.GenHash(expectWorker) - expectWorker.Annotations[common.LabelWorkerHash] = hash if r.actualShouldbeUpdate(updateStrategyType, expectWorker, actualState) { if numUnavailable >= maxUnavailable { - log.V(1).Info("maxUnavailable reached, skip updating cache group worker, waiting for next reconciler", "worker", expectWorker.Name) + log.V(1).Info("maxUnavailable reached, skip updating worker, waiting for next reconciler", "worker", expectWorker.Name) break } wg.Add(1) numUnavailable++ go func() { defer wg.Done() - log.Info("cache group worker need to be updated", "worker", expectWorker.Name) if err := r.createOrUpdateWorker(ctx, actualState, expectWorker); err != nil { - log.Error(err, "failed to create or update cache group worker", "worker", expectWorker.Name) + log.Error(err, "failed to create or update worker", "worker", expectWorker.Name) errCh <- err return } @@ -158,6 +162,8 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr return } }() + } else { + log.V(1).Info("worker is up to date", "worker", expectWorker.Name) } } wg.Wait() @@ -237,7 +243,9 @@ func (r *CacheGroupReconciler) getActualState(ctx context.Context, cg *juicefsio } func (r *CacheGroupReconciler) 
createOrUpdateWorker(ctx context.Context, actual, expect *corev1.Pod) error { + log := log.FromContext(ctx).WithValues("worker", expect.Name) if actual == nil { + log.Info("create worker") return r.createCacheGroupWorker(ctx, expect) } return r.updateCacheGroupWorker(ctx, actual, expect) @@ -255,37 +263,41 @@ func (r *CacheGroupReconciler) createCacheGroupWorker(ctx context.Context, expec } func (r *CacheGroupReconciler) updateCacheGroupWorker(ctx context.Context, oldWorker, expectWorker *corev1.Pod) error { - log := log.FromContext(ctx) - worker := corev1.Pod{} - if err := r.Get(ctx, client.ObjectKey{Namespace: oldWorker.Namespace, Name: oldWorker.Name}, &worker); err != nil && !apierrors.IsNotFound(err) { + log := log.FromContext(ctx).WithValues("worker", expectWorker.Name) + log.Info("worker spec changed, delete old and create new one") + if err := r.deleteCacheGroupWorker(ctx, oldWorker, true); err != nil { + log.Error(err, "failed to delete old worker") return err } - err := r.Delete(ctx, &worker) + log.Info("old worker deleted, created new one") + return r.Create(ctx, expectWorker) +} + +// deleteCacheGroupWorker deletes a cache group worker pod. If the `waiting` is true, +// it waits until the pod is confirmed to be deleted or a timeout occurs. +func (r *CacheGroupReconciler) deleteCacheGroupWorker(ctx context.Context, worker *corev1.Pod, waiting bool) error { + err := r.Delete(ctx, worker) if err != nil { - if !apierrors.IsNotFound(err) { - log.Error(err, "failed to delete old cache group worker", "worker", worker.Name) - return err - } - } - // wait for the old worker to be deleted - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - for { - err := r.Get(ctx, client.ObjectKey{Namespace: oldWorker.Namespace, Name: oldWorker.Name}, &worker) if apierrors.IsNotFound(err) { - break + return nil } - if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded { - return fmt.Errorf("timeout waiting for old cache group worker to be deleted") + return err + } + if waiting { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + for { + err := r.Get(ctx, client.ObjectKey{Namespace: worker.Namespace, Name: worker.Name}, worker) + if apierrors.IsNotFound(err) { + break + } + if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded { + return fmt.Errorf("timeout waiting for worker to be deleted") + } + time.Sleep(time.Second) } - time.Sleep(time.Second) } - log.Info("old cache group worker deleted, created new one", "worker", expectWorker.Name) - return r.Create(ctx, expectWorker) -} - -func (r *CacheGroupReconciler) deleteCacheGroupWorker(ctx context.Context, worker *corev1.Pod) error { - return client.IgnoreNotFound(r.Delete(ctx, worker)) + return nil } func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicefsiov1.CacheGroup) ([]corev1.Pod, error) { @@ -296,23 +308,76 @@ func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicef return workers.Items, nil } +// removeRedundantWorkers deletes the redundant workers +// if the worker still has cache blocks, tweak the group weight to zero, waiting for data redistribution, then delete it +// if the worker has no cache blocks, delete it directly +// if the worker not ready delete it directly func (r *CacheGroupReconciler) removeRedundantWorkers( ctx context.Context, expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate, actualWorks []corev1.Pod) error { log := log.FromContext(ctx) for _, worker := range actualWorks { + if 
worker.DeletionTimestamp != nil { + continue + } if _, ok := expectStates[worker.Spec.NodeName]; !ok { - log.Info("found redundant cache group worker, delete it", "worker", worker.Name) - if err := r.deleteCacheGroupWorker(ctx, &worker); err != nil { - log.Error(err, "failed to delete cache group worker", "worker", worker.Name) - return err + // if the worker in delete state, + cacheBytes, err := utils.GetWorkerCacheBlocksBytes(ctx, worker, common.MountPoint) + if err != nil { + log.Error(err, "failed to get worker cache blocks bytes", "worker", worker.Name) + continue + } + if cacheBytes > 0 { + log.Info("found redundant worker, but it still has cache blocks, tweak the group weight to zero", "worker", worker.Name) + if err := r.gracefulShutdownWorker(ctx, &worker); err != nil { + log.Error(err, "failed to graceful shutdown worker", "worker", worker) + } + } else { + log.Info("found redundant worker, delete it", "worker", worker.Name) + if err := r.deleteCacheGroupWorker(ctx, &worker, false); err != nil { + log.Error(err, "failed to delete worker", "worker", worker.Name) + return err + } } } } return nil } +// change pod options `group-weight` to zero, delete and recreate the worker pod +func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worker *corev1.Pod) error { + if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + return nil + } + if err := r.deleteCacheGroupWorker(ctx, worker, true); err != nil { + return err + } + builder.UpdateWorkerGroupWeight(worker, 0) + worker.ResourceVersion = "" + worker.Annotations[common.AnnoWaitingDeleteWorker] = time.Now().Format(time.RFC3339) + if err := r.Create(ctx, worker); err != nil { + return err + } + return nil +} + +func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, actual *corev1.Pod) bool { + // If it is a new node and there are already 2 or more worker nodes + // then this node is a backup worker. + if actual == nil { + return cg.Status.ReadyWorker >= 2 + } + // If this node has been acting as a backup node for 10 minutes + // then this node is a normal worker. 
+ if v, ok := actual.Annotations[common.AnnoBackupWorker]; ok { + backupAt := utils.MustParseTime(v) + // TODO: 10 minutes should be configurable + return time.Since(backupAt) < 10*time.Minute + } + return false +} + func (r *CacheGroupReconciler) calculateStatus( cg *juicefsiov1.CacheGroup, expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate, @@ -323,8 +388,20 @@ func (r *CacheGroupReconciler) calculateStatus( newStatus.Phase = juicefsiov1.CacheGroupPhaseWaiting return newStatus } + backupWorker := 0 + waitingDeletedWorker := 0 + for _, worker := range actualWorks { + if _, ok := worker.Annotations[common.AnnoBackupWorker]; ok { + backupWorker++ + } + if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + waitingDeletedWorker++ + } + } newStatus.ReadyWorker = int32(len(actualWorks)) newStatus.ExpectWorker = int32(len(expectStates)) + newStatus.BackUpWorker = int32(backupWorker) + newStatus.WaitingDeletedWorker = int32(waitingDeletedWorker) newStatus.ReadyStr = fmt.Sprintf("%d/%d", newStatus.ReadyWorker, newStatus.ExpectWorker) if newStatus.ExpectWorker != newStatus.ReadyWorker { newStatus.Phase = juicefsiov1.CacheGroupPhaseProgressing @@ -353,7 +430,7 @@ func (r *CacheGroupReconciler) waitForWorkerReady(ctx context.Context, cg *juice return err } if utils.IsPodReady(worker) && utils.IsMountPointReady(ctx, worker, common.MountPoint) { - log.Info("cache group worker is ready", "worker", worker.Name) + log.Info("worker is ready", "worker", worker.Name) return nil } time.Sleep(time.Second) diff --git a/pkg/builder/pod.go b/pkg/builder/pod.go index d602d11..3fe6d2f 100644 --- a/pkg/builder/pod.go +++ b/pkg/builder/pod.go @@ -16,12 +16,17 @@ package builder import ( "context" + "fmt" + "regexp" "strings" + "time" juicefsiov1 "github.com/juicedata/juicefs-cache-group-operator/api/v1" "github.com/juicedata/juicefs-cache-group-operator/pkg/common" "github.com/juicedata/juicefs-cache-group-operator/pkg/utils" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -52,27 +57,29 @@ var ( ) type PodBuilder struct { - volName string - cg *juicefsiov1.CacheGroup - node string - spec juicefsiov1.CacheGroupWorkerTemplate - secretData map[string]string - initConfig string + volName string + cg *juicefsiov1.CacheGroup + node string + spec juicefsiov1.CacheGroupWorkerTemplate + secretData map[string]string + initConfig string + backUpWorker bool } -func NewPodBuilder(cg *juicefsiov1.CacheGroup, secret *corev1.Secret, node string, spec juicefsiov1.CacheGroupWorkerTemplate) *PodBuilder { +func NewPodBuilder(cg *juicefsiov1.CacheGroup, secret *corev1.Secret, node string, spec juicefsiov1.CacheGroupWorkerTemplate, backUpWorker bool) *PodBuilder { secretData := utils.ParseSecret(secret) initconfig := "" if v, ok := secretData["initconfig"]; ok && v != "" { initconfig = v } return &PodBuilder{ - secretData: secretData, - volName: secretData["name"], - cg: cg, - node: node, - spec: spec, - initConfig: initconfig, + secretData: secretData, + volName: secretData["name"], + cg: cg, + node: node, + spec: spec, + initConfig: initconfig, + backUpWorker: backUpWorker, } } @@ -215,6 +222,9 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { cacheDirs = append(cacheDirs, "/var/jfsCache") } opts = append(opts, "cache-dir="+strings.Join(cacheDirs, ":")) + if p.backUpWorker { + opts = append(opts, "group-backup") + } mountCmds = append(mountCmds, "-o", strings.Join(opts, ",")) cmds := []string{ "sh", @@ -301,6 +311,16 
@@ func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod { } worker.Spec.Containers[0].Env = p.genEnvs() worker.Spec.Containers[0].Command = p.genCommands(ctx) + + hash := utils.GenHash(worker) + + // The following fields do not participate in the hash calculation. + worker.Annotations[common.LabelWorkerHash] = hash + if p.backUpWorker { + backupAt := time.Now().Format(time.RFC3339) + worker.Annotations[common.AnnoBackupWorker] = backupAt + } + return worker } @@ -357,3 +377,15 @@ func MergeCacheGrouopWorkerTemplate(template *juicefsiov1.CacheGroupWorkerTempla template.DNSPolicy = overwrite.DNSPolicy } } + +func UpdateWorkerGroupWeight(worker *corev1.Pod, weight int) { + cmd := worker.Spec.Containers[0].Command[2] + weightStr := fmt.Sprint(weight) + if strings.Contains(cmd, "group-weight") { + regex := regexp.MustCompile("group-weight=[0-9]+") + cmd = regex.ReplaceAllString(cmd, "group-weight="+weightStr) + } else { + cmd = cmd + ",group-weight=" + weightStr + } + worker.Spec.Containers[0].Command[2] = cmd +} diff --git a/pkg/builder/pod_test.go b/pkg/builder/pod_test.go index 369148b..3d07d9b 100644 --- a/pkg/builder/pod_test.go +++ b/pkg/builder/pod_test.go @@ -21,6 +21,7 @@ import ( juicefsiov1 "github.com/juicedata/juicefs-cache-group-operator/api/v1" "github.com/juicedata/juicefs-cache-group-operator/pkg/common" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -167,3 +168,45 @@ func TestPodBuilder_genCommands(t *testing.T) { }) } } +func TestUpdateWorkerGroupWeight(t *testing.T) { + tests := []struct { + name string + worker *corev1.Pod + weight int + expected string + }{ + { + name: "no group-weight option", + worker: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache"}, + }}, + }, + }, + weight: 10, + expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10", + }, + { + name: "with group-weight option", + worker: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10"}, + }}, + }, + }, + weight: 0, + expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=0", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + UpdateWorkerGroupWeight(tt.worker, tt.weight) + if tt.worker.Spec.Containers[0].Command[2] != tt.expected { + t.Errorf("UpdateWorkerGroupWeight() = %v, want %v", tt.worker.Spec.Containers[0].Command[2], tt.expected) + } + }) + } +} diff --git a/pkg/common/common.go b/pkg/common/common.go index b546d49..53fb0a7 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -42,6 +42,9 @@ const ( LabelWorkerValue = "juicefs-cache-group-worker" LabelAppType = "app.kubernetes.io/name" LabelJobValue = "juicefs-warmup-job" + + AnnoBackupWorker = "juicefs.io/backup-worker" + AnnoWaitingDeleteWorker = "juicefs.io/waiting-delete-worker" ) 
var ( diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index e440c56..5c9e995 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -15,7 +15,6 @@ package utils import ( - "bytes" "context" "fmt" "strings" @@ -25,10 +24,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/remotecommand" - "k8s.io/kubectl/pkg/scheme" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -45,55 +40,22 @@ func IsPodReady(pod corev1.Pod) bool { // IsMountPointReady checks if the mount point is ready in the given pod func IsMountPointReady(ctx context.Context, pod corev1.Pod, mountPoint string) bool { log := log.FromContext(ctx).WithName("checkMountPoint").WithValues("worker", pod.Name) - config := ctrl.GetConfigOrDie() + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - log.Error(err, "failed to create Kubernetes client") - return false - } - cmd := []string{"sh", "-c", fmt.Sprintf("stat %s", mountPoint)} - req := clientset.CoreV1().RESTClient(). - Post(). - Resource("pods"). - Name(pod.Name). - Namespace(pod.Namespace). - SubResource("exec") - - req.VersionedParams(&corev1.PodExecOptions{ - Container: common.WorkerContainerName, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: false, - }, scheme.ParameterCodec) - - exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) - if err != nil { - log.Error(err, "failed to create SPDY executor") - return false - } - var stdout, stderr bytes.Buffer - err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - Tty: false, - }) - if err != nil && stderr.Len() == 0 { + stdout, stderr, err := ExecInPod(ctx, pod.Namespace, pod.Name, common.WorkerContainerName, cmd) + + if err != nil && len(stderr) == 0 { log.Error(err, "failed to execute command") return false } - if stderr.Len() > 0 { - log.V(1).Info("mount point is not ready", "stderr", strings.Trim(stderr.String(), "\n")) + if len(stderr) > 0 { + log.V(1).Info("mount point is not ready", "stderr", strings.Trim(stderr, "\n")) return false } - if !strings.Contains(stdout.String(), "Inode: 1") { + if !strings.Contains(stdout, "Inode: 1") { log.V(1).Info("mount point is not ready") return false } @@ -101,6 +63,43 @@ func IsMountPointReady(ctx context.Context, pod corev1.Pod, mountPoint string) b return true } +func GetWorkerCacheBlocksBytes(ctx context.Context, pod corev1.Pod, mountPoint string) (int, error) { + if !IsPodReady(pod) || !IsMountPointReady(ctx, pod, mountPoint) { + return 0, fmt.Errorf("pod %s is not ready yet", pod.Name) + } + log := log.FromContext(ctx).WithName("getWorkerCacheBlocksBytes").WithValues("worker", pod.Name) + + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + cmd := []string{"sh", "-c", fmt.Sprintf("cat %s/.stats", mountPoint)} + + stdout, stderr, err := ExecInPod(ctx, pod.Namespace, pod.Name, common.WorkerContainerName, cmd) + + if err != nil && len(stderr) == 0 { + return 0, err + } + if len(stderr) > 0 { + log.Error(err, "failed to get cache blocks bytes", "stderr", strings.Trim(stderr, "\n")) + return 0, err + } + // parse stdout + // style like: "blockcache.bytes: 0\nblockcache.blocks: 0" + lines := strings.Split(stdout, "\n") + cacheBytes := 0 + + for _, line := range lines { + if strings.HasPrefix(line, "blockcache.bytes:") { + if _, 
err := fmt.Sscanf(line, "blockcache.bytes: %d", &cacheBytes); err != nil { + log.Error(err, "failed to parse cache bytes in stats file") + return 0, err + } + break + } + } + + return cacheBytes, nil +} + func ParseUpdateStrategy(strategy *appsv1.DaemonSetUpdateStrategy, total int) (appsv1.DaemonSetUpdateStrategyType, int) { if strategy == nil { return appsv1.RollingUpdateDaemonSetStrategyType, 1 diff --git a/pkg/utils/terminal.go b/pkg/utils/terminal.go new file mode 100644 index 0000000..6466af2 --- /dev/null +++ b/pkg/utils/terminal.go @@ -0,0 +1,72 @@ +// Copyright 2024 Juicedata Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "bytes" + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +func ExecInPod(ctx context.Context, namespace, name, container string, cmd []string) (string, string, error) { + log := log.FromContext(ctx).WithName("execInPod") + config := ctrl.GetConfigOrDie() + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error(err, "failed to create Kubernetes client") + return "", "", err + } + + req := clientset.CoreV1().RESTClient(). + Post(). + Resource("pods"). + Name(name). + Namespace(namespace). 
+ SubResource("exec") + + req.VersionedParams(&corev1.PodExecOptions{ + Container: container, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + log.Error(err, "failed to create SPDY executor") + return "", "", err + } + + var stdout, stderr bytes.Buffer + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + if err != nil { + return stdout.String(), stderr.String(), err + } + return stdout.String(), stderr.String(), nil +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index aa56268..01cbc8e 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -18,6 +18,7 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "time" ) func ToPtr[T any](v T) *T { @@ -48,3 +49,11 @@ func GenHash(object interface{}) string { hash := sha256.Sum256(data) return hex.EncodeToString(hash[:]) } + +func MustParseTime(s string) time.Time { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return t +} From cb2d4e2c3024611c515ac66fe0cf9c9db508cf99 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Wed, 20 Nov 2024 17:20:08 +0800 Subject: [PATCH 02/16] fix ci Signed-off-by: Xuhui zhang --- internal/controller/cachegroup_controller.go | 75 +++++++---- pkg/common/common.go | 3 + pkg/utils/pod.go | 2 +- .../e2e-test-cachegroup.member_change.yaml | 29 ++++ test/e2e/e2e_test.go | 127 +++++++++++++++++- 5 files changed, 208 insertions(+), 28 deletions(-) create mode 100644 test/e2e/config/e2e-test-cachegroup.member_change.yaml diff --git a/internal/controller/cachegroup_controller.go b/internal/controller/cachegroup_controller.go index 7701933..cb545fc 100644 --- a/internal/controller/cachegroup_controller.go +++ b/internal/controller/cachegroup_controller.go @@ -150,6 +150,9 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr numUnavailable++ go func() { defer wg.Done() + if backUpWorker { + log.V(1).Info("new worker added, as backup worker", "worker", expectWorker.Name) + } if err := r.createOrUpdateWorker(ctx, actualState, expectWorker); err != nil { log.Error(err, "failed to create or update worker", "worker", expectWorker.Name) errCh <- err @@ -184,7 +187,7 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr log.Error(err, "failed to list actual worker nodes") return err } - if err := r.removeRedundantWorkers(ctx, expectStates, actualWorks); err != nil { + if err := r.removeRedundantWorkers(ctx, cg, expectStates, actualWorks); err != nil { log.Error(err, "failed to remove redundant") return err } @@ -314,52 +317,74 @@ func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicef // if the worker not ready delete it directly func (r *CacheGroupReconciler) removeRedundantWorkers( ctx context.Context, + cg *juicefsiov1.CacheGroup, expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate, actualWorks []corev1.Pod) error { log := log.FromContext(ctx) for _, worker := range actualWorks { + if _, ok := expectStates[worker.Spec.NodeName]; ok { + continue + } if worker.DeletionTimestamp != nil { continue } - if _, ok := expectStates[worker.Spec.NodeName]; !ok { - // if the worker in delete state, - cacheBytes, err := utils.GetWorkerCacheBlocksBytes(ctx, worker, common.MountPoint) + + if cg.Status.ReadyWorker > 1 { + delete, err := r.gracefulShutdownWorker(ctx, &worker) if 
err != nil { - log.Error(err, "failed to get worker cache blocks bytes", "worker", worker.Name) - continue + log.Error(err, "failed to graceful shutdown worker", "worker", worker) + return err } - if cacheBytes > 0 { - log.Info("found redundant worker, but it still has cache blocks, tweak the group weight to zero", "worker", worker.Name) - if err := r.gracefulShutdownWorker(ctx, &worker); err != nil { - log.Error(err, "failed to graceful shutdown worker", "worker", worker) - } - } else { - log.Info("found redundant worker, delete it", "worker", worker.Name) - if err := r.deleteCacheGroupWorker(ctx, &worker, false); err != nil { - log.Error(err, "failed to delete worker", "worker", worker.Name) - return err - } + if !delete { + continue } } + + log.Info("found redundant worker, delete it", "worker", worker.Name) + if err := r.deleteCacheGroupWorker(ctx, &worker, false); err != nil { + log.Error(err, "failed to delete worker", "worker", worker.Name) + return err + } } return nil } // change pod options `group-weight` to zero, delete and recreate the worker pod -func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worker *corev1.Pod) error { +func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worker *corev1.Pod) (delete bool, err error) { + log := log.FromContext(ctx) + isReady := utils.IsPodReady(*worker) && utils.IsMountPointReady(ctx, *worker, common.MountPoint) + if !isReady { + if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + return false, nil + } + return true, nil + } + + cacheBytes, err := utils.GetWorkerCacheBlocksBytes(ctx, *worker, common.MountPoint) + if err != nil { + log.Error(err, "failed to get worker cache blocks bytes", "worker", worker.Name) + return false, err + } + + if cacheBytes <= 0 { + log.V(1).Info("redundant worker has no cache blocks, delete it", "worker", worker.Name) + return true, nil + } + if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { - return nil + return false, nil } + log.V(1).Info("redundant worker has cache blocks, recreate to set group-weight to 0", "worker", worker.Name, "cacheBytes", cacheBytes) if err := r.deleteCacheGroupWorker(ctx, worker, true); err != nil { - return err + return false, err } builder.UpdateWorkerGroupWeight(worker, 0) worker.ResourceVersion = "" worker.Annotations[common.AnnoWaitingDeleteWorker] = time.Now().Format(time.RFC3339) if err := r.Create(ctx, worker); err != nil { - return err + return false, err } - return nil + return false, err } func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, actual *corev1.Pod) bool { @@ -372,8 +397,7 @@ func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, a // then this node is a normal worker. 
if v, ok := actual.Annotations[common.AnnoBackupWorker]; ok { backupAt := utils.MustParseTime(v) - // TODO: 10 minutes should be configurable - return time.Since(backupAt) < 10*time.Minute + return time.Since(backupAt) < common.BackupWorkerDuration } return false } @@ -385,6 +409,9 @@ func (r *CacheGroupReconciler) calculateStatus( newStatus := cg.Status if len(expectStates) == 0 { newStatus.ReadyStr = "-" + newStatus.ReadyWorker = 0 + newStatus.ExpectWorker = 0 + newStatus.BackUpWorker = 0 newStatus.Phase = juicefsiov1.CacheGroupPhaseWaiting return newStatus } diff --git a/pkg/common/common.go b/pkg/common/common.go index 53fb0a7..bbcea8e 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -16,6 +16,7 @@ package common import ( "fmt" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -58,6 +59,8 @@ var ( corev1.ResourceMemory: resource.MustParse("1Gi"), }, } + + BackupWorkerDuration = 10 * time.Minute ) func GenWorkerName(cgName string, nodeName string) string { diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index 5c9e995..94a5b52 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -39,7 +39,7 @@ func IsPodReady(pod corev1.Pod) bool { // IsMountPointReady checks if the mount point is ready in the given pod func IsMountPointReady(ctx context.Context, pod corev1.Pod, mountPoint string) bool { - log := log.FromContext(ctx).WithName("checkMountPoint").WithValues("worker", pod.Name) + log := log.FromContext(ctx).WithValues("worker", pod.Name) ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() diff --git a/test/e2e/config/e2e-test-cachegroup.member_change.yaml b/test/e2e/config/e2e-test-cachegroup.member_change.yaml new file mode 100644 index 0000000..540abe7 --- /dev/null +++ b/test/e2e/config/e2e-test-cachegroup.member_change.yaml @@ -0,0 +1,29 @@ +apiVersion: juicefs.io/v1 +kind: CacheGroup +metadata: + name: e2e-test-cachegroup +spec: + secretRef: + name: juicefs-secret + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 1 + worker: + template: + nodeSelector: + juicefs.io/cg-worker: "true" + image: registry.cn-hangzhou.aliyuncs.com/juicedata/mount:ee-5.1.2-59d9736 + dnsPolicy: ClusterFirstWithHostNet + hostNetwork: true + tolerations: + - key: node-role.kubernetes.io/control-plane + operator: Exists + effect: NoSchedule + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 1 + memory: 1Gi \ No newline at end of file diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 8ae812a..f0788cb 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -155,6 +155,22 @@ var _ = Describe("controller", Ordered, func() { cmd = exec.Command("kubectl", "apply", "-f", "test/e2e/config/e2e-test-cachegroup.yaml", "-n", namespace) _, err = utils.Run(cmd) Expect(err).NotTo(HaveOccurred()) + + By("validating cg status is up to date") + verifyCgStatusUpToDate := func() error { + // Validate pod status + cmd := exec.Command("kubectl", "get", + "cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.phase}", + "-n", namespace, + ) + status, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if string(status) != "Waiting" { + return fmt.Errorf("cg expect Waiting status, but got %s", status) + } + return nil + } + Eventually(verifyCgStatusUpToDate, time.Minute, time.Second).Should(Succeed()) }) AfterEach(func() { @@ -252,13 +268,17 @@ var _ = Describe("controller", Ordered, func() { err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, err).NotTo(HaveOccurred()) for _, 
node := range nodes.Items { + checkCmd := expectCmds + if node.Annotations[common.AnnoBackupWorker] != "" { + checkCmd = checkCmd + ",group-backup" + } ExpectWithOffset(1, node.Spec.HostNetwork).Should(BeTrue()) ExpectWithOffset(1, node.Spec.Containers[0].Image).Should(Equal(image)) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Cpu().String()).Should(Equal("100m")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Memory().String()).Should(Equal("128Mi")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi")) - ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", expectCmds})) + ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", checkCmd})) } return nil } @@ -397,24 +417,125 @@ var _ = Describe("controller", Ordered, func() { err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, err).NotTo(HaveOccurred()) for _, node := range nodes.Items { + checkCmd := normalCmds + if node.Spec.NodeName == utils.GetKindNodeName("worker2") { + checkCmd = worker2Cmds + } + if node.Annotations[common.AnnoBackupWorker] != "" { + checkCmd = checkCmd + ",group-backup" + } ExpectWithOffset(1, node.Spec.HostNetwork).Should(BeTrue()) ExpectWithOffset(1, node.Spec.Containers[0].Image).Should(Equal(image)) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Cpu().String()).Should(Equal("100m")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Memory().String()).Should(Equal("128Mi")) + ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", checkCmd})) if node.Spec.NodeName == utils.GetKindNodeName("worker2") { - ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", worker2Cmds})) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("2")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("2Gi")) } else { ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi")) - ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", normalCmds})) } } return nil } Eventually(verifyWorkerSpec, time.Minute, time.Second).Should(Succeed()) }) + + It("should gracefully handle member change ", func() { + cmd := exec.Command("kubectl", "apply", "-f", "test/e2e/config/e2e-test-cachegroup.member_change.yaml", "-n", namespace) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + controlPlane := utils.GetKindNodeName("control-plane") + worker := utils.GetKindNodeName("worker") + worker2 := utils.GetKindNodeName("worker2") + + cmd = exec.Command("kubectl", "label", "nodes", controlPlane, worker, "juicefs.io/cg-worker=true", "--overwrite") + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("validating cg status is up to date") + verifyCgStatusUpToDate := func() error { + cmd = exec.Command("kubectl", "get", + "cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.readyWorker}", + "-n", namespace, + ) + readyWorker, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if string(readyWorker) != "2" { + return fmt.Errorf("cg expect has 2 readyWorker status, but got %s", readyWorker) + } + return nil + } + 
Eventually(verifyCgStatusUpToDate, 5*time.Minute, 3*time.Second).Should(Succeed()) + + cmd = exec.Command("kubectl", "label", "nodes", worker2, "juicefs.io/cg-worker=true", "--overwrite") + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + verifyCgStatusUpToDate = func() error { + cmd = exec.Command("kubectl", "get", + "cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.readyWorker}", + "-n", namespace, + ) + readyWorker, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if string(readyWorker) != "3" { + return fmt.Errorf("cg expect has 3 readyWorker status, but got %s", readyWorker) + } + return nil + } + Eventually(verifyCgStatusUpToDate, 5*time.Minute, 3*time.Second).Should(Succeed()) + + By("validating new node should be as backup node") + verifyBackupWorkerSpec := func() error { + cmd := exec.Command("kubectl", "get", "pods", "-l", "juicefs.io/cache-group="+cgName, "-n", namespace, "-o", "json") + result, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("get worker pods failed, %+v", err) + } + worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache,group-backup" + nodes := corev1.PodList{} + err = json.Unmarshal(result, &nodes) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + for _, node := range nodes.Items { + if node.Spec.NodeName == worker2 { + ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", worker2Cmds})) + } + } + return nil + } + Eventually(verifyBackupWorkerSpec, time.Minute, time.Second).Should(Succeed()) + + verifyWarmupFile := func() error { + cmd = exec.Command("kubectl", "exec", "-i", "-n", + namespace, "-c", common.WorkerContainerName, common.GenWorkerName("e2e-test-cachegroup", worker), "--", + "sh", "-c", "echo 1 > /mnt/jfs/cache-group-test.txt") + _, err = utils.Run(cmd) + return err + } + Eventually(verifyWarmupFile, time.Minute, time.Second).Should(Succeed()) + + cmd = exec.Command("kubectl", "label", "nodes", worker, "juicefs.io/cg-worker-") + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + verifyWorkerWeightToZero := func() error { + cmd := exec.Command("kubectl", "get", "pods", common.GenWorkerName(cgName, worker), "-n", namespace, "-o", "json") + result, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("get worker pods failed, %+v", err) + } + workerNode := corev1.Pod{} + err = json.Unmarshal(result, &workerNode) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if !strings.Contains(workerNode.Spec.Containers[0].Command[2], "group-weight=0") { + return fmt.Errorf("worker weight not set to 0") + } + return nil + } + Eventually(verifyWorkerWeightToZero, time.Minute, time.Second).Should(Succeed()) + }) }) Context("WarmUp Controller", func() { From 7cc71f30fc5d680bd98b2d0e25110954afc6ea61 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Thu, 21 Nov 2024 10:38:22 +0800 Subject: [PATCH 03/16] fix ci Signed-off-by: Xuhui zhang --- test/e2e/e2e_test.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index f0788cb..9c1d841 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -512,7 +512,24 @@ var _ = Describe("controller", Ordered, func() { namespace, "-c", common.WorkerContainerName, 
common.GenWorkerName("e2e-test-cachegroup", worker), "--", "sh", "-c", "echo 1 > /mnt/jfs/cache-group-test.txt") _, err = utils.Run(cmd) - return err + + // check block bytes + cmd = exec.Command("kubectl", "exec", "-i", "-n", + namespace, "-c", common.WorkerContainerName, common.GenWorkerName("e2e-test-cachegroup", worker), "--", + "sh", "-c", "cat /mnt/jfs/.stats | grep blockcache.bytes") + result, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("get block bytes failed, %+v", err) + } + var blockBytes int + _, err = fmt.Sscanf(strings.Trim(string(result), "\n"), "blockcache.bytes: %d", &blockBytes) + if err != nil { + return fmt.Errorf("failed to scan block bytes: %v", err) + } + if blockBytes == 0 { + return fmt.Errorf("block bytes is ") + } + return nil } Eventually(verifyWarmupFile, time.Minute, time.Second).Should(Succeed()) From 1758eadbd431f51c5956c310f771b81dbf8f4d73 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Mon, 25 Nov 2024 17:50:25 +0800 Subject: [PATCH 04/16] update Signed-off-by: Xuhui zhang --- api/v1/cachegroup_types.go | 19 +++++ api/v1/zz_generated.deepcopy.go | 20 +++++ config/crd/bases/juicefs.io_cachegroups.yaml | 28 ++++++ dist/crd.yaml | 40 +++++++++ internal/controller/cachegroup_controller.go | 2 +- pkg/builder/pod.go | 89 +++++++++++++++----- pkg/builder/pod_test.go | 29 +++++-- pkg/common/common.go | 9 +- test/e2e/e2e_test.go | 10 ++- 9 files changed, 214 insertions(+), 32 deletions(-) diff --git a/api/v1/cachegroup_types.go b/api/v1/cachegroup_types.go index 44f94c3..ab7dde8 100644 --- a/api/v1/cachegroup_types.go +++ b/api/v1/cachegroup_types.go @@ -25,6 +25,24 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +type CacheDirType string + +var ( + CacheDirTypeHostPath CacheDirType = "HostPath" + CacheDirTypePVC CacheDirType = "PVC" +) + +type CacheDir struct { + // +kubebuilder:validation:Enum=HostPath;PVC + Type CacheDirType `json:"type,omitempty"` + // required for HostPath type + // +optional + Path string `json:"path,omitempty"` + // required for PVC type + // +optional + Name string `json:"name,omitempty"` +} + // CacheGroupWorkerTemplate defines cache group worker template type CacheGroupWorkerTemplate struct { NodeSelector map[string]string `json:"nodeSelector,omitempty"` @@ -32,6 +50,7 @@ type CacheGroupWorkerTemplate struct { HostNetwork *bool `json:"hostNetwork,omitempty"` SchedulerName string `json:"schedulerName,omitempty"` Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + CacheDirs []CacheDir `json:"cacheDirs,omitempty"` // Container image. // More info: https://kubernetes.io/docs/concepts/containers/images diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index f20bbf3..eabf063 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -26,6 +26,21 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CacheDir) DeepCopyInto(out *CacheDir) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheDir. +func (in *CacheDir) DeepCopy() *CacheDir { + if in == nil { + return nil + } + out := new(CacheDir) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *CacheGroup) DeepCopyInto(out *CacheGroup) { *out = *in @@ -215,6 +230,11 @@ func (in *CacheGroupWorkerTemplate) DeepCopyInto(out *CacheGroupWorkerTemplate) (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.CacheDirs != nil { + in, out := &in.CacheDirs, &out.CacheDirs + *out = make([]CacheDir, len(*in)) + copy(*out, *in) + } if in.Env != nil { in, out := &in.Env, &out.Env *out = make([]corev1.EnvVar, len(*in)) diff --git a/config/crd/bases/juicefs.io_cachegroups.yaml b/config/crd/bases/juicefs.io_cachegroups.yaml index b7cfaab..cb9e066 100644 --- a/config/crd/bases/juicefs.io_cachegroups.yaml +++ b/config/crd/bases/juicefs.io_cachegroups.yaml @@ -83,6 +83,20 @@ spec: overwrite: items: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: @@ -1477,6 +1491,20 @@ spec: type: array template: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: diff --git a/dist/crd.yaml b/dist/crd.yaml index c7d556c..d806c8e 100644 --- a/dist/crd.yaml +++ b/dist/crd.yaml @@ -22,6 +22,12 @@ spec: - jsonPath: .status.phase name: Phase type: string + - jsonPath: .status.backUpWorker + name: Back up + type: string + - jsonPath: .status.WaitingDeletedWorker + name: Waiting Deleted + type: string - jsonPath: .status.readyStr name: Ready type: string @@ -76,6 +82,20 @@ spec: overwrite: items: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: @@ -1470,6 +1490,20 @@ spec: type: array template: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: @@ -2861,6 +2895,9 @@ spec: type: object status: properties: + backUpWorker: + format: int32 + type: integer cacheGroup: type: string conditions: @@ -2889,6 +2926,9 @@ spec: readyWorker: format: int32 type: integer + waitingDeletedWorker: + format: int32 + type: integer type: object type: object served: true diff --git a/internal/controller/cachegroup_controller.go b/internal/controller/cachegroup_controller.go index cb545fc..e248301 100644 --- a/internal/controller/cachegroup_controller.go +++ b/internal/controller/cachegroup_controller.go @@ -229,7 +229,7 @@ func (r *CacheGroupReconciler) parseExpectState(ctx context.Context, cg *juicefs for _, overwrite := range cg.Spec.Worker.Overwrite { if utils.SliceContains(overwrite.Nodes, node.Name) || (overwrite.NodeSelector != nil && utils.NodeSelectorContains(overwrite.NodeSelector, node.Labels)) { - builder.MergeCacheGrouopWorkerTemplate(expectState, overwrite) + builder.MergeCacheGroupWorkerTemplate(expectState, overwrite) } } expectStates[node.Name] = *expectState diff --git a/pkg/builder/pod.go b/pkg/builder/pod.go index 3fe6d2f..c55623a 100644 --- a/pkg/builder/pod.go +++ b/pkg/builder/pod.go @@ -57,13 +57,14 @@ var ( ) type PodBuilder struct { - volName string - cg *juicefsiov1.CacheGroup - node string - spec juicefsiov1.CacheGroupWorkerTemplate - secretData map[string]string - initConfig string - backUpWorker bool + volName string + cg *juicefsiov1.CacheGroup + node string + spec juicefsiov1.CacheGroupWorkerTemplate + 
secretData map[string]string + initConfig string + backUpWorker bool + cacheDirsInContainer []string } func NewPodBuilder(cg *juicefsiov1.CacheGroup, secret *corev1.Secret, node string, spec juicefsiov1.CacheGroupWorkerTemplate, backUpWorker bool) *PodBuilder { @@ -185,6 +186,60 @@ func (p *PodBuilder) genAuthCmds(ctx context.Context) []string { return authCmds } +func (p *PodBuilder) genCacheDirs() { + volumes := []corev1.Volume{} + volumeMounts := []corev1.VolumeMount{} + for i, dir := range p.spec.CacheDirs { + cachePathInContainer := fmt.Sprintf("%s%d", common.CacheDirVolumeMountPathPrefix, i) + volumeName := fmt.Sprintf("%s%d", common.CacheDirVolumeNamePrefix, i) + p.spec.VolumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: volumeName, + MountPath: cachePathInContainer, + }) + switch dir.Type { + case juicefsiov1.CacheDirTypeHostPath: + p.spec.Volumes = append(volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: dir.Path, + Type: utils.ToPtr(corev1.HostPathDirectoryOrCreate), + }, + }, + }) + case juicefsiov1.CacheDirTypePVC: + p.spec.Volumes = append(volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: dir.Name, + }, + }, + }) + } + p.cacheDirsInContainer = append(p.cacheDirsInContainer, cachePathInContainer) + } + + if len(p.cacheDirsInContainer) == 0 { + cachePathInContainer := common.DefaultCacheHostPath + volumeName := fmt.Sprintf("%s%d", common.CacheDirVolumeNamePrefix, 0) + p.spec.Volumes = append(volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: common.DefaultCacheHostPath, + Type: utils.ToPtr(corev1.HostPathDirectoryOrCreate), + }, + }, + }) + p.spec.VolumeMounts = append(volumeMounts, corev1.VolumeMount{ + Name: volumeName, + MountPath: cachePathInContainer, + }) + p.cacheDirsInContainer = append(p.cacheDirsInContainer, cachePathInContainer) + } +} + func (p *PodBuilder) genCommands(ctx context.Context) []string { authCmds := p.genAuthCmds(ctx) cacheGroup := GenCacheGroupName(p.cg) @@ -200,16 +255,10 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { "cache-group=" + cacheGroup, } - cacheDirs := []string{} parsedOpts := utils.ParseOptions(ctx, p.spec.Opts) for _, opt := range parsedOpts { if opt[0] == "cache-dir" { - if opt[1] == "" { - log.FromContext(ctx).Info("invalid cache-dir option", "option", opt) - continue - } - cacheDirs = strings.Split(opt[1], ":") - continue + log.FromContext(ctx).Info("cache-dir option is not allowed, plz use cacheDirs instead") } if opt[1] != "" { opts = append(opts, strings.TrimSpace(opt[0])+"="+strings.TrimSpace(opt[1])) @@ -217,11 +266,7 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { opts = append(opts, strings.TrimSpace(opt[0])) } } - - if len(cacheDirs) == 0 { - cacheDirs = append(cacheDirs, "/var/jfsCache") - } - opts = append(opts, "cache-dir="+strings.Join(cacheDirs, ":")) + opts = append(opts, "cache-dir="+strings.Join(p.cacheDirsInContainer, ":")) if p.backUpWorker { opts = append(opts, "group-backup") } @@ -262,6 +307,7 @@ func (p *PodBuilder) genInitConfigVolumes() { func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod { worker := newBasicPod(p.cg, p.node) p.genInitConfigVolumes() + p.genCacheDirs() spec := p.spec if spec.HostNetwork != nil { worker.Spec.HostNetwork = *spec.HostNetwork @@ -324,7 
+370,7 @@ func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod { return worker } -func MergeCacheGrouopWorkerTemplate(template *juicefsiov1.CacheGroupWorkerTemplate, overwrite juicefsiov1.CacheGroupWorkerOverwrite) { +func MergeCacheGroupWorkerTemplate(template *juicefsiov1.CacheGroupWorkerTemplate, overwrite juicefsiov1.CacheGroupWorkerOverwrite) { if overwrite.ServiceAccountName != "" { template.ServiceAccountName = overwrite.ServiceAccountName } @@ -376,6 +422,9 @@ func MergeCacheGrouopWorkerTemplate(template *juicefsiov1.CacheGroupWorkerTempla if overwrite.DNSPolicy != nil { template.DNSPolicy = overwrite.DNSPolicy } + if overwrite.CacheDirs != nil { + template.CacheDirs = overwrite.CacheDirs + } } func UpdateWorkerGroupWeight(worker *corev1.Pod, weight int) { diff --git a/pkg/builder/pod_test.go b/pkg/builder/pod_test.go index 3d07d9b..67e07f0 100644 --- a/pkg/builder/pod_test.go +++ b/pkg/builder/pod_test.go @@ -57,7 +57,7 @@ func TestPodBuilder_genCommands(t *testing.T) { }, }, { - name: "with cache-dir option", + name: "with cache-dir", podBuilder: &PodBuilder{ volName: "test-name", cg: &juicefsiov1.CacheGroup{ @@ -71,14 +71,19 @@ func TestPodBuilder_genCommands(t *testing.T) { "secret-key": "test-secret-key", }, spec: juicefsiov1.CacheGroupWorkerTemplate{ - Opts: []string{"cache-dir=/custom/cache"}, + CacheDirs: []juicefsiov1.CacheDir{ + { + Type: juicefsiov1.CacheDirTypeHostPath, + Path: "/custom/cache", + }, + }, }, }, expected: []string{ "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/custom/cache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/var/jfsCache-0", }, }, { @@ -96,14 +101,14 @@ func TestPodBuilder_genCommands(t *testing.T) { "secret-key": "test-secret-key", }, spec: juicefsiov1.CacheGroupWorkerTemplate{ - Opts: []string{"cache-dir=/custom/cache", "verbose"}, + Opts: []string{"a=b", "verbose"}, }, }, expected: []string{ "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,verbose,cache-dir=/custom/cache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,a=b,verbose,cache-dir=/var/jfsCache", }, }, { @@ -122,14 +127,14 @@ func TestPodBuilder_genCommands(t *testing.T) { "format-options": "format-options,format-options2", }, spec: juicefsiov1.CacheGroupWorkerTemplate{ - Opts: []string{"cache-dir=/custom/cache", "verbose"}, + Opts: []string{"verbose"}, }, }, expected: []string{ "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY} --format-options --format-options2\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,verbose,cache-dir=/custom/cache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,verbose,cache-dir=/var/jfsCache", }, }, { @@ -141,6 +146,13 @@ func TestPodBuilder_genCommands(t *testing.T) { Name: "test-cg", Namespace: "default", }, + Spec: juicefsiov1.CacheGroupSpec{ + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: 
corev1.LocalObjectReference{ + Name: "test-secret", + }, + }, + }, }, initConfig: "initconfig", secretData: map[string]string{ @@ -161,6 +173,8 @@ func TestPodBuilder_genCommands(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.TODO() + tt.podBuilder.genInitConfigVolumes() + tt.podBuilder.genCacheDirs() got := tt.podBuilder.genCommands(ctx) if !reflect.DeepEqual(got, tt.expected) { t.Errorf("genCommands() = %v, want %v", got, tt.expected) @@ -168,6 +182,7 @@ func TestPodBuilder_genCommands(t *testing.T) { }) } } + func TestUpdateWorkerGroupWeight(t *testing.T) { tests := []struct { name string diff --git a/pkg/common/common.go b/pkg/common/common.go index bbcea8e..ca54a4f 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -32,9 +32,12 @@ const ( // Finalizer is the finalizer for CacheGroup Finalizer = "juicefs.io/finalizer" // juicefs binary path - JuiceFSBinary = "/usr/bin/juicefs" - JuiceFsMountBinary = "/sbin/mount.juicefs" - MountPoint = "/mnt/jfs" + JuiceFSBinary = "/usr/bin/juicefs" + JuiceFsMountBinary = "/sbin/mount.juicefs" + MountPoint = "/mnt/jfs" + CacheDirVolumeNamePrefix = "jfs-cache-dir-" + CacheDirVolumeMountPathPrefix = "/var/jfsCache-" + DefaultCacheHostPath = "/var/jfsCache" // label keys LabelCacheGroup = "juicefs.io/cache-group" diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 9c1d841..6398ecb 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -279,6 +279,10 @@ var _ = Describe("controller", Ordered, func() { ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi")) ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", checkCmd})) + ExpectWithOffset(1, node.Spec.Volumes[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Volumes[0].HostPath.Path).Should(Equal("/var/jfsCache")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].MountPath).Should(Equal("/var/jfsCache")) } return nil } @@ -412,7 +416,7 @@ var _ = Describe("controller", Ordered, func() { return fmt.Errorf("get worker pods failed, %+v", err) } normalCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.1,group-weight=200,cache-dir=/var/jfsCache" - worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.01,group-weight=100,cache-dir=/var/jfsCache" + worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.01,group-weight=100,cache-dir=/var/jfsCache-0" nodes := corev1.PodList{} err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, 
err).NotTo(HaveOccurred()) @@ -432,6 +436,10 @@ var _ = Describe("controller", Ordered, func() { if node.Spec.NodeName == utils.GetKindNodeName("worker2") { ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("2")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("2Gi")) + ExpectWithOffset(1, node.Spec.Volumes[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Volumes[0].HostPath.Path).Should(Equal("/data/juicefs")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].MountPath).Should(Equal("/var/jfsCache-0")) } else { ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi")) From 22b6ecc4dc1647ebc6f01983389ef3a6d4b77a49 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Tue, 26 Nov 2024 12:03:15 +0800 Subject: [PATCH 05/16] fix ci Signed-off-by: Xuhui zhang --- test/e2e/config/e2e-test-cachegroup.overwrite.yaml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/test/e2e/config/e2e-test-cachegroup.overwrite.yaml b/test/e2e/config/e2e-test-cachegroup.overwrite.yaml index d45158f..316da68 100644 --- a/test/e2e/config/e2e-test-cachegroup.overwrite.yaml +++ b/test/e2e/config/e2e-test-cachegroup.overwrite.yaml @@ -39,4 +39,7 @@ spec: memory: 2Gi opts: - free-space-ratio=0.01 - - group-weight=100 \ No newline at end of file + - group-weight=100 + cacheDirs: + - type: HostPath + path: /data/juicefs \ No newline at end of file From 4231534cf5fc3d04b768e6d618c7dbabcce5cf3f Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Tue, 3 Dec 2024 11:04:51 +0800 Subject: [PATCH 06/16] add fileSystem status Signed-off-by: Xuhui zhang --- api/v1/cachegroup_types.go | 1 + config/crd/bases/juicefs.io_cachegroups.yaml | 2 ++ dist/crd.yaml | 2 ++ internal/controller/cachegroup_controller.go | 4 +++- 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/api/v1/cachegroup_types.go b/api/v1/cachegroup_types.go index ab7dde8..8fc5147 100644 --- a/api/v1/cachegroup_types.go +++ b/api/v1/cachegroup_types.go @@ -164,6 +164,7 @@ type CacheGroupStatus struct { Phase CacheGroupPhase `json:"phase,omitempty"` Conditions []CacheGroupCondition `json:"conditions,omitempty"` + FileSystem string `json:"fileSystem,omitempty"` ReadyWorker int32 `json:"readyWorker,omitempty"` BackUpWorker int32 `json:"backUpWorker,omitempty"` WaitingDeletedWorker int32 `json:"waitingDeletedWorker,omitempty"` diff --git a/config/crd/bases/juicefs.io_cachegroups.yaml b/config/crd/bases/juicefs.io_cachegroups.yaml index cb9e066..081109c 100644 --- a/config/crd/bases/juicefs.io_cachegroups.yaml +++ b/config/crd/bases/juicefs.io_cachegroups.yaml @@ -2920,6 +2920,8 @@ spec: expectWorker: format: int32 type: integer + fileSystem: + type: string phase: type: string readyStr: diff --git a/dist/crd.yaml b/dist/crd.yaml index d806c8e..5dab2fc 100644 --- a/dist/crd.yaml +++ b/dist/crd.yaml @@ -2919,6 +2919,8 @@ spec: expectWorker: format: int32 type: integer + fileSystem: + type: string phase: type: string readyStr: diff --git a/internal/controller/cachegroup_controller.go b/internal/controller/cachegroup_controller.go index e248301..40c37d5 100644 --- a/internal/controller/cachegroup_controller.go +++ b/internal/controller/cachegroup_controller.go @@ -193,7 +193,7 @@ func (r 
*CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr } // calculate status - newStatus := r.calculateStatus(cg, expectStates, actualWorks) + newStatus := r.calculateStatus(cg, string(secret.Data["name"]), expectStates, actualWorks) if !reflect.DeepEqual(cg.Status, newStatus) { cg.Status = newStatus return utils.IgnoreConflict(r.Status().Update(ctx, cg)) @@ -404,9 +404,11 @@ func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, a func (r *CacheGroupReconciler) calculateStatus( cg *juicefsiov1.CacheGroup, + fileSystem string, expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate, actualWorks []corev1.Pod) juicefsiov1.CacheGroupStatus { newStatus := cg.Status + newStatus.FileSystem = fileSystem if len(expectStates) == 0 { newStatus.ReadyStr = "-" newStatus.ReadyWorker = 0 From 0e483cda9dc8756f16ba1a7b72ced17556a9cba1 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Tue, 3 Dec 2024 15:50:59 +0800 Subject: [PATCH 07/16] fix: check version Signed-off-by: Xuhui zhang --- internal/controller/cachegroup_controller.go | 23 ++++++---- pkg/builder/pod.go | 22 ++++----- pkg/utils/utils.go | 37 +++++++++++++++ pkg/utils/utils_test.go | 48 ++++++++++++++++++++ 4 files changed, 110 insertions(+), 20 deletions(-) diff --git a/internal/controller/cachegroup_controller.go b/internal/controller/cachegroup_controller.go index 40c37d5..41f1fdd 100644 --- a/internal/controller/cachegroup_controller.go +++ b/internal/controller/cachegroup_controller.go @@ -137,8 +137,8 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr log.Error(err, "failed to get actual state", "node", node) continue } - backUpWorker := r.asBackupWorkerOrNot(cg, actualState) - podBuilder := builder.NewPodBuilder(cg, secret, node, expectState, backUpWorker) + groupBackUp := r.shouldAddGroupBackupOrNot(cg, actualState, expectState.Image) + podBuilder := builder.NewPodBuilder(cg, secret, node, expectState, groupBackUp) expectWorker := podBuilder.NewCacheGroupWorker(ctx) if r.actualShouldbeUpdate(updateStrategyType, expectWorker, actualState) { @@ -150,8 +150,8 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr numUnavailable++ go func() { defer wg.Done() - if backUpWorker { - log.V(1).Info("new worker added, as backup worker", "worker", expectWorker.Name) + if groupBackUp { + log.V(1).Info("new worker added, add group-backup option", "worker", expectWorker.Name) } if err := r.createOrUpdateWorker(ctx, actualState, expectWorker); err != nil { log.Error(err, "failed to create or update worker", "worker", expectWorker.Name) @@ -354,6 +354,7 @@ func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worke log := log.FromContext(ctx) isReady := utils.IsPodReady(*worker) && utils.IsMountPointReady(ctx, *worker, common.MountPoint) if !isReady { + // TODO: add a timeout for data migration. if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { return false, nil } @@ -387,13 +388,17 @@ func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worke return false, err } -func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, actual *corev1.Pod) bool { - // If it is a new node and there are already 2 or more worker nodes - // then this node is a backup worker. 
+func (r *CacheGroupReconciler) shouldAddGroupBackupOrNot(cg *juicefsiov1.CacheGroup, actual *corev1.Pod, newImage string) bool { + if utils.CompareEEImageVersion(newImage, "5.1.0") < 0 { + return false + } + + // If it is a new node and there are already 1 or more worker nodes + // then this node should add group-backup. if actual == nil { - return cg.Status.ReadyWorker >= 2 + return cg.Status.ReadyWorker >= 1 } - // If this node has been acting as a backup node for 10 minutes + // If this node has been added group-backup for 10 minutes // then this node is a normal worker. if v, ok := actual.Annotations[common.AnnoBackupWorker]; ok { backupAt := utils.MustParseTime(v) diff --git a/pkg/builder/pod.go b/pkg/builder/pod.go index c55623a..28a936f 100644 --- a/pkg/builder/pod.go +++ b/pkg/builder/pod.go @@ -63,24 +63,24 @@ type PodBuilder struct { spec juicefsiov1.CacheGroupWorkerTemplate secretData map[string]string initConfig string - backUpWorker bool + groupBackup bool cacheDirsInContainer []string } -func NewPodBuilder(cg *juicefsiov1.CacheGroup, secret *corev1.Secret, node string, spec juicefsiov1.CacheGroupWorkerTemplate, backUpWorker bool) *PodBuilder { +func NewPodBuilder(cg *juicefsiov1.CacheGroup, secret *corev1.Secret, node string, spec juicefsiov1.CacheGroupWorkerTemplate, groupBackup bool) *PodBuilder { secretData := utils.ParseSecret(secret) initconfig := "" if v, ok := secretData["initconfig"]; ok && v != "" { initconfig = v } return &PodBuilder{ - secretData: secretData, - volName: secretData["name"], - cg: cg, - node: node, - spec: spec, - initConfig: initconfig, - backUpWorker: backUpWorker, + secretData: secretData, + volName: secretData["name"], + cg: cg, + node: node, + spec: spec, + initConfig: initconfig, + groupBackup: groupBackup, } } @@ -267,7 +267,7 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { } } opts = append(opts, "cache-dir="+strings.Join(p.cacheDirsInContainer, ":")) - if p.backUpWorker { + if p.groupBackup { opts = append(opts, "group-backup") } mountCmds = append(mountCmds, "-o", strings.Join(opts, ",")) @@ -362,7 +362,7 @@ func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod { // The following fields do not participate in the hash calculation. 
worker.Annotations[common.LabelWorkerHash] = hash - if p.backUpWorker { + if p.groupBackup { backupAt := time.Now().Format(time.RFC3339) worker.Annotations[common.AnnoBackupWorker] = backupAt } diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 01cbc8e..10d0a68 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -18,6 +18,8 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "strconv" + "strings" "time" ) @@ -57,3 +59,38 @@ func MustParseTime(s string) time.Time { } return t } + +// CompareImageVersion compares two image versions +// return 1 if image > target +// return -1 if image < target +// return 0 if image == target +func CompareEEImageVersion(image, target string) int { + current := strings.Split(image, ":")[1] + current = strings.ReplaceAll(current, "ee-", "") + if strings.Contains(current, "latest") || + strings.Contains(current, "nightly") || + strings.Contains(current, "dev") { + return 1 + } + + currentParts := strings.Split(current, ".") + targetParts := strings.Split(target, ".") + for i := 0; i < len(currentParts); i++ { + if i >= len(targetParts) { + return 1 + } + v1, _ := strconv.Atoi(currentParts[i]) + v2, _ := strconv.Atoi(targetParts[i]) + if v1 > v2 { + return 1 + } else if v1 < v2 { + return -1 + } + } + + if len(currentParts) < len(targetParts) { + return -1 + } + + return 0 +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index 961f389..0319503 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -96,3 +96,51 @@ func TestContainsNodeSelector(t *testing.T) { }) } } + +func TestCompareImageVersion(t *testing.T) { + tests := []struct { + name string + current string + target string + want int + }{ + { + name: "Test with current greater than target", + current: "juicedata/mount:ee-1.2.3", + target: "1.2.2", + want: 1, + }, + { + name: "Test with current less than target", + current: "juicedata/mount:ee-1.2.2", + target: "1.2.3", + want: -1, + }, + { + name: "Test with current equal to target", + current: "juicedata/mount:ee-1.2.3", + target: "1.2.3", + want: 0, + }, + { + name: "Test with current having less parts than target", + current: "juicedata/mount:ee-1.2", + target: "1.2.3", + want: -1, + }, + { + name: "Test with specific version", + current: "juicedata/mount:ee-nightly", + target: "1.2.1", + want: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := CompareEEImageVersion(tt.current, tt.target); got != tt.want { + t.Errorf("CompareImageVersion() = %v, want = %v", got, tt.want) + } + }) + } +} From 2b67114c3e9e80f282be0a690be0cf85b87d40c5 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Thu, 5 Dec 2024 10:22:00 +0800 Subject: [PATCH 08/16] fix Signed-off-by: Xuhui zhang --- api/v1/cachegroup_types.go | 8 ++++++++ api/v1/zz_generated.deepcopy.go | 11 +++++++++++ config/crd/bases/juicefs.io_cachegroups.yaml | 4 ++++ config/manager/kustomization.yaml | 4 ++-- dist/crd.yaml | 4 ++++ internal/controller/cachegroup_controller.go | 20 ++++++++++++++------ pkg/common/common.go | 3 ++- pkg/utils/utils.go | 17 +++++++++++++++++ 8 files changed, 62 insertions(+), 9 deletions(-) diff --git a/api/v1/cachegroup_types.go b/api/v1/cachegroup_types.go index 8fc5147..53c2665 100644 --- a/api/v1/cachegroup_types.go +++ b/api/v1/cachegroup_types.go @@ -140,6 +140,14 @@ type CacheGroupSpec struct { CleanCache bool `json:"cleanCache,omitempty"` CacheGroup string `json:"cacheGroup,omitempty"` Worker CacheGroupWorkerSpec `json:"worker,omitempty"` + // Duration for new node to join 
cluster with group-backup option + // Default is 10 minutes + // +optional + BackupDuration *metav1.Duration `json:"backupDuration,omitempty"` + // Maximum time to wait for data migration when deleting + // Default is 1 hour + // +optional + WaitingDeletedMaxDuration *metav1.Duration `json:"waitingDeletedMaxDuration,omitempty"` } type CacheGroupPhase string diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index eabf063..638934c 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1 import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) @@ -130,6 +131,16 @@ func (in *CacheGroupSpec) DeepCopyInto(out *CacheGroupSpec) { (*in).DeepCopyInto(*out) } in.Worker.DeepCopyInto(&out.Worker) + if in.BackupDuration != nil { + in, out := &in.BackupDuration, &out.BackupDuration + *out = new(metav1.Duration) + **out = **in + } + if in.WaitingDeletedMaxDuration != nil { + in, out := &in.WaitingDeletedMaxDuration, &out.WaitingDeletedMaxDuration + *out = new(metav1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheGroupSpec. diff --git a/config/crd/bases/juicefs.io_cachegroups.yaml b/config/crd/bases/juicefs.io_cachegroups.yaml index 081109c..794cf6d 100644 --- a/config/crd/bases/juicefs.io_cachegroups.yaml +++ b/config/crd/bases/juicefs.io_cachegroups.yaml @@ -47,6 +47,8 @@ spec: type: object spec: properties: + backupDuration: + type: string cacheGroup: type: string cleanCache: @@ -78,6 +80,8 @@ spec: type: type: string type: object + waitingDeletedMaxDuration: + type: string worker: properties: overwrite: diff --git a/config/manager/kustomization.yaml b/config/manager/kustomization.yaml index 97db9d0..ad13e96 100644 --- a/config/manager/kustomization.yaml +++ b/config/manager/kustomization.yaml @@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization images: - name: controller - newName: registry.zzde.me/juicefs-cache-group-operator - newTag: v0.2.5 + newName: controller + newTag: latest diff --git a/dist/crd.yaml b/dist/crd.yaml index 5dab2fc..3b07d42 100644 --- a/dist/crd.yaml +++ b/dist/crd.yaml @@ -46,6 +46,8 @@ spec: type: object spec: properties: + backupDuration: + type: string cacheGroup: type: string cleanCache: @@ -77,6 +79,8 @@ spec: type: type: string type: object + waitingDeletedMaxDuration: + type: string worker: properties: overwrite: diff --git a/internal/controller/cachegroup_controller.go b/internal/controller/cachegroup_controller.go index 41f1fdd..203e9b8 100644 --- a/internal/controller/cachegroup_controller.go +++ b/internal/controller/cachegroup_controller.go @@ -330,7 +330,7 @@ func (r *CacheGroupReconciler) removeRedundantWorkers( } if cg.Status.ReadyWorker > 1 { - delete, err := r.gracefulShutdownWorker(ctx, &worker) + delete, err := r.gracefulShutdownWorker(ctx, cg, &worker) if err != nil { log.Error(err, "failed to graceful shutdown worker", "worker", worker) return err @@ -350,11 +350,13 @@ func (r *CacheGroupReconciler) removeRedundantWorkers( } // change pod options `group-weight` to zero, delete and recreate the worker pod -func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worker *corev1.Pod) (delete bool, err error) { +func (r *CacheGroupReconciler) gracefulShutdownWorker( + ctx context.Context, + cg *juicefsiov1.CacheGroup, + worker *corev1.Pod) (delete bool, err 
error) { log := log.FromContext(ctx) isReady := utils.IsPodReady(*worker) && utils.IsMountPointReady(ctx, *worker, common.MountPoint) if !isReady { - // TODO: add a timeout for data migration. if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { return false, nil } @@ -372,7 +374,13 @@ func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worke return true, nil } - if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + if v, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + waitingAt := utils.MustParseTime(v) + if time.Since(waitingAt) > utils.GetWaitingDeletedMaxDuration(cg.Spec.WaitingDeletedMaxDuration) { + log.Info("redundant worker still has cache blocks, waiting for data migration timeout, delete it", "worker", worker.Name) + return true, nil + } + // already set group-weight to 0 return false, nil } log.V(1).Info("redundant worker has cache blocks, recreate to set group-weight to 0", "worker", worker.Name, "cacheBytes", cacheBytes) @@ -398,11 +406,11 @@ func (r *CacheGroupReconciler) shouldAddGroupBackupOrNot(cg *juicefsiov1.CacheGr if actual == nil { return cg.Status.ReadyWorker >= 1 } - // If this node has been added group-backup for 10 minutes + // If this node has been added group-backup for x(default 10m) minutes // then this node is a normal worker. if v, ok := actual.Annotations[common.AnnoBackupWorker]; ok { backupAt := utils.MustParseTime(v) - return time.Since(backupAt) < common.BackupWorkerDuration + return time.Since(backupAt) < utils.GetBackupWorkerDuration(cg.Spec.BackupDuration) } return false } diff --git a/pkg/common/common.go b/pkg/common/common.go index ca54a4f..8aad8d9 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -63,7 +63,8 @@ var ( }, } - BackupWorkerDuration = 10 * time.Minute + DefaultBackupWorkerDuration = 10 * time.Minute + DefaultWaitingMaxDuration = 1 * time.Hour ) func GenWorkerName(cgName string, nodeName string) string { diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index 10d0a68..4d2ad21 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -21,6 +21,9 @@ import ( "strconv" "strings" "time" + + "github.com/juicedata/juicefs-cache-group-operator/pkg/common" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func ToPtr[T any](v T) *T { @@ -94,3 +97,17 @@ func CompareEEImageVersion(image, target string) int { return 0 } + +func GetWaitingDeletedMaxDuration(d *metav1.Duration) time.Duration { + if d == nil { + return common.DefaultWaitingMaxDuration + } + return d.Duration +} + +func GetBackupWorkerDuration(d *metav1.Duration) time.Duration { + if d == nil { + return common.DefaultBackupWorkerDuration + } + return d.Duration +} From 1a7e49aa43ea49a8f22879fe957e5bb7591c3bae Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Fri, 6 Dec 2024 11:44:58 +0800 Subject: [PATCH 09/16] fix cache-dir Signed-off-by: Xuhui zhang --- pkg/builder/pod.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/pkg/builder/pod.go b/pkg/builder/pod.go index 28a936f..f077311 100644 --- a/pkg/builder/pod.go +++ b/pkg/builder/pod.go @@ -187,18 +187,16 @@ func (p *PodBuilder) genAuthCmds(ctx context.Context) []string { } func (p *PodBuilder) genCacheDirs() { - volumes := []corev1.Volume{} - volumeMounts := []corev1.VolumeMount{} for i, dir := range p.spec.CacheDirs { cachePathInContainer := fmt.Sprintf("%s%d", common.CacheDirVolumeMountPathPrefix, i) volumeName := fmt.Sprintf("%s%d", common.CacheDirVolumeNamePrefix, i) - p.spec.VolumeMounts = 
append(volumeMounts, corev1.VolumeMount{ + p.spec.VolumeMounts = append(p.spec.VolumeMounts, corev1.VolumeMount{ Name: volumeName, MountPath: cachePathInContainer, }) switch dir.Type { case juicefsiov1.CacheDirTypeHostPath: - p.spec.Volumes = append(volumes, corev1.Volume{ + p.spec.Volumes = append(p.spec.Volumes, corev1.Volume{ Name: volumeName, VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ @@ -208,7 +206,7 @@ func (p *PodBuilder) genCacheDirs() { }, }) case juicefsiov1.CacheDirTypePVC: - p.spec.Volumes = append(volumes, corev1.Volume{ + p.spec.Volumes = append(p.spec.Volumes, corev1.Volume{ Name: volumeName, VolumeSource: corev1.VolumeSource{ PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ @@ -223,7 +221,7 @@ func (p *PodBuilder) genCacheDirs() { if len(p.cacheDirsInContainer) == 0 { cachePathInContainer := common.DefaultCacheHostPath volumeName := fmt.Sprintf("%s%d", common.CacheDirVolumeNamePrefix, 0) - p.spec.Volumes = append(volumes, corev1.Volume{ + p.spec.Volumes = append(p.spec.Volumes, corev1.Volume{ Name: volumeName, VolumeSource: corev1.VolumeSource{ HostPath: &corev1.HostPathVolumeSource{ @@ -232,7 +230,7 @@ func (p *PodBuilder) genCacheDirs() { }, }, }) - p.spec.VolumeMounts = append(volumeMounts, corev1.VolumeMount{ + p.spec.VolumeMounts = append(p.spec.VolumeMounts, corev1.VolumeMount{ Name: volumeName, MountPath: cachePathInContainer, }) From ebff75eff3458178da9a413258bae6f3421cd948 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Fri, 6 Dec 2024 14:04:11 +0800 Subject: [PATCH 10/16] [ci skip]: update sample Signed-off-by: Xuhui zhang --- config/samples/v1_cachegroup.yaml | 36 ++++++++----------------------- 1 file changed, 9 insertions(+), 27 deletions(-) diff --git a/config/samples/v1_cachegroup.yaml b/config/samples/v1_cachegroup.yaml index c236e6f..ffa52f2 100644 --- a/config/samples/v1_cachegroup.yaml +++ b/config/samples/v1_cachegroup.yaml @@ -36,36 +36,18 @@ spec: limits: cpu: 1 memory: 1Gi + cacheDirs: + - path: /var/jfsCache-0 + type: HostPath opts: - - cache-dir=/mnt/juicefs:/mnt/juicefs2:/mnt/juicefs3 - volumeMounts: - - name: cachedir-0 - mountPath: /mnt/juicefs - - name: cachedir-1 - mountPath: /mnt/juicefs2 - - name: cachedir-2 - mountPath: /mnt/juicefs3 - volumes: - - name: cachedir-0 - hostPath: - path: /mnt/juicefs - - name: cachedir-1 - hostPath: - path: /dev/sda/juicefs2 - - name: cachedir-2 - persistentVolumeClaim: - claimName: juicefs-cache-pvc + - group-weight=100 overwrite: - nodes: - k8s-03 opts: - - cache-dir=/mnt/juicefs - group-weight=50 - volumeMounts: - - name: cachedir-0 - mountPath: /mnt/juicefs - volumes: - - name: cachedir-0 - hostPath: - path: /mnt/juicefs - + cacheDirs: + - path: /var/jfsCache-1 + type: HostPath + - path: /var/jfsCache-2 + type: HostPath From d68d0753cfa68789006e7a5fa4cae82fe7922649 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Fri, 6 Dec 2024 14:10:17 +0800 Subject: [PATCH 11/16] [ci skip]: update sample Signed-off-by: Xuhui zhang --- config/samples/v1_cachegroup.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/config/samples/v1_cachegroup.yaml b/config/samples/v1_cachegroup.yaml index ffa52f2..e4e47f7 100644 --- a/config/samples/v1_cachegroup.yaml +++ b/config/samples/v1_cachegroup.yaml @@ -16,6 +16,10 @@ metadata: spec: secretRef: name: juicefs-secret + # Duration for new node to join cluster with group-backup option + backupDuration: "10m" + # Maximum time to wait for data migration when deleting + waitingDeletedMaxDuration: "1h" updateStrategy: # support: # 
- RollingUpdate: default policy From 3c79160b3857b8d7f2e1896633c0ce2e3539e9ae Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Fri, 6 Dec 2024 14:13:58 +0800 Subject: [PATCH 12/16] [ci skip]: update sample Signed-off-by: Xuhui zhang --- config/samples/v1_cachegroup.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/samples/v1_cachegroup.yaml b/config/samples/v1_cachegroup.yaml index e4e47f7..a6b4822 100644 --- a/config/samples/v1_cachegroup.yaml +++ b/config/samples/v1_cachegroup.yaml @@ -2,6 +2,7 @@ apiVersion: v1 kind: Secret metadata: name: juicefs-secret + namespace: default type: Opaque stringData: name: juicefs-xx @@ -13,6 +14,7 @@ apiVersion: juicefs.io/v1 kind: CacheGroup metadata: name: cachegroup-sample + namespace: default spec: secretRef: name: juicefs-secret From a0e523d9e8a20ab2c2c2632c391a82f98bafd388 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Fri, 6 Dec 2024 17:42:04 +0800 Subject: [PATCH 13/16] fix: add `no-update` as default option Signed-off-by: Xuhui zhang --- pkg/builder/pod.go | 1 + test/e2e/e2e_test.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/builder/pod.go b/pkg/builder/pod.go index f077311..9282658 100644 --- a/pkg/builder/pod.go +++ b/pkg/builder/pod.go @@ -250,6 +250,7 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { opts := []string{ "foreground", + "no-update", "cache-group=" + cacheGroup, } diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 6398ecb..152b2f3 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -263,7 +263,7 @@ var _ = Describe("controller", Ordered, func() { if err != nil { return fmt.Errorf("get worker pods failed, %+v", err) } - expectCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache" + expectCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache" nodes := corev1.PodList{} err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, err).NotTo(HaveOccurred()) @@ -415,8 +415,8 @@ var _ = Describe("controller", Ordered, func() { if err != nil { return fmt.Errorf("get worker pods failed, %+v", err) } - normalCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.1,group-weight=200,cache-dir=/var/jfsCache" - worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.01,group-weight=100,cache-dir=/var/jfsCache-0" + normalCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec 
/sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.1,group-weight=200,cache-dir=/var/jfsCache" + worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.01,group-weight=100,cache-dir=/var/jfsCache-0" nodes := corev1.PodList{} err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, err).NotTo(HaveOccurred()) From 0af644168506685d7b32386d9e3efc7c8ae67d22 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Mon, 9 Dec 2024 15:37:33 +0800 Subject: [PATCH 14/16] fix unit test Signed-off-by: Xuhui zhang --- pkg/builder/pod_test.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/builder/pod_test.go b/pkg/builder/pod_test.go index 67e07f0..5f219f4 100644 --- a/pkg/builder/pod_test.go +++ b/pkg/builder/pod_test.go @@ -53,7 +53,7 @@ func TestPodBuilder_genCommands(t *testing.T) { "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/var/jfsCache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,cache-dir=/var/jfsCache", }, }, { @@ -83,7 +83,7 @@ func TestPodBuilder_genCommands(t *testing.T) { "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/var/jfsCache-0", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,cache-dir=/var/jfsCache-0", }, }, { @@ -108,7 +108,7 @@ func TestPodBuilder_genCommands(t *testing.T) { "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,a=b,verbose,cache-dir=/var/jfsCache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,a=b,verbose,cache-dir=/var/jfsCache", }, }, { @@ -134,7 +134,7 @@ func TestPodBuilder_genCommands(t *testing.T) { "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY} --format-options --format-options2\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,verbose,cache-dir=/var/jfsCache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,verbose,cache-dir=/var/jfsCache", }, }, { @@ -165,7 +165,7 @@ func TestPodBuilder_genCommands(t *testing.T) { "sh", "-c", "cp /etc/juicefs/test-name.conf /root/.juicefs\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/var/jfsCache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,cache-dir=/var/jfsCache", }, }, 
} @@ -195,24 +195,24 @@ func TestUpdateWorkerGroupWeight(t *testing.T) { worker: &corev1.Pod{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache"}, + Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache"}, }}, }, }, weight: 10, - expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10", + expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10", }, { name: "with group-weight option", worker: &corev1.Pod{ Spec: corev1.PodSpec{ Containers: []corev1.Container{{ - Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10"}, + Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10"}, }}, }, }, weight: 0, - expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=0", + expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=0", }, } From 6aa71491d508e5750b4ae551a8d95182b310fc9e Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Mon, 9 Dec 2024 17:05:26 +0800 Subject: [PATCH 15/16] fix unit test Signed-off-by: Xuhui zhang --- test/e2e/e2e_test.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 152b2f3..615a604 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -336,11 +336,12 @@ var _ = Describe("controller", Ordered, func() { cmd = exec.Command("kubectl", "wait", "pod/"+expectWorkerName, "--for", "condition=Ready", "--namespace", namespace, - "--timeout", "2m", + "--timeout", "1m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - + if err != nil { + return fmt.Errorf("wait worker pod failed, %+v", err) + } return nil } Eventually(verifyWorkerCreated, time.Minute, time.Second).Should(Succeed()) From ea1f93534df576abac29e069471e1ebeb0874132 Mon Sep 17 00:00:00 2001 From: Xuhui zhang Date: Mon, 9 Dec 2024 17:35:42 +0800 Subject: [PATCH 16/16] fix unit test Signed-off-by: Xuhui zhang --- pkg/builder/pod.go | 5 +++++ test/e2e/e2e_test.go | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/builder/pod.go b/pkg/builder/pod.go index 9282658..f6f5797 100644 --- a/pkg/builder/pod.go +++ b/pkg/builder/pod.go @@ -258,6 +258,11 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { for _, opt := range parsedOpts { if opt[0] == 
"cache-dir" { log.FromContext(ctx).Info("cache-dir option is not allowed, plz use cacheDirs instead") + continue + } + if utils.SliceContains(opts, opt[1]) { + log.FromContext(ctx).Info("option is duplicated, skip", "option", opt[0]) + continue } if opt[1] != "" { opts = append(opts, strings.TrimSpace(opt[0])+"="+strings.TrimSpace(opt[1])) diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 615a604..1502d64 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -503,7 +503,7 @@ var _ = Describe("controller", Ordered, func() { if err != nil { return fmt.Errorf("get worker pods failed, %+v", err) } - worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache,group-backup" + worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache,group-backup" nodes := corev1.PodList{} err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, err).NotTo(HaveOccurred())