Skip to content

Commit

Permalink
[Multicast] Support removal of staled outbound multicast routes
Browse files Browse the repository at this point in the history
Check packet count difference every 10 minutes for each
outbound multicast route and remove ones that have
identical packet count between two checks.

Signed-off-by: ceclinux <[email protected]>
  • Loading branch information
ceclinux committed Mar 16, 2022
1 parent 871806d commit e249a80
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 1 deletion.
41 changes: 41 additions & 0 deletions pkg/agent/multicast/mcast_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"fmt"
"net"
"strings"
"sync"
"time"

"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
Expand All @@ -38,6 +40,7 @@ func newRouteClient(nodeconfig *config.NodeConfig, groupCache cache.Indexer, mul
nodeConfig: nodeconfig,
groupCache: groupCache,
inboundRouteCache: cache.NewIndexer(getMulticastInboundEntryKey, cache.Indexers{GroupNameIndexName: inboundGroupIndexFunc}),
mRouteStatsEntry: cache.NewIndexer(getMulticastRouteStatsKey, cache.Indexers{}),
multicastInterfaces: multicastInterfaces.List(),
socket: multicastSocket,
}
Expand Down Expand Up @@ -72,6 +75,8 @@ type MRouteClient struct {
nodeConfig *config.NodeConfig
multicastInterfaces []string
inboundRouteCache cache.Indexer
inboundRouteCacheMutex sync.RWMutex
mRouteStatsEntry cache.Indexer
groupCache cache.Indexer
socket RouteInterface
multicastInterfaceConfigs []multicastInterfaceConfig
Expand Down Expand Up @@ -152,11 +157,27 @@ func (c *MRouteClient) deleteInboundMrouteEntryByGroup(group net.IP) (err error)
if err != nil {
return err
}
c.inboundRouteCacheMutex.Lock()
c.inboundRouteCache.Delete(route)
routeStats := &multicastRouteStatsEntry{
group: entry.group,
src: entry.src,
}
c.mRouteStatsEntry.Delete(routeStats)
c.inboundRouteCacheMutex.Unlock()
}
return nil
}

func (c *MRouteClient) addRouteStats(src net.IP, group net.IP) {
routeStats := &multicastRouteStatsEntry{
group: group.String(),
src: src.String(),
createdTime: time.Now(),
}
c.mRouteStatsEntry.Add(routeStats)
}

// addOutboundMrouteEntry configures multicast route from Antrea gateway to all the multicast interfaces,
// allowing multicast sender Pods to send multicast traffic to external.
func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err error) {
Expand All @@ -165,6 +186,7 @@ func (c *MRouteClient) addOutboundMrouteEntry(src net.IP, group net.IP) (err err
if err != nil {
return err
}
c.addRouteStats(src, group)
return nil
}

Expand All @@ -181,7 +203,10 @@ func (c *MRouteClient) addInboundMrouteEntry(src net.IP, group net.IP, inboundVI
src: src.String(),
vif: inboundVIF,
}
c.inboundRouteCacheMutex.Lock()
c.inboundRouteCache.Add(routeEntry)
c.addRouteStats(src, group)
c.inboundRouteCacheMutex.Unlock()
return nil
}

Expand All @@ -200,11 +225,24 @@ type inboundMulticastRouteEntry struct {
vif uint16
}

// multicastRouteStatsEntry encodes the multicast routing entry statistics.
type multicastRouteStatsEntry struct {
group string
src string
pktCount uint32
createdTime time.Time
}

func getMulticastInboundEntryKey(obj interface{}) (string, error) {
entry := obj.(*inboundMulticastRouteEntry)
return entry.group + "/" + entry.src + "/" + fmt.Sprint(entry.vif), nil
}

func getMulticastRouteStatsKey(obj interface{}) (string, error) {
entry := obj.(*multicastRouteStatsEntry)
return entry.group + "/" + entry.src, nil
}

func inboundGroupIndexFunc(obj interface{}) ([]string, error) {
entry, ok := obj.(*inboundMulticastRouteEntry)
if !ok {
Expand Down Expand Up @@ -272,6 +310,9 @@ type RouteInterface interface {
// AddMrouteEntry adds multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif) and outbound multicast interfaces(oifs).
AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifs []uint16) (err error)
// GetMroutePacketCount returns number of routed packets by a multicast route
// identified by source and group.
GetMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error)
// DelMrouteEntry deletes multicast route with specified source(src), multicast group IP(group),
// inbound multicast interface(iif).
DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error)
Expand Down
39 changes: 39 additions & 0 deletions pkg/agent/multicast/mcast_route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@ import (
"fmt"
"net"
"syscall"
"time"

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/util/runtime"
)

