Skip to content

Commit

Permalink
refactor pipeline code
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <[email protected]>
  • Loading branch information
hangyan committed Sep 23, 2024
1 parent 4ef7a4b commit 223bd5c
Show file tree
Hide file tree
Showing 5 changed files with 268 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ var (
ofPortPod2 = uint32(2)
testTCPFlags = int32(11)
icmp6Proto = intstr.FromInt(58)
tcpProto = intstr.FromString("TCP")
icmpProto = intstr.FromString("ICMP")
port80 int32 = 80
port81 int32 = 81
Expand Down
34 changes: 15 additions & 19 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -934,7 +934,6 @@ func (c *client) generatePipelines() {
c.enableL7FlowExporter)
c.activatedFeatures = append(c.activatedFeatures, c.featurePodConnectivity)
c.traceableFeatures = append(c.traceableFeatures, c.featurePodConnectivity)
c.packetCaptureFeatures = append(c.packetCaptureFeatures, c.featurePodConnectivity)

c.featureService = newFeatureService(c.cookieAllocator,
c.nodeIPChecker,
Expand All @@ -950,7 +949,7 @@ func (c *client) generatePipelines() {
c.connectUplinkToBridge)
c.activatedFeatures = append(c.activatedFeatures, c.featureService)
c.traceableFeatures = append(c.traceableFeatures, c.featureService)
c.packetCaptureFeatures = append(c.packetCaptureFeatures, c.featureService)

}

if c.nodeType == config.ExternalNode {
Expand Down Expand Up @@ -999,7 +998,7 @@ func (c *client) generatePipelines() {
c.activatedFeatures = append(c.activatedFeatures, c.featureTraceflow)

if c.enablePacketCapture {
c.featurePacketCapture = newFeaturePacketCapture()
c.featurePacketCapture = newFeaturePacketCapture(c.cookieAllocator, []binding.Protocol{binding.ProtocolIP, binding.ProtocolIPv6}, c.enableProxy, c.networkConfig, c.nodeConfig)
c.activatedFeatures = append(c.activatedFeatures, c.featurePacketCapture)
}

Expand Down Expand Up @@ -1051,6 +1050,19 @@ func (c *client) generatePipelines() {
}
}

func (c *client) InstallPacketCaptureFlows(dataplaneTag uint8, senderOnly, receiverOnly bool, packet *binding.Packet, endpointPackets []binding.Packet, ofPort uint32, timeoutSeconds uint16) error {
cacheKey := fmt.Sprintf("%x", dataplaneTag)
flows := c.featurePacketCapture.genFlows(dataplaneTag,
c.ovsMetersAreSupported,
senderOnly,
receiverOnly,
packet,
endpointPackets,
ofPort,
timeoutSeconds)
return c.addFlows(c.featurePacketCapture.cachedFlows, cacheKey, flows)
}

func (c *client) InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error {
var flows []binding.Flow
for _, serviceCIDR := range serviceCIDRs {
Expand Down Expand Up @@ -1247,22 +1259,6 @@ func (c *client) SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet,
return c.bridge.SendPacketOut(packetOutObj)
}

func (c *client) InstallPacketCaptureFlows(dataplaneTag uint8, senderOnly, receiverOnly bool, packet *binding.Packet, endpointPackets []binding.Packet, ofPort uint32, timeoutSeconds uint16) error {
cacheKey := fmt.Sprintf("%x", dataplaneTag)
var flows []binding.Flow
for _, f := range c.packetCaptureFeatures {
flows = append(flows, f.flowsToCapture(dataplaneTag,
c.ovsMetersAreSupported,
senderOnly,
receiverOnly,
packet,
endpointPackets,
ofPort,
timeoutSeconds)...)
}
return c.addFlows(c.featurePacketCapture.cachedFlows, cacheKey, flows)
}

func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error {
cacheKey := fmt.Sprintf("%x", dataplaneTag)
var flows []binding.Flow
Expand Down
11 changes: 0 additions & 11 deletions pkg/agent/openflow/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,14 +340,3 @@ type traceableFeature interface {
ofPort uint32,
timeoutSeconds uint16) []binding.Flow
}

type packetCaptureFeature interface {
flowsToCapture(dataplaneTag uint8,
ovsMetersAreSupported,
senderOnly bool,
receiverOnly bool,
packet *binding.Packet,
endpointPackets []binding.Packet,
ofPort uint32,
timeoutSeconds uint16) []binding.Flow
}
255 changes: 252 additions & 3 deletions pkg/agent/openflow/packetcapture.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,54 @@
package openflow

import (
"net"

"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
binding "antrea.io/antrea/pkg/ovs/openflow"
)

type featurePacketCapture struct {
cachedFlows *flowCategoryCache
cookieAllocator cookie.Allocator
cachedFlows *flowCategoryCache
ipProtocols []binding.Protocol
networkConfig *config.NetworkConfig
enableProxy bool
tunnelPort uint32
gatewayPort uint32
gatewayIPs map[binding.Protocol]net.IP
}

func (f *featurePacketCapture) getFeatureName() string {
return "PacketCapture"
}

func newFeaturePacketCapture() *featurePacketCapture {
func newFeaturePacketCapture(cookieAllocator cookie.Allocator,
ipProtocols []binding.Protocol,
enableProxy bool,
networkConfig *config.NetworkConfig,
nodeConfig *config.NodeConfig) *featurePacketCapture {
gatewayIPs := make(map[binding.Protocol]net.IP)
for _, ipProtocol := range ipProtocols {
if ipProtocol == binding.ProtocolIP {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv4
} else if ipProtocol == binding.ProtocolIPv6 {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv6
}

}
return &featurePacketCapture{
cachedFlows: newFlowCategoryCache(),
cachedFlows: newFlowCategoryCache(),
cookieAllocator: cookieAllocator,
ipProtocols: ipProtocols,
networkConfig: networkConfig,
enableProxy: enableProxy,
tunnelPort: nodeConfig.TunnelOFPort,
gatewayPort: nodeConfig.GatewayConfig.OFPort,
gatewayIPs: gatewayIPs,
}
}

Expand All @@ -53,3 +85,220 @@ func (f *featurePacketCapture) replayGroups() []binding.OFEntry {
func (f *featurePacketCapture) replayMeters() []binding.OFEntry {
return nil
}

// genFlows generates flows for packet capture. dataplaneTag is used as a mark for the target flow.
func (f *featurePacketCapture) genFlows(dataplaneTag uint8,
ovsMetersAreSupported,
senderOnly bool,
receiverOnly bool,
packet *binding.Packet,
endpointPackets []binding.Packet,
ofPort uint32,
timeout uint16) []binding.Flow {
cookieID := f.cookieAllocator.Request(cookie.PacketCapture).Raw()
var flows []binding.Flow
tag := uint32(dataplaneTag)
var flowBuilder binding.FlowBuilder
if !receiverOnly {
// if not receiverOnly, ofPort is inPort
if endpointPackets == nil {
flowBuilder = ConntrackStateTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
MatchInPort(ofPort).
MatchCTStateTrk(true).
Action().LoadToRegField(PacketCaptureMark, tag).
SetHardTimeout(timeout).
Action().GotoStage(stagePreRouting)
if packet.DestinationIP != nil {
flowBuilder = flowBuilder.MatchDstIP(packet.DestinationIP)
}
} else {
// generate flows to endpoints.
for _, epPacket := range endpointPackets {
tmpFlowBuilder := ConntrackStateTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
MatchInPort(ofPort).
MatchCTStateTrk(true).
Action().LoadToRegField(PacketCaptureMark, tag).
SetHardTimeout(timeout).
Action().GotoStage(stagePreRouting)
tmpFlowBuilder.MatchDstIP(epPacket.DestinationIP)
flow := matchTransportHeader(packet, tmpFlowBuilder, endpointPackets).Done()
flows = append(flows, flow)
}
}
} else {
flowBuilder = L2ForwardingCalcTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
MatchCTStateTrk(true).
MatchDstMAC(packet.DestinationMAC).
Action().LoadToRegField(TargetOFPortField, ofPort).
Action().LoadRegMark(OutputToOFPortRegMark).
Action().LoadToRegField(PacketCaptureMark, tag).
SetHardTimeout(timeout).
Action().GotoStage(stageIngressSecurity)
if packet.SourceIP != nil {
flowBuilder = flowBuilder.MatchSrcIP(packet.SourceIP)
}
}

// for sender only case, capture the first tracked packet for svc.
if senderOnly {
for _, ipProtocol := range f.ipProtocols {
tmpFlowBuilder := ConntrackStateTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
MatchInPort(ofPort).
MatchProtocol(ipProtocol).
MatchCTStateNew(true).
MatchCTStateTrk(true).
Action().LoadRegMark(RewriteMACRegMark).
Action().LoadToRegField(PacketCaptureMark, tag).
SetHardTimeout(timeout).
Action().GotoStage(stagePreRouting)
tmpFlowBuilder.MatchDstIP(packet.DestinationIP)
tmpFlowBuilder = matchTransportHeader(packet, tmpFlowBuilder, nil)
flows = append(flows, tmpFlowBuilder.Done())
}
}

if flowBuilder != nil {
flow := matchTransportHeader(packet, flowBuilder, nil).Done()
flows = append(flows, flow)
}

output := func(fb binding.FlowBuilder) binding.FlowBuilder {
return fb.Action().OutputToRegField(TargetOFPortField)
}

sendToController := func(fb binding.FlowBuilder) binding.FlowBuilder {
if ovsMetersAreSupported {
fb = fb.Action().Meter(PacketInMeterIDTF)
}
fb = fb.Action().SendToController([]byte{uint8(PacketInCategoryPacketCapture)}, false)
return fb
}

// This generates PacketCapture specific flows that outputs capture
// non-hairpin packets to OVS port and Antrea Agent after
// L2 forwarding calculation.
for _, ipProtocol := range f.ipProtocols {
if f.networkConfig.TrafficEncapMode.SupportsEncap() {
// SendToController and Output if output port is tunnel port.
fb := OutputTable.ofTable.BuildFlow(priorityNormal+3).
Cookie(cookieID).
MatchRegFieldWithValue(TargetOFPortField, f.tunnelPort).
MatchProtocol(ipProtocol).
MatchRegMark(OutputToOFPortRegMark).
MatchRegFieldWithValue(PacketCaptureMark, tag).
SetHardTimeout(timeout).
Action().OutputToRegField(TargetOFPortField)
fb = sendToController(fb)
flows = append(flows, fb.Done())
// For injected packets, only SendToController if output port is local gateway. In encapMode, a PacketCapture
// packet going out of the gateway port (i.e. exiting the overlay) essentially means that the PacketCapture
// request is complete.
fb = OutputTable.ofTable.BuildFlow(priorityNormal+2).
Cookie(cookieID).
MatchRegFieldWithValue(TargetOFPortField, f.gatewayPort).
MatchProtocol(ipProtocol).
MatchRegMark(OutputToOFPortRegMark).
MatchRegFieldWithValue(PacketCaptureMark, tag).
SetHardTimeout(timeout)
fb = sendToController(fb)
fb = output(fb)
flows = append(flows, fb.Done())
} else {
// SendToController and Output if output port is local gateway. Unlike in encapMode, inter-Node Pod-to-Pod
// traffic is expected to go out of the gateway port on the way to its destination.
fb := OutputTable.ofTable.BuildFlow(priorityNormal+2).
Cookie(cookieID).
MatchRegFieldWithValue(TargetOFPortField, f.gatewayPort).
MatchProtocol(ipProtocol).
MatchRegMark(OutputToOFPortRegMark).
MatchRegFieldWithValue(PacketCaptureMark, tag).
SetHardTimeout(timeout).
Action().OutputToRegField(TargetOFPortField)
fb = sendToController(fb)
flows = append(flows, fb.Done())
}

gatewayIP := f.gatewayIPs[ipProtocol]
if gatewayIP != nil {
fb := OutputTable.ofTable.BuildFlow(priorityNormal+3).
Cookie(cookieID).
MatchRegFieldWithValue(TargetOFPortField, f.gatewayPort).
MatchProtocol(ipProtocol).
MatchDstIP(gatewayIP).
MatchRegMark(OutputToOFPortRegMark).
MatchRegFieldWithValue(PacketCaptureMark, tag).
SetHardTimeout(timeout)
fb = sendToController(fb)
fb = output(fb)
flows = append(flows, fb.Done())
}

fb := OutputTable.ofTable.BuildFlow(priorityNormal+2).
Cookie(cookieID).
MatchProtocol(ipProtocol).
MatchRegMark(OutputToOFPortRegMark).
MatchRegFieldWithValue(PacketCaptureMark, tag).
SetHardTimeout(timeout)
fb = sendToController(fb)
fb = output(fb)
flows = append(flows, fb.Done())
}

// This generates PacketCapture specific flows that outputs hairpin PacketCapture packets to OVS port and Antrea Agent after
// L2forwarding calculation.
for _, ipProtocol := range f.ipProtocols {
if f.enableProxy {
fb := OutputTable.ofTable.BuildFlow(priorityHigh+2).
Cookie(cookieID).
MatchProtocol(ipProtocol).
MatchCTMark(HairpinCTMark).
MatchRegFieldWithValue(PacketCaptureMark, uint32(dataplaneTag)).
SetHardTimeout(timeout)
fb = sendToController(fb)
fb = fb.Action().OutputToRegField(TargetOFPortField)
flows = append(flows, fb.Done())
}
}

return flows
}

func matchTransportHeader(packet *binding.Packet, flowBuilder binding.FlowBuilder, endpointPackets []binding.Packet) binding.FlowBuilder {
// Match transport header
switch packet.IPProto {
case protocol.Type_ICMP:
flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolICMP)
case protocol.Type_IPv6ICMP:
flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolICMPv6)
case protocol.Type_TCP:
if packet.IsIPv6 {
flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolTCPv6)
} else {
flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolTCP)
}
case protocol.Type_UDP:
if packet.IsIPv6 {
flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolUDPv6)
} else {
flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolUDP)
}
default:
flowBuilder = flowBuilder.MatchIPProtocolValue(packet.IsIPv6, packet.IPProto)
}
if packet.IPProto == protocol.Type_TCP || packet.IPProto == protocol.Type_UDP {
if endpointPackets != nil && endpointPackets[0].DestinationPort != 0 {
flowBuilder = flowBuilder.MatchDstPort(endpointPackets[0].DestinationPort, nil)
} else if packet.DestinationPort != 0 {
flowBuilder = flowBuilder.MatchDstPort(packet.DestinationPort, nil)
}
if packet.SourcePort != 0 {
flowBuilder = flowBuilder.MatchSrcPort(packet.SourcePort, nil)
}
}

return flowBuilder
}
Loading

0 comments on commit 223bd5c

Please sign in to comment.