From 8b18060446c79621e26115e6d61fc4fdaac76876 Mon Sep 17 00:00:00 2001 From: Hang Yan Date: Tue, 8 Aug 2023 16:42:51 +0800 Subject: [PATCH] update Signed-off-by: Hang Yan --- pkg/agent/openflow/client.go | 24 +-- pkg/agent/openflow/client_test.go | 15 +- pkg/agent/openflow/framework.go | 1 + pkg/agent/openflow/pipeline.go | 116 ++++++------ pkg/agent/openflow/testing/mock_openflow.go | 8 +- pkg/controller/traceflow/controller.go | 197 +++++++++++++++----- pkg/controller/traceflow/controller_test.go | 2 +- 7 files changed, 223 insertions(+), 140 deletions(-) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index c341ecdeebd..465eb396a7a 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -26,7 +26,6 @@ import ( "k8s.io/klog/v2" "antrea.io/antrea/pkg/agent/config" - "antrea.io/antrea/pkg/agent/metrics" "antrea.io/antrea/pkg/agent/openflow/cookie" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" @@ -205,7 +204,7 @@ type Client interface { SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, inPort uint32, outPort int32) error // InstallTraceflowFlows installs flows for a Traceflow request. - InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error + InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error // UninstallTraceflowFlows uninstalls flows for a Traceflow request. 
UninstallTraceflowFlows(dataplaneTag uint8) error @@ -1118,13 +1117,14 @@ func (c *client) SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, return c.bridge.SendPacketOut(packetOutObj) } -func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error { +func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error { cacheKey := fmt.Sprintf("%x", dataplaneTag) var flows []binding.Flow for _, f := range c.traceableFeatures { flows = append(flows, f.flowsToTrace(dataplaneTag, c.ovsMetersAreSupported, liveTraffic, + sampling, droppedOnly, receiverOnly, packet, @@ -1551,21 +1551,3 @@ func getFlowModMessage(flow binding.Flow, op binding.OFOperation) *openflow15.Fl messages := GetFlowModMessages([]binding.Flow{flow}, op) return messages[0] } - -// getMeterStats sends a multipart request to get all the meter statistics and -// sets values for antrea_agent_ovs_meter_packet_dropped_count. 
-func (c *client) getMeterStats() { - handleMeterStatsReply := func(meterID int, packetCount int64) { - switch meterID { - case PacketInMeterIDNP: - metrics.OVSMeterPacketDroppedCount.WithLabelValues("PacketInMeterNetworkPolicy").Set(float64(packetCount)) - case PacketInMeterIDTF: - metrics.OVSMeterPacketDroppedCount.WithLabelValues("PacketInMeterTraceflow").Set(float64(packetCount)) - default: - klog.V(4).InfoS("Received unexpected meterID", "meterID", meterID) - } - } - if err := c.bridge.GetMeterStats(handleMeterStatsReply); err != nil { - klog.ErrorS(err, "Failed to get OVS meter stats") - } -} diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index bde15a4c63f..14163415339 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -361,7 +361,7 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations, fn(o) } - client := NewClient(bridgeName, + cli := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), o.enableProxy, @@ -375,8 +375,8 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations, o.enableMulticast, o.enableTrafficControl, o.enableMulticluster, - NewGroupAllocator(), - false) + NewGroupAllocator()) + client := cli.(*client) var egressExceptCIDRs []net.IPNet var serviceIPv4CIDR, serviceIPv6CIDR *net.IPNet @@ -1585,7 +1585,7 @@ func Test_client_InstallTraceflowFlows(t *testing.T) { t.Run(tt.name, func(t *testing.T) { ctrl := gomock.NewController(t) c := tt.prepareFunc(ctrl) - if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, false, false, nil, 0, 300); (err != nil) != tt.wantErr { + if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, false, false, false, nil, 0, 300); (err != nil) != tt.wantErr { t.Errorf("InstallTraceflowFlows() error = %v, wantErr %v", err, tt.wantErr) } }) @@ -1815,15 +1815,16 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) { } func prepareSetBasePacketOutBuilder(ctrl 
*gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, nil, false) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, nil) + c := ofClient.(*client) m := ovsoftest.NewMockBridge(ctrl) - ofClient.bridge = m + c.bridge = m bridge := binding.OFBridge{} m.EXPECT().BuildPacketOut().Return(bridge.BuildPacketOut()).Times(1) if success { m.EXPECT().SendPacketOut(gomock.Any()).Times(1) } - return ofClient + return c } func Test_client_SendPacketOut(t *testing.T) { diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index a1fedfbb9a6..71f7c50047c 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -320,6 +320,7 @@ type traceableFeature interface { flowsToTrace(dataplaneTag uint8, ovsMetersAreSupported, liveTraffic, + sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 4a871daa8b9..30340a0a221 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -25,7 +25,6 @@ import ( "antrea.io/libOpenflow/openflow15" "antrea.io/libOpenflow/protocol" "antrea.io/ofnet/ofctrl" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" netutils "k8s.io/utils/net" @@ -419,23 +418,22 @@ type flowCategoryCache struct { } type client struct { - enableProxy bool - proxyAll bool - enableDSR bool - enableAntreaPolicy bool - enableL7NetworkPolicy bool - enableDenyTracking bool - enableEgress bool - enableMulticast bool - enableTrafficControl bool - enableMulticluster bool - enablePrometheusMetrics bool - connectUplinkToBridge bool - nodeType config.NodeType - roundInfo types.RoundInfo - cookieAllocator cookie.Allocator - bridge 
binding.Bridge - groupIDAllocator GroupAllocator + enableProxy bool + proxyAll bool + enableDSR bool + enableAntreaPolicy bool + enableL7NetworkPolicy bool + enableDenyTracking bool + enableEgress bool + enableMulticast bool + enableTrafficControl bool + enableMulticluster bool + connectUplinkToBridge bool + nodeType config.NodeType + roundInfo types.RoundInfo + cookieAllocator cookie.Allocator + bridge binding.Bridge + groupIDAllocator GroupAllocator featurePodConnectivity *featurePodConnectivity featureService *featureService @@ -474,18 +472,6 @@ type client struct { nodeIPChecker nodeip.Checker } -func (c *client) Run(stopCh <-chan struct{}) { - // Start PacketIn - c.StartPacketInHandler(stopCh) - // Start OVS meter stats collection - if c.enablePrometheusMetrics { - if c.ovsMetersAreSupported { - klog.Info("Start collecting OVS meter stats") - go wait.Until(c.getMeterStats, time.Second*30, stopCh) - } - } -} - func (c *client) GetTunnelVirtualMAC() net.HardwareAddr { return GlobalVirtualMAC } @@ -959,13 +945,17 @@ func (f *featureService) snatConntrackFlows() []binding.Flow { // flowsToTrace generates Traceflow specific flows in the connectionTrackStateTable or L2ForwardingCalcTable for featurePodConnectivity. // When packet is not provided, the flows bypass the drop flow in conntrackStateFlow to avoid unexpected drop of the // injected Traceflow packet, and to drop any Traceflow packet that has ct_state +rpl, which may happen when the Traceflow -// request destination is the Node's IP. When packet is provided, a flow is added to mark - the first packet of the first -// connection that matches the provided packet - as the Traceflow packet. The flow is added in connectionTrackStateTable -// when receiverOnly is false and it also matches in_port to be the provided ofPort (the sender Pod); otherwise when -// receiverOnly is true, the flow is added into L2ForwardingCalcTable and matches the destination MAC (the receiver Pod MAC). 
+// request destination is the Node's IP. +// +// When packet is provided, a flow is added to mark the packet that matches the provided packet as the Traceflow packet. +// Specifically, when not on sampling mode, we only mark the first packet of the first connection. The flow is added in +// connectionTrackStateTable when receiverOnly is false and it also matches in_port to be the provided ofPort (the +// sender Pod); otherwise when receiverOnly is true, the flow is added into L2ForwardingCalcTable and matches the +// destination MAC (the receiver Pod MAC). func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8, ovsMetersAreSupported, liveTraffic, + sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, @@ -997,11 +987,9 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8, } else { var flowBuilder binding.FlowBuilder if !receiverOnly { - flowBuilder = ConntrackStateTable.ofTable.BuildFlow(priorityLow). + flowBuilder = ConntrackStateTable.ofTable.BuildFlow(priorityHigh). Cookie(cookieID). MatchInPort(ofPort). - MatchCTStateNew(true). - MatchCTStateTrk(true). Action().LoadIPDSCP(dataplaneTag). SetHardTimeout(timeout). Action().GotoStage(stagePreRouting) @@ -1011,8 +999,6 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8, } else { flowBuilder = L2ForwardingCalcTable.ofTable.BuildFlow(priorityHigh). Cookie(cookieID). - MatchCTStateNew(true). - MatchCTStateTrk(true). MatchDstMAC(packet.DestinationMAC). Action().LoadToRegField(TargetOFPortField, ofPort). Action().LoadRegMark(OutputToOFPortRegMark). @@ -1023,6 +1009,15 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8, flowBuilder = flowBuilder.MatchSrcIP(packet.SourceIP) } } + + // For live traffic traceflow, when on sampling mode, we track every target packet. + // Otherwise, we track only the first packet. + if !sampling { + flowBuilder = flowBuilder. + MatchCTStateNew(true). 
+ MatchCTStateTrk(true) + } + // Match transport header switch packet.IPProto { case protocol.Type_ICMP: @@ -1150,6 +1145,7 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8, func (f *featureService) flowsToTrace(dataplaneTag uint8, ovsMetersAreSupported, liveTraffic, + sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, @@ -1200,6 +1196,7 @@ func (f *featureService) flowsToTrace(dataplaneTag uint8, func (f *featureNetworkPolicy) flowsToTrace(dataplaneTag uint8, ovsMetersAreSupported, liveTraffic, + sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, @@ -2885,30 +2882,27 @@ func NewClient(bridgeName string, enableMulticast bool, enableTrafficControl bool, enableMulticluster bool, - groupIDAllocator GroupAllocator, - enablePrometheusMetrics bool, -) *client { + groupIDAllocator GroupAllocator) Client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) c := &client{ - bridge: bridge, - nodeIPChecker: nodeIPCheck, - enableProxy: enableProxy, - proxyAll: proxyAll, - enableDSR: enableDSR, - enableAntreaPolicy: enableAntreaPolicy, - enableL7NetworkPolicy: enableL7NetworkPolicy, - enableDenyTracking: enableDenyTracking, - enableEgress: enableEgress, - enableMulticast: enableMulticast, - enableTrafficControl: enableTrafficControl, - enableMulticluster: enableMulticluster, - enablePrometheusMetrics: enablePrometheusMetrics, - connectUplinkToBridge: connectUplinkToBridge, - pipelines: make(map[binding.PipelineID]binding.Pipeline), - packetInHandlers: map[uint8]PacketInHandler{}, - ovsctlClient: ovsctl.NewClient(bridgeName), - ovsMetersAreSupported: ovsMetersAreSupported(), - groupIDAllocator: groupIDAllocator, + bridge: bridge, + nodeIPChecker: nodeIPCheck, + enableProxy: enableProxy, + proxyAll: proxyAll, + enableDSR: enableDSR, + enableAntreaPolicy: enableAntreaPolicy, + enableL7NetworkPolicy: enableL7NetworkPolicy, + enableDenyTracking: enableDenyTracking, + enableEgress: enableEgress, + enableMulticast: enableMulticast, + 
enableTrafficControl: enableTrafficControl, + enableMulticluster: enableMulticluster, + connectUplinkToBridge: connectUplinkToBridge, + pipelines: make(map[binding.PipelineID]binding.Pipeline), + packetInHandlers: map[uint8]PacketInHandler{}, + ovsctlClient: ovsctl.NewClient(bridgeName), + ovsMetersAreSupported: ovsMetersAreSupported(), + groupIDAllocator: groupIDAllocator, } c.ofEntryOperations = c return c diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 1cd0a2e75ba..5d8234d7522 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -485,17 +485,17 @@ func (mr *MockClientMockRecorder) InstallServiceGroup(arg0, arg1, arg2 interface } // InstallTraceflowFlows mocks base method -func (m *MockClient) InstallTraceflowFlows(arg0 byte, arg1, arg2, arg3 bool, arg4 *openflow.Packet, arg5 uint32, arg6 uint16) error { +func (m *MockClient) InstallTraceflowFlows(arg0 byte, arg1, arg2, arg3, arg4 bool, arg5 *openflow.Packet, arg6 uint32, arg7 uint16) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallTraceflowFlows", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret := m.ctrl.Call(m, "InstallTraceflowFlows", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) ret0, _ := ret[0].(error) return ret0 } // InstallTraceflowFlows indicates an expected call of InstallTraceflowFlows -func (mr *MockClientMockRecorder) InstallTraceflowFlows(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallTraceflowFlows(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTraceflowFlows", reflect.TypeOf((*MockClient)(nil).InstallTraceflowFlows), arg0, arg1, arg2, arg3, arg4, arg5, arg6) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallTraceflowFlows", 
reflect.TypeOf((*MockClient)(nil).InstallTraceflowFlows), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) } // InstallTrafficControlMarkFlows mocks base method diff --git a/pkg/controller/traceflow/controller.go b/pkg/controller/traceflow/controller.go index 772345213ef..9d63037ed52 100644 --- a/pkg/controller/traceflow/controller.go +++ b/pkg/controller/traceflow/controller.go @@ -21,6 +21,7 @@ import ( "sync" "time" + "github.com/google/uuid" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -136,7 +137,7 @@ func (c *Controller) Run(stopCh <-chan struct{}) { } go func() { - wait.Until(c.checkTraceflowTimeout, timeoutCheckInterval, stopCh) + wait.Until(c.repostTraceflows, timeoutCheckInterval, stopCh) }() for i := 0; i < defaultWorkers; i++ { @@ -170,7 +171,7 @@ func (c *Controller) worker() { } } -func (c *Controller) checkTraceflowTimeout() { +func (c *Controller) repostTraceflows() { c.runningTraceflowsMutex.Lock() tfs := make([]string, 0, len(c.runningTraceflows)) for _, tfName := range c.runningTraceflows { @@ -251,7 +252,11 @@ func (c *Controller) syncTraceflow(traceflowName string) error { func (c *Controller) startTraceflow(tf *crdv1beta1.Traceflow) error { if err := c.validateTraceflow(tf); err != nil { klog.ErrorS(err, "Invalid Traceflow request", "request", tf) - return c.updateTraceflowStatus(tf, crdv1beta1.Failed, fmt.Sprintf("Invalid Traceflow request, err: %+v", err), 0) + return newTraceflowUpdater(tf, c). + Phase(crdv1beta1.Failed). + Reason(fmt.Sprintf("Invalid Traceflow request, err: %+v", err)). + Tag(0). + Update() } // Allocate data plane tag. tag, err := c.allocateTag(tf.Name) @@ -262,27 +267,86 @@ func (c *Controller) startTraceflow(tf *crdv1beta1.Traceflow) error { return nil } - err = c.updateTraceflowStatus(tf, crdv1beta1.Running, "", tag) + updater := newTraceflowUpdater(tf, c). + Phase(crdv1beta1.Running). 
+ Tag(int8(tag)) + if tf.Spec.SamplingEnabled() { + updater.UID(uuid.New().String()) + } + err = updater.Update() if err != nil { c.deallocateTag(tf.Name, tag) } return err } +// setPodByTranslatedDstIP sets the Pod field in observations by the TranslatedDstIP. +// +// Theoretically, this can be done in the antrea-agent if it has a PodInformer, which is more straightforward, but a +// PodInformer consumes too much resources, so only the antrea-controller has a PodInformer and this function has to be +// executed in the antrea-controller. +func (c *Controller) setPodByTranslatedDstIP(tf *crdv1beta1.Traceflow) { + for i, nodeResult := range tf.Status.Results { + for j, ob := range nodeResult.Observations { + if ob.TranslatedDstIP != "" { + // Add Pod ns/name to observation if TranslatedDstIP (a.k.a. Service Endpoint address) is Pod IP. + pods, err := c.podInformer.Informer().GetIndexer().ByIndex(grouping.PodIPsIndex, ob.TranslatedDstIP) + if err != nil { + klog.Infof("Unable to find Pod from IP, error: %+v", err) + } else if len(pods) > 0 { + pod, ok := pods[0].(*corev1.Pod) + if !ok { + klog.Warningf("Invalid Pod obj in cache") + } else { + tf.Status.Results[i].Observations[j].Pod = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name) + } + } + } + } + } +} + // checkTraceflowStatus is only called for Traceflows in the Running phase func (c *Controller) checkTraceflowStatus(tf *crdv1beta1.Traceflow) error { + if checkTraceflowSucceeded(tf) { + if !(tf.Spec.LiveTraffic && tf.Spec.DroppedOnly) { + c.setPodByTranslatedDstIP(tf) + } + c.deallocateTagForTF(tf) + return newTraceflowUpdater(tf, c). + Phase(crdv1beta1.Succeeded). + Tag(0). + Update() + } + + if checkTraceflowTimeout(tf) { + c.deallocateTagForTF(tf) + return newTraceflowUpdater(tf, c). + Phase(crdv1beta1.Failed). + Reason(traceflowTimeout). + Tag(0). 
+ Update() + } + + return nil +} + +func checkTraceflowSucceeded(tf *crdv1beta1.Traceflow) bool { succeeded := false if tf.Spec.LiveTraffic && tf.Spec.DroppedOnly { - // There should be only one reported NodeResult for droppedOnly - // Traceflow. + // There should be only one reported NodeResult for droppedOnly Traceflow. if len(tf.Status.Results) > 0 { succeeded = true } + } else if tf.Spec.SamplingEnabled() { + if tf.Spec.Sampling.Method == crdv1beta1.FirstN && tf.Status.Sampling.NumCapturedPackets == tf.Spec.Sampling.Num { + succeeded = true + } } else { sender := false receiver := false - for i, nodeResult := range tf.Status.Results { - for j, ob := range nodeResult.Observations { + for _, nodeResult := range tf.Status.Results { + for _, ob := range nodeResult.Observations { if ob.Component == crdv1beta1.ComponentSpoofGuard { sender = true } @@ -292,20 +356,6 @@ func (c *Controller) checkTraceflowStatus(tf *crdv1beta1.Traceflow) error { ob.Action == crdv1beta1.ActionForwardedOutOfOverlay { receiver = true } - if ob.TranslatedDstIP != "" { - // Add Pod ns/name to observation if TranslatedDstIP (a.k.a. Service Endpoint address) is Pod IP. - pods, err := c.podInformer.Informer().GetIndexer().ByIndex(grouping.PodIPsIndex, ob.TranslatedDstIP) - if err != nil { - klog.Infof("Unable to find Pod from IP, error: %+v", err) - } else if len(pods) > 0 { - pod, ok := pods[0].(*corev1.Pod) - if !ok { - klog.Warningf("Invalid Pod obj in cache") - } else { - tf.Status.Results[i].Observations[j].Pod = fmt.Sprintf("%s/%s", pod.Namespace, pod.Name) - } - } - } } } // When the Source Pod is specified, the Traceflow should receive @@ -314,11 +364,10 @@ func (c *Controller) checkTraceflowStatus(tf *crdv1beta1.Traceflow) error { // receiver Node will report the results. 
succeeded = (sender && receiver) || (receiver && tf.Spec.Source.Pod == "")
 	}
-	if succeeded {
-		c.deallocateTagForTF(tf)
-		return c.updateTraceflowStatus(tf, crdv1beta1.Succeeded, "", 0)
-	}
+	return succeeded
+}
 
