Skip to content

Commit

Permalink
Fix downgrade 1.15 to 1.13 scenario with 0 scheduler pods
Browse files: browse the repository at this point in the history
Signed-off-by: Anton Troshin <[email protected]>
  • Loading branch information
antontroshin committed Feb 27, 2025
1 parent 68e5a09 commit 4e1bede
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 12 deletions.
34 changes: 24 additions & 10 deletions pkg/kubernetes/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,11 @@ func Upgrade(conf UpgradeConfig) error {

// wait for the deletion of the scheduler pods to finish
if downgradeDeletionChan != nil {
<-downgradeDeletionChan
select {
case <-downgradeDeletionChan:
case <-time.After(3 * time.Minute):
return errors.New("timed out waiting for downgrade deletion")
}
}

if dashboardChart != nil {
Expand All @@ -242,11 +246,6 @@ func Upgrade(conf UpgradeConfig) error {
}

func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targetVersion *semver.Version) error {
_, client, err := GetKubeConfigClient()
if err != nil {
return err
}

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()

Expand All @@ -259,13 +258,22 @@ func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targe
if foundTargetVersion {
break
}
pods, err = client.CoreV1().Pods(namespace).List(ctxWithTimeout, meta_v1.ListOptions{
k8sClient, err := Client()
if err != nil {
return err
}

pods, err = k8sClient.CoreV1().Pods(namespace).List(ctxWithTimeout, meta_v1.ListOptions{
LabelSelector: "app=dapr-scheduler-server",
})
if err != nil {
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}

if len(pods.Items) == 0 {
return nil
}

for _, pod := range pods.Items {
pv, ok := pod.Labels["app.kubernetes.io/version"]
if ok {
Expand All @@ -276,19 +284,25 @@ func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targe
}
}
}
time.Sleep(time.Second)
time.Sleep(5 * time.Second)
}

if pods == nil {
return errors.New("no scheduler pods found")
}

// get a fresh client to ensure we have the latest state of the cluster
k8sClient, err := Client()
if err != nil {
return err
}

// delete scheduler pods of the current version, i.e. >1.15.0
for _, pod := range pods.Items {
if pv, ok := pod.Labels["app.kubernetes.io/version"]; ok {
podVersion, err := semver.NewVersion(pv)
if err == nil && podVersion.Equal(currentVersion) {
err = client.CoreV1().Pods(namespace).Delete(ctxWithTimeout, pod.Name, meta_v1.DeleteOptions{})
err = k8sClient.CoreV1().Pods(namespace).Delete(ctxWithTimeout, pod.Name, meta_v1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete pod %s during downgrade: %w", pod.Name, err)
}
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1193,7 +1193,7 @@ func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) {
if len(list.Items) == 0 {
podsDeleted <- struct{}{}
}
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}

Expand Down Expand Up @@ -1239,7 +1239,7 @@ func waitAllPodsRunning(t *testing.T, namespace string, haEnabled bool, done, po
podsRunning <- struct{}{}
}

time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}

Expand Down

0 comments on commit 4e1bede

Please sign in to comment.