Skip to content

Commit

Permalink
traffic: Refactor continous logic
Browse files Browse the repository at this point in the history
Signed-off-by: yunbo <[email protected]>
  • Loading branch information
Funinu committed Jun 25, 2024
1 parent db761a9 commit e5d8110
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 17 deletions.
2 changes: 2 additions & 0 deletions api/v1beta1/rollout_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,8 @@ const (
FinalisingStepTypeCanaryService FinalisingStepType = "DeleteCanayService"
// Delete Batch Release
FinalisingStepTypeDeleteBR FinalisingStepType = "DeleteBatchRelease"
// Delete Canary Service
FinalisingStepTypeDeleteCanaryService FinalisingStepType = "DeleteCanaryService"
)

// +genclient
Expand Down
73 changes: 73 additions & 0 deletions pkg/controller/rollout/rollout_canary.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,79 @@ func (m *canaryReleaseManager) doCanaryFinalising(c *RolloutContext) (bool, erro
return true, nil
}

// do Canary Reset for continuous release, eg, v1->v2(not complete)->v3
func (m *canaryReleaseManager) doCanaryReset(c *RolloutContext) (bool, error) {
canaryStatus := c.NewStatus.CanaryStatus
// when CanaryStatus is nil, which means canary action hasn't started yet, don't need doing cleanup
if canaryStatus == nil {
return true, nil
}
// To ensure respect for grace time between finalising steps, we set start time before the first step
if len(canaryStatus.FinalisingStep) == 0 {
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
}
tr := newTrafficRoutingContext(c)
klog.Infof("rollout(%s/%s) Finalising Step is %s", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
switch canaryStatus.FinalisingStep {
default:
// start from FinalisingStepTypeGateway
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeGateway
fallthrough
// firstly, restore the gateway resources (ingress/gatewayAPI/Istio), that means
// only stable Service will accept the traffic
case v1beta1.FinalisingStepTypeGateway:
done, err := m.trafficRoutingManager.RestoreGateway(tr)
if err != nil || !done {
canaryStatus.LastUpdateTime = tr.LastUpdateTime
return done, err
}
if canaryStatus.LastUpdateTime != nil && canaryStatus.LastUpdateTime.Add(time.Second*time.Duration(3)).After(time.Now()) {
klog.Infof("rollout(%s/%s) in step (%s), and wait 3 seconds", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
} else {
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteBR
}
// secondly, remove the batchRelease. For canary release, it means the immediate deletion of
// canary deployment, however, for partial style, the v2 pods won't be deleted right away
// in both cases, only the stable pods (v1) accept the traffic
case v1beta1.FinalisingStepTypeDeleteBR:
done, err := m.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) Finalize batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
if canaryStatus.LastUpdateTime != nil && canaryStatus.LastUpdateTime.Add(time.Second*time.Duration(3)).After(time.Now()) {
klog.Infof("rollout(%s/%s) in step (%s), and wait 3 seconds", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
} else {
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
canaryStatus.LastUpdateTime = &metav1.Time{Time: time.Now()}
canaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService
}
// finally, remove the canary service. This step can swap with the last step.
/*
NOTE: we only remove the canary service, with stable service unchanged, that means the
stable service may still has the selector of stable pods, which is intended. Consider
this senario: continuous release v1->v2->3, and currently we are in step x, which expects
to route 0% traffic to v2 (or simply A/B test), if we release v3 in step x and remove the
stable service selector, then the traffic will route to both v1 and v2 before executing the
first step of v3 release.
*/
case v1beta1.FinalisingStepTypeDeleteCanaryService:
done, err := m.trafficRoutingManager.RemoveCanaryService(tr)
if err != nil || !done {
canaryStatus.LastUpdateTime = tr.LastUpdateTime
return done, err
}
klog.Infof("rollout(%s/%s) in step (%s), and success", c.Rollout.Namespace, c.Rollout.Name, canaryStatus.FinalisingStep)
return true, nil
}

return false, nil
}

func (m *canaryReleaseManager) removeRolloutProgressingAnnotation(c *RolloutContext) error {
if c.Workload == nil {
return nil
Expand Down
27 changes: 13 additions & 14 deletions pkg/controller/rollout/rollout_progressing.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,23 +409,22 @@ func isRollingBackInBatches(rollout *v1beta1.Rollout, workload *util.Workload) b
// 1. modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
// 2. remove batchRelease CR.
func (r *RolloutReconciler) doProgressingReset(c *RolloutContext) (bool, error) {
if len(c.Rollout.Spec.Strategy.Canary.TrafficRoutings) > 0 {
// modify network api(ingress or gateway api) configuration, and route 100% traffic to stable pods
tr := newTrafficRoutingContext(c)
done, err := r.trafficRoutingManager.FinalisingTrafficRouting(tr, false)
c.NewStatus.CanaryStatus.LastUpdateTime = tr.LastUpdateTime
if err != nil || !done {
return done, err
}
}
done, err := r.canaryManager.removeBatchRelease(c)
releaseManager, err := r.getReleaseManager(c.Rollout)
if err != nil {
klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
return true, nil
// if no trafficRouting exists, simply remove batchRelease
if !c.Rollout.Spec.Strategy.HasTrafficRoutings() {
done, err := releaseManager.removeBatchRelease(c)
if err != nil {
klog.Errorf("rollout(%s/%s) DoFinalising batchRelease failed: %s", c.Rollout.Namespace, c.Rollout.Name, err.Error())
return false, err
} else if !done {
return false, nil
}
return true, nil
}
return releaseManager.doCanaryReset(c)
}

func (r *RolloutReconciler) recalculateCanaryStep(c *RolloutContext) (int32, error) {
Expand Down
68 changes: 65 additions & 3 deletions pkg/controller/rollout/rollout_progressing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func TestReconcileRolloutProgressing(t *testing.T) {
},
},
{
name: "ReconcileRolloutProgressing rolling -> continueRelease",
name: "ReconcileRolloutProgressing rolling -> continueRelease1",
getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) {
dep1 := deploymentDemo.DeepCopy()
dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
Expand Down Expand Up @@ -578,10 +578,72 @@ func TestReconcileRolloutProgressing(t *testing.T) {
},
expectStatus: func() *v1beta1.RolloutStatus {
s := rolloutDemo.Status.DeepCopy()
s.CanaryStatus = nil
s.CanaryStatus.ObservedWorkloadGeneration = 2
s.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd"
s.CanaryStatus.StableRevision = "pod-template-hash-v1"
s.CanaryStatus.CanaryRevision = "6f8cc56547"
s.CanaryStatus.CurrentStepIndex = 3
s.CanaryStatus.CanaryReplicas = 5
s.CanaryStatus.CanaryReadyReplicas = 3
s.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeGateway
s.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
s.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
cond := util.GetRolloutCondition(*s, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInitializing
cond.Reason = v1alpha1.ProgressingReasonInRolling
util.SetRolloutCondition(s, *cond)
s.CurrentStepIndex = s.CanaryStatus.CurrentStepIndex
s.CurrentStepState = s.CanaryStatus.CurrentStepState
return s
},
},
{
name: "ReconcileRolloutProgressing rolling -> continueRelease2",
getObj: func() ([]*apps.Deployment, []*apps.ReplicaSet) {
dep1 := deploymentDemo.DeepCopy()
dep1.Spec.Template.Spec.Containers[0].Image = "echoserver:v3"
dep2 := deploymentDemo.DeepCopy()
dep2.UID = "1ca4d850-9ec3-48bd-84cb-19f2e8cf4180"
dep2.Name = dep1.Name + "-canary"
dep2.Labels[util.CanaryDeploymentLabel] = dep1.Name
rs1 := rsDemo.DeepCopy()
rs2 := rsDemo.DeepCopy()
rs2.Name = "echoserver-canary-2"
rs2.OwnerReferences = []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "Deployment",
Name: dep2.Name,
UID: "1ca4d850-9ec3-48bd-84cb-19f2e8cf4180",
Controller: utilpointer.BoolPtr(true),
},
}
rs2.Labels["pod-template-hash"] = "pod-template-hash-v2"
rs2.Spec.Template.Spec.Containers[0].Image = "echoserver:v2"
return []*apps.Deployment{dep1, dep2}, []*apps.ReplicaSet{rs1, rs2}
},
getNetwork: func() ([]*corev1.Service, []*netv1.Ingress) {
return []*corev1.Service{demoService.DeepCopy()}, []*netv1.Ingress{demoIngress.DeepCopy()}
},
getRollout: func() (*v1beta1.Rollout, *v1beta1.BatchRelease, *v1alpha1.TrafficRouting) {
obj := rolloutDemo.DeepCopy()
obj.Status.CanaryStatus.ObservedWorkloadGeneration = 2
obj.Status.CanaryStatus.RolloutHash = "f55bvd874d5f2fzvw46bv966x4bwbdv4wx6bd9f7b46ww788954b8z8w29b7wxfd"
obj.Status.CanaryStatus.StableRevision = "pod-template-hash-v1"
obj.Status.CanaryStatus.CanaryRevision = "6f8cc56547"
obj.Status.CanaryStatus.CurrentStepIndex = 3
obj.Status.CanaryStatus.CanaryReplicas = 5
obj.Status.CanaryStatus.CanaryReadyReplicas = 3
obj.Status.CanaryStatus.PodTemplateHash = "pod-template-hash-v2"
obj.Status.CanaryStatus.CurrentStepState = v1beta1.CanaryStepStateUpgrade
obj.Status.CanaryStatus.FinalisingStep = v1beta1.FinalisingStepTypeDeleteCanaryService
cond := util.GetRolloutCondition(obj.Status, v1beta1.RolloutConditionProgressing)
cond.Reason = v1alpha1.ProgressingReasonInRolling
util.SetRolloutCondition(&obj.Status, *cond)
return obj, nil, nil
},
expectStatus: func() *v1beta1.RolloutStatus {
s := rolloutDemo.Status.DeepCopy()
s.Clear()
return s
},
},
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/rollout/rollout_releaseManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ type ReleaseManager interface {
doCanaryFinalising(c *RolloutContext) (bool, error)
fetchBatchRelease(ns, name string) (*v1beta1.BatchRelease, error)
removeBatchRelease(c *RolloutContext) (bool, error)
doCanaryReset(c *RolloutContext) (bool, error)
}
113 changes: 113 additions & 0 deletions pkg/trafficrouting/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,119 @@ func (m *Manager) FinalisingTrafficRouting(c *TrafficRoutingContext, onlyRestore
return true, nil
}

// RestoreGateway can be seen as route all traffic to stable service
func (m *Manager) RestoreGateway(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
trController, err := newNetworkProvider(m.Client, c, trafficRouting.Service, cServiceName)
if err != nil {
klog.Errorf("%s newTrafficRoutingController failed: %s", c.Key, err.Error())
return false, err
}
if err = trController.Finalise(context.TODO()); err != nil {
return false, err
}
return true, nil
}

// RemoveCanaryService find and delete canary Service. stable Service won't be modified
func (m *Manager) RemoveCanaryService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}

cServiceName := getCanaryServiceName(trafficRouting.Service, c.OnlyTrafficRouting, c.DisableGenerateCanaryService)
cService := &corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: c.Namespace, Name: cServiceName}}
// end to end deployment, don't remove the canary service;
// because canary service is stable service
if !c.OnlyTrafficRouting && !c.DisableGenerateCanaryService {
// remove canary service
err := m.Delete(context.TODO(), cService)
if err != nil && !errors.IsNotFound(err) {
klog.Errorf("%s remove canary service(%s) failed: %s", c.Key, cService.Name, err.Error())
return false, err
}
klog.Infof("%s remove canary service(%s) success", c.Key, cService.Name)
}

return true, nil
}

