Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <[email protected]>
  • Loading branch information
hangyan committed Apr 13, 2024
1 parent 9ae8636 commit 3126693
Show file tree
Hide file tree
Showing 11 changed files with 540 additions and 56 deletions.
25 changes: 11 additions & 14 deletions pkg/agent/controller/packetsampling/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,20 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (_ *packetSamplingSta
tag = uint8(value)
}
c.runningPacketSamplingsMutex.Lock()
defer c.runningPacketSamplingsMutex.Unlock()
psState, exists := c.runningPacketSamplings[tag]
if exists {
if psState.numCapturedPackets == psState.maxNumCapturedPackets {
c.runningPacketSamplingsMutex.Unlock()
return nil, true, nil
}
psState.numCapturedPackets++
if psState.numCapturedPackets == psState.maxNumCapturedPackets {
err := c.ofClient.UninstallPacketSamplingFlows(tag)
if err != nil {
return nil, false, fmt.Errorf("uninstall PacketSampling ovs flow failed: %v", err)
}
}
}
c.runningPacketSamplingsMutex.Unlock()
if !exists {
return nil, false, fmt.Errorf("PacketSampling for dataplane tag %d not found in cache", tag)
}
if psState.numCapturedPackets == psState.maxNumCapturedPackets {
return nil, true, nil
}
psState.numCapturedPackets++
if psState.numCapturedPackets == psState.maxNumCapturedPackets {
err := c.ofClient.UninstallPacketSamplingFlows(tag)
if err != nil {
return nil, false, fmt.Errorf("uninstall PacketSampling ovs flow failed: %v", err)
}
}
return psState, false, nil
}
83 changes: 48 additions & 35 deletions pkg/agent/controller/packetsampling/packetin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package packetsampling

import (
"context"
"encoding/binary"
"fmt"
"net"
"testing"

Expand All @@ -33,10 +35,8 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
)
Expand All @@ -46,36 +46,35 @@ const (
)

const (
testTag = uint8(1)
testTag = uint8(3)
testUID = "1-2-3-4"
testSFTPUrl = "sftp://10.220.175.92:22/root/packetsamplings"
testSFTPUrl = "sftp://127.0.0.1:22/root/packetsamplings"
)

