Commit e74bb58: [ci skip]: tweak

Signed-off-by: Xuhui zhang <[email protected]>
zxh326 committed Nov 20, 2024
1 parent 849701c commit e74bb58
Showing 6 changed files with 210 additions and 30 deletions.
config/manager/kustomization.yaml (4 changes: 2 additions & 2 deletions)

@@ -4,5 +4,5 @@ apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
images:
- name: controller
-  newName: registry.zzde.me/juicefs-cache-group-operator
-  newTag: v0.2.5
+  newName: controller
+  newTag: latest
internal/controller/cachegroup_controller.go (75 changes: 51 additions & 24 deletions)

@@ -150,6 +150,9 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr
numUnavailable++
go func() {
defer wg.Done()
if backUpWorker {
log.V(1).Info("new worker added, as backup worker", "worker", expectWorker.Name)
}
if err := r.createOrUpdateWorker(ctx, actualState, expectWorker); err != nil {
log.Error(err, "failed to create or update worker", "worker", expectWorker.Name)
errCh <- err
@@ -184,7 +187,7 @@ func (r *CacheGroupReconciler) sync(ctx context.Context, cg *juicefsiov1.CacheGr
log.Error(err, "failed to list actual worker nodes")
return err
}
-	if err := r.removeRedundantWorkers(ctx, expectStates, actualWorks); err != nil {
+	if err := r.removeRedundantWorkers(ctx, cg, expectStates, actualWorks); err != nil {
log.Error(err, "failed to remove redundant")
return err
}
@@ -314,52 +317,74 @@ func (r *CacheGroupReconciler) listActualWorkers(ctx context.Context, cg *juicef
// if the worker not ready delete it directly
func (r *CacheGroupReconciler) removeRedundantWorkers(
	ctx context.Context,
+	cg *juicefsiov1.CacheGroup,
	expectStates map[string]juicefsiov1.CacheGroupWorkerTemplate,
	actualWorks []corev1.Pod) error {
	log := log.FromContext(ctx)
	for _, worker := range actualWorks {
+		if _, ok := expectStates[worker.Spec.NodeName]; ok {
+			continue
+		}
+		if worker.DeletionTimestamp != nil {
+			continue
+		}
-		if _, ok := expectStates[worker.Spec.NodeName]; !ok {
-			// if the worker in delete state,
-			cacheBytes, err := utils.GetWorkerCacheBlocksBytes(ctx, worker, common.MountPoint)
-			if err != nil {
-				log.Error(err, "failed to get worker cache blocks bytes", "worker", worker.Name)
-				continue
-			}
-			if cacheBytes > 0 {
-				log.Info("found redundant worker, but it still has cache blocks, tweak the group weight to zero", "worker", worker.Name)
-				if err := r.gracefulShutdownWorker(ctx, &worker); err != nil {
-					log.Error(err, "failed to graceful shutdown worker", "worker", worker)
-				}
-			} else {
-				log.Info("found redundant worker, delete it", "worker", worker.Name)
-				if err := r.deleteCacheGroupWorker(ctx, &worker, false); err != nil {
-					log.Error(err, "failed to delete worker", "worker", worker.Name)
-					return err
-				}
-			}
-		}
+		if cg.Status.ReadyWorker > 1 {
+			delete, err := r.gracefulShutdownWorker(ctx, &worker)
+			if err != nil {
+				log.Error(err, "failed to graceful shutdown worker", "worker", worker)
+				return err
+			}
+			if !delete {
+				continue
+			}
+		}
+
+		log.Info("found redundant worker, delete it", "worker", worker.Name)
+		if err := r.deleteCacheGroupWorker(ctx, &worker, false); err != nil {
+			log.Error(err, "failed to delete worker", "worker", worker.Name)
+			return err
+		}
	}
	return nil
}

// change pod options `group-weight` to zero, delete and recreate the worker pod
-func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worker *corev1.Pod) error {
+func (r *CacheGroupReconciler) gracefulShutdownWorker(ctx context.Context, worker *corev1.Pod) (delete bool, err error) {
	log := log.FromContext(ctx)
+	isReady := utils.IsPodReady(*worker) && utils.IsMountPointReady(ctx, *worker, common.MountPoint)
+	if !isReady {
+		if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok {
+			return false, nil
+		}
+		return true, nil
+	}
+
+	cacheBytes, err := utils.GetWorkerCacheBlocksBytes(ctx, *worker, common.MountPoint)
+	if err != nil {
+		log.Error(err, "failed to get worker cache blocks bytes", "worker", worker.Name)
+		return false, err
+	}
+
+	if cacheBytes <= 0 {
+		log.V(1).Info("redundant worker has no cache blocks, delete it", "worker", worker.Name)
+		return true, nil
+	}
+
	if _, ok := worker.Annotations[common.AnnoWaitingDeleteWorker]; ok {
-		return nil
+		return false, nil
	}
+	log.V(1).Info("redundant worker has cache blocks, recreate to set group-weight to 0", "worker", worker.Name, "cacheBytes", cacheBytes)
	if err := r.deleteCacheGroupWorker(ctx, worker, true); err != nil {
-		return err
+		return false, err
	}
	builder.UpdateWorkerGroupWeight(worker, 0)
	worker.ResourceVersion = ""
	worker.Annotations[common.AnnoWaitingDeleteWorker] = time.Now().Format(time.RFC3339)
	if err := r.Create(ctx, worker); err != nil {
-		return err
+		return false, err
	}
-	return nil
+	return false, err
}

func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, actual *corev1.Pod) bool {
@@ -372,8 +397,7 @@ func (r *CacheGroupReconciler) asBackupWorkerOrNot(cg *juicefsiov1.CacheGroup, a
// then this node is a normal worker.
if v, ok := actual.Annotations[common.AnnoBackupWorker]; ok {
backupAt := utils.MustParseTime(v)
-		// TODO: 10 minutes should be configurable
-		return time.Since(backupAt) < 10*time.Minute
+		return time.Since(backupAt) < common.BackupWorkerDuration
}
return false
}
Expand All @@ -385,6 +409,9 @@ func (r *CacheGroupReconciler) calculateStatus(
newStatus := cg.Status
if len(expectStates) == 0 {
newStatus.ReadyStr = "-"
newStatus.ReadyWorker = 0
newStatus.ExpectWorker = 0
newStatus.BackUpWorker = 0
newStatus.Phase = juicefsiov1.CacheGroupPhaseWaiting
return newStatus
}
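A note on the refactor in this file: gracefulShutdownWorker now returns (delete bool, err error), telling removeRedundantWorkers whether a redundant pod can be deleted immediately or must first drain its cache. The sketch below condenses that decision logic; it is illustrative only, and the names decide and action are hypothetical, not part of the operator's code:

```go
package main

import "fmt"

// action is a hypothetical enum summarizing what removeRedundantWorkers does
// with a redundant worker after calling gracefulShutdownWorker.
type action int

const (
	deleteNow              action = iota // gracefulShutdownWorker returned true: delete the pod
	recreateWithZeroWeight               // pod recreated with group-weight=0; wait for its cache to drain
	waitForDrain                         // already marked for deletion; keep waiting
)

// decide mirrors the branches of gracefulShutdownWorker in the diff above.
func decide(ready bool, cacheBytes int64, alreadyMarked bool) action {
	if !ready {
		if alreadyMarked {
			return waitForDrain
		}
		return deleteNow // not ready and not draining: safe to delete directly
	}
	if cacheBytes <= 0 {
		return deleteNow // no cached blocks left, nothing to hand off
	}
	if alreadyMarked {
		return waitForDrain
	}
	return recreateWithZeroWeight // still holds cache: zero its group weight first
}

func main() {
	fmt.Println(decide(true, 4096, false)) // 1: recreateWithZeroWeight
	fmt.Println(decide(true, 0, false))    // 0: deleteNow
	fmt.Println(decide(true, 4096, true))  // 2: waitForDrain
}
```

Note that removeRedundantWorkers only takes this path while cg.Status.ReadyWorker > 1, so the group is never drained below a single ready worker.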
pkg/common/common.go (3 changes: 3 additions & 0 deletions)

@@ -16,6 +16,7 @@ package common

import (
"fmt"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
@@ -58,6 +59,8 @@ var (
corev1.ResourceMemory: resource.MustParse("1Gi"),
},
}

BackupWorkerDuration = 10 * time.Minute
)

func GenWorkerName(cgName string, nodeName string) string {
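The new BackupWorkerDuration constant replaces the hard-coded 10*time.Minute (and its TODO) in asBackupWorkerOrNot. A minimal runnable sketch of the window check, assuming only what the diff shows (stillBackup is a hypothetical helper, not part of the operator):

```go
package main

import (
	"fmt"
	"time"
)

// BackupWorkerDuration mirrors the new constant in pkg/common/common.go.
const BackupWorkerDuration = 10 * time.Minute

// stillBackup reports whether a worker annotated as a backup worker at
// backupAt is still inside its backup window, as asBackupWorkerOrNot checks.
func stillBackup(backupAt time.Time) bool {
	return time.Since(backupAt) < BackupWorkerDuration
}

func main() {
	fmt.Println(stillBackup(time.Now().Add(-5 * time.Minute)))  // true: still a backup worker
	fmt.Println(stillBackup(time.Now().Add(-15 * time.Minute))) // false: treated as a normal worker
}
```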
pkg/utils/pod.go (2 changes: 1 addition & 1 deletion)

@@ -39,7 +39,7 @@ func IsPodReady(pod corev1.Pod) bool {

// IsMountPointReady checks if the mount point is ready in the given pod
func IsMountPointReady(ctx context.Context, pod corev1.Pod, mountPoint string) bool {
-	log := log.FromContext(ctx).WithName("checkMountPoint").WithValues("worker", pod.Name)
+	log := log.FromContext(ctx).WithValues("worker", pod.Name)

ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
test/e2e/config/e2e-test-cachegroup.member_change.yaml (29 additions, new file)

@@ -0,0 +1,29 @@
apiVersion: juicefs.io/v1
kind: CacheGroup
metadata:
name: e2e-test-cachegroup
spec:
secretRef:
name: juicefs-secret
updateStrategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
worker:
template:
nodeSelector:
juicefs.io/cg-worker: "true"
image: registry.cn-hangzhou.aliyuncs.com/juicedata/mount:ee-5.1.2-59d9736
dnsPolicy: ClusterFirstWithHostNet
hostNetwork: true
tolerations:
- key: node-role.kubernetes.io/control-plane
operator: Exists
effect: NoSchedule
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 1
memory: 1Gi
test/e2e/e2e_test.go (127 changes: 124 additions & 3 deletions)

@@ -155,6 +155,22 @@ var _ = Describe("controller", Ordered, func() {
cmd = exec.Command("kubectl", "apply", "-f", "test/e2e/config/e2e-test-cachegroup.yaml", "-n", namespace)
_, err = utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())

By("validating cg status is up to date")
verifyCgStatusUpToDate := func() error {
// Validate pod status
cmd := exec.Command("kubectl", "get",
"cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.phase}",
"-n", namespace,
)
status, err := utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
if string(status) != "Waiting" {
return fmt.Errorf("cg expect Waiting status, but got %s", status)
}
return nil
}
Eventually(verifyCgStatusUpToDate, time.Minute, time.Second).Should(Succeed())
})

AfterEach(func() {
@@ -252,13 +268,17 @@
err = json.Unmarshal(result, &nodes)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
for _, node := range nodes.Items {
+			checkCmd := expectCmds
+			if node.Annotations[common.AnnoBackupWorker] != "" {
+				checkCmd = checkCmd + ",group-backup"
+			}
ExpectWithOffset(1, node.Spec.HostNetwork).Should(BeTrue())
ExpectWithOffset(1, node.Spec.Containers[0].Image).Should(Equal(image))
ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Cpu().String()).Should(Equal("100m"))
ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Memory().String()).Should(Equal("128Mi"))
ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1"))
ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi"))
-			ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", expectCmds}))
+			ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", checkCmd}))
}
return nil
}
@@ -397,24 +417,125 @@
err = json.Unmarshal(result, &nodes)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
for _, node := range nodes.Items {
+			checkCmd := normalCmds
+			if node.Spec.NodeName == utils.GetKindNodeName("worker2") {
+				checkCmd = worker2Cmds
+			}
+			if node.Annotations[common.AnnoBackupWorker] != "" {
+				checkCmd = checkCmd + ",group-backup"
+			}
			ExpectWithOffset(1, node.Spec.HostNetwork).Should(BeTrue())
			ExpectWithOffset(1, node.Spec.Containers[0].Image).Should(Equal(image))
			ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Cpu().String()).Should(Equal("100m"))
			ExpectWithOffset(1, node.Spec.Containers[0].Resources.Requests.Memory().String()).Should(Equal("128Mi"))
+			ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", checkCmd}))
			if node.Spec.NodeName == utils.GetKindNodeName("worker2") {
-				ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", worker2Cmds}))
				ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("2"))
				ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("2Gi"))
			} else {
				ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Cpu().String()).Should(Equal("1"))
				ExpectWithOffset(1, node.Spec.Containers[0].Resources.Limits.Memory().String()).Should(Equal("1Gi"))
-				ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", normalCmds}))
			}
}
}
return nil
}
Eventually(verifyWorkerSpec, time.Minute, time.Second).Should(Succeed())
})

It("should gracefully handle member change ", func() {
cmd := exec.Command("kubectl", "apply", "-f", "test/e2e/config/e2e-test-cachegroup.member_change.yaml", "-n", namespace)
_, err := utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())

controlPlane := utils.GetKindNodeName("control-plane")
worker := utils.GetKindNodeName("worker")
worker2 := utils.GetKindNodeName("worker2")

cmd = exec.Command("kubectl", "label", "nodes", controlPlane, worker, "juicefs.io/cg-worker=true", "--overwrite")
_, err = utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())

By("validating cg status is up to date")
verifyCgStatusUpToDate := func() error {
cmd = exec.Command("kubectl", "get",
"cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.readyWorker}",
"-n", namespace,
)
readyWorker, err := utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
if string(readyWorker) != "2" {
return fmt.Errorf("cg expect has 2 readyWorker status, but got %s", readyWorker)
}
return nil
}
Eventually(verifyCgStatusUpToDate, 5*time.Minute, 3*time.Second).Should(Succeed())

cmd = exec.Command("kubectl", "label", "nodes", worker2, "juicefs.io/cg-worker=true", "--overwrite")
_, err = utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())
verifyCgStatusUpToDate = func() error {
cmd = exec.Command("kubectl", "get",
"cachegroups.juicefs.io", cgName, "-o", "jsonpath={.status.readyWorker}",
"-n", namespace,
)
readyWorker, err := utils.Run(cmd)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
if string(readyWorker) != "3" {
return fmt.Errorf("cg expect has 3 readyWorker status, but got %s", readyWorker)
}
return nil
}
Eventually(verifyCgStatusUpToDate, 5*time.Minute, 3*time.Second).Should(Succeed())

By("validating new node should be as backup node")
verifyBackupWorkerSpec := func() error {
cmd := exec.Command("kubectl", "get", "pods", "-l", "juicefs.io/cache-group="+cgName, "-n", namespace, "-o", "json")
result, err := utils.Run(cmd)
if err != nil {
return fmt.Errorf("get worker pods failed, %+v", err)
}
worker2Cmds := "/usr/bin/juicefs auth csi-ci --token ${TOKEN} --access-key minioadmin --bucket http://test-bucket.minio.default.svc.cluster.local:9000 --secret-key ${SECRET_KEY}\nexec /sbin/mount.juicefs csi-ci /mnt/jfs -o foreground,cache-group=juicefs-cache-group-operator-system-e2e-test-cachegroup,cache-dir=/var/jfsCache,group-backup"
nodes := corev1.PodList{}
err = json.Unmarshal(result, &nodes)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
for _, node := range nodes.Items {
if node.Spec.NodeName == worker2 {
ExpectWithOffset(1, node.Spec.Containers[0].Command).Should(Equal([]string{"sh", "-c", worker2Cmds}))
}
}
return nil
}
Eventually(verifyBackupWorkerSpec, time.Minute, time.Second).Should(Succeed())

verifyWarmupFile := func() error {
cmd = exec.Command("kubectl", "exec", "-i", "-n",
namespace, "-c", common.WorkerContainerName, common.GenWorkerName("e2e-test-cachegroup", worker), "--",
"sh", "-c", "echo 1 > /mnt/jfs/cache-group-test.txt")
_, err = utils.Run(cmd)
return err
}
Eventually(verifyWarmupFile, time.Minute, time.Second).Should(Succeed())

cmd = exec.Command("kubectl", "label", "nodes", worker, "juicefs.io/cg-worker-")
_, err = utils.Run(cmd)
Expect(err).NotTo(HaveOccurred())

verifyWorkerWeightToZero := func() error {
cmd := exec.Command("kubectl", "get", "pods", common.GenWorkerName(cgName, worker), "-n", namespace, "-o", "json")
result, err := utils.Run(cmd)
if err != nil {
return fmt.Errorf("get worker pods failed, %+v", err)
}
workerNode := corev1.Pod{}
err = json.Unmarshal(result, &workerNode)
ExpectWithOffset(1, err).NotTo(HaveOccurred())
if !strings.Contains(workerNode.Spec.Containers[0].Command[2], "group-weight=0") {
return fmt.Errorf("worker weight not set to 0")
}
return nil
}
Eventually(verifyWorkerWeightToZero, time.Minute, time.Second).Should(Succeed())
})
})

Context("WarmUp Controller", func() {