func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
if c.OnlyTrafficRouting {
return true, nil
}

//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
// not found, wait a moment, retry
if errors.IsNotFound(err) {
return false, nil
}
return false, err
}

if stableService.Spec.Selector[c.RevisionLabelKey] == c.StableRevision {
return true, nil
}

// patch stable service to only select the stable pods
body := fmt.Sprintf(`{"spec":{"selector":{"%s":"%s"}}}`, c.RevisionLabelKey, c.StableRevision)
if err = m.Patch(context.TODO(), stableService, client.RawPatch(types.StrategicMergePatchType, []byte(body))); err != nil {
klog.Errorf("%s patch stable service(%s) selector failed: %s", c.Key, stableService.Name, err.Error())
return false, err
}
klog.Infof("%s do something special: add stable service(%s) selector(%s=%s) success", c.Key, stableService.Name, c.RevisionLabelKey, c.StableRevision)
return true, nil
}

func (m *Manager) RestoreStableService(c *TrafficRoutingContext) (bool, error) {
if len(c.ObjectRef) == 0 {
return true, nil
}
trafficRouting := c.ObjectRef[0]
if trafficRouting.GracePeriodSeconds <= 0 {
trafficRouting.GracePeriodSeconds = defaultGracePeriodSeconds
}
//fetch stable service
stableService := &corev1.Service{}
err := m.Get(context.TODO(), client.ObjectKey{Namespace: c.Namespace, Name: trafficRouting.Service}, stableService)
if err != nil {
if errors.IsNotFound(err) {
return true, nil
}
klog.Errorf("%s get stable service(%s) failed: %s", c.Key, trafficRouting.Service, err.Error())
return false, err
}
// restore stable Service
verify, err := m.restoreStableService(c)
if err != nil || !verify {
return false, err
}
return true, nil
}

func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, cService string) (network.NetworkProvider, error) {
trafficRouting := con.ObjectRef[0]
if trafficRouting.CustomNetworkRefs != nil {
Expand Down

0 comments on commit e5d8110

Please sign in to comment.