var (
// parse to tag(1)
testTagData = []byte{0x11, 0x00, 0x00, 0x11}
)
// generatePacketInMatchFromTag reverse the packetIn message/matcher -> REG4/tag value path
// to generate test matchers. It follows the following process:
// 1. shift bits to generate uint32, which represents data in REG4 and another REG (unrelated)
// 2. convert uint32 to bytes(bigEndian), which will be the Match value/mask.
// 3. generate MatchField from the bytes.
func generatePacketInMatchFromTag(tag uint8) *openflow15.MatchField {
value := uint32(tag) << 28
regID := 4
data := make([]byte, 8)
binary.BigEndian.PutUint32(data, value)

func genMatchers() []openflow15.MatchField {
m := generateMatch(openflow.PacketSamplingMark.GetRegID(), testTagData)
matchers := []openflow15.MatchField{m}
return matchers
}

func generateMatch(regID int, data []byte) openflow15.MatchField {
baseData := make([]byte, 8, 8)
if regID%2 == 0 {
copy(baseData[0:4], data)
} else {
copy(baseData[4:8], data)
}
return openflow15.MatchField{
Class: openflow15.OXM_CLASS_PACKET_REGS,
// convert reg (4-byte) ID to xreg (8-byte) ID
m := openflow15.MatchField{
Class: openflow15.OXM_CLASS_PACKET_REGS,
Field: uint8(regID / 2),
HasMask: false,
Value: &openflow15.ByteArrayField{Data: baseData},
Value: &openflow15.ByteArrayField{Data: data},
}
return &m
}

func genMatchers() []openflow15.MatchField {
// m := generateMatch(openflow.PacketSamplingMark.GetRegID(), testTagData)
matchers := []openflow15.MatchField{*generatePacketInMatchFromTag(testTag)}
return matchers
}

func getTestPacketBytes(dstIP string) []byte {
Expand Down Expand Up @@ -151,10 +150,17 @@ func generateTestSecret() *v1.Secret {
}

type testUploader struct {
url string
fileName string
}

func (uploader *testUploader) Upload(url string, fileName string, config *ssh.ClientConfig, outputFile afero.File) error {
klog.Info("Called test uploader")
if url != uploader.url {
return fmt.Errorf("expected url: %s for uploader, got: %s", uploader.url, url)
}
if fileName != uploader.fileName {
return fmt.Errorf("expected filename: %s for uploader, got: %s", uploader.fileName, fileName)
}
return nil
}

Expand All @@ -177,15 +183,16 @@ func TestHandlePacketSamplingPacketIn(t *testing.T) {
}

tests := []struct {
name string
networkConfig *config.NetworkConfig
nodeConfig *config.NodeConfig
psState *packetSamplingState
pktIn *ofctrl.PacketIn
expectedPS *crdv1alpha1.PacketSampling
expectedErrStr string
expectedCalls func(mockOFClient *openflowtest.MockClient)
expectedNum int32
name string
networkConfig *config.NetworkConfig
nodeConfig *config.NodeConfig
psState *packetSamplingState
pktIn *ofctrl.PacketIn
expectedPS *crdv1alpha1.PacketSampling
expectedErrStr string
expectedCalls func(mockOFClient *openflowtest.MockClient)
expectedNum int32
expectedUploader *testUploader
}{
{
name: "invalid packets",
Expand Down Expand Up @@ -228,6 +235,10 @@ func TestHandlePacketSamplingPacketIn(t *testing.T) {
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().UninstallPacketSamplingFlows(testTag)
},
expectedUploader: &testUploader{
fileName: testUID + ".pcapng",
url: testSFTPUrl,
},
},
}

Expand All @@ -243,6 +254,8 @@ func TestHandlePacketSamplingPacketIn(t *testing.T) {
psc.crdInformerFactory.WaitForCacheSync(stopCh)
psc.runningPacketSamplings[tt.psState.tag] = tt.psState

psc.sftpUploader = tt.expectedUploader

err := psc.HandlePacketIn(tt.pktIn)
if err == nil {
assert.Equal(t, tt.expectedErrStr, "")
Expand Down
15 changes: 11 additions & 4 deletions pkg/agent/controller/packetsampling/packetsampling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
return
}

// cleanup existing packets file first. successful PacketSampling will upload them to the target file server.
// others are useless once we restart the controller.
if err := defaultFS.RemoveAll(packetDirectory); err != nil {
klog.ErrorS(err, "Remove packets dir error", "directory", packetDirectory)
}
err := defaultFS.MkdirAll(packetDirectory, 0755)
if err != nil {
klog.ErrorS(err, "Couldn't create directory for storing sampling packets", "directory", packetDirectory)
Expand Down Expand Up @@ -280,8 +285,10 @@ func (c *Controller) cleanupPacketSampling(psName string) {
if err != nil {
klog.ErrorS(err, "Error cleaning up flows for PacketSampling", "name", psName)
}
if err := psState.pcapngFile.Close(); err != nil {
klog.ErrorS(err, "Error closing pcap file", "name", psName)
if psState.pcapngFile != nil {
if err := psState.pcapngFile.Close(); err != nil {
klog.ErrorS(err, "Error closing pcap file", "name", psName)
}
}
}
}
Expand Down Expand Up @@ -663,8 +670,8 @@ func (c *Controller) deallocateTag(name string, tag uint8) {
}

func (c *Controller) getTagForPacketSampling(name string) uint8 {
c.runningPacketSamplingsMutex.Lock()
defer c.runningPacketSamplingsMutex.Unlock()
c.runningPacketSamplingsMutex.RLock()
defer c.runningPacketSamplingsMutex.RUnlock()
for tag, state := range c.runningPacketSamplings {
if state != nil && state.name == name {
// The packetsampling request has been processed already.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"antrea.io/libOpenflow/protocol"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcapgo"
"github.com/spf13/afero"
Expand All @@ -37,7 +38,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/klog/v2"
"antrea.io/libOpenflow/protocol"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/interfacestore"
Expand Down
1 change: 0 additions & 1 deletion pkg/apiserver/handlers/featuregates/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func Test_getControllerGatesResponse(t *testing.T) {
{Component: "controller", Name: "Multicluster", Status: "Disabled", Version: "ALPHA"},
{Component: "controller", Name: "NetworkPolicyStats", Status: "Enabled", Version: "BETA"},
{Component: "controller", Name: "NodeIPAM", Status: "Enabled", Version: "BETA"},
{Component: "controller", Name: "PacketSampling", Status: "Disabled", Version: "ALPHA"},
{Component: "controller", Name: "ServiceExternalIP", Status: "Disabled", Version: "ALPHA"},
{Component: "controller", Name: "SupportBundleCollection", Status: "Disabled", Version: "ALPHA"},
{Component: "controller", Name: "Traceflow", Status: "Enabled", Version: "BETA"},
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 3126693

Please sign in to comment.