Do not apply Egress to traffic destined for ServiceCIDRs (#5495)
When AntreaProxy is asked to skip some Services or is not running at
all, Pod-to-Service traffic would be forwarded to the Egress Node and be
load-balanced remotely, as opposed to locally, which could incur
performance issues and unexpected behaviors.

This patch installs flows to prevent traffic destined for ServiceCIDRs
from being SNAT'd.

Signed-off-by: Quan Tian <[email protected]>
tnqn authored Sep 20, 2023
1 parent 137a493 commit 5faab07
Showing 10 changed files with 296 additions and 68 deletions.
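The mechanism is visible in the expected flows of the client test below: high-priority (210) flows in the EgressMark table match packets destined for a Service CIDR and send them straight to L2ForwardingCalc, so they never pick up an Egress SNAT mark. The following self-contained Go sketch illustrates the decision those flows encode; the helper name is hypothetical, and the real matching happens in the OVS datapath, not in Go.

package main

import (
	"fmt"
	"net"
)

// shouldBypassEgressSNAT reports whether traffic to dst targets a Service
// CIDR and should therefore skip Egress SNAT. This mirrors, in Go, the
// decision the new OVS flows encode in the EgressMark table.
func shouldBypassEgressSNAT(dst net.IP, serviceCIDRs []*net.IPNet) bool {
	for _, cidr := range serviceCIDRs {
		if cidr.Contains(dst) {
			return true
		}
	}
	return false
}

func main() {
	_, svcV4, _ := net.ParseCIDR("10.96.0.0/16")
	_, svcV6, _ := net.ParseCIDR("1096::/64")
	serviceCIDRs := []*net.IPNet{svcV4, svcV6}
	fmt.Println(shouldBypassEgressSNAT(net.ParseIP("10.96.0.10"), serviceCIDRs))  // true: handled locally
	fmt.Println(shouldBypassEgressSNAT(net.ParseIP("203.0.113.7"), serviceCIDRs)) // false: eligible for Egress SNAT
}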
2 changes: 1 addition & 1 deletion cmd/antrea-agent/agent.go
@@ -514,7 +514,7 @@ func run(o *Options) error {
if o.enableEgress {
egressController, err = egress.NewEgressController(
ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName,
- memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode,
+ memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode,
)
if err != nil {
return fmt.Errorf("error creating new Egress controller: %v", err)
52 changes: 52 additions & 0 deletions pkg/agent/controller/egress/egress_controller.go
@@ -42,6 +42,7 @@ import (
"antrea.io/antrea/pkg/agent/memberlist"
"antrea.io/antrea/pkg/agent/openflow"
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/servicecidr"
"antrea.io/antrea/pkg/agent/types"
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1"
@@ -147,6 +148,11 @@ type EgressController struct {
ipAssigner ipassigner.IPAssigner

egressIPScheduler *egressIPScheduler

serviceCIDRInterface servicecidr.Interface
serviceCIDRUpdateCh chan struct{}
// Declared for testing.
serviceCIDRUpdateRetryDelay time.Duration
}

func NewEgressController(
@@ -161,6 +167,7 @@ func NewEgressController(
egressInformer crdinformers.EgressInformer,
nodeInformers coreinformers.NodeInformer,
podUpdateSubscriber channel.Subscriber,
serviceCIDRInterface servicecidr.Interface,
maxEgressIPsPerNode int,
) (*EgressController, error) {
c := &EgressController{
Expand All @@ -181,6 +188,10 @@ func NewEgressController(
localIPDetector: ipassigner.NewLocalIPDetector(),
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
serviceCIDRInterface: serviceCIDRInterface,
// One buffer is enough as we just use it to ensure the target handler is executed once.
serviceCIDRUpdateCh: make(chan struct{}, 1),
serviceCIDRUpdateRetryDelay: 10 * time.Second,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
@@ -223,6 +234,7 @@ func NewEgressController(
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.egressIPScheduler.AddEventHandler(c.onEgressIPSchedule)
c.serviceCIDRInterface.AddEventHandler(c.onServiceCIDRUpdate)
return c, nil
}

@@ -231,6 +243,44 @@ func (c *EgressController) onEgressIPSchedule(egress string) {
c.queue.Add(egress)
}

// onServiceCIDRUpdate will be called when ServiceCIDRs change.
// It ensures updateServiceCIDRs will be executed once after this call.
func (c *EgressController) onServiceCIDRUpdate(_ []*net.IPNet) {
select {
case c.serviceCIDRUpdateCh <- struct{}{}:
default:
// The previous event is not processed yet, discard the new event.
}
}

func (c *EgressController) updateServiceCIDRs(stopCh <-chan struct{}) {
timer := time.NewTimer(0)
defer timer.Stop()
<-timer.C // Consume the first tick.
for {
select {
case <-stopCh:
return
case <-c.serviceCIDRUpdateCh:
klog.V(2).InfoS("Received service CIDR update")
case <-timer.C:
klog.V(2).InfoS("Service CIDR update timer expired")
}
serviceCIDRs, err := c.serviceCIDRInterface.GetServiceCIDRs()
if err != nil {
klog.ErrorS(err, "Failed to get Service CIDRs")
// No need to retry in this case as the Service CIDRs won't be available until a ServiceCIDR update event is received.
continue
}
err = c.ofClient.InstallSNATBypassServiceFlows(serviceCIDRs)
if err != nil {
klog.ErrorS(err, "Failed to install SNAT bypass flows for Service CIDRs, will retry", "serviceCIDRs", serviceCIDRs)
// Schedule a retry as it should be a transient error.
timer.Reset(c.serviceCIDRUpdateRetryDelay)
}
}
}

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(e interface{}) {
@@ -323,6 +373,8 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {

go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh)

go c.updateServiceCIDRs(stopCh)

for i := 0; i < defaultWorkers; i++ {
go wait.Until(c.worker, time.Second, stopCh)
}
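The serviceCIDRUpdateCh/updateServiceCIDRs pair above combines two patterns: a channel with capacity one collapses bursts of ServiceCIDR notifications into a single pending event, and a timer, drained of its initial tick, is re-armed only when flow installation fails. A self-contained sketch of the same pattern follows; coalescer, notify, and run are illustrative names, not Antrea APIs, and the deliberately flaky work function stands in for flow installation.

package main

import (
	"fmt"
	"time"
)

// coalescer reproduces the pattern used above: a one-buffered channel
// coalesces notifications, and a timer schedules retries after failures.
type coalescer struct {
	notifyCh   chan struct{}
	retryDelay time.Duration
}

func newCoalescer(retryDelay time.Duration) *coalescer {
	return &coalescer{notifyCh: make(chan struct{}, 1), retryDelay: retryDelay}
}

// notify never blocks: if an event is already pending, the new one is
// discarded, exactly like onServiceCIDRUpdate.
func (c *coalescer) notify() {
	select {
	case c.notifyCh <- struct{}{}:
	default:
	}
}

// run processes pending events and retries failed work, like updateServiceCIDRs.
func (c *coalescer) run(stopCh <-chan struct{}, work func() error) {
	timer := time.NewTimer(0)
	defer timer.Stop()
	<-timer.C // Drain the initial tick so the timer starts idle.
	for {
		select {
		case <-stopCh:
			return
		case <-c.notifyCh: // A new notification arrived.
		case <-timer.C: // A scheduled retry fired.
		}
		if err := work(); err != nil {
			timer.Reset(c.retryDelay) // Transient failure: try again later.
		}
	}
}

func main() {
	c := newCoalescer(10 * time.Millisecond)
	stopCh := make(chan struct{})
	done := make(chan struct{})
	attempts := 0
	go c.run(stopCh, func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("transient error %d", attempts)
		}
		fmt.Printf("succeeded on attempt %d\n", attempts)
		close(done)
		return nil
	})
	c.notify()
	c.notify() // Coalesced: only one event is pending.
	<-done
	close(stopCh)
}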
87 changes: 69 additions & 18 deletions pkg/agent/controller/egress/egress_controller_test.go
@@ -41,6 +41,7 @@ import (
"antrea.io/antrea/pkg/agent/memberlist"
openflowtest "antrea.io/antrea/pkg/agent/openflow/testing"
routetest "antrea.io/antrea/pkg/agent/route/testing"
servicecidrtest "antrea.io/antrea/pkg/agent/servicecidr/testing"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2"
@@ -49,6 +50,7 @@
fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake"
crdinformers "antrea.io/antrea/pkg/client/informers/externalversions"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/ip"
"antrea.io/antrea/pkg/util/k8s"
)

@@ -128,14 +130,15 @@ func mockNewIPAssigner(ipAssigner ipassigner.IPAssigner) func() {

type fakeController struct {
*EgressController
- mockController *gomock.Controller
- mockOFClient *openflowtest.MockClient
- mockRouteClient *routetest.MockInterface
- crdClient *fakeversioned.Clientset
- crdInformerFactory crdinformers.SharedInformerFactory
- informerFactory informers.SharedInformerFactory
- mockIPAssigner *ipassignertest.MockIPAssigner
- podUpdateChannel *channel.SubscribableChannel
+ mockController *gomock.Controller
+ mockOFClient *openflowtest.MockClient
+ mockRouteClient *routetest.MockInterface
+ crdClient *fakeversioned.Clientset
+ crdInformerFactory crdinformers.SharedInformerFactory
+ informerFactory informers.SharedInformerFactory
+ mockIPAssigner *ipassignertest.MockIPAssigner
+ mockServiceCIDRInterface *servicecidrtest.MockInterface
+ podUpdateChannel *channel.SubscribableChannel
}

func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController {
@@ -163,7 +166,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController {
addPodInterface(ifaceStore, "ns4", "pod4", 4)

podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100)

mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller)
mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any())
egressController, _ := NewEgressController(mockOFClient,
&antreaClientGetter{clientset},
crdClient,
@@ -175,19 +179,21 @@
egressInformer,
nodeInformer,
podUpdateChannel,
mockServiceCIDRProvider,
255,
)
egressController.localIPDetector = localIPDetector
return &fakeController{
- EgressController: egressController,
- mockController: controller,
- mockOFClient: mockOFClient,
- mockRouteClient: mockRouteClient,
- crdClient: crdClient,
- crdInformerFactory: crdInformerFactory,
- informerFactory: informerFactory,
- mockIPAssigner: mockIPAssigner,
- podUpdateChannel: podUpdateChannel,
+ EgressController: egressController,
+ mockController: controller,
+ mockOFClient: mockOFClient,
+ mockRouteClient: mockRouteClient,
+ crdClient: crdClient,
+ crdInformerFactory: crdInformerFactory,
+ informerFactory: informerFactory,
+ mockIPAssigner: mockIPAssigner,
+ mockServiceCIDRInterface: mockServiceCIDRProvider,
+ podUpdateChannel: podUpdateChannel,
}
}

@@ -1135,6 +1141,51 @@ func TestGetEgressIPByMark(t *testing.T) {
}
}

func TestUpdateServiceCIDRs(t *testing.T) {
c := newFakeController(t, nil)
stopCh := make(chan struct{})
defer close(stopCh)
// Retry immediately.
c.serviceCIDRUpdateRetryDelay = 0

serviceCIDRs := []*net.IPNet{
ip.MustParseCIDR("10.96.0.0/16"),
ip.MustParseCIDR("1096::/64"),
}
assert.Len(t, c.serviceCIDRUpdateCh, 0)
// Call the handler the 1st time, it should enqueue an event.
c.onServiceCIDRUpdate(serviceCIDRs)
assert.Len(t, c.serviceCIDRUpdateCh, 1)
// Call the handler the 2nd time, it should not block and should discard the event.
c.onServiceCIDRUpdate(serviceCIDRs)
assert.Len(t, c.serviceCIDRUpdateCh, 1)

// In the 1st round, returning the ServiceCIDRs fails; it should not retry.
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(nil, fmt.Errorf("not initialized"))

go c.updateServiceCIDRs(stopCh)

// Wait for the event to be processed.
require.Eventually(t, func() bool {
return len(c.serviceCIDRUpdateCh) == 0
}, time.Second, 100*time.Millisecond)
// In the 2nd round, returning the ServiceCIDRs succeeds but installing flows fails; it should retry.
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil)
c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Return(fmt.Errorf("transient error"))
// In the 3rd round, both succeed.
finishCh := make(chan struct{})
c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil)
c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Do(func(_ []*net.IPNet) { close(finishCh) }).Return(nil)
// Enqueue only one event as the 2nd failure is supposed to trigger a retry.
c.onServiceCIDRUpdate(serviceCIDRs)

select {
case <-finishCh:
case <-time.After(time.Second):
t.Errorf("InstallSNATBypassServiceFlows didn't succeed in time")
}
}

func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) {
t.Logf("queue len %d", queue.Len())
require.Eventually(t, func() bool {
18 changes: 17 additions & 1 deletion pkg/agent/openflow/client.go
@@ -134,6 +134,12 @@ type Client interface {
// are removed from PolicyRule.From, else from PolicyRule.To.
DeletePolicyRuleAddress(ruleID uint32, addrType types.AddressType, addresses []types.Address, priority *uint16) error

// InstallSNATBypassServiceFlows installs flows to prevent traffic destined for the specified Service CIDRs from
// being SNAT'd. Otherwise, such Pod-to-Service traffic would be forwarded to Egress Node and be load-balanced
// remotely, as opposed to locally, when AntreaProxy is asked to skip some Services or is not running at all.
// Calling the method with new CIDRs will override the flows installed for previous CIDRs.
InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error

// InstallSNATMarkFlows installs flows for a local SNAT IP. On Linux, a
// single flow is added to mark the packets tunnelled from remote Nodes
// that should be SNAT'd with the SNAT IP.
@@ -145,7 +151,7 @@

// InstallPodSNATFlows installs the SNAT flows for a local Pod. If the
// SNAT IP for the Pod is on the local Node, a non-zero SNAT ID should
- // allocated for the SNAT IP, and the installed flow sets the SNAT IP
+ // be allocated for the SNAT IP, and the installed flow sets the SNAT IP
// mark on the egress packets from the ofPort; if the SNAT IP is on a
// remote Node, snatMark should be set to 0, and the installed flow
// tunnels egress packets to the remote Node using the SNAT IP as the
@@ -989,6 +995,16 @@ func (c *client) generatePipelines() {
}
}

func (c *client) InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error {
var flows []binding.Flow
for _, serviceCIDR := range serviceCIDRs {
flows = append(flows, c.featureEgress.snatSkipCIDRFlow(*serviceCIDR))
}
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
return c.modifyFlows(c.featureEgress.cachedFlows, "svc-cidrs", flows)
}

func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error {
flow := c.featureEgress.snatIPFromTunnelFlow(snatIP, mark)
cacheKey := fmt.Sprintf("s%x", mark)
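Given the override semantics documented on InstallSNATBypassServiceFlows, a caller can simply reinstall the complete CIDR set on every update instead of diffing old and new sets. A minimal sketch under that assumption; snatBypassInstaller, fakeClient, and syncServiceCIDRFlows are hypothetical stand-ins, not Antrea names.

package main

import (
	"fmt"
	"net"
)

// snatBypassInstaller is a local stand-in for the one openflow.Client method
// used in this sketch, so the example is self-contained.
type snatBypassInstaller interface {
	InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error
}

type fakeClient struct{}

func (fakeClient) InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error {
	for _, cidr := range serviceCIDRs {
		fmt.Println("would install SNAT bypass flow for", cidr)
	}
	return nil
}

// syncServiceCIDRFlows reinstalls the bypass flows for the full CIDR set.
// Because the method overrides previously installed flows, calling it on
// every update is safe and requires no delta computation.
func syncServiceCIDRFlows(c snatBypassInstaller, cidrs []string) error {
	parsed := make([]*net.IPNet, 0, len(cidrs))
	for _, s := range cidrs {
		_, cidr, err := net.ParseCIDR(s)
		if err != nil {
			return fmt.Errorf("parsing Service CIDR %q: %w", s, err)
		}
		parsed = append(parsed, cidr)
	}
	return c.InstallSNATBypassServiceFlows(parsed)
}

func main() {
	if err := syncServiceCIDRFlows(fakeClient{}, []string{"10.96.0.0/16", "1096::/64"}); err != nil {
		fmt.Println("error:", err)
	}
}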
81 changes: 81 additions & 0 deletions pkg/agent/openflow/client_test.go
@@ -1468,6 +1468,87 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
assert.ElementsMatch(t, expectedFlowKeys, flowKeys)
}

func Test_client_InstallSNATBypassServiceFlows(t *testing.T) {
testCases := []struct {
name string
serviceCIDRs []*net.IPNet
newServiceCIDRs []*net.IPNet
expectedFlows []string
expectedNewFlows []string
}{
{
name: "IPv4",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/24"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/16"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "IPv6",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("1096::/80"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("1096::/64"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
{
name: "dual-stack",
serviceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/24"),
utilip.MustParseCIDR("1096::/80"),
},
newServiceCIDRs: []*net.IPNet{
utilip.MustParseCIDR("10.96.0.0/16"),
utilip.MustParseCIDR("1096::/64"),
},
expectedFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
expectedNewFlows: []string{
"cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
"cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc",
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
m := oftest.NewMockOFEntryOperations(ctrl)

fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap)
defer resetPipelines()

m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.serviceCIDRs))
fCacheI, ok := fc.featureEgress.cachedFlows.Load("svc-cidrs")
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI))

m.EXPECT().BundleOps(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1)
assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.newServiceCIDRs))
fCacheI, ok = fc.featureEgress.cachedFlows.Load("svc-cidrs")
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedNewFlows, getFlowStrings(fCacheI))
})
}
}

func Test_client_InstallSNATMarkFlows(t *testing.T) {
mark := uint32(100)