+func checkTraceflowTimeout(tf *crdv1beta1.Traceflow) bool {
 	var timeout time.Duration
 	if tf.Spec.Timeout != 0 {
 		timeout = time.Duration(tf.Spec.Timeout) * time.Second
@@ -334,26 +383,7 @@ func (c *Controller) checkTraceflowStatus(tf *crdv1beta1.Traceflow) error {
 		klog.V(2).InfoS("StartTime field in Traceflow Status should not be empty", "Traceflow", klog.KObj(tf))
 		startTime = tf.CreationTimestamp.Time
 	}
-	if startTime.Add(timeout).Before(time.Now()) {
-		c.deallocateTagForTF(tf)
-		return c.updateTraceflowStatus(tf, crdv1beta1.Failed, traceflowTimeout, 0)
-	}
-	return nil
-}
-
-func (c *Controller) updateTraceflowStatus(tf *crdv1beta1.Traceflow, phase crdv1beta1.TraceflowPhase, reason string, dataPlaneTag uint8) error {
-	update := tf.DeepCopy()
-	update.Status.Phase = phase
-	if phase == crdv1beta1.Running && tf.Status.StartTime == nil {
-		t := metav1.Now()
-		update.Status.StartTime = &t
-	}
-	update.Status.DataplaneTag = int8(dataPlaneTag)
-	if reason != "" {
-		update.Status.Reason = reason
-	}
-	_, err := c.client.CrdV1beta1().Traceflows().UpdateStatus(context.TODO(), update, metav1.UpdateOptions{})
-	return err
+	return startTime.Add(timeout).Before(time.Now())
 }
 
 func (c *Controller) occupyTag(tf *crdv1beta1.Traceflow) error {
@@ -427,5 +457,92 @@ func (c *Controller) validateTraceflow(tf *crdv1beta1.Traceflow) error {
 			return fmt.Errorf("using hostNetwork Pod as source in non-live-traffic Traceflow is not supported")
 		}
 	}
+
+	// Validate spec related to sampling live traffic traceflow.
+	if tf.Spec.Sampling.Method != "" {
+		if !tf.Spec.LiveTraffic {
+			return errors.New("the sampling is valid only in live traffic mode")
+		}
+	}
+
 	return nil
 }
