Skip to content

Commit

Permalink
[Bugfix] Install Multicast related iptables rules only on IPv4 cluster (
Browse files Browse the repository at this point in the history
#6123)

Add a pre-check on the Multicast feature gate status with IPv6-only cluster
settings in agent Initializer, and install the iptables rules only in the IPv4
related chains.

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd authored Mar 28, 2024
1 parent a585238 commit fcfbe9d
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 22 deletions.
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,9 @@ func run(o *Options) error {
validator,
networkConfig.TrafficEncapMode.SupportsEncap(),
nodeInformer,
enableBridgingMode)
enableBridgingMode,
v4Enabled,
v6Enabled)
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package multicast

import (
"fmt"
"net"
"sync"
"time"
Expand Down Expand Up @@ -265,6 +266,13 @@ type Controller struct {
installedNodes sets.Set[string]
encapEnabled bool
flexibleIPAMEnabled bool
// ipv4Enabled is the flag that if it is running on IPv4 cluster. An error is returned if IPv4Enabled is false
// in Initialize as Multicast does not support IPv6 for now.
// TODO: remove this flag after IPv6 is supported in Multicast.
ipv4Enabled bool
// ipv6Enabled is the flag that if it is running on IPv6 cluster.
// TODO: remove this flag after IPv6 is supported in Multicast.
ipv6Enabled bool
}

func NewMulticastController(ofClient openflow.Client,
Expand All @@ -279,7 +287,9 @@ func NewMulticastController(ofClient openflow.Client,
validator types.McastNetworkPolicyController,
isEncap bool,
nodeInformer coreinformers.NodeInformer,
enableFlexibleIPAM bool) *Controller {
enableFlexibleIPAM bool,
ipv4Enabled bool,
ipv6Enabled bool) *Controller {
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, igmpQueryVersions, validator, isEncap)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
Expand All @@ -303,6 +313,8 @@ func NewMulticastController(ofClient openflow.Client,
queryGroupId: v4GroupAllocator.Allocate(),
encapEnabled: isEncap,
flexibleIPAMEnabled: enableFlexibleIPAM,
ipv4Enabled: ipv4Enabled,
ipv6Enabled: ipv6Enabled,
}
if isEncap {
c.nodeGroupID = v4GroupAllocator.Allocate()
Expand Down Expand Up @@ -331,6 +343,11 @@ func NewMulticastController(ofClient openflow.Client,
}

func (c *Controller) Initialize() error {
if !c.ipv4Enabled {
return fmt.Errorf("Multicast is not supported on an IPv6-only cluster")
} else if c.ipv6Enabled {
klog.InfoS("Multicast only works with IPv4 traffic on a dual-stack cluster")
}
err := c.mRouteClient.Initialize()
if err != nil {
return err
Expand Down
72 changes: 56 additions & 16 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func TestAddGroupMemberStatus(t *testing.T) {
iface: if1,
}
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
}
Expand All @@ -115,7 +115,7 @@ func TestAddGroupMemberStatus(t *testing.T) {

func TestUpdateGroupMemberStatus(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
assert.NoError(t, err)
mgroup := net.ParseIP("224.96.1.4")
event := &mcastGroupEvent{
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) {

func TestCheckNodeUpdate(t *testing.T) {
mockController := newMockMulticastController(t, false, false)
err := mockController.initialize(t)
err := mockController.initialize()
require.NoError(t, err)

for _, tc := range []struct {
Expand Down Expand Up @@ -349,7 +349,7 @@ func TestGetGroupPods(t *testing.T) {
now := time.Now()

mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
groupMemberStatuses := []*GroupMemberStatus{
{
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestGetGroupPods(t *testing.T) {

func TestGetPodStats(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)

iface := if1
Expand All @@ -402,7 +402,7 @@ func TestGetPodStats(t *testing.T) {

func TestGetAllPodStats(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)

for _, tc := range []struct {
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestGetAllPodStats(t *testing.T) {
func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
workerCount = 1
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
now := time.Now()
staleTime := now.Add(-mctrl.mcastGroupTimeout - time.Second)
Expand Down Expand Up @@ -483,7 +483,7 @@ func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) {
func TestClearStaleGroups(t *testing.T) {
mctrl := newMockMulticastController(t, false, false)
workerCount = 1
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
Expand Down Expand Up @@ -734,13 +734,13 @@ func TestProcessPacketIn(t *testing.T) {
func TestEncapModeInitialize(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
assert.NotZero(t, mockController.nodeGroupID)
err := mockController.initialize(t)
err := mockController.initialize()
assert.NoError(t, err)
}

func TestEncapLocalReportAndNotifyRemote(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
_ = mockController.initialize(t)
_ = mockController.initialize()
mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
}
Expand Down Expand Up @@ -942,7 +942,7 @@ func TestNodeUpdate(t *testing.T) {

func TestMemberChanged(t *testing.T) {
mockController := newMockMulticastController(t, false, false)
_ = mockController.initialize(t)
_ = mockController.initialize()

containerA := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podA", ContainerID: "tttt"}
containerB := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podB", ContainerID: "mmmm"}
Expand Down Expand Up @@ -1085,7 +1085,7 @@ func TestConcurrentEventHandlerAndWorkers(t *testing.T) {

func TestRemoteMemberJoinLeave(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
_ = mockController.initialize(t)
_ = mockController.initialize()
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -1255,17 +1255,58 @@ func newMockMulticastController(t *testing.T, isEncap bool, enableFlexibleIPAM b
clientset = fake.NewSimpleClientset()
informerFactory = informers.NewSharedInformerFactory(clientset, 12*time.Hour)
nodeInformer := informerFactory.Core().V1().Nodes()
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM)
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM, true, false)
return mctrl
}

func TestFlexibleIPAMModeInitialize(t *testing.T) {
mockController := newMockMulticastController(t, false, true)
err := mockController.initialize(t)
err := mockController.initialize()
assert.NoError(t, err)
}

func (c *Controller) initialize(t *testing.T) error {
func TestMulticastControllerOnIPv6Cluster(t *testing.T) {
for _, tc := range []struct {
name string
ipv4Enabled bool
ipv6Enabled bool
expErr string
}{
{
name: "Fails on IPv6-only cluster",
ipv4Enabled: false,
ipv6Enabled: true,
expErr: "Multicast is not supported on an IPv6-only cluster",
},
{
name: "Succeeds on dual-stack cluster",
ipv4Enabled: true,
ipv6Enabled: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
mockController := newMockMulticastController(t, true, false)
mockController.ipv4Enabled = tc.ipv4Enabled
mockController.ipv6Enabled = tc.ipv6Enabled
if tc.expErr == "" {
mockController.initMocks()
}
err := mockController.Initialize()
if tc.expErr != "" {
assert.EqualError(t, err, tc.expErr)
} else {
assert.NoError(t, err)
}
})
}
}

func (c *Controller) initialize() error {
c.initMocks()
return c.Initialize()
}

func (c *Controller) initMocks() {
mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any())
mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{})
Expand All @@ -1278,7 +1319,6 @@ func (c *Controller) initialize(t *testing.T) error {
if c.flexibleIPAMEnabled {
mockOFClient.EXPECT().InstallMulticastFlexibleIPAMFlows().Times(1)
}
return c.Initialize()
}

func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig {
Expand Down
11 changes: 8 additions & 3 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,8 @@ type Client struct {
nodePortsIPv6 sync.Map
// clusterNodeIPs stores the IPv4 of all other Nodes in the cluster
clusterNodeIPs sync.Map
// clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster
// clusterNodeIP6s stores the IPv6 address of all other Nodes in the cluster. It is maintained but not consumed
// until Multicast supports IPv6.
clusterNodeIP6s sync.Map
// egressRoutes caches ip routes about Egresses.
egressRoutes sync.Map
Expand Down Expand Up @@ -709,7 +710,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
}...)
}

if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() {
// Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after
// IPv6 is supported.
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsEncap() {
// Drop the multicast packets forwarded from other Nodes in the cluster. This is because
// the packet sent out from the sender Pod is already received via tunnel port with encap mode,
// and the one forwarded via the underlay network is to send to external receivers
Expand Down Expand Up @@ -832,7 +835,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain))
// The masqueraded multicast traffic will become unicast so we
// stop traversing this antreaPostRoutingChain for multicast traffic.
if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
// Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after
// IPv6 is supported.
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
writeLine(iptablesData, []string{
"-A", antreaPostRoutingChain,
"-m", "comment", "--comment", `"Antrea: skip masquerade for multicast traffic"`,
Expand Down
1 change: 0 additions & 1 deletion pkg/agent/route/route_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,6 @@ COMMIT
:ANTREA-OUTPUT - [0:0]
-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK
-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK
-A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP6 src -d 224.0.0.0/4 -j DROP
COMMIT
*mangle
:ANTREA-MANGLE - [0:0]
Expand Down

0 comments on commit fcfbe9d

Please sign in to comment.