diff --git a/pkg/agent/controller/traceflow/traceflow_controller.go b/pkg/agent/controller/traceflow/traceflow_controller.go index 49e913457d2..2835ac1efbb 100644 --- a/pkg/agent/controller/traceflow/traceflow_controller.go +++ b/pkg/agent/controller/traceflow/traceflow_controller.go @@ -70,7 +70,9 @@ const ( ) type traceflowState struct { - name string + name string + // Used to uniquely identify Traceflow. + uid types.UID tag int8 liveTraffic bool droppedOnly bool @@ -268,10 +270,17 @@ func (c *Controller) syncTraceflow(traceflowName string) error { if tf.Status.DataplaneTag != 0 { start := false c.runningTraceflowsMutex.Lock() - if _, ok := c.runningTraceflows[tf.Status.DataplaneTag]; !ok { + tfState, ok := c.runningTraceflows[tf.Status.DataplaneTag] + c.runningTraceflowsMutex.Unlock() + // This may happen if a Traceflow is assigned a tag that was just released from an old Traceflow but + // the agent hasn't processed the deletion event of the old Traceflow yet. + if ok && tfState.uid != tf.UID { + klog.V(2).InfoS("Found a stale Traceflow associated with the dataplane tag, cleaning it up", "tag", tf.Status.DataplaneTag, "currentTraceflow", traceflowName, "staleTraceflow", tfState.name) + c.cleanupTraceflow(tfState.name) + start = true + } else if !ok { start = true } - c.runningTraceflowsMutex.Unlock() if start { err = c.startTraceflow(tf) } @@ -336,7 +345,7 @@ func (c *Controller) startTraceflow(tf *crdv1beta1.Traceflow) error { // Store Traceflow to cache. 
c.runningTraceflowsMutex.Lock() tfState := traceflowState{ - name: tf.Name, tag: tf.Status.DataplaneTag, + uid: tf.UID, name: tf.Name, tag: tf.Status.DataplaneTag, liveTraffic: liveTraffic, droppedOnly: tf.Spec.DroppedOnly && liveTraffic, receiverOnly: receiverOnly, isSender: isSender} c.runningTraceflows[tfState.tag] = &tfState @@ -570,29 +579,19 @@ func (c *Controller) errorTraceflowCRD(tf *crdv1beta1.Traceflow, reason string) return c.crdClient.CrdV1beta1().Traceflows().Patch(context.TODO(), tf.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status") } -// Delete Traceflow from cache. -func (c *Controller) deleteTraceflowState(tfName string) *traceflowState { +// cleanupTraceflow deletes the Traceflow state and uninstalls its OVS flows. +func (c *Controller) cleanupTraceflow(tfName string) { c.runningTraceflowsMutex.Lock() defer c.runningTraceflowsMutex.Unlock() - // Controller could have deallocated the tag and cleared the DataplaneTag - // field in the Traceflow Status, so try looking up the tag from the - // cache by Traceflow name. for tag, tfState := range c.runningTraceflows { if tfName == tfState.name { + // The flows must be uninstalled before deleting the tag from runningTraceflows; otherwise this may uninstall another + // Traceflow's flows if the tag is reassigned. + if err := c.ofClient.UninstallTraceflowFlows(uint8(tag)); err != nil { + klog.ErrorS(err, "Failed to uninstall Traceflow flows", "Traceflow", tfName, "state", tfState) + } delete(c.runningTraceflows, tag) - return tfState - } - } - return nil -} - -// Delete Traceflow state and OVS flows. 
-func (c *Controller) cleanupTraceflow(tfName string) { - tfState := c.deleteTraceflowState(tfName) - if tfState != nil { - err := c.ofClient.UninstallTraceflowFlows(uint8(tfState.tag)) - if err != nil { - klog.Errorf("Failed to uninstall Traceflow %s flows: %v", tfName, err) + break } } } diff --git a/pkg/agent/controller/traceflow/traceflow_controller_test.go b/pkg/agent/controller/traceflow/traceflow_controller_test.go index 3acd7e1adb2..f82afe0132e 100644 --- a/pkg/agent/controller/traceflow/traceflow_controller_test.go +++ b/pkg/agent/controller/traceflow/traceflow_controller_test.go @@ -676,7 +676,8 @@ func TestSyncTraceflow(t *testing.T) { tcs := []struct { name string tf *crdv1beta1.Traceflow - tfState *traceflowState + existingState *traceflowState + newState *traceflowState expectedCalls func(mockOFClient *openflowtest.MockClient) }{ { @@ -698,11 +699,83 @@ func TestSyncTraceflow(t *testing.T) { DataplaneTag: 1, }, }, - tfState: &traceflowState{ + existingState: &traceflowState{ name: "tf1", + uid: "uid1", + tag: 1, + }, + newState: &traceflowState{ + name: "tf1", + uid: "uid1", tag: 1, }, }, + { + name: "traceflow in running phase with empty state", + tf: &crdv1beta1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"}, + Spec: crdv1beta1.TraceflowSpec{ + Source: crdv1beta1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1beta1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + }, + Status: crdv1beta1.TraceflowStatus{ + Phase: crdv1beta1.Running, + DataplaneTag: 1, + }, + }, + newState: &traceflowState{ + name: "tf1", + uid: "uid1", + tag: 1, + isSender: true, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20)) + mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1)) + }, + }, + { + name: "traceflow in running phase with conflict state", + 
tf: &crdv1beta1.Traceflow{ + ObjectMeta: metav1.ObjectMeta{Name: "tf1", UID: "uid1"}, + Spec: crdv1beta1.TraceflowSpec{ + Source: crdv1beta1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1beta1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + }, + Status: crdv1beta1.TraceflowStatus{ + Phase: crdv1beta1.Running, + DataplaneTag: 1, + }, + }, + existingState: &traceflowState{ + name: "tf1", + uid: "uid2", + tag: 1, + }, + newState: &traceflowState{ + name: "tf1", + uid: "uid1", + tag: 1, + isSender: true, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().UninstallTraceflowFlows(uint8(1)) + mockOFClient.EXPECT().InstallTraceflowFlows(uint8(1), false, false, false, nil, uint32(1), uint16(20)) + mockOFClient.EXPECT().SendTraceflowPacket(uint8(1), gomock.Any(), ofPortPod1, int32(-1)) + }, + }, { name: "traceflow in failed phase", tf: &crdv1beta1.Traceflow{ @@ -722,7 +795,7 @@ func TestSyncTraceflow(t *testing.T) { DataplaneTag: 1, }, }, - tfState: &traceflowState{ + existingState: &traceflowState{ name: "tf1", tag: 1, }, @@ -740,13 +813,17 @@ func TestSyncTraceflow(t *testing.T) { tfc.crdInformerFactory.Start(stopCh) tfc.crdInformerFactory.WaitForCacheSync(stopCh) - tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.tfState + if tt.existingState != nil { + tfc.runningTraceflows[tt.tf.Status.DataplaneTag] = tt.existingState + } + if tt.expectedCalls != nil { tt.expectedCalls(tfc.mockOFClient) } err := tfc.syncTraceflow(tt.tf.Name) require.NoError(t, err) + assert.Equal(t, tt.newState, tfc.runningTraceflows[tt.tf.Status.DataplaneTag]) }) } }