Skip to content

Commit

Permalink
Add unit tests for NodeLatencyMonitor (#6429)
Browse files Browse the repository at this point in the history
The tests leverage the net.PacketConn fake implementation which was
recently introduced.
We also introduce a mock for the ListenPacket call, which we use to
validate when sockets are opened and to return our fake implementation.
The new tests didn't reveal any issue in the existing code.

Signed-off-by: Antonin Bas <[email protected]>
  • Loading branch information
antoninbas authored Jun 13, 2024
1 parent 615907d commit 759f201
Show file tree
Hide file tree
Showing 6 changed files with 965 additions and 108 deletions.
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ MOCKGEN_TARGETS=(
"pkg/agent/memberlist Memberlist ."
"pkg/agent/multicast RouteInterface testing"
"pkg/agent/types McastNetworkPolicyController testing"
"pkg/agent/monitortool PacketListener testing"
"pkg/agent/nodeportlocal/portcache LocalPortOpener testing"
"pkg/agent/nodeportlocal/rules PodPortRules testing"
"pkg/agent/openflow Client testing"
Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/monitortool/latency_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ func (s *LatencyStore) getNodeIPLatencyEntry(nodeIP string) (NodeIPLatencyEntry,
return *entry, ok
}

// getNodeIPLatencyKeys returns the list of Node IPs for which we currently have
// latency measurements.
// It is only used for testing purposes.
func (s *LatencyStore) getNodeIPLatencyKeys() []string {
s.mutex.RLock()
defer s.mutex.RUnlock()

keys := make([]string, 0, len(s.nodeIPLatencyMap))
for key := range s.nodeIPLatencyMap {
keys = append(keys, key)
}

return keys
}

// SetNodeIPLatencyEntry sets the NodeIPLatencyEntry for the given Node IP
func (s *LatencyStore) SetNodeIPLatencyEntry(nodeIP string, mutator func(entry *NodeIPLatencyEntry)) {
s.mutex.Lock()
Expand Down
231 changes: 126 additions & 105 deletions pkg/agent/monitortool/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ import (
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/apis/crd/v1alpha1"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1"
)

var (
icmpSeq uint32
// #nosec G404: random number generator not used for security purposes.
icmpEchoID = rand.Int31n(1 << 16)
)
// #nosec G404: random number generator not used for security purposes.
var icmpEchoID = rand.Int31n(1 << 16)

const (
ipv4ProtocolICMPRaw = "ip4:icmp"
Expand All @@ -47,15 +45,14 @@ const (
protocolICMPv6 = 58
)

// getICMPSeq returns the next sequence number as uint16,
// wrapping around to 0 after reaching the maximum value of uint16.
func getICMPSeq() uint16 {
// Increment the sequence number atomically and get the new value.
// We use atomic.AddUint32 and pass 1 as the increment.
// The returned value is the new value post-increment.
newVal := atomic.AddUint32(&icmpSeq, 1)
type PacketListener interface {
ListenPacket(network, address string) (net.PacketConn, error)
}

type ICMPListener struct{}

return uint16(newVal)
func (l *ICMPListener) ListenPacket(network, address string) (net.PacketConn, error) {
return icmp.ListenPacket(network, address)
}

// NodeLatencyMonitor is a tool to monitor the latency of the Node.
Expand All @@ -70,9 +67,15 @@ type NodeLatencyMonitor struct {
isIPv6Enabled bool

// nodeName is the name of the current Node, used to filter out the current Node from the latency monitor.
nodeName string
nodeInformer coreinformers.NodeInformer
nodeLatencyMonitorInformer crdinformers.NodeLatencyMonitorInformer
nodeName string

nodeInformerSynced cache.InformerSynced
nlmInformerSynced cache.InformerSynced

clock clock.WithTicker
listener PacketListener

icmpSeqNum atomic.Uint32
}

// latencyConfig is the config for the latency monitor.
Expand All @@ -84,16 +87,20 @@ type latencyConfig struct {
}

// NewNodeLatencyMonitor creates a new NodeLatencyMonitor.
func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer,
func NewNodeLatencyMonitor(
nodeInformer coreinformers.NodeInformer,
nlmInformer crdinformers.NodeLatencyMonitorInformer,
nodeConfig *config.NodeConfig,
trafficEncapMode config.TrafficEncapModeType) *NodeLatencyMonitor {
trafficEncapMode config.TrafficEncapModeType,
) *NodeLatencyMonitor {
m := &NodeLatencyMonitor{
latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()),
latencyConfigChanged: make(chan latencyConfig),
nodeInformer: nodeInformer,
nodeLatencyMonitorInformer: nlmInformer,
nodeName: nodeConfig.Name,
latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()),
latencyConfigChanged: make(chan latencyConfig),
nodeInformerSynced: nodeInformer.Informer().HasSynced,
nlmInformerSynced: nlmInformer.Informer().HasSynced,
nodeName: nodeConfig.Name,
clock: clock.RealClock{},
listener: &ICMPListener{},
}

m.isIPv4Enabled, _ = config.IsIPv4Enabled(nodeConfig, trafficEncapMode)
Expand Down Expand Up @@ -169,7 +176,7 @@ func (m *NodeLatencyMonitor) onNodeDelete(obj interface{}) {
// onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor.
func (m *NodeLatencyMonitor) onNodeLatencyMonitorAdd(obj interface{}) {
nlm := obj.(*v1alpha1.NodeLatencyMonitor)
klog.V(4).InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm))
klog.InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm))

m.updateLatencyConfig(nlm)
}
Expand Down Expand Up @@ -219,8 +226,8 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error
requestType = ipv4.ICMPTypeEcho
}

