Skip to content

Commit

Permalink
[Multicast] Support removal of staled multicast routes
Browse files Browse the repository at this point in the history
Check packet count difference every minute
for each multicast route and remove ones that have
identical packet count in past mRouteTimeout.

Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
ceclinux committed Apr 25, 2022
1 parent 79414ec commit c3785c7
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 10 deletions.
2 changes: 2 additions & 0 deletions pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,9 @@ func (c *Controller) syncGroup(groupKey string) error {
klog.ErrorS(err, "Failed to uninstall multicast flows", "group", groupKey)
return err
}
c.installedGroupsMutex.Lock()
err := c.mRouteClient.deleteInboundMrouteEntryByGroup(status.group)
c.installedGroupsMutex.Unlock()
if err != nil {
klog.ErrorS(err, "Cannot delete multicast group", "group", groupKey)
return err
Expand Down
73 changes: 64 additions & 9 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -38,6 +40,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
outboundRouteCache: cache.NewIndexer(getMulticastOutboundEntryKey, cache.Indexers{}),
multicastInterfaces: multicastInterfaces.List(),
socket: multicastSocket,
}
Expand Down Expand Up @@ -72,6 +75,8 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
inboundRouteCacheMutex sync.RWMutex
outboundRouteCache cache.Indexer
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -148,15 +153,32 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error)
mEntries, _ := c.inboundRouteCache.ByIndex(GroupNameIndexName, group.String())
for _, route := range mEntries {
entry := route.(*inboundMulticastRouteEntry)
err := c.socket.DelMrouteEntry(net.ParseIP(entry.src), net.ParseIP(entry.group), entry.vif)
err := c.deleteInboundMRoute(entry)
if err != nil {
return err
}
c.inboundRouteCache.Delete(route)
}
return nil
}

func (c *MRouteClient) deleteInboundMRoute(mRoute *inboundMulticastRouteEntry) (err error) {
err = c.socket.DelMrouteEntry(net.ParseIP(mRoute.src).To4(), net.ParseIP(mRoute.group).To4(), mRoute.vif)
if err != nil {
return err
}
c.inboundRouteCache.Delete(mRoute)
return nil
}

func (c *MRouteClient) deleteOutboundMRoute(mRoute *outboundMulticastRouteEntry) (err error) {
err = c.socket.DelMrouteEntry(net.ParseIP(mRoute.src).To4(), net.ParseIP(mRoute.group).To4(), c.internalInterfaceVIF)
if err != nil {
return err
}
c.outboundRouteCache.Delete(mRoute)
return nil
}

// addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces,
// allowing multicast sender Pods to send multicast traffic to external.
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) {
Expand All @@ -165,6 +187,11 @@ func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err err
if err != nil {
return err
}
routeEntry := &outboundMulticastRouteEntry{}
routeEntry.group = group.String()
routeEntry.src = src.String()
routeEntry.updatedTime = time.Now()
c.outboundRouteCache.Add(routeEntry)
return nil
}

Expand All @@ -177,34 +204,60 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
return err
}
routeEntry := &inboundMulticastRouteEntry{
group: group.String(),
src: src.String(),
vif: inboundVIF,
vif: inboundVIF,
}
routeEntry.group = group.String()
routeEntry.src = src.String()
routeEntry.updatedTime = time.Now()
c.inboundRouteCache.Add(routeEntry)
return nil
}

type multicastRouteEntry struct {
group string
src string
pktCount uint32
updatedTime time.Time
}

// outboundMulticastRouteEntry encodes the outbound multicast routing entry.
// For example,
// type inboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// } encodes the multicast route entry from Antrea gateway to multicast interfaces
// (10.0.0.55,226.94.9.9) Iif: antrea-gw0 Oifs: list of multicastInterfaces.
// The iif is always Antrea gateway and oifs are always outbound interfaces
// so we do not put them in the struct.
// Field pktCount and updatedTime are used for removing staled multicast routes.
type outboundMulticastRouteEntry struct {
multicastRouteEntry
}

