Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Multicast] removal of stale multicast routes #3242

Merged
merged 1 commit into from
May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,6 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) {
c.groupCache.Add(status)
c.queue.Add(e.group.String())
klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName)
return
}

// updateGroupMemberStatus updates the group status in groupCache. If a "join" message is sent from an existing member,
Expand Down Expand Up @@ -162,7 +161,6 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent
}
}
}
return
}

// checkLastMember sends out a query message on the group to check if there are still members in the group. If no new
Expand Down Expand Up @@ -295,7 +293,7 @@ func NewMulticastController(ofClient openflow.Client,
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
podInterfaceIndex: podInterfaceIndexFunc,
})
multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap, enableFlexibleIPAM)
multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, enableFlexibleIPAM)
c := &Controller{
ofClient: ofClient,
ifaceStore: ifaceStore,
Expand Down Expand Up @@ -497,7 +495,7 @@ func (c *Controller) syncGroup(groupKey string) error {
deleteLocalMulticastGroup := func() error {
err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group)
if err != nil {
klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey)
klog.ErrorS(err, "Failed to delete multicast group", "group", groupKey)
return err
}
klog.InfoS("Removed multicast route entry", "group", status.group)
Expand Down
88 changes: 75 additions & 13 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -34,13 +35,14 @@ const (
MulticastRecvBufferSize = 128
)

func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], encapEnabled bool, flexibleIPAMEnabled bool) *MRouteClient {
func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], flexibleIPAMEnabled bool) *MRouteClient {
var m = &MRouteClient{
igmpMsgChan: make(chan []byte, workerCount),
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
multicastInterfaces: sets.List(multicastInterfaces),
outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}),
socket: multicastSocket,
flexibleIPAMEnabled: flexibleIPAMEnabled,
}
Expand Down Expand Up @@ -75,6 +77,7 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
outboundRouteCache cache.Indexer
wenyingd marked this conversation as resolved.
Show resolved Hide resolved
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -152,23 +155,48 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error)
mEntries, _ := c.inboundRouteCache.ByIndex(GroupNameIndexName, group.String())
for _, route := range mEntries {
entry := route.(*inboundMulticastRouteEntry)
err := c.socket.DelMrouteEntry(net.ParseIP(entry.src), net.ParseIP(entry.group), entry.vif)
err := c.deleteInboundMRoute(entry)
if err != nil {
return err
}
c.inboundRouteCache.Delete(route)
}
return nil
}

func (c *MRouteClient) deleteInboundMRoute(mRoute *inboundMulticastRouteEntry) error {
err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), mRoute.vif)
if err != nil {
return err
}
c.inboundRouteCache.Delete(mRoute)
return nil
}

func (c *MRouteClient) deleteOutboundMRoute(mRoute *outboundMulticastRouteEntry) error {
err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), c.internalInterfaceVIF)
if err != nil {
return err
}
c.outboundRouteCache.Delete(mRoute)
return nil
}

// addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces,
// allowing multicast srcNode Pods to send multicast traffic to external.
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) {
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) error {
klog.V(2).InfoS("Adding outbound multicast route entry", "src", src, "group", group, "outboundVIFs", c.externalInterfaceVIFs)
err = c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs)
err := c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs)
if err != nil {
return err
}
routeEntry := &outboundMulticastRouteEntry{
multicastRouteEntry: multicastRouteEntry{
group: group.String(),
src: src.String(),
updatedTime: time.Now(),
},
}
c.outboundRouteCache.Add(routeEntry)
return nil
}

Expand All @@ -181,15 +209,43 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
return err
}
routeEntry := &inboundMulticastRouteEntry{
group: group.String(),
src: src.String(),
vif: inboundVIF,
vif: inboundVIF,
multicastRouteEntry: multicastRouteEntry{
group: group.String(),
src: src.String(),
updatedTime: time.Now(),
},
}
c.inboundRouteCache.Add(routeEntry)
return nil
}

// Field pktCount and updatedTime are used for removing stale multicast routes.
type multicastRouteEntry struct {
group string
src string
pktCount uint32
updatedTime time.Time
}

// outboundMulticastRouteEntry encodes the outbound multicast routing entry.
// For example,
//
// type outboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// } encodes the multicast route entry from Antrea gateway to multicast interfaces
//
// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces.
//
// The iif is always Antrea gateway and oifs are always outbound interfaces
// so we do not put them in the struct.
type outboundMulticastRouteEntry struct {
multicastRouteEntry
}

// inboundMulticastRouteEntry encodes the inbound multicast routing entry.
// It has extra field vif to represent inbound interface VIF.
// For example,
//
// type inboundMulticastRouteEntry struct {
Expand All @@ -201,16 +257,20 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
// (10.0.0.55,226.94.9.9) Iif: wlan0 Oifs: antrea-gw0.
// The oif is always Antrea gateway so we do not put it in the struct.
type inboundMulticastRouteEntry struct {
group string
src string
vif uint16
multicastRouteEntry
vif uint16
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastOutboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*outboundMulticastRouteEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -277,10 +337,12 @@ type RouteInterface interface {
MulticastInterfaceLeaveMgroup(mgroup net.IP, ifaceIP net.IP, ifaceName string) error
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) error
// GetMroutePacketCount returns the number of routed packets by the multicast route entry.
GetMroutePacketCount(src net.IP, group net.IP) (uint32, error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
DelMrouteEntry(src net.IP, group net.IP, iif uint16) error
// FlushMRoute flushes static multicast routing entries.
FlushMRoute()
// GetFD returns socket file descriptor.
Expand Down
68 changes: 68 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
mRouteTimeout = time.Minute * 10
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -78,10 +84,72 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

// Check packet count difference every minute for each multicast route and
// remove ones that do not route any packets in past mRouteTimeout.
// The remaining multicast routes' statistics are getting updated by
// this process as well.
go wait.NonSlidingUntil(c.updateMrouteStats, time.Minute, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) updateMulticastRouteStatsEntry(entry multicastRouteEntry) (isStale bool, newEntry *multicastRouteEntry) {
packetCount, err := c.socket.GetMroutePacketCount(net.ParseIP(entry.src), net.ParseIP(entry.group))
if err != nil {
klog.ErrorS(err, "Failed to get packet count for multicast route", "route", entry)
return false, nil
}
packetCountDiff := packetCount - entry.pktCount
klog.V(4).Infof("Multicast route %v routes %d packets in last %s", entry, packetCountDiff, time.Minute)
now := time.Now()
if packetCountDiff == uint32(0) {
return now.Sub(entry.updatedTime) > mRouteTimeout, nil
}
newEntry = &multicastRouteEntry{group: entry.group, src: entry.src, pktCount: packetCount, updatedTime: now}
return false, newEntry
}

func (c *MRouteClient) updateInboundMrouteStats() {
for _, obj := range c.inboundRouteCache.List() {
entry := obj.(*inboundMulticastRouteEntry)
isStale, newEntry := c.updateMulticastRouteStatsEntry(entry.multicastRouteEntry)
if isStale {
klog.V(2).InfoS("Deleting stale inbound multicast route", "group", entry.group, "source", entry.src, "VIF", entry.vif)
err := c.deleteInboundMRoute(entry)
if err != nil {
klog.ErrorS(err, "Failed to delete inbound multicast route", "group", entry.group, "source", entry.src, "VIF", entry.vif)
}
} else if newEntry != nil {
newInboundEntry := inboundMulticastRouteEntry{*newEntry, entry.vif}
c.inboundRouteCache.Update(&newInboundEntry)
}
}
}

func (c *MRouteClient) updateOutboundMrouteStats() {
for _, obj := range c.outboundRouteCache.List() {
entry := obj.(*outboundMulticastRouteEntry)
isStale, newEntry := c.updateMulticastRouteStatsEntry(entry.multicastRouteEntry)
if isStale {
klog.V(2).InfoS("Deleting stale outbound multicast route", "group", entry.group, "source", entry.src)
err := c.deleteOutboundMRoute(entry)
if err != nil {
klog.ErrorS(err, "Failed to delete outbound multicast route", "group", entry.group, "source", entry.src)
}
} else if newEntry != nil {
newOutboundEntry := outboundMulticastRouteEntry{*newEntry}
c.outboundRouteCache.Update(&newOutboundEntry)
}
}
}

func (c *MRouteClient) updateMrouteStats() {
klog.V(2).InfoS("Updating multicast route statistics and removing stale multicast routes")
c.updateInboundMrouteStats()
c.updateOutboundMrouteStats()
}
Loading
Loading