Skip to content

Commit

Permalink
Add packetsampling feature
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <[email protected]>
  • Loading branch information
hangyan committed Apr 9, 2024
1 parent 476672c commit da2c907
Show file tree
Hide file tree
Showing 18 changed files with 4,307 additions and 0 deletions.
173 changes: 173 additions & 0 deletions build/charts/antrea/crds/packetsampling.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: packetsamplings.crd.antrea.io
labels:
app: antrea
spec:
group: crd.antrea.io
versions:
- name: v1alpha1
served: true
storage: true
additionalPrinterColumns:
- jsonPath: .status.phase
description: The phase of the PacketSampling.
name: Phase
type: string
- jsonPath: .spec.source.pod
description: The name of the source Pod.
name: Source-Pod
type: string
priority: 10
- jsonPath: .spec.destination.pod
description: The name of the destination Pod.
name: Destination-Pod
type: string
priority: 10
- jsonPath: .spec.destination.ip
description: The IP address of the destination.
name: Destination-IP
type: string
priority: 10
- jsonPath: .spec.timeout
description: Timeout in seconds.
name: Timeout
type: integer
priority: 10
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
schema:
openAPIV3Schema:
type: object
required:
- spec
properties:
spec:
type: object
properties:
source:
type: object
properties:
pod:
type: string
namespace:
type: string
ip:
type: string
oneOf:
- format: ipv4
- format: ipv6
destination:
type: object
properties:
pod:
type: string
service:
type: string
namespace:
type: string
ip:
type: string
oneOf:
- format: ipv4
- format: ipv6
packet:
type: object
properties:
ipHeader:
type: object
properties:
protocol:
type: integer
minimum: 0
maximum: 255
ipv6Header:
type: object
properties:
nextHeader:
type: integer
minimum: 0
maximum: 65535
transportHeader:
type: object
properties:
udp:
type: object
properties:
srcPort:
type: integer
minimum: 1
maximum: 65535
dstPort:
type: integer
minimum: 1
maximum: 65535
tcp:
type: object
properties:
srcPort:
type: integer
minimum: 1
maximum: 65535
dstPort:
type: integer
minimum: 1
maximum: 65535
flags:
type: integer
minimum: 0
maximum: 255
timeout:
type: integer
minimum: 1
maximum: 300
type:
type: string
firstNSamplingConfig:
type: object
properties:
number:
type: integer
fileServer:
type: object
properties:
url:
type: string
authentication:
type: object
properties:
authType:
type: string
enum: [ "BearerToken", "APIKey", "BasicAuthentication" ]
authSecret:
type: object
properties:
name:
type: string
namespace:
type: string
status:
type: object
properties:
reason:
type: string
phase:
type: string
startTime:
type: string
numCapturedPackets:
type: integer
packetsPath:
type: string

subresources:
status: {}
scope: Cluster
names:
plural: packetsamplings
singular: packetsampling
kind: PacketSampling
shortNames:
- ps
76 changes: 76 additions & 0 deletions docs/packetsampling-guide.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# PacketSampling User Guide

Starting with Antrea v1.16, Antrea supports using PacketSampling for network diagnosis.
It can capture specified number of packets from real traffic and upload them to a
supported storage location. Users can create a PacketSampling CRD to trigger
such actions on the target traffic flow.

