Skip to content

Commit

Permalink
Define API for Agents to report Node latency stats (#6392)
Browse files Browse the repository at this point in the history
Follow up to #6120 

See #5514 

Signed-off-by: Asklv <[email protected]>
  • Loading branch information
IRONICBo authored Jun 18, 2024
1 parent 668d813 commit 25899c3
Show file tree
Hide file tree
Showing 24 changed files with 1,987 additions and 223 deletions.
6 changes: 6 additions & 0 deletions build/charts/antrea/templates/agent/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ metadata:
labels:
app: antrea
rules:
- apiGroups:
- stats.antrea.io
resources:
- nodelatencystats
verbs:
- create
- apiGroups:
- ""
resources:
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-aks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4144,6 +4144,12 @@ metadata:
labels:
app: antrea
rules:
- apiGroups:
- stats.antrea.io
resources:
- nodelatencystats
verbs:
- create
- apiGroups:
- ""
resources:
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-eks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4144,6 +4144,12 @@ metadata:
labels:
app: antrea
rules:
- apiGroups:
- stats.antrea.io
resources:
- nodelatencystats
verbs:
- create
- apiGroups:
- ""
resources:
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-gke.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4144,6 +4144,12 @@ metadata:
labels:
app: antrea
rules:
- apiGroups:
- stats.antrea.io
resources:
- nodelatencystats
verbs:
- create
- apiGroups:
- ""
resources:
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea-ipsec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4157,6 +4157,12 @@ metadata:
labels:
app: antrea
rules:
- apiGroups:
- stats.antrea.io
resources:
- nodelatencystats
verbs:
- create
- apiGroups:
- ""
resources:
Expand Down
6 changes: 6 additions & 0 deletions build/yamls/antrea.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4144,6 +4144,12 @@ metadata:
labels:
app: antrea
rules:
- apiGroups:
- stats.antrea.io
resources:
- nodelatencystats
verbs:
- create
- apiGroups:
- ""
resources:
Expand Down
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -953,6 +953,7 @@ func run(o *Options) error {
// Start the node latency monitor.
if features.DefaultFeatureGate.Enabled(features.NodeLatencyMonitor) && o.nodeType == config.K8sNode {
nodeLatencyMonitor := monitortool.NewNodeLatencyMonitor(
antreaClientProvider,
nodeInformer,
nodeLatencyMonitorInformer,
nodeConfig,
Expand Down
42 changes: 42 additions & 0 deletions pkg/agent/monitortool/latency_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (

"github.com/containernetworking/plugins/pkg/ip"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"

statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1"
"antrea.io/antrea/pkg/util/k8s"
)

Expand Down Expand Up @@ -248,3 +250,43 @@ func (s *LatencyStore) DeleteStaleNodeIPs() {
}
}
}

// ConvertList converts the latency store to a list of PeerNodeLatencyStats.
func (l *LatencyStore) ConvertList(currentNodeName string) []statsv1alpha1.PeerNodeLatencyStats {
l.mutex.RLock()
defer l.mutex.RUnlock()

// PeerNodeLatencyStats should be a list of size N-1, where N is the number of Nodes in the cluster.
// TargetIPLatencyStats will be a list of size 1 (single-stack case) or 2 (dual-stack case).
peerNodeLatencyStatsList := make([]statsv1alpha1.PeerNodeLatencyStats, 0, len(l.nodeIPLatencyMap))
for nodeName, nodeIPs := range l.nodeTargetIPsMap {
// Even though the current Node should already be excluded from the map, we add an extra check as an additional guarantee.
if nodeName == currentNodeName {
continue
}

targetIPLatencyStats := make([]statsv1alpha1.TargetIPLatencyStats, 0, len(nodeIPs))
for _, nodeIP := range nodeIPs {
nodeIPStr := nodeIP.String()
latencyEntry, ok := l.nodeIPLatencyMap[nodeIPStr]
if !ok {
continue
}
entry := statsv1alpha1.TargetIPLatencyStats{
TargetIP: nodeIPStr,
LastSendTime: metav1.NewTime(latencyEntry.LastSendTime),
LastRecvTime: metav1.NewTime(latencyEntry.LastRecvTime),
LastMeasuredRTTNanoseconds: latencyEntry.LastMeasuredRTT.Nanoseconds(),
}
targetIPLatencyStats = append(targetIPLatencyStats, entry)
}

peerNodeLatencyStats := statsv1alpha1.PeerNodeLatencyStats{
NodeName: nodeName,
TargetIPLatencyStats: targetIPLatencyStats,
}
peerNodeLatencyStatsList = append(peerNodeLatencyStatsList, peerNodeLatencyStats)
}

return peerNodeLatencyStatsList
}
33 changes: 32 additions & 1 deletion pkg/agent/monitortool/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package monitortool

import (
"context"
"math/rand"
"net"
"sync"
Expand All @@ -25,13 +26,16 @@ import (
"golang.org/x/net/ipv4"
"golang.org/x/net/ipv6"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

"antrea.io/antrea/pkg/agent/client"
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
statsv1alpha1 "antrea.io/antrea/pkg/apis/stats/v1alpha1"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1"
)

Expand Down Expand Up @@ -66,6 +70,8 @@ type NodeLatencyMonitor struct {
// isIPv6Enabled is the flag to indicate whether the IPv6 is enabled.
isIPv6Enabled bool

// antreaClientProvider provides interfaces to get antreaClient, which will be used to report the statistics
antreaClientProvider client.AntreaClientProvider
// nodeName is the name of the current Node, used to filter out the current Node from the latency monitor.
nodeName string

Expand All @@ -88,6 +94,7 @@ type latencyConfig struct {

// NewNodeLatencyMonitor creates a new NodeLatencyMonitor.
func NewNodeLatencyMonitor(
antreaClientProvider client.AntreaClientProvider,
nodeInformer coreinformers.NodeInformer,
nlmInformer crdinformers.NodeLatencyMonitorInformer,
nodeConfig *config.NodeConfig,
Expand All @@ -96,6 +103,7 @@ func NewNodeLatencyMonitor(
m := &NodeLatencyMonitor{
latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()),
latencyConfigChanged: make(chan latencyConfig),
antreaClientProvider: antreaClientProvider,
nodeInformerSynced: nodeInformer.Informer().HasSynced,
nlmInformerSynced: nlmInformer.Informer().HasSynced,
nodeName: nodeConfig.Name,
Expand Down Expand Up @@ -176,7 +184,7 @@ func (m *NodeLatencyMonitor) onNodeDelete(obj interface{}) {
// onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor.
func (m *NodeLatencyMonitor) onNodeLatencyMonitorAdd(obj interface{}) {
nlm := obj.(*v1alpha1.NodeLatencyMonitor)
klog.InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm))
klog.V(4).InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm))

m.updateLatencyConfig(nlm)
}
Expand Down Expand Up @@ -367,6 +375,28 @@ func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) {
klog.V(4).InfoS("Done pinging all Nodes")
}

// getSummary returns the latency summary of the given Node IP.
func (m *NodeLatencyMonitor) getSummary() *statsv1alpha1.NodeLatencyStats {
return &statsv1alpha1.NodeLatencyStats{
ObjectMeta: metav1.ObjectMeta{
Name: m.nodeName,
},
PeerNodeLatencyStats: m.latencyStore.ConvertList(m.nodeName),
}
}

func (m *NodeLatencyMonitor) report() {
summary := m.getSummary()
antreaClient, err := m.antreaClientProvider.GetAntreaClient()
if err != nil {
klog.ErrorS(err, "Failed to get Antrea client")
return
}
if _, err := antreaClient.StatsV1alpha1().NodeLatencyStatses().Create(context.TODO(), summary, metav1.CreateOptions{}); err != nil {
klog.ErrorS(err, "Failed to create NodeIPLatencyStats")
}
}

// Run starts the NodeLatencyMonitor.
func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) {
if !cache.WaitForNamedCacheSync("NodeLatencyMonitor", stopCh, m.nodeInformerSynced, m.nlmInformerSynced) {
Expand Down Expand Up @@ -418,6 +448,7 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
// to avoid consistency issues and because it would not be sufficient to avoid stale entries completely.
// This means that we have to periodically invoke DeleteStaleNodeIPs to avoid stale entries in the map.
m.latencyStore.DeleteStaleNodeIPs()
m.report()
case <-stopCh:
return
case latencyConfig := <-m.latencyConfigChanged:
Expand Down
12 changes: 11 additions & 1 deletion pkg/agent/monitortool/monitor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
monitortesting "antrea.io/antrea/pkg/agent/monitortool/testing"
"antrea.io/antrea/pkg/agent/util/nettest"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
"antrea.io/antrea/pkg/client/clientset/versioned"
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/util/ip"
Expand Down Expand Up @@ -134,6 +135,14 @@ func (c *fakeClock) NewTicker(d time.Duration) clock.Ticker {
return c.FakeClock.NewTicker(d)
}

type antreaClientGetter struct {
clientset versioned.Interface
}

func (g *antreaClientGetter) GetAntreaClient() (versioned.Interface, error) {
return g.clientset, nil
}

type testMonitor struct {
*NodeLatencyMonitor
clientset *fake.Clientset
Expand All @@ -160,7 +169,8 @@ func newTestMonitor(
crdClientset := fakeversioned.NewSimpleClientset(crdObjects...)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClientset, 0)
nlmInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors()
m := NewNodeLatencyMonitor(nodeInformer, nlmInformer, nodeConfig, trafficEncapMode)
antreaClientProvider := &antreaClientGetter{fakeversioned.NewSimpleClientset(crdObjects...)}
m := NewNodeLatencyMonitor(antreaClientProvider, nodeInformer, nlmInformer, nodeConfig, trafficEncapMode)
fakeClock := newFakeClock(clockT)
m.clock = fakeClock
mockListener := monitortesting.NewMockPacketListener(ctrl)
Expand Down
Loading

0 comments on commit 25899c3

Please sign in to comment.