Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <[email protected]>
  • Loading branch information
hangyan committed Apr 10, 2024
1 parent da2c907 commit 3e625d7
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 141 deletions.
1 change: 0 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,6 @@ func run(o *Options) error {
ofClient,
ifaceStore,
nodeConfig,
o.enableAntreaProxy,
)
}

Expand Down
3 changes: 1 addition & 2 deletions docs/packetsampling-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ such actions on the target traffic flow.

The PacketSampling feature is disabled by default. If you
want to enable this feature, you need to set PacketSampling feature gate to true in `antrea-config`
ConfigMap for antrea-agent. In order to use a Service as the destination
in sampling, you also need to ensure [AntreaProxy](feature-gates.md#antreaproxy) is enabled in the agent configuration:
ConfigMap for antrea-agent.
```yaml
antrea-agent.conf: |
featureGates:
Expand Down
36 changes: 14 additions & 22 deletions pkg/agent/controller/packetsampling/packetin.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
)

// HandlePacketIn processes PacketIn messages from the OFSwitch. If the reg flag match, it will be counted and captured.
// Once the total number reaches the target one, the PacketSampling will be marked as Succeed.
// HandlePacketIn processes PacketIn messages from the OFSwitch. If the register value match, it will be counted and captured.
// Once the total number reaches the target, the PacketSampling will be marked as Succeed.
func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
klog.V(4).InfoS("PacketIn for PacketSampling", "PacketIn", pktIn.PacketIn)
samplingState, samplingFinished, err := c.parsePacketIn(pktIn)
Expand All @@ -55,24 +55,26 @@ func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
if err != nil {
return fmt.Errorf("get PacketSampling failed: %w", err)
}
err = c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingRunning, "", samplingState.numCapturedPackets)
if err != nil {
return fmt.Errorf("failed to update the PacketSampling: %w", err)
}
klog.InfoS("Updated PacketSampling", "packetsampling", klog.KObj(ps), "numCapturedPackets", samplingState.numCapturedPackets)
// if reach the target. upload it.
// if reach the target. flush the file and upload it.
if reachTarget {
if err := samplingState.finishWriting(); err != nil {
if err := samplingState.pcapngWriter.Flush(); err != nil {
return err
}
return c.uploadPacketsFile(ps)
if err := c.uploadPackets(ps, samplingState.pcapngFile); err != nil {
return err
}
}
err = c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingRunning, "", samplingState.numCapturedPackets)
if err != nil {
return fmt.Errorf("failed to update the PacketSampling: %w", err)
}
klog.InfoS("Updated PacketSampling", "PacketSampling", klog.KObj(ps), "numCapturedPackets", samplingState.numCapturedPackets)
}
return nil
}

// parsePacketIn parses the packet-in message and returns
// 1. the sampling state of the PacketSampling (on sampling mode)
// parsePacketIn parses the packet-in message. If the value in register match with existing PacketSampling's state(tag),
// it will be counted. If the total count reach the target, the ovs flow will be uninstalled.
func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (_ *packetSamplingState, samplingFinished bool, _ error) {
var tag uint8
matchers := pktIn.GetMatches()
Expand Down Expand Up @@ -105,13 +107,3 @@ func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (_ *packetSamplingSta
}
return psState, false, nil
}

func (c *Controller) uploadPacketsFile(ps *crdv1alpha1.PacketSampling) error {
name := uidToPath(string(ps.UID))
file, err := defaultFS.Open(name)
if err != nil {
return err
}
defer file.Close()
return c.uploadPackets(ps, file)
}
26 changes: 14 additions & 12 deletions pkg/agent/controller/packetsampling/packetin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
Expand All @@ -45,10 +44,13 @@ const (
maxNum = 5
)

var (
const (
testTag = uint8(1)
testUID = "1-2-3-4"
testSFTPUrl = "sftp://10.220.175.92:22/root/packetsamplings"
)

var (
// parse to tag(1)
testTagData = []byte{0x11, 0x00, 0x00, 0x11}
)
Expand All @@ -75,12 +77,11 @@ func generateMatch(regID int, data []byte) openflow15.MatchField {
}
}

func getTestPacketBytes(dstIP string, dscp uint8) []byte {
func getTestPacketBytes(dstIP string) []byte {
ipPacket := &protocol.IPv4{
Version: 0x4,
IHL: 5,
Protocol: uint8(8),
DSCP: dscp,
Length: 20,
NWSrc: net.IP(pod1IPv4),
NWDst: net.IP(dstIP),
Expand All @@ -93,13 +94,14 @@ func getTestPacketBytes(dstIP string, dscp uint8) []byte {
return pktBytes
}

func generateTestPsState(name string, writer *pcapgo.NgWriter, num int32) *packetSamplingState {
func generateTestPsState(name string, pcapngFile afero.File, writer *pcapgo.NgWriter, num int32) *packetSamplingState {
return &packetSamplingState{
name: name,
maxNumCapturedPackets: maxNum,
numCapturedPackets: num,
tag: testTag,
pcapngWriter: writer,
pcapngFile: pcapngFile,
shouldSyncPackets: true,
updateRateLimiter: rate.NewLimiter(rate.Every(samplingStatusUpdatePeriod), 1),
}
Expand All @@ -109,7 +111,7 @@ func generatePacketSampling(name string) *crdv1alpha1.PacketSampling {
return &crdv1alpha1.PacketSampling{
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(testUID),
UID: testUID,
},
Status: crdv1alpha1.PacketSamplingStatus{},
Spec: crdv1alpha1.PacketSamplingSpec{
Expand Down Expand Up @@ -157,8 +159,8 @@ func (uploader *testUploader) Upload(url string, fileName string, config *ssh.Cl

func TestHandlePacketSamplingPacketIn(t *testing.T) {

invalidPktBytes := getTestPacketBytes("89.207.132.170", 0)
pktBytesPodToPod := getTestPacketBytes(pod2IPv4, testTag)
invalidPktBytes := getTestPacketBytes("89.207.132.170")
pktBytesPodToPod := getTestPacketBytes(pod2IPv4)

// create test os
defaultFS = afero.NewMemMapFs()
Expand Down Expand Up @@ -186,7 +188,7 @@ func TestHandlePacketSamplingPacketIn(t *testing.T) {
}{
{
name: "invalid packets",
psState: generateTestPsState("ps-with-invalid-packet", testWriter, 0),
psState: generateTestPsState("ps-with-invalid-packet", nil, testWriter, 0),
expectedPS: generatePacketSampling("ps-with-invalid-packet"),
pktIn: &ofctrl.PacketIn{
PacketIn: &openflow15.PacketIn{
Expand All @@ -197,7 +199,7 @@ func TestHandlePacketSamplingPacketIn(t *testing.T) {
},
{
name: "not hitting target number",
psState: generateTestPsState("ps-with-less-num", testWriter, 1),
psState: generateTestPsState("ps-with-less-num", nil, testWriter, 1),
expectedPS: generatePacketSampling("ps-with-less-num"),
expectedNum: 2,
pktIn: &ofctrl.PacketIn{
Expand All @@ -211,7 +213,7 @@ func TestHandlePacketSamplingPacketIn(t *testing.T) {
},
{
name: "hit target number",
psState: generateTestPsState("ps-with-max-num", testWriter, maxNum-1),
psState: generateTestPsState("ps-with-max-num", file, testWriter, maxNum-1),
expectedPS: generatePacketSampling("ps-with-max-num"),
expectedNum: maxNum,
pktIn: &ofctrl.PacketIn{
Expand All @@ -223,7 +225,7 @@ func TestHandlePacketSamplingPacketIn(t *testing.T) {
},
},
expectedCalls: func(mockOFClient *openflowtest.MockClient) {
mockOFClient.EXPECT().UninstallPacketSamplingFlows(uint8(testTag))
mockOFClient.EXPECT().UninstallPacketSamplingFlows(testTag)
},
},
}
Expand Down
Loading

0 comments on commit 3e625d7

Please sign in to comment.