const (
mRouteStaledTimeout = time.Minute
)

// parseIGMPMsg parses the kernel version into parsedIGMPMsg. Note we need to consider the change
// after linux 5.9 in the igmpmsg struct when parsing vif. Please check
// https://github.com/torvalds/linux/commit/c8715a8e9f38906e73d6d78764216742db13ba0e.
Expand Down Expand Up @@ -71,10 +77,43 @@ func (c *MRouteClient) run(stopCh <-chan struct{}) {
}
}()

go wait.NonSlidingUntil(c.updateMrouteStats, mRouteStaledTimeout, stopCh)

for i := 0; i < int(workerCount); i++ {
go c.worker(stopCh)
}
<-stopCh
c.socket.FlushMRoute()
syscall.Close(c.socket.GetFD())
}

func (c *MRouteClient) updateMrouteStats() {
klog.V(2).InfoS("Updating multicast route statistics and removing staled routes")
deletedRoutes := make([]*multicastRouteStatsEntry, 0)
now := time.Now()
for _, obj := range c.mRouteStatsEntry.List() {
mrouteStatsEntry, _ := obj.(*multicastRouteStatsEntry)
packetCount, err := c.socket.GetMroutePacketCount(net.ParseIP(mrouteStatsEntry.src).To4(), net.ParseIP(mrouteStatsEntry.group).To4())
if err != nil {
klog.ErrorS(err, "Failed to get packet count for multicast route", "source", mrouteStatsEntry.src, "group", mrouteStatsEntry.group)
return
}
packetCountDiff := packetCount - mrouteStatsEntry.pktCount
klog.V(4).Infof("Multicast route %v routes %d packets in last %s", mrouteStatsEntry, packetCountDiff, mRouteStaledTimeout.String())
if packetCountDiff == uint32(0) && now.Sub(mrouteStatsEntry.createdTime) > mRouteStaledTimeout {
deletedRoutes = append(deletedRoutes, mrouteStatsEntry)
} else {
mrouteStatsEntry.pktCount = packetCount
c.mRouteStatsEntry.Update(mrouteStatsEntry)
}
}
for _, mRouteStatsEntry := range deletedRoutes {
klog.InfoS("Deleting staled multicast route", "group", mRouteStatsEntry.group, "source", mRouteStatsEntry.src)
err := c.socket.DelMrouteEntry(net.ParseIP(mRouteStatsEntry.src).To4(), net.ParseIP(mRouteStatsEntry.group).To4(), c.internalInterfaceVIF)
if err != nil {
klog.ErrorS(err, "Failed to delete multicast route", "group", mRouteStatsEntry.group, "source", mRouteStatsEntry.src)
return
}
c.mRouteStatsEntry.Delete(mRouteStatsEntry)
}
}
50 changes: 50 additions & 0 deletions pkg/agent/multicast/mcast_route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -64,6 +65,55 @@ func TestParseIGMPMsg(t *testing.T) {
}
}

func TestUpdateOutboundMrouteStats(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
assert.Nil(t, err)
now := time.Now()
for _, m := range []struct {
outboundMrouteEntry *multicastRouteStatsEntry
isStaled bool
currStats uint32
}{
{
outboundMrouteEntry: &multicastRouteStatsEntry{
group: "224.3.5.7",
src: "10.1.2.3",
createdTime: now,
},
isStaled: false,
currStats: 0,
},
{
outboundMrouteEntry: &multicastRouteStatsEntry{
group: "224.3.5.8",
src: "10.1.2.3",
createdTime: now.Add(time.Duration(-mRouteStaledTimeout)),
pktCount: 10,
},
isStaled: false,
currStats: 9,
},
{
outboundMrouteEntry: &multicastRouteStatsEntry{
group: "224.3.5.9",
src: "10.1.2.3",
createdTime: now.Add(time.Duration(-mRouteStaledTimeout)),
pktCount: 0,
},
isStaled: true,
currStats: 0,
},
} {
mRoute.mRouteStatsEntry.Add(m.outboundMrouteEntry)
mockMulticastSocket.EXPECT().GetMroutePacketCount(net.ParseIP(m.outboundMrouteEntry.src).To4(), net.ParseIP(m.outboundMrouteEntry.group).To4()).Times(1).Return(m.currStats, nil)
if m.isStaled {
mockMulticastSocket.EXPECT().DelMrouteEntry(net.ParseIP(m.outboundMrouteEntry.src).To4(), net.ParseIP(m.outboundMrouteEntry.group).To4(), uint16(0)).Times(1)
}
}
mRoute.updateMrouteStats()
}

