Skip to content

Commit

Permalink
Delay the initialization of ARP/NDP responders
Browse files Browse the repository at this point in the history
For secondary-network scenarios, the transport interface can be
changed after the agent is started. The ARP/NDP responders should
be started after the initialization of secondary-network to bind
to the transport interface of the new index.

Besides, this change also addresses the following issues:
- NDP responder may fail to bind to the new interface due to the
Duplicate Address Detection process.
- Golang caches the zone index for the interface, which may result
in NDP responder binding on the stale interface

Fixes: #6623

Signed-off-by: Xu Liu <[email protected]>
  • Loading branch information
xliuxu committed Sep 29, 2024
1 parent a60f535 commit 2dd49d8
Show file tree
Hide file tree
Showing 11 changed files with 390 additions and 247 deletions.
15 changes: 8 additions & 7 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,10 +800,6 @@ func run(o *Options) error {
go memberlistCluster.Run(stopCh)
}

if features.DefaultFeatureGate.Enabled(features.ServiceExternalIP) {
go externalIPController.Run(stopCh)
}

if features.DefaultFeatureGate.Enabled(features.Traceflow) {
go traceflowController.Run(stopCh)
}
Expand All @@ -827,9 +823,6 @@ func run(o *Options) error {
}

go networkPolicyController.Run(stopCh)
if o.enableEgress {
go egressController.Run(stopCh)
}

