Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Hang Yan <[email protected]>
  • Loading branch information
hangyan committed Aug 8, 2023
1 parent 4345445 commit 8b18060
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 140 deletions.
24 changes: 3 additions & 21 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
Expand Down Expand Up @@ -205,7 +204,7 @@ type Client interface {
SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, inPort uint32, outPort int32) error

// InstallTraceflowFlows installs flows for a Traceflow request.
InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error
InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error

// UninstallTraceflowFlows uninstalls flows for a Traceflow request.
UninstallTraceflowFlows(dataplaneTag uint8) error
Expand Down Expand Up @@ -1118,13 +1117,14 @@ func (c *client) SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet,
return c.bridge.SendPacketOut(packetOutObj)
}

func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error {
func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, sampling, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error {
cacheKey := fmt.Sprintf("%x", dataplaneTag)
var flows []binding.Flow
for _, f := range c.traceableFeatures {
flows = append(flows, f.flowsToTrace(dataplaneTag,
c.ovsMetersAreSupported,
liveTraffic,
sampling,
droppedOnly,
receiverOnly,
packet,
Expand Down Expand Up @@ -1551,21 +1551,3 @@ func getFlowModMessage(flow binding.Flow, op binding.OFOperation) *openflow15.Fl
messages := GetFlowModMessages([]binding.Flow{flow}, op)
return messages[0]
}

// getMeterStats sends a multipart request to get all the meter statistics and
// sets values for antrea_agent_ovs_meter_packet_dropped_count.
func (c *client) getMeterStats() {
handleMeterStatsReply := func(meterID int, packetCount int64) {
switch meterID {
case PacketInMeterIDNP:
metrics.OVSMeterPacketDroppedCount.WithLabelValues("PacketInMeterNetworkPolicy").Set(float64(packetCount))
case PacketInMeterIDTF:
metrics.OVSMeterPacketDroppedCount.WithLabelValues("PacketInMeterTraceflow").Set(float64(packetCount))
default:
klog.V(4).InfoS("Received unexpected meterID", "meterID", meterID)
}
}
if err := c.bridge.GetMeterStats(handleMeterStatsReply); err != nil {
klog.ErrorS(err, "Failed to get OVS meter stats")
}
}
15 changes: 8 additions & 7 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
fn(o)
}

client := NewClient(bridgeName,
cli := NewClient(bridgeName,
bridgeMgmtAddr,
nodeiptest.NewFakeNodeIPChecker(),
o.enableProxy,
Expand All @@ -375,8 +375,8 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
o.enableMulticast,
o.enableTrafficControl,
o.enableMulticluster,
NewGroupAllocator(),
false)
NewGroupAllocator())
client := cli.(*client)

var egressExceptCIDRs []net.IPNet
var serviceIPv4CIDR, serviceIPv6CIDR *net.IPNet
Expand Down Expand Up @@ -1585,7 +1585,7 @@ func Test_client_InstallTraceflowFlows(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
c := tt.prepareFunc(ctrl)
if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, false, false, nil, 0, 300); (err != nil) != tt.wantErr {
if err := c.InstallTraceflowFlows(tt.args.dataplaneTag, false, false, false, false, nil, 0, 300); (err != nil) != tt.wantErr {
t.Errorf("InstallTraceflowFlows() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down Expand Up @@ -1815,15 +1815,16 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) {
}

func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, nil, false)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, nil)
c := ofClient.(*client)
m := ovsoftest.NewMockBridge(ctrl)
ofClient.bridge = m
c.bridge = m
bridge := binding.OFBridge{}
m.EXPECT().BuildPacketOut().Return(bridge.BuildPacketOut()).Times(1)
if success {
m.EXPECT().SendPacketOut(gomock.Any()).Times(1)
}
return ofClient
return c
}

func Test_client_SendPacketOut(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/agent/openflow/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ type traceableFeature interface {
flowsToTrace(dataplaneTag uint8,
ovsMetersAreSupported,
liveTraffic,
sampling,
droppedOnly,
receiverOnly bool,
packet *binding.Packet,
Expand Down
116 changes: 55 additions & 61 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/ofnet/ofctrl"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
Expand Down Expand Up @@ -419,23 +418,22 @@ type flowCategoryCache struct {
}

type client struct {
enableProxy bool
proxyAll bool
enableDSR bool
enableAntreaPolicy bool
enableL7NetworkPolicy bool
enableDenyTracking bool
enableEgress bool
enableMulticast bool
enableTrafficControl bool
enableMulticluster bool
enablePrometheusMetrics bool
connectUplinkToBridge bool
nodeType config.NodeType
roundInfo types.RoundInfo
cookieAllocator cookie.Allocator
bridge binding.Bridge
groupIDAllocator GroupAllocator
enableProxy bool
proxyAll bool
enableDSR bool
enableAntreaPolicy bool
enableL7NetworkPolicy bool
enableDenyTracking bool
enableEgress bool
enableMulticast bool
enableTrafficControl bool
enableMulticluster bool
connectUplinkToBridge bool
nodeType config.NodeType
roundInfo types.RoundInfo
cookieAllocator cookie.Allocator
bridge binding.Bridge
groupIDAllocator GroupAllocator

featurePodConnectivity *featurePodConnectivity
featureService *featureService
Expand Down Expand Up @@ -474,18 +472,6 @@ type client struct {
nodeIPChecker nodeip.Checker
}

func (c *client) Run(stopCh <-chan struct{}) {
// Start PacketIn
c.StartPacketInHandler(stopCh)
// Start OVS meter stats collection
if c.enablePrometheusMetrics {
if c.ovsMetersAreSupported {
klog.Info("Start collecting OVS meter stats")
go wait.Until(c.getMeterStats, time.Second*30, stopCh)
}
}
}

func (c *client) GetTunnelVirtualMAC() net.HardwareAddr {
return GlobalVirtualMAC
}
Expand Down Expand Up @@ -959,13 +945,17 @@ func (f *featureService) snatConntrackFlows() []binding.Flow {
// flowsToTrace generates Traceflow specific flows in the connectionTrackStateTable or L2ForwardingCalcTable for featurePodConnectivity.
// When packet is not provided, the flows bypass the drop flow in conntrackStateFlow to avoid unexpected drop of the
// injected Traceflow packet, and to drop any Traceflow packet that has ct_state +rpl, which may happen when the Traceflow
// request destination is the Node's IP. When packet is provided, a flow is added to mark - the first packet of the first
// connection that matches the provided packet - as the Traceflow packet. The flow is added in connectionTrackStateTable
// when receiverOnly is false and it also matches in_port to be the provided ofPort (the sender Pod); otherwise when
// receiverOnly is true, the flow is added into L2ForwardingCalcTable and matches the destination MAC (the receiver Pod MAC).
// request destination is the Node's IP.
//
// When packet is provided, a flow is added to mark the packet that matches the provided packet as the Traceflow packet.
// Specifically, when not on sampling mode, we only mark the first packet of the first connection. The flow is added in
// connectionTrackStateTable when receiverOnly is false and it also matches in_port to be the provided ofPort (the
// sender Pod); otherwise when receiverOnly is true, the flow is added into L2ForwardingCalcTable and matches the
// destination MAC (the receiver Pod MAC).
func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8,
ovsMetersAreSupported,
liveTraffic,
sampling,
droppedOnly,
receiverOnly bool,
packet *binding.Packet,
Expand Down Expand Up @@ -997,11 +987,9 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8,
} else {
var flowBuilder binding.FlowBuilder
if !receiverOnly {
flowBuilder = ConntrackStateTable.ofTable.BuildFlow(priorityLow).
flowBuilder = ConntrackStateTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
MatchInPort(ofPort).
MatchCTStateNew(true).
MatchCTStateTrk(true).
Action().LoadIPDSCP(dataplaneTag).
SetHardTimeout(timeout).
Action().GotoStage(stagePreRouting)
Expand All @@ -1011,8 +999,6 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8,
} else {
flowBuilder = L2ForwardingCalcTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
MatchCTStateNew(true).
MatchCTStateTrk(true).
MatchDstMAC(packet.DestinationMAC).
Action().LoadToRegField(TargetOFPortField, ofPort).
Action().LoadRegMark(OutputToOFPortRegMark).
Expand All @@ -1023,6 +1009,15 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8,
flowBuilder = flowBuilder.MatchSrcIP(packet.SourceIP)
}
}

// For live traffic traceflow, when on sampling mode, we track every target packet.
// Otherwise, we track only the first packet.
if !sampling {
flowBuilder = flowBuilder.
MatchCTStateNew(true).
MatchCTStateTrk(true)
}

// Match transport header
switch packet.IPProto {
case protocol.Type_ICMP:
Expand Down Expand Up @@ -1150,6 +1145,7 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8,
func (f *featureService) flowsToTrace(dataplaneTag uint8,
ovsMetersAreSupported,
liveTraffic,
sampling,
droppedOnly,
receiverOnly bool,
packet *binding.Packet,
Expand Down Expand Up @@ -1200,6 +1196,7 @@ func (f *featureService) flowsToTrace(dataplaneTag uint8,
func (f *featureNetworkPolicy) flowsToTrace(dataplaneTag uint8,
ovsMetersAreSupported,
liveTraffic,
sampling,
droppedOnly,
receiverOnly bool,
packet *binding.Packet,
Expand Down Expand Up @@ -2885,30 +2882,27 @@ func NewClient(bridgeName string,
enableMulticast bool,
enableTrafficControl bool,
enableMulticluster bool,
groupIDAllocator GroupAllocator,
enablePrometheusMetrics bool,
) *client {
groupIDAllocator GroupAllocator) Client {
bridge := binding.NewOFBridge(bridgeName, mgmtAddr)
c := &client{
bridge: bridge,
nodeIPChecker: nodeIPCheck,
enableProxy: enableProxy,
proxyAll: proxyAll,
enableDSR: enableDSR,
enableAntreaPolicy: enableAntreaPolicy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableDenyTracking: enableDenyTracking,
enableEgress: enableEgress,
enableMulticast: enableMulticast,
enableTrafficControl: enableTrafficControl,
enableMulticluster: enableMulticluster,
enablePrometheusMetrics: enablePrometheusMetrics,
connectUplinkToBridge: connectUplinkToBridge,
pipelines: make(map[binding.PipelineID]binding.Pipeline),
packetInHandlers: map[uint8]PacketInHandler{},
ovsctlClient: ovsctl.NewClient(bridgeName),
ovsMetersAreSupported: ovsMetersAreSupported(),
groupIDAllocator: groupIDAllocator,
bridge: bridge,
nodeIPChecker: nodeIPCheck,
enableProxy: enableProxy,
proxyAll: proxyAll,
enableDSR: enableDSR,
enableAntreaPolicy: enableAntreaPolicy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableDenyTracking: enableDenyTracking,
enableEgress: enableEgress,
enableMulticast: enableMulticast,
enableTrafficControl: enableTrafficControl,
enableMulticluster: enableMulticluster,
connectUplinkToBridge: connectUplinkToBridge,
pipelines: make(map[binding.PipelineID]binding.Pipeline),
packetInHandlers: map[uint8]PacketInHandler{},
ovsctlClient: ovsctl.NewClient(bridgeName),
ovsMetersAreSupported: ovsMetersAreSupported(),
groupIDAllocator: groupIDAllocator,
}
c.ofEntryOperations = c
return c
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 8b18060

Please sign in to comment.