<!-- toc -->
- [Prerequisites](#prerequisites)
- [Start a new PacketSampling](#start-a-new-packetsampling)
<!-- /toc -->

## Prerequisites

The PacketSampling feature is disabled by default. If you
want to enable this feature, you need to set PacketSampling feature gate to true in `antrea-config`
ConfigMap for antrea-agent. In order to use a Service as the destination
in sampling, you also need to ensure [AntreaProxy](feature-gates.md#antreaproxy) is enabled in the agent configuration:
```yaml
antrea-agent.conf: |
featureGates:
# Enable packetsampling feature to capture real traffic packets.
PacketSampling: true
```
## Start a new PacketSampling
When start a new packet sampling, you can provide the following information to identify
the target flow:
* Source Pod
* Destination Pod, Service or IP address
* Transport protocol (TCP/UDP/ICMP)
* Transport ports
You can start a new packet sampling by creating PacketSampling CRD via
`kubectl` and a yaml file which contains the essential configuration of
PacketSampling CRD. Following is an example of PacketSampling CRD:

```yaml
apiVersion: crd.antrea.io/v1alpha1
kind: PacketSampling
metadata:
name: ps-test
spec:
fileServer:
url: sftp://127.0.0.1:22/upload # define your own ftp url here.
authentication:
authType: "BasicAuthentication"
authSecret:
name: test-secret
namespace: default
timeout: 600
type: FirstNSampling
firstNSamplingConfig:
number: 5
source:
namespace: default
pod: frontend
destination:
namespace: default
pod: backend
# Destination can also be an IP address ('ip' field) or a Service name ('service' field); the 3 choices are mutually exclusive.
packet:
ipHeader: # If ipHeader/ipv6Header is not set, the default value is IPv4+ICMP.
protocol: 6 # Protocol here can be 6 (TCP), 17 (UDP) or 1 (ICMP), default value is 1 (ICMP)
transportHeader:
tcp:
dstPort: 8080 # Destination port needs to be set when the protocol is TCP/UDP.
```

The CRD above starts a new packet sampling from a Pod named `frontend`
to the port 8080 of a Pod named `backend` using TCP protocol. It will capture the first 5 packets
that meet this criterion and upload them to the file server specified in the PacketSampling's
specifications. Users can download the packet file from the ftp server and analysis it's content
with common network diagnose tools like Wireshark or `tcpdump`.
117 changes: 117 additions & 0 deletions pkg/agent/controller/packetsampling/packetin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package packetsampling

import (
"fmt"
"time"

"antrea.io/libOpenflow/util"
"antrea.io/ofnet/ofctrl"
"github.com/google/gopacket"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/openflow"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
)

// HandlePacketIn processes PacketIn messages from the OFSwitch. If the reg flag match, it will be counted and captured.
// Once the total number reaches the target one, the PacketSampling will be marked as Succeed.
func (c *Controller) HandlePacketIn(pktIn *ofctrl.PacketIn) error {
klog.V(4).InfoS("PacketIn for PacketSampling", "PacketIn", pktIn.PacketIn)
samplingState, samplingFinished, err := c.parsePacketIn(pktIn)
if err != nil {
return fmt.Errorf("parsePacketIn error: %v", err)
}
if samplingFinished {
return nil
}
rawData := pktIn.Data.(*util.Buffer).Bytes()
ci := gopacket.CaptureInfo{
Timestamp: time.Now(),
CaptureLength: len(rawData),
Length: len(rawData),
}
err = samplingState.pcapngWriter.WritePacket(ci, rawData)
if err != nil {
return fmt.Errorf("couldn't write packet: %w", err)
}
reachTarget := samplingState.numCapturedPackets == samplingState.maxNumCapturedPackets
// use rate limiter to reduce the times we need to update status.
if reachTarget || samplingState.updateRateLimiter.Allow() {
ps, err := c.packetSamplingLister.Get(samplingState.name)
if err != nil {
return fmt.Errorf("get PacketSampling failed: %w", err)
}
err = c.updatePacketSamplingStatus(ps, crdv1alpha1.PacketSamplingRunning, "", samplingState.numCapturedPackets)
if err != nil {
return fmt.Errorf("failed to update the PacketSampling: %w", err)
}
klog.InfoS("Updated PacketSampling", "packetsampling", klog.KObj(ps), "numCapturedPackets", samplingState.numCapturedPackets)
// if reach the target. upload it.
if reachTarget {
if err := samplingState.finishWriting(); err != nil {
return err
}
return c.uploadPacketsFile(ps)
}
}
return nil
}

// parsePacketIn parses the packet-in message and returns
// 1. the sampling state of the PacketSampling (on sampling mode)
func (c *Controller) parsePacketIn(pktIn *ofctrl.PacketIn) (_ *packetSamplingState, samplingFinished bool, _ error) {
var tag uint8
matchers := pktIn.GetMatches()
match := openflow.GetMatchFieldByRegID(matchers, openflow.PacketSamplingMark.GetRegID())
if match != nil {
value, err := openflow.GetInfoInReg(match, openflow.PacketSamplingMark.GetRange().ToNXRange())
if err != nil {
return nil, false, fmt.Errorf("failed to get PacketSampling tag from packet-in message: %v", err)
}
tag = uint8(value)
}
c.runningPacketSamplingsMutex.Lock()
psState, exists := c.runningPacketSamplings[tag]
if exists {
if psState.numCapturedPackets == psState.maxNumCapturedPackets {
c.runningPacketSamplingsMutex.Unlock()
return nil, true, nil
}
psState.numCapturedPackets++
if psState.numCapturedPackets == psState.maxNumCapturedPackets {
err := c.ofClient.UninstallPacketSamplingFlows(tag)
if err != nil {
return nil, false, fmt.Errorf("uninstall PacketSampling ovs flow failed: %v", err)
}
}
}
c.runningPacketSamplingsMutex.Unlock()
if !exists {
return nil, false, fmt.Errorf("PacketSampling for dataplane tag %d not found in cache", tag)
}
return psState, false, nil
}

func (c *Controller) uploadPacketsFile(ps *crdv1alpha1.PacketSampling) error {
name := uidToPath(string(ps.UID))
file, err := defaultFS.Open(name)
if err != nil {
return err
}
defer file.Close()
return c.uploadPackets(ps, file)
}
Loading

0 comments on commit da2c907

Please sign in to comment.