From 759f2017c9c1a9c1b46656b0c68f23e743b2de33 Mon Sep 17 00:00:00 2001 From: Antonin Bas Date: Thu, 13 Jun 2024 10:35:11 -0700 Subject: [PATCH] Add unit tests for NodeLatencyMonitor (#6429) The tests leverage the net.PacketConn fake implementation which was recently introduced. We also introduce a mock for the ListenPacket call, which we use to validate when sockets are opened and to return our fake implementation. The new tests didn't reveal any issue in the existing code. Signed-off-by: Antonin Bas --- hack/update-codegen-dockerized.sh | 1 + pkg/agent/monitortool/latency_store.go | 15 + pkg/agent/monitortool/monitor.go | 231 +++--- pkg/agent/monitortool/monitor_test.go | 751 ++++++++++++++++++ .../monitortool/testing/mock_monitortool.go | 69 ++ pkg/agent/util/nettest/packetconn.go | 6 +- 6 files changed, 965 insertions(+), 108 deletions(-) create mode 100644 pkg/agent/monitortool/monitor_test.go create mode 100644 pkg/agent/monitortool/testing/mock_monitortool.go diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index 34a0467155e..26b77491603 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -45,6 +45,7 @@ MOCKGEN_TARGETS=( "pkg/agent/memberlist Memberlist ." "pkg/agent/multicast RouteInterface testing" "pkg/agent/types McastNetworkPolicyController testing" + "pkg/agent/monitortool PacketListener testing" "pkg/agent/nodeportlocal/portcache LocalPortOpener testing" "pkg/agent/nodeportlocal/rules PodPortRules testing" "pkg/agent/openflow Client testing" diff --git a/pkg/agent/monitortool/latency_store.go b/pkg/agent/monitortool/latency_store.go index 78882a72798..35cef35eb60 100644 --- a/pkg/agent/monitortool/latency_store.go +++ b/pkg/agent/monitortool/latency_store.go @@ -78,6 +78,21 @@ func (s *LatencyStore) getNodeIPLatencyEntry(nodeIP string) (NodeIPLatencyEntry, return *entry, ok } +// getNodeIPLatencyKeys returns the list of Node IPs for which we currently have +// latency measurements. +// It is only used for testing purposes. +func (s *LatencyStore) getNodeIPLatencyKeys() []string { + s.mutex.RLock() + defer s.mutex.RUnlock() + + keys := make([]string, 0, len(s.nodeIPLatencyMap)) + for key := range s.nodeIPLatencyMap { + keys = append(keys, key) + } + + return keys +} + // SetNodeIPLatencyEntry sets the NodeIPLatencyEntry for the given Node IP func (s *LatencyStore) SetNodeIPLatencyEntry(nodeIP string, mutator func(entry *NodeIPLatencyEntry)) { s.mutex.Lock() diff --git a/pkg/agent/monitortool/monitor.go b/pkg/agent/monitortool/monitor.go index efd6693535d..c840c571643 100644 --- a/pkg/agent/monitortool/monitor.go +++ b/pkg/agent/monitortool/monitor.go @@ -28,17 +28,15 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/utils/clock" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/crd/v1alpha1" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" ) -var ( - icmpSeq uint32 - // #nosec G404: random number generator not used for security purposes. - icmpEchoID = rand.Int31n(1 << 16) -) +// #nosec G404: random number generator not used for security purposes. +var icmpEchoID = rand.Int31n(1 << 16) const ( ipv4ProtocolICMPRaw = "ip4:icmp" @@ -47,15 +45,14 @@ const ( protocolICMPv6 = 58 ) -// getICMPSeq returns the next sequence number as uint16, -// wrapping around to 0 after reaching the maximum value of uint16. 
-func getICMPSeq() uint16 { - // Increment the sequence number atomically and get the new value. - // We use atomic.AddUint32 and pass 1 as the increment. - // The returned value is the new value post-increment. - newVal := atomic.AddUint32(&icmpSeq, 1) +type PacketListener interface { + ListenPacket(network, address string) (net.PacketConn, error) +} + +type ICMPListener struct{} - return uint16(newVal) +func (l *ICMPListener) ListenPacket(network, address string) (net.PacketConn, error) { + return icmp.ListenPacket(network, address) } // NodeLatencyMonitor is a tool to monitor the latency of the Node. @@ -70,9 +67,15 @@ type NodeLatencyMonitor struct { isIPv6Enabled bool // nodeName is the name of the current Node, used to filter out the current Node from the latency monitor. - nodeName string - nodeInformer coreinformers.NodeInformer - nodeLatencyMonitorInformer crdinformers.NodeLatencyMonitorInformer + nodeName string + + nodeInformerSynced cache.InformerSynced + nlmInformerSynced cache.InformerSynced + + clock clock.WithTicker + listener PacketListener + + icmpSeqNum atomic.Uint32 } // latencyConfig is the config for the latency monitor. @@ -84,16 +87,20 @@ type latencyConfig struct { } // NewNodeLatencyMonitor creates a new NodeLatencyMonitor. -func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer, +func NewNodeLatencyMonitor( + nodeInformer coreinformers.NodeInformer, nlmInformer crdinformers.NodeLatencyMonitorInformer, nodeConfig *config.NodeConfig, - trafficEncapMode config.TrafficEncapModeType) *NodeLatencyMonitor { + trafficEncapMode config.TrafficEncapModeType, +) *NodeLatencyMonitor { m := &NodeLatencyMonitor{ - latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()), - latencyConfigChanged: make(chan latencyConfig), - nodeInformer: nodeInformer, - nodeLatencyMonitorInformer: nlmInformer, - nodeName: nodeConfig.Name, + latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()), + latencyConfigChanged: make(chan latencyConfig), + nodeInformerSynced: nodeInformer.Informer().HasSynced, + nlmInformerSynced: nlmInformer.Informer().HasSynced, + nodeName: nodeConfig.Name, + clock: clock.RealClock{}, + listener: &ICMPListener{}, } m.isIPv4Enabled, _ = config.IsIPv4Enabled(nodeConfig, trafficEncapMode) @@ -169,7 +176,7 @@ func (m *NodeLatencyMonitor) onNodeDelete(obj interface{}) { // onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor. func (m *NodeLatencyMonitor) onNodeLatencyMonitorAdd(obj interface{}) { nlm := obj.(*v1alpha1.NodeLatencyMonitor) - klog.V(4).InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm)) + klog.InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm)) m.updateLatencyConfig(nlm) } @@ -219,8 +226,8 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error requestType = ipv4.ICMPTypeEcho } - timeStart := time.Now() - seqID := getICMPSeq() + timeStart := m.clock.Now() + seqID := m.getICMPSeqNum() body := &icmp.Echo{ ID: int(icmpEchoID), Seq: int(seqID), @@ -240,8 +247,7 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error } // Send the ICMP message - _, err = socket.WriteTo(msgBytes, ip) - if err != nil { + if _, err = socket.WriteTo(msgBytes, ip); err != nil { return err } @@ -254,8 +260,77 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error return nil } -// recvPing receives an ICMP message from the target IP address. 
-func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) { +func (m *NodeLatencyMonitor) handlePing(buffer []byte, peerIP string, isIPv4 bool) { + // Parse the ICMP message + var msg *icmp.Message + if isIPv4 { + var err error + msg, err = icmp.ParseMessage(protocolICMP, buffer) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + return + } + if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + return + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv4.ICMPTypeEcho { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + return + } + } else { + var err error + msg, err = icmp.ParseMessage(protocolICMPv6, buffer) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + return + } + if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + return + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv6.ICMPTypeEchoRequest { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + return + } + } + + echo, ok := msg.Body.(*icmp.Echo) + if !ok { + klog.ErrorS(nil, "Failed to assert type as *icmp.Echo") + return + } + if echo.ID != int(icmpEchoID) { + klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg) + return + } + + klog.V(4).InfoS("Received ICMP message", "IP", peerIP, "msg", msg) + + // Parse the time from the ICMP data + sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data)) + if err != nil { + klog.ErrorS(err, "Failed to parse time from ICMP data") + return + } + + // Calculate the round-trip time + end := m.clock.Now() + rtt := end.Sub(sentTime) + klog.V(4).InfoS("Updating latency entry for Node IP", "IP", peerIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt) + + // Update the latency store + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastRecvTime = end + entry.LastMeasuredRTT = rtt + } + m.latencyStore.SetNodeIPLatencyEntry(peerIP, mutator) +} + +// recvPings receives ICMP messages. +func (m *NodeLatencyMonitor) recvPings(socket net.PacketConn, isIPv4 bool) { // We only expect small packets, if we receive a larger packet, we will drop the extra data. 
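	// An echo message whose payload is an RFC3339Nano timestamp is only around
	// 40 bytes (8-byte ICMP echo header plus a ~35-byte timestamp string), so
	// 128 bytes leaves plenty of headroom.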
readBuffer := make([]byte, 128) for { @@ -268,72 +343,7 @@ func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) { return } - destIP := peer.String() - - // Parse the ICMP message - var msg *icmp.Message - if isIPv4 { - msg, err = icmp.ParseMessage(protocolICMP, readBuffer[:n]) - if err != nil { - klog.ErrorS(err, "Failed to parse ICMP message") - continue - } - if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply { - klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) - continue - } - // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) - if msg.Type == ipv4.ICMPTypeEcho { - klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) - continue - } - } else { - msg, err = icmp.ParseMessage(protocolICMPv6, readBuffer[:n]) - if err != nil { - klog.ErrorS(err, "Failed to parse ICMP message") - continue - } - if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply { - klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) - continue - } - // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) - if msg.Type == ipv6.ICMPTypeEchoRequest { - klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) - continue - } - } - - echo, ok := msg.Body.(*icmp.Echo) - if !ok { - klog.ErrorS(nil, "Failed to assert type as *icmp.Echo") - continue - } - if echo.ID != int(icmpEchoID) { - klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg) - continue - } - - klog.V(4).InfoS("Received ICMP message", "IP", destIP, "msg", msg) - - // Parse the time from the ICMP data - sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data)) - if err != nil { - klog.ErrorS(err, "Failed to parse time from ICMP data") - continue - } - - // Calculate the round-trip time - end := time.Now() - rtt := end.Sub(sentTime) - klog.V(4).InfoS("Updating latency entry for Node IP", "IP", destIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt) - - // Update the latency store - mutator := func(entry *NodeIPLatencyEntry) { - entry.LastRecvTime = end - entry.LastMeasuredRTT = rtt - } - m.latencyStore.SetNodeIPLatencyEntry(destIP, mutator) + m.handlePing(readBuffer[:n], peer.String(), isIPv4) } } @@ -351,7 +361,7 @@ func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) { klog.ErrorS(err, "Cannot send ICMP message to Node IP", "IP", toIP) } } else { - klog.ErrorS(nil, "Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP) + klog.V(3).InfoS("Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP) } } klog.V(4).InfoS("Done pinging all Nodes") @@ -359,6 +369,10 @@ func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) { // Run starts the NodeLatencyMonitor. func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) { + if !cache.WaitForNamedCacheSync("NodeLatencyMonitor", stopCh, m.nodeInformerSynced, m.nlmInformerSynced) { + return + } + go m.monitorLoop(stopCh) <-stopCh @@ -367,8 +381,7 @@ func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) { // monitorLoop is the main loop to monitor the latency of the Node. 
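// On every tick it pings all known Node IPs; when the latency configuration
// changes it replaces the ticker, and it opens or closes the ICMP sockets as
// monitoring is enabled or disabled.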
func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { klog.InfoS("NodeLatencyMonitor is running") - // Low level goroutine to handle ping loop - var ticker *time.Ticker + var ticker clock.Ticker var tickerCh <-chan time.Time var ipv4Socket, ipv6Socket net.PacketConn var err error @@ -390,8 +403,8 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { if ticker != nil { ticker.Stop() // Stop the current ticker } - ticker = time.NewTicker(interval) - tickerCh = ticker.C + ticker = m.clock.NewTicker(interval) + tickerCh = ticker.C() } wg := sync.WaitGroup{} @@ -417,7 +430,7 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { // recreate it if it is closed(CRD is deleted). if ipv4Socket == nil && m.isIPv4Enabled { // Create a new socket for IPv4 when it is IPv4-only - ipv4Socket, err = icmp.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0") + ipv4Socket, err = m.listener.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0") if err != nil { klog.ErrorS(err, "Failed to create ICMP socket for IPv4") return @@ -425,12 +438,12 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { wg.Add(1) go func() { defer wg.Done() - m.recvPing(ipv4Socket, true) + m.recvPings(ipv4Socket, true) }() } if ipv6Socket == nil && m.isIPv6Enabled { // Create a new socket for IPv6 when it is IPv6-only - ipv6Socket, err = icmp.ListenPacket(ipv6ProtocolICMPRaw, "::") + ipv6Socket, err = m.listener.ListenPacket(ipv6ProtocolICMPRaw, "::") if err != nil { klog.ErrorS(err, "Failed to create ICMP socket for IPv6") return @@ -438,7 +451,7 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { wg.Add(1) go func() { defer wg.Done() - m.recvPing(ipv6Socket, false) + m.recvPings(ipv6Socket, false) }() } } else { @@ -467,3 +480,11 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { } } } + +// getICMPSeqNum returns the sequence number to be used when sending the next +// ICMP echo request. It wraps around to 0 after reaching the maximum value for +// uint16. +func (m *NodeLatencyMonitor) getICMPSeqNum() uint16 { + newSeqNum := m.icmpSeqNum.Add(1) + return uint16(newSeqNum) +} diff --git a/pkg/agent/monitortool/monitor_test.go b/pkg/agent/monitortool/monitor_test.go new file mode 100644 index 00000000000..30bfe148b47 --- /dev/null +++ b/pkg/agent/monitortool/monitor_test.go @@ -0,0 +1,751 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package monitortool + +import ( + "context" + "net" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/clock" + clocktesting "k8s.io/utils/clock/testing" + + "antrea.io/antrea/pkg/agent/config" + monitortesting "antrea.io/antrea/pkg/agent/monitortool/testing" + "antrea.io/antrea/pkg/agent/util/nettest" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/util/ip" +) + +func makeNode(nodeName string, nodeIPs []string, podCIDRs []string) *corev1.Node { + addresses := []corev1.NodeAddress{} + for _, ip := range nodeIPs { + addresses = append(addresses, corev1.NodeAddress{ + Type: corev1.NodeInternalIP, + Address: ip, + }) + } + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + Spec: corev1.NodeSpec{ + PodCIDR: podCIDRs[0], + PodCIDRs: podCIDRs, + }, + Status: corev1.NodeStatus{ + Addresses: addresses, + }, + } +} + +var ( + nodeConfigDualStack = &config.NodeConfig{ + Name: "node1", + PodIPv4CIDR: ip.MustParseCIDR("10.0.1.0/24"), + PodIPv6CIDR: ip.MustParseCIDR("2001:ab03:cd04:55ee:100a::/80"), + NodeIPv4Addr: ip.MustParseCIDR("192.168.77.100/24"), + NodeIPv6Addr: ip.MustParseCIDR("192:168:77::100/80"), + } + nodeConfigIPv4 = &config.NodeConfig{ + Name: "node1", + PodIPv4CIDR: ip.MustParseCIDR("10.0.1.0/24"), + NodeIPv4Addr: ip.MustParseCIDR("192.168.77.100/24"), + } + + nlm = &crdv1alpha1.NodeLatencyMonitor{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + Spec: crdv1alpha1.NodeLatencyMonitorSpec{ + PingIntervalSeconds: 60, + }, + } + + node1 = makeNode("node1", []string{"192.168.77.101", "192:168:77::101"}, []string{"10.0.1.0/24", "2001:ab03:cd04:55ee:100a::/80"}) + node2 = makeNode("node2", []string{"192.168.77.102", "192:168:77::102"}, []string{"10.0.2.0/24", "2001:ab03:cd04:55ee:100b::/80"}) + node3 = makeNode("node3", []string{"192.168.77.103", "192:168:77::103"}, []string{"10.0.3.0/24", "2001:ab03:cd04:55ee:100c::/80"}) +) + +type testAddr struct { + network string + address string +} + +func (a *testAddr) Network() string { + return a.network +} +func (a *testAddr) String() string { + return a.address +} + +var ( + testAddrIPv4 = &testAddr{network: ipv4ProtocolICMPRaw, address: "0.0.0.0"} + testAddrIPv6 = &testAddr{network: ipv6ProtocolICMPRaw, address: "::"} +) + +// fakeClock is a wrapper around clocktesting.FakeClock that tracks the number +// of times NewTicker has been called, so we can write a race-free test. 
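+// Advancing the fake clock before the monitor has registered its ticker would
+// cause the tick to be missed, so tests wait for TickersAdded() to increment
+// before calling Step().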
+type fakeClock struct {
+	*clocktesting.FakeClock
+	tickersAdded atomic.Int32
+}
+
+func newFakeClock(t time.Time) *fakeClock {
+	return &fakeClock{
+		FakeClock: clocktesting.NewFakeClock(t),
+	}
+}
+
+func (c *fakeClock) TickersAdded() int32 {
+	return c.tickersAdded.Load()
+}
+
+func (c *fakeClock) NewTicker(d time.Duration) clock.Ticker {
+	defer c.tickersAdded.Add(1)
+	return c.FakeClock.NewTicker(d)
+}
+
+type testMonitor struct {
+	*NodeLatencyMonitor
+	clientset          *fake.Clientset
+	informerFactory    informers.SharedInformerFactory
+	crdClientset       *fakeversioned.Clientset
+	crdInformerFactory crdinformers.SharedInformerFactory
+	ctrl               *gomock.Controller
+	mockListener       *monitortesting.MockPacketListener
+	clock              *fakeClock
+}
+
+func newTestMonitor(
+	t *testing.T,
+	nodeConfig *config.NodeConfig,
+	trafficEncapMode config.TrafficEncapModeType,
+	clockT time.Time,
+	objects []runtime.Object,
+	crdObjects []runtime.Object,
+) *testMonitor {
+	ctrl := gomock.NewController(t)
+	clientset := fake.NewSimpleClientset(objects...)
+	informerFactory := informers.NewSharedInformerFactory(clientset, 0)
+	nodeInformer := informerFactory.Core().V1().Nodes()
+	crdClientset := fakeversioned.NewSimpleClientset(crdObjects...)
+	crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClientset, 0)
+	nlmInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors()
+	m := NewNodeLatencyMonitor(nodeInformer, nlmInformer, nodeConfig, trafficEncapMode)
+	fakeClock := newFakeClock(clockT)
+	m.clock = fakeClock
+	mockListener := monitortesting.NewMockPacketListener(ctrl)
+	m.listener = mockListener
+
+	return &testMonitor{
+		NodeLatencyMonitor: m,
+		clientset:          clientset,
+		informerFactory:    informerFactory,
+		crdClientset:       crdClientset,
+		crdInformerFactory: crdInformerFactory,
+		ctrl:               ctrl,
+		mockListener:       mockListener,
+		clock:              fakeClock,
+	}
+}
+
+func TestEnableMonitor(t *testing.T) {
+	ctx := context.Background()
+
+	stopCh := make(chan struct{})
+	defer close(stopCh)
+	m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, time.Now(), nil, nil)
+	m.crdInformerFactory.Start(stopCh)
+	m.informerFactory.Start(stopCh)
+	m.crdInformerFactory.WaitForCacheSync(stopCh)
+	m.informerFactory.WaitForCacheSync(stopCh)
+	go m.Run(stopCh)
+
+	pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, nil, nil)
+	m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil)
+	pConnIPv6 := nettest.NewPacketConn(testAddrIPv6, nil, nil)
+	m.mockListener.EXPECT().ListenPacket(ipv6ProtocolICMPRaw, "::").Return(pConnIPv6, nil)
+
+	_, err := m.crdClientset.CrdV1alpha1().NodeLatencyMonitors().Create(ctx, nlm, metav1.CreateOptions{})
+	require.NoError(t, err)
+
+	require.Eventually(t, m.ctrl.Satisfied, 2*time.Second, 10*time.Millisecond)
+	assert.False(t, pConnIPv4.IsClosed())
+	assert.False(t, pConnIPv6.IsClosed())
+}
+
+// collectProbePackets takes as input a channel used to receive packets, and returns a function that
+// can be called to collect received packets. It is useful to write assertions in tests that
+// validate the list of received packets. collectProbePackets starts a goroutine in the background,
+// which exits when either the input channel or the stop channel is closed.
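+// The returned function appends the packets received since the previous call
+// to the slice passed as an argument and returns the result.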
+func collectProbePackets(ch <-chan *nettest.Packet, stopCh <-chan struct{}) func([]*nettest.Packet) []*nettest.Packet { + var m sync.Mutex + newPackets := make([]*nettest.Packet, 0) + go func() { + for { + select { + // It may not always be convenient to close the input packet channel, so + // stopCh can also be used as a signal to terminate the receiving goroutine. + case <-stopCh: + return + case p, ok := <-ch: + if !ok { + return + } + func() { + m.Lock() + defer m.Unlock() + newPackets = append(newPackets, p) + }() + } + } + }() + return func(packets []*nettest.Packet) []*nettest.Packet { + m.Lock() + defer m.Unlock() + packets = append(packets, newPackets...) + newPackets = make([]*nettest.Packet, 0) + return packets + } +} + +func extractIPs(packets []*nettest.Packet) []string { + ips := make([]string, len(packets)) + for idx := range packets { + ips[idx] = packets[idx].Addr.String() + } + return ips +} + +func TestDisableMonitor(t *testing.T) { + ctx := context.Background() + + stopCh := make(chan struct{}) + defer close(stopCh) + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, time.Now(), nil, []runtime.Object{nlm}) + m.crdInformerFactory.Start(stopCh) + m.informerFactory.Start(stopCh) + m.crdInformerFactory.WaitForCacheSync(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + + pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, nil, nil) + m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil) + pConnIPv6 := nettest.NewPacketConn(testAddrIPv6, nil, nil) + m.mockListener.EXPECT().ListenPacket(ipv6ProtocolICMPRaw, "::").Return(pConnIPv6, nil) + + go m.Run(stopCh) + require.Eventually(t, m.ctrl.Satisfied, 2*time.Second, 10*time.Millisecond) + + err := m.crdClientset.CrdV1alpha1().NodeLatencyMonitors().Delete(ctx, nlm.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.True(t, pConnIPv4.IsClosed()) + assert.True(t, pConnIPv6.IsClosed()) + }, 2*time.Second, 10*time.Millisecond) +} + +func TestUpdateMonitorPingInterval(t *testing.T) { + ctx := context.Background() + + stopCh := make(chan struct{}) + defer close(stopCh) + m := newTestMonitor(t, nodeConfigIPv4, config.TrafficEncapModeEncap, time.Now(), []runtime.Object{node1, node2, node3}, []runtime.Object{nlm}) + m.crdInformerFactory.Start(stopCh) + m.informerFactory.Start(stopCh) + m.crdInformerFactory.WaitForCacheSync(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + fakeClock := m.clock + + outCh := make(chan *nettest.Packet, 10) + collect := collectProbePackets(outCh, stopCh) + pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, nil, outCh) + m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil) + + go m.Run(stopCh) + + // We wait for the first ticker to be created, which indicates that we can advance the clock + // safely. This is not ideal, because it relies on knowledge of how the implementation + // creates tickers. + require.Eventually(t, func() bool { + return fakeClock.TickersAdded() == 1 + }, 2*time.Second, 10*time.Millisecond) + + // After advancing the clock by 60s (ping interval), we should see the ICMP requests being sent. + fakeClock.Step(60 * time.Second) + packets := []*nettest.Packet{} + assert.EventuallyWithT(t, func(t *assert.CollectT) { + packets = collect(packets) + assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1"}, extractIPs(packets)) + }, 2*time.Second, 10*time.Millisecond) + + // We increase the ping interval from 60s to 90s. 
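+	// The monitor should replace its ticker without reopening the socket: a
+	// second call to ListenPacket would be reported as unexpected by the mock
+	// listener.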
+	newNLM := nlm.DeepCopy()
+	newNLM.Spec.PingIntervalSeconds = 90
+	newNLM.Generation = 1
+	_, err := m.crdClientset.CrdV1alpha1().NodeLatencyMonitors().Update(ctx, newNLM, metav1.UpdateOptions{})
+	require.NoError(t, err)
+
+	// Again, we have to wait for the second ticker to be created before we can advance the clock.
+	require.Eventually(t, func() bool {
+		return fakeClock.TickersAdded() == 2
+	}, 2*time.Second, 10*time.Millisecond)
+
+	// When advancing the clock by 60s (old ping interval), we should not observe any ICMP requests.
+	// We only wait for 200ms.
+	fakeClock.Step(60 * time.Second)
+	assert.Never(t, func() bool {
+		return len(collect(nil)) > 0
+	}, 200*time.Millisecond, 50*time.Millisecond)
+
+	// After advancing the clock by an extra 30s, we should see the ICMP requests being sent.
+	fakeClock.Step(30 * time.Second)
+	packets = []*nettest.Packet{}
+	assert.EventuallyWithT(t, func(t *assert.CollectT) {
+		packets = collect(packets)
+		assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1"}, extractIPs(packets))
+	}, 2*time.Second, 10*time.Millisecond)
+}
+
+func TestSendPing(t *testing.T) {
+	testCases := []struct {
+		name        string
+		addr        net.Addr
+		targetIP    string
+		requestType icmp.Type
+	}{
+		{
+			name:        "ipv4",
+			addr:        testAddrIPv4,
+			targetIP:    "10.0.2.1",
+			requestType: ipv4.ICMPTypeEcho,
+		},
+		{
+			name:        "ipv6",
+			addr:        testAddrIPv6,
+			targetIP:    "2001:ab03:cd04:55ee:100b::1",
+			requestType: ipv6.ICMPTypeEchoRequest,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			now := time.Now()
+			m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, now, nil, nil)
+			const icmpSeqNum = 12
+			m.icmpSeqNum.Store(icmpSeqNum)
+			expectedMsg := icmp.Message{
+				Type: tc.requestType,
+				Code: 0,
+				Body: &icmp.Echo{
+					ID:   int(icmpEchoID),
+					Seq:  icmpSeqNum + 1,
+					Data: []byte(now.Format(time.RFC3339Nano)),
+				},
+			}
+			outCh := make(chan *nettest.Packet, 1)
+			pConn := nettest.NewPacketConn(tc.addr, nil, outCh)
+			require.NoError(t, m.sendPing(pConn, net.ParseIP(tc.targetIP)))
+			expectedBytes, err := expectedMsg.Marshal(nil)
+			require.NoError(t, err)
+			select {
+			case p := <-outCh:
+				assert.Equal(t, tc.targetIP, p.Addr.String())
+				assert.Equal(t, expectedBytes, p.Bytes)
+			case <-time.After(1 * time.Second):
+				assert.Fail(t, "ICMP message was not sent correctly")
+			}
+			entry, ok := m.latencyStore.getNodeIPLatencyEntry(tc.targetIP)
+			assert.True(t, ok)
+			assert.Equal(t, now, entry.LastSendTime)
+		})
+	}
+}
+
+// TestRecvPings tests that ICMP messages are handled correctly when received. We only consider the
+// "normal" case here. The ICMP parsing and validation logic is tested comprehensively in
+// TestHandlePing.
+func TestRecvPings(t *testing.T) {
+	now := time.Now()
+	m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, now, nil, nil)
+	inCh := make(chan *nettest.Packet, 1)
+	pConn := nettest.NewPacketConn(testAddrIPv4, inCh, nil)
+	doneCh := make(chan struct{})
+	go func() {
+		defer close(doneCh)
+		// This will block until the socket is closed.
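+		// recvPings only returns once ReadFrom fails, which the fake
+		// connection guarantees after Close is called.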
+ m.recvPings(pConn, true) + }() + msg := icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 13, + Data: []byte(now.Format(time.RFC3339Nano)), + }, + } + msgBytes, err := msg.Marshal(nil) + require.NoError(t, err) + peerIP := "10.0.2.1" + peerAddr := &testAddr{network: ipv4ProtocolICMPRaw, address: peerIP} + inCh <- &nettest.Packet{ + Addr: peerAddr, + Bytes: msgBytes, + } + assert.Eventually(t, func() bool { + _, ok := m.latencyStore.getNodeIPLatencyEntry(peerIP) + return ok + }, 2*time.Second, 10*time.Millisecond) + + pConn.Close() + select { + case <-doneCh: + break + case <-time.After(1 * time.Second): + assert.Fail(t, "recvPings should return when socket is closed") + } +} + +func MustMarshal(msg *icmp.Message) []byte { + msgBytes, err := msg.Marshal(nil) + if err != nil { + panic("failed to marshal ICMP message") + } + return msgBytes +} + +func TestHandlePing(t *testing.T) { + now := time.Now() + payload := []byte(now.Format(time.RFC3339Nano)) + + testCases := []struct { + name string + msgBytes []byte + isIPv4 bool + isValid bool + }{ + { + name: "valid IPv4", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: true, + isValid: true, + }, + { + name: "valid IPv6", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv6.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: false, + isValid: true, + }, + { + name: "invalid ICMP message", + msgBytes: []byte("foo"), // this is too short to be a valid ICMP message + isIPv4: true, + isValid: false, + }, + { + name: "wrong IP family", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: false, + isValid: false, + }, + { + name: "not an ICMP echo reply IPv4", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEcho, + Code: 0, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: true, + isValid: false, + }, + { + name: "not an ICMP echo reply IPv6", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv6.ICMPTypeEchoRequest, + Code: 0, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: false, + isValid: false, + }, + { + name: "wrong echo ID", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID) + 1, + Seq: 1, + Data: payload, + }, + }), + isIPv4: true, + isValid: false, + }, + { + name: "invalid payload", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: []byte("foobar"), + }, + }), + isIPv4: true, + isValid: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, now, nil, nil) + peerIP := "10.0.2.1" + if !tc.isIPv4 { + peerIP = "2001:ab03:cd04:55ee:100b::1" + } + const rtt = 1 * time.Second + m.clock.Step(rtt) + m.handlePing(tc.msgBytes, peerIP, tc.isIPv4) + entry, ok := m.latencyStore.getNodeIPLatencyEntry(peerIP) + if tc.isValid { + require.True(t, ok) + assert.Equal(t, m.clock.Now(), entry.LastRecvTime) + assert.Equal(t, rtt, entry.LastMeasuredRTT) + } else { + assert.False(t, ok) + } + }) + } +} + +func TestNodeAddUpdateDelete(t *testing.T) { + ctx := context.Background() + + node := 
makeNode("node3", []string{"192.168.77.103", "192:168:77::103"}, []string{"10.0.3.0/24", "2001:ab03:cd04:55ee:100c::/80"}) + updatedNode := makeNode("node3", []string{"192.168.77.104", "192:168:77::104"}, []string{"10.0.4.0/24", "2001:ab03:cd04:55ee:100d::/80"}) + + testCases := []struct { + encapMode config.TrafficEncapModeType + // before update + expectedNodeIPs1 []string + // after update + expectedNodeIPs2 []string + }{ + { + encapMode: config.TrafficEncapModeEncap, + expectedNodeIPs1: []string{"10.0.3.1", "2001:ab03:cd04:55ee:100c::1"}, + expectedNodeIPs2: []string{"10.0.4.1", "2001:ab03:cd04:55ee:100d::1"}, + }, + { + encapMode: config.TrafficEncapModeNoEncap, + expectedNodeIPs1: []string{"10.0.3.1", "2001:ab03:cd04:55ee:100c::1"}, + expectedNodeIPs2: []string{"10.0.4.1", "2001:ab03:cd04:55ee:100d::1"}, + }, + { + encapMode: config.TrafficEncapModeNetworkPolicyOnly, + expectedNodeIPs1: []string{"192.168.77.103", "192:168:77::103"}, + expectedNodeIPs2: []string{"192.168.77.104", "192:168:77::104"}, + }, + } + + convertIPsToStrs := func(ips []net.IP) []string { + ipStrs := make([]string, len(ips)) + for idx := range ips { + ipStrs[idx] = ips[idx].String() + } + return ipStrs + } + + for _, tc := range testCases { + t.Run(tc.encapMode.String(), func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + // We start with node1 (the current Node) only, and it should be ignored. + m := newTestMonitor(t, nodeConfigIPv4, tc.encapMode, time.Now(), []runtime.Object{node1}, nil) + m.informerFactory.Start(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + go m.Run(stopCh) + + require.Empty(t, m.latencyStore.ListNodeIPs()) + + _, err := m.clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + require.NoError(t, err) + + // We convert the []net.IP slice to []string before comparing the slices, + // and not the reverse, because creating a net.IP with net.ParseIP for an + // IPv4 address will yield a 16-byte slice which may not exactly match the + // result of ListNodeIPs(), even though the values indeed represent the same + // IP address. 
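+			// For example, net.ParseIP("10.0.3.1") yields a 16-byte
+			// slice, which reflect.DeepEqual treats as different from
+			// the 4-byte form of the same address.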
+ assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.ElementsMatch(t, tc.expectedNodeIPs1, convertIPsToStrs(m.latencyStore.ListNodeIPs())) + }, 2*time.Second, 10*time.Millisecond) + + _, err = m.clientset.CoreV1().Nodes().Update(ctx, updatedNode, metav1.UpdateOptions{}) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.ElementsMatch(t, tc.expectedNodeIPs2, convertIPsToStrs(m.latencyStore.ListNodeIPs())) + }, 2*time.Second, 10*time.Millisecond) + + err = m.clientset.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Empty(t, m.latencyStore.ListNodeIPs()) + }, 2*time.Second, 10*time.Millisecond) + }) + } +} + +func TestMonitorLoop(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, time.Now(), []runtime.Object{node1, node2, node3}, []runtime.Object{nlm}) + m.crdInformerFactory.Start(stopCh) + m.informerFactory.Start(stopCh) + m.crdInformerFactory.WaitForCacheSync(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + fakeClock := m.clock + + in4Ch := make(chan *nettest.Packet, 10) + in6Ch := make(chan *nettest.Packet, 10) + outCh := make(chan *nettest.Packet, 10) + collect := collectProbePackets(outCh, stopCh) + pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, in4Ch, outCh) + m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil) + pConnIPv6 := nettest.NewPacketConn(testAddrIPv6, in6Ch, outCh) + m.mockListener.EXPECT().ListenPacket(ipv6ProtocolICMPRaw, "::").Return(pConnIPv6, nil) + + go m.Run(stopCh) + + // We wait for the first ticker to be created, which indicates that we can advance the clock + // safely. This is not ideal, because it relies on knowledge of how the implementation + // creates tickers. + require.Eventually(t, func() bool { + return fakeClock.TickersAdded() == 1 + }, 2*time.Second, 10*time.Millisecond) + + require.Empty(t, m.latencyStore.getNodeIPLatencyKeys()) + + // After advancing the clock by 60s (ping interval), we should see the ICMP requests being sent. + fakeClock.Step(60 * time.Second) + packets := []*nettest.Packet{} + assert.EventuallyWithT(t, func(t *assert.CollectT) { + packets = collect(packets) + assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"}, extractIPs(packets)) + }, 2*time.Second, 10*time.Millisecond) + + // The store is updated when sending the ICMP requests, as we need to store the send timestamp. + assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"}, m.latencyStore.getNodeIPLatencyKeys()) + + // Advance the clock by one more second, and send replies for all ICMP requests. 
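+	// handlePing computes the RTT as m.clock.Now() minus the timestamp carried
+	// in the echo payload, so each reply should be measured with an RTT of
+	// exactly 1 second.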
+ fakeClock.Step(1 * time.Second) + for _, packet := range packets { + if packet.Addr.Network() == ipv4ProtocolICMPRaw { + request, err := icmp.ParseMessage(protocolICMP, packet.Bytes) + require.NoError(t, err) + replyBytes := MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: request.Body, + }) + in4Ch <- &nettest.Packet{ + Addr: packet.Addr, + Bytes: replyBytes, + } + } else { + request, err := icmp.ParseMessage(protocolICMPv6, packet.Bytes) + require.NoError(t, err) + replyBytes := MustMarshal(&icmp.Message{ + Type: ipv6.ICMPTypeEchoReply, + Body: request.Body, + }) + in6Ch <- &nettest.Packet{ + Addr: packet.Addr, + Bytes: replyBytes, + } + } + } + + // The store should eventually be updated with the correct RTT measurements. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + for _, ip := range []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"} { + entry, _ := m.latencyStore.getNodeIPLatencyEntry(ip) + assert.Equal(t, 1*time.Second, entry.LastMeasuredRTT) + } + }, 2*time.Second, 10*time.Millisecond) + + // Delete node3 synchronously, which simplifies testing. + m.onNodeDelete(node3) + + // After advancing the clock by another 60s (ping interval), we should see another round of + // ICMP requests being sent, this time not including the Node that was deleted. + // The latency store should also eventually be cleaned up to remove the stale entries for + // that Node. + fakeClock.Step(60 * time.Second) + packets = []*nettest.Packet{} + assert.EventuallyWithT(t, func(t *assert.CollectT) { + packets = collect(packets) + assert.ElementsMatch(t, []string{"10.0.2.1", "2001:ab03:cd04:55ee:100b::1"}, extractIPs(packets)) + nodeIPs := m.latencyStore.getNodeIPLatencyKeys() + assert.ElementsMatch(t, []string{"10.0.2.1", "2001:ab03:cd04:55ee:100b::1"}, nodeIPs) + }, 2*time.Second, 10*time.Millisecond) +} diff --git a/pkg/agent/monitortool/testing/mock_monitortool.go b/pkg/agent/monitortool/testing/mock_monitortool.go new file mode 100644 index 00000000000..5631dbb5ce6 --- /dev/null +++ b/pkg/agent/monitortool/testing/mock_monitortool.go @@ -0,0 +1,69 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: antrea.io/antrea/pkg/agent/monitortool (interfaces: PacketListener) +// +// Generated by this command: +// +// mockgen -copyright_file hack/boilerplate/license_header.raw.txt -destination pkg/agent/monitortool/testing/mock_monitortool.go -package testing antrea.io/antrea/pkg/agent/monitortool PacketListener +// +// Package testing is a generated GoMock package. +package testing + +import ( + net "net" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockPacketListener is a mock of PacketListener interface. +type MockPacketListener struct { + ctrl *gomock.Controller + recorder *MockPacketListenerMockRecorder +} + +// MockPacketListenerMockRecorder is the mock recorder for MockPacketListener. 
+type MockPacketListenerMockRecorder struct { + mock *MockPacketListener +} + +// NewMockPacketListener creates a new mock instance. +func NewMockPacketListener(ctrl *gomock.Controller) *MockPacketListener { + mock := &MockPacketListener{ctrl: ctrl} + mock.recorder = &MockPacketListenerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPacketListener) EXPECT() *MockPacketListenerMockRecorder { + return m.recorder +} + +// ListenPacket mocks base method. +func (m *MockPacketListener) ListenPacket(arg0, arg1 string) (net.PacketConn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListenPacket", arg0, arg1) + ret0, _ := ret[0].(net.PacketConn) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListenPacket indicates an expected call of ListenPacket. +func (mr *MockPacketListenerMockRecorder) ListenPacket(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListenPacket", reflect.TypeOf((*MockPacketListener)(nil).ListenPacket), arg0, arg1) +} diff --git a/pkg/agent/util/nettest/packetconn.go b/pkg/agent/util/nettest/packetconn.go index f4ba56f4be7..cc19f952b66 100644 --- a/pkg/agent/util/nettest/packetconn.go +++ b/pkg/agent/util/nettest/packetconn.go @@ -52,7 +52,7 @@ func (pc *PacketConn) ReadFrom(p []byte) (int, net.Addr, error) { // function. It is still possible for the connection to be closed between this check and the // select, but it doesn't matter in this case because that would mean the 2 function calls // (Close and ReadFrom) are concurrent. - if pc.isClosed() { + if pc.IsClosed() { return 0, nil, pc.closedConnectionError("read") } select { @@ -66,7 +66,7 @@ func (pc *PacketConn) ReadFrom(p []byte) (int, net.Addr, error) { func (pc *PacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { // See the comment in ReadFrom. - if pc.isClosed() { + if pc.IsClosed() { return 0, pc.closedConnectionError("write") } packet := &Packet{ @@ -131,7 +131,7 @@ func (pc *PacketConn) Receive() ([]byte, net.Addr, error) { } } -func (pc *PacketConn) isClosed() bool { +func (pc *PacketConn) IsClosed() bool { select { case <-pc.closeCh: return true