From da2c9071330d97ac63f9600de9ef8ec4f2fb45ec Mon Sep 17 00:00:00 2001 From: Hang Yan Date: Tue, 9 Apr 2024 19:56:16 +0800 Subject: [PATCH] Add packetsampling feature Signed-off-by: Hang Yan --- build/charts/antrea/crds/packetsampling.yaml | 173 +++ docs/packetsampling-guide.md | 76 ++ .../controller/packetsampling/packetin.go | 117 ++ .../packetsampling/packetin_test.go | 256 +++++ .../packetsampling_controller.go | 750 ++++++++++++ .../packetsampling_controller_test.go | 1012 +++++++++++++++++ pkg/agent/openflow/packetsampling.go | 55 + .../crd/v1alpha1/fake/fake_packetsampling.go | 130 +++ .../typed/crd/v1alpha1/packetsampling.go | 182 +++ .../crd/v1alpha1/packetsampling.go | 87 ++ .../listers/crd/v1alpha1/packetsampling.go | 66 ++ pkg/controller/packetsampling/validate.go | 103 ++ .../packetsampling/validate_test.go | 201 ++++ pkg/util/ftp/auth.go | 102 ++ pkg/util/ftp/auth_test.go | 183 +++ pkg/util/ftp/ftp.go | 112 ++ pkg/util/ftp/ftp_test.go | 66 ++ test/e2e/packetsampling_test.go | 636 +++++++++++ 18 files changed, 4307 insertions(+) create mode 100644 build/charts/antrea/crds/packetsampling.yaml create mode 100644 docs/packetsampling-guide.md create mode 100644 pkg/agent/controller/packetsampling/packetin.go create mode 100644 pkg/agent/controller/packetsampling/packetin_test.go create mode 100644 pkg/agent/controller/packetsampling/packetsampling_controller.go create mode 100644 pkg/agent/controller/packetsampling/packetsampling_controller_test.go create mode 100644 pkg/agent/openflow/packetsampling.go create mode 100644 pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_packetsampling.go create mode 100644 pkg/client/clientset/versioned/typed/crd/v1alpha1/packetsampling.go create mode 100644 pkg/client/informers/externalversions/crd/v1alpha1/packetsampling.go create mode 100644 pkg/client/listers/crd/v1alpha1/packetsampling.go create mode 100644 pkg/controller/packetsampling/validate.go create mode 100644 pkg/controller/packetsampling/validate_test.go create mode 100644 pkg/util/ftp/auth.go create mode 100644 pkg/util/ftp/auth_test.go create mode 100644 pkg/util/ftp/ftp.go create mode 100644 pkg/util/ftp/ftp_test.go create mode 100644 test/e2e/packetsampling_test.go diff --git a/build/charts/antrea/crds/packetsampling.yaml b/build/charts/antrea/crds/packetsampling.yaml new file mode 100644 index 00000000000..3b1bf30e4a9 --- /dev/null +++ b/build/charts/antrea/crds/packetsampling.yaml @@ -0,0 +1,173 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: packetsamplings.crd.antrea.io + labels: + app: antrea +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + additionalPrinterColumns: + - jsonPath: .status.phase + description: The phase of the PacketSampling. + name: Phase + type: string + - jsonPath: .spec.source.pod + description: The name of the source Pod. + name: Source-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.pod + description: The name of the destination Pod. + name: Destination-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.ip + description: The IP address of the destination. + name: Destination-IP + type: string + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. + name: Timeout + type: integer + priority: 10 + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + properties: + source: + type: object + properties: + pod: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + destination: + type: object + properties: + pod: + type: string + service: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + packet: + type: object + properties: + ipHeader: + type: object + properties: + protocol: + type: integer + minimum: 0 + maximum: 255 + ipv6Header: + type: object + properties: + nextHeader: + type: integer + minimum: 0 + maximum: 65535 + transportHeader: + type: object + properties: + udp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + tcp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + flags: + type: integer + minimum: 0 + maximum: 255 + timeout: + type: integer + minimum: 1 + maximum: 300 + type: + type: string + firstNSamplingConfig: + type: object + properties: + number: + type: integer + fileServer: + type: object + properties: + url: + type: string + authentication: + type: object + properties: + authType: + type: string + enum: [ "BearerToken", "APIKey", "BasicAuthentication" ] + authSecret: + type: object + properties: + name: + type: string + namespace: + type: string + status: + type: object + properties: + reason: + type: string + phase: + type: string + startTime: + type: string + numCapturedPackets: + type: integer + packetsPath: + type: string + + subresources: + status: {} + scope: Cluster + names: + plural: packetsamplings + singular: packetsampling + kind: PacketSampling + shortNames: + - ps diff --git a/docs/packetsampling-guide.md b/docs/packetsampling-guide.md new file mode 100644 index 00000000000..f054a81f367 --- /dev/null +++ b/docs/packetsampling-guide.md @@ -0,0 +1,76 @@ +# PacketSampling User Guide + +Starting with Antrea v1.16, Antrea supports using PacketSampling for network diagnosis. +It can capture specified number of packets from real traffic and upload them to a +supported storage location. Users can create a PacketSampling CRD to trigger +such actions on the target traffic flow. + + +- [Prerequisites](#prerequisites) +- [Start a new PacketSampling](#start-a-new-packetsampling) + + +## Prerequisites + +The PacketSampling feature is disabled by default. If you +want to enable this feature, you need to set PacketSampling feature gate to true in `antrea-config` +ConfigMap for antrea-agent. In order to use a Service as the destination +in sampling, you also need to ensure [AntreaProxy](feature-gates.md#antreaproxy) is enabled in the agent configuration: +```yaml + antrea-agent.conf: | + featureGates: + # Enable packetsampling feature to capture real traffic packets. + PacketSampling: true +``` + +## Start a new PacketSampling + +When start a new packet sampling, you can provide the following information to identify +the target flow: + +* Source Pod +* Destination Pod, Service or IP address +* Transport protocol (TCP/UDP/ICMP) +* Transport ports + +You can start a new packet sampling by creating PacketSampling CRD via +`kubectl` and a yaml file which contains the essential configuration of +PacketSampling CRD. Following is an example of PacketSampling CRD: + +```yaml +apiVersion: crd.antrea.io/v1alpha1 +kind: PacketSampling +metadata: + name: ps-test +spec: + fileServer: + url: sftp://127.0.0.1:22/upload # define your own ftp url here. + authentication: + authType: "BasicAuthentication" + authSecret: + name: test-secret + namespace: default + timeout: 600 + type: FirstNSampling + firstNSamplingConfig: + number: 5 + source: + namespace: default + pod: frontend + destination: + namespace: default + pod: backend + # Destination can also be an IP address ('ip' field) or a Service name ('service' field); the 3 choices are mutually exclusive. + packet: + ipHeader: # If ipHeader/ipv6Header is not set, the default value is IPv4+ICMP. + protocol: 6 # Protocol here can be 6 (TCP), 17 (UDP) or 1 (ICMP), default value is 1 (ICMP) + transportHeader: + tcp: + dstPort: 8080 # Destination port needs to be set when the protocol is TCP/UDP. +``` + +The CRD above starts a new packet sampling from a Pod named `frontend` +to the port 8080 of a Pod named `backend` using TCP protocol. It will capture the first 5 packets +that meet this criterion and upload them to the file server specified in the PacketSampling's +specifications. Users can download the packet file from the ftp server and analysis it's content +with common network diagnose tools like Wireshark or `tcpdump`. diff --git a/pkg/agent/controller/packetsampling/packetin.go b/pkg/agent/controller/packetsampling/packetin.go new file mode 100644 index 00000000000..9dfcc81456c --- /dev/null +++ b/pkg/agent/controller/packetsampling/packetin.go @@ -0,0 +1,117 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packetsampling + +import ( + "fmt" + "time" + + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + "github.com/google/gopacket" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/openflow" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" +) + +// HandlePacketIn processes PacketIn messages from the OFSwitch. If the reg flag match, it will be counted and captured. +// Once the total number reaches the target one, the PacketSampling will be marked as Succeed. +func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error { + klog.V(4).InfoS("PacketIn for PacketSampling", "PacketIn", pktIn.PacketIn) + samplingState, samplingFinished, err := c.parsePacketIn(pktIn) + if err != nil { + return fmt.Errorf("parsePacketIn error: %v", err) + } + if samplingFinished { + return nil + } + rawData := pktIn.Data.(*util.Buffer).Bytes() + ci := gopacket.CaptureInfo{ + Timestamp: time.Now(), + CaptureLength: len(rawData), + Length: len(rawData), + } + err = samplingState.pcapngWriter.WritePacket(ci, rawData) + if err != nil { + return fmt.Errorf("couldn't write packet: %w", err) + } + reachTarget := samplingState.numCapturedPackets == samplingState.maxNumCapturedPackets + // use rate limiter to reduce the times we need to update status. + if reachTarget || samplingState.updateRateLimiter.Allow() { + ps, err := c.packetSamplingLister.Get(samplingState.name) + if err != nil { + return fmt.Errorf("get PacketSampling failed: %w", err) + } + err = c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingRunning, "", samplingState.numCapturedPackets) + if err != nil { + return fmt.Errorf("failed to update the PacketSampling: %w", err) + } + klog.InfoS("Updated PacketSampling", "packetsampling", klog.KObj(ps), "numCapturedPackets", samplingState.numCapturedPackets) + // if reach the target. upload it. + if reachTarget { + if err := samplingState.finishWriting(); err != nil { + return err + } + return c.uploadPacketsFile(ps) + } + } + return nil +} + +// parsePacketIn parses the packet-in message and returns +// 1. the sampling state of the PacketSampling (on sampling mode) +func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (_ *packetSamplingState, samplingFinished bool, _ error) { + var tag uint8 + matchers := pktIn.GetMatches() + match := openflow.GetMatchFieldByRegID(matchers, openflow.PacketSamplingMark.GetRegID()) + if match != nil { + value, err := openflow.GetInfoInReg(match, openflow.PacketSamplingMark.GetRange().ToNXRange()) + if err != nil { + return nil, false, fmt.Errorf("failed to get PacketSampling tag from packet-in message: %v", err) + } + tag = uint8(value) + } + c.runningPacketSamplingsMutex.Lock() + psState, exists := c.runningPacketSamplings[tag] + if exists { + if psState.numCapturedPackets == psState.maxNumCapturedPackets { + c.runningPacketSamplingsMutex.Unlock() + return nil, true, nil + } + psState.numCapturedPackets++ + if psState.numCapturedPackets == psState.maxNumCapturedPackets { + err := c.ofClient.UninstallPacketSamplingFlows(tag) + if err != nil { + return nil, false, fmt.Errorf("uninstall PacketSampling ovs flow failed: %v", err) + } + } + } + c.runningPacketSamplingsMutex.Unlock() + if !exists { + return nil, false, fmt.Errorf("PacketSampling for dataplane tag %d not found in cache", tag) + } + return psState, false, nil +} + +func (c *Controller) uploadPacketsFile(ps *crdv1alpha1.PacketSampling) error { + name := uidToPath(string(ps.UID)) + file, err := defaultFS.Open(name) + if err != nil { + return err + } + defer file.Close() + return c.uploadPackets(ps, file) +} diff --git a/pkg/agent/controller/packetsampling/packetin_test.go b/pkg/agent/controller/packetsampling/packetin_test.go new file mode 100644 index 00000000000..7e0f82263ba --- /dev/null +++ b/pkg/agent/controller/packetsampling/packetin_test.go @@ -0,0 +1,256 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packetsampling + +import ( + "context" + "net" + "testing" + + "antrea.io/libOpenflow/openflow15" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" + "antrea.io/ofnet/ofctrl" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "golang.org/x/crypto/ssh" + "golang.org/x/time/rate" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/openflow" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" +) + +const ( + maxNum = 5 +) + +var ( + testTag = uint8(1) + testUID = "1-2-3-4" + testSFTPUrl = "sftp://10.220.175.92:22/root/packetsamplings" + // parse to tag(1) + testTagData = []byte{0x11, 0x00, 0x00, 0x11} +) + +func genMatchers() []openflow15.MatchField { + m := generateMatch(openflow.PacketSamplingMark.GetRegID(), testTagData) + matchers := []openflow15.MatchField{m} + return matchers +} + +func generateMatch(regID int, data []byte) openflow15.MatchField { + baseData := make([]byte, 8, 8) + if regID%2 == 0 { + copy(baseData[0:4], data) + } else { + copy(baseData[4:8], data) + } + return openflow15.MatchField{ + Class: openflow15.OXM_CLASS_PACKET_REGS, + // convert reg (4-byte) ID to xreg (8-byte) ID + Field: uint8(regID / 2), + HasMask: false, + Value: &openflow15.ByteArrayField{Data: baseData}, + } +} + +func getTestPacketBytes(dstIP string, dscp uint8) []byte { + ipPacket := &protocol.IPv4{ + Version: 0x4, + IHL: 5, + Protocol: uint8(8), + DSCP: dscp, + Length: 20, + NWSrc: net.IP(pod1IPv4), + NWDst: net.IP(dstIP), + } + ethernetPkt := protocol.NewEthernet() + ethernetPkt.HWSrc = pod1MAC + ethernetPkt.Ethertype = protocol.IPv4_MSG + ethernetPkt.Data = ipPacket + pktBytes, _ := ethernetPkt.MarshalBinary() + return pktBytes +} + +func generateTestPsState(name string, writer *pcapgo.NgWriter, num int32) *packetSamplingState { + return &packetSamplingState{ + name: name, + maxNumCapturedPackets: maxNum, + numCapturedPackets: num, + tag: testTag, + pcapngWriter: writer, + shouldSyncPackets: true, + updateRateLimiter: rate.NewLimiter(rate.Every(samplingStatusUpdatePeriod), 1), + } +} + +func generatePacketSampling(name string) *crdv1alpha1.PacketSampling { + return &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: types.UID(testUID), + }, + Status: crdv1alpha1.PacketSamplingStatus{}, + Spec: crdv1alpha1.PacketSamplingSpec{ + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: testSFTPUrl, + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: crdv1alpha1.BasicAuthentication, + AuthSecret: &v1.SecretReference{ + Name: "AAA", + Namespace: "default", + }, + }, + }, + } +} + +func generateTestSecret() *v1.Secret { + return &v1.Secret{ + TypeMeta: metav1.TypeMeta{ + Kind: "Secret", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "AAA", + Namespace: "default", + }, + Data: map[string][]byte{ + "username": []byte("AAA"), + "password": []byte("BBBCCC"), + }, + } +} + +type testUploader struct { +} + +func (uploader *testUploader) Upload(url string, fileName string, config *ssh.ClientConfig, outputFile afero.File) error { + klog.Info("Called test uploader") + return nil +} + +func TestHandlePacketSamplingPacketIn(t *testing.T) { + + invalidPktBytes := getTestPacketBytes("89.207.132.170", 0) + pktBytesPodToPod := getTestPacketBytes(pod2IPv4, testTag) + + // create test os + defaultFS = afero.NewMemMapFs() + defaultFS.MkdirAll("/tmp/packetsampling/packets", 0755) + file, err := defaultFS.Create(uidToPath(testUID)) + if err != nil { + t.Fatal("create pcapng file error: ", err) + } + + testWriter, err := pcapgo.NewNgWriter(file, layers.LinkTypeEthernet) + if err != nil { + t.Fatal("create test pcapng writer failed: ", err) + } + + tests := []struct { + name string + networkConfig *config.NetworkConfig + nodeConfig *config.NodeConfig + psState *packetSamplingState + pktIn *ofctrl.PacketIn + expectedPS *crdv1alpha1.PacketSampling + expectedErrStr string + expectedCalls func(mockOFClient *openflowtest.MockClient) + expectedNum int32 + }{ + { + name: "invalid packets", + psState: generateTestPsState("ps-with-invalid-packet", testWriter, 0), + expectedPS: generatePacketSampling("ps-with-invalid-packet"), + pktIn: &ofctrl.PacketIn{ + PacketIn: &openflow15.PacketIn{ + Data: util.NewBuffer(invalidPktBytes), + }, + }, + expectedErrStr: "parsePacketIn error: PacketSampling for dataplane tag 0 not found in cache", + }, + { + name: "not hitting target number", + psState: generateTestPsState("ps-with-less-num", testWriter, 1), + expectedPS: generatePacketSampling("ps-with-less-num"), + expectedNum: 2, + pktIn: &ofctrl.PacketIn{ + PacketIn: &openflow15.PacketIn{ + Data: util.NewBuffer(pktBytesPodToPod), + Match: openflow15.Match{ + Fields: genMatchers(), + }, + }, + }, + }, + { + name: "hit target number", + psState: generateTestPsState("ps-with-max-num", testWriter, maxNum-1), + expectedPS: generatePacketSampling("ps-with-max-num"), + expectedNum: maxNum, + pktIn: &ofctrl.PacketIn{ + PacketIn: &openflow15.PacketIn{ + Data: util.NewBuffer(pktBytesPodToPod), + Match: openflow15.Match{ + Fields: genMatchers(), + }, + }, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().UninstallPacketSamplingFlows(uint8(testTag)) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + psc := newFakePacketSamplingController(t, nil, []runtime.Object{tt.expectedPS}, nil, &config.NodeConfig{Name: "node1"}) + if tt.expectedCalls != nil { + tt.expectedCalls(psc.mockOFClient) + } + stopCh := make(chan struct{}) + defer close(stopCh) + psc.crdInformerFactory.Start(stopCh) + psc.crdInformerFactory.WaitForCacheSync(stopCh) + psc.runningPacketSamplings[tt.psState.tag] = tt.psState + + err := psc.HandlePacketIn(tt.pktIn) + if err == nil { + assert.Equal(t, tt.expectedErrStr, "") + // check target num in status + ps, err := psc.crdClient.CrdV1alpha1().PacketSamplings().Get(context.TODO(), tt.expectedPS.Name, metav1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, tt.expectedNum, ps.Status.NumCapturedPackets) + } else { + assert.Equal(t, tt.expectedErrStr, err.Error()) + } + + }) + } +} diff --git a/pkg/agent/controller/packetsampling/packetsampling_controller.go b/pkg/agent/controller/packetsampling/packetsampling_controller.go new file mode 100644 index 00000000000..cb28b2c351f --- /dev/null +++ b/pkg/agent/controller/packetsampling/packetsampling_controller.go @@ -0,0 +1,750 @@ +// Copyright 2024 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packetsampling + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net" + "os" + "path" + "path/filepath" + "sync" + "time" + + "antrea.io/libOpenflow/protocol" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" + "github.com/spf13/afero" + "golang.org/x/time/rate" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + clientset "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/agent/util" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + clientsetversioned "antrea.io/antrea/pkg/client/clientset/versioned" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + binding "antrea.io/antrea/pkg/ovs/openflow" + "antrea.io/antrea/pkg/util/ftp" +) + +type StorageProtocolType string + +const ( + sftpProtocol StorageProtocolType = "sftp" +) + +const ( + controllerName = "AntreaAgentPacketSamplingController" + resyncPeriod time.Duration = 0 + + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + + defaultWorkers = 4 + + // 4bits in ovs reg4 + minTagNum uint8 = 1 + maxTagNum uint8 = 15 + + // reason for timeout + samplingTimeoutReason = "PacketSampling timeout" + defaultTimeoutDuration = time.Second * time.Duration(crdv1alpha1.DefaultPacketSamplingTimeout) + + samplingStatusUpdatePeriod = 10 * time.Second +) + +var ( + packetDirectory = getPacketDirectory() + defaultFS = afero.NewOsFs() +) + +func getPacketDirectory() string { + return filepath.Join(os.TempDir(), "antrea", "packetsampling", "packets") +} + +type packetSamplingState struct { + name string + tag uint8 + // shouldSyncPackets means this node will be responsible for doing the actual packet capture job. + shouldSyncPackets bool + numCapturedPackets int32 + maxNumCapturedPackets int32 + updateRateLimiter *rate.Limiter + uid string + pcapngFile afero.File + pcapngWriter *pcapgo.NgWriter + receiverOnly bool + isSender bool +} + +type Controller struct { + kubeClient clientset.Interface + crdClient clientsetversioned.Interface + serviceLister corelisters.ServiceLister + serviceListerSynced cache.InformerSynced + endpointLister corelisters.EndpointsLister + endpointSynced cache.InformerSynced + packetSamplingInformer crdinformers.PacketSamplingInformer + packetSamplingLister crdlisters.PacketSamplingLister + packetSamplingSynced cache.InformerSynced + ofClient openflow.Client + interfaceStore interfacestore.InterfaceStore + nodeConfig *config.NodeConfig + queue workqueue.RateLimitingInterface + runningPacketSamplingsMutex sync.RWMutex + runningPacketSamplings map[uint8]*packetSamplingState + enableAntreaProxy bool + sftpUploader ftp.UpLoader +} + +func NewPacketSamplingController( + kubeClient clientset.Interface, + crdClient clientsetversioned.Interface, + serviceInformer coreinformers.ServiceInformer, + endpointInformer coreinformers.EndpointsInformer, + packetSamplingInformer crdinformers.PacketSamplingInformer, + client openflow.Client, + interfaceStore interfacestore.InterfaceStore, + nodeConfig *config.NodeConfig, + enableAntreaProxy bool, +) *Controller { + c := &Controller{ + kubeClient: kubeClient, + crdClient: crdClient, + packetSamplingInformer: packetSamplingInformer, + packetSamplingLister: packetSamplingInformer.Lister(), + packetSamplingSynced: packetSamplingInformer.Informer().HasSynced, + ofClient: client, + interfaceStore: interfaceStore, + nodeConfig: nodeConfig, + queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), + workqueue.RateLimitingQueueConfig{Name: "packetsampling"}), + runningPacketSamplings: make(map[uint8]*packetSamplingState), + sftpUploader: &ftp.SftpUploader{}, + enableAntreaProxy: enableAntreaProxy, + } + + packetSamplingInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: c.addPacketSampling, + UpdateFunc: c.updatePacketSampling, + DeleteFunc: c.deletePacketSampling, + }, resyncPeriod) + + c.ofClient.RegisterPacketInHandler(uint8(openflow.PacketInCategoryPS), c) + + if c.enableAntreaProxy { + c.serviceLister = serviceInformer.Lister() + c.serviceListerSynced = serviceInformer.Informer().HasSynced + c.endpointLister = endpointInformer.Lister() + c.endpointSynced = endpointInformer.Informer().HasSynced + } + return c +} + +func (c *Controller) enqueuePacketSampling(ps *crdv1alpha1.PacketSampling) { + c.queue.Add(ps.Name) +} + +// Run will create defaultWorkers workers (go routines) which will process the PacketSampling events from the +// workqueue. +func (c *Controller) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.InfoS("Starting packetsampling controller.", "name", controllerName) + defer klog.InfoS("Shutting down packetsampling controller.", "name", controllerName) + + cacheSynced := []cache.InformerSynced{c.packetSamplingSynced} + if c.enableAntreaProxy { + cacheSynced = append(cacheSynced, c.serviceListerSynced, c.endpointSynced) + } + if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSynced...) { + return + } + + err := defaultFS.MkdirAll(packetDirectory, 0755) + if err != nil { + klog.ErrorS(err, "Couldn't create directory for storing sampling packets", "directory", packetDirectory) + return + } + + for i := 0; i < defaultWorkers; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh +} + +func (c *Controller) addPacketSampling(obj interface{}) { + ps := obj.(*crdv1alpha1.PacketSampling) + klog.InfoS("Processing PacketSampling ADD event", "name", ps.Name) + c.enqueuePacketSampling(ps) +} + +func (c *Controller) updatePacketSampling(_, obj interface{}) { + ps := obj.(*crdv1alpha1.PacketSampling) + klog.InfoS("Processing PacketSampling UPDATE EVENT", "name", ps.Name) + c.enqueuePacketSampling(ps) +} + +func (c *Controller) deletePacketSampling(obj interface{}) { + ps := obj.(*crdv1alpha1.PacketSampling) + klog.InfoS("Processing PacketSampling DELETE event", "name", ps.Name) + err := deletePcapngFile(string(ps.UID)) + if err != nil { + klog.ErrorS(err, "Couldn't delete pcapng file") + + } + c.enqueuePacketSampling(ps) + +} + +func deletePcapngFile(uid string) error { + return defaultFS.Remove(uidToPath(uid)) +} + +func uidToPath(uid string) string { + return path.Join(packetDirectory, uid+".pcapng") +} + +func (c *Controller) worker() { + for c.processPacketSamplingItem() { + } +} + +func (c *Controller) processPacketSamplingItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + + defer c.queue.Done(obj) + if key, ok := obj.(string); !ok { + c.queue.Forget(obj) + klog.ErrorS(nil, "Expected string in work queue but got:", "obj", obj) + return true + } else if err := c.syncPacketSampling(key); err == nil { + c.queue.Forget(key) + } else { + klog.ErrorS(err, "Error syncing PacketSampling, exiting.", "key", key) + } + return true +} + +func (c *Controller) validatePacketSampling(ps *crdv1alpha1.PacketSampling) error { + if ps.Spec.Destination.Service != "" && !c.enableAntreaProxy { + return errors.New("using Service destination requires AntreaProxy feature enabled") + } + if ps.Spec.Destination.IP != "" { + destIP := net.ParseIP(ps.Spec.Destination.IP) + if destIP == nil { + return fmt.Errorf("destination IP %s is not valid", ps.Spec.Destination.IP) + } + } + return nil +} + +func (c *Controller) cleanupPacketSampling(psName string) { + psState := c.deletePacketSamplingState(psName) + if psState != nil { + err := c.ofClient.UninstallPacketSamplingFlows(psState.tag) + if err != nil { + klog.ErrorS(err, "Error cleaning up flows for PacketSampling.", "name", psName) + } + } +} + +func (state *packetSamplingState) finishWriting() error { + err := state.pcapngWriter.Flush() + if err != nil { + return err + } + return state.pcapngFile.Close() +} + +func (c *Controller) deletePacketSamplingState(psName string) *packetSamplingState { + c.runningPacketSamplingsMutex.Lock() + defer c.runningPacketSamplingsMutex.Unlock() + + for tag, state := range c.runningPacketSamplings { + if state.name == psName { + delete(c.runningPacketSamplings, tag) + return state + } + } + return nil +} + +func (c *Controller) startPacketSampling(ps *crdv1alpha1.PacketSampling, psState *packetSamplingState) error { + err := c.validatePacketSampling(ps) + defer func() { + if err != nil { + c.cleanupPacketSampling(ps.Name) + c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingFailed, fmt.Sprintf("Node: %s, error:%+v", c.nodeConfig.Name, err), 0) + + } + }() + if err != nil { + return err + } + + receiverOnly := false + senderOnly := false + var pod, ns string + + if ps.Spec.Source.Pod != "" { + pod = ps.Spec.Source.Pod + ns = ps.Spec.Source.Namespace + if ps.Spec.Destination.Pod == "" { + senderOnly = true + } + } else { + pod = ps.Spec.Destination.Pod + ns = ps.Spec.Destination.Namespace + receiverOnly = true + } + + podInterfaces := c.interfaceStore.GetContainerInterfacesByPod(pod, ns) + isSender := !receiverOnly && len(podInterfaces) > 0 + + var packet, senderPacket *binding.Packet + var endpointPackets []binding.Packet + var ofPort uint32 + + if len(podInterfaces) > 0 { + packet, err = c.preparePacket(ps, podInterfaces[0], receiverOnly) + if err != nil { + return err + } + ofPort = uint32(podInterfaces[0].OFPort) + senderPacket = packet + klog.V(2).InfoS("PacketSampling packet:", "packet", *packet) + if senderOnly && ps.Spec.Destination.Service != "" { + endpointPackets, err = c.genEndpointMatchPackets(ps) + if err != nil { + return fmt.Errorf("couldn't generate endpoint match packets: %w", err) + } + } + } + + c.runningPacketSamplingsMutex.Lock() + psState.receiverOnly = receiverOnly + psState.maxNumCapturedPackets = ps.Spec.FirstNSamplingConfig.Number + psState.isSender = isSender + + filePath := uidToPath(string(ps.UID)) + exists, err := fileExists(filePath) + if err != nil { + return fmt.Errorf("couldn't check if the file exists: %w", err) + } + if exists { + return fmt.Errorf("packet file already exists. this may be due to an unexpected termination") + } + file, err := defaultFS.Create(filePath) + if err != nil { + return fmt.Errorf("failed to create pcapng file: %w", err) + } + writer, err := pcapgo.NewNgWriter(file, layers.LinkTypeEthernet) + if err != nil { + return fmt.Errorf("couldn't init pcap writer: %w", err) + } + + psState.shouldSyncPackets = len(podInterfaces) > 0 + psState.uid = string(ps.UID) + psState.pcapngFile = file + psState.pcapngWriter = writer + + if psState.shouldSyncPackets { + psState.updateRateLimiter = rate.NewLimiter(rate.Every(samplingStatusUpdatePeriod), 1) + } + c.runningPacketSamplings[psState.tag] = psState + c.runningPacketSamplingsMutex.Unlock() + timeout := ps.Spec.Timeout + if timeout == 0 { + timeout = crdv1alpha1.DefaultPacketSamplingTimeout + } + if psState.shouldSyncPackets { + klog.V(2).InfoS("installing flow entries for PacketSampling.", "name", ps.Name) + err = c.ofClient.InstallPacketSamplingFlows(psState.tag, senderOnly, receiverOnly, senderPacket, endpointPackets, ofPort, timeout) + if err != nil { + klog.ErrorS(err, "install flow entries failed.", "name", ps.Name) + } + } + return err + +} + +func fileExists(path string) (bool, error) { + _, err := defaultFS.Stat(path) + if err == nil { + return true, nil + } else { + if os.IsNotExist(err) { + return false, nil + } else { + return false, err + } + } +} + +// genEndpointMatchPackets generates match packets (with destination Endpoint's IP/port info) besides the normal match packet. +// these match packets will help the pipeline to capture the pod -> svc traffic. +// TODO: 1. support name based port name 2. dual-stack support +func (c *Controller) genEndpointMatchPackets(ps *crdv1alpha1.PacketSampling) ([]binding.Packet, error) { + var port int32 + if ps.Spec.Packet.TransportHeader.TCP != nil { + port = ps.Spec.Packet.TransportHeader.TCP.DstPort + } else if ps.Spec.Packet.TransportHeader.UDP != nil { + port = ps.Spec.Packet.TransportHeader.UDP.DstPort + } + var packets []binding.Packet + dstSvc, err := c.serviceLister.Services(ps.Spec.Destination.Namespace).Get(ps.Spec.Destination.Service) + if err != nil { + return nil, err + } + for _, item := range dstSvc.Spec.Ports { + if item.Port == port { + if item.TargetPort.Type == intstr.Int { + port = item.TargetPort.IntVal + } + } + } + dstEndpoint, err := c.endpointLister.Endpoints(ps.Spec.Destination.Namespace).Get(ps.Spec.Destination.Service) + if err != nil { + return nil, err + } + for _, item := range dstEndpoint.Subsets[0].Addresses { + packet := binding.Packet{} + packet.DestinationIP = net.ParseIP(item.IP) + if port != 0 { + packet.DestinationPort = uint16(port) + } + packet.IPProto, _ = parseTargetProto(&ps.Spec.Packet) + packets = append(packets, packet) + } + return packets, nil +} + +func (c *Controller) preparePacket(ps *crdv1alpha1.PacketSampling, intf *interfacestore.InterfaceConfig, receiverOnly bool) (*binding.Packet, error) { + packet := new(binding.Packet) + packet.IsIPv6 = ps.Spec.Packet.IPv6Header != nil + + if receiverOnly { + if ps.Spec.Source.IP != "" { + packet.SourceIP = net.ParseIP(ps.Spec.Source.IP) + } + packet.DestinationMAC = intf.MAC + } else if ps.Spec.Destination.IP != "" { + packet.DestinationIP = net.ParseIP(ps.Spec.Destination.IP) + } else if ps.Spec.Destination.Pod != "" { + dstPodInterfaces := c.interfaceStore.GetContainerInterfacesByPod(ps.Spec.Destination.Pod, ps.Spec.Destination.Namespace) + if len(dstPodInterfaces) > 0 { + if packet.IsIPv6 { + packet.DestinationIP = dstPodInterfaces[0].GetIPv6Addr() + } else { + packet.DestinationIP = dstPodInterfaces[0].GetIPv4Addr() + } + } else { + dstPod, err := c.kubeClient.CoreV1().Pods(ps.Spec.Destination.Namespace).Get(context.TODO(), ps.Spec.Destination.Pod, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get the destination pod %s/%s: %v", ps.Spec.Destination.Namespace, ps.Spec.Destination.Pod, err) + } + podIPs := make([]net.IP, len(dstPod.Status.PodIPs)) + for i, ip := range dstPod.Status.PodIPs { + podIPs[i] = net.ParseIP(ip.IP) + } + if packet.IsIPv6 { + packet.DestinationIP, _ = util.GetIPWithFamily(podIPs, util.FamilyIPv6) + } else { + packet.DestinationIP = util.GetIPv4Addr(podIPs) + } + } + if packet.DestinationIP == nil { + if packet.IsIPv6 { + return nil, errors.New("destination Pod does not have an IPv6 address") + } + return nil, errors.New("destination Pod does not have an IPv4 address") + } + } else if ps.Spec.Destination.Service != "" { + dstSvc, err := c.serviceLister.Services(ps.Spec.Destination.Namespace).Get(ps.Spec.Destination.Service) + if err != nil { + return nil, fmt.Errorf("failed to get the destination service %s/%s: %v", ps.Spec.Destination.Namespace, ps.Spec.Destination.Service, err) + } + if dstSvc.Spec.ClusterIP == "" { + return nil, errors.New("destination Service does not have a ClusterIP") + } + + packet.DestinationIP = net.ParseIP(dstSvc.Spec.ClusterIP) + if !packet.IsIPv6 { + packet.DestinationIP = packet.DestinationIP.To4() + if packet.DestinationIP == nil { + return nil, errors.New("destination Service does not have an IPv4 address") + } + } else if packet.DestinationIP.To4() != nil { + return nil, errors.New("destination Service does not have an IPv6 address") + } + } else { + return nil, errors.New("destination is not specified") + } + + if ps.Spec.Packet.TransportHeader.TCP != nil { + packet.SourcePort = uint16(ps.Spec.Packet.TransportHeader.TCP.SrcPort) + packet.DestinationPort = uint16(ps.Spec.Packet.TransportHeader.TCP.DstPort) + if ps.Spec.Packet.TransportHeader.TCP.Flags != 0 { + packet.TCPFlags = uint8(ps.Spec.Packet.TransportHeader.TCP.Flags) + } + } else if ps.Spec.Packet.TransportHeader.UDP != nil { + packet.SourcePort = uint16(ps.Spec.Packet.TransportHeader.UDP.SrcPort) + packet.DestinationPort = uint16(ps.Spec.Packet.TransportHeader.UDP.DstPort) + } + + proto, err := parseTargetProto(&ps.Spec.Packet) + if err != nil { + return nil, err + } + packet.IPProto = proto + return packet, nil +} + +func parseTargetProto(packet *crdv1alpha1.Packet) (uint8, error) { + var ipProto uint8 + var isIPv6 bool + if packet.IPv6Header != nil { + isIPv6 = true + if packet.IPv6Header.NextHeader != nil { + ipProto = uint8(*packet.IPv6Header.NextHeader) + } + } else if packet.IPHeader.Protocol != 0 { + ipProto = uint8(packet.IPHeader.Protocol) + } + + proto2 := ipProto + if packet.TransportHeader.TCP != nil { + proto2 = protocol.Type_TCP + } else if packet.TransportHeader.UDP != nil { + proto2 = protocol.Type_UDP + } else if packet.TransportHeader.ICMP != nil || ipProto == 0 { + proto2 = protocol.Type_ICMP + if isIPv6 { + proto2 = protocol.Type_IPv6ICMP + } + } + + if ipProto != 0 && proto2 != ipProto { + return 0, errors.New("conflicting protocol settings in ipHeader and transportHeader") + } + return proto2, nil +} + +func (c *Controller) syncPacketSampling(psName string) error { + startTime := time.Now() + defer func() { + klog.V(4).InfoS("Finished syncing PacketSampling.", "name", psName, "startTime", time.Since(startTime)) + }() + + ps, err := c.packetSamplingLister.Get(psName) + if err != nil { + if apierrors.IsNotFound(err) { + c.cleanupPacketSampling(psName) + return nil + } + return err + } + + switch ps.Status.Phase { + case "": + err = c.initPacketSampling(ps) + case crdv1alpha1.PacketSamplingRunning: + err = c.checkPacketSamplingStatus(ps) + default: + c.cleanupPacketSampling(psName) + } + return err + +} + +// Allocates a tag. If the PacketSampling request has been allocated with a tag +// already, 0 is returned. If number of existing PacketSampling requests reaches +// the upper limit, an error is returned. +func (c *Controller) allocateTag(name string) (uint8, error) { + c.runningPacketSamplingsMutex.Lock() + defer c.runningPacketSamplingsMutex.Unlock() + + for _, state := range c.runningPacketSamplings { + if state != nil && state.name == name { + // The packetsampling request has been processed already. + return 0, nil + } + } + for i := minTagNum; i <= maxTagNum; i += 1 { + if _, ok := c.runningPacketSamplings[i]; !ok { + c.runningPacketSamplings[i] = &packetSamplingState{ + name: name, + tag: i, + } + return i, nil + } + } + return 0, fmt.Errorf("number of on-going PacketSampling operations already reached the upper limit: %d", maxTagNum) +} + +func (c *Controller) getUploaderByProtocol(protocol StorageProtocolType) (ftp.UpLoader, error) { + if protocol == sftpProtocol { + return c.sftpUploader, nil + } + return nil, fmt.Errorf("unsupported protocol %s", protocol) +} + +func (c *Controller) generatePacketsPathForServer(name string) string { + return name + ".pcapng" +} + +func (c *Controller) uploadPackets(ps *crdv1alpha1.PacketSampling, outputFile afero.File) error { + klog.V(2).InfoS("Uploading captured packets for PacketSampling", "name", ps.Name) + uploader, err := c.getUploaderByProtocol(sftpProtocol) + if err != nil { + return fmt.Errorf("failed to upload support bundle while getting uploader: %v", err) + } + serverAuth, err := ftp.ParseBundleAuth(ps.Spec.Authentication, c.kubeClient) + if err != nil { + klog.ErrorS(err, "Failed to get authentication defined in the PacketSampling CR", "name", ps.Name, "authentication", ps.Spec.Authentication) + return err + } + cfg := ftp.GenSSHClientConfig(serverAuth.BasicAuthentication.Username, serverAuth.BasicAuthentication.Password) + return uploader.Upload(ps.Spec.FileServer.URL, c.generatePacketsPathForServer(string(ps.UID)), cfg, outputFile) + +} + +// initPacketSampling mark the packetsampling as running and allocate tag for it, then start the sampling. the tag will +// serve as a unique id for concurrent processing. +func (c *Controller) initPacketSampling(ps *crdv1alpha1.PacketSampling) error { + tag, err := c.allocateTag(ps.Name) + if err != nil { + return err + } + if tag == 0 { + return nil + } + err = c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingRunning, "", 0) + if err != nil { + c.deallocateTag(ps.Name, tag) + return err + } + return c.startPacketSampling(ps, c.runningPacketSamplings[tag]) +} + +func (c *Controller) updatePacketSamplingStatus(ps *crdv1alpha1.PacketSampling, phase crdv1alpha1.PacketSamplingPhase, reason string, numCapturedPackets int32) error { + type PacketSampling struct { + Status crdv1alpha1.PacketSamplingStatus `json:"status,omitempty"` + } + patchData := PacketSampling{Status: crdv1alpha1.PacketSamplingStatus{Phase: phase}} + if phase == crdv1alpha1.PacketSamplingRunning && ps.Status.StartTime == nil { + t := metav1.Now() + patchData.Status.StartTime = &t + } + if reason != "" { + patchData.Status.Reason = reason + } + if numCapturedPackets != 0 { + patchData.Status.NumCapturedPackets = numCapturedPackets + } + if phase == crdv1alpha1.PacketSamplingSucceeded { + patchData.Status.PacketsPath = c.generatePacketsPathForServer(string(ps.UID)) + } + payloads, _ := json.Marshal(patchData) + _, err := c.crdClient.CrdV1alpha1().PacketSamplings().Patch(context.TODO(), ps.Name, types.MergePatchType, payloads, metav1.PatchOptions{}, "status") + return err +} + +func (c *Controller) deallocateTag(name string, tag uint8) { + c.runningPacketSamplingsMutex.Lock() + defer c.runningPacketSamplingsMutex.Unlock() + if state, ok := c.runningPacketSamplings[tag]; ok { + if state != nil && name == state.name { + delete(c.runningPacketSamplings, tag) + } + } +} + +func (c *Controller) getTagForPacketSampling(name string) uint8 { + c.runningPacketSamplingsMutex.Lock() + defer c.runningPacketSamplingsMutex.Unlock() + for tag, state := range c.runningPacketSamplings { + if state != nil && state.name == name { + // The packetsampling request has been processed already. + return tag + } + } + return 0 +} + +// checkPacketSamplingStatus is only called for PacketSamplings in the Running phase +func (c *Controller) checkPacketSamplingStatus(ps *crdv1alpha1.PacketSampling) error { + tag := c.getTagForPacketSampling(ps.Name) + if tag == 0 { + return nil + } + if checkPacketSamplingSucceeded(ps) { + c.deallocateTag(ps.Name, tag) + return c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingSucceeded, "", 0) + } + + if checkPacketSamplingTimeout(ps) { + c.deallocateTag(ps.Name, tag) + return c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingFailed, samplingTimeoutReason, 0) + } + return nil +} + +func checkPacketSamplingSucceeded(ps *crdv1alpha1.PacketSampling) bool { + succeeded := false + if ps.Spec.Type == crdv1alpha1.FirstNSampling && ps.Status.NumCapturedPackets == ps.Spec.FirstNSamplingConfig.Number { + succeeded = true + } + return succeeded +} + +func checkPacketSamplingTimeout(ps *crdv1alpha1.PacketSampling) bool { + var timeout time.Duration + if ps.Spec.Timeout != 0 { + timeout = time.Duration(ps.Spec.Timeout) * time.Second + } else { + timeout = defaultTimeoutDuration + } + var startTime time.Time + if ps.Status.StartTime != nil { + startTime = ps.Status.StartTime.Time + } else { + // a fallback that should not be needed in general since we are in the Running phase + // when upgrading Antrea from a previous version, the field would be empty + klog.V(2).InfoS("StartTime field in PacketSampling Status should not be empty", "Traceflow", klog.KObj(ps)) + startTime = ps.CreationTimestamp.Time + } + return startTime.Add(timeout).Before(time.Now()) +} diff --git a/pkg/agent/controller/packetsampling/packetsampling_controller_test.go b/pkg/agent/controller/packetsampling/packetsampling_controller_test.go new file mode 100644 index 00000000000..ec6195d4c65 --- /dev/null +++ b/pkg/agent/controller/packetsampling/packetsampling_controller_test.go @@ -0,0 +1,1012 @@ +// Copyright 2024 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packetsampling + +import ( + "bytes" + "net" + "os" + "reflect" + "testing" + "time" + + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcapgo" + "github.com/spf13/afero" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/klog/v2" + + "antrea.io/libOpenflow/protocol" + + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" + "antrea.io/antrea/pkg/agent/util" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + binding "antrea.io/antrea/pkg/ovs/openflow" + "antrea.io/antrea/pkg/util/k8s" +) + +var ( + pod1IPv4 = "192.168.10.10" + pod2IPv4 = "192.168.11.10" + service1IPv4 = "10.96.0.10" + dstIPv4 = "192.168.99.99" + pod1MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:0f") + pod2MAC, _ = net.ParseMAC("aa:bb:cc:dd:ee:00") + ofPortPod1 = uint32(1) + ofPortPod2 = uint32(2) + protocolICMPv6 = int32(58) + + pod1 = v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-1", + Namespace: "default", + }, + Status: v1.PodStatus{ + PodIP: pod1IPv4, + }, + } + pod2 = v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-2", + Namespace: "default", + }, + Status: v1.PodStatus{ + PodIP: pod2IPv4, + }, + } + pod3 = v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod-3", + Namespace: "default", + }, + } + + service1 = v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "service-1", + Namespace: "default", + }, + Spec: v1.ServiceSpec{ + ClusterIP: service1IPv4, + }, + } +) + +type fakePacketSamplingController struct { + *Controller + kubeClient kubernetes.Interface + mockController *gomock.Controller + mockOFClient *openflowtest.MockClient + crdClient *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + informerFactory informers.SharedInformerFactory +} + +func newFakePacketSamplingController(t *testing.T, runtimeObjects []runtime.Object, initObjects []runtime.Object, networkConfig *config.NetworkConfig, nodeConfig *config.NodeConfig) *fakePacketSamplingController { + controller := gomock.NewController(t) + objs := []runtime.Object{ + &pod1, + &pod2, + &pod3, + &service1, + } + objs = append(objs, generateTestSecret()) + if runtimeObjects != nil { + objs = append(objs, runtimeObjects...) + } + kubeClient := fake.NewSimpleClientset(objs...) + mockOFClient := openflowtest.NewMockClient(controller) + crdClient := fakeversioned.NewSimpleClientset(initObjects...) + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) + packetSamplingInformer := crdInformerFactory.Crd().V1alpha1().PacketSamplings() + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + serviceInformer := informerFactory.Core().V1().Services() + endpointInformer := informerFactory.Core().V1().Endpoints() + + ifaceStore := interfacestore.NewInterfaceStore() + addPodInterface(ifaceStore, pod1.Namespace, pod1.Name, pod1IPv4, pod1MAC.String(), int32(ofPortPod1)) + addPodInterface(ifaceStore, pod2.Namespace, pod2.Name, pod2IPv4, pod2MAC.String(), int32(ofPortPod2)) + + mockOFClient.EXPECT().RegisterPacketInHandler(gomock.Any(), gomock.Any()).Times(1) + psController := NewPacketSamplingController( + kubeClient, + crdClient, + serviceInformer, + endpointInformer, + packetSamplingInformer, + mockOFClient, + ifaceStore, + nodeConfig, + true, + ) + psController.sftpUploader = &testUploader{} + + return &fakePacketSamplingController{ + Controller: psController, + kubeClient: kubeClient, + mockController: controller, + mockOFClient: mockOFClient, + crdClient: crdClient, + crdInformerFactory: crdInformerFactory, + informerFactory: informerFactory, + } +} + +func addPodInterface(ifaceStore interfacestore.InterfaceStore, podNamespace, podName, podIP, podMac string, ofPort int32) { + containerName := k8s.NamespacedName(podNamespace, podName) + ifIPs := []net.IP{net.ParseIP(podIP)} + mac, _ := net.ParseMAC(podMac) + ifaceStore.AddInterface(&interfacestore.InterfaceConfig{ + IPs: ifIPs, + MAC: mac, + InterfaceName: util.GenerateContainerInterfaceName(podName, podNamespace, containerName), + ContainerInterfaceConfig: &interfacestore.ContainerInterfaceConfig{PodName: podName, PodNamespace: podNamespace, ContainerID: containerName}, + OVSPortConfig: &interfacestore.OVSPortConfig{OFPort: ofPort}, + }) +} + +func TestErrPacketSamplingCRD(t *testing.T) { + ps := &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ps", + UID: "uid", + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + }, + Status: crdv1alpha1.PacketSamplingStatus{ + Phase: crdv1alpha1.PacketSamplingRunning, + }, + } + expectedPS := ps + reason := "failed" + expectedPS.Status.Phase = crdv1alpha1.PacketSamplingFailed + expectedPS.Status.Reason = reason + + psc := newFakePacketSamplingController(t, nil, []runtime.Object{ps}, nil, nil) + + err := psc.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingFailed, reason, 0) + require.NoError(t, err) +} + +func TestPreparePacket(t *testing.T) { + pss := []struct { + name string + ps *crdv1alpha1.PacketSampling + intf *interfacestore.InterfaceConfig + receiverOnly bool + expectedPacket *binding.Packet + expectedErr string + }{ + { + name: "empty destination", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps2", UID: "uid2"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + }, + }, + expectedErr: "destination is not specified", + }, + { + name: "ipv4 tcp packet", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps3", UID: "uid3"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + SrcPort: 80, + DstPort: 81, + Flags: 11, + }, + }, + }, + }, + }, + expectedPacket: &binding.Packet{ + DestinationIP: net.ParseIP(pod2IPv4), + IPProto: protocol.Type_TCP, + SourcePort: 80, + DestinationPort: 81, + TCPFlags: 11, + }, + }, + { + name: "receiver only with source ip", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps4", UID: "uid4"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + IP: "192.168.12.4", + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Packet: crdv1alpha1.Packet{ + IPHeader: crdv1alpha1.IPHeader{}, + }, + }, + }, + receiverOnly: true, + expectedPacket: &binding.Packet{ + SourceIP: net.ParseIP("192.168.12.4"), + DestinationMAC: pod1MAC, + IPProto: 1, + }, + }, + { + name: "destination Pod without IPv6 address", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps4", UID: "uid4"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + Packet: crdv1alpha1.Packet{ + IPv6Header: &crdv1alpha1.IPv6Header{}, + }, + }, + }, + expectedErr: "destination Pod does not have an IPv6 address", + }, + { + name: "pod to ipv6 packet sampling", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps5", UID: "uid5"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + IP: "2001:db8::68", + }, + Packet: crdv1alpha1.Packet{ + IPv6Header: &crdv1alpha1.IPv6Header{NextHeader: &protocolICMPv6}, + }, + }, + }, + expectedPacket: &binding.Packet{ + IsIPv6: true, + DestinationIP: net.ParseIP("2001:db8::68"), + IPProto: protocol.Type_IPv6ICMP, + }, + }, + { + name: "tcp packet without flags", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps6", UID: "uid6"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + SrcPort: 80, + DstPort: 81, + }, + }, + }, + }, + }, + expectedPacket: &binding.Packet{ + DestinationIP: net.ParseIP(pod2IPv4), + IPProto: protocol.Type_TCP, + SourcePort: 80, + DestinationPort: 81, + }, + }, + { + name: "udp packet", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps7", UID: "uid7"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + UDP: &crdv1alpha1.UDPHeader{ + SrcPort: 80, + DstPort: 100, + }, + }, + }, + }, + }, + expectedPacket: &binding.Packet{ + DestinationIP: net.ParseIP(pod2IPv4), + IPProto: protocol.Type_UDP, + SourcePort: 80, + DestinationPort: 100, + }, + }, + { + name: "icmp packet", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps8", UID: "uid8"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + ICMP: &crdv1alpha1.ICMPEchoRequestHeader{}, + }, + }, + }, + }, + expectedPacket: &binding.Packet{ + DestinationIP: net.ParseIP(pod2IPv4), + IPProto: protocol.Type_ICMP, + }, + }, + { + name: "destination Pod unavailable", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps11", UID: "uid11"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Destination: crdv1alpha1.Destination{ + Pod: "unknown pod", + Namespace: "default", + }, + }, + }, + expectedErr: "failed to get the destination pod default/unknown pod: pods \"unknown pod\"", + }, + { + name: "to service packet", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps12", UID: "uid12"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Service: service1.Name, + Namespace: service1.Namespace, + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + SrcPort: 80, + DstPort: 81, + Flags: 11, + }, + }, + }, + }, + }, + expectedPacket: &binding.Packet{ + DestinationIP: net.ParseIP(service1IPv4).To4(), + IPProto: protocol.Type_TCP, + SourcePort: 80, + DestinationPort: 81, + TCPFlags: 11, + }, + }, + } + for _, ps := range pss { + t.Run(ps.name, func(t *testing.T) { + psc := newFakePacketSamplingController(t, nil, []runtime.Object{ps.ps}, nil, nil) + podInterfaces := psc.interfaceStore.GetContainerInterfacesByPod(pod1.Name, pod1.Namespace) + if ps.intf != nil { + podInterfaces[0] = ps.intf + } + stopCh := make(chan struct{}) + defer close(stopCh) + psc.crdInformerFactory.Start(stopCh) + psc.crdInformerFactory.WaitForCacheSync(stopCh) + psc.informerFactory.Start(stopCh) + psc.informerFactory.WaitForCacheSync(stopCh) + + pkt, err := psc.preparePacket(ps.ps, podInterfaces[0], ps.receiverOnly) + if ps.expectedErr == "" { + require.NoError(t, err) + assert.Equal(t, ps.expectedPacket, pkt) + } else { + assert.ErrorContains(t, err, ps.expectedErr) + assert.Nil(t, pkt) + } + }) + } +} + +func TestSyncPacketSampling(t *testing.T) { + // create test os + defaultFS = afero.NewMemMapFs() + defaultFS.MkdirAll("/tmp/antrea/packetsampling/packets", 0755) + file, err := defaultFS.Create(uidToPath(testUID)) + if err != nil { + t.Fatal("create pcapng file error: ", err) + } + + testWriter, err := pcapgo.NewNgWriter(file, layers.LinkTypeEthernet) + if err != nil { + t.Fatal("create test pcapng writer failed: ", err) + } + + pcs := []struct { + name string + ps *crdv1alpha1.PacketSampling + existingState *packetSamplingState + newState *packetSamplingState + expectedCalls func(mockOFClient *openflowtest.MockClient) + }{ + { + name: "start packetsampling ", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps1", UID: "uid1"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + }, + }, + existingState: &packetSamplingState{ + name: "ps1", + uid: "uid1", + tag: 1, + }, + newState: &packetSamplingState{ + name: "ps1", + uid: "uid1", + tag: 1, + }, + }, + + { + name: "packetsampling in failed phase", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps1", UID: types.UID(testUID)}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + }, + Status: crdv1alpha1.PacketSamplingStatus{ + Phase: crdv1alpha1.PacketSamplingFailed, + }, + }, + existingState: &packetSamplingState{ + name: "ps1", + uid: testUID, + pcapngFile: file, + pcapngWriter: testWriter, + tag: 1, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().UninstallPacketSamplingFlows(uint8(1)) + }, + }, + } + + for _, ps := range pcs { + t.Run(ps.name, func(t *testing.T) { + psc := newFakePacketSamplingController(t, nil, []runtime.Object{ps.ps}, nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + psc.crdInformerFactory.Start(stopCh) + psc.crdInformerFactory.WaitForCacheSync(stopCh) + + if ps.existingState != nil { + psc.runningPacketSamplings[ps.existingState.tag] = ps.existingState + } + + if ps.expectedCalls != nil { + ps.expectedCalls(psc.mockOFClient) + } + + err := psc.syncPacketSampling(ps.ps.Name) + require.NoError(t, err) + assert.Equal(t, ps.newState, psc.runningPacketSamplings[ps.existingState.tag]) + }) + } +} + +// TestPacketSamplingControllerRun was used to validate the whole run process is working. it didn't wait for +// the test ps to finished. +func TestPacketSamplingControllerRun(t *testing.T) { + // create test os + defaultFS = afero.NewMemMapFs() + defaultFS.MkdirAll("/tmp/antrea/packetsampling/packets", 0755) + ps := struct { + name string + ps *crdv1alpha1.PacketSampling + newState *packetSamplingState + }{ + name: "start packetsampling", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps1", UID: "uid1"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + }, + }, + newState: &packetSamplingState{tag: 1}, + } + + psc := newFakePacketSamplingController(t, nil, []runtime.Object{ps.ps}, nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + psc.crdInformerFactory.Start(stopCh) + psc.crdInformerFactory.WaitForCacheSync(stopCh) + psc.informerFactory.Start(stopCh) + psc.informerFactory.WaitForCacheSync(stopCh) + psc.mockOFClient.EXPECT().InstallPacketSamplingFlows(ps.newState.tag, false, false, + &binding.Packet{DestinationIP: net.ParseIP(pod2.Status.PodIP), IPProto: protocol.Type_ICMP}, + nil, ofPortPod1, crdv1alpha1.DefaultPacketSamplingTimeout) + go psc.Run(stopCh) + time.Sleep(300 * time.Millisecond) +} + +func TestProcessPacketSamplingItem(t *testing.T) { + // create test os + defaultFS = afero.NewMemMapFs() + defaultFS.MkdirAll("/tmp/antrea/packetsampling/packets", 0755) + pc := struct { + ps *crdv1alpha1.PacketSampling + ofPort uint32 + receiverOnly bool + packet *binding.Packet + expected bool + }{ + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps1", UID: "uid1"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + Type: crdv1alpha1.FirstNSampling, + }, + }, + ofPort: ofPortPod1, + packet: &binding.Packet{ + DestinationIP: net.ParseIP(pod2IPv4), + IPProto: 1, + }, + expected: true, + } + + psc := newFakePacketSamplingController(t, nil, []runtime.Object{pc.ps}, nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + psc.crdInformerFactory.Start(stopCh) + psc.crdInformerFactory.WaitForCacheSync(stopCh) + + psc.mockOFClient.EXPECT().InstallPacketSamplingFlows(uint8(1), false, pc.receiverOnly, pc.packet, nil, pc.ofPort, uint16(crdv1alpha1.DefaultPacketSamplingTimeout)) + psc.enqueuePacketSampling(pc.ps) + got := psc.processPacketSamplingItem() + assert.Equal(t, pc.expected, got) +} + +func TestValidatePacketSampling(t *testing.T) { + pss := []struct { + name string + ps *crdv1alpha1.PacketSampling + antreaProxyEnabled bool + expectedErr string + }{ + { + name: "AntreaProxy disabled with destination as service", + ps: &crdv1alpha1.PacketSampling{ + Spec: crdv1alpha1.PacketSamplingSpec{ + Destination: crdv1alpha1.Destination{ + Service: "svcTest", + }, + }, + }, + expectedErr: "using Service destination requires AntreaProxy feature enabled", + }, + } + + for _, pt := range pss { + t.Run(pt.name, func(t *testing.T) { + psc := newFakePacketSamplingController(t, nil, []runtime.Object{pt.ps}, nil, nil) + psc.enableAntreaProxy = pt.antreaProxyEnabled + err := psc.validatePacketSampling(pt.ps) + assert.ErrorContains(t, err, pt.expectedErr) + }) + } +} + +func TestStartPacketSampling(t *testing.T) { + defaultFS = afero.NewMemMapFs() + defaultFS.MkdirAll(packetDirectory, 0755) + tcs := []struct { + name string + ps *crdv1alpha1.PacketSampling + state *packetSamplingState + ofPort uint32 + receiverOnly bool + packet *binding.Packet + expectedCalls func(mockOFClient *openflowtest.MockClient) + nodeConfig *config.NodeConfig + expectedErr string + expectedErrLog string + }{ + { + name: "Pod-to-Pod PacketSampling", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps1", UID: "uid1"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod2.Namespace, + Pod: pod2.Name, + }, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + }, + + Status: crdv1alpha1.PacketSamplingStatus{ + Phase: crdv1alpha1.PacketSamplingRunning, + }, + }, + state: &packetSamplingState{tag: 1}, + ofPort: ofPortPod1, + packet: &binding.Packet{ + SourceIP: net.ParseIP(pod1IPv4), + SourceMAC: pod1MAC, + DestinationIP: net.ParseIP(pod2IPv4), + DestinationMAC: pod2MAC, + IPProto: 1, + TTL: 64, + ICMPType: 8, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallPacketSamplingFlows(uint8(1), false, false, + &binding.Packet{ + DestinationIP: net.ParseIP(pod2IPv4), + IPProto: 1, + }, + nil, ofPortPod1, crdv1alpha1.DefaultPacketSamplingTimeout) + }, + }, + { + name: "Pod-to-IPv4 packetsampling", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps2", UID: "uid2"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + IP: dstIPv4, + }, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + }, + Status: crdv1alpha1.PacketSamplingStatus{ + Phase: crdv1alpha1.PacketSamplingRunning, + }, + }, + state: &packetSamplingState{tag: 2}, + ofPort: ofPortPod1, + packet: &binding.Packet{ + SourceIP: net.ParseIP(pod1IPv4), + SourceMAC: pod1MAC, + DestinationIP: net.ParseIP(dstIPv4), + IPProto: 1, + TTL: 64, + ICMPType: 8, + }, + expectedCalls: func(mockOFClient *openflowtest.MockClient) { + mockOFClient.EXPECT().InstallPacketSamplingFlows(uint8(2), true, false, &binding.Packet{ + DestinationIP: net.ParseIP(dstIPv4), + IPProto: 1, + }, nil, ofPortPod1, crdv1alpha1.DefaultPacketSamplingTimeout) + }, + }, + } + + for _, tt := range tcs { + t.Run(tt.name, func(t *testing.T) { + tfc := newFakePacketSamplingController(t, nil, []runtime.Object{tt.ps}, nil, tt.nodeConfig) + if tt.expectedCalls != nil { + tt.expectedCalls(tfc.mockOFClient) + } + + bufWriter := bytes.NewBuffer(nil) + klog.SetOutput(bufWriter) + klog.LogToStderr(false) + defer func() { + klog.SetOutput(os.Stderr) + klog.LogToStderr(true) + }() + + err := tfc.startPacketSampling(tt.ps, tt.state) + if tt.expectedErr != "" { + assert.ErrorContains(t, err, tt.expectedErr) + } else { + require.NoError(t, err) + } + if tt.expectedErrLog != "" { + assert.Contains(t, bufWriter.String(), tt.expectedErrLog) + } + }) + } +} + +func TestPrepareEndpointsPackets(t *testing.T) { + pss := []struct { + name string + ps *crdv1alpha1.PacketSampling + expectedPackets []binding.Packet + objs []runtime.Object + expectedErr string + }{ + { + name: "svc-not-exist", + expectedErr: "service \"svc1\" not found", + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps2", UID: "uid2"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod1.Namespace, + Service: "svc1", + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + DstPort: 80, + }, + }, + }, + }, + }, + }, + { + name: "ep-not-exist", + expectedErr: "endpoints \"svc1\" not found", + objs: []runtime.Object{&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod1.Namespace, + Name: "svc1", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.Type(intstr.Int), + IntVal: 8080, + }, + }, + }, + }, + }}, + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps2", UID: "uid2"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod1.Namespace, + Service: "svc1", + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + DstPort: 80, + }, + }, + }, + }, + }, + }, + { + name: "tcp-2-backends-svc", + expectedPackets: []binding.Packet{ + { + DestinationIP: net.ParseIP(pod1.Status.PodIP), + DestinationPort: 8080, + IPProto: protocol.Type_TCP, + }, + { + DestinationIP: net.ParseIP(pod2.Status.PodIP), + DestinationPort: 8080, + IPProto: protocol.Type_TCP, + }, + }, + objs: []runtime.Object{&v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod1.Namespace, + Name: "svc1", + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{ + { + Name: "http", + Port: 80, + TargetPort: intstr.IntOrString{ + Type: intstr.Type(intstr.Int), + IntVal: 8080, + }, + }, + }, + }, + }, &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: pod1.Namespace, + Name: "svc1", + }, + Subsets: []v1.EndpointSubset{ + { + Addresses: []v1.EndpointAddress{ + { + IP: pod1.Status.PodIP, + }, + { + IP: pod2.Status.PodIP, + }, + }, + Ports: []v1.EndpointPort{ + { + Name: "http", + Port: 8080, + }, + }, + }, + }, + }}, + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{Name: "ps1", UID: "uid1"}, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: pod1.Namespace, + Pod: pod1.Name, + }, + Destination: crdv1alpha1.Destination{ + Namespace: pod1.Namespace, + Service: "svc1", + }, + Packet: crdv1alpha1.Packet{ + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + DstPort: 80, + }, + }, + }, + }, + }, + }, + } + + for _, ps := range pss { + t.Run(ps.name, func(t *testing.T) { + psc := newFakePacketSamplingController(t, ps.objs, []runtime.Object{ps.ps}, nil, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + psc.crdInformerFactory.Start(stopCh) + psc.crdInformerFactory.WaitForCacheSync(stopCh) + psc.informerFactory.Start(stopCh) + psc.informerFactory.WaitForCacheSync(stopCh) + + pkts, err := psc.genEndpointMatchPackets(ps.ps) + if ps.expectedErr == "" { + require.NoError(t, err) + if !reflect.DeepEqual(ps.expectedPackets, pkts) { + t.Errorf("expected packets: %+v, got: %+v", ps.expectedPackets, pkts) + } + + } else { + assert.ErrorContains(t, err, ps.expectedErr) + assert.Nil(t, pkts) + } + }) + } +} diff --git a/pkg/agent/openflow/packetsampling.go b/pkg/agent/openflow/packetsampling.go new file mode 100644 index 00000000000..9605ebfe7f6 --- /dev/null +++ b/pkg/agent/openflow/packetsampling.go @@ -0,0 +1,55 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "antrea.io/libOpenflow/openflow15" + + binding "antrea.io/antrea/pkg/ovs/openflow" +) + +type featurePacketSampling struct { + cachedFlows *flowCategoryCache +} + +func (f *featurePacketSampling) getFeatureName() string { + return "PacketSampling" +} + +func newFeaturePacketSampling() *featurePacketSampling { + return &featurePacketSampling{ + cachedFlows: newFlowCategoryCache(), + } +} + +func (f *featurePacketSampling) initFlows() []*openflow15.FlowMod { + return []*openflow15.FlowMod{} +} + +func (f *featurePacketSampling) replayFlows() []*openflow15.FlowMod { + return []*openflow15.FlowMod{} +} + +func (f *featurePacketSampling) initGroups() []binding.OFEntry { + return nil +} + +func (f *featurePacketSampling) replayGroups() []binding.OFEntry { + return nil +} + +func (f *featurePacketSampling) replayMeters() []binding.OFEntry { + return nil +} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_packetsampling.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_packetsampling.go new file mode 100644 index 00000000000..ddd5d60d88a --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_packetsampling.go @@ -0,0 +1,130 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakePacketSamplings implements PacketSamplingInterface +type FakePacketSamplings struct { + Fake *FakeCrdV1alpha1 +} + +var packetsamplingsResource = v1alpha1.SchemeGroupVersion.WithResource("packetsamplings") + +var packetsamplingsKind = v1alpha1.SchemeGroupVersion.WithKind("PacketSampling") + +// Get takes name of the packetSampling, and returns the corresponding packetSampling object, and an error if there is any. +func (c *FakePacketSamplings) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.PacketSampling, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootGetAction(packetsamplingsResource, name), &v1alpha1.PacketSampling{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PacketSampling), err +} + +// List takes label and field selectors, and returns the list of PacketSamplings that match those selectors. +func (c *FakePacketSamplings) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.PacketSamplingList, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootListAction(packetsamplingsResource, packetsamplingsKind, opts), &v1alpha1.PacketSamplingList{}) + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &v1alpha1.PacketSamplingList{ListMeta: obj.(*v1alpha1.PacketSamplingList).ListMeta} + for _, item := range obj.(*v1alpha1.PacketSamplingList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested packetSamplings. +func (c *FakePacketSamplings) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewRootWatchAction(packetsamplingsResource, opts)) +} + +// Create takes the representation of a packetSampling and creates it. Returns the server's representation of the packetSampling, and an error, if there is any. +func (c *FakePacketSamplings) Create(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.CreateOptions) (result *v1alpha1.PacketSampling, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootCreateAction(packetsamplingsResource, packetSampling), &v1alpha1.PacketSampling{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PacketSampling), err +} + +// Update takes the representation of a packetSampling and updates it. Returns the server's representation of the packetSampling, and an error, if there is any. +func (c *FakePacketSamplings) Update(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.UpdateOptions) (result *v1alpha1.PacketSampling, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateAction(packetsamplingsResource, packetSampling), &v1alpha1.PacketSampling{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PacketSampling), err +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *FakePacketSamplings) UpdateStatus(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.UpdateOptions) (*v1alpha1.PacketSampling, error) { + obj, err := c.Fake. + Invokes(testing.NewRootUpdateSubresourceAction(packetsamplingsResource, "status", packetSampling), &v1alpha1.PacketSampling{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PacketSampling), err +} + +// Delete takes name of the packetSampling and deletes it. Returns an error if one occurs. +func (c *FakePacketSamplings) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewRootDeleteActionWithOptions(packetsamplingsResource, name, opts), &v1alpha1.PacketSampling{}) + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakePacketSamplings) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewRootDeleteCollectionAction(packetsamplingsResource, listOpts) + + _, err := c.Fake.Invokes(action, &v1alpha1.PacketSamplingList{}) + return err +} + +// Patch applies the patch and returns the patched packetSampling. +func (c *FakePacketSamplings) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.PacketSampling, err error) { + obj, err := c.Fake. + Invokes(testing.NewRootPatchSubresourceAction(packetsamplingsResource, name, pt, data, subresources...), &v1alpha1.PacketSampling{}) + if obj == nil { + return nil, err + } + return obj.(*v1alpha1.PacketSampling), err +} diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/packetsampling.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/packetsampling.go new file mode 100644 index 00000000000..5eb8ca4de60 --- /dev/null +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/packetsampling.go @@ -0,0 +1,182 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by client-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + "time" + + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + scheme "antrea.io/antrea/pkg/client/clientset/versioned/scheme" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// PacketSamplingsGetter has a method to return a PacketSamplingInterface. +// A group's client should implement this interface. +type PacketSamplingsGetter interface { + PacketSamplings() PacketSamplingInterface +} + +// PacketSamplingInterface has methods to work with PacketSampling resources. +type PacketSamplingInterface interface { + Create(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.CreateOptions) (*v1alpha1.PacketSampling, error) + Update(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.UpdateOptions) (*v1alpha1.PacketSampling, error) + UpdateStatus(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.UpdateOptions) (*v1alpha1.PacketSampling, error) + Delete(ctx context.Context, name string, opts v1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error + Get(ctx context.Context, name string, opts v1.GetOptions) (*v1alpha1.PacketSampling, error) + List(ctx context.Context, opts v1.ListOptions) (*v1alpha1.PacketSamplingList, error) + Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.PacketSampling, err error) + PacketSamplingExpansion +} + +// packetSamplings implements PacketSamplingInterface +type packetSamplings struct { + client rest.Interface +} + +// newPacketSamplings returns a PacketSamplings +func newPacketSamplings(c *CrdV1alpha1Client) *packetSamplings { + return &packetSamplings{ + client: c.RESTClient(), + } +} + +// Get takes name of the packetSampling, and returns the corresponding packetSampling object, and an error if there is any. +func (c *packetSamplings) Get(ctx context.Context, name string, options v1.GetOptions) (result *v1alpha1.PacketSampling, err error) { + result = &v1alpha1.PacketSampling{} + err = c.client.Get(). + Resource("packetsamplings"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of PacketSamplings that match those selectors. +func (c *packetSamplings) List(ctx context.Context, opts v1.ListOptions) (result *v1alpha1.PacketSamplingList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1alpha1.PacketSamplingList{} + err = c.client.Get(). + Resource("packetsamplings"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested packetSamplings. +func (c *packetSamplings) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Resource("packetsamplings"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a packetSampling and creates it. Returns the server's representation of the packetSampling, and an error, if there is any. +func (c *packetSamplings) Create(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.CreateOptions) (result *v1alpha1.PacketSampling, err error) { + result = &v1alpha1.PacketSampling{} + err = c.client.Post(). + Resource("packetsamplings"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(packetSampling). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a packetSampling and updates it. Returns the server's representation of the packetSampling, and an error, if there is any. +func (c *packetSamplings) Update(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.UpdateOptions) (result *v1alpha1.PacketSampling, err error) { + result = &v1alpha1.PacketSampling{} + err = c.client.Put(). + Resource("packetsamplings"). + Name(packetSampling.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(packetSampling). + Do(ctx). + Into(result) + return +} + +// UpdateStatus was generated because the type contains a Status member. +// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus(). +func (c *packetSamplings) UpdateStatus(ctx context.Context, packetSampling *v1alpha1.PacketSampling, opts v1.UpdateOptions) (result *v1alpha1.PacketSampling, err error) { + result = &v1alpha1.PacketSampling{} + err = c.client.Put(). + Resource("packetsamplings"). + Name(packetSampling.Name). + SubResource("status"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(packetSampling). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the packetSampling and deletes it. Returns an error if one occurs. +func (c *packetSamplings) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + return c.client.Delete(). + Resource("packetsamplings"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *packetSamplings) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Resource("packetsamplings"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched packetSampling. +func (c *packetSamplings) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *v1alpha1.PacketSampling, err error) { + result = &v1alpha1.PacketSampling{} + err = c.client.Patch(pt). + Resource("packetsamplings"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/packetsampling.go b/pkg/client/informers/externalversions/crd/v1alpha1/packetsampling.go new file mode 100644 index 00000000000..24f4e6c2975 --- /dev/null +++ b/pkg/client/informers/externalversions/crd/v1alpha1/packetsampling.go @@ -0,0 +1,87 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by informer-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + "context" + time "time" + + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + versioned "antrea.io/antrea/pkg/client/clientset/versioned" + internalinterfaces "antrea.io/antrea/pkg/client/informers/externalversions/internalinterfaces" + v1alpha1 "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// PacketSamplingInformer provides access to a shared informer and lister for +// PacketSamplings. +type PacketSamplingInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1alpha1.PacketSamplingLister +} + +type packetSamplingInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc +} + +// NewPacketSamplingInformer constructs a new informer for PacketSampling type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewPacketSamplingInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredPacketSamplingInformer(client, resyncPeriod, indexers, nil) +} + +// NewFilteredPacketSamplingInformer constructs a new informer for PacketSampling type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredPacketSamplingInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options v1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().PacketSamplings().List(context.TODO(), options) + }, + WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.CrdV1alpha1().PacketSamplings().Watch(context.TODO(), options) + }, + }, + &crdv1alpha1.PacketSampling{}, + resyncPeriod, + indexers, + ) +} + +func (f *packetSamplingInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredPacketSamplingInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *packetSamplingInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&crdv1alpha1.PacketSampling{}, f.defaultInformer) +} + +func (f *packetSamplingInformer) Lister() v1alpha1.PacketSamplingLister { + return v1alpha1.NewPacketSamplingLister(f.Informer().GetIndexer()) +} diff --git a/pkg/client/listers/crd/v1alpha1/packetsampling.go b/pkg/client/listers/crd/v1alpha1/packetsampling.go new file mode 100644 index 00000000000..6c2d7abf8a0 --- /dev/null +++ b/pkg/client/listers/crd/v1alpha1/packetsampling.go @@ -0,0 +1,66 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Code generated by lister-gen. DO NOT EDIT. + +package v1alpha1 + +import ( + v1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// PacketSamplingLister helps list PacketSamplings. +// All objects returned here must be treated as read-only. +type PacketSamplingLister interface { + // List lists all PacketSamplings in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1alpha1.PacketSampling, err error) + // Get retrieves the PacketSampling from the index for a given name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1alpha1.PacketSampling, error) + PacketSamplingListerExpansion +} + +// packetSamplingLister implements the PacketSamplingLister interface. +type packetSamplingLister struct { + indexer cache.Indexer +} + +// NewPacketSamplingLister returns a new PacketSamplingLister. +func NewPacketSamplingLister(indexer cache.Indexer) PacketSamplingLister { + return &packetSamplingLister{indexer: indexer} +} + +// List lists all PacketSamplings in the indexer. +func (s *packetSamplingLister) List(selector labels.Selector) (ret []*v1alpha1.PacketSampling, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1alpha1.PacketSampling)) + }) + return ret, err +} + +// Get retrieves the PacketSampling from the index for a given name. +func (s *packetSamplingLister) Get(name string) (*v1alpha1.PacketSampling, error) { + obj, exists, err := s.indexer.GetByKey(name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1alpha1.Resource("packetsampling"), name) + } + return obj.(*v1alpha1.PacketSampling), nil +} diff --git a/pkg/controller/packetsampling/validate.go b/pkg/controller/packetsampling/validate.go new file mode 100644 index 00000000000..5392216e750 --- /dev/null +++ b/pkg/controller/packetsampling/validate.go @@ -0,0 +1,103 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packetsampling + +import ( + "encoding/json" + "fmt" + "net" + + admv1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" +) + +func Validate(review *admv1.AdmissionReview) *admv1.AdmissionResponse { + newResponse := func(allowed bool, deniedReason string) *admv1.AdmissionResponse { + resp := &admv1.AdmissionResponse{ + UID: review.Request.UID, + Allowed: allowed, + } + if !allowed { + resp.Result = &metav1.Status{ + Message: deniedReason, + } + } + return resp + } + + klog.V(2).InfoS("Validating PacketSampling", "request", review.Request) + + var newObj crdv1alpha1.PacketSampling + if review.Request.Object.Raw != nil { + if err := json.Unmarshal(review.Request.Object.Raw, &newObj); err != nil { + klog.ErrorS(err, "Error de-serializing current Traceflow") + return newResponse(false, err.Error()) + } + } + + switch review.Request.Operation { + case admv1.Create: + klog.V(2).InfoS("Validating CREATE request for PacketSampling", "name", newObj.Name) + allowed, deniedReason := validate(&newObj) + return newResponse(allowed, deniedReason) + case admv1.Update: + klog.V(2).InfoS("Validating UPDATE request for PacketSampling", "name", newObj.Name) + allowed, deniedReason := validate(&newObj) + return newResponse(allowed, deniedReason) + default: + err := fmt.Errorf("invalid request operation %s for Traceflow", review.Request.Operation) + klog.ErrorS(err, "Failed to validate PacketSampling", "name", newObj.Name) + return newResponse(false, err.Error()) + } +} + +func validate(ps *crdv1alpha1.PacketSampling) (allowed bool, deniedReason string) { + if ps.Spec.Source.Pod == "" && ps.Spec.Destination.Pod == "" { + return false, fmt.Sprintf("PacketSampling %s has neither source nor destination Pod specified", ps.Name) + } + + if ps.Spec.Type != crdv1alpha1.FirstNSampling { + return false, fmt.Sprintf("PacketSampling %s has invalid type %s (supported are [%s])", ps.Name, ps.Spec.Type, crdv1alpha1.FirstNSampling) + } + + if ps.Spec.FirstNSamplingConfig == nil { + return false, fmt.Sprintf("PacketSampling %s has no FirstNSamplingConfig", ps.Name) + } + + isIPv6 := ps.Spec.Packet.IPv6Header != nil + if ps.Spec.Source.IP != "" { + sourceIP := net.ParseIP(ps.Spec.Source.IP) + if sourceIP == nil { + return false, "source IP is not valid" + } + if isIPv6 != (sourceIP.To4() == nil) { + return false, "source IP does not match the IP header family" + } + } + + if ps.Spec.Destination.IP != "" { + destIP := net.ParseIP(ps.Spec.Destination.IP) + if destIP == nil { + return false, "destination IP is not valid" + } + if isIPv6 != (destIP.To4() == nil) { + return false, "destination IP does not match the IP header family" + } + } + return true, "" +} diff --git a/pkg/controller/packetsampling/validate_test.go b/pkg/controller/packetsampling/validate_test.go new file mode 100644 index 00000000000..48668b88e30 --- /dev/null +++ b/pkg/controller/packetsampling/validate_test.go @@ -0,0 +1,201 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package packetsampling + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + admv1 "k8s.io/api/admission/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" +) + +func TestControllerValidate(t *testing.T) { + tests := []struct { + name string + + // input + oldSpec *crdv1alpha1.PacketSamplingSpec + newSpec *crdv1alpha1.PacketSamplingSpec + + // expected output + allowed bool + deniedReason string + }{ + { + name: "Traceflow should have either source or destination Pod assigned", + newSpec: &crdv1alpha1.PacketSamplingSpec{}, + deniedReason: "PacketSampling ps has neither source nor destination Pod specified", + }, + { + name: "Must assign sampling type", + newSpec: &crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: "test-ns", + Pod: "test-pod", + }, + }, + deniedReason: "PacketSampling ps has invalid type (supported are [FirstNSampling])", + }, + { + name: "FistNSampling config not set", + newSpec: &crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: "test-ns", + Pod: "test-pod", + }, + Type: crdv1alpha1.FirstNSampling, + }, + deniedReason: "PacketSampling ps has no FirstNSamplingConfig", + }, + { + name: "Source IP family does not match", + newSpec: &crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + IP: "127.0.0.1", + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 4, + }, + Destination: crdv1alpha1.Destination{ + Namespace: "test-ns", + Pod: "test-pod", + }, + Packet: crdv1alpha1.Packet{ + IPv6Header: &crdv1alpha1.IPv6Header{ + HopLimit: 1, + }, + }, + }, + allowed: false, + deniedReason: "source IP does not match the IP header family", + }, + { + name: "Destination IP family does not match", + newSpec: &crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: "test-ns", + Pod: "test-pod", + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 4, + }, + Destination: crdv1alpha1.Destination{ + IP: "fe80::aede:48ff:fe00:1122", + }, + Packet: crdv1alpha1.Packet{}, + }, + allowed: false, + deniedReason: "destination IP does not match the IP header family", + }, + { + name: "Destination IP not valid", + newSpec: &crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: "test-ns", + Pod: "test-pod", + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 4, + }, + Destination: crdv1alpha1.Destination{ + IP: "aaa:111", + }, + Packet: crdv1alpha1.Packet{}, + }, + allowed: false, + deniedReason: "destination IP is not valid", + }, + { + name: "source IP not valid", + newSpec: &crdv1alpha1.PacketSamplingSpec{ + Destination: crdv1alpha1.Destination{ + Namespace: "test-ns", + Pod: "test-pod", + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 4, + }, + Source: crdv1alpha1.Source{ + IP: "aaa:111", + }, + Packet: crdv1alpha1.Packet{}, + }, + allowed: false, + deniedReason: "source IP is not valid", + }, + { + name: "Valid request", + newSpec: &crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: "test-ns", + Pod: "test-pod", + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 4, + }, + }, + allowed: true, + }, + } + for _, ps := range tests { + t.Run(ps.name, func(t *testing.T) { + var request *admv1.AdmissionRequest + if ps.oldSpec != nil && ps.newSpec != nil { + request = &admv1.AdmissionRequest{ + Operation: admv1.Update, + OldObject: toRawExtension(ps.oldSpec), + Object: toRawExtension(ps.newSpec), + } + } else if ps.newSpec != nil { + request = &admv1.AdmissionRequest{ + Operation: admv1.Create, + Object: toRawExtension(ps.newSpec), + } + } + review := &admv1.AdmissionReview{ + Request: request, + } + + expectedResponse := &admv1.AdmissionResponse{ + Allowed: ps.allowed, + } + if !ps.allowed { + expectedResponse.Result = &metav1.Status{ + Message: ps.deniedReason, + } + } + + response := Validate(review) + assert.Equal(t, expectedResponse, response) + }) + } +} + +func toRawExtension(spec *crdv1alpha1.PacketSamplingSpec) runtime.RawExtension { + ps := &crdv1alpha1.PacketSampling{Spec: *spec} + ps.Name = "ps" + raw, _ := json.Marshal(ps) + return runtime.RawExtension{Raw: raw} +} diff --git a/pkg/util/ftp/auth.go b/pkg/util/ftp/auth.go new file mode 100644 index 00000000000..2900c0e4812 --- /dev/null +++ b/pkg/util/ftp/auth.go @@ -0,0 +1,102 @@ +// Copyright 2024 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ftp + +import ( + "bytes" + "context" + "fmt" + "time" + + "golang.org/x/crypto/ssh" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clientset "k8s.io/client-go/kubernetes" + + "antrea.io/antrea/pkg/apis/controlplane" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" +) + +const ( + secretKeyWithAPIKey = "apikey" + secretKeyWithBearerToken = "token" + secretKeyWithUsername = "username" + secretKeyWithPassword = "password" +) + +// GenSSHClientConfig generates ssh.ClientConfig from username and password +func GenSSHClientConfig(username, password string) *ssh.ClientConfig { + cfg := &ssh.ClientConfig{ + User: username, + Auth: []ssh.AuthMethod{ssh.Password(password)}, + // #nosec G106: skip host key check here and users can specify their own checks if needed + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + Timeout: time.Second, + } + return cfg +} + +// ParseBundleAuth returns the authentication from the Secret provided in BundleServerAuthConfiguration. +// The authentication is stored in the Secret Data with a key decided by the AuthType, and encoded using base64. +func ParseBundleAuth(authentication crdv1alpha1.BundleServerAuthConfiguration, kubeClient clientset.Interface) (*controlplane.BundleServerAuthConfiguration, error) { + secretReference := authentication.AuthSecret + if secretReference == nil { + return nil, fmt.Errorf("authentication is not specified") + } + secret, err := kubeClient.CoreV1().Secrets(secretReference.Namespace).Get(context.TODO(), secretReference.Name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("unable to get Secret with name %s in Namespace %s: %v", secretReference.Name, secretReference.Namespace, err) + } + parseAuthValue := func(secretData map[string][]byte, key string) (string, error) { + authValue, found := secret.Data[key] + if !found { + return "", fmt.Errorf("not found authentication in Secret %s/%s with key %s", secretReference.Namespace, secretReference.Name, key) + } + return bytes.NewBuffer(authValue).String(), nil + } + switch authentication.AuthType { + case crdv1alpha1.APIKey: + value, err := parseAuthValue(secret.Data, secretKeyWithAPIKey) + if err != nil { + return nil, err + } + return &controlplane.BundleServerAuthConfiguration{ + APIKey: value, + }, nil + case crdv1alpha1.BearerToken: + value, err := parseAuthValue(secret.Data, secretKeyWithBearerToken) + if err != nil { + return nil, err + } + return &controlplane.BundleServerAuthConfiguration{ + BearerToken: value, + }, nil + case crdv1alpha1.BasicAuthentication: + username, err := parseAuthValue(secret.Data, secretKeyWithUsername) + if err != nil { + return nil, err + } + password, err := parseAuthValue(secret.Data, secretKeyWithPassword) + if err != nil { + return nil, err + } + return &controlplane.BundleServerAuthConfiguration{ + BasicAuthentication: &controlplane.BasicAuthentication{ + Username: username, + Password: password, + }, + }, nil + } + return nil, fmt.Errorf("unsupported authentication type %s", authentication.AuthType) +} diff --git a/pkg/util/ftp/auth_test.go b/pkg/util/ftp/auth_test.go new file mode 100644 index 00000000000..f2d4a3dc0bf --- /dev/null +++ b/pkg/util/ftp/auth_test.go @@ -0,0 +1,183 @@ +// Copyright 2024 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ftp + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + + "antrea.io/antrea/pkg/apis/controlplane" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" +) + +const ( + informerDefaultResync = 30 * time.Second + + testKeyString = "it is a valid API key" + testTokenString = "it is a valid token" +) + +type secretConfig struct { + name string + data map[string][]byte +} + +func prepareSecrets(ns string, secretConfigs []secretConfig) []*corev1.Secret { + secrets := make([]*corev1.Secret, 0) + for _, s := range secretConfigs { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: s.name, + Namespace: ns, + }, + Data: s.data, + } + secrets = append(secrets, secret) + } + return secrets +} + +type testClient struct { + client kubernetes.Interface + informerFactory informers.SharedInformerFactory +} + +func (c *testClient) start(stopCh <-chan struct{}) { + c.informerFactory.Start(stopCh) +} + +func (c *testClient) waitForSync(stopCh <-chan struct{}) { + c.informerFactory.WaitForCacheSync(stopCh) +} + +func newTestClient(coreObjects []runtime.Object, crdObjects []runtime.Object) *testClient { + client := fake.NewSimpleClientset(coreObjects...) + return &testClient{ + client: client, + informerFactory: informers.NewSharedInformerFactory(client, informerDefaultResync), + } +} + +func TestParseBundleAuth(t *testing.T) { + ns := "ns-auth" + apiKey := testKeyString + token := testTokenString + usr := "user" + pwd := "pwd123456" + var secretObjects []runtime.Object + for _, s := range prepareSecrets(ns, []secretConfig{ + {name: "s1", data: map[string][]byte{secretKeyWithAPIKey: []byte(apiKey)}}, + {name: "s2", data: map[string][]byte{secretKeyWithBearerToken: []byte(token)}}, + {name: "s3", data: map[string][]byte{secretKeyWithUsername: []byte(usr), secretKeyWithPassword: []byte(pwd)}}, + {name: "invalid-base64", data: map[string][]byte{secretKeyWithAPIKey: []byte("invalid string to decode with base64")}}, + {name: "invalid-secret", data: map[string][]byte{"unknown": []byte(apiKey)}}, + }) { + secretObjects = append(secretObjects, s) + } + + testClient := newTestClient(secretObjects, nil) + stopCh := make(chan struct{}) + testClient.start(stopCh) + testClient.waitForSync(stopCh) + + for _, tc := range []struct { + authentication v1alpha1.BundleServerAuthConfiguration + expectedError string + expectedAuth *controlplane.BundleServerAuthConfiguration + }{ + { + authentication: v1alpha1.BundleServerAuthConfiguration{ + AuthType: v1alpha1.APIKey, + AuthSecret: &corev1.SecretReference{ + Namespace: ns, + Name: "s1", + }, + }, + expectedAuth: &controlplane.BundleServerAuthConfiguration{ + APIKey: testKeyString, + }, + }, + { + authentication: v1alpha1.BundleServerAuthConfiguration{ + AuthType: v1alpha1.BearerToken, + AuthSecret: &corev1.SecretReference{ + Namespace: ns, + Name: "s2", + }, + }, + expectedAuth: &controlplane.BundleServerAuthConfiguration{ + BearerToken: testTokenString, + }, + }, + { + authentication: v1alpha1.BundleServerAuthConfiguration{ + AuthType: v1alpha1.BasicAuthentication, + AuthSecret: &corev1.SecretReference{ + Namespace: ns, + Name: "s3", + }, + }, + expectedAuth: &controlplane.BundleServerAuthConfiguration{ + BasicAuthentication: &controlplane.BasicAuthentication{ + Username: usr, + Password: pwd, + }, + }, + }, + { + authentication: v1alpha1.BundleServerAuthConfiguration{ + AuthType: v1alpha1.BearerToken, + AuthSecret: &corev1.SecretReference{ + Namespace: ns, + Name: "invalid-secret", + }, + }, + expectedError: fmt.Sprintf("not found authentication in Secret %s/invalid-secret with key %s", ns, secretKeyWithBearerToken), + }, + { + authentication: v1alpha1.BundleServerAuthConfiguration{ + AuthType: v1alpha1.BearerToken, + AuthSecret: &corev1.SecretReference{ + Namespace: ns, + Name: "not-exist", + }, + }, + expectedError: fmt.Sprintf("unable to get Secret with name not-exist in Namespace %s", ns), + }, + { + authentication: v1alpha1.BundleServerAuthConfiguration{ + AuthType: v1alpha1.APIKey, + AuthSecret: nil, + }, + expectedError: "authentication is not specified", + }, + } { + auth, err := ParseBundleAuth(tc.authentication, testClient.client) + if tc.expectedError != "" { + assert.Contains(t, err.Error(), tc.expectedError) + } else { + assert.Equal(t, tc.expectedAuth, auth) + } + } +} diff --git a/pkg/util/ftp/ftp.go b/pkg/util/ftp/ftp.go new file mode 100644 index 00000000000..7fd2feaa6a2 --- /dev/null +++ b/pkg/util/ftp/ftp.go @@ -0,0 +1,112 @@ +// Copyright 2024 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ftp + +import ( + "fmt" + "io" + "net/url" + "path" + "time" + + "github.com/pkg/sftp" + "github.com/spf13/afero" + "golang.org/x/crypto/ssh" + "k8s.io/klog/v2" +) + +const ( + uploadToFileServerTries = 5 + uploadToFileServerRetryDelay = 5 * time.Second +) + +func parseFTPUploadUrl(uploadUrl string) (*url.URL, error) { + parsedURL, err := url.Parse(uploadUrl) + if err != nil { + parsedURL, err = url.Parse("sftp://" + uploadUrl) + if err != nil { + return nil, err + } + } + if parsedURL.Scheme != "sftp" { + return nil, fmt.Errorf("not sftp protocol") + } + return parsedURL, nil +} + +type UpLoader interface { + // Upload uploads a file to the target sftp address using ssh config. + Upload(url string, fileName string, config *ssh.ClientConfig, outputFile afero.File) error +} + +type SftpUploader struct { +} + +func (uploader *SftpUploader) Upload(url string, fileName string, config *ssh.ClientConfig, outputFile afero.File) error { + if _, err := outputFile.Seek(0, 0); err != nil { + return fmt.Errorf("failed to upload to file server while setting offset: %v", err) + } + // url should be like: 10.92.23.154:22/path or sftp://10.92.23.154:22/path + parsedURL, err := parseFTPUploadUrl(url) + if err != nil { + return fmt.Errorf("failed to upload file while parsing upload URL: %v", err) + } + joinedPath := path.Join(parsedURL.Path, fileName) + + triesLeft := uploadToFileServerTries + var uploadErr error + for triesLeft > 0 { + if uploadErr = upload(parsedURL.Host, joinedPath, config, outputFile); uploadErr == nil { + return nil + } + triesLeft-- + if triesLeft == 0 { + return fmt.Errorf("failed to upload file after %d attempts", uploadToFileServerTries) + } + klog.InfoS("Failed to upload file", "UploadError", uploadErr, "TriesLeft", triesLeft) + time.Sleep(uploadToFileServerRetryDelay) + } + return nil +} + +func upload(address string, path string, config *ssh.ClientConfig, file io.Reader) error { + conn, err := ssh.Dial("tcp", address, config) + if err != nil { + return fmt.Errorf("error when connecting to fs server: %w", err) + } + sftpClient, err := sftp.NewClient(conn) + if err != nil { + return fmt.Errorf("error when setting up sftp client: %w", err) + } + defer func() { + if err := sftpClient.Close(); err != nil { + klog.ErrorS(err, "Error when closing sftp client") + } + }() + targetFile, err := sftpClient.Create(path) + if err != nil { + return fmt.Errorf("error when creating target file on remote: %v", err) + } + defer func() { + if err := targetFile.Close(); err != nil { + klog.ErrorS(err, "Error when closing target file on remote") + } + }() + if written, err := io.Copy(targetFile, file); err != nil { + return fmt.Errorf("error when copying target file: %v, written: %d", err, written) + } + klog.InfoS("Successfully upload file to path", "filePath", path) + return nil +} diff --git a/pkg/util/ftp/ftp_test.go b/pkg/util/ftp/ftp_test.go new file mode 100644 index 00000000000..b54ea710886 --- /dev/null +++ b/pkg/util/ftp/ftp_test.go @@ -0,0 +1,66 @@ +// Copyright 2024 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ftp + +import ( + "net/url" + "reflect" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestParseFTPUploadUrl(t *testing.T) { + cases := []struct { + url string + expectedError string + expectedURL url.URL + }{ + { + url: "127.0.0.1:22/path", + expectedURL: url.URL{ + Scheme: "sftp", + Host: "127.0.0.1:22", + Path: "/path", + }, + }, + { + url: "sftp://127.0.0.1:22/path", + expectedURL: url.URL{ + Scheme: "sftp", + Host: "127.0.0.1:22", + Path: "/path", + }, + }, + { + url: "https://10.220.175.92:22/root/supportbundle", + expectedError: "not sftp protocol", + }, + } + + for _, tc := range cases { + uploadUrl, err := parseFTPUploadUrl(tc.url) + if tc.expectedError == "" { + assert.NoError(t, err) + if !reflect.DeepEqual(tc.expectedURL, *uploadUrl) { + t.Errorf("expected %v, got %v", tc.expectedURL, *uploadUrl) + + } + } else { + assert.Equal(t, tc.expectedError, err.Error()) + } + } + +} diff --git a/test/e2e/packetsampling_test.go b/test/e2e/packetsampling_test.go new file mode 100644 index 00000000000..c1349955ca7 --- /dev/null +++ b/test/e2e/packetsampling_test.go @@ -0,0 +1,636 @@ +// Copyright 2024 Antrea Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package e2e + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + agentconfig "antrea.io/antrea/pkg/config/agent" + controllerconfig "antrea.io/antrea/pkg/config/controller" + "antrea.io/antrea/pkg/features" +) + +var ( + psNamespace = "default" + psSecretName = "ps-secret" + tcpServerPodName = "tcp-server" + psToolboxPodName = "toolbox" + udpServerPodName = "udp-server" + nonExistPodName = "non-existing-pod" + dstServiceName = "svc" + dstServiceIP = "" +) + +type psTestCase struct { + name string + ps *crdv1alpha1.PacketSampling + expectedPhase crdv1alpha1.PacketSamplingPhase + expectedReason string + expectedNum int32 + // required IP version, skip if not match, default is 0 (no restrict) + ipVersion int + // Source Pod to run ping for live-traffic PacketSampling. + srcPod string + skipIfNeeded func(t *testing.T) +} + +// TestPacketSampling is the top-level test which contains all subtests for +// PacketSampling related test cases so they can share setup, teardown. +func TestPacketSampling(t *testing.T) { + + data, err := setupTest(t) + if err != nil { + t.Fatalf("Error when setting up test: %v", err) + } + defer teardownTest(t, data) + + var previousAgentPacketSamplingEnableState bool + var previousControllerPacketSamplingEnableState bool + + ac := func(config *agentconfig.AgentConfig) { + previousAgentPacketSamplingEnableState = config.FeatureGates[string(features.PacketSampling)] + config.FeatureGates[string(features.PacketSampling)] = true + } + cc := func(config *controllerconfig.ControllerConfig) { + previousControllerPacketSamplingEnableState = config.FeatureGates[string(features.PacketSampling)] + config.FeatureGates[string(features.PacketSampling)] = true + } + if err := data.mutateAntreaConfigMap(cc, ac, true, true); err != nil { + t.Fatalf("Failed to enable PacketSampling flag: %v", err) + } + defer func() { + ac := func(config *agentconfig.AgentConfig) { + config.FeatureGates[string(features.PacketSampling)] = previousAgentPacketSamplingEnableState + } + cc := func(config *controllerconfig.ControllerConfig) { + config.FeatureGates[string(features.PacketSampling)] = previousControllerPacketSamplingEnableState + } + if err := data.mutateAntreaConfigMap(cc, ac, true, true); err != nil { + t.Errorf("Failed to disable PacketSampling flag: %v", err) + } + }() + + // setup sftp server for test. + sftpServiceYAML := "sftp-deployment.yml" + secretUserName := "foo" + secretPassword := "pass" + + applySFTPYamlCommand := fmt.Sprintf("kubectl apply -f %s -n %s", sftpServiceYAML, data.testNamespace) + code, stdout, stderr, err := data.RunCommandOnNode(controlPlaneNodeName(), applySFTPYamlCommand) + require.NoError(t, err) + defer func() { + deleteSFTPYamlCommand := fmt.Sprintf("kubectl delete -f %s -n %s", sftpServiceYAML, data.testNamespace) + data.RunCommandOnNode(controlPlaneNodeName(), deleteSFTPYamlCommand) + }() + t.Logf("Stdout of the command '%s': %s", applySFTPYamlCommand, stdout) + if code != 0 { + t.Errorf("Error when applying %s: %v", sftpServiceYAML, stderr) + } + failOnError(data.waitForDeploymentReady(t, data.testNamespace, "sftp", defaultTimeout), t) + + sec := &v1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: psSecretName, + }, + Data: map[string][]byte{ + "username": []byte(secretUserName), + "password": []byte(secretPassword), + }, + } + _, err = data.clientset.CoreV1().Secrets(psNamespace).Create(context.TODO(), sec, metav1.CreateOptions{}) + require.NoError(t, err) + defer data.clientset.CoreV1().Secrets(psNamespace).Delete(context.TODO(), psSecretName, metav1.DeleteOptions{}) + + t.Run("testPacketSamplingBasic", func(t *testing.T) { + testPacketSamplingBasic(t, data) + }) + t.Run("testPacketSampling", func(t *testing.T) { + testPacketSampling(t, data) + }) +} + +func testPacketSampling(t *testing.T, data *TestData) { + nodeIdx := 0 + if len(clusterInfo.windowsNodes) != 0 { + nodeIdx = clusterInfo.windowsNodes[0] + } + node1 := nodeName(nodeIdx) + + err := data.createServerPodWithLabels(tcpServerPodName, data.testNamespace, serverPodPort, nil) + require.NoError(t, err) + err = data.createToolboxPodOnNode(psToolboxPodName, data.testNamespace, node1, false) + require.NoError(t, err) + + svc, cleanup := data.createAgnhostServiceAndBackendPods(t, dstServiceName, data.testNamespace, node1, v1.ServiceTypeClusterIP) + defer cleanup() + t.Logf("%s Service is ready", dstServiceName) + dstServiceIP = svc.Spec.ClusterIP + + podIPs := waitForPodIPs(t, data, []PodInfo{ + {tcpServerPodName, getOSString(), "", data.testNamespace}, + {psToolboxPodName, getOSString(), "", data.testNamespace}, + }) + + // Give a little time for Windows containerd Nodes to setup OVS. + // Containerd configures port asynchronously, which could cause execution time of installing flow longer than docker. + time.Sleep(time.Second * 1) + + testcases := []psTestCase{ + { + name: "to-ipv4-ip", + ipVersion: 4, + srcPod: psToolboxPodName, + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", data.testNamespace, psToolboxPodName, data.testNamespace, tcpServerPodName)), + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: data.testNamespace, + Pod: psToolboxPodName, + }, + Destination: crdv1alpha1.Destination{ + IP: podIPs[tcpServerPodName].IPv4.String(), + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: fmt.Sprintf("%s:30010/upload", controlPlaneNodeIPv4()), + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: "BasicAuthentication", + AuthSecret: &v1.SecretReference{ + Name: psSecretName, + Namespace: psNamespace, + }, + }, + Packet: crdv1alpha1.Packet{ + IPHeader: crdv1alpha1.IPHeader{ + Protocol: protocolTCP, + }, + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + DstPort: serverPodPort, + }, + }, + }, + }, + }, + + expectedPhase: crdv1alpha1.PacketSamplingSucceeded, + expectedNum: 5, + }, + { + name: "to-svc", + ipVersion: 4, + srcPod: psToolboxPodName, + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", data.testNamespace, psToolboxPodName, data.testNamespace, tcpServerPodName)), + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: data.testNamespace, + Pod: psToolboxPodName, + }, + Destination: crdv1alpha1.Destination{ + Service: dstServiceName, + Namespace: data.testNamespace, + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: fmt.Sprintf("%s:30010/upload", controlPlaneNodeIPv4()), + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: "BasicAuthentication", + AuthSecret: &v1.SecretReference{ + Name: psSecretName, + Namespace: psNamespace, + }, + }, + Packet: crdv1alpha1.Packet{ + IPHeader: crdv1alpha1.IPHeader{ + Protocol: protocolTCP, + }, + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + DstPort: serverPodPort, + }, + }, + }, + }, + }, + + expectedPhase: crdv1alpha1.PacketSamplingSucceeded, + expectedNum: 5, + }, + } + t.Run("testPacketSampling", func(t *testing.T) { + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + runPacketSamplingTest(t, data, tc) + }) + } + }) + +} + +// testPacketSamplingTCP verifies if PacketSampling can capture tcp packets. this function only contains basic +// cases with pod-to-pod. +func testPacketSamplingBasic(t *testing.T, data *TestData) { + nodeIdx := 0 + if len(clusterInfo.windowsNodes) != 0 { + nodeIdx = clusterInfo.windowsNodes[0] + } + node1 := nodeName(nodeIdx) + + node1Pods, _, _ := createTestAgnhostPods(t, data, 3, data.testNamespace, node1) + err := data.createUDPServerPod(udpServerPodName, data.testNamespace, serverPodPort, node1) + defer data.DeletePodAndWait(defaultTimeout, udpServerPodName, data.testNamespace) + require.NoError(t, err) + // test tcp server pod + err = data.createServerPodWithLabels(tcpServerPodName, data.testNamespace, serverPodPort, nil) + defer data.DeletePodAndWait(defaultTimeout, tcpServerPodName, data.testNamespace) + require.NoError(t, err) + err = data.createToolboxPodOnNode(psToolboxPodName, data.testNamespace, node1, false) + defer data.DeletePodAndWait(defaultTimeout, psToolboxPodName, data.testNamespace) + require.NoError(t, err) + + // Give a little time for Windows containerd Nodes to setup OVS. + // Containerd configures port asynchronously, which could cause execution time of installing flow longer than docker. + time.Sleep(time.Second * 1) + + testcases := []psTestCase{ + { + name: "ipv4-tcp", + ipVersion: 4, + srcPod: psToolboxPodName, + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", data.testNamespace, psToolboxPodName, data.testNamespace, tcpServerPodName)), + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: data.testNamespace, + Pod: psToolboxPodName, + }, + Destination: crdv1alpha1.Destination{ + Namespace: data.testNamespace, + Pod: tcpServerPodName, + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: fmt.Sprintf("%s:30010/upload", controlPlaneNodeIPv4()), + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: "BasicAuthentication", + AuthSecret: &v1.SecretReference{ + Name: psSecretName, + Namespace: psNamespace, + }, + }, + Packet: crdv1alpha1.Packet{ + IPHeader: crdv1alpha1.IPHeader{ + Protocol: protocolTCP, + }, + TransportHeader: crdv1alpha1.TransportHeader{ + TCP: &crdv1alpha1.TCPHeader{ + DstPort: serverPodPort, + }, + }, + }, + }, + }, + expectedPhase: crdv1alpha1.PacketSamplingSucceeded, + expectedNum: 5, + }, + { + name: "ipv4-udp", + ipVersion: 4, + srcPod: psToolboxPodName, + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", data.testNamespace, psToolboxPodName, data.testNamespace, udpServerPodName)), + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: data.testNamespace, + Pod: psToolboxPodName, + }, + Destination: crdv1alpha1.Destination{ + Namespace: data.testNamespace, + Pod: udpServerPodName, + }, + + Type: crdv1alpha1.FirstNSampling, + Timeout: 300, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: fmt.Sprintf("%s:30010/upload", controlPlaneNodeIPv4()), + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: "BasicAuthentication", + AuthSecret: &v1.SecretReference{ + Name: psSecretName, + Namespace: psNamespace, + }, + }, + Packet: crdv1alpha1.Packet{ + IPHeader: crdv1alpha1.IPHeader{ + Protocol: protocolUDP, + }, + TransportHeader: crdv1alpha1.TransportHeader{ + UDP: &crdv1alpha1.UDPHeader{ + DstPort: serverPodPort, + }, + }, + }, + }, + }, + expectedPhase: crdv1alpha1.PacketSamplingSucceeded, + expectedNum: 5, + }, + { + name: "ipv4-icmp", + ipVersion: 4, + srcPod: node1Pods[0], + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", data.testNamespace, node1Pods[0], data.testNamespace, node1Pods[1])), + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: data.testNamespace, + Pod: node1Pods[0], + }, + Destination: crdv1alpha1.Destination{ + Namespace: data.testNamespace, + Pod: node1Pods[1], + }, + + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: fmt.Sprintf("%s:30010/upload", controlPlaneNodeIPv4()), + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: "BasicAuthentication", + AuthSecret: &v1.SecretReference{ + Name: psSecretName, + Namespace: psNamespace, + }, + }, + Packet: crdv1alpha1.Packet{ + IPHeader: crdv1alpha1.IPHeader{ + Protocol: protocolICMP, + }, + }, + }, + }, + expectedPhase: crdv1alpha1.PacketSamplingSucceeded, + expectedNum: 5, + }, + { + name: "ipv6-icmp", + ipVersion: 6, + srcPod: node1Pods[0], + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-ipv6", data.testNamespace, node1Pods[0], data.testNamespace, node1Pods[1])), + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: data.testNamespace, + Pod: node1Pods[0], + }, + Destination: crdv1alpha1.Destination{ + Namespace: data.testNamespace, + Pod: node1Pods[1], + }, + + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: fmt.Sprintf("%s:30010/upload", controlPlaneNodeIPv4()), + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: "BasicAuthentication", + AuthSecret: &v1.SecretReference{ + Name: psSecretName, + Namespace: psNamespace, + }, + }, + Packet: crdv1alpha1.Packet{ + IPv6Header: &crdv1alpha1.IPv6Header{ + NextHeader: &protocolICMPv6, + }, + }, + }, + }, + expectedPhase: crdv1alpha1.PacketSamplingSucceeded, + expectedNum: 5, + }, + { + + name: "non-exist-pod", + ipVersion: 4, + srcPod: node1Pods[0], + ps: &crdv1alpha1.PacketSampling{ + ObjectMeta: metav1.ObjectMeta{ + Name: randName(fmt.Sprintf("%s-%s-to-%s-%s-", data.testNamespace, node1Pods[0], data.testNamespace, nonExistPodName)), + }, + Spec: crdv1alpha1.PacketSamplingSpec{ + Source: crdv1alpha1.Source{ + Namespace: data.testNamespace, + Pod: node1Pods[0], + }, + Destination: crdv1alpha1.Destination{ + Namespace: data.testNamespace, + Pod: nonExistPodName, + }, + Type: crdv1alpha1.FirstNSampling, + FirstNSamplingConfig: &crdv1alpha1.FirstNSamplingConfig{ + Number: 5, + }, + FileServer: crdv1alpha1.BundleFileServer{ + URL: fmt.Sprintf("%s:30010/upload", controlPlaneNodeIPv4()), + }, + Authentication: crdv1alpha1.BundleServerAuthConfiguration{ + AuthType: "BasicAuthentication", + AuthSecret: &v1.SecretReference{ + Name: psSecretName, + Namespace: psNamespace, + }, + }, + }, + }, + expectedPhase: crdv1alpha1.PacketSamplingFailed, + expectedReason: fmt.Sprintf("Node: %s, error:failed to get the destination pod %s/%s: pods \"%s\" not found", node1, data.testNamespace, nonExistPodName, nonExistPodName), + }, + } + t.Run("testPacketSamplingBasic", func(t *testing.T) { + for _, tc := range testcases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + runPacketSamplingTest(t, data, tc) + }) + } + }) +} + +func getOSString() string { + if len(clusterInfo.windowsNodes) != 0 { + return "windows" + } else { + return "linux" + } +} + +func runPacketSamplingTest(t *testing.T, data *TestData, tc psTestCase) { + switch tc.ipVersion { + case 4: + skipIfNotIPv4Cluster(t) + case 6: + skipIfNotIPv6Cluster(t) + } + if tc.skipIfNeeded != nil { + tc.skipIfNeeded(t) + } + + dstPodName := tc.ps.Spec.Destination.Pod + var dstPodIPs *PodIPs + if dstPodName != nonExistPodName && dstPodName != "" { + // wait for pods to be ready first , or the ps will skip install flow + podIPs := waitForPodIPs(t, data, []PodInfo{{dstPodName, getOSString(), "", data.testNamespace}}) + dstPodIPs = podIPs[dstPodName] + } + + if _, err := data.crdClient.CrdV1alpha1().PacketSamplings().Create(context.TODO(), tc.ps, metav1.CreateOptions{}); err != nil { + t.Fatalf("Error when creating PacketSampling: %v", err) + } + defer func() { + if err := data.crdClient.CrdV1alpha1().PacketSamplings().Delete(context.TODO(), tc.ps.Name, metav1.DeleteOptions{}); err != nil { + t.Errorf("Error when deleting PacketSampling: %v", err) + } + }() + + if tc.ps.Spec.Destination.Pod != nonExistPodName { + srcPod := tc.srcPod + if dstIP := tc.ps.Spec.Destination.IP; dstIP != "" { + ip := net.ParseIP(dstIP) + if ip.To4() != nil { + dstPodIPs = &PodIPs{IPv4: &ip} + } else { + dstPodIPs = &PodIPs{IPv6: &ip} + } + } else if tc.ps.Spec.Destination.Service != "" { + ip := net.ParseIP(dstServiceIP) + if ip.To4() != nil { + dstPodIPs = &PodIPs{IPv4: &ip} + } else { + dstPodIPs = &PodIPs{IPv6: &ip} + } + } + // Give a little time for Nodes to install OVS flows. + time.Sleep(time.Second * 2) + protocol := tc.ps.Spec.Packet.IPHeader.Protocol + if tc.ps.Spec.Packet.IPv6Header != nil { + protocol = *tc.ps.Spec.Packet.IPv6Header.NextHeader + } + server := dstPodIPs.IPv4.String() + if tc.ipVersion == 6 { + server = dstPodIPs.IPv6.String() + } + // Send an ICMP echo packet from the source Pod to the destination. + if protocol == protocolICMP || protocol == protocolICMPv6 { + if err := data.RunPingCommandFromTestPod(PodInfo{srcPod, getOSString(), "", data.testNamespace}, + data.testNamespace, dstPodIPs, agnhostContainerName, 10, 0, false); err != nil { + t.Logf("Ping(%d) '%s' -> '%v' failed: ERROR (%v)", protocol, srcPod, *dstPodIPs, err) + } + } else if protocol == protocolTCP { + for i := 1; i <= 5; i++ { + if err := data.runNetcatCommandFromTestPodWithProtocol(tc.srcPod, data.testNamespace, toolboxContainerName, server, serverPodPort, "tcp"); err != nil { + t.Logf("Netcat(TCP) '%s' -> '%v' failed: ERROR (%v)", srcPod, server, err) + } + } + } else if protocol == protocolUDP { + for i := 1; i <= 5; i++ { + if err := data.runNetcatCommandFromTestPodWithProtocol(tc.srcPod, data.testNamespace, toolboxContainerName, server, serverPodPort, "udp"); err != nil { + t.Logf("Netcat(UDP) '%s' -> '%v' failed: ERROR (%v)", srcPod, server, err) + } + } + } + } + + ps, err := data.waitForPacketSampling(t, tc.ps.Name, tc.expectedPhase) + if err != nil { + t.Fatalf("Error: Get PacketSampling failed: %v", err) + } + if tc.expectedPhase == crdv1alpha1.PacketSamplingFailed { + if ps.Status.Reason != tc.expectedReason { + t.Fatalf("Error: PacketSampling Error Reason should be %v, but got %s", tc.expectedReason, ps.Status.Reason) + } + } + if ps.Status.NumCapturedPackets != tc.expectedNum { + t.Fatalf("Error: PacketSampling captured packets count should be %v, but got %v", tc.expectedNum, ps.Status.NumCapturedPackets) + } + +} + +func (data *TestData) waitForPacketSampling(t *testing.T, name string, phase crdv1alpha1.PacketSamplingPhase) (*crdv1alpha1.PacketSampling, error) { + var ps *crdv1alpha1.PacketSampling + var err error + timeout := 15 * time.Second + if err = wait.PollUntilContextTimeout(context.Background(), defaultInterval, timeout, true, func(ctx context.Context) (bool, error) { + ps, err = data.crdClient.CrdV1alpha1().PacketSamplings().Get(ctx, name, metav1.GetOptions{}) + if err != nil || ps.Status.Phase != phase { + return false, nil + } + return true, nil + }); err != nil { + if ps != nil { + t.Errorf("Latest PacketSampling status: %s %v", ps.Name, ps.Status) + } + return nil, err + } + return ps, nil +}