Skip to content

Commit

Permalink
Add ovs meter metrics (#5165)
Browse files Browse the repository at this point in the history
We have implemented rate-limiting for packet-in messages on
NetworkPolicy audit logging and Traceflow.
This change adds a metric that reports the packet count
obtained from the OVS meter statistics.
A separate goroutine retrieves the statistics every 30 seconds
and updates the metric. A value greater than 0 indicates that
the current rate exceeds the predefined limit (100 per second).

Fixes: #5037

Signed-off-by: Mengdie Song <[email protected]>
  • Loading branch information
mengdie-song authored Jul 25, 2023
1 parent bef3711 commit 2304804
Show file tree
Hide file tree
Showing 13 changed files with 218 additions and 64 deletions.
6 changes: 3 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ func run(o *Options) error {
if *o.config.EnablePrometheusMetrics {
metrics.InitializePrometheusMetrics()
}

// Create ovsdb and openflow clients.
ovsdbAddress := ovsconfig.GetConnAddress(o.config.OVSRunDir)
ovsdbConnection, err := ovsconfig.NewOVSDBConnectionUDS(ovsdbAddress)
Expand Down Expand Up @@ -164,6 +163,7 @@ func run(o *Options) error {
features.DefaultFeatureGate.Enabled(features.TrafficControl),
enableMulticlusterGW,
groupIDAllocator,
*o.config.EnablePrometheusMetrics,
)

var serviceCIDRNet *net.IPNet
Expand Down Expand Up @@ -873,8 +873,8 @@ func run(o *Options) error {
agentMonitor := monitor.NewAgentMonitor(crdClient, agentQuerier, agentAPICertData)
go agentMonitor.Run(stopCh)

// Start PacketIn
go ofClient.StartPacketInHandler(stopCh)
// Start PacketIn and OVS meter stats collection for Prometheus
go ofClient.Run(stopCh)

// Start the goroutine to periodically export IPFIX flow records.
if features.DefaultFeatureGate.Enabled(features.FlowExporter) && o.config.FlowExporter.Enable {
Expand Down
2 changes: 2 additions & 0 deletions docs/prometheus-integration.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ partitioned by operation type (add, modify and delete).
errors, partitioned by operation type (add, modify and delete).
- **antrea_agent_ovs_flow_ops_latency_milliseconds:** The latency of OVS
flow operations, partitioned by operation type (add, modify and delete).
- **antrea_agent_ovs_meter_packet_dropped_count:** Number of packets dropped by
OVS meter. The value is greater than 0 when the packets exceed the rate-limit.
- **antrea_agent_ovs_total_flow_count:** Total flow count of all OVS flow
tables.

Expand Down
17 changes: 17 additions & 0 deletions pkg/agent/metrics/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ var (
[]string{"operation"},
)

// OVSMeterPacketDroppedCount reports the number of packets dropped by an OVS meter. It is partitioned by the
// "meter_id" label, so each meter gets its own time series.
//
// It is defined as a Gauge and not a Counter, even though this metric is monotonically
// increasing (only being reset to 0 on restart). This is because we want to set its value directly using the
// Set method (using the value provided by OVS), and using Inc / Add is not convenient.
OVSMeterPacketDroppedCount = metrics.NewGaugeVec(
&metrics.GaugeOpts{
Namespace: metricNamespaceAntrea,
Subsystem: metricSubsystemAgent,
Name: "ovs_meter_packet_dropped_count",
Help: "Number of packets dropped by OVS meter. The value is greater than 0 when the packets exceed the rate-limit.",
StabilityLevel: metrics.ALPHA,
},
[]string{"meter_id"},
)

TotalConnectionsInConnTrackTable = metrics.NewGauge(
&metrics.GaugeOpts{
Namespace: metricNamespaceAntrea,
Expand Down Expand Up @@ -213,6 +227,9 @@ func InitializeOVSMetrics() {
if err := legacyregistry.Register(OVSFlowOpsLatency); err != nil {
klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ovs_flow_ops_latency_milliseconds")
}
if err := legacyregistry.Register(OVSMeterPacketDroppedCount); err != nil {
klog.ErrorS(err, "Failed to register metrics with Prometheus", "metrics", "antrea_agent_ovs_meter_packet_dropped_count")
}
// Initialize OpenFlow operations metrics with label add, modify and delete
// since those metrics won't come out until observation.
opsArray := [3]string{"add", "modify", "delete"}
Expand Down
19 changes: 19 additions & 0 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow/cookie"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
Expand Down Expand Up @@ -1550,3 +1551,21 @@ func getFlowModMessage(flow binding.Flow, op binding.OFOperation) *openflow15.Fl
messages := GetFlowModMessages([]binding.Flow{flow}, op)
return messages[0]
}

// getMeterStats asks the bridge for the statistics of all OVS meters and
// publishes the reported packet count of each known PacketIn meter through the
// antrea_agent_ovs_meter_packet_dropped_count metric. Statistics for any other
// meter ID are only logged at verbosity 4.
func (c *client) getMeterStats() {
	recordDropped := func(meterID int, packetCount int64) {
		// Translate the numeric meter ID into the label value exposed to Prometheus.
		var label string
		switch meterID {
		case PacketInMeterIDNP:
			label = "PacketInMeterNetworkPolicy"
		case PacketInMeterIDTF:
			label = "PacketInMeterTraceflow"
		default:
			klog.V(4).InfoS("Received unexpected meterID", "meterID", meterID)
			return
		}
		metrics.OVSMeterPacketDroppedCount.WithLabelValues(label).Set(float64(packetCount))
	}

	err := c.bridge.GetMeterStats(recordDropped)
	if err != nil {
		klog.ErrorS(err, "Failed to get OVS meter stats")
	}
}
13 changes: 6 additions & 7 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
fn(o)
}

cli := NewClient(bridgeName,
client := NewClient(bridgeName,
bridgeMgmtAddr,
nodeiptest.NewFakeNodeIPChecker(),
o.enableProxy,
Expand All @@ -375,8 +375,8 @@ func newFakeClient(mockOFEntryOperations *oftest.MockOFEntryOperations,
o.enableMulticast,
o.enableTrafficControl,
o.enableMulticluster,
NewGroupAllocator())
client := cli.(*client)
NewGroupAllocator(),
false)

var egressExceptCIDRs []net.IPNet
var serviceIPv4CIDR, serviceIPv6CIDR *net.IPNet
Expand Down Expand Up @@ -1815,16 +1815,15 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) {
}

func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client {
ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, nil)
c := ofClient.(*client)
ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, nil, false)
m := ovsoftest.NewMockBridge(ctrl)
c.bridge = m
ofClient.bridge = m
bridge := binding.OFBridge{}
m.EXPECT().BuildPacketOut().Return(bridge.BuildPacketOut()).Times(1)
if success {
m.EXPECT().SendPacketOut(gomock.Any()).Times(1)
}
return c
return ofClient
}

func Test_client_SendPacketOut(t *testing.T) {
Expand Down
87 changes: 52 additions & 35 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"antrea.io/libOpenflow/openflow15"
"antrea.io/libOpenflow/protocol"
"antrea.io/ofnet/ofctrl"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
netutils "k8s.io/utils/net"
Expand Down Expand Up @@ -418,22 +419,23 @@ type flowCategoryCache struct {
}

type client struct {
enableProxy bool
proxyAll bool
enableDSR bool
enableAntreaPolicy bool
enableL7NetworkPolicy bool
enableDenyTracking bool
enableEgress bool
enableMulticast bool
enableTrafficControl bool
enableMulticluster bool
connectUplinkToBridge bool
nodeType config.NodeType
roundInfo types.RoundInfo
cookieAllocator cookie.Allocator
bridge binding.Bridge
groupIDAllocator GroupAllocator
enableProxy bool
proxyAll bool
enableDSR bool
enableAntreaPolicy bool
enableL7NetworkPolicy bool
enableDenyTracking bool
enableEgress bool
enableMulticast bool
enableTrafficControl bool
enableMulticluster bool
enablePrometheusMetrics bool
connectUplinkToBridge bool
nodeType config.NodeType
roundInfo types.RoundInfo
cookieAllocator cookie.Allocator
bridge binding.Bridge
groupIDAllocator GroupAllocator

featurePodConnectivity *featurePodConnectivity
featureService *featureService
Expand Down Expand Up @@ -472,6 +474,18 @@ type client struct {
nodeIPChecker nodeip.Checker
}

// Run starts the PacketIn handler and then, only when Prometheus metrics are
// enabled and OVS meters are supported, launches a goroutine that calls
// getMeterStats every 30 seconds. stopCh is handed to both the PacketIn handler
// and the periodic collection loop.
func (c *client) Run(stopCh <-chan struct{}) {
	// Start PacketIn
	c.StartPacketInHandler(stopCh)

	// Meter statistics are only worth collecting when they can be exported and
	// when the datapath actually provides meters.
	if !c.enablePrometheusMetrics || !c.ovsMetersAreSupported {
		return
	}

	// Start OVS meter stats collection
	klog.Info("Start collecting OVS meter stats")
	go wait.Until(c.getMeterStats, 30*time.Second, stopCh)
}

func (c *client) GetTunnelVirtualMAC() net.HardwareAddr {
return GlobalVirtualMAC
}
Expand Down Expand Up @@ -2871,27 +2885,30 @@ func NewClient(bridgeName string,
enableMulticast bool,
enableTrafficControl bool,
enableMulticluster bool,
groupIDAllocator GroupAllocator) Client {
groupIDAllocator GroupAllocator,
enablePrometheusMetrics bool,
) *client {
bridge := binding.NewOFBridge(bridgeName, mgmtAddr)
c := &client{
bridge: bridge,
nodeIPChecker: nodeIPCheck,
enableProxy: enableProxy,
proxyAll: proxyAll,
enableDSR: enableDSR,
enableAntreaPolicy: enableAntreaPolicy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableDenyTracking: enableDenyTracking,
enableEgress: enableEgress,
enableMulticast: enableMulticast,
enableTrafficControl: enableTrafficControl,
enableMulticluster: enableMulticluster,
connectUplinkToBridge: connectUplinkToBridge,
pipelines: make(map[binding.PipelineID]binding.Pipeline),
packetInHandlers: map[uint8]PacketInHandler{},
ovsctlClient: ovsctl.NewClient(bridgeName),
ovsMetersAreSupported: ovsMetersAreSupported(),
groupIDAllocator: groupIDAllocator,
bridge: bridge,
nodeIPChecker: nodeIPCheck,
enableProxy: enableProxy,
proxyAll: proxyAll,
enableDSR: enableDSR,
enableAntreaPolicy: enableAntreaPolicy,
enableL7NetworkPolicy: enableL7NetworkPolicy,
enableDenyTracking: enableDenyTracking,
enableEgress: enableEgress,
enableMulticast: enableMulticast,
enableTrafficControl: enableTrafficControl,
enableMulticluster: enableMulticluster,
enablePrometheusMetrics: enablePrometheusMetrics,
connectUplinkToBridge: connectUplinkToBridge,
pipelines: make(map[binding.PipelineID]binding.Pipeline),
packetInHandlers: map[uint8]PacketInHandler{},
ovsctlClient: ovsctl.NewClient(bridgeName),
ovsMetersAreSupported: ovsMetersAreSupported(),
groupIDAllocator: groupIDAllocator,
}
c.ofEntryOperations = c
return c
Expand Down
1 change: 1 addition & 0 deletions pkg/ovs/openflow/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type Bridge interface {
NewGroup(id GroupIDType) Group
NewMeter(id MeterIDType, flags ofctrl.MeterFlag) Meter
DeleteMeterAll() error
GetMeterStats(handleMeterStatsReply func(meterID int, packetCount int64)) error
DumpTableStatus() []TableStatus
// DumpFlows queries the Openflow entries from OFSwitch. The filter of the query is Openflow cookieID; the result is
// a map from flow cookieID to FlowStates.
Expand Down
61 changes: 53 additions & 8 deletions pkg/ovs/openflow/ofctrl_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,10 @@ type OFBridge struct {
// connected is an internal channel to notify if connected to the OFSwitch or not. It is used only in Connect method.
connected chan bool
// pktConsumers is a map from PacketIn category to the channel that is used to publish the PacketIn message.
pktConsumers sync.Map
multipartReplyChs map[uint32]chan *openflow15.MultipartReply
pktConsumers sync.Map

mpReplyChsMutex sync.RWMutex
mpReplyChs map[uint32]chan *openflow15.MultipartReply
// tunMetadataLengthMap is used to store the tlv-map settings on the OVS bridge. Key is the index of tunnel metadata,
// and value is the length configured in this tunnel metadata.
tunMetadataLengthMap map[uint16]uint8
Expand Down Expand Up @@ -240,6 +242,32 @@ func (b *OFBridge) DeleteMeterAll() error {
return nil
}

// GetMeterStats sends a MeterStats multipart request that covers all meters and
// handles the reply asynchronously: for every meter in the reply that has at
// least one band, handleMeterStatsReply is called with the meter ID and the
// packet count of its first band.
//
// The callback runs in a separate goroutine, after this method has returned.
// The returned error only reflects whether the request could be sent. If no
// reply arrives within 5 seconds, the wait is abandoned and a message is logged.
func (b *OFBridge) GetMeterStats(handleMeterStatsReply func(meterID int, packetCount int64)) error {
	const ofpmAll = 0xffffffff // Represents all meters
	const replyTimeout = 5 * time.Second

	mpMeterStatsReq := openflow15.NewMpRequest(openflow15.MultipartType_MeterStats)
	meterMPReq := openflow15.NewMeterMultipartRequest(ofpmAll)
	mpMeterStatsReq.Body = append(mpMeterStatsReq.Body, meterMPReq)

	// The reply channel must be registered before the request is sent,
	// otherwise a fast reply could be dropped by MultipartReply.
	ch := make(chan *openflow15.MultipartReply, 1)
	b.registerMpReplyCh(mpMeterStatsReq.Xid, ch)

	// sendFailed is closed when the request could not be sent, so that the
	// waiter below exits immediately instead of logging a bogus timeout.
	sendFailed := make(chan struct{})

	go func() {
		defer b.unregisterMpReplyCh(mpMeterStatsReq.Xid)
		timer := time.NewTimer(replyTimeout)
		defer timer.Stop()
		select {
		case reply := <-ch:
			if reply.Type != openflow15.MultipartType_MeterStats {
				return
			}
			for _, entry := range reply.Body {
				// Use a checked assertion: an unexpected body element must
				// not panic this goroutine and take the whole process down.
				stats, ok := entry.(*openflow15.MeterStats)
				if !ok {
					klog.InfoS("Ignoring unexpected entry in OVS MeterStats reply")
					continue
				}
				if len(stats.BandStats) > 0 {
					handleMeterStatsReply(int(stats.MeterId), int64(stats.BandStats[0].PacketBandCount))
				}
			}
		case <-sendFailed:
			// No reply will come for a request that was never sent.
		case <-timer.C:
			klog.InfoS("Timeout waiting for OVS MeterStats reply")
		}
	}()

	if err := b.ofSwitch.Send(mpMeterStatsReq); err != nil {
		close(sendFailed)
		return err
	}
	return nil
}

func (b *OFBridge) NewTable(table Table, next uint8, missAction MissActionType) Table {
table.SetNext(next)
table.SetMissAction(missAction)
Expand Down Expand Up @@ -319,7 +347,13 @@ func (b *OFBridge) SetOFSwitch(sw *ofctrl.OFSwitch) {
// MultipartReply is a callback invoked when a MultipartReply message is received from the connected ofctrl.OFSwitch.
// Client uses this method to handle the reply message if it has sent a customized MultipartRequest message.
func (b *OFBridge) MultipartReply(sw *ofctrl.OFSwitch, rep *openflow15.MultipartReply) {
if ch, ok := b.multipartReplyChs[rep.Xid]; ok {
ch, ok := func() (chan *openflow15.MultipartReply, bool) {
b.mpReplyChsMutex.RLock()
defer b.mpReplyChsMutex.RUnlock()
ch, ok := b.mpReplyChs[rep.Xid]
return ch, ok
}()
if ok {
ch <- rep
}
}
Expand Down Expand Up @@ -693,12 +727,10 @@ func (b *OFBridge) queryTableFeatures() {
// sending the Multipart Request messages to modify the tables' names. The buffer size "20" is the observed number
// of the Multipart Reply messages sent from OVS.
tableFeatureCh := make(chan *openflow15.MultipartReply, 20)
b.multipartReplyChs[mpartRequest.Xid] = tableFeatureCh
b.registerMpReplyCh(mpartRequest.Xid, tableFeatureCh)
go func() {
// Delete the channel which is used to receive the MultipartReply message after all tables' features are received.
defer func() {
delete(b.multipartReplyChs, mpartRequest.Xid)
}()
defer b.unregisterMpReplyCh(mpartRequest.Xid)
b.processTableFeatures(tableFeatureCh)
}()
b.ofSwitch.Send(mpartRequest)
Expand Down Expand Up @@ -742,14 +774,27 @@ func (b *OFBridge) processTableFeatures(ch chan *openflow15.MultipartReply) {
}
}

// registerMpReplyCh records ch as the channel associated with the multipart
// request whose transaction ID is xid, so that MultipartReply can forward the
// reply carrying the same Xid to it. The map is updated while holding
// mpReplyChsMutex.
func (b *OFBridge) registerMpReplyCh(xid uint32, ch chan *openflow15.MultipartReply) {
b.mpReplyChsMutex.Lock()
defer b.mpReplyChsMutex.Unlock()
b.mpReplyChs[xid] = ch

}

// unregisterMpReplyCh removes the reply channel registered for the multipart
// request with transaction ID xid. The map is updated while holding
// mpReplyChsMutex. The channel itself is not closed here.
func (b *OFBridge) unregisterMpReplyCh(xid uint32) {
b.mpReplyChsMutex.Lock()
defer b.mpReplyChsMutex.Unlock()
delete(b.mpReplyChs, xid)
}

func NewOFBridge(br string, mgmtAddr string) *OFBridge {
s := &OFBridge{
bridgeName: br,
mgmtAddr: mgmtAddr,
tableCache: make(map[uint8]*ofTable),
retryInterval: 1 * time.Second,
pktConsumers: sync.Map{},
multipartReplyChs: make(map[uint32]chan *openflow15.MultipartReply),
mpReplyChs: make(map[uint32]chan *openflow15.MultipartReply),
tunMetadataLengthMap: make(map[uint16]uint8),
}
s.controller = ofctrl.NewController(s)
Expand Down
Loading

0 comments on commit 2304804

Please sign in to comment.