From ebb61fb9196546edc52e87ae290d681454f01985 Mon Sep 17 00:00:00 2001
From: Quan Tian
Date: Fri, 2 Feb 2024 12:42:26 +0800
Subject: [PATCH] Fix race condition in agent Traceflow controller (#5954)

It may happen that a Traceflow is assigned a tag that was just released
from an old Traceflow, but the controller hasn't processed the deletion
event of the old Traceflow yet. Previously the controller skipped
starting the new Traceflow if the tag was already in use, which caused
the Traceflow to time out.

The commit adds a check when determining whether it should start a
Traceflow. If the tag is still associated with another Traceflow, the
controller cleans it up and then starts a new trace for the current
one.

It also fixes a bug in cleanupTraceflow, which could uninstall flows
belonging to another Traceflow if the tag had been reassigned.

Signed-off-by: Quan Tian
---
 .../traceflow/traceflow_controller.go      | 43 +++++-----
 .../traceflow/traceflow_controller_test.go | 85 ++++++++++++++++++-
 2 files changed, 102 insertions(+), 26 deletions(-)

diff --git a/pkg/agent/controller/traceflow/traceflow_controller.go b/pkg/agent/controller/traceflow/traceflow_controller.go
index 49e913457d2..2835ac1efbb 100644
--- a/pkg/agent/controller/traceflow/traceflow_controller.go
+++ b/pkg/agent/controller/traceflow/traceflow_controller.go
@@ -70,7 +70,9 @@ const (
 )
 
 type traceflowState struct {
-	name         string
+	name string
+	// Used to uniquely identify Traceflow.
+	uid          types.UID
 	tag          int8
 	liveTraffic  bool
 	droppedOnly  bool
@@ -268,10 +270,17 @@ func (c *Controller) syncTraceflow(traceflowName string) error {
 	if tf.Status.DataplaneTag != 0 {
 		start := false
 		c.runningTraceflowsMutex.Lock()
-		if _, ok := c.runningTraceflows[tf.Status.DataplaneTag]; !ok {
+		tfState, ok := c.runningTraceflows[tf.Status.DataplaneTag]
+		c.runningTraceflowsMutex.Unlock()
+		// This may happen if a Traceflow is assigned with a tag that was just released from an old Traceflow but
+		// the agent hasn't processed the deletion event of the old Traceflow yet.
+		if ok && tfState.uid != tf.UID {
+			klog.V(2).InfoS("Found a stale Traceflow associated with the dataplane tag, cleaning it up", "tag", tf.Status.DataplaneTag, "currentTraceflow", traceflowName, "staleTraceflow", tfState.name)
+			c.cleanupTraceflow(tfState.name)
+			start = true
+		} else if !ok {
 			start = true
 		}
-		c.runningTraceflowsMutex.Unlock()
 		if start {
 			err = c.startTraceflow(tf)
 		}
@@ -336,7 +345,7 @@ func (c *Controller) startTraceflow(tf *crdv1beta1.Traceflow) error {
 	// Store Traceflow to cache.
 	c.runningTraceflowsMutex.Lock()
 	tfState := traceflowState{
-		name: tf.Name, tag: tf.Status.DataplaneTag,
+		uid: tf.UID, name: tf.Name, tag: tf.Status.DataplaneTag,
 		liveTraffic: liveTraffic, droppedOnly: tf.Spec.DroppedOnly && liveTraffic,
 		receiverOnly: receiverOnly, isSender: isSender}
 	c.runningTraceflows[tfState.tag] = &tfState
@@ -570,29 +579,19 @@ func (c *Controller) errorTraceflowCRD(tf *crdv1beta1.Traceflow, reason string)
 	return c.crdClient.CrdV1beta1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status")
 }
 
-// Delete Traceflow from cache.
-func (c *Controller) deleteTraceflowState(tfName string) *traceflowState {
+// Delete Traceflow state and OVS flows.
+func (c *Controller) cleanupTraceflow(tfName string) {
 	c.runningTraceflowsMutex.Lock()
 	defer c.runningTraceflowsMutex.Unlock()
-	// Controller could have deallocated the tag and cleared the DataplaneTag
-	// field in the Traceflow Status, so try looking up the tag from the
-	// cache by Traceflow name.
 	for tag, tfState := range c.runningTraceflows {
 		if tfName == tfState.name {
+			// This must be executed before deleting the tag from runningTraceflows, otherwise it may uninstall another
+			// Traceflow's flows if the tag is reassigned.
+			if err := c.ofClient.UninstallTraceflowFlows(uint8(tag)); err != nil {
+				klog.ErrorS(err, "Failed to uninstall Traceflow flows", "Traceflow", tfName, "state", tfState)
+			}
 			delete(c.runningTraceflows, tag)
-			return tfState
-		}
-	}
-	return nil
-}
-
-// Delete Traceflow state and OVS flows.
-func (c *Controller) cleanupTraceflow(tfName string) {
-	tfState := c.deleteTraceflowState(tfName)
-	if tfState != nil {
-		err := c.ofClient.UninstallTraceflowFlows(uint8(tfState.tag))
-		if err != nil {
-			klog.Errorf("Failed to uninstall Traceflow %s flows: %v", tfName, err)
+			break
 		}
 	}
 }
diff --git a/pkg/agent/controller/traceflow/traceflow_controller_test.go b/pkg/agent/controller/traceflow/traceflow_controller_test.go
index 3acd7e1adb2..f82afe0132e 100644
--- a/pkg/agent/controller/traceflow/traceflow_controller_test.go
+++ b/pkg/agent/controller/traceflow/traceflow_controller_test.go
@@ -676,7 +676,8 @@ func TestSyncTraceflow(t *testing.T) {
 	tcs := []struct {
 		name          string
 		tf            *crdv1beta1.Traceflow
-		tfState       *traceflowState
+		existingState *traceflowState
+		newState      *traceflowState
 		expectedCalls func(mockOFClient *openflowtest.MockClient)
 	}{
 		{
@@ -698,11 +699,83 @@ func TestSyncTraceflow(t *testing.T) {
 					DataplaneTag: 1,
 				},
 			},
-			tfState: &traceflowState{
+			existingState: &traceflowState{
 				name: "tf1",
+				uid:  "uid1",
+				tag:  1,
+			},
+			newState: &traceflowState{
+				name: "tf1",
+				uid:  "uid1",
 				tag:  1,
 			},
 		},
+		{
+			name: "traceflow in running phase with empty state",
+			tf: &crdv1beta1.Traceflow{
+				ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
+				Spec: crdv1beta1.TraceflowSpec{
+					Source: crdv1beta1.Source{
+						Namespace: pod1.Namespace,
+						Pod:       pod1.Name,
+					},
+					Destination: crdv1beta1.Destination{
+						Namespace: pod2.Namespace,
+						Pod:       pod2.Name,
+					},
+				},
+				Status: crdv1beta1.TraceflowStatus{
+					Phase:        crdv1beta1.Running,
+					DataplaneTag: 1,
+				},
+			},
+			newState: &traceflowState{
+				name:     "tf1",
+				uid:      "uid1",
+				tag:      1,
+				isSender: true,
+			},
+			expectedCalls: func(mockOFClient *openflowtest.MockClient) {
+				mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
+				mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
+			},
+		},
+		{
+			name: "traceflow in running phase with conflict state",
+			tf: &crdv1beta1.Traceflow{
+				ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"},
+				Spec: crdv1beta1.TraceflowSpec{
+					Source: crdv1beta1.Source{
+						Namespace: pod1.Namespace,
+						Pod:       pod1.Name,
+					},
+					Destination: crdv1beta1.Destination{
+						Namespace: pod2.Namespace,
+						Pod:       pod2.Name,
+					},
+				},
+				Status: crdv1beta1.TraceflowStatus{
+					Phase:        crdv1beta1.Running,
+					DataplaneTag: 1,
+				},
+			},
+			existingState: &traceflowState{
+				name: "tf1",
+				uid:  "uid2",
+				tag:  1,
+			},
+			newState: &traceflowState{
+				name:     "tf1",
+				uid:      "uid1",
+				tag:      1,
+				isSender: true,
+			},
+			expectedCalls: func(mockOFClient *openflowtest.MockClient) {
+				mockOFClient.EXPECT().UninstallTraceflowFlows(uint8(1))
+				mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20))
+				mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1))
+			},
+		},
 		{
 			name: "traceflow in failed phase",
 			tf: &crdv1beta1.Traceflow{
@@ -722,7 +795,7 @@ func TestSyncTraceflow(t *testing.T) {
 					DataplaneTag: 1,
 				},
 			},
-			tfState: &traceflowState{
+			existingState: &traceflowState{
 				name: "tf1",
 				tag:  1,
 			},
@@ -740,13 +813,17 @@ func TestSyncTraceflow(t *testing.T) {
 			tfc.crdInformerFactory.Start(stopCh)
 			tfc.crdInformerFactory.WaitForCacheSync(stopCh)
 
-			tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.tfState
+			if tt.existingState != nil {
+				tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.existingState
+			}
+
 			if tt.expectedCalls != nil {
 				tt.expectedCalls(tfc.mockOFClient)
 			}
 
 			err := tfc.syncTraceflow(tt.tf.Name)
 			require.NoError(t, err)
+			assert.Equal(t, tt.newState, tfc.runningTraceflows[tt.tf.Status.DataplaneTag])
 		})
 	}
 }
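For quick reference, the tag-conflict handling in syncTraceflow reads as follows once the hunks above are applied. This is only an excerpt reassembled from the patch, not a complete function: start is declared earlier in the function, the klog call is omitted, and all identifiers (c.runningTraceflows, c.runningTraceflowsMutex, c.cleanupTraceflow, c.startTraceflow) are those of the Antrea agent Traceflow controller.

	// A cache entry that carries the same dataplane tag but a different UID is
	// stale state left by an old Traceflow whose deletion event has not been
	// processed yet; clean it up and start a trace for the current Traceflow
	// instead of skipping it (which previously made the Traceflow time out).
	c.runningTraceflowsMutex.Lock()
	tfState, ok := c.runningTraceflows[tf.Status.DataplaneTag]
	c.runningTraceflowsMutex.Unlock()
	if ok && tfState.uid != tf.UID {
		// Uninstalls the stale Traceflow's flows and removes its cache entry.
		c.cleanupTraceflow(tfState.name)
		start = true
	} else if !ok {
		start = true
	}
	if start {
		err = c.startTraceflow(tf)
	}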