func TestProcessIGMPNocacheMsg(t *testing.T) {
mRoute := newMockMulticastRouteClient(t)
err := mRoute.initialize(t)
Expand Down
12 changes: 12 additions & 0 deletions pkg/agent/multicast/mcast_socket_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []
return multicastsyscall.SetsockoptMfcctl(s.GetFD(), syscall.IPPROTO_IP, multicastsyscall.MRT_ADD_MFC, mc)
}

func (s *Socket) GetMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) {
siocSgReq := multicastsyscall.SiocSgReq{
Src: [4]byte{src[0], src[1], src[2], src[3]},
Grp: [4]byte{group[0], group[1], group[2], group[3]},
}
stats, err := multicastsyscall.IoctlGetSiocSgReq(s.GetFD(), &siocSgReq)
if err != nil {
return 0, err
}
return stats.Pktcnt, nil
}

func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) {
mc := &multicastsyscall.Mfcctl{}
origin := src.To4()
Expand Down
4 changes: 4 additions & 0 deletions pkg/agent/multicast/mcast_socket_others.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (s *Socket) AddMrouteEntry(src net.IP, group net.IP, iif uint16, oifVIFs []
return nil
}

func (s *Socket) GetoutboundMroutePacketCount(src net.IP, group net.IP) (pktCount uint32, err error) {
return 0, nil
}

func (s *Socket) DelMrouteEntry(src net.IP, group net.IP, iif uint16) (err error) {
return nil
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/agent/multicast/testing/mock_multicast.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pkg/agent/util/syscall/linux/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ const (

type Mfcctl C.struct_mfcctl
type Vifctl C.struct_vifctl_with_ifindex
type SiocSgReq C.struct_siocsgreq

const SizeofMfcctl = C.sizeof_struct_mfcctl
const SizeofVifctl = C.sizeof_struct_vifctl_with_ifindex
Expand Down
17 changes: 16 additions & 1 deletion pkg/agent/util/syscall/syscall_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package syscall

import (
"runtime"
"syscall"
"unsafe"
)
Expand All @@ -34,7 +35,15 @@ func setsockopt(s int, level int, name int, val unsafe.Pointer, vallen uintptr)
return
}

// Please add your wrapped syscall functions below
func ioctl(fd int, req uint, arg uintptr) (err error) {
_, _, e1 := syscall.Syscall(syscall.SYS_IOCTL, uintptr(fd), uintptr(req), uintptr(arg))
if e1 != 0 {
return e1
}
return
}

// Please add your wrapped syscall functions below.

func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
return setsockopt(fd, level, opt, unsafe.Pointer(mfcctl), SizeofMfcctl)
Expand All @@ -43,3 +52,9 @@ func SetsockoptMfcctl(fd, level, opt int, mfcctl *Mfcctl) error {
func SetsockoptVifctl(fd, level, opt int, vifctl *Vifctl) error {
return setsockopt(fd, level, opt, unsafe.Pointer(vifctl), SizeofVifctl)
}

func IoctlGetSiocSgReq(fd int, siocsgreq *SiocSgReq) (*SiocSgReq, error) {
err := ioctl(fd, SIOCGETSGCNT, uintptr(unsafe.Pointer(siocsgreq)))
runtime.KeepAlive(siocsgreq)
return siocsgreq, err
}
9 changes: 9 additions & 0 deletions pkg/agent/util/syscall/ztypes_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
MRT_INIT = 0xc8
MRT_FLUSH = 0xd4
MAXVIFS = 0x20
SIOCGETSGCNT = 0x89e1
)

type Mfcctl struct {
Expand All @@ -48,6 +49,14 @@ type Vifctl struct {
Rmt_addr [4]byte /* in_addr */
}

type SiocSgReq = struct {
Src [4]byte /* in_addr */
Grp [4]byte /* in_addr */
Pktcnt uint32
Bytecnt uint32
If uint32
}

const SizeofMfcctl = 0x3c
const SizeofVifctl = 0x10
const SizeofIgmpmsg = 0x14

0 comments on commit e249a80

Please sign in to comment.