+
+// traceflowUpdater is a simple wrapper that makes TF updates easier.
+//
+// Example: newTraceflowUpdater(oldTF, controller).Tag(0).Update() will update the TF's tag while keeping other fields
+// unchanged, except for some metadata.
+type traceflowUpdater struct {
+	tf          *crdv1beta1.Traceflow
+	controller  *Controller
+	phase       *crdv1beta1.TraceflowPhase
+	reason      *string
+	uid         *string
+	tag         *int8
+	packetsPath *string
+}
+
+// newTraceflowUpdater returns an updater whose Update applies the recorded changes to a deep copy of base.
+func newTraceflowUpdater(base *crdv1beta1.Traceflow, c *Controller) *traceflowUpdater {
+	return &traceflowUpdater{
+		tf:         base,
+		controller: c,
+	}
+}
+
+// Phase records the status phase to write and returns the updater for chaining.
+func (t *traceflowUpdater) Phase(phase crdv1beta1.TraceflowPhase) *traceflowUpdater {
+	t.phase = &phase
+	return t
+}
+
+// Reason records the status reason to write and returns the updater for chaining.
+func (t *traceflowUpdater) Reason(reason string) *traceflowUpdater {
+	t.reason = &reason
+	return t
+}
+
+// UID records a UID for the Traceflow and returns the updater for chaining.
+// NOTE(review): Update does not write this value to any field yet; confirm the intended status field.
+func (t *traceflowUpdater) UID(uid string) *traceflowUpdater {
+	t.uid = &uid
+	return t
+}
+
+// Tag records the dataplane tag to write and returns the updater for chaining.
+func (t *traceflowUpdater) Tag(tag int8) *traceflowUpdater {
+	t.tag = &tag
+	return t
+}
+
+// PacketsPath records the sampling packets path to write and returns the updater for chaining.
+func (t *traceflowUpdater) PacketsPath(path string) *traceflowUpdater {
+	t.packetsPath = &path
+	return t
+}
+
+// Update applies the recorded changes to a deep copy of the base Traceflow and sends that copy through UpdateStatus.
+// Fields that were not set on the updater keep the values of the base Traceflow; the base object is not modified.
+func (t *traceflowUpdater) Update() error {
+	newTF := t.tf.DeepCopy()
+	if t.phase != nil {
+		newTF.Status.Phase = *t.phase
+	}
+	// Stamp StartTime on the copy that is sent, keyed on the resulting phase, so that the transition to Running
+	// records it (same condition as the former updateTraceflowStatus).
+	if newTF.Status.Phase == crdv1beta1.Running && newTF.Status.StartTime == nil {
+		now := metav1.Now()
+		newTF.Status.StartTime = &now
+	}
+	if t.tag != nil {
+		newTF.Status.DataplaneTag = *t.tag
+	}
+	if t.reason != nil {
+		newTF.Status.Reason = *t.reason
+	}
+	if t.packetsPath != nil {
+		newTF.Status.Sampling.PacketsPath = *t.packetsPath
+	}
+	_, err := t.controller.client.CrdV1beta1().Traceflows().UpdateStatus(context.TODO(), newTF, metav1.UpdateOptions{})
+	return err
+}
diff --git a/pkg/controller/traceflow/controller_test.go b/pkg/controller/traceflow/controller_test.go
index c169f14279a..5925d29bff1 100644
--- a/pkg/controller/traceflow/controller_test.go
+++ b/pkg/controller/traceflow/controller_test.go
@@ -143,7 +143,7 @@ func TestTraceflow(t *testing.T) {
 	assert.NotNil(t, res)
 	res, _ = tfc.waitForTraceflow("tf1",
crdv1beta1.Failed, defaultTimeoutDuration*2) assert.NotNil(t, res) - assert.True(t, time.Now().Sub(startTime) >= time.Second*time.Duration(tf1.Spec.Timeout)) + assert.True(t, time.Since(startTime) >= time.Second*time.Duration(tf1.Spec.Timeout)) assert.Equal(t, res.Status.Reason, traceflowTimeout) assert.True(t, res.Status.DataplaneTag == 0) assert.Equal(t, numRunningTraceflows(), 0)