diff --git a/api/v1/cachegroup_types.go b/api/v1/cachegroup_types.go index a0c142e..53c2665 100644 --- a/api/v1/cachegroup_types.go +++ b/api/v1/cachegroup_types.go @@ -25,6 +25,24 @@ import ( // EDIT THIS FILE! THIS IS SCAFFOLDING FOR YOU TO OWN! // NOTE: json tags are required. Any new fields you add must have json tags for the fields to be serialized. +type CacheDirType string + +var ( + CacheDirTypeHostPath CacheDirType = "HostPath" + CacheDirTypePVC CacheDirType = "PVC" +) + +type CacheDir struct { + // +kubebuilder:validation:Enum=HostPath;PVC + Type CacheDirType `json:"type,omitempty"` + // required for HostPath type + // +optional + Path string `json:"path,omitempty"` + // required for PVC type + // +optional + Name string `json:"name,omitempty"` +} + // CacheGroupWorkerTemplate defines cache group worker template type CacheGroupWorkerTemplate struct { NodeSelector map[string]string `json:"nodeSelector,omitempty"` @@ -32,6 +50,7 @@ type CacheGroupWorkerTemplate struct { HostNetwork *bool `json:"hostNetwork,omitempty"` SchedulerName string `json:"schedulerName,omitempty"` Tolerations []corev1.Toleration `json:"tolerations,omitempty"` + CacheDirs []CacheDir `json:"cacheDirs,omitempty"` // Container image. // More info: https://kubernetes.io/docs/concepts/containers/images @@ -121,6 +140,14 @@ type CacheGroupSpec struct { CleanCache bool `json:"cleanCache,omitempty"` CacheGroup string `json:"cacheGroup,omitempty"` Worker CacheGroupWorkerSpec `json:"worker,omitempty"` + // Duration for new node to join cluster with group-backup option + // Default is 10 minutes + // +optional + BackupDuration *metav1.Duration `json:"backupDuration,omitempty"` + // Maximum time to wait for data migration when deleting + // Default is 1 hour + // +optional + WaitingDeletedMaxDuration *metav1.Duration `json:"waitingDeletedMaxDuration,omitempty"` } type CacheGroupPhase string @@ -145,10 +172,13 @@ type CacheGroupStatus struct { Phase CacheGroupPhase `json:"phase,omitempty"` Conditions []CacheGroupCondition `json:"conditions,omitempty"` - ReadyWorker int32 `json:"readyWorker,omitempty"` - ExpectWorker int32 `json:"expectWorker,omitempty"` - ReadyStr string `json:"readyStr,omitempty"` - CacheGroup string `json:"cacheGroup,omitempty"` + FileSystem string `json:"fileSystem,omitempty"` + ReadyWorker int32 `json:"readyWorker,omitempty"` + BackUpWorker int32 `json:"backUpWorker,omitempty"` + WaitingDeletedWorker int32 `json:"waitingDeletedWorker,omitempty"` + ExpectWorker int32 `json:"expectWorker,omitempty"` + ReadyStr string `json:"readyStr,omitempty"` + CacheGroup string `json:"cacheGroup,omitempty"` } // +kubebuilder:object:root=true @@ -156,6 +186,8 @@ type CacheGroupStatus struct { // +kubebuilder:resource:shortName=cg // +kubebuilder:printcolumn:name="Cache Group",type="string",JSONPath=".status.cacheGroup" // +kubebuilder:printcolumn:name="Phase",type="string",JSONPath=".status.phase" +// +kubebuilder:printcolumn:name="Back up",type="string",JSONPath=".status.backUpWorker" +// +kubebuilder:printcolumn:name="Waiting Deleted",type="string",JSONPath=".status.WaitingDeletedWorker" // +kubebuilder:printcolumn:name="Ready",type="string",JSONPath=".status.readyStr" // +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp" // CacheGroup is the Schema for the cachegroups API diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index f20bbf3..638934c 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -23,9 +23,25 @@ package v1 import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CacheDir) DeepCopyInto(out *CacheDir) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheDir. +func (in *CacheDir) DeepCopy() *CacheDir { + if in == nil { + return nil + } + out := new(CacheDir) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CacheGroup) DeepCopyInto(out *CacheGroup) { *out = *in @@ -115,6 +131,16 @@ func (in *CacheGroupSpec) DeepCopyInto(out *CacheGroupSpec) { (*in).DeepCopyInto(*out) } in.Worker.DeepCopyInto(&out.Worker) + if in.BackupDuration != nil { + in, out := &in.BackupDuration, &out.BackupDuration + *out = new(metav1.Duration) + **out = **in + } + if in.WaitingDeletedMaxDuration != nil { + in, out := &in.WaitingDeletedMaxDuration, &out.WaitingDeletedMaxDuration + *out = new(metav1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CacheGroupSpec. @@ -215,6 +241,11 @@ func (in *CacheGroupWorkerTemplate) DeepCopyInto(out *CacheGroupWorkerTemplate) (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.CacheDirs != nil { + in, out := &in.CacheDirs, &out.CacheDirs + *out = make([]CacheDir, len(*in)) + copy(*out, *in) + } if in.Env != nil { in, out := &in.Env, &out.Env *out = make([]corev1.EnvVar, len(*in)) diff --git a/config/crd/bases/juicefs.io_cachegroups.yaml b/config/crd/bases/juicefs.io_cachegroups.yaml index 9086085..794cf6d 100644 --- a/config/crd/bases/juicefs.io_cachegroups.yaml +++ b/config/crd/bases/juicefs.io_cachegroups.yaml @@ -23,6 +23,12 @@ spec: - jsonPath: .status.phase name: Phase type: string + - jsonPath: .status.backUpWorker + name: Back up + type: string + - jsonPath: .status.WaitingDeletedWorker + name: Waiting Deleted + type: string - jsonPath: .status.readyStr name: Ready type: string @@ -41,6 +47,8 @@ spec: type: object spec: properties: + backupDuration: + type: string cacheGroup: type: string cleanCache: @@ -72,11 +80,27 @@ spec: type: type: string type: object + waitingDeletedMaxDuration: + type: string worker: properties: overwrite: items: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: @@ -1471,6 +1495,20 @@ spec: type: array template: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: @@ -2862,6 +2900,9 @@ spec: type: object status: properties: + backUpWorker: + format: int32 + type: integer cacheGroup: type: string conditions: @@ -2883,6 +2924,8 @@ spec: expectWorker: format: int32 type: integer + fileSystem: + type: string phase: type: string readyStr: @@ -2890,6 +2933,9 @@ spec: readyWorker: format: int32 type: integer + waitingDeletedWorker: + format: int32 + type: integer type: object type: object served: true diff --git a/config/samples/v1_cachegroup.yaml b/config/samples/v1_cachegroup.yaml index c236e6f..a6b4822 100644 --- a/config/samples/v1_cachegroup.yaml +++ b/config/samples/v1_cachegroup.yaml @@ -2,6 +2,7 @@ apiVersion: v1 kind: Secret metadata: name: juicefs-secret + namespace: default type: Opaque stringData: name: juicefs-xx @@ -13,9 +14,14 @@ apiVersion: juicefs.io/v1 kind: CacheGroup metadata: name: cachegroup-sample + namespace: default spec: secretRef: name: juicefs-secret + # Duration for new node to join cluster with group-backup option + backupDuration: "10m" + # Maximum time to wait for data migration when deleting + waitingDeletedMaxDuration: "1h" updateStrategy: # support: # - RollingUpdate: default policy @@ -36,36 +42,18 @@ spec: limits: cpu: 1 memory: 1Gi + cacheDirs: + - path: /var/jfsCache-0 + type: HostPath opts: - - cache-dir=/mnt/juicefs:/mnt/juicefs2:/mnt/juicefs3 - volumeMounts: - - name: cachedir-0 - mountPath: /mnt/juicefs - - name: cachedir-1 - mountPath: /mnt/juicefs2 - - name: cachedir-2 - mountPath: /mnt/juicefs3 - volumes: - - name: cachedir-0 - hostPath: - path: /mnt/juicefs - - name: cachedir-1 - hostPath: - path: /dev/sda/juicefs2 - - name: cachedir-2 - persistentVolumeClaim: - claimName: juicefs-cache-pvc + - group-weight=100 overwrite: - nodes: - k8s-03 opts: - - cache-dir=/mnt/juicefs - group-weight=50 - volumeMounts: - - name: cachedir-0 - mountPath: /mnt/juicefs - volumes: - - name: cachedir-0 - hostPath: - path: /mnt/juicefs - + cacheDirs: + - path: /var/jfsCache-1 + type: HostPath + - path: /var/jfsCache-2 + type: HostPath diff --git a/dist/crd.yaml b/dist/crd.yaml index c7d556c..3b07d42 100644 --- a/dist/crd.yaml +++ b/dist/crd.yaml @@ -22,6 +22,12 @@ spec: - jsonPath: .status.phase name: Phase type: string + - jsonPath: .status.backUpWorker + name: Back up + type: string + - jsonPath: .status.WaitingDeletedWorker + name: Waiting Deleted + type: string - jsonPath: .status.readyStr name: Ready type: string @@ -40,6 +46,8 @@ spec: type: object spec: properties: + backupDuration: + type: string cacheGroup: type: string cleanCache: @@ -71,11 +79,27 @@ spec: type: type: string type: object + waitingDeletedMaxDuration: + type: string worker: properties: overwrite: items: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: @@ -1470,6 +1494,20 @@ spec: type: array template: properties: + cacheDirs: + items: + properties: + name: + type: string + path: + type: string + type: + enum: + - HostPath + - PVC + type: string + type: object + type: array dnsPolicy: type: string env: @@ -2861,6 +2899,9 @@ spec: type: object status: properties: + backUpWorker: + format: int32 + type: integer cacheGroup: type: string conditions: @@ -2882,6 +2923,8 @@ spec: expectWorker: format: int32 type: integer + fileSystem: + type: string phase: type: string readyStr: @@ -2889,6 +2932,9 @@ spec: readyWorker: format: int32 type: integer + waitingDeletedWorker: + format: int32 + type: integer type: object type: object served: true diff --git a/internal/controller/cachegroup_controller.go b/internal/controller/cachegroup_controller.go index 3e22f47..203e9b8 100644 --- a/internal/controller/cachegroup_controller.go +++ b/internal/controller/cachegroup_controller.go @@ -69,7 +69,7 @@ func (r *CacheGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err := r.Get(ctx, req.NamespacedName, cg); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } - l.V(1).Info("Reconcile CacheGroup") + l.V(1).Info("start reconciler") if cg.ObjectMeta.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(cg, common.Finalizer) { controllerutil.AddFinalizer(cg, common.Finalizer) @@ -103,7 +103,13 @@ func (r *CacheGroupReconciler) Reconcile(ctx context.Context, req ctrl.Request) l.Error(err, "failed to sync cache group workers") return ctrl.Result{}, err } - + l.V(1).Info("reconciler done") + if cg.Status.BackUpWorker > 0 || cg.Status.WaitingDeletedWorker > 0 { + return ctrl.Result{ + Requeue: true, + RequeueAfter: 1 * time.Minute, + }, nil + } return ctrl.Result{}, nil } @@ -114,7 +120,6 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr wg := sync.WaitGroup{} errCh := make(chan error, 2*maxUnavailable) numUnavailable := 0 - log.V(1).Info("start to sync cache group workers", "updateStrategy", updateStrategyType, "maxUnavailable", maxUnavailable) // TODO: add a webook to validate the cache group worker template secret := &corev1.Secret{} if err := r.Get(ctx, client.ObjectKey{Namespace: cg.Namespace, Name: cg.Spec.SecretRef.Name}, secret); err != nil { @@ -125,29 +130,31 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr log.Error(err, "failed to validate secret") return err } + log.V(1).Info("sync worker to expect states", "expectStates", len(expectStates)) for node, expectState := range expectStates { actualState, err := r.getActualState(ctx, cg, node) if err != nil && !apierrors.IsNotFound(err) { log.Error(err, "failed to get actual state", "node", node) continue } - podBuilder := builder.NewPodBuilder(cg, secret, node, expectState) + groupBackUp := r.shouldAddGroupBackupOrNot(cg, actualState, expectState.Image) + podBuilder := builder.NewPodBuilder(cg, secret, node, expectState, groupBackUp) expectWorker := podBuilder.NewCacheGroupWorker(ctx) - hash := utils.GenHash(expectWorker) - expectWorker.Annotations[common.LabelWorkerHash] = hash if r.actualShouldbeUpdate(updateStrategyType, expectWorker, actualState) { if numUnavailable >= maxUnavailable { - log.V(1).Info("maxUnavailable reached, skip updating cache group worker, waiting for next reconciler", "worker", expectWorker.Name) + log.V(1).Info("maxUnavailable reached, skip updating worker, waiting for next reconciler", "worker", expectWorker.Name) break } wg.Add(1) numUnavailable++ go func() { defer wg.Done() - log.Info("cache group worker need to be updated", "worker", expectWorker.Name) + if groupBackUp { + log.V(1).Info("new worker added, add group-backup option", "worker", expectWorker.Name) + } if err := r.createOrUpdateWorker(ctx, actualState, expectWorker); err != nil { - log.Error(err, "failed to create or update cache group worker", "worker", expectWorker.Name) + log.Error(err, "failed to create or update worker", "worker", expectWorker.Name) errCh <- err return } @@ -158,6 +165,8 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr return } }() + } else { + log.V(1).Info("worker is up to date", "worker", expectWorker.Name) } } wg.Wait() @@ -178,13 +187,13 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr log.Error(err, "failed to list actual worker nodes") return err } - if err := r.removeRedundantWorkers(ctx, expectStates, actualWorks); err != nil { + if err := r.removeRedundantWorkers(ctx, cg, expectStates, actualWorks); err != nil { log.Error(err, "failed to remove redundant") return err } // calculate status - newStatus := r.calculateStatus(cg, expectStates, actualWorks) + newStatus := r.calculateStatus(cg, string(secret.Data["name"]), expectStates, actualWorks) if !reflect.DeepEqual(cg.Status, newStatus) { cg.Status = newStatus return utils.IgnoreConflict(r.Status().Update(ctx, cg)) @@ -220,7 +229,7 @@ func (r *CacheGroupReconciler) parseExpectState(ctx context.Context, cg *juicefs for _, overwrite := range cg.Spec.Worker.Overwrite { if utils.SliceContains(overwrite.Nodes, node.Name) || (overwrite.NodeSelector != nil && utils.NodeSelectorContains(overwrite.NodeSelector, node.Labels)) { - builder.MergeCacheGrouopWorkerTemplate(expectState, overwrite) + builder.MergeCacheGroupWorkerTemplate(expectState, overwrite) } } expectStates[node.Name] = *expectState @@ -237,7 +246,9 @@ func (r *CacheGroupReconciler) getActualState(ctx context.Context, cg *juicefsio } func (r *CacheGroupReconciler) createOrUpdateWorker(ctx context.Context, actual, expect *corev1.Pod) error { + log := log.FromContext(ctx).WithValues("worker", expect.Name) if actual == nil { + log.Info("create worker") return r.createCacheGroupWorker(ctx, expect) } return r.updateCacheGroupWorker(ctx, actual, expect) @@ -255,37 +266,41 @@ func (r *CacheGroupReconciler) createCacheGroupWorker(ctx context.Context, expec } func (r *CacheGroupReconciler) updateCacheGroupWorker(ctx context.Context, oldWorker, expectWorker *corev1.Pod) error { - log := log.FromContext(ctx) - worker := corev1.Pod{} - if err := r.Get(ctx, client.ObjectKey{Namespace: oldWorker.Namespace, Name: oldWorker.Name}, &worker); err != nil && !apierrors.IsNotFound(err) { + log := log.FromContext(ctx).WithValues("worker", expectWorker.Name) + log.Info("worker spec changed, delete old and create new one") + if err := r.deleteCacheGroupWorker(ctx, oldWorker, true); err != nil { + log.Error(err, "failed to delete old worker") return err } - err := r.Delete(ctx, &worker) + log.Info("old worker deleted, created new one") + return r.Create(ctx, expectWorker) +} + +// deleteCacheGroupWorker deletes a cache group worker pod. If the `waiting` is true, +// it waits until the pod is confirmed to be deleted or a timeout occurs. +func (r *CacheGroupReconciler) deleteCacheGroupWorker(ctx context.Context, worker *corev1.Pod, waiting bool) error { + err := r.Delete(ctx, worker) if err != nil { - if !apierrors.IsNotFound(err) { - log.Error(err, "failed to delete old cache group worker", "worker", worker.Name) - return err - } - } - // wait for the old worker to be deleted - ctx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - for { - err := r.Get(ctx, client.ObjectKey{Namespace: oldWorker.Namespace, Name: oldWorker.Name}, &worker) if apierrors.IsNotFound(err) { - break + return nil } - if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded { - return fmt.Errorf("timeout waiting for old cache group worker to be deleted") + return err + } + if waiting { + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + for { + err := r.Get(ctx, client.ObjectKey{Namespace: worker.Namespace, Name: worker.Name}, worker) + if apierrors.IsNotFound(err) { + break + } + if ctx.Err() == context.Canceled || ctx.Err() == context.DeadlineExceeded { + return fmt.Errorf("timeout waiting for worker to be deleted") + } + time.Sleep(time.Second) } - time.Sleep(time.Second) } - log.Info("old cache group worker deleted, created new one", "worker", expectWorker.Name) - return r.Create(ctx, expectWorker) -} - -func (r *CacheGroupReconciler) deleteCacheGroupWorker(ctx context.Context, worker *corev1.Pod) error { - return client.IgnoreNotFound(r.Delete(ctx, worker)) + return nil } func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicefsiov1.CacheGroup) ([]corev1.Pod, error) { @@ -296,35 +311,139 @@ func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicef return workers.Items, nil } +// removeRedundantWorkers deletes the redundant workers +// if the worker still has cache blocks, tweak the group weight to zero, waiting for data redistribution, then delete it +// if the worker has no cache blocks, delete it directly +// if the worker not ready delete it directly func (r *CacheGroupReconciler) removeRedundantWorkers( ctx context.Context, + cg *juicefsiov1.CacheGroup, expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate, actualWorks []corev1.Pod) error { log := log.FromContext(ctx) for _, worker := range actualWorks { - if _, ok := expectStates[worker.Spec.NodeName]; !ok { - log.Info("found redundant cache group worker, delete it", "worker", worker.Name) - if err := r.deleteCacheGroupWorker(ctx, &worker); err != nil { - log.Error(err, "failed to delete cache group worker", "worker", worker.Name) + if _, ok := expectStates[worker.Spec.NodeName]; ok { + continue + } + if worker.DeletionTimestamp != nil { + continue + } + + if cg.Status.ReadyWorker > 1 { + delete, err := r.gracefulShutdownWorker(ctx, cg, &worker) + if err != nil { + log.Error(err, "failed to graceful shutdown worker", "worker", worker) return err } + if !delete { + continue + } + } + + log.Info("found redundant worker, delete it", "worker", worker.Name) + if err := r.deleteCacheGroupWorker(ctx, &worker, false); err != nil { + log.Error(err, "failed to delete worker", "worker", worker.Name) + return err } } return nil } +// change pod options `group-weight` to zero, delete and recreate the worker pod +func (r *CacheGroupReconciler) gracefulShutdownWorker( + ctx context.Context, + cg *juicefsiov1.CacheGroup, + worker *corev1.Pod) (delete bool, err error) { + log := log.FromContext(ctx) + isReady := utils.IsPodReady(*worker) && utils.IsMountPointReady(ctx, *worker, common.MountPoint) + if !isReady { + if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + return false, nil + } + return true, nil + } + + cacheBytes, err := utils.GetWorkerCacheBlocksBytes(ctx, *worker, common.MountPoint) + if err != nil { + log.Error(err, "failed to get worker cache blocks bytes", "worker", worker.Name) + return false, err + } + + if cacheBytes <= 0 { + log.V(1).Info("redundant worker has no cache blocks, delete it", "worker", worker.Name) + return true, nil + } + + if v, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + waitingAt := utils.MustParseTime(v) + if time.Since(waitingAt) > utils.GetWaitingDeletedMaxDuration(cg.Spec.WaitingDeletedMaxDuration) { + log.Info("redundant worker still has cache blocks, waiting for data migration timeout, delete it", "worker", worker.Name) + return true, nil + } + // already set group-weight to 0 + return false, nil + } + log.V(1).Info("redundant worker has cache blocks, recreate to set group-weight to 0", "worker", worker.Name, "cacheBytes", cacheBytes) + if err := r.deleteCacheGroupWorker(ctx, worker, true); err != nil { + return false, err + } + builder.UpdateWorkerGroupWeight(worker, 0) + worker.ResourceVersion = "" + worker.Annotations[common.AnnoWaitingDeleteWorker] = time.Now().Format(time.RFC3339) + if err := r.Create(ctx, worker); err != nil { + return false, err + } + return false, err +} + +func (r *CacheGroupReconciler) shouldAddGroupBackupOrNot(cg *juicefsiov1.CacheGroup, actual *corev1.Pod, newImage string) bool { + if utils.CompareEEImageVersion(newImage, "5.1.0") < 0 { + return false + } + + // If it is a new node and there are already 1 or more worker nodes + // then this node should add group-backup. + if actual == nil { + return cg.Status.ReadyWorker >= 1 + } + // If this node has been added group-backup for x(default 10m) minutes + // then this node is a normal worker. + if v, ok := actual.Annotations[common.AnnoBackupWorker]; ok { + backupAt := utils.MustParseTime(v) + return time.Since(backupAt) < utils.GetBackupWorkerDuration(cg.Spec.BackupDuration) + } + return false +} + func (r *CacheGroupReconciler) calculateStatus( cg *juicefsiov1.CacheGroup, + fileSystem string, expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate, actualWorks []corev1.Pod) juicefsiov1.CacheGroupStatus { newStatus := cg.Status + newStatus.FileSystem = fileSystem if len(expectStates) == 0 { newStatus.ReadyStr = "-" + newStatus.ReadyWorker = 0 + newStatus.ExpectWorker = 0 + newStatus.BackUpWorker = 0 newStatus.Phase = juicefsiov1.CacheGroupPhaseWaiting return newStatus } + backupWorker := 0 + waitingDeletedWorker := 0 + for _, worker := range actualWorks { + if _, ok := worker.Annotations[common.AnnoBackupWorker]; ok { + backupWorker++ + } + if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok { + waitingDeletedWorker++ + } + } newStatus.ReadyWorker = int32(len(actualWorks)) newStatus.ExpectWorker = int32(len(expectStates)) + newStatus.BackUpWorker = int32(backupWorker) + newStatus.WaitingDeletedWorker = int32(waitingDeletedWorker) newStatus.ReadyStr = fmt.Sprintf("%d/%d", newStatus.ReadyWorker, newStatus.ExpectWorker) if newStatus.ExpectWorker != newStatus.ReadyWorker { newStatus.Phase = juicefsiov1.CacheGroupPhaseProgressing @@ -353,7 +472,7 @@ func (r *CacheGroupReconciler) waitForWorkerReady(ctx context.Context, cg *juice return err } if utils.IsPodReady(worker) && utils.IsMountPointReady(ctx, worker, common.MountPoint) { - log.Info("cache group worker is ready", "worker", worker.Name) + log.Info("worker is ready", "worker", worker.Name) return nil } time.Sleep(time.Second) diff --git a/pkg/builder/pod.go b/pkg/builder/pod.go index d602d11..f6f5797 100644 --- a/pkg/builder/pod.go +++ b/pkg/builder/pod.go @@ -16,12 +16,17 @@ package builder import ( "context" + "fmt" + "regexp" "strings" + "time" juicefsiov1 "github.com/juicedata/juicefs-cache-group-operator/api/v1" "github.com/juicedata/juicefs-cache-group-operator/pkg/common" "github.com/juicedata/juicefs-cache-group-operator/pkg/utils" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -52,27 +57,30 @@ var ( ) type PodBuilder struct { - volName string - cg *juicefsiov1.CacheGroup - node string - spec juicefsiov1.CacheGroupWorkerTemplate - secretData map[string]string - initConfig string + volName string + cg *juicefsiov1.CacheGroup + node string + spec juicefsiov1.CacheGroupWorkerTemplate + secretData map[string]string + initConfig string + groupBackup bool + cacheDirsInContainer []string } -func NewPodBuilder(cg *juicefsiov1.CacheGroup, secret *corev1.Secret, node string, spec juicefsiov1.CacheGroupWorkerTemplate) *PodBuilder { +func NewPodBuilder(cg *juicefsiov1.CacheGroup, secret *corev1.Secret, node string, spec juicefsiov1.CacheGroupWorkerTemplate, groupBackup bool) *PodBuilder { secretData := utils.ParseSecret(secret) initconfig := "" if v, ok := secretData["initconfig"]; ok && v != "" { initconfig = v } return &PodBuilder{ - secretData: secretData, - volName: secretData["name"], - cg: cg, - node: node, - spec: spec, - initConfig: initconfig, + secretData: secretData, + volName: secretData["name"], + cg: cg, + node: node, + spec: spec, + initConfig: initconfig, + groupBackup: groupBackup, } } @@ -178,6 +186,58 @@ func (p *PodBuilder) genAuthCmds(ctx context.Context) []string { return authCmds } +func (p *PodBuilder) genCacheDirs() { + for i, dir := range p.spec.CacheDirs { + cachePathInContainer := fmt.Sprintf("%s%d", common.CacheDirVolumeMountPathPrefix, i) + volumeName := fmt.Sprintf("%s%d", common.CacheDirVolumeNamePrefix, i) + p.spec.VolumeMounts = append(p.spec.VolumeMounts, corev1.VolumeMount{ + Name: volumeName, + MountPath: cachePathInContainer, + }) + switch dir.Type { + case juicefsiov1.CacheDirTypeHostPath: + p.spec.Volumes = append(p.spec.Volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: dir.Path, + Type: utils.ToPtr(corev1.HostPathDirectoryOrCreate), + }, + }, + }) + case juicefsiov1.CacheDirTypePVC: + p.spec.Volumes = append(p.spec.Volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{ + ClaimName: dir.Name, + }, + }, + }) + } + p.cacheDirsInContainer = append(p.cacheDirsInContainer, cachePathInContainer) + } + + if len(p.cacheDirsInContainer) == 0 { + cachePathInContainer := common.DefaultCacheHostPath + volumeName := fmt.Sprintf("%s%d", common.CacheDirVolumeNamePrefix, 0) + p.spec.Volumes = append(p.spec.Volumes, corev1.Volume{ + Name: volumeName, + VolumeSource: corev1.VolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: common.DefaultCacheHostPath, + Type: utils.ToPtr(corev1.HostPathDirectoryOrCreate), + }, + }, + }) + p.spec.VolumeMounts = append(p.spec.VolumeMounts, corev1.VolumeMount{ + Name: volumeName, + MountPath: cachePathInContainer, + }) + p.cacheDirsInContainer = append(p.cacheDirsInContainer, cachePathInContainer) + } +} + func (p *PodBuilder) genCommands(ctx context.Context) []string { authCmds := p.genAuthCmds(ctx) cacheGroup := GenCacheGroupName(p.cg) @@ -190,18 +250,18 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { opts := []string{ "foreground", + "no-update", "cache-group=" + cacheGroup, } - cacheDirs := []string{} parsedOpts := utils.ParseOptions(ctx, p.spec.Opts) for _, opt := range parsedOpts { if opt[0] == "cache-dir" { - if opt[1] == "" { - log.FromContext(ctx).Info("invalid cache-dir option", "option", opt) - continue - } - cacheDirs = strings.Split(opt[1], ":") + log.FromContext(ctx).Info("cache-dir option is not allowed, plz use cacheDirs instead") + continue + } + if utils.SliceContains(opts, opt[1]) { + log.FromContext(ctx).Info("option is duplicated, skip", "option", opt[0]) continue } if opt[1] != "" { @@ -210,11 +270,10 @@ func (p *PodBuilder) genCommands(ctx context.Context) []string { opts = append(opts, strings.TrimSpace(opt[0])) } } - - if len(cacheDirs) == 0 { - cacheDirs = append(cacheDirs, "/var/jfsCache") + opts = append(opts, "cache-dir="+strings.Join(p.cacheDirsInContainer, ":")) + if p.groupBackup { + opts = append(opts, "group-backup") } - opts = append(opts, "cache-dir="+strings.Join(cacheDirs, ":")) mountCmds = append(mountCmds, "-o", strings.Join(opts, ",")) cmds := []string{ "sh", @@ -252,6 +311,7 @@ func (p *PodBuilder) genInitConfigVolumes() { func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod { worker := newBasicPod(p.cg, p.node) p.genInitConfigVolumes() + p.genCacheDirs() spec := p.spec if spec.HostNetwork != nil { worker.Spec.HostNetwork = *spec.HostNetwork @@ -301,10 +361,20 @@ func (p *PodBuilder) NewCacheGroupWorker(ctx context.Context) *corev1.Pod { } worker.Spec.Containers[0].Env = p.genEnvs() worker.Spec.Containers[0].Command = p.genCommands(ctx) + + hash := utils.GenHash(worker) + + // The following fields do not participate in the hash calculation. + worker.Annotations[common.LabelWorkerHash] = hash + if p.groupBackup { + backupAt := time.Now().Format(time.RFC3339) + worker.Annotations[common.AnnoBackupWorker] = backupAt + } + return worker } -func MergeCacheGrouopWorkerTemplate(template *juicefsiov1.CacheGroupWorkerTemplate, overwrite juicefsiov1.CacheGroupWorkerOverwrite) { +func MergeCacheGroupWorkerTemplate(template *juicefsiov1.CacheGroupWorkerTemplate, overwrite juicefsiov1.CacheGroupWorkerOverwrite) { if overwrite.ServiceAccountName != "" { template.ServiceAccountName = overwrite.ServiceAccountName } @@ -356,4 +426,19 @@ func MergeCacheGrouopWorkerTemplate(template *juicefsiov1.CacheGroupWorkerTempla if overwrite.DNSPolicy != nil { template.DNSPolicy = overwrite.DNSPolicy } + if overwrite.CacheDirs != nil { + template.CacheDirs = overwrite.CacheDirs + } +} + +func UpdateWorkerGroupWeight(worker *corev1.Pod, weight int) { + cmd := worker.Spec.Containers[0].Command[2] + weightStr := fmt.Sprint(weight) + if strings.Contains(cmd, "group-weight") { + regex := regexp.MustCompile("group-weight=[0-9]+") + cmd = regex.ReplaceAllString(cmd, "group-weight="+weightStr) + } else { + cmd = cmd + ",group-weight=" + weightStr + } + worker.Spec.Containers[0].Command[2] = cmd } diff --git a/pkg/builder/pod_test.go b/pkg/builder/pod_test.go index 369148b..5f219f4 100644 --- a/pkg/builder/pod_test.go +++ b/pkg/builder/pod_test.go @@ -21,6 +21,7 @@ import ( juicefsiov1 "github.com/juicedata/juicefs-cache-group-operator/api/v1" "github.com/juicedata/juicefs-cache-group-operator/pkg/common" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -52,11 +53,11 @@ func TestPodBuilder_genCommands(t *testing.T) { "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/var/jfsCache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,cache-dir=/var/jfsCache", }, }, { - name: "with cache-dir option", + name: "with cache-dir", podBuilder: &PodBuilder{ volName: "test-name", cg: &juicefsiov1.CacheGroup{ @@ -70,14 +71,19 @@ func TestPodBuilder_genCommands(t *testing.T) { "secret-key": "test-secret-key", }, spec: juicefsiov1.CacheGroupWorkerTemplate{ - Opts: []string{"cache-dir=/custom/cache"}, + CacheDirs: []juicefsiov1.CacheDir{ + { + Type: juicefsiov1.CacheDirTypeHostPath, + Path: "/custom/cache", + }, + }, }, }, expected: []string{ "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/custom/cache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,cache-dir=/var/jfsCache-0", }, }, { @@ -95,14 +101,14 @@ func TestPodBuilder_genCommands(t *testing.T) { "secret-key": "test-secret-key", }, spec: juicefsiov1.CacheGroupWorkerTemplate{ - Opts: []string{"cache-dir=/custom/cache", "verbose"}, + Opts: []string{"a=b", "verbose"}, }, }, expected: []string{ "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY}\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,verbose,cache-dir=/custom/cache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,a=b,verbose,cache-dir=/var/jfsCache", }, }, { @@ -121,14 +127,14 @@ func TestPodBuilder_genCommands(t *testing.T) { "format-options": "format-options,format-options2", }, spec: juicefsiov1.CacheGroupWorkerTemplate{ - Opts: []string{"cache-dir=/custom/cache", "verbose"}, + Opts: []string{"verbose"}, }, }, expected: []string{ "sh", "-c", common.JuiceFSBinary + " auth test-name --token ${TOKEN} --secret-key ${SECRET_KEY} --format-options --format-options2\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,verbose,cache-dir=/custom/cache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,verbose,cache-dir=/var/jfsCache", }, }, { @@ -140,6 +146,13 @@ func TestPodBuilder_genCommands(t *testing.T) { Name: "test-cg", Namespace: "default", }, + Spec: juicefsiov1.CacheGroupSpec{ + SecretRef: &corev1.SecretEnvSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: "test-secret", + }, + }, + }, }, initConfig: "initconfig", secretData: map[string]string{ @@ -152,7 +165,7 @@ func TestPodBuilder_genCommands(t *testing.T) { "sh", "-c", "cp /etc/juicefs/test-name.conf /root/.juicefs\n" + - "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,cache-group=default-test-cg,cache-dir=/var/jfsCache", + "exec " + common.JuiceFsMountBinary + " test-name " + common.MountPoint + " -o foreground,no-update,cache-group=default-test-cg,cache-dir=/var/jfsCache", }, }, } @@ -160,6 +173,8 @@ func TestPodBuilder_genCommands(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { ctx := context.TODO() + tt.podBuilder.genInitConfigVolumes() + tt.podBuilder.genCacheDirs() got := tt.podBuilder.genCommands(ctx) if !reflect.DeepEqual(got, tt.expected) { t.Errorf("genCommands() = %v, want %v", got, tt.expected) @@ -167,3 +182,46 @@ func TestPodBuilder_genCommands(t *testing.T) { }) } } + +func TestUpdateWorkerGroupWeight(t *testing.T) { + tests := []struct { + name string + worker *corev1.Pod + weight int + expected string + }{ + { + name: "no group-weight option", + worker: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache"}, + }}, + }, + }, + weight: 10, + expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10", + }, + { + name: "with group-weight option", + worker: &corev1.Pod{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Command: []string{"sh", "-c", "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=10"}, + }}, + }, + }, + weight: 0, + expected: "cp /etc/juicefs/zxh-test-2.conf /root/.juicefs\nexec /sbin/mount.juicefs zxh-test-2 /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-cachegroup-sample,cache-dir=/var/jfsCache,group-weight=0", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + UpdateWorkerGroupWeight(tt.worker, tt.weight) + if tt.worker.Spec.Containers[0].Command[2] != tt.expected { + t.Errorf("UpdateWorkerGroupWeight() = %v, want %v", tt.worker.Spec.Containers[0].Command[2], tt.expected) + } + }) + } +} diff --git a/pkg/common/common.go b/pkg/common/common.go index b546d49..8aad8d9 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -16,6 +16,7 @@ package common import ( "fmt" + "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -31,9 +32,12 @@ const ( // Finalizer is the finalizer for CacheGroup Finalizer = "juicefs.io/finalizer" // juicefs binary path - JuiceFSBinary = "/usr/bin/juicefs" - JuiceFsMountBinary = "/sbin/mount.juicefs" - MountPoint = "/mnt/jfs" + JuiceFSBinary = "/usr/bin/juicefs" + JuiceFsMountBinary = "/sbin/mount.juicefs" + MountPoint = "/mnt/jfs" + CacheDirVolumeNamePrefix = "jfs-cache-dir-" + CacheDirVolumeMountPathPrefix = "/var/jfsCache-" + DefaultCacheHostPath = "/var/jfsCache" // label keys LabelCacheGroup = "juicefs.io/cache-group" @@ -42,6 +46,9 @@ const ( LabelWorkerValue = "juicefs-cache-group-worker" LabelAppType = "app.kubernetes.io/name" LabelJobValue = "juicefs-warmup-job" + + AnnoBackupWorker = "juicefs.io/backup-worker" + AnnoWaitingDeleteWorker = "juicefs.io/waiting-delete-worker" ) var ( @@ -55,6 +62,9 @@ var ( corev1.ResourceMemory: resource.MustParse("1Gi"), }, } + + DefaultBackupWorkerDuration = 10 * time.Minute + DefaultWaitingMaxDuration = 1 * time.Hour ) func GenWorkerName(cgName string, nodeName string) string { diff --git a/pkg/utils/pod.go b/pkg/utils/pod.go index e440c56..94a5b52 100644 --- a/pkg/utils/pod.go +++ b/pkg/utils/pod.go @@ -15,7 +15,6 @@ package utils import ( - "bytes" "context" "fmt" "strings" @@ -25,10 +24,6 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/remotecommand" - "k8s.io/kubectl/pkg/scheme" - ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/log" ) @@ -44,56 +39,23 @@ func IsPodReady(pod corev1.Pod) bool { // IsMountPointReady checks if the mount point is ready in the given pod func IsMountPointReady(ctx context.Context, pod corev1.Pod, mountPoint string) bool { - log := log.FromContext(ctx).WithName("checkMountPoint").WithValues("worker", pod.Name) - config := ctrl.GetConfigOrDie() + log := log.FromContext(ctx).WithValues("worker", pod.Name) + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() - - clientset, err := kubernetes.NewForConfig(config) - if err != nil { - log.Error(err, "failed to create Kubernetes client") - return false - } - cmd := []string{"sh", "-c", fmt.Sprintf("stat %s", mountPoint)} - req := clientset.CoreV1().RESTClient(). - Post(). - Resource("pods"). - Name(pod.Name). - Namespace(pod.Namespace). - SubResource("exec") - - req.VersionedParams(&corev1.PodExecOptions{ - Container: common.WorkerContainerName, - Command: cmd, - Stdin: false, - Stdout: true, - Stderr: true, - TTY: false, - }, scheme.ParameterCodec) - - exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) - if err != nil { - log.Error(err, "failed to create SPDY executor") - return false - } - var stdout, stderr bytes.Buffer - err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ - Stdin: nil, - Stdout: &stdout, - Stderr: &stderr, - Tty: false, - }) - if err != nil && stderr.Len() == 0 { + stdout, stderr, err := ExecInPod(ctx, pod.Namespace, pod.Name, common.WorkerContainerName, cmd) + + if err != nil && len(stderr) == 0 { log.Error(err, "failed to execute command") return false } - if stderr.Len() > 0 { - log.V(1).Info("mount point is not ready", "stderr", strings.Trim(stderr.String(), "\n")) + if len(stderr) > 0 { + log.V(1).Info("mount point is not ready", "stderr", strings.Trim(stderr, "\n")) return false } - if !strings.Contains(stdout.String(), "Inode: 1") { + if !strings.Contains(stdout, "Inode: 1") { log.V(1).Info("mount point is not ready") return false } @@ -101,6 +63,43 @@ func IsMountPointReady(ctx context.Context, pod corev1.Pod, mountPoint string) b return true } +func GetWorkerCacheBlocksBytes(ctx context.Context, pod corev1.Pod, mountPoint string) (int, error) { + if !IsPodReady(pod) || !IsMountPointReady(ctx, pod, mountPoint) { + return 0, fmt.Errorf("pod %s is not ready yet", pod.Name) + } + log := log.FromContext(ctx).WithName("getWorkerCacheBlocksBytes").WithValues("worker", pod.Name) + + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + cmd := []string{"sh", "-c", fmt.Sprintf("cat %s/.stats", mountPoint)} + + stdout, stderr, err := ExecInPod(ctx, pod.Namespace, pod.Name, common.WorkerContainerName, cmd) + + if err != nil && len(stderr) == 0 { + return 0, err + } + if len(stderr) > 0 { + log.Error(err, "failed to get cache blocks bytes", "stderr", strings.Trim(stderr, "\n")) + return 0, err + } + // parse stdout + // style like: "blockcache.bytes: 0\nblockcache.blocks: 0" + lines := strings.Split(stdout, "\n") + cacheBytes := 0 + + for _, line := range lines { + if strings.HasPrefix(line, "blockcache.bytes:") { + if _, err := fmt.Sscanf(line, "blockcache.bytes: %d", &cacheBytes); err != nil { + log.Error(err, "failed to parse cache bytes in stats file") + return 0, err + } + break + } + } + + return cacheBytes, nil +} + func ParseUpdateStrategy(strategy *appsv1.DaemonSetUpdateStrategy, total int) (appsv1.DaemonSetUpdateStrategyType, int) { if strategy == nil { return appsv1.RollingUpdateDaemonSetStrategyType, 1 diff --git a/pkg/utils/terminal.go b/pkg/utils/terminal.go new file mode 100644 index 0000000..6466af2 --- /dev/null +++ b/pkg/utils/terminal.go @@ -0,0 +1,72 @@ +// Copyright 2024 Juicedata Inc +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "bytes" + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/remotecommand" + "k8s.io/kubectl/pkg/scheme" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +func ExecInPod(ctx context.Context, namespace, name, container string, cmd []string) (string, string, error) { + log := log.FromContext(ctx).WithName("execInPod") + config := ctrl.GetConfigOrDie() + + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + log.Error(err, "failed to create Kubernetes client") + return "", "", err + } + + req := clientset.CoreV1().RESTClient(). + Post(). + Resource("pods"). + Name(name). + Namespace(namespace). + SubResource("exec") + + req.VersionedParams(&corev1.PodExecOptions{ + Container: container, + Command: cmd, + Stdin: false, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(config, "POST", req.URL()) + if err != nil { + log.Error(err, "failed to create SPDY executor") + return "", "", err + } + + var stdout, stderr bytes.Buffer + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: nil, + Stdout: &stdout, + Stderr: &stderr, + Tty: false, + }) + if err != nil { + return stdout.String(), stderr.String(), err + } + return stdout.String(), stderr.String(), nil +} diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index aa56268..4d2ad21 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -18,6 +18,12 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" + "strconv" + "strings" + "time" + + "github.com/juicedata/juicefs-cache-group-operator/pkg/common" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) func ToPtr[T any](v T) *T { @@ -48,3 +54,60 @@ func GenHash(object interface{}) string { hash := sha256.Sum256(data) return hex.EncodeToString(hash[:]) } + +func MustParseTime(s string) time.Time { + t, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return t +} + +// CompareImageVersion compares two image versions +// return 1 if image > target +// return -1 if image < target +// return 0 if image == target +func CompareEEImageVersion(image, target string) int { + current := strings.Split(image, ":")[1] + current = strings.ReplaceAll(current, "ee-", "") + if strings.Contains(current, "latest") || + strings.Contains(current, "nightly") || + strings.Contains(current, "dev") { + return 1 + } + + currentParts := strings.Split(current, ".") + targetParts := strings.Split(target, ".") + for i := 0; i < len(currentParts); i++ { + if i >= len(targetParts) { + return 1 + } + v1, _ := strconv.Atoi(currentParts[i]) + v2, _ := strconv.Atoi(targetParts[i]) + if v1 > v2 { + return 1 + } else if v1 < v2 { + return -1 + } + } + + if len(currentParts) < len(targetParts) { + return -1 + } + + return 0 +} + +func GetWaitingDeletedMaxDuration(d *metav1.Duration) time.Duration { + if d == nil { + return common.DefaultWaitingMaxDuration + } + return d.Duration +} + +func GetBackupWorkerDuration(d *metav1.Duration) time.Duration { + if d == nil { + return common.DefaultBackupWorkerDuration + } + return d.Duration +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go index 961f389..0319503 100644 --- a/pkg/utils/utils_test.go +++ b/pkg/utils/utils_test.go @@ -96,3 +96,51 @@ func TestContainsNodeSelector(t *testing.T) { }) } } + +func TestCompareImageVersion(t *testing.T) { + tests := []struct { + name string + current string + target string + want int + }{ + { + name: "Test with current greater than target", + current: "juicedata/mount:ee-1.2.3", + target: "1.2.2", + want: 1, + }, + { + name: "Test with current less than target", + current: "juicedata/mount:ee-1.2.2", + target: "1.2.3", + want: -1, + }, + { + name: "Test with current equal to target", + current: "juicedata/mount:ee-1.2.3", + target: "1.2.3", + want: 0, + }, + { + name: "Test with current having less parts than target", + current: "juicedata/mount:ee-1.2", + target: "1.2.3", + want: -1, + }, + { + name: "Test with specific version", + current: "juicedata/mount:ee-nightly", + target: "1.2.1", + want: 1, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := CompareEEImageVersion(tt.current, tt.target); got != tt.want { + t.Errorf("CompareImageVersion() = %v, want = %v", got, tt.want) + } + }) + } +} diff --git a/test/e2e/config/e2e-test-cachegroup.member_change.yaml b/test/e2e/config/e2e-test-cachegroup.member_change.yaml new file mode 100644 index 0000000..540abe7 --- /dev/null +++ b/test/e2e/config/e2e-test-cachegroup.member_change.yaml @@ -0,0 +1,29 @@ +apiVersion: juicefs.io/v1 +kind: CacheGroup +metadata: + name: e2e-test-cachegroup +spec: + secretRef: + name: juicefs-secret + updateStrategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 1 + worker: + template: + nodeSelector: + juicefs.io/cg-worker: "true" + image: registry.cn-hangzhou.aliyuncs.com/juicedata/mount:ee-5.1.2-59d9736 + dnsPolicy: ClusterFirstWithHostNet + hostNetwork: true + tolerations: + - key: node-role.kubernetes.io/control-plane + operator: Exists + effect: NoSchedule + resources: + requests: + cpu: 100m + memory: 128Mi + limits: + cpu: 1 + memory: 1Gi \ No newline at end of file diff --git a/test/e2e/config/e2e-test-cachegroup.overwrite.yaml b/test/e2e/config/e2e-test-cachegroup.overwrite.yaml index d45158f..316da68 100644 --- a/test/e2e/config/e2e-test-cachegroup.overwrite.yaml +++ b/test/e2e/config/e2e-test-cachegroup.overwrite.yaml @@ -39,4 +39,7 @@ spec: memory: 2Gi opts: - free-space-ratio=0.01 - - group-weight=100 \ No newline at end of file + - group-weight=100 + cacheDirs: + - type: HostPath + path: /data/juicefs \ No newline at end of file diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go index 8ae812a..1502d64 100644 --- a/test/e2e/e2e_test.go +++ b/test/e2e/e2e_test.go @@ -155,6 +155,22 @@ var _ = Describe("controller", Ordered, func() { cmd = exec.Command("kubectl", "apply", "-f", "test/e2e/config/e2e-test-cachegroup.yaml", "-n", namespace) _, err = utils.Run(cmd) Expect(err).NotTo(HaveOccurred()) + + By("validating cg status is up to date") + verifyCgStatusUpToDate := func() error { + // Validate pod status + cmd := exec.Command("kubectl", "get", + "cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.phase}", + "-n", namespace, + ) + status, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if string(status) != "Waiting" { + return fmt.Errorf("cg expect Waiting status, but got %s", status) + } + return nil + } + Eventually(verifyCgStatusUpToDate, time.Minute, time.Second).Should(Succeed()) }) AfterEach(func() { @@ -247,18 +263,26 @@ var _ = Describe("controller", Ordered, func() { if err != nil { return fmt.Errorf("get worker pods failed, %+v", err) } - expectCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache" + expectCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache" nodes := corev1.PodList{} err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, err).NotTo(HaveOccurred()) for _, node := range nodes.Items { + checkCmd := expectCmds + if node.Annotations[common.AnnoBackupWorker] != "" { + checkCmd = checkCmd + ",group-backup" + } ExpectWithOffset(1, node.Spec.HostNetwork).Should(BeTrue()) ExpectWithOffset(1, node.Spec.Containers[0].Image).Should(Equal(image)) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Cpu().String()).Should(Equal("100m")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Memory().String()).Should(Equal("128Mi")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi")) - ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", expectCmds})) + ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", checkCmd})) + ExpectWithOffset(1, node.Spec.Volumes[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Volumes[0].HostPath.Path).Should(Equal("/var/jfsCache")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].MountPath).Should(Equal("/var/jfsCache")) } return nil } @@ -312,11 +336,12 @@ var _ = Describe("controller", Ordered, func() { cmd = exec.Command("kubectl", "wait", "pod/"+expectWorkerName, "--for", "condition=Ready", "--namespace", namespace, - "--timeout", "2m", + "--timeout", "1m", ) _, err = utils.Run(cmd) - ExpectWithOffset(1, err).NotTo(HaveOccurred()) - + if err != nil { + return fmt.Errorf("wait worker pod failed, %+v", err) + } return nil } Eventually(verifyWorkerCreated, time.Minute, time.Second).Should(Succeed()) @@ -391,30 +416,152 @@ var _ = Describe("controller", Ordered, func() { if err != nil { return fmt.Errorf("get worker pods failed, %+v", err) } - normalCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.1,group-weight=200,cache-dir=/var/jfsCache" - worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.01,group-weight=100,cache-dir=/var/jfsCache" + normalCmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.1,group-weight=200,cache-dir=/var/jfsCache" + worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,free-space-ratio=0.01,group-weight=100,cache-dir=/var/jfsCache-0" nodes := corev1.PodList{} err = json.Unmarshal(result, &nodes) ExpectWithOffset(1, err).NotTo(HaveOccurred()) for _, node := range nodes.Items { + checkCmd := normalCmds + if node.Spec.NodeName == utils.GetKindNodeName("worker2") { + checkCmd = worker2Cmds + } + if node.Annotations[common.AnnoBackupWorker] != "" { + checkCmd = checkCmd + ",group-backup" + } ExpectWithOffset(1, node.Spec.HostNetwork).Should(BeTrue()) ExpectWithOffset(1, node.Spec.Containers[0].Image).Should(Equal(image)) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Cpu().String()).Should(Equal("100m")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Memory().String()).Should(Equal("128Mi")) + ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", checkCmd})) if node.Spec.NodeName == utils.GetKindNodeName("worker2") { - ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", worker2Cmds})) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("2")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("2Gi")) + ExpectWithOffset(1, node.Spec.Volumes[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Volumes[0].HostPath.Path).Should(Equal("/data/juicefs")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].Name).Should(Equal("jfs-cache-dir-0")) + ExpectWithOffset(1, node.Spec.Containers[0].VolumeMounts[0].MountPath).Should(Equal("/var/jfsCache-0")) } else { ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1")) ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi")) - ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", normalCmds})) } } return nil } Eventually(verifyWorkerSpec, time.Minute, time.Second).Should(Succeed()) }) + + It("should gracefully handle member change ", func() { + cmd := exec.Command("kubectl", "apply", "-f", "test/e2e/config/e2e-test-cachegroup.member_change.yaml", "-n", namespace) + _, err := utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + controlPlane := utils.GetKindNodeName("control-plane") + worker := utils.GetKindNodeName("worker") + worker2 := utils.GetKindNodeName("worker2") + + cmd = exec.Command("kubectl", "label", "nodes", controlPlane, worker, "juicefs.io/cg-worker=true", "--overwrite") + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + By("validating cg status is up to date") + verifyCgStatusUpToDate := func() error { + cmd = exec.Command("kubectl", "get", + "cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.readyWorker}", + "-n", namespace, + ) + readyWorker, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if string(readyWorker) != "2" { + return fmt.Errorf("cg expect has 2 readyWorker status, but got %s", readyWorker) + } + return nil + } + Eventually(verifyCgStatusUpToDate, 5*time.Minute, 3*time.Second).Should(Succeed()) + + cmd = exec.Command("kubectl", "label", "nodes", worker2, "juicefs.io/cg-worker=true", "--overwrite") + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + verifyCgStatusUpToDate = func() error { + cmd = exec.Command("kubectl", "get", + "cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.readyWorker}", + "-n", namespace, + ) + readyWorker, err := utils.Run(cmd) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if string(readyWorker) != "3" { + return fmt.Errorf("cg expect has 3 readyWorker status, but got %s", readyWorker) + } + return nil + } + Eventually(verifyCgStatusUpToDate, 5*time.Minute, 3*time.Second).Should(Succeed()) + + By("validating new node should be as backup node") + verifyBackupWorkerSpec := func() error { + cmd := exec.Command("kubectl", "get", "pods", "-l", "juicefs.io/cache-group="+cgName, "-n", namespace, "-o", "json") + result, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("get worker pods failed, %+v", err) + } + worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,no-update,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache,group-backup" + nodes := corev1.PodList{} + err = json.Unmarshal(result, &nodes) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + for _, node := range nodes.Items { + if node.Spec.NodeName == worker2 { + ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", worker2Cmds})) + } + } + return nil + } + Eventually(verifyBackupWorkerSpec, time.Minute, time.Second).Should(Succeed()) + + verifyWarmupFile := func() error { + cmd = exec.Command("kubectl", "exec", "-i", "-n", + namespace, "-c", common.WorkerContainerName, common.GenWorkerName("e2e-test-cachegroup", worker), "--", + "sh", "-c", "echo 1 > /mnt/jfs/cache-group-test.txt") + _, err = utils.Run(cmd) + + // check block bytes + cmd = exec.Command("kubectl", "exec", "-i", "-n", + namespace, "-c", common.WorkerContainerName, common.GenWorkerName("e2e-test-cachegroup", worker), "--", + "sh", "-c", "cat /mnt/jfs/.stats | grep blockcache.bytes") + result, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("get block bytes failed, %+v", err) + } + var blockBytes int + _, err = fmt.Sscanf(strings.Trim(string(result), "\n"), "blockcache.bytes: %d", &blockBytes) + if err != nil { + return fmt.Errorf("failed to scan block bytes: %v", err) + } + if blockBytes == 0 { + return fmt.Errorf("block bytes is ") + } + return nil + } + Eventually(verifyWarmupFile, time.Minute, time.Second).Should(Succeed()) + + cmd = exec.Command("kubectl", "label", "nodes", worker, "juicefs.io/cg-worker-") + _, err = utils.Run(cmd) + Expect(err).NotTo(HaveOccurred()) + + verifyWorkerWeightToZero := func() error { + cmd := exec.Command("kubectl", "get", "pods", common.GenWorkerName(cgName, worker), "-n", namespace, "-o", "json") + result, err := utils.Run(cmd) + if err != nil { + return fmt.Errorf("get worker pods failed, %+v", err) + } + workerNode := corev1.Pod{} + err = json.Unmarshal(result, &workerNode) + ExpectWithOffset(1, err).NotTo(HaveOccurred()) + if !strings.Contains(workerNode.Spec.Containers[0].Command[2], "group-weight=0") { + return fmt.Errorf("worker weight not set to 0") + } + return nil + } + Eventually(verifyWorkerWeightToZero, time.Minute, time.Second).Should(Succeed()) + }) }) Context("WarmUp Controller", func() {