update runCanary traffic step for special cases
Signed-off-by: yunbo <[email protected]>
Funinu committed Jul 19, 2024
1 parent e7652cb commit 7e1281a
Showing 6 changed files with 157 additions and 34 deletions.
22 changes: 22 additions & 0 deletions api/v1beta1/rollout_types.go
@@ -17,6 +17,9 @@ limitations under the License.
package v1beta1

import (
"reflect"

apps "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
@@ -92,6 +95,25 @@ func (r *RolloutStrategy) GetRollingStyle() RollingStyleType {
return PartitionRollingStyle
}

// Using the single field EnableExtraWorkloadForCanary to distinguish partition-style from canary-style
// is not enough. For example, a v1alpha1 Rollout can be converted to a v1beta1 Rollout
// with EnableExtraWorkloadForCanary set to true, even if the objectRef is a CloneSet (which doesn't support canary release).
func IsRealPartition(rollout *Rollout) bool {
if rollout.Spec.Strategy.IsEmptyRelease() {
return false
}
estimation := rollout.Spec.Strategy.GetRollingStyle()
if estimation == BlueGreenRollingStyle {
return false
}
targetRef := rollout.Spec.WorkloadRef
if targetRef.APIVersion == apps.SchemeGroupVersion.String() && targetRef.Kind == reflect.TypeOf(apps.Deployment{}).Name() &&
estimation == CanaryRollingStyle {
return false
}
return true
}

// r.GetRollingStyle() == BlueGreenRollingStyle
func (r *RolloutStrategy) IsBlueGreenRelease() bool {
return r.GetRollingStyle() == BlueGreenRollingStyle
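For orientation, here is a minimal usage sketch of the new IsRealPartition helper. It is not part of this commit; the releaseKind function and the github.com/openkruise/rollouts import path are assumptions made for illustration only.

package main

import (
	"fmt"

	"github.com/openkruise/rollouts/api/v1beta1"
)

// releaseKind is a hypothetical caller of the new predicate: it labels a Rollout
// as a partition-style release or as a canary/blue-green release.
func releaseKind(rollout *v1beta1.Rollout) string {
	if v1beta1.IsRealPartition(rollout) {
		// Partition-style release: stable pods are updated in place, batch by batch
		// (e.g. a CloneSet, or a Deployment released without an extra canary workload).
		return "partition"
	}
	// Canary-style (extra canary workload for a Deployment), blue-green, or an empty release.
	return "canary-or-blue-green"
}

func main() {
	fmt.Println(releaseKind(&v1beta1.Rollout{})) // an empty release is never a real partition
}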
82 changes: 78 additions & 4 deletions pkg/controller/rollout/rollout_canary.go
@@ -31,6 +31,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
@@ -75,6 +76,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
klog.Infof("rollout(%s/%s) canary step jumped", c.Rollout.Namespace, c.Rollout.Name)
return nil
}
gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.Rollout.Spec.Strategy.GetTrafficRouting(), defaultGracePeriodSeconds)
// When the first batch is trafficRouting rolling and the next steps are rolling release,
// we need to clean up the canary-related resources first and then roll out the rest of the batches.
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
@@ -86,17 +88,76 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
return err
} else if !done {
klog.Infof("rollout(%s/%s) cleaning up canary-related resources", c.Rollout.Namespace, c.Rollout.Name)
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}
switch canaryStatus.CurrentStepState {
// Before CanaryStepStateUpgrade, handle some special cases to prevent traffic loss.
case v1beta1.CanaryStepStateInit:
// placeholder for the later traffic modification Pull Request
canaryStatus.NextStepIndex = util.NextBatchIndex(c.Rollout, canaryStatus.CurrentStepIndex)
klog.Infof("rollout(%s/%s) run canary strategy, and state(%s)", c.Rollout.Namespace, c.Rollout.Name, v1beta1.CanaryStepStateInit)
tr := newTrafficRoutingContext(c)
if currentStep.Traffic == nil && len(currentStep.Matches) == 0 {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)
return nil
}

/*
The following check serves to bypass a bug in the ingress-nginx controller: https://github.com/kubernetes/ingress-nginx/issues/9635
For partition-style releases: if the expected replicas of the current rollout step are not less than workload.spec.replicas,
this step will update all stable pods to the new version, i.e. there will be no stable pods left, which triggers the bug.
To avoid this issue, we restore the stable Service before the stable pods are scaled down to zero.
This ensures that the backends behind the stable ingress remain active, preventing the bug from being triggered.
*/
expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
klog.Infof("special case detected: rollout(%s/%s) restore stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.RestoreStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

/*
The following check is used to solve a scenario like this:
steps:
- replicas: 1 # first batch
  matches:
  - headers:
    - name: user-agent
      type: Exact
      value: pc
In the first batch, pods with the new version are created in the CanaryStepStateUpgrade step; once ready,
they serve as active backends behind the stable service, because the stable service hasn't been
modified by the rollout yet (i.e. it still selects pods of all versions).
Thus, requests with or without the header (user-agent: pc) would be routed evenly to pods of all versions before
we arrive at the CanaryStepStateTrafficRouting step.
To avoid this issue, we patch the selector to the stable Service before the CanaryStepStateUpgrade step.
*/
if canaryStatus.CurrentStepIndex == 1 {
klog.Infof("special case detected: rollout(%s/%s) patch stable Service", c.Rollout.Namespace, c.Rollout.Name)
done, err := m.trafficRoutingManager.PatchStableService(tr)
if err != nil {
return err
} else if !done {
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime
return nil
}
}

canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateInit, canaryStatus.CurrentStepState)
fallthrough

case v1beta1.CanaryStepStateUpgrade:
@@ -106,6 +167,13 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
return err
} else if done {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
// In line with the explanation above regarding the ingress-nginx bug https://github.com/kubernetes/ingress-nginx/issues/9635,
// we do the same check again here to skip the CanaryStepStateTrafficRouting step, since
// the stable Service has already been restored in the CanaryStepStateInit step
expectedReplicas, _ := intstr.GetScaledValueFromIntOrPercent(currentStep.Replicas, int(c.Workload.Replicas), true)
if expectedReplicas >= int(c.Workload.Replicas) && v1beta1.IsRealPartition(c.Rollout) {
canaryStatus.CurrentStepState = v1beta1.CanaryStepStateMetricsAnalysis
}
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateUpgrade, canaryStatus.CurrentStepState)
@@ -124,7 +192,7 @@ func (m *canaryReleaseManager) runCanary(c *RolloutContext) error {
klog.Infof("rollout(%s/%s) step(%d) state from(%s) -> to(%s)", c.Rollout.Namespace, c.Rollout.Name,
canaryStatus.CurrentStepIndex, v1beta1.CanaryStepStateTrafficRouting, canaryStatus.CurrentStepState)
}
expectedTime := time.Now().Add(time.Duration(defaultGracePeriodSeconds) * time.Second)
expectedTime := time.Now().Add(time.Duration(gracePeriodSeconds) * time.Second)
c.RecheckTime = &expectedTime

case v1beta1.CanaryStepStateMetricsAnalysis:
@@ -217,6 +285,12 @@ func (m *canaryReleaseManager) doCanaryPaused(c *RolloutContext) (bool, error) {
canaryStatus := c.NewStatus.CanaryStatus
currentStep := c.Rollout.Spec.Strategy.Canary.Steps[canaryStatus.CurrentStepIndex-1]
steps := len(c.Rollout.Spec.Strategy.Canary.Steps)
// If it is the last step and it releases 100% of the pods, then return true
if int32(steps) == canaryStatus.CurrentStepIndex {
if currentStep.Replicas != nil && currentStep.Replicas.StrVal == "100%" {
return true, nil
}
}
cond := util.GetRolloutCondition(*c.NewStatus, v1beta1.RolloutConditionProgressing)
// need manual confirmation
if currentStep.Pause.Duration == nil {
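To make the two special-case checks above concrete, here is a small, self-contained sketch of the replica computation that drives them; the workload size and step values are illustrative assumptions, not taken from this diff.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	workloadReplicas := 5

	// Case 1: a partition-style step that releases 100% of the pods.
	// expectedReplicas >= workloadReplicas, so runCanary restores the stable Service
	// in CanaryStepStateInit and later skips CanaryStepStateTrafficRouting.
	full := intstr.FromString("100%")
	expected, _ := intstr.GetScaledValueFromIntOrPercent(&full, workloadReplicas, true)
	fmt.Println(expected, expected >= workloadReplicas) // 5 true

	// Case 2: a small first batch (replicas: 1) with header matches.
	// expectedReplicas < workloadReplicas, so only the stable Service selector is
	// patched (because CurrentStepIndex == 1) and normal traffic routing follows.
	one := intstr.FromInt(1)
	expected, _ = intstr.GetScaledValueFromIntOrPercent(&one, workloadReplicas, true)
	fmt.Println(expected, expected >= workloadReplicas) // 1 false
}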
6 changes: 6 additions & 0 deletions pkg/controller/rollout/rollout_canary_test.go
@@ -63,6 +63,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -76,6 +77,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.StableRevision = "pod-template-hash-v1"
s.CanaryStatus.CanaryRevision = "6f8cc56547"
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -139,6 +141,7 @@ func TestRunCanary(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 1
obj.Status.CanaryStatus.NextStepIndex = 2
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -185,6 +188,7 @@ func TestRunCanary(t *testing.T) {
s.CanaryStatus.CanaryReplicas = 1
s.CanaryStatus.CanaryReadyReplicas = 1
s.CanaryStatus.CurrentStepIndex = 1
s.CanaryStatus.NextStepIndex = 2
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateTrafficRouting
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
@@ -290,6 +294,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 3
obj.Status.CanaryStatus.NextStepIndex = 4
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
@@ -301,6 +306,7 @@ func TestRunCanaryPaused(t *testing.T) {
obj.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.CanaryStatus.CurrentStepIndex = 3
obj.CanaryStatus.NextStepIndex = 4
obj.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStatePaused
return obj
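The test updates above now expect NextStepIndex to advance alongside CurrentStepIndex. The util.NextBatchIndex helper they exercise is not shown in this diff; the following is only a plausible sketch of its behavior, stated as an assumption.

package util

import "github.com/openkruise/rollouts/api/v1beta1"

// NextBatchIndex (plausible sketch, assumption): returns the index of the step that
// follows currentStepIndex, or -1 when currentStepIndex is already the last step.
func NextBatchIndex(rollout *v1beta1.Rollout, currentStepIndex int32) int32 {
	if rollout == nil || currentStepIndex >= int32(len(rollout.Spec.Strategy.Canary.Steps)) {
		return -1
	}
	return currentStepIndex + 1
}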
53 changes: 26 additions & 27 deletions pkg/trafficrouting/manager.go
@@ -126,7 +126,7 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if c.LastUpdateTime != nil {
// wait seconds for network providers to consume the modification about workload, service and so on.
if verifyTime := c.LastUpdateTime.Add(time.Second * time.Duration(trafficRouting.GracePeriodSeconds)); verifyTime.After(time.Now()) {
klog.Infof("%s update workload or service selector, and wait 3 seconds", c.Key)
klog.Infof("%s update workload or service selector, and wait %d seconds", c.Key, trafficRouting.GracePeriodSeconds)
return false, nil
}
}
@@ -139,6 +139,7 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
klog.Warningf("%s stableRevision or podTemplateHash can not be empty, and wait a moment", c.Key)
return false, nil
}
serviceModified := false
// fetch canary service
err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: canaryServiceName}, canaryService)
if err != nil && !errors.IsNotFound(err) {
@@ -149,21 +150,19 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if err != nil {
return false, err
}
}

serviceModified := false
// patch canary service to only select the canary pods
if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision {
serviceModified = true
} else if canaryService.Spec.Selector[c.RevisionLabelKey] != c.CanaryRevision {
// patch canary service to only select the canary pods
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.CanaryRevision)
if err = m.Patch(context.TODO(), canaryService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch canary service(%s) selector failed: %s", c.Key, canaryService.Name, err.Error())
return false, err
}
serviceModified = true
// update canary service time, and wait 3 seconds, just to be safe
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("%s patch canary service(%s) selector(%s=%s) success",
c.Key, canaryService.Name, c.RevisionLabelKey, c.CanaryRevision)
serviceModified = true
}
// patch stable service to only select the stable pods
if stableService.Spec.Selector[c.RevisionLabelKey] != c.StableRevision {
@@ -181,6 +180,13 @@ func (m *Manager) DoTrafficRouting(c *TrafficRoutingContext) (bool, error) {
if serviceModified {
return false, nil
}
} else if c.DisableGenerateCanaryService {
// if DisableGenerateCanaryService is on, the selector is not needed and we should remove it;
// this is necessary because the selector has probably been patched in the CanaryStepStateInit step within the runCanary function
verify, err := m.restoreStableService(c)
if err != nil || !verify {
return false, err
}
}

// new network provider, ingress or gateway
@@ -217,9 +223,16 @@ func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestore
}

cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
// In the rollout failure case, no canary Service will be created, but the stable Service may have been patched.
// We can return early if and only if both of the following conditions are met:
// 1. the canary Service has already been cleaned up
// 2. the stable Service has no selector
stableService := &corev1.Service{}
err = m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
stableServiceRestored := errors.IsNotFound(err) || (err == nil && stableService.Spec.Selector[c.RevisionLabelKey] == "")
// if the canary svc has already been cleaned up, just return;
// even when DisableGenerateCanaryService is true, the canary svc still exists, because the canary service is the stable service
if err = m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil {
if err = m.Get(context.TODO(), client.ObjectKeyFromObject(cService), cService); err != nil && stableServiceRestored {
if !errors.IsNotFound(err) {
klog.Errorf("%s get canary service(%s) failed: %s", c.Key, cServiceName, err.Error())
return false, err
@@ -281,10 +294,6 @@ func (m *Manager) RestoreGateway(c *TrafficRoutingContext) error {
return nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName)
if err != nil {
@@ -300,10 +309,6 @@ func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) error {
return nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
// end to end deployment scenario OR disableGenerateCanaryService is true, don't remove the canary service;
@@ -327,14 +332,11 @@ func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
if c.OnlyTrafficRouting {
return true, nil
}

gracePeriodSeconds := util.GracePeriodSecondsOrDefault(c.ObjectRef, defaultGracePeriodSeconds)
trafficRouting := c.ObjectRef[0]
//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
@@ -351,8 +353,8 @@ func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
if c.LastUpdateTime == nil {
return true, nil
}
if time.Since(c.LastUpdateTime.Time) < time.Second*time.Duration(trafficRouting.GracePeriodSeconds) {
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, trafficRouting.GracePeriodSeconds)
if time.Since(c.LastUpdateTime.Time) < time.Second*time.Duration(gracePeriodSeconds) {
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, gracePeriodSeconds)
return false, nil
}
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success and complete", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision)
@@ -366,7 +368,7 @@ func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
return false, err
}
c.LastUpdateTime = &metav1.Time{Time: time.Now()}
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, trafficRouting.GracePeriodSeconds)
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success, but we need wait %d seconds", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision, gracePeriodSeconds)
return false, nil
}

@@ -377,9 +379,6 @@ func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
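Both rollout_canary.go and manager.go now resolve the grace period through util.GracePeriodSecondsOrDefault instead of reading trafficRouting.GracePeriodSeconds directly; its implementation presumably lives in one of the changed files not shown below. A plausible sketch, under the assumption that it simply falls back to the default when the field is unset:

package util

import "github.com/openkruise/rollouts/api/v1beta1"

// GracePeriodSecondsOrDefault (plausible sketch, assumption): returns the grace period
// of the first traffic-routing ref, falling back to defaultSeconds when unset or invalid.
func GracePeriodSecondsOrDefault(refs []v1beta1.TrafficRoutingRef, defaultSeconds int32) int32 {
	if len(refs) == 0 || refs[0].GracePeriodSeconds <= 0 {
		return defaultSeconds
	}
	return refs[0].GracePeriodSeconds
}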
(The remaining 2 changed files are not shown here.)