// inboundMulticastRouteEntry encodes the inbound multicast routing entry.
// It has extra field Iif to represent inbound interface VIF.
// For example,
// type inboundMulticastRouteEntry struct {
// group "226.94.9.9"
// src "10.0.0.55"
// vif vif of wlan0
// } encodes the multicast route entry from wlan0 to Antrea gateway
// (10.0.0.55,226.94.9.9) Iif: wlan0 Oifs: antrea-gw0.
// The oif is always Antrea gateway so we do not put it in the struct.
type inboundMulticastRouteEntry struct {
group string
src string
vif uint16
multicastRouteEntry
vif uint16
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastOutboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*outboundMulticastRouteEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -272,6 +325,8 @@ type RouteInterface interface {
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
// GetMroutePacketCount returns number of routed packets by multicast route entry.
GetMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
Expand Down
77 changes: 77 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
mRouteTimeout = time.Minute * 10
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -71,10 +77,81 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

// Check packet count difference every minute
// for each multicast route and remove ones that does not route any packets
// in past mRouteTimeout. The remaining
// multicast routes' statistics are getting updated by this process as well.
go wait.NonSlidingUntil(c.updateMrouteStats, time.Minute, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) updateMulticastRouteStatsEntry(entry *multicastRouteEntry) (isUpdated bool) {
packetCount, err := c.socket.GetMroutePacketCount(net.ParseIP(entry.src).To4(), net.ParseIP(entry.group).To4())
if err != nil {
klog.ErrorS(err, "Failed to get packet count for outbound multicast route", "outboundRoute", entry)
return
}
packetCountDiff := packetCount - entry.pktCount
now := time.Now()
klog.V(4).Infof("Outbound multicast route %v routes %d packets in last %s", entry, packetCountDiff, mRouteTimeout.String())
if packetCountDiff == uint32(0) && now.Sub(entry.updatedTime) > mRouteTimeout {
return true
}
entry.pktCount = packetCount
entry.updatedTime = now
return false
}

func (c *MRouteClient) updateInboundMrouteStats() {
deletedInboundRoutes := make([]*inboundMulticastRouteEntry, 0)
for _, obj := range c.inboundRouteCache.List() {
entry := obj.(*inboundMulticastRouteEntry)
if c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry) {
deletedInboundRoutes = append(deletedInboundRoutes, entry)
} else {
c.inboundRouteCache.Update(entry)
}
}
for _, inboundRoute := range deletedInboundRoutes {
klog.V(2).InfoS("Deleting staled inbound multicast route", "group", inboundRoute.group, "source", inboundRoute.src)
err := c.deleteInboundMRoute(inboundRoute)
if err != nil {
klog.ErrorS(err, "Failed to delete inbound multicast route", "group", inboundRoute.group, "source", inboundRoute.src, "VIF", inboundRoute.vif)
return
}
}
}

func (c *MRouteClient) updateOutboundMrouteStats() {
deletedOutboundRoutes := make([]*outboundMulticastRouteEntry, 0)
for _, obj := range c.outboundRouteCache.List() {
entry := obj.(*outboundMulticastRouteEntry)
if c.updateMulticastRouteStatsEntry(&entry.multicastRouteEntry) {
deletedOutboundRoutes = append(deletedOutboundRoutes, entry)
} else {
c.outboundRouteCache.Update(entry)
}
}
for _, outboundRoute := range deletedOutboundRoutes {
klog.V(2).InfoS("Deleting staled outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
err := c.deleteOutboundMRoute(outboundRoute)
if err != nil {
klog.ErrorS(err, "Failed to delete outbound multicast route", "group", outboundRoute.group, "source", outboundRoute.src)
return
}
}
}

func (c *MRouteClient) updateMrouteStats() {
klog.V(2).InfoS("Updating multicast route statistics and removing staled multicast routes")
c.inboundRouteCacheMutex.Lock()
c.updateInboundMrouteStats()
c.inboundRouteCacheMutex.Unlock()
c.updateOutboundMrouteStats()
}
108 changes: 108 additions & 0 deletions pkg/agent/multicast/mcast_route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -64,6 +65,113 @@ func TestParseIGMPMsg(t *testing.T) {
}
}

func TestUpdateOutboundMrouteStats(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
assert.Nil(t, err)
now := time.Now()
for _, m := range []struct {
isStaled bool
currStats uint32
group string
source string
packetCount uint32
createdTime time.Time
}{
{
group: "224.3.5.7",
source: "10.1.2.3",
createdTime: now,
isStaled: false,
currStats: 0,
},
{
group: "224.3.5.8",
source: "10.1.2.4",
createdTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 10,
isStaled: false,
currStats: 9,
},
{
group: "224.3.5.9",
source: "10.1.2.5",
createdTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 0,
isStaled: true,
currStats: 0,
},
} {
outboundMrouteEntry := &outboundMulticastRouteEntry{}
outboundMrouteEntry.src = m.source
outboundMrouteEntry.group = m.group
outboundMrouteEntry.pktCount = m.packetCount
outboundMrouteEntry.updatedTime = m.createdTime
mRoute.outboundRouteCache.Add(outboundMrouteEntry)
mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4()).Times(1).Return(m.currStats, nil)
if m.isStaled {
mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4(), uint16(0)).Times(1)
}
}
mRoute.updateMrouteStats()
}

func TestUpdateInboundMrouteStats(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
assert.Nil(t, err)
now := time.Now()
for _, m := range []struct {
isStaled bool
currPacketCount uint32
vif uint16
group string
source string
packetCount uint32
createdTime time.Time
}{
{
group: "224.3.5.7",
source: "192.168.50.60",
createdTime: now,
isStaled: false,
currPacketCount: 0,
vif: 3,
},
{
group: "224.3.5.8",
source: "192.168.50.61",
createdTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 10,
isStaled: false,
currPacketCount: 9,
vif: 4,
},
{
group: "224.3.5.9",
source: "192.168.50.62",
createdTime: now.Add(time.Duration(-mRouteTimeout)),
packetCount: 5,
isStaled: true,
currPacketCount: 5,
vif: 5,
},
} {
inboundMrouteEntry := &inboundMulticastRouteEntry{}
inboundMrouteEntry.src = m.source
inboundMrouteEntry.group = m.group
inboundMrouteEntry.vif = m.vif
inboundMrouteEntry.pktCount = m.packetCount
inboundMrouteEntry.updatedTime = m.createdTime
mRoute.inboundRouteCache.Add(inboundMrouteEntry)
mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4()).Times(1).Return(m.currPacketCount, nil)
if m.isStaled {
mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.source).To4(), net.ParseIP(m.group).To4(), m.vif).Times(1)
}
}
mRoute.updateMrouteStats()
}

func TestProcessIGMPNocacheMsg(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
Expand Down
Loading

0 comments on commit c3785c7

Please sign in to comment.