diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 5742c6bc3a6..f1167fdb24a 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -514,7 +514,7 @@ func run(o *Options) error { if o.enableEgress { egressController, err = egress.NewEgressController( ofClient, antreaClientProvider, crdClient, ifaceStore, routeClient, nodeConfig.Name, nodeConfig.NodeTransportInterfaceName, - memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, o.config.Egress.MaxEgressIPsPerNode, + memberlistCluster, egressInformer, nodeInformer, podUpdateChannel, serviceCIDRProvider, o.config.Egress.MaxEgressIPsPerNode, ) if err != nil { return fmt.Errorf("error creating new Egress controller: %v", err) diff --git a/pkg/agent/controller/egress/egress_controller.go b/pkg/agent/controller/egress/egress_controller.go index d310c757cf3..50771d365cb 100644 --- a/pkg/agent/controller/egress/egress_controller.go +++ b/pkg/agent/controller/egress/egress_controller.go @@ -42,6 +42,7 @@ import ( "antrea.io/antrea/pkg/agent/memberlist" "antrea.io/antrea/pkg/agent/openflow" "antrea.io/antrea/pkg/agent/route" + "antrea.io/antrea/pkg/agent/servicecidr" "antrea.io/antrea/pkg/agent/types" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1" @@ -147,6 +148,11 @@ type EgressController struct { ipAssigner ipassigner.IPAssigner egressIPScheduler *egressIPScheduler + + serviceCIDRInterface servicecidr.Interface + serviceCIDRUpdateCh chan struct{} + // Declared for testing. + serviceCIDRUpdateRetryDelay time.Duration } func NewEgressController( @@ -161,6 +167,7 @@ func NewEgressController( egressInformer crdinformers.EgressInformer, nodeInformers coreinformers.NodeInformer, podUpdateSubscriber channel.Subscriber, + serviceCIDRInterface servicecidr.Interface, maxEgressIPsPerNode int, ) (*EgressController, error) { c := &EgressController{ @@ -181,6 +188,10 @@ func NewEgressController( localIPDetector: ipassigner.NewLocalIPDetector(), idAllocator: newIDAllocator(minEgressMark, maxEgressMark), cluster: cluster, + serviceCIDRInterface: serviceCIDRInterface, + // One buffer is enough as we just use it to ensure the target handler is executed once. + serviceCIDRUpdateCh: make(chan struct{}, 1), + serviceCIDRUpdateRetryDelay: 10 * time.Second, } ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice) if err != nil { @@ -223,6 +234,7 @@ func NewEgressController( podUpdateSubscriber.Subscribe(c.processPodUpdate) c.localIPDetector.AddEventHandler(c.onLocalIPUpdate) c.egressIPScheduler.AddEventHandler(c.onEgressIPSchedule) + c.serviceCIDRInterface.AddEventHandler(c.onServiceCIDRUpdate) return c, nil } @@ -231,6 +243,44 @@ func (c *EgressController) onEgressIPSchedule(egress string) { c.queue.Add(egress) } +// onServiceCIDRUpdate will be called when ServiceCIDRs change. +// It ensures updateServiceCIDRs will be executed once after this call. +func (c *EgressController) onServiceCIDRUpdate(_ []*net.IPNet) { + select { + case c.serviceCIDRUpdateCh <- struct{}{}: + default: + // The previous event is not processed yet, discard the new event. + } +} + +func (c *EgressController) updateServiceCIDRs(stopCh <-chan struct{}) { + timer := time.NewTimer(0) + defer timer.Stop() + <-timer.C // Consume the first tick. + for { + select { + case <-stopCh: + return + case <-c.serviceCIDRUpdateCh: + klog.V(2).InfoS("Received service CIDR update") + case <-timer.C: + klog.V(2).InfoS("Service CIDR update timer expired") + } + serviceCIDRs, err := c.serviceCIDRInterface.GetServiceCIDRs() + if err != nil { + klog.ErrorS(err, "Failed to get Service CIDRs") + // No need to retry in this case as the Service CIDRs won't be available until it receives a service CIDRs update. + continue + } + err = c.ofClient.InstallSNATBypassServiceFlows(serviceCIDRs) + if err != nil { + klog.ErrorS(err, "Failed to install SNAT bypass flows for Service CIDRs, will retry", "serviceCIDRs", serviceCIDRs) + // Schedule a retry as it should be transient error. + timer.Reset(c.serviceCIDRUpdateRetryDelay) + } + } +} + // processPodUpdate will be called when CNIServer publishes a Pod update event. // It triggers reconciling the effective Egress of the Pod. func (c *EgressController) processPodUpdate(e interface{}) { @@ -323,6 +373,8 @@ func (c *EgressController) Run(stopCh <-chan struct{}) { go wait.NonSlidingUntil(c.watchEgressGroup, 5*time.Second, stopCh) + go c.updateServiceCIDRs(stopCh) + for i := 0; i < defaultWorkers; i++ { go wait.Until(c.worker, time.Second, stopCh) } diff --git a/pkg/agent/controller/egress/egress_controller_test.go b/pkg/agent/controller/egress/egress_controller_test.go index f956f280070..07e9155f71d 100644 --- a/pkg/agent/controller/egress/egress_controller_test.go +++ b/pkg/agent/controller/egress/egress_controller_test.go @@ -41,6 +41,7 @@ import ( "antrea.io/antrea/pkg/agent/memberlist" openflowtest "antrea.io/antrea/pkg/agent/openflow/testing" routetest "antrea.io/antrea/pkg/agent/route/testing" + servicecidrtest "antrea.io/antrea/pkg/agent/servicecidr/testing" "antrea.io/antrea/pkg/agent/types" "antrea.io/antrea/pkg/agent/util" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" @@ -49,6 +50,7 @@ import ( fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" "antrea.io/antrea/pkg/util/channel" + "antrea.io/antrea/pkg/util/ip" "antrea.io/antrea/pkg/util/k8s" ) @@ -128,14 +130,15 @@ func mockNewIPAssigner(ipAssigner ipassigner.IPAssigner) func() { type fakeController struct { *EgressController - mockController *gomock.Controller - mockOFClient *openflowtest.MockClient - mockRouteClient *routetest.MockInterface - crdClient *fakeversioned.Clientset - crdInformerFactory crdinformers.SharedInformerFactory - informerFactory informers.SharedInformerFactory - mockIPAssigner *ipassignertest.MockIPAssigner - podUpdateChannel *channel.SubscribableChannel + mockController *gomock.Controller + mockOFClient *openflowtest.MockClient + mockRouteClient *routetest.MockInterface + crdClient *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + informerFactory informers.SharedInformerFactory + mockIPAssigner *ipassignertest.MockIPAssigner + mockServiceCIDRInterface *servicecidrtest.MockInterface + podUpdateChannel *channel.SubscribableChannel } func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeController { @@ -163,7 +166,8 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll addPodInterface(ifaceStore, "ns4", "pod4", 4) podUpdateChannel := channel.NewSubscribableChannel("PodUpdate", 100) - + mockServiceCIDRProvider := servicecidrtest.NewMockInterface(controller) + mockServiceCIDRProvider.EXPECT().AddEventHandler(gomock.Any()) egressController, _ := NewEgressController(mockOFClient, &antreaClientGetter{clientset}, crdClient, @@ -175,19 +179,21 @@ func newFakeController(t *testing.T, initObjects []runtime.Object) *fakeControll egressInformer, nodeInformer, podUpdateChannel, + mockServiceCIDRProvider, 255, ) egressController.localIPDetector = localIPDetector return &fakeController{ - EgressController: egressController, - mockController: controller, - mockOFClient: mockOFClient, - mockRouteClient: mockRouteClient, - crdClient: crdClient, - crdInformerFactory: crdInformerFactory, - informerFactory: informerFactory, - mockIPAssigner: mockIPAssigner, - podUpdateChannel: podUpdateChannel, + EgressController: egressController, + mockController: controller, + mockOFClient: mockOFClient, + mockRouteClient: mockRouteClient, + crdClient: crdClient, + crdInformerFactory: crdInformerFactory, + informerFactory: informerFactory, + mockIPAssigner: mockIPAssigner, + mockServiceCIDRInterface: mockServiceCIDRProvider, + podUpdateChannel: podUpdateChannel, } } @@ -1135,6 +1141,51 @@ func TestGetEgressIPByMark(t *testing.T) { } } +func TestUpdateServiceCIDRs(t *testing.T) { + c := newFakeController(t, nil) + stopCh := make(chan struct{}) + defer close(stopCh) + // Retry immediately. + c.serviceCIDRUpdateRetryDelay = 0 + + serviceCIDRs := []*net.IPNet{ + ip.MustParseCIDR("10.96.0.0/16"), + ip.MustParseCIDR("1096::/64"), + } + assert.Len(t, c.serviceCIDRUpdateCh, 0) + // Call the handler the 1st time, it should enqueue an event. + c.onServiceCIDRUpdate(serviceCIDRs) + assert.Len(t, c.serviceCIDRUpdateCh, 1) + // Call the handler the 2nd time, it should not block and should discard the event. + c.onServiceCIDRUpdate(serviceCIDRs) + assert.Len(t, c.serviceCIDRUpdateCh, 1) + + // In the 1st round, returning the ServiceCIDRs fails, it should not retry. + c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(nil, fmt.Errorf("not initialized")) + + go c.updateServiceCIDRs(stopCh) + + // Wait for the event to be processed. + require.Eventually(t, func() bool { + return len(c.serviceCIDRUpdateCh) == 0 + }, time.Second, 100*time.Millisecond) + // In the 2nd round, returning the ServiceCIDR succeeds but installing flows fails, it should retry. + c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil) + c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Return(fmt.Errorf("transient error")) + // In the 3rd round, both succeed. + finishCh := make(chan struct{}) + c.mockServiceCIDRInterface.EXPECT().GetServiceCIDRs().Return(serviceCIDRs, nil) + c.mockOFClient.EXPECT().InstallSNATBypassServiceFlows(serviceCIDRs).Do(func(_ []*net.IPNet) { close(finishCh) }).Return(nil) + // Enqueue only one event as the 2nd failure is supposed to trigger a retry. + c.onServiceCIDRUpdate(serviceCIDRs) + + select { + case <-finishCh: + case <-time.After(time.Second): + t.Errorf("InstallSNATBypassServiceFlows didn't succeed in time") + } +} + func checkQueueItemExistence(t *testing.T, queue workqueue.RateLimitingInterface, items ...string) { t.Logf("queue len %d", queue.Len()) require.Eventually(t, func() bool { diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 92a88b522ea..69c925fa245 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -134,6 +134,12 @@ type Client interface { // are removed from PolicyRule.From, else from PolicyRule.To. DeletePolicyRuleAddress(ruleID uint32, addrType types.AddressType, addresses []types.Address, priority *uint16) error + // InstallSNATBypassServiceFlows installs flows to prevent traffic destined for the specified Service CIDRs from + // being SNAT'd. Otherwise, such Pod-to-Service traffic would be forwarded to Egress Node and be load-balanced + // remotely, as opposed to locally, when AntreaProxy is asked to skip some Services or is not running at all. + // Calling the method with new CIDRs will override the flows installed for previous CIDRs. + InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error + // InstallSNATMarkFlows installs flows for a local SNAT IP. On Linux, a // single flow is added to mark the packets tunnelled from remote Nodes // that should be SNAT'd with the SNAT IP. @@ -145,7 +151,7 @@ type Client interface { // InstallPodSNATFlows installs the SNAT flows for a local Pod. If the // SNAT IP for the Pod is on the local Node, a non-zero SNAT ID should - // allocated for the SNAT IP, and the installed flow sets the SNAT IP + // be allocated for the SNAT IP, and the installed flow sets the SNAT IP // mark on the egress packets from the ofPort; if the SNAT IP is on a // remote Node, snatMark should be set to 0, and the installed flow // tunnels egress packets to the remote Node using the SNAT IP as the @@ -989,6 +995,16 @@ func (c *client) generatePipelines() { } } +func (c *client) InstallSNATBypassServiceFlows(serviceCIDRs []*net.IPNet) error { + var flows []binding.Flow + for _, serviceCIDR := range serviceCIDRs { + flows = append(flows, c.featureEgress.snatSkipCIDRFlow(*serviceCIDR)) + } + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + return c.modifyFlows(c.featureEgress.cachedFlows, "svc-cidrs", flows) +} + func (c *client) InstallSNATMarkFlows(snatIP net.IP, mark uint32) error { flow := c.featureEgress.snatIPFromTunnelFlow(snatIP, mark) cacheKey := fmt.Sprintf("s%x", mark) diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index b02f503352e..e4623b2955f 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -1468,6 +1468,87 @@ func Test_client_GetServiceFlowKeys(t *testing.T) { assert.ElementsMatch(t, expectedFlowKeys, flowKeys) } +func Test_client_InstallSNATBypassServiceFlows(t *testing.T) { + testCases := []struct { + name string + serviceCIDRs []*net.IPNet + newServiceCIDRs []*net.IPNet + expectedFlows []string + expectedNewFlows []string + }{ + { + name: "IPv4", + serviceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/24"), + }, + newServiceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/16"), + }, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + expectedNewFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + }, + { + name: "IPv6", + serviceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("1096::/80"), + }, + newServiceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("1096::/64"), + }, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + expectedNewFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + }, + { + name: "dual-stack", + serviceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/24"), + utilip.MustParseCIDR("1096::/80"), + }, + newServiceCIDRs: []*net.IPNet{ + utilip.MustParseCIDR("10.96.0.0/16"), + utilip.MustParseCIDR("1096::/64"), + }, + expectedFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/24 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/80 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + expectedNewFlows: []string{ + "cookie=0x1040000000000, table=EgressMark, priority=210,ip,nw_dst=10.96.0.0/16 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + "cookie=0x1040000000000, table=EgressMark, priority=210,ipv6,ipv6_dst=1096::/64 actions=set_field:0x20/0xf0->reg0,goto_table:L2ForwardingCalc", + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + m := oftest.NewMockOFEntryOperations(ctrl) + + fc := newFakeClient(m, true, true, config.K8sNode, config.TrafficEncapModeEncap) + defer resetPipelines() + + m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) + assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.serviceCIDRs)) + fCacheI, ok := fc.featureEgress.cachedFlows.Load("svc-cidrs") + require.True(t, ok) + assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI)) + + m.EXPECT().BundleOps(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(1) + assert.NoError(t, fc.InstallSNATBypassServiceFlows(tc.newServiceCIDRs)) + fCacheI, ok = fc.featureEgress.cachedFlows.Load("svc-cidrs") + require.True(t, ok) + assert.ElementsMatch(t, tc.expectedNewFlows, getFlowStrings(fCacheI)) + }) + } +} + func Test_client_InstallSNATMarkFlows(t *testing.T) { mark := uint32(100) diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 2bc7e108bad..3219d96c1a7 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2283,6 +2283,18 @@ func (f *featureNetworkPolicy) ingressClassifierFlows() []binding.Flow { return flows } +// snatSkipCIDRFlow generates the flow to skip SNAT for connection destined for the provided CIDR. +func (f *featureEgress) snatSkipCIDRFlow(cidr net.IPNet) binding.Flow { + ipProtocol := getIPProtocol(cidr.IP) + return EgressMarkTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(ipProtocol). + MatchDstIPNet(cidr). + Action().LoadRegMark(ToGatewayRegMark). + Action().GotoStage(stageSwitching). + Done() +} + // snatSkipNodeFlow generates the flow to skip SNAT for connection destined for the transport IP of a remote Node. func (f *featureEgress) snatSkipNodeFlow(nodeIP net.IP) binding.Flow { ipProtocol := getIPProtocol(nodeIP) @@ -2719,13 +2731,7 @@ func (f *featureEgress) externalFlows() []binding.Flow { ) // This generates the flows to bypass the packets sourced from local Pods and destined for the except CIDRs for Egress. for _, cidr := range f.exceptCIDRs[ipProtocol] { - flows = append(flows, EgressMarkTable.ofTable.BuildFlow(priorityHigh). - Cookie(cookieID). - MatchProtocol(ipProtocol). - MatchDstIPNet(cidr). - Action().LoadRegMark(ToGatewayRegMark). - Action().GotoStage(stageSwitching). - Done()) + flows = append(flows, f.snatSkipCIDRFlow(cidr)) } } // This generates the flow to match the packets of tracked Egress connection and forward them to stageSwitching. diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 1cd0a2e75ba..5bd0b28c00b 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -442,6 +442,20 @@ func (mr *MockClientMockRecorder) InstallPolicyRuleFlows(arg0 interface{}) *gomo return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPolicyRuleFlows", reflect.TypeOf((*MockClient)(nil).InstallPolicyRuleFlows), arg0) } +// InstallSNATBypassServiceFlows mocks base method +func (m *MockClient) InstallSNATBypassServiceFlows(arg0 []*net.IPNet) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallSNATBypassServiceFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallSNATBypassServiceFlows indicates an expected call of InstallSNATBypassServiceFlows +func (mr *MockClientMockRecorder) InstallSNATBypassServiceFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallSNATBypassServiceFlows", reflect.TypeOf((*MockClient)(nil).InstallSNATBypassServiceFlows), arg0) +} + // InstallSNATMarkFlows mocks base method func (m *MockClient) InstallSNATMarkFlows(arg0 net.IP, arg1 uint32) error { m.ctrl.T.Helper() diff --git a/pkg/agent/servicecidr/discoverer.go b/pkg/agent/servicecidr/discoverer.go index a6a2dc74ba5..9dbe18afc81 100644 --- a/pkg/agent/servicecidr/discoverer.go +++ b/pkg/agent/servicecidr/discoverer.go @@ -42,7 +42,7 @@ const ( type EventHandler func(serviceCIDRs []*net.IPNet) type Interface interface { - GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) + GetServiceCIDRs() ([]*net.IPNet, error) // The added handlers will be called when Service CIDR changes. AddEventHandler(handler EventHandler) } @@ -56,6 +56,8 @@ type Discoverer struct { eventHandlers []EventHandler // queue maintains the Service objects that need to be synced. queue workqueue.Interface + // initialized indicates whether the Discoverer has been initialized. + initialized bool } func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) *Discoverer { @@ -104,19 +106,20 @@ func (d *Discoverer) Run(stopCh <-chan struct{}) { <-stopCh } -func (d *Discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) { +func (d *Discoverer) GetServiceCIDRs() ([]*net.IPNet, error) { d.RLock() defer d.RUnlock() - if isIPv6 { - if d.serviceIPv6CIDR == nil { - return nil, fmt.Errorf("Service IPv6 CIDR is not available yet") - } - return d.serviceIPv6CIDR, nil + if !d.initialized { + return nil, fmt.Errorf("Service CIDR discoverer is not initialized yet") + } + var serviceCIDRs []*net.IPNet + if d.serviceIPv4CIDR != nil { + serviceCIDRs = append(serviceCIDRs, d.serviceIPv4CIDR) } - if d.serviceIPv4CIDR == nil { - return nil, fmt.Errorf("Service IPv4 CIDR is not available yet") + if d.serviceIPv6CIDR != nil { + serviceCIDRs = append(serviceCIDRs, d.serviceIPv6CIDR) } - return d.serviceIPv4CIDR, nil + return serviceCIDRs, nil } func (d *Discoverer) AddEventHandler(handler EventHandler) { @@ -178,8 +181,10 @@ func (d *Discoverer) updateServiceCIDR(svcs ...*corev1.Service) { continue } } else { + mask := net.CIDRMask(mask, mask) + clusterIP := clusterIP.Mask(mask) // If the calculated Service CIDR doesn't exist, generate a new Service CIDR with the ClusterIP. - newServiceCIDR = &net.IPNet{IP: clusterIP, Mask: net.CIDRMask(mask, mask)} + newServiceCIDR = &net.IPNet{IP: clusterIP, Mask: mask} } if isIPv6 { @@ -207,6 +212,7 @@ func (d *Discoverer) updateServiceCIDR(svcs ...*corev1.Service) { klog.InfoS("Service IPv6 CIDR was updated", "ServiceCIDR", curServiceIPv6CIDR) newServiceCIDRs = append(newServiceCIDRs, curServiceIPv6CIDR) } + d.initialized = true }() for _, handler := range d.eventHandlers { handler(newServiceCIDRs) diff --git a/pkg/agent/servicecidr/discoverer_test.go b/pkg/agent/servicecidr/discoverer_test.go index 850db83de6a..48d13f243a2 100644 --- a/pkg/agent/servicecidr/discoverer_test.go +++ b/pkg/agent/servicecidr/discoverer_test.go @@ -25,6 +25,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + + "antrea.io/antrea/pkg/util/ip" ) func makeService(namespace, name string, clusterIP string, protocol corev1.Protocol) *corev1.Service { @@ -54,12 +56,10 @@ func TestServiceCIDRProvider(t *testing.T) { informerFactory := informers.NewSharedInformerFactory(client, 0) svcInformer := informerFactory.Core().V1().Services() serviceCIDRProvider := NewServiceCIDRDiscoverer(svcInformer) - serviceCIDRChan := make(chan *net.IPNet) + serviceCIDRChan := make(chan []*net.IPNet) serviceCIDRProvider.AddEventHandler(func(serviceCIDRs []*net.IPNet) { { - for _, serviceCIDR := range serviceCIDRs { - serviceCIDRChan <- serviceCIDR - } + serviceCIDRChan <- serviceCIDRs } }) @@ -69,11 +69,11 @@ func TestServiceCIDRProvider(t *testing.T) { informerFactory.WaitForCacheSync(stopCh) go serviceCIDRProvider.Run(stopCh) - check := func(expectedServiceCIDR string, isServiceCIDRUpdated, isIPv6 bool) { - if isServiceCIDRUpdated { + check := func(expectedServiceCIDRs []*net.IPNet, expectedEvent []*net.IPNet) { + if expectedEvent != nil { select { case event := <-serviceCIDRChan: - assert.Equal(t, expectedServiceCIDR, event.String()) + assert.Equal(t, expectedEvent, event) case <-time.After(time.Second): t.Fatalf("timed out waiting for expected Service CIDR") } @@ -84,70 +84,72 @@ func TestServiceCIDRProvider(t *testing.T) { case <-time.After(100 * time.Millisecond): } } - serviceCIDR, err := serviceCIDRProvider.GetServiceCIDR(isIPv6) - if expectedServiceCIDR != "" { + serviceCIDRs, err := serviceCIDRProvider.GetServiceCIDRs() + if expectedServiceCIDRs != nil { assert.NoError(t, err) - assert.Equal(t, expectedServiceCIDR, serviceCIDR.String()) + assert.Equal(t, expectedServiceCIDRs, serviceCIDRs) } else { - assert.ErrorContains(t, err, "CIDR is not available yet") + assert.ErrorContains(t, err, "Service CIDR discoverer is not initialized yet") } } + check(nil, nil) + svc := makeService("ns1", "svc0", "None", corev1.ProtocolTCP) _, err := client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("", false, false) + check(nil, nil) svc = makeService("ns1", "svc1", "10.10.0.1", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.1/32", true, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.1/32")}, []*net.IPNet{ip.MustParseCIDR("10.10.0.1/32")}) svc = makeService("ns1", "svc2", "10.10.0.2", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.0/30", true, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/30")}, []*net.IPNet{ip.MustParseCIDR("10.10.0.0/30")}) svc = makeService("ns1", "svc5", "10.10.0.5", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.0/29", true, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, []*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}) svc = makeService("ns1", "svc4", "10.10.0.4", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10.10.0.0/29", false, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, nil) err = client.CoreV1().Services("ns1").Delete(context.TODO(), "svc4", metav1.DeleteOptions{}) assert.NoError(t, err) - check("10.10.0.0/29", false, false) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, nil) svc = makeService("ns1", "svc60", "None", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("", false, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29")}, nil) svc = makeService("ns1", "svc61", "10::1", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::1/128", true, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::1/128")}, []*net.IPNet{ip.MustParseCIDR("10::1/128")}) svc = makeService("ns1", "svc62", "10::2", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::/126", true, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/126")}, []*net.IPNet{ip.MustParseCIDR("10::/126")}) svc = makeService("ns1", "svc65", "10::5", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::/125", true, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/125")}, []*net.IPNet{ip.MustParseCIDR("10::/125")}) svc = makeService("ns1", "svc64", "10::4", corev1.ProtocolTCP) _, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{}) assert.NoError(t, err) - check("10::/125", false, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/125")}, nil) err = client.CoreV1().Services("ns1").Delete(context.TODO(), "svc64", metav1.DeleteOptions{}) assert.NoError(t, err) - check("10::/125", false, true) + check([]*net.IPNet{ip.MustParseCIDR("10.10.0.0/29"), ip.MustParseCIDR("10::/125")}, nil) } diff --git a/pkg/agent/servicecidr/testing/mock_servicecidr.go b/pkg/agent/servicecidr/testing/mock_servicecidr.go index 3c149aba119..56e940f0e8f 100644 --- a/pkg/agent/servicecidr/testing/mock_servicecidr.go +++ b/pkg/agent/servicecidr/testing/mock_servicecidr.go @@ -61,17 +61,17 @@ func (mr *MockInterfaceMockRecorder) AddEventHandler(arg0 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddEventHandler", reflect.TypeOf((*MockInterface)(nil).AddEventHandler), arg0) } -// GetServiceCIDR mocks base method -func (m *MockInterface) GetServiceCIDR(arg0 bool) (*net.IPNet, error) { +// GetServiceCIDRs mocks base method +func (m *MockInterface) GetServiceCIDRs() ([]*net.IPNet, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetServiceCIDR", arg0) - ret0, _ := ret[0].(*net.IPNet) + ret := m.ctrl.Call(m, "GetServiceCIDRs") + ret0, _ := ret[0].([]*net.IPNet) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetServiceCIDR indicates an expected call of GetServiceCIDR -func (mr *MockInterfaceMockRecorder) GetServiceCIDR(arg0 interface{}) *gomock.Call { +// GetServiceCIDRs indicates an expected call of GetServiceCIDRs +func (mr *MockInterfaceMockRecorder) GetServiceCIDRs() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceCIDR", reflect.TypeOf((*MockInterface)(nil).GetServiceCIDR), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetServiceCIDRs", reflect.TypeOf((*MockInterface)(nil).GetServiceCIDRs)) }