var mcastController *multicast.Controller
if multicastEnabled {
Expand Down Expand Up @@ -999,6 +992,14 @@ func run(o *Options) error {
go nodeLatencyMonitor.Run(stopCh)
}

if egressController != nil {
go egressController.Run(stopCh)
}

if externalIPController != nil {
go externalIPController.Run(stopCh)
}

<-stopCh
klog.InfoS("Stopping Antrea Agent")
return nil
Expand Down
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/lithammer/dedent v1.1.0
github.com/mdlayher/arp v0.0.0-20220221190821-c37aaafac7f9
github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118
github.com/mdlayher/ndp v0.8.0
github.com/mdlayher/ndp v1.1.0
github.com/mdlayher/packet v1.1.2
github.com/miekg/dns v1.1.62
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
Expand Down Expand Up @@ -214,7 +214,6 @@ require (
github.com/vishvananda/netns v0.0.4 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f // indirect
go.etcd.io/etcd/api/v3 v3.5.14 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.5.14 // indirect
go.etcd.io/etcd/client/v3 v3.5.14 // indirect
Expand Down Expand Up @@ -251,3 +250,6 @@ require (
sigs.k8s.io/kustomize/kyaml v0.17.1 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
)

// remove this when https://github.com/mdlayher/ndp/pull/32 gets merged
replace github.com/mdlayher/ndp => github.com/xliuxu/ndp v0.0.0-20240926134643-8cf547505092
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,6 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
Expand Down Expand Up @@ -529,8 +528,6 @@ github.com/mdlayher/ethernet v0.0.0-20220221185849-529eae5b6118/go.mod h1:ZFUnHI
github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43/go.mod h1:+t7E0lkKfbBsebllff1xdTmyJt8lH37niI6kwFk9OTo=
github.com/mdlayher/genetlink v1.0.0 h1:OoHN1OdyEIkScEmRgxLEe2M9U8ClMytqA5niynLtfj0=
github.com/mdlayher/genetlink v1.0.0/go.mod h1:0rJ0h4itni50A86M2kHcgS85ttZazNt7a8H2a2cw0Gc=
github.com/mdlayher/ndp v0.8.0 h1:oVCl5JZSzT/YJE6cJd7EnNDWmX1fl4hJV0S/UCBNoHE=
github.com/mdlayher/ndp v0.8.0/go.mod h1:32w/5dDZWVSEOxyniAgKK4d7dHTuO6TCxWmUznQe3f8=
github.com/mdlayher/netlink v0.0.0-20190409211403-11939a169225/go.mod h1:eQB3mZE4aiYnlUsyGGCOpPETfdQq4Jhsgf1fk3cwQaA=
github.com/mdlayher/netlink v1.0.0/go.mod h1:KxeJAFOFLG6AjpyDkQ/iIhxygIUKD+vcwqcnu43w/+M=
github.com/mdlayher/netlink v1.1.0/go.mod h1:H4WCitaheIsdF9yOYu8CFmCgQthAPIWZmcKp9uZHgmY=
Expand Down Expand Up @@ -768,12 +765,12 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xlab/treeprint v1.2.0 h1:HzHnuAF1plUN2zGlAFHbSQP2qJ0ZAD3XF5XD7OesXRQ=
github.com/xlab/treeprint v1.2.0/go.mod h1:gj5Gd3gPdKtR1ikdDK6fnFLdmIS0X30kTTuNd/WEJu0=
github.com/xliuxu/ndp v0.0.0-20240926134643-8cf547505092 h1:1sBcuJrdQq9bawMA4Jm58h+cwefaV5ZIx5r50T/ZgTk=
github.com/xliuxu/ndp v0.0.0-20240926134643-8cf547505092/go.mod h1:FmgESgemgjl38vuOIyAHWUUL6vQKA/pQNkvXdWsdQFM=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f h1:Wku8eEdeJqIOFHtrfkYUByc4bCaTeA6fL0UJgfEiFMI=
gitlab.com/golang-commonmark/puny v0.0.0-20191124015043-9f83538fa04f/go.mod h1:Tiuhl+njh/JIg0uS/sOJVYi0x2HEa5rc1OAaVsb5tAs=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI=
Expand Down Expand Up @@ -888,7 +885,6 @@ golang.org/x/net v0.0.0-20191007182048-72f939374954/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
Expand Down Expand Up @@ -916,7 +912,6 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down Expand Up @@ -953,7 +948,6 @@ golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200217220822-9197077df867/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200602100848-8d3cce7afc34/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200728102440-3e129f6d46b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201009025420-dfb3f7c4e634/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
10 changes: 2 additions & 8 deletions pkg/agent/ipassigner/ip_assigner_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,17 +242,11 @@ func NewIPAssigner(nodeTransportInterface string, dummyDeviceName string) (IPAss
return nil, err
}
if dummyDeviceName == "" || arpIgnore > 0 {
a.defaultAssignee.arpResponder, err = responder.NewARPResponder(externalInterface)
if err != nil {
return nil, fmt.Errorf("failed to create ARP responder for link %s: %v", externalInterface.Name, err)
}
a.defaultAssignee.arpResponder = responder.NewARPResponder(externalInterface.Name)
}
}
if ipv6 != nil {
a.defaultAssignee.ndpResponder, err = responder.NewNDPResponder(externalInterface)
if err != nil {
return nil, fmt.Errorf("failed to create NDP responder for link %s: %v", externalInterface.Name, err)
}
a.defaultAssignee.ndpResponder = responder.NewNDPResponder(externalInterface.Name)
}
if dummyDeviceName != "" {
a.defaultAssignee.link, err = ensureDummyDevice(dummyDeviceName)
Expand Down
60 changes: 35 additions & 25 deletions pkg/agent/ipassigner/responder/arp_responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,44 +18,34 @@ import (
"fmt"
"net"
"sync"
"time"

"github.com/mdlayher/arp"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
)

type arpResponder struct {
iface *net.Interface
conn *arp.Client
once sync.Once
ifaceName string
assignedIPs sets.Set[string]
mutex sync.Mutex
}

var _ Responder = (*arpResponder)(nil)

func NewARPResponder(iface *net.Interface) (*arpResponder, error) {
conn, err := arp.Dial(iface)
if err != nil {
return nil, fmt.Errorf("creating ARP responder for %q: %s", iface.Name, err)
}
return &arpResponder{
iface: iface,
conn: conn,
assignedIPs: sets.New[string](),
}, nil
}

func (r *arpResponder) InterfaceName() string {
return r.iface.Name
return r.ifaceName
}

func (r *arpResponder) AddIP(ip net.IP) error {
if !utilnet.IsIPv4(ip) {
return fmt.Errorf("only IPv4 is supported")
}
if r.addIP(ip) {
klog.InfoS("Assigned IP to ARP responder", "ip", ip, "interface", r.iface.Name)
klog.InfoS("Assigned IP to ARP responder", "ip", ip, "interface", r.ifaceName)
}
return nil
}
Expand All @@ -65,40 +55,60 @@ func (r *arpResponder) RemoveIP(ip net.IP) error {
return fmt.Errorf("only IPv4 is supported")
}
if r.deleteIP(ip) {
klog.InfoS("Removed IP from ARP responder", "ip", ip, "interface", r.iface.Name)
klog.InfoS("Removed IP from ARP responder", "ip", ip, "interface", r.ifaceName)
}
return nil
}

func (r *arpResponder) handleARPRequest() error {
pkt, _, err := r.conn.Read()
func (r *arpResponder) handleARPRequest(client *arp.Client, iface *net.Interface) error {
pkt, _, err := client.Read()
if err != nil {
return err
}
if pkt.Operation != arp.OperationRequest {
return nil
}
if !r.isIPAssigned(pkt.TargetIP) {
klog.V(4).InfoS("Ignored ARP request", "ip", pkt.TargetIP, "interface", r.iface.Name)
klog.V(4).InfoS("Ignored ARP request", "ip", pkt.TargetIP, "interface", r.ifaceName)
return nil
}
if err := r.conn.Reply(pkt, r.iface.HardwareAddr, pkt.TargetIP); err != nil {
if err := client.Reply(pkt, iface.HardwareAddr, pkt.TargetIP); err != nil {
return fmt.Errorf("failed to reply ARP packet for IP %s: %v", pkt.TargetIP, err)
}
klog.V(4).InfoS("Sent ARP response", "ip", pkt.TargetIP, "interface", r.iface.Name)
klog.V(4).InfoS("Sent ARP response", "ip", pkt.TargetIP, "interface", r.ifaceName)
return nil
}

func (r *arpResponder) Run(stopCh <-chan struct{}) {
r.once.Do(func() {
wait.NonSlidingUntil(func() {
r.dialAndHandleRequests(stopCh)
}, time.Second, stopCh)
})
<-stopCh
}

func (r *arpResponder) dialAndHandleRequests(stopCh <-chan struct{}) {
transportInterface, err := net.InterfaceByName(r.ifaceName)
if err != nil {
klog.ErrorS(err, "Failed to get interface by name", "deviceName", r.ifaceName)
return
}
client, err := arp.Dial(transportInterface)
if err != nil {
klog.ErrorS(err, "Failed to dial ARP client", "deviceName", r.ifaceName)
return
}
klog.InfoS("ARP responder started", "interface", transportInterface.Name, "index", transportInterface.Index)
for {
select {
case <-stopCh:
r.conn.Close()
client.Close()
return
default:
err := r.handleARPRequest()
err := r.handleARPRequest(client, transportInterface)
if err != nil {
klog.ErrorS(err, "Failed to handle ARP request", "deviceName", r.iface.Name)
klog.ErrorS(err, "Failed to handle ARP request", "deviceName", r.ifaceName)
}
}
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/agent/ipassigner/responder/arp_responder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,13 @@ func TestARPResponder_HandleARPRequest(t *testing.T) {
assignedIPs.Insert(ip.String())
}
r := arpResponder{
iface: localIface,
conn: localARPClient,
ifaceName: localIface.Name,
assignedIPs: sets.New[string](),
}
for _, ip := range tt.assignedIPs {
r.AddIP(ip)
}
err = r.handleARPRequest()
err = r.handleARPRequest(localARPClient, localIface)
require.NoError(t, err)
// We cannot use remoteARPClient.ReadFrom as it is blocking.
replyB, addr, err := remoteConn.Receive()
Expand Down Expand Up @@ -159,7 +158,7 @@ func Test_arpResponder_addIP(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &arpResponder{
iface: iface,
ifaceName: iface.Name,
assignedIPs: tt.assignedIPs,
}
err := r.AddIP(tt.ip)
Expand Down Expand Up @@ -207,7 +206,7 @@ func Test_arpResponder_removeIP(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := &arpResponder{
iface: iface,
ifaceName: iface.Name,
assignedIPs: tt.assignedIPs,
}
err := r.RemoveIP(tt.ip)
Expand Down
62 changes: 62 additions & 0 deletions pkg/agent/ipassigner/responder/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package responder

import (
"net/netip"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)

var (
// map of transportInterfaceName to ARP responder
arpResponders = make(map[string]*arpResponder)
// map of transportInterfaceName to NDP responder
ndpResponders = make(map[string]*ndpResponder)
)

// NewARPResponder creates a new ARP responder if it does not exist for the given transportInterfaceName.
// This function is not thread-safe.
func NewARPResponder(transportInterfaceName string) *arpResponder {
if responder, ok := arpResponders[transportInterfaceName]; ok {
klog.InfoS("ARP responder already exists", "interface", transportInterfaceName)
return responder
}
a := &arpResponder{
ifaceName: transportInterfaceName,
assignedIPs: sets.New[string](),
}
klog.InfoS("Created new ARP responder", "interface", transportInterfaceName)
arpResponders[transportInterfaceName] = a
return a
}

// NewNDPResponder creates a new NDP responder if it does not exist for the given transportInterfaceName.
// This function is not thread-safe.
func NewNDPResponder(transportInterfaceName string) *ndpResponder {
if responder, ok := ndpResponders[transportInterfaceName]; ok {
klog.InfoS("NDP responder already exists", "interface", transportInterfaceName)
return responder
}
n := &ndpResponder{
ifaceName: transportInterfaceName,
multicastGroups: make(map[netip.Addr]int),
assignedIPs: sets.New[netip.Addr](),
}
klog.InfoS("Created new NDP responder", "interface", transportInterfaceName)
ndpResponders[transportInterfaceName] = n
return n
}
4 changes: 3 additions & 1 deletion pkg/agent/ipassigner/responder/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package responder

import "net"
import (
"net"
)

// Responder is an interface to handle ARP (IPv4)/NS (IPv6) queries using raw sockets.
type Responder interface {
Expand Down
Loading

0 comments on commit 2dd49d8

Please sign in to comment.