diff --git a/hack/update-codegen-dockerized.sh b/hack/update-codegen-dockerized.sh index 34a0467155e..26b77491603 100755 --- a/hack/update-codegen-dockerized.sh +++ b/hack/update-codegen-dockerized.sh @@ -45,6 +45,7 @@ MOCKGEN_TARGETS=( "pkg/agent/memberlist Memberlist ." "pkg/agent/multicast RouteInterface testing" "pkg/agent/types McastNetworkPolicyController testing" + "pkg/agent/monitortool PacketListener testing" "pkg/agent/nodeportlocal/portcache LocalPortOpener testing" "pkg/agent/nodeportlocal/rules PodPortRules testing" "pkg/agent/openflow Client testing" diff --git a/pkg/agent/monitortool/latency_store.go b/pkg/agent/monitortool/latency_store.go index 78882a72798..35cef35eb60 100644 --- a/pkg/agent/monitortool/latency_store.go +++ b/pkg/agent/monitortool/latency_store.go @@ -78,6 +78,21 @@ func (s *LatencyStore) getNodeIPLatencyEntry(nodeIP string) (NodeIPLatencyEntry, return *entry, ok } +// getNodeIPLatencyKeys returns the list of Node IPs for which we currently have +// latency measurements. +// It is only used for testing purposes. +func (s *LatencyStore) getNodeIPLatencyKeys() []string { + s.mutex.RLock() + defer s.mutex.RUnlock() + + keys := make([]string, 0, len(s.nodeIPLatencyMap)) + for key := range s.nodeIPLatencyMap { + keys = append(keys, key) + } + + return keys +} + // SetNodeIPLatencyEntry sets the NodeIPLatencyEntry for the given Node IP func (s *LatencyStore) SetNodeIPLatencyEntry(nodeIP string, mutator func(entry *NodeIPLatencyEntry)) { s.mutex.Lock() diff --git a/pkg/agent/monitortool/monitor.go b/pkg/agent/monitortool/monitor.go index efd6693535d..c840c571643 100644 --- a/pkg/agent/monitortool/monitor.go +++ b/pkg/agent/monitortool/monitor.go @@ -28,17 +28,15 @@ import ( coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/utils/clock" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/apis/crd/v1alpha1" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" ) -var ( - icmpSeq uint32 - // #nosec G404: random number generator not used for security purposes. - icmpEchoID = rand.Int31n(1 << 16) -) +// #nosec G404: random number generator not used for security purposes. +var icmpEchoID = rand.Int31n(1 << 16) const ( ipv4ProtocolICMPRaw = "ip4:icmp" @@ -47,15 +45,14 @@ const ( protocolICMPv6 = 58 ) -// getICMPSeq returns the next sequence number as uint16, -// wrapping around to 0 after reaching the maximum value of uint16. -func getICMPSeq() uint16 { - // Increment the sequence number atomically and get the new value. - // We use atomic.AddUint32 and pass 1 as the increment. - // The returned value is the new value post-increment. - newVal := atomic.AddUint32(&icmpSeq, 1) +type PacketListener interface { + ListenPacket(network, address string) (net.PacketConn, error) +} + +type ICMPListener struct{} - return uint16(newVal) +func (l *ICMPListener) ListenPacket(network, address string) (net.PacketConn, error) { + return icmp.ListenPacket(network, address) } // NodeLatencyMonitor is a tool to monitor the latency of the Node. @@ -70,9 +67,15 @@ type NodeLatencyMonitor struct { isIPv6Enabled bool // nodeName is the name of the current Node, used to filter out the current Node from the latency monitor. - nodeName string - nodeInformer coreinformers.NodeInformer - nodeLatencyMonitorInformer crdinformers.NodeLatencyMonitorInformer + nodeName string + + nodeInformerSynced cache.InformerSynced + nlmInformerSynced cache.InformerSynced + + clock clock.WithTicker + listener PacketListener + + icmpSeqNum atomic.Uint32 } // latencyConfig is the config for the latency monitor. @@ -84,16 +87,20 @@ type latencyConfig struct { } // NewNodeLatencyMonitor creates a new NodeLatencyMonitor. -func NewNodeLatencyMonitor(nodeInformer coreinformers.NodeInformer, +func NewNodeLatencyMonitor( + nodeInformer coreinformers.NodeInformer, nlmInformer crdinformers.NodeLatencyMonitorInformer, nodeConfig *config.NodeConfig, - trafficEncapMode config.TrafficEncapModeType) *NodeLatencyMonitor { + trafficEncapMode config.TrafficEncapModeType, +) *NodeLatencyMonitor { m := &NodeLatencyMonitor{ - latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()), - latencyConfigChanged: make(chan latencyConfig), - nodeInformer: nodeInformer, - nodeLatencyMonitorInformer: nlmInformer, - nodeName: nodeConfig.Name, + latencyStore: NewLatencyStore(trafficEncapMode.IsNetworkPolicyOnly()), + latencyConfigChanged: make(chan latencyConfig), + nodeInformerSynced: nodeInformer.Informer().HasSynced, + nlmInformerSynced: nlmInformer.Informer().HasSynced, + nodeName: nodeConfig.Name, + clock: clock.RealClock{}, + listener: &ICMPListener{}, } m.isIPv4Enabled, _ = config.IsIPv4Enabled(nodeConfig, trafficEncapMode) @@ -169,7 +176,7 @@ func (m *NodeLatencyMonitor) onNodeDelete(obj interface{}) { // onNodeLatencyMonitorAdd is the event handler for adding NodeLatencyMonitor. func (m *NodeLatencyMonitor) onNodeLatencyMonitorAdd(obj interface{}) { nlm := obj.(*v1alpha1.NodeLatencyMonitor) - klog.V(4).InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm)) + klog.InfoS("NodeLatencyMonitor added", "NodeLatencyMonitor", klog.KObj(nlm)) m.updateLatencyConfig(nlm) } @@ -219,8 +226,8 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error requestType = ipv4.ICMPTypeEcho } - timeStart := time.Now() - seqID := getICMPSeq() + timeStart := m.clock.Now() + seqID := m.getICMPSeqNum() body := &icmp.Echo{ ID: int(icmpEchoID), Seq: int(seqID), @@ -240,8 +247,7 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error } // Send the ICMP message - _, err = socket.WriteTo(msgBytes, ip) - if err != nil { + if _, err = socket.WriteTo(msgBytes, ip); err != nil { return err } @@ -254,8 +260,77 @@ func (m *NodeLatencyMonitor) sendPing(socket net.PacketConn, addr net.IP) error return nil } -// recvPing receives an ICMP message from the target IP address. -func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) { +func (m *NodeLatencyMonitor) handlePing(buffer []byte, peerIP string, isIPv4 bool) { + // Parse the ICMP message + var msg *icmp.Message + if isIPv4 { + var err error + msg, err = icmp.ParseMessage(protocolICMP, buffer) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + return + } + if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + return + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv4.ICMPTypeEcho { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + return + } + } else { + var err error + msg, err = icmp.ParseMessage(protocolICMPv6, buffer) + if err != nil { + klog.ErrorS(err, "Failed to parse ICMP message") + return + } + if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply { + klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) + return + } + // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) + if msg.Type == ipv6.ICMPTypeEchoRequest { + klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) + return + } + } + + echo, ok := msg.Body.(*icmp.Echo) + if !ok { + klog.ErrorS(nil, "Failed to assert type as *icmp.Echo") + return + } + if echo.ID != int(icmpEchoID) { + klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg) + return + } + + klog.V(4).InfoS("Received ICMP message", "IP", peerIP, "msg", msg) + + // Parse the time from the ICMP data + sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data)) + if err != nil { + klog.ErrorS(err, "Failed to parse time from ICMP data") + return + } + + // Calculate the round-trip time + end := m.clock.Now() + rtt := end.Sub(sentTime) + klog.V(4).InfoS("Updating latency entry for Node IP", "IP", peerIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt) + + // Update the latency store + mutator := func(entry *NodeIPLatencyEntry) { + entry.LastRecvTime = end + entry.LastMeasuredRTT = rtt + } + m.latencyStore.SetNodeIPLatencyEntry(peerIP, mutator) +} + +// recvPings receives ICMP messages. +func (m *NodeLatencyMonitor) recvPings(socket net.PacketConn, isIPv4 bool) { // We only expect small packets, if we receive a larger packet, we will drop the extra data. readBuffer := make([]byte, 128) for { @@ -268,72 +343,7 @@ func (m *NodeLatencyMonitor) recvPing(socket net.PacketConn, isIPv4 bool) { return } - destIP := peer.String() - - // Parse the ICMP message - var msg *icmp.Message - if isIPv4 { - msg, err = icmp.ParseMessage(protocolICMP, readBuffer[:n]) - if err != nil { - klog.ErrorS(err, "Failed to parse ICMP message") - continue - } - if msg.Type != ipv4.ICMPTypeEcho && msg.Type != ipv4.ICMPTypeEchoReply { - klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) - continue - } - // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) - if msg.Type == ipv4.ICMPTypeEcho { - klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) - continue - } - } else { - msg, err = icmp.ParseMessage(protocolICMPv6, readBuffer[:n]) - if err != nil { - klog.ErrorS(err, "Failed to parse ICMP message") - continue - } - if msg.Type != ipv6.ICMPTypeEchoRequest && msg.Type != ipv6.ICMPTypeEchoReply { - klog.V(5).InfoS("Ignoring non-ping ICMP message", "msg", msg) - continue - } - // Ignore ICMP echo messages received from other Nodes (they will be answered by the system) - if msg.Type == ipv6.ICMPTypeEchoRequest { - klog.V(7).InfoS("Ignoring ICMP echo request message", "msg", msg) - continue - } - } - - echo, ok := msg.Body.(*icmp.Echo) - if !ok { - klog.ErrorS(nil, "Failed to assert type as *icmp.Echo") - continue - } - if echo.ID != int(icmpEchoID) { - klog.V(4).InfoS("Ignoring ICMP message with wrong echo ID", "msg", msg) - continue - } - - klog.V(4).InfoS("Received ICMP message", "IP", destIP, "msg", msg) - - // Parse the time from the ICMP data - sentTime, err := time.Parse(time.RFC3339Nano, string(echo.Data)) - if err != nil { - klog.ErrorS(err, "Failed to parse time from ICMP data") - continue - } - - // Calculate the round-trip time - end := time.Now() - rtt := end.Sub(sentTime) - klog.V(4).InfoS("Updating latency entry for Node IP", "IP", destIP, "lastSendTime", sentTime, "lastRecvTime", end, "RTT", rtt) - - // Update the latency store - mutator := func(entry *NodeIPLatencyEntry) { - entry.LastRecvTime = end - entry.LastMeasuredRTT = rtt - } - m.latencyStore.SetNodeIPLatencyEntry(destIP, mutator) + m.handlePing(readBuffer[:n], peer.String(), isIPv4) } } @@ -351,7 +361,7 @@ func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) { klog.ErrorS(err, "Cannot send ICMP message to Node IP", "IP", toIP) } } else { - klog.ErrorS(nil, "Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP) + klog.V(3).InfoS("Cannot send ICMP message to Node IP because socket is not initialized for IP family", "IP", toIP) } } klog.V(4).InfoS("Done pinging all Nodes") @@ -359,6 +369,10 @@ func (m *NodeLatencyMonitor) pingAll(ipv4Socket, ipv6Socket net.PacketConn) { // Run starts the NodeLatencyMonitor. func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) { + if !cache.WaitForNamedCacheSync("NodeLatencyMonitor", stopCh, m.nodeInformerSynced, m.nlmInformerSynced) { + return + } + go m.monitorLoop(stopCh) <-stopCh @@ -367,8 +381,7 @@ func (m *NodeLatencyMonitor) Run(stopCh <-chan struct{}) { // monitorLoop is the main loop to monitor the latency of the Node. func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { klog.InfoS("NodeLatencyMonitor is running") - // Low level goroutine to handle ping loop - var ticker *time.Ticker + var ticker clock.Ticker var tickerCh <-chan time.Time var ipv4Socket, ipv6Socket net.PacketConn var err error @@ -390,8 +403,8 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { if ticker != nil { ticker.Stop() // Stop the current ticker } - ticker = time.NewTicker(interval) - tickerCh = ticker.C + ticker = m.clock.NewTicker(interval) + tickerCh = ticker.C() } wg := sync.WaitGroup{} @@ -417,7 +430,7 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { // recreate it if it is closed(CRD is deleted). if ipv4Socket == nil && m.isIPv4Enabled { // Create a new socket for IPv4 when it is IPv4-only - ipv4Socket, err = icmp.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0") + ipv4Socket, err = m.listener.ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0") if err != nil { klog.ErrorS(err, "Failed to create ICMP socket for IPv4") return @@ -425,12 +438,12 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { wg.Add(1) go func() { defer wg.Done() - m.recvPing(ipv4Socket, true) + m.recvPings(ipv4Socket, true) }() } if ipv6Socket == nil && m.isIPv6Enabled { // Create a new socket for IPv6 when it is IPv6-only - ipv6Socket, err = icmp.ListenPacket(ipv6ProtocolICMPRaw, "::") + ipv6Socket, err = m.listener.ListenPacket(ipv6ProtocolICMPRaw, "::") if err != nil { klog.ErrorS(err, "Failed to create ICMP socket for IPv6") return @@ -438,7 +451,7 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { wg.Add(1) go func() { defer wg.Done() - m.recvPing(ipv6Socket, false) + m.recvPings(ipv6Socket, false) }() } } else { @@ -467,3 +480,11 @@ func (m *NodeLatencyMonitor) monitorLoop(stopCh <-chan struct{}) { } } } + +// getICMPSeqNum returns the sequence number to be used when sending the next +// ICMP echo request. It wraps around to 0 after reaching the maximum value for +// uint16. +func (m *NodeLatencyMonitor) getICMPSeqNum() uint16 { + newSeqNum := m.icmpSeqNum.Add(1) + return uint16(newSeqNum) +} diff --git a/pkg/agent/monitortool/monitor_test.go b/pkg/agent/monitortool/monitor_test.go new file mode 100644 index 00000000000..30bfe148b47 --- /dev/null +++ b/pkg/agent/monitortool/monitor_test.go @@ -0,0 +1,751 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package monitortool + +import ( + "context" + "net" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "golang.org/x/net/icmp" + "golang.org/x/net/ipv4" + "golang.org/x/net/ipv6" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/clock" + clocktesting "k8s.io/utils/clock/testing" + + "antrea.io/antrea/pkg/agent/config" + monitortesting "antrea.io/antrea/pkg/agent/monitortool/testing" + "antrea.io/antrea/pkg/agent/util/nettest" + crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/util/ip" +) + +func makeNode(nodeName string, nodeIPs []string, podCIDRs []string) *corev1.Node { + addresses := []corev1.NodeAddress{} + for _, ip := range nodeIPs { + addresses = append(addresses, corev1.NodeAddress{ + Type: corev1.NodeInternalIP, + Address: ip, + }) + } + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: nodeName, + }, + Spec: corev1.NodeSpec{ + PodCIDR: podCIDRs[0], + PodCIDRs: podCIDRs, + }, + Status: corev1.NodeStatus{ + Addresses: addresses, + }, + } +} + +var ( + nodeConfigDualStack = &config.NodeConfig{ + Name: "node1", + PodIPv4CIDR: ip.MustParseCIDR("10.0.1.0/24"), + PodIPv6CIDR: ip.MustParseCIDR("2001:ab03:cd04:55ee:100a::/80"), + NodeIPv4Addr: ip.MustParseCIDR("192.168.77.100/24"), + NodeIPv6Addr: ip.MustParseCIDR("192:168:77::100/80"), + } + nodeConfigIPv4 = &config.NodeConfig{ + Name: "node1", + PodIPv4CIDR: ip.MustParseCIDR("10.0.1.0/24"), + NodeIPv4Addr: ip.MustParseCIDR("192.168.77.100/24"), + } + + nlm = &crdv1alpha1.NodeLatencyMonitor{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default", + }, + Spec: crdv1alpha1.NodeLatencyMonitorSpec{ + PingIntervalSeconds: 60, + }, + } + + node1 = makeNode("node1", []string{"192.168.77.101", "192:168:77::101"}, []string{"10.0.1.0/24", "2001:ab03:cd04:55ee:100a::/80"}) + node2 = makeNode("node2", []string{"192.168.77.102", "192:168:77::102"}, []string{"10.0.2.0/24", "2001:ab03:cd04:55ee:100b::/80"}) + node3 = makeNode("node3", []string{"192.168.77.103", "192:168:77::103"}, []string{"10.0.3.0/24", "2001:ab03:cd04:55ee:100c::/80"}) +) + +type testAddr struct { + network string + address string +} + +func (a *testAddr) Network() string { + return a.network +} +func (a *testAddr) String() string { + return a.address +} + +var ( + testAddrIPv4 = &testAddr{network: ipv4ProtocolICMPRaw, address: "0.0.0.0"} + testAddrIPv6 = &testAddr{network: ipv6ProtocolICMPRaw, address: "::"} +) + +// fakeClock is a wrapper around clocktesting.FakeClock that tracks the number +// of times NewTicker has been called, so we can write a race-free test. +type fakeClock struct { + *clocktesting.FakeClock + tickersAdded atomic.Int32 +} + +func newFakeClock(t time.Time) *fakeClock { + return &fakeClock{ + FakeClock: clocktesting.NewFakeClock(t), + } +} + +func (c *fakeClock) TickersAdded() int32 { + return c.tickersAdded.Load() +} + +func (c *fakeClock) NewTicker(d time.Duration) clock.Ticker { + defer c.tickersAdded.Add(1) + return c.FakeClock.NewTicker(d) +} + +type testMonitor struct { + *NodeLatencyMonitor + clientset *fake.Clientset + informerFactory informers.SharedInformerFactory + crdClientset *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + ctrl *gomock.Controller + mockListener *monitortesting.MockPacketListener + clock *fakeClock +} + +func newTestMonitor( + t *testing.T, + nodeConfig *config.NodeConfig, + trafficEncapMode config.TrafficEncapModeType, + clockT time.Time, + objects []runtime.Object, + crdObjects []runtime.Object, +) *testMonitor { + ctrl := gomock.NewController(t) + clientset := fake.NewSimpleClientset(objects...) + informerFactory := informers.NewSharedInformerFactory(clientset, 0) + nodeInformer := informerFactory.Core().V1().Nodes() + crdClientset := fakeversioned.NewSimpleClientset(crdObjects...) + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClientset, 0) + nlmInformer := crdInformerFactory.Crd().V1alpha1().NodeLatencyMonitors() + m := NewNodeLatencyMonitor(nodeInformer, nlmInformer, nodeConfig, trafficEncapMode) + fakeClock := newFakeClock(clockT) + m.clock = fakeClock + mockListener := monitortesting.NewMockPacketListener(ctrl) + m.listener = mockListener + + return &testMonitor{ + NodeLatencyMonitor: m, + clientset: clientset, + informerFactory: informerFactory, + crdClientset: crdClientset, + crdInformerFactory: crdInformerFactory, + ctrl: ctrl, + mockListener: mockListener, + clock: fakeClock, + } +} + +func TestEnableMonitor(t *testing.T) { + ctx := context.Background() + + stopCh := make(chan struct{}) + defer close(stopCh) + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, time.Now(), nil, nil) + m.crdInformerFactory.Start(stopCh) + m.informerFactory.Start(stopCh) + m.crdInformerFactory.WaitForCacheSync(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + go m.Run(stopCh) + + pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, nil, nil) + m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil) + pConnIPv6 := nettest.NewPacketConn(testAddrIPv6, nil, nil) + m.mockListener.EXPECT().ListenPacket(ipv6ProtocolICMPRaw, "::").Return(pConnIPv6, nil) + + _, err := m.crdClientset.CrdV1alpha1().NodeLatencyMonitors().Create(ctx, nlm, metav1.CreateOptions{}) + require.NoError(t, err) + + require.Eventually(t, m.ctrl.Satisfied, 2*time.Second, 10*time.Millisecond) + assert.False(t, pConnIPv4.IsClosed()) + assert.False(t, pConnIPv6.IsClosed()) +} + +// collectProbePackets takes as input a channel used to receive packets, and returns a function that +// can be called to collect received packets. It is useful to write assertions in tests that +// validate the list of received packets. collectProbePackets starts a goroutine in the background, +// which exists when either the input channel or the stop channel is closed. +func collectProbePackets(ch <-chan *nettest.Packet, stopCh <-chan struct{}) func([]*nettest.Packet) []*nettest.Packet { + var m sync.Mutex + newPackets := make([]*nettest.Packet, 0) + go func() { + for { + select { + // It may not always be convenient to close the input packet channel, so + // stopCh can also be used as a signal to terminate the receiving goroutine. + case <-stopCh: + return + case p, ok := <-ch: + if !ok { + return + } + func() { + m.Lock() + defer m.Unlock() + newPackets = append(newPackets, p) + }() + } + } + }() + return func(packets []*nettest.Packet) []*nettest.Packet { + m.Lock() + defer m.Unlock() + packets = append(packets, newPackets...) + newPackets = make([]*nettest.Packet, 0) + return packets + } +} + +func extractIPs(packets []*nettest.Packet) []string { + ips := make([]string, len(packets)) + for idx := range packets { + ips[idx] = packets[idx].Addr.String() + } + return ips +} + +func TestDisableMonitor(t *testing.T) { + ctx := context.Background() + + stopCh := make(chan struct{}) + defer close(stopCh) + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, time.Now(), nil, []runtime.Object{nlm}) + m.crdInformerFactory.Start(stopCh) + m.informerFactory.Start(stopCh) + m.crdInformerFactory.WaitForCacheSync(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + + pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, nil, nil) + m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil) + pConnIPv6 := nettest.NewPacketConn(testAddrIPv6, nil, nil) + m.mockListener.EXPECT().ListenPacket(ipv6ProtocolICMPRaw, "::").Return(pConnIPv6, nil) + + go m.Run(stopCh) + require.Eventually(t, m.ctrl.Satisfied, 2*time.Second, 10*time.Millisecond) + + err := m.crdClientset.CrdV1alpha1().NodeLatencyMonitors().Delete(ctx, nlm.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.True(t, pConnIPv4.IsClosed()) + assert.True(t, pConnIPv6.IsClosed()) + }, 2*time.Second, 10*time.Millisecond) +} + +func TestUpdateMonitorPingInterval(t *testing.T) { + ctx := context.Background() + + stopCh := make(chan struct{}) + defer close(stopCh) + m := newTestMonitor(t, nodeConfigIPv4, config.TrafficEncapModeEncap, time.Now(), []runtime.Object{node1, node2, node3}, []runtime.Object{nlm}) + m.crdInformerFactory.Start(stopCh) + m.informerFactory.Start(stopCh) + m.crdInformerFactory.WaitForCacheSync(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + fakeClock := m.clock + + outCh := make(chan *nettest.Packet, 10) + collect := collectProbePackets(outCh, stopCh) + pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, nil, outCh) + m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil) + + go m.Run(stopCh) + + // We wait for the first ticker to be created, which indicates that we can advance the clock + // safely. This is not ideal, because it relies on knowledge of how the implementation + // creates tickers. + require.Eventually(t, func() bool { + return fakeClock.TickersAdded() == 1 + }, 2*time.Second, 10*time.Millisecond) + + // After advancing the clock by 60s (ping interval), we should see the ICMP requests being sent. + fakeClock.Step(60 * time.Second) + packets := []*nettest.Packet{} + assert.EventuallyWithT(t, func(t *assert.CollectT) { + packets = collect(packets) + assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1"}, extractIPs(packets)) + }, 2*time.Second, 10*time.Millisecond) + + // We increase the ping interval from 60s to 90s. + newNLM := nlm.DeepCopy() + newNLM.Spec.PingIntervalSeconds = 90 + newNLM.Generation = 1 + _, err := m.crdClientset.CrdV1alpha1().NodeLatencyMonitors().Update(ctx, newNLM, metav1.UpdateOptions{}) + require.NoError(t, err) + + // Again, we have to wait for the second ticker to be created before we can advance the clock. + require.Eventually(t, func() bool { + return fakeClock.TickersAdded() == 2 + }, 2*time.Second, 10*time.Millisecond) + + // When advancing the clock by 60s (old ping iterval), we should not observe any ICMP requests. + // We only wait for 200ms. + fakeClock.Step(60 * time.Second) + assert.Never(t, func() bool { + return len(collect(nil)) > 0 + }, 200*time.Millisecond, 50*time.Millisecond) + + // After advancing the clock by an extra 30s, we should see the ICMP requests being sent. + fakeClock.Step(30 * time.Second) + packets = []*nettest.Packet{} + assert.EventuallyWithT(t, func(t *assert.CollectT) { + packets = collect(packets) + assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1"}, extractIPs(packets)) + }, 2*time.Second, 10*time.Millisecond) + +} + +func TestSendPing(t *testing.T) { + testCases := []struct { + name string + addr net.Addr + targetIP string + requestType icmp.Type + }{ + { + name: "ipv4", + addr: testAddrIPv4, + targetIP: "10.0.2.1", + requestType: ipv4.ICMPTypeEcho, + }, + { + name: "ipv6", + addr: testAddrIPv6, + targetIP: "2001:ab03:cd04:55ee:100b::1", + requestType: ipv6.ICMPTypeEchoRequest, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + now := time.Now() + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, now, nil, nil) + const icmpSeqNum = 12 + m.icmpSeqNum.Store(icmpSeqNum) + expectedMsg := icmp.Message{ + Type: tc.requestType, + Code: 0, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: icmpSeqNum + 1, + Data: []byte(now.Format(time.RFC3339Nano)), + }, + } + outCh := make(chan *nettest.Packet, 1) + pConn := nettest.NewPacketConn(tc.addr, nil, outCh) + require.NoError(t, m.sendPing(pConn, net.ParseIP(tc.targetIP))) + expectedBytes, err := expectedMsg.Marshal(nil) + require.NoError(t, err) + select { + case p := <-outCh: + assert.Equal(t, tc.targetIP, p.Addr.String()) + assert.Equal(t, expectedBytes, p.Bytes) + case <-time.After(1 * time.Second): + assert.Fail(t, "ICMP message was not sent correctly") + } + entry, ok := m.latencyStore.getNodeIPLatencyEntry(tc.targetIP) + assert.True(t, ok) + assert.Equal(t, now, entry.LastSendTime) + }) + } +} + +// TestRecvPings tests that ICMP messages are handled correctly when received. We only consider the +// "normal" case here. The ICMP parsing and validation logic is tested comprehensively in +// TestHandlePing. +func TestRecvPings(t *testing.T) { + now := time.Now() + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, now, nil, nil) + inCh := make(chan *nettest.Packet, 1) + pConn := nettest.NewPacketConn(testAddrIPv4, inCh, nil) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + // This will block until the socket is closed. + m.recvPings(pConn, true) + }() + msg := icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 13, + Data: []byte(now.Format(time.RFC3339Nano)), + }, + } + msgBytes, err := msg.Marshal(nil) + require.NoError(t, err) + peerIP := "10.0.2.1" + peerAddr := &testAddr{network: ipv4ProtocolICMPRaw, address: peerIP} + inCh <- &nettest.Packet{ + Addr: peerAddr, + Bytes: msgBytes, + } + assert.Eventually(t, func() bool { + _, ok := m.latencyStore.getNodeIPLatencyEntry(peerIP) + return ok + }, 2*time.Second, 10*time.Millisecond) + + pConn.Close() + select { + case <-doneCh: + break + case <-time.After(1 * time.Second): + assert.Fail(t, "recvPings should return when socket is closed") + } +} + +func MustMarshal(msg *icmp.Message) []byte { + msgBytes, err := msg.Marshal(nil) + if err != nil { + panic("failed to marshal ICMP message") + } + return msgBytes +} + +func TestHandlePing(t *testing.T) { + now := time.Now() + payload := []byte(now.Format(time.RFC3339Nano)) + + testCases := []struct { + name string + msgBytes []byte + isIPv4 bool + isValid bool + }{ + { + name: "valid IPv4", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: true, + isValid: true, + }, + { + name: "valid IPv6", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv6.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: false, + isValid: true, + }, + { + name: "invalid ICMP message", + msgBytes: []byte("foo"), // this is too short to be a valid ICMP message + isIPv4: true, + isValid: false, + }, + { + name: "wrong IP family", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: false, + isValid: false, + }, + { + name: "not an ICMP echo reply IPv4", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEcho, + Code: 0, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: true, + isValid: false, + }, + { + name: "not an ICMP echo reply IPv6", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv6.ICMPTypeEchoRequest, + Code: 0, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: payload, + }, + }), + isIPv4: false, + isValid: false, + }, + { + name: "wrong echo ID", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID) + 1, + Seq: 1, + Data: payload, + }, + }), + isIPv4: true, + isValid: false, + }, + { + name: "invalid payload", + msgBytes: MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: &icmp.Echo{ + ID: int(icmpEchoID), + Seq: 1, + Data: []byte("foobar"), + }, + }), + isIPv4: true, + isValid: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, now, nil, nil) + peerIP := "10.0.2.1" + if !tc.isIPv4 { + peerIP = "2001:ab03:cd04:55ee:100b::1" + } + const rtt = 1 * time.Second + m.clock.Step(rtt) + m.handlePing(tc.msgBytes, peerIP, tc.isIPv4) + entry, ok := m.latencyStore.getNodeIPLatencyEntry(peerIP) + if tc.isValid { + require.True(t, ok) + assert.Equal(t, m.clock.Now(), entry.LastRecvTime) + assert.Equal(t, rtt, entry.LastMeasuredRTT) + } else { + assert.False(t, ok) + } + }) + } +} + +func TestNodeAddUpdateDelete(t *testing.T) { + ctx := context.Background() + + node := makeNode("node3", []string{"192.168.77.103", "192:168:77::103"}, []string{"10.0.3.0/24", "2001:ab03:cd04:55ee:100c::/80"}) + updatedNode := makeNode("node3", []string{"192.168.77.104", "192:168:77::104"}, []string{"10.0.4.0/24", "2001:ab03:cd04:55ee:100d::/80"}) + + testCases := []struct { + encapMode config.TrafficEncapModeType + // before update + expectedNodeIPs1 []string + // after update + expectedNodeIPs2 []string + }{ + { + encapMode: config.TrafficEncapModeEncap, + expectedNodeIPs1: []string{"10.0.3.1", "2001:ab03:cd04:55ee:100c::1"}, + expectedNodeIPs2: []string{"10.0.4.1", "2001:ab03:cd04:55ee:100d::1"}, + }, + { + encapMode: config.TrafficEncapModeNoEncap, + expectedNodeIPs1: []string{"10.0.3.1", "2001:ab03:cd04:55ee:100c::1"}, + expectedNodeIPs2: []string{"10.0.4.1", "2001:ab03:cd04:55ee:100d::1"}, + }, + { + encapMode: config.TrafficEncapModeNetworkPolicyOnly, + expectedNodeIPs1: []string{"192.168.77.103", "192:168:77::103"}, + expectedNodeIPs2: []string{"192.168.77.104", "192:168:77::104"}, + }, + } + + convertIPsToStrs := func(ips []net.IP) []string { + ipStrs := make([]string, len(ips)) + for idx := range ips { + ipStrs[idx] = ips[idx].String() + } + return ipStrs + } + + for _, tc := range testCases { + t.Run(tc.encapMode.String(), func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + // We start with node1 (the current Node) only, and it should be ignored. + m := newTestMonitor(t, nodeConfigIPv4, tc.encapMode, time.Now(), []runtime.Object{node1}, nil) + m.informerFactory.Start(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + go m.Run(stopCh) + + require.Empty(t, m.latencyStore.ListNodeIPs()) + + _, err := m.clientset.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}) + require.NoError(t, err) + + // We convert the []net.IP slice to []string before comparing the slices, + // and not the reverse, because creating a net.IP with net.ParseIP for an + // IPv4 address will yield a 16-byte slice which may not exactly match the + // result of ListNodeIPs(), even though the values indeed represent the same + // IP address. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.ElementsMatch(t, tc.expectedNodeIPs1, convertIPsToStrs(m.latencyStore.ListNodeIPs())) + }, 2*time.Second, 10*time.Millisecond) + + _, err = m.clientset.CoreV1().Nodes().Update(ctx, updatedNode, metav1.UpdateOptions{}) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.ElementsMatch(t, tc.expectedNodeIPs2, convertIPsToStrs(m.latencyStore.ListNodeIPs())) + }, 2*time.Second, 10*time.Millisecond) + + err = m.clientset.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + assert.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Empty(t, m.latencyStore.ListNodeIPs()) + }, 2*time.Second, 10*time.Millisecond) + }) + } +} + +func TestMonitorLoop(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + m := newTestMonitor(t, nodeConfigDualStack, config.TrafficEncapModeEncap, time.Now(), []runtime.Object{node1, node2, node3}, []runtime.Object{nlm}) + m.crdInformerFactory.Start(stopCh) + m.informerFactory.Start(stopCh) + m.crdInformerFactory.WaitForCacheSync(stopCh) + m.informerFactory.WaitForCacheSync(stopCh) + fakeClock := m.clock + + in4Ch := make(chan *nettest.Packet, 10) + in6Ch := make(chan *nettest.Packet, 10) + outCh := make(chan *nettest.Packet, 10) + collect := collectProbePackets(outCh, stopCh) + pConnIPv4 := nettest.NewPacketConn(testAddrIPv4, in4Ch, outCh) + m.mockListener.EXPECT().ListenPacket(ipv4ProtocolICMPRaw, "0.0.0.0").Return(pConnIPv4, nil) + pConnIPv6 := nettest.NewPacketConn(testAddrIPv6, in6Ch, outCh) + m.mockListener.EXPECT().ListenPacket(ipv6ProtocolICMPRaw, "::").Return(pConnIPv6, nil) + + go m.Run(stopCh) + + // We wait for the first ticker to be created, which indicates that we can advance the clock + // safely. This is not ideal, because it relies on knowledge of how the implementation + // creates tickers. + require.Eventually(t, func() bool { + return fakeClock.TickersAdded() == 1 + }, 2*time.Second, 10*time.Millisecond) + + require.Empty(t, m.latencyStore.getNodeIPLatencyKeys()) + + // After advancing the clock by 60s (ping interval), we should see the ICMP requests being sent. + fakeClock.Step(60 * time.Second) + packets := []*nettest.Packet{} + assert.EventuallyWithT(t, func(t *assert.CollectT) { + packets = collect(packets) + assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"}, extractIPs(packets)) + }, 2*time.Second, 10*time.Millisecond) + + // The store is updated when sending the ICMP requests, as we need to store the send timestamp. + assert.ElementsMatch(t, []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"}, m.latencyStore.getNodeIPLatencyKeys()) + + // Advance the clock by one more second, and send replies for all ICMP requests. + fakeClock.Step(1 * time.Second) + for _, packet := range packets { + if packet.Addr.Network() == ipv4ProtocolICMPRaw { + request, err := icmp.ParseMessage(protocolICMP, packet.Bytes) + require.NoError(t, err) + replyBytes := MustMarshal(&icmp.Message{ + Type: ipv4.ICMPTypeEchoReply, + Body: request.Body, + }) + in4Ch <- &nettest.Packet{ + Addr: packet.Addr, + Bytes: replyBytes, + } + } else { + request, err := icmp.ParseMessage(protocolICMPv6, packet.Bytes) + require.NoError(t, err) + replyBytes := MustMarshal(&icmp.Message{ + Type: ipv6.ICMPTypeEchoReply, + Body: request.Body, + }) + in6Ch <- &nettest.Packet{ + Addr: packet.Addr, + Bytes: replyBytes, + } + } + } + + // The store should eventually be updated with the correct RTT measurements. + assert.EventuallyWithT(t, func(t *assert.CollectT) { + for _, ip := range []string{"10.0.2.1", "10.0.3.1", "2001:ab03:cd04:55ee:100b::1", "2001:ab03:cd04:55ee:100c::1"} { + entry, _ := m.latencyStore.getNodeIPLatencyEntry(ip) + assert.Equal(t, 1*time.Second, entry.LastMeasuredRTT) + } + }, 2*time.Second, 10*time.Millisecond) + + // Delete node3 synchronously, which simplifies testing. + m.onNodeDelete(node3) + + // After advancing the clock by another 60s (ping interval), we should see another round of + // ICMP requests being sent, this time not including the Node that was deleted. + // The latency store should also eventually be cleaned up to remove the stale entries for + // that Node. + fakeClock.Step(60 * time.Second) + packets = []*nettest.Packet{} + assert.EventuallyWithT(t, func(t *assert.CollectT) { + packets = collect(packets) + assert.ElementsMatch(t, []string{"10.0.2.1", "2001:ab03:cd04:55ee:100b::1"}, extractIPs(packets)) + nodeIPs := m.latencyStore.getNodeIPLatencyKeys() + assert.ElementsMatch(t, []string{"10.0.2.1", "2001:ab03:cd04:55ee:100b::1"}, nodeIPs) + }, 2*time.Second, 10*time.Millisecond) +} diff --git a/pkg/agent/monitortool/testing/mock_monitortool.go b/pkg/agent/monitortool/testing/mock_monitortool.go new file mode 100644 index 00000000000..5631dbb5ce6 --- /dev/null +++ b/pkg/agent/monitortool/testing/mock_monitortool.go @@ -0,0 +1,69 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +// Code generated by MockGen. DO NOT EDIT. +// Source: antrea.io/antrea/pkg/agent/monitortool (interfaces: PacketListener) +// +// Generated by this command: +// +// mockgen -copyright_file hack/boilerplate/license_header.raw.txt -destination pkg/agent/monitortool/testing/mock_monitortool.go -package testing antrea.io/antrea/pkg/agent/monitortool PacketListener +// +// Package testing is a generated GoMock package. +package testing + +import ( + net "net" + reflect "reflect" + + gomock "go.uber.org/mock/gomock" +) + +// MockPacketListener is a mock of PacketListener interface. +type MockPacketListener struct { + ctrl *gomock.Controller + recorder *MockPacketListenerMockRecorder +} + +// MockPacketListenerMockRecorder is the mock recorder for MockPacketListener. +type MockPacketListenerMockRecorder struct { + mock *MockPacketListener +} + +// NewMockPacketListener creates a new mock instance. +func NewMockPacketListener(ctrl *gomock.Controller) *MockPacketListener { + mock := &MockPacketListener{ctrl: ctrl} + mock.recorder = &MockPacketListenerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPacketListener) EXPECT() *MockPacketListenerMockRecorder { + return m.recorder +} + +// ListenPacket mocks base method. +func (m *MockPacketListener) ListenPacket(arg0, arg1 string) (net.PacketConn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListenPacket", arg0, arg1) + ret0, _ := ret[0].(net.PacketConn) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListenPacket indicates an expected call of ListenPacket. +func (mr *MockPacketListenerMockRecorder) ListenPacket(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListenPacket", reflect.TypeOf((*MockPacketListener)(nil).ListenPacket), arg0, arg1) +} diff --git a/pkg/agent/util/nettest/packetconn.go b/pkg/agent/util/nettest/packetconn.go index f4ba56f4be7..cc19f952b66 100644 --- a/pkg/agent/util/nettest/packetconn.go +++ b/pkg/agent/util/nettest/packetconn.go @@ -52,7 +52,7 @@ func (pc *PacketConn) ReadFrom(p []byte) (int, net.Addr, error) { // function. It is still possible for the connection to be closed between this check and the // select, but it doesn't matter in this case because that would mean the 2 function calls // (Close and ReadFrom) are concurrent. - if pc.isClosed() { + if pc.IsClosed() { return 0, nil, pc.closedConnectionError("read") } select { @@ -66,7 +66,7 @@ func (pc *PacketConn) ReadFrom(p []byte) (int, net.Addr, error) { func (pc *PacketConn) WriteTo(p []byte, addr net.Addr) (int, error) { // See the comment in ReadFrom. - if pc.isClosed() { + if pc.IsClosed() { return 0, pc.closedConnectionError("write") } packet := &Packet{ @@ -131,7 +131,7 @@ func (pc *PacketConn) Receive() ([]byte, net.Addr, error) { } } -func (pc *PacketConn) isClosed() bool { +func (pc *PacketConn) IsClosed() bool { select { case <-pc.closeCh: return true