timeStart := time.Now()
seqID := getICMPSeq()
timeStart := m.clock.Now()
seqID := m.getICMPSeqNum()
body := &icmp.Echo{
ID: int(icmpEchoID),
Seq: int(seqID),
Expand All @@ -240,8 +247,7 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error
}

// Send the ICMP message
_, err = socket.WriteTo(msgBytes, ip)
if err != nil {
if _, err = socket.WriteTo(msgBytes, ip); err != nil {
return err
}

Expand All @@ -254,8 +260,77 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error
return nil
}

// recvPing receives an ICMP message from the target IP address.
func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) {
func (m *NodeLatencyMonitor) handlePing(buffer []byte, peerIP string, isIPv4 bool) {
// Parse the ICMP message
var msg *icmp.Message
if isIPv4 {
var err error
msg, err = icmp.ParseMessage(protocolICMP, buffer)
if err != nil {
klog.ErrorS(err, "Failed to parse ICMP message")
return
}
if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply {
klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg)
return
}
// Ignore ICMP echo messages received from other Nodes (they will be answered by the system)
if msg.Type == ipv4.ICMPTypeEcho {
klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg)
return
}
} else {
var err error
msg, err = icmp.ParseMessage(protocolICMPv6, buffer)
if err != nil {
klog.ErrorS(err, "Failed to parse ICMP message")
return
}
if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply {
klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg)
return
}
// Ignore ICMP echo messages received from other Nodes (they will be answered by the system)
if msg.Type == ipv6.ICMPTypeEchoRequest {
klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg)
return
}
}

echo, ok := msg.Body.(*icmp.Echo)
if !ok {
klog.ErrorS(nil, "Failed to assert type as *icmp.Echo")
return
}
if echo.ID != int(icmpEchoID) {
klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg)
return
}

klog.V(4).InfoS("Received ICMP message", "IP", peerIP, "msg", msg)

// Parse the time from the ICMP data
sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data))
if err != nil {
klog.ErrorS(err, "Failed to parse time from ICMP data")
return
}

// Calculate the round-trip time
end := m.clock.Now()
rtt := end.Sub(sentTime)
klog.V(4).InfoS("Updating latency entry for Node IP", "IP", peerIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt)

// Update the latency store
mutator := func(entry *NodeIPLatencyEntry) {
entry.LastRecvTime = end
entry.LastMeasuredRTT = rtt
}
m.latencyStore.SetNodeIPLatencyEntry(peerIP, mutator)
}

// recvPings receives ICMP messages.
func (m *NodeLatencyMonitor) recvPings(socket net.PacketConn, isIPv4 bool) {
// We only expect small packets, if we receive a larger packet, we will drop the extra data.
readBuffer := make([]byte, 128)
for {
Expand All @@ -268,72 +343,7 @@ func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) {
return
}

destIP := peer.String()

// Parse the ICMP message
var msg *icmp.Message
if isIPv4 {
msg, err = icmp.ParseMessage(protocolICMP, readBuffer[:n])
if err != nil {
klog.ErrorS(err, "Failed to parse ICMP message")
continue
}
if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply {
klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg)
continue
}
// Ignore ICMP echo messages received from other Nodes (they will be answered by the system)
if msg.Type == ipv4.ICMPTypeEcho {
klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg)
continue
}
} else {
msg, err = icmp.ParseMessage(protocolICMPv6, readBuffer[:n])
if err != nil {
klog.ErrorS(err, "Failed to parse ICMP message")
continue
}
if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply {
klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg)
continue
}
// Ignore ICMP echo messages received from other Nodes (they will be answered by the system)
if msg.Type == ipv6.ICMPTypeEchoRequest {
klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg)
continue
}
}

echo, ok := msg.Body.(*icmp.Echo)
if !ok {
klog.ErrorS(nil, "Failed to assert type as *icmp.Echo")
continue
}
if echo.ID != int(icmpEchoID) {
klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg)
continue
}

klog.V(4).InfoS("Received ICMP message", "IP", destIP, "msg", msg)

// Parse the time from the ICMP data
sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data))
if err != nil {
klog.ErrorS(err, "Failed to parse time from ICMP data")
continue
}

// Calculate the round-trip time
end := time.Now()
rtt := end.Sub(sentTime)
klog.V(4).InfoS("Updating latency entry for Node IP", "IP", destIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt)

// Update the latency store
mutator := func(entry *NodeIPLatencyEntry) {
entry.LastRecvTime = end
entry.LastMeasuredRTT = rtt
}
m.latencyStore.SetNodeIPLatencyEntry(destIP, mutator)
m.handlePing(readBuffer[:n], peer.String(), isIPv4)
}
}

Expand All @@ -351,14 +361,18 @@ func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) {
klog.ErrorS(err, "Cannot send ICMP message to Node IP", "IP", toIP)
}
} else {
klog.ErrorS(nil, "Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP)
klog.V(3).InfoS("Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP)
}
}
klog.V(4).InfoS("Done pinging all Nodes")
}

// Run starts the NodeLatencyMonitor.
func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) {
if !cache.WaitForNamedCacheSync("NodeLatencyMonitor", stopCh, m.nodeInformerSynced, m.nlmInformerSynced) {
return
}

go m.monitorLoop(stopCh)

<-stopCh
Expand All @@ -367,8 +381,7 @@ func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) {
// monitorLoop is the main loop to monitor the latency of the Node.
func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
klog.InfoS("NodeLatencyMonitor is running")
// Low level goroutine to handle ping loop
var ticker *time.Ticker
var ticker clock.Ticker
var tickerCh <-chan time.Time
var ipv4Socket, ipv6Socket net.PacketConn
var err error
Expand All @@ -390,8 +403,8 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
if ticker != nil {
ticker.Stop() // Stop the current ticker
}
ticker = time.NewTicker(interval)
tickerCh = ticker.C
ticker = m.clock.NewTicker(interval)
tickerCh = ticker.C()
}

wg := sync.WaitGroup{}
Expand All @@ -417,28 +430,28 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
// recreate it if it is closed(CRD is deleted).
if ipv4Socket == nil && m.isIPv4Enabled {
// Create a new socket for IPv4 when it is IPv4-only
ipv4Socket, err = icmp.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0")
ipv4Socket, err = m.listener.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0")
if err != nil {
klog.ErrorS(err, "Failed to create ICMP socket for IPv4")
return
}
wg.Add(1)
go func() {
defer wg.Done()
m.recvPing(ipv4Socket, true)
m.recvPings(ipv4Socket, true)
}()
}
if ipv6Socket == nil && m.isIPv6Enabled {
// Create a new socket for IPv6 when it is IPv6-only
ipv6Socket, err = icmp.ListenPacket(ipv6ProtocolICMPRaw, "::")
ipv6Socket, err = m.listener.ListenPacket(ipv6ProtocolICMPRaw, "::")
if err != nil {
klog.ErrorS(err, "Failed to create ICMP socket for IPv6")
return
}
wg.Add(1)
go func() {
defer wg.Done()
m.recvPing(ipv6Socket, false)
m.recvPings(ipv6Socket, false)
}()
}
} else {
Expand Down Expand Up @@ -467,3 +480,11 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) {
}
}
}

// getICMPSeqNum returns the sequence number to be used when sending the next
// ICMP echo request. It wraps around to 0 after reaching the maximum value for
// uint16.
func (m *NodeLatencyMonitor) getICMPSeqNum() uint16 {
newSeqNum := m.icmpSeqNum.Add(1)
return uint16(newSeqNum)
}
Loading

0 comments on commit 759f201

Please sign in to comment.