Skip to content

Commit

Permalink
[Multicast] Support removal of stale multicast routes
Browse files Browse the repository at this point in the history
Check the packet count difference every minute
for each multicast route and remove the ones that have
identical packet count in the past 10 minutes.

Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
ceclinux committed Mar 26, 2024
1 parent 5843387 commit 30b1c69
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 27 deletions.
4 changes: 1 addition & 3 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func (c *Controller) addGroupMemberStatus(e *mcastGroupEvent) {
c.groupCache.Add(status)
c.queue.Add(e.group.String())
klog.InfoS("Added new multicast group to cache", "group", e.group, "interface", e.iface.InterfaceName)
return
}

// updateGroupMemberStatus updates the group status in groupCache. If a "join" message is sent from an existing member,
Expand Down Expand Up @@ -161,7 +160,6 @@ func (c *Controller) updateGroupMemberStatus(obj interface{}, e *mcastGroupEvent
}
}
}
return
}

// checkLastMember sends out a query message on the group to check if there are still members in the group. If no new
Expand Down Expand Up @@ -285,7 +283,7 @@ func NewMulticastController(ofClient openflow.Client,
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
podInterfaceIndex: podInterfaceIndexFunc,
})
multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, isEncap, enableFlexibleIPAM)
multicastRouteClient := newRouteClient(nodeConfig, groupCache, multicastSocket, multicastInterfaces, enableFlexibleIPAM)
c := &Controller{
ofClient: ofClient,
ifaceStore: ifaceStore,
Expand Down
88 changes: 75 additions & 13 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"net"
"strings"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -34,13 +35,14 @@ const (
MulticastRecvBufferSize = 128
)

func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], encapEnabled bool, flexibleIPAMEnabled bool) *MRouteClient {
func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, multicastSocket RouteInterface, multicastInterfaces sets.Set[string], flexibleIPAMEnabled bool) *MRouteClient {
var m = &MRouteClient{
igmpMsgChan: make(chan []byte, workerCount),
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
multicastInterfaces: sets.List(multicastInterfaces),
outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}),
socket: multicastSocket,
flexibleIPAMEnabled: flexibleIPAMEnabled,
}
Expand Down Expand Up @@ -75,6 +77,7 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
outboundRouteCache cache.Indexer
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -152,23 +155,48 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error)
mEntries, _ := c.inboundRouteCache.ByIndex(GroupNameIndexName, group.String())
for _, route := range mEntries {
entry := route.(*inboundMulticastRouteEntry)
err := c.socket.DelMrouteEntry(net.ParseIP(entry.src), net.ParseIP(entry.group), entry.vif)
err := c.deleteInboundMRoute(entry)
if err != nil {
return err
}
c.inboundRouteCache.Delete(route)
}
return nil
}

func (c *MRouteClient) deleteInboundMRoute(mRoute *inboundMulticastRouteEntry) error {
err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), mRoute.vif)
if err != nil {
return err
}
c.inboundRouteCache.Delete(mRoute)
return nil
}

func (c *MRouteClient) deleteOutboundMRoute(mRoute *outboundMulticastRouteEntry) error {
err := c.socket.DelMrouteEntry(net.ParseIP(mRoute.src), net.ParseIP(mRoute.group), c.internalInterfaceVIF)
if err != nil {
return err
}
c.outboundRouteCache.Delete(mRoute)
return nil
}

// addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces,
// allowing multicast srcNode Pods to send multicast traffic to external.
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) {
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) error {
klog.V(2).InfoS("Adding outbound multicast route entry", "src", src, "group", group, "outboundVIFs", c.externalInterfaceVIFs)
err = c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs)
err := c.socket.AddMrouteEntry(src, group, c.internalInterfaceVIF, c.externalInterfaceVIFs)
if err != nil {
return err
}
routeEntry := &outboundMulticastRouteEntry{
multicastRouteEntry: multicastRouteEntry{
group: group.String(),
src: src.String(),
updatedTime: time.Now(),
},
}
c.outboundRouteCache.Add(routeEntry)
return nil
}

Expand All @@ -181,15 +209,43 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
return err
}
routeEntry := &inboundMulticastRouteEntry{
group: group.String(),
src: src.String(),
vif: inboundVIF,
vif: inboundVIF,
multicastRouteEntry: multicastRouteEntry{
group: group.String(),
src: src.String(),
updatedTime: time.Now(),
},
}
c.inboundRouteCache.Add(routeEntry)
return nil
}

// Field pktCount and updatedTime are used for removing stale multicast routes.
type multicastRouteEntry struct {
group string
src string
pktCount uint32
updatedTime time.Time
}

// outboundMulticastRouteEntry encodes the outbound multicast routing entry.
// For example,
//
// type inboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// } encodes the multicast route entry from Antrea gateway to multicast interfaces
//
// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces.
//
// The iif is always Antrea gateway and oifs are always outbound interfaces
// so we do not put them in the struct.
type outboundMulticastRouteEntry struct {
multicastRouteEntry
}

// inboundMulticastRouteEntry encodes the inbound multicast routing entry.
// It has extra field Iif to represent inbound interface VIF.
// For example,
//
// type inboundMulticastRouteEntry struct {
Expand All @@ -201,16 +257,20 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
// (10.0.0.55,226.94.9.9) Iif: wlan0 Oifs: antrea-gw0.
// The oif is always Antrea gateway so we do not put it in the struct.
type inboundMulticastRouteEntry struct {
group string
src string
vif uint16
multicastRouteEntry
vif uint16
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastOutboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*outboundMulticastRouteEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -277,10 +337,12 @@ type RouteInterface interface {
MulticastInterfaceLeaveMgroup(mgroup net.IP, ifaceIP net.IP, ifaceName string) error
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) error
// GetMroutePacketCount returns the number of routed packets by the multicast route entry.
GetMroutePacketCount(src net.IP, group net.IP) (uint32, error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
DelMrouteEntry(src net.IP, group net.IP, iif uint16) error
// FlushMRoute flushes static multicast routing entries.
FlushMRoute()
// GetFD returns socket file descriptor.
Expand Down
68 changes: 68 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
mRouteTimeout = time.Minute * 10
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -78,10 +84,72 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

// Check packet count difference every minute for each multicast route and
// remove ones that do not route any packets in past mRouteTimeout.
// The remaining multicast routes' statistics are getting updated by
// this process as well.
go wait.NonSlidingUntil(c.updateMrouteStats, time.Minute, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) updateMulticastRouteStatsEntry(entry *multicastRouteEntry) (isDeleted bool, newEntry *multicastRouteEntry) {
packetCount, err := c.socket.GetMroutePacketCount(net.ParseIP(entry.src), net.ParseIP(entry.group))
if err != nil {
klog.ErrorS(err, "Failed to get packet count for multicast route", "route", entry)
return false, nil
}
packetCountDiff := packetCount - entry.pktCount
klog.V(4).Infof("Multicast route %v routes %d packets in last %s", entry, packetCountDiff, time.Minute)
now := time.Now()
if packetCountDiff == uint32(0) {
return now.Sub(entry.updatedTime) > mRouteTimeout, nil
}
newEntry = &multicastRouteEntry{group: entry.group, src: entry.src, pktCount: packetCount, updatedTime: now}
return false, newEntry
}

func (c *MRouteClient) updateInboundMrouteStats() {
for _, obj := range c.inboundRouteCache.List() {
entry := obj.(*inboundMulticastRouteEntry)
isDeleted, newEntry := c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry)
if isDeleted {
klog.V(2).InfoS("Deleting stale inbound multicast route", "group", entry.group, "source", entry.src, "VIF", entry.vif)
err := c.deleteInboundMRoute(entry)
if err != nil {
klog.ErrorS(err, "Failed to delete inbound multicast route", "group", entry.group, "source", entry.src, "VIF", entry.vif)
}
} else if newEntry != nil {
newInboundEntry := inboundMulticastRouteEntry{*newEntry, entry.vif}
c.inboundRouteCache.Update(&newInboundEntry)
}
}
}

func (c *MRouteClient) updateOutboundMrouteStats() {
for _, obj := range c.outboundRouteCache.List() {
entry := obj.(*outboundMulticastRouteEntry)
isDeleted, newEntry := c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry)
if isDeleted {
klog.V(2).InfoS("Deleting stale outbound multicast route", "group", entry.group, "source", entry.src)
err := c.deleteOutboundMRoute(entry)
if err != nil {
klog.ErrorS(err, "Failed to delete outbound multicast route", "group", entry.group, "source", entry.src)
}
} else if newEntry != nil {
newOutboundEntry := outboundMulticastRouteEntry{*newEntry}
c.outboundRouteCache.Update(&newOutboundEntry)
}
}
}

func (c *MRouteClient) updateMrouteStats() {
klog.V(2).InfoS("Updating multicast route statistics and removing stale multicast routes")
c.updateInboundMrouteStats()
c.updateOutboundMrouteStats()
}
Loading

0 comments on commit 30b1c69

Please sign in to comment.