diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index e140f3c0790..268a911c623 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -91,6 +91,10 @@ featureGates: # Enable NodeLatencyMonitor to monitor the latency between Nodes. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NodeLatencyMonitor" "default" false) }} +# Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to +# remote BGP peers. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "BGPPolicy" "default" false) }} + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: {{ .Values.ovs.bridgeName | quote }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index ef1c43e40e8..be46ac81f8b 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -177,6 +177,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -234,3 +235,12 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index 1d9c4c57eff..52fd80281fb 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a + checksum/config: 32e079ffe409d512a54f34fa9312e54a5d4c9125aa9695fda915c83fe77e29b9 labels: app: antrea component: antrea-agent @@ -5348,7 +5362,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a + checksum/config: 32e079ffe409d512a54f34fa9312e54a5d4c9125aa9695fda915c83fe77e29b9 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index 62e00bc5f1b..ea06a90c07d 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a + checksum/config: 32e079ffe409d512a54f34fa9312e54a5d4c9125aa9695fda915c83fe77e29b9 labels: app: antrea component: antrea-agent @@ -5349,7 +5363,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: f976029accf54258d01ad907fe19b50ac671eee014cd8aea968c6a0bc7e8f95a + checksum/config: 32e079ffe409d512a54f34fa9312e54a5d4c9125aa9695fda915c83fe77e29b9 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index 22665a948c9..705df6d5c0e 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 5299e6235e262daf606758cf900766470fcb8dd21a0d707a3ae284548bd8c2b2 + checksum/config: 47194398fc7170ce91a41f7d0ee487c0adaf6725adc56aa08b299babc9ff6bc1 labels: app: antrea component: antrea-agent @@ -5346,7 +5360,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 5299e6235e262daf606758cf900766470fcb8dd21a0d707a3ae284548bd8c2b2 + checksum/config: 47194398fc7170ce91a41f7d0ee487c0adaf6725adc56aa08b299babc9ff6bc1 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 0c9afaa9857..544fa36b1f2 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -3820,6 +3820,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4458,6 +4462,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4515,6 +4520,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5123,7 +5137,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ba93df141f512a1f8483114b5994444c7231b298e7e9133483ddc1f4210ec395 + checksum/config: 7458c788aadb791e4450d83323a480c10aec1151dd41fce875ec3168526dc618 checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -5405,7 +5419,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ba93df141f512a1f8483114b5994444c7231b298e7e9133483ddc1f4210ec395 + checksum/config: 7458c788aadb791e4450d83323a480c10aec1151dd41fce875ec3168526dc618 labels: app: antrea component: antrea-controller diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index b938d9f83bc..6acef97dfa8 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -3807,6 +3807,10 @@ data: # Enable NodeLatencyMonitor to monitor the latency between Nodes. # NodeLatencyMonitor: false + # Allow users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs, Pod IPs and Egress IPs to + # remote BGP peers. + # BGPPolicy: false + # Name of the OpenVSwitch bridge antrea-agent will create and use. # Make sure it doesn't conflict with your existing OpenVSwitch bridges. ovsBridge: "br-int" @@ -4445,6 +4449,7 @@ rules: - apiGroups: - crd.antrea.io resources: + - bgppolicies - externalippools - ippools - trafficcontrols @@ -4502,6 +4507,15 @@ rules: - create - patch - update + - apiGroups: + - "" + resources: + - secrets + resourceNames: + - antrea-bgp-passwords + verbs: + - get + - watch --- # Source: antrea/templates/antctl/clusterrole.yaml kind: ClusterRole @@ -5110,7 +5124,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: aca23e21519e0fc112647f23d3ce6f92a3dea0bc7ebf1c6d7a7eed2dbe80f0a3 + checksum/config: b625d373051b7eb617a1e3e8f762c5b559fecf3569610b6c4494fb6bf3866e8b labels: app: antrea component: antrea-agent @@ -5346,7 +5360,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: aca23e21519e0fc112647f23d3ce6f92a3dea0bc7ebf1c6d7a7eed2dbe80f0a3 + checksum/config: b625d373051b7eb617a1e3e8f762c5b559fecf3569610b6c4494fb6bf3866e8b labels: app: antrea component: antrea-controller diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 4f7099b6ae0..57e8b322748 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -38,6 +38,7 @@ import ( "antrea.io/antrea/pkg/agent/cniserver" "antrea.io/antrea/pkg/agent/cniserver/ipam" "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/controller/bgp" "antrea.io/antrea/pkg/agent/controller/egress" "antrea.io/antrea/pkg/agent/controller/ipseccertificate" "antrea.io/antrea/pkg/agent/controller/l7flowexporter" @@ -743,6 +744,23 @@ func run(o *Options) error { } } + if features.DefaultFeatureGate.Enabled(features.BGPPolicy) { + bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies() + bgpController, err := bgp.NewBGPPolicyController(ctx, + nodeInformer, + serviceInformer, + egressInformer, + bgpPolicyInformer, + endpointSliceInformer, + k8sClient, + nodeConfig, + networkConfig) + if err != nil { + return err + } + go bgpController.Run(stopCh) + } + if features.DefaultFeatureGate.Enabled(features.TrafficControl) { tcController := trafficcontrol.NewTrafficControlController(ofClient, ifaceStore, diff --git a/docs/feature-gates.md b/docs/feature-gates.md index 287d9dacc58..ce1bbba26c8 100644 --- a/docs/feature-gates.md +++ b/docs/feature-gates.md @@ -59,6 +59,7 @@ edit the Agent configuration in the | `EgressSeparateSubnet` | Agent | `false` | Alpha | v1.15 | N/A | N/A | No | | | `NodeNetworkPolicy` | Agent | `false` | Alpha | v1.15 | N/A | N/A | Yes | | | `L7FlowExporter` | Agent | `false` | Alpha | v1.15 | N/A | N/A | Yes | | +| `BGPPolicy` | Agent | `false` | Alpha | v2.1 | N/A | N/A | No | | ## Description and Requirements of Features @@ -438,3 +439,9 @@ Refer to this [document](network-flow-visibility.md#l7-visibility) for more info #### Requirements for this Feature - Linux Nodes only. + +### BGPPolicy + +`BGPPolicy` allows users to initiate BGP process on selected Kubernetes Nodes and advertise Service IPs (e.g., +ClusterIPs, ExternalIPs, LoadBalancerIPs), Pod IPs and Egress IPs to remote BGP peers, providing a flexible mechanism +for integrating Kubernetes clusters with external BGP-enabled networks. diff --git a/pkg/agent/controller/bgp/controller.go b/pkg/agent/controller/bgp/controller.go new file mode 100644 index 00000000000..008cd8bc02c --- /dev/null +++ b/pkg/agent/controller/bgp/controller.go @@ -0,0 +1,954 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bgp + +import ( + "context" + "fmt" + "reflect" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + discoveryinformers "k8s.io/client-go/informers/discovery/v1" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + discoverylisters "k8s.io/client-go/listers/discovery/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + "k8s.io/utils/net" + "k8s.io/utils/strings/slices" + + "antrea.io/antrea/pkg/agent/bgp" + "antrea.io/antrea/pkg/agent/bgp/gobgp" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + "antrea.io/antrea/pkg/apis/crd/v1beta1" + crdinformersv1a1 "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" + crdinformersv1b1 "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1" + crdlistersv1a1 "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" + crdlistersv1b1 "antrea.io/antrea/pkg/client/listers/crd/v1beta1" + "antrea.io/antrea/pkg/features" + "antrea.io/antrea/pkg/util/env" + utilipset "antrea.io/antrea/pkg/util/sets" +) + +const ( + controllerName = "BGPPolicyController" + // How long to wait before retrying the processing of a BGPPolicy change. + minRetryDelay = 5 * time.Second + maxRetryDelay = 300 * time.Second + // Disable resyncing. + resyncPeriod time.Duration = 0 +) + +const defaultBGPListenPort int32 = 179 + +const ( + ipv4Suffix = "/32" + ipv6Suffix = "/128" +) + +const key = "dummyKey" + +type bgpPolicyState struct { + // The name of BGPPolicy takes effect. + bgpPolicy string + // The local BGP server instance. + bgpServer bgp.Interface + // The port on which the local BGP server listens. + listenPort int32 + // The AS number used by the local BGP server. + localASN int32 + // The router ID used by the local BGP server. + routerID string + // routes stores all BGP routers advertised to BGP peers. + routes sets.Set[bgp.Route] + // peerConfigs is a map that stores configurations of BGP peers. The map keys are the concatenated strings of BGP + // peer IP address and ASN (e.g., "192.168.77.100-65000", "2001::1-65000"). + peerConfigs map[string]bgp.PeerConfig +} + +type Controller struct { + ctx context.Context + + nodeInformer cache.SharedIndexInformer + nodeLister corelisters.NodeLister + nodeListerSynced cache.InformerSynced + + serviceInformer cache.SharedIndexInformer + serviceLister corelisters.ServiceLister + serviceListerSynced cache.InformerSynced + + egressInformer cache.SharedIndexInformer + egressLister crdlistersv1b1.EgressLister + egressListerSynced cache.InformerSynced + + bgpPolicyInformer cache.SharedIndexInformer + bgpPolicyLister crdlistersv1a1.BGPPolicyLister + bgpPolicyListerSynced cache.InformerSynced + + endpointSliceInformer cache.SharedIndexInformer + endpointSliceLister discoverylisters.EndpointSliceLister + endpointSliceListerSynced cache.InformerSynced + + bgpPolicyState *bgpPolicyState + + k8sClient kubernetes.Interface + bgpPeerPasswords map[string]string + bgpPeerPasswordsMutex sync.RWMutex + + nodeName string + enabledIPv4 bool + enabledIPv6 bool + podIPv4CIDR string + podIPv6CIDR string + nodeIPv4Addr string + + egressEnabled bool + + newBGPServerFn func(globalConfig *bgp.GlobalConfig) bgp.Interface + + queue workqueue.RateLimitingInterface +} + +func NewBGPPolicyController(ctx context.Context, + nodeInformer coreinformers.NodeInformer, + serviceInformer coreinformers.ServiceInformer, + egressInformer crdinformersv1b1.EgressInformer, + bgpPolicyInformer crdinformersv1a1.BGPPolicyInformer, + endpointSliceInformer discoveryinformers.EndpointSliceInformer, + k8sClient kubernetes.Interface, + nodeConfig *config.NodeConfig, + networkConfig *config.NetworkConfig) (*Controller, error) { + c := &Controller{ + ctx: ctx, + nodeInformer: nodeInformer.Informer(), + nodeLister: nodeInformer.Lister(), + nodeListerSynced: nodeInformer.Informer().HasSynced, + serviceInformer: serviceInformer.Informer(), + serviceLister: serviceInformer.Lister(), + serviceListerSynced: serviceInformer.Informer().HasSynced, + egressInformer: egressInformer.Informer(), + egressLister: egressInformer.Lister(), + egressListerSynced: egressInformer.Informer().HasSynced, + bgpPolicyInformer: bgpPolicyInformer.Informer(), + bgpPolicyLister: bgpPolicyInformer.Lister(), + bgpPolicyListerSynced: bgpPolicyInformer.Informer().HasSynced, + endpointSliceInformer: endpointSliceInformer.Informer(), + endpointSliceLister: endpointSliceInformer.Lister(), + endpointSliceListerSynced: endpointSliceInformer.Informer().HasSynced, + k8sClient: k8sClient, + bgpPeerPasswords: make(map[string]string), + nodeName: nodeConfig.Name, + enabledIPv4: networkConfig.IPv4Enabled, + enabledIPv6: networkConfig.IPv6Enabled, + podIPv4CIDR: nodeConfig.PodIPv4CIDR.String(), + podIPv6CIDR: nodeConfig.PodIPv6CIDR.String(), + nodeIPv4Addr: nodeConfig.NodeIPv4Addr.IP.String(), + egressEnabled: features.DefaultFeatureGate.Enabled(features.Egress), + newBGPServerFn: func(globalConfig *bgp.GlobalConfig) bgp.Interface { + return gobgp.NewGoBGPServer(globalConfig) + }, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "bgpPolicyGroup"), + } + c.bgpPolicyInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addBGPPolicy, + UpdateFunc: c.updateBGPPolicy, + DeleteFunc: c.deleteBGPPolicy, + }, + resyncPeriod, + ) + c.serviceInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addService, + UpdateFunc: c.updateService, + DeleteFunc: c.deleteService, + }, + resyncPeriod, + ) + c.endpointSliceInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addEndpointSlice, + UpdateFunc: c.updateEndpointSlice, + DeleteFunc: nil, + }, + resyncPeriod, + ) + if c.egressEnabled { + c.egressInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: c.addEgress, + UpdateFunc: c.updateEgress, + DeleteFunc: c.deleteEgress, + }, + resyncPeriod, + ) + } + c.nodeInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: nil, + UpdateFunc: c.updateNode, + DeleteFunc: nil, + }, + resyncPeriod, + ) + return c, nil +} + +// watchSecretChanges uses watch API directly to watch for the changes of the specific Secret storing passwords of BGP +// peers. +func (c *Controller) watchSecretChanges(endCh <-chan struct{}) error { + ns := env.GetAntreaNamespace() + watcher, err := c.k8sClient.CoreV1().Secrets(ns).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{ + Namespace: ns, + Name: types.BGPPolicySecretName, + })) + if err != nil { + return fmt.Errorf("failed to create Secret watcher: %v", err) + } + + ch := watcher.ResultChan() + defer watcher.Stop() + klog.InfoS("Starting watching Secret changes", "Secret", fmt.Sprintf("%s/%s", ns, types.BGPPolicySecretName)) + for { + select { + case event, ok := <-ch: + if !ok { + return nil + } + // Update BGP peer passwords. + klog.InfoS("Processing Secret event", "Secret", fmt.Sprintf("%s/%s", ns, types.BGPPolicySecretName)) + func() { + c.bgpPeerPasswordsMutex.Lock() + defer c.bgpPeerPasswordsMutex.Unlock() + + secretObj := event.Object.(*corev1.Secret) + c.bgpPeerPasswords = make(map[string]string) + for key, data := range secretObj.Data { + c.bgpPeerPasswords[key] = string(data) + } + }() + c.queue.Add(key) + case <-endCh: + return nil + } + } +} + +func (c *Controller) Run(stopCh <-chan struct{}) { + defer c.queue.ShutDown() + + klog.InfoS("Starting", "controllerName", controllerName) + defer klog.InfoS("Shutting down", "controllerName", controllerName) + + if !cache.WaitForNamedCacheSync(controllerName, + stopCh, + c.nodeListerSynced, + c.serviceListerSynced, + c.egressListerSynced, + c.bgpPolicyListerSynced, + c.endpointSliceListerSynced) { + return + } + + go wait.NonSlidingUntil(func() { + if err := c.watchSecretChanges(stopCh); err != nil { + klog.ErrorS(err, "Watch Secret error", "secret", types.BGPPolicySecretName) + } + }, time.Second*10, stopCh) + + go wait.Until(c.worker, time.Second, stopCh) + + <-stopCh +} + +func (c *Controller) worker() { + for c.processNextWorkItem() { + } +} + +func (c *Controller) processNextWorkItem() bool { + defer c.queue.Done(key) + + if err := c.syncBGPPolicy(); err == nil { + // If no error occurs we Forget this item, so it does not get queued again until another change happens. + c.queue.Forget(key) + } else { + // Put the item back on the work queue to handle any transient errors. + c.queue.AddRateLimited(key) + klog.ErrorS(err, "Syncing BGPPolicy failed, requeue") + } + return true +} + +// filterBGPPolicies filters the BGPPolicies applied to the current Node. +func (c *Controller) filterBGPPolicies() (map[string]*v1alpha1.BGPPolicy, error) { + allBPs, err := c.bgpPolicyLister.List(labels.Everything()) + if err != nil { + return nil, err + } + + bpMap := make(map[string]*v1alpha1.BGPPolicy) + for _, bp := range allBPs { + if c.matchedCurrentNode(bp) { + bpMap[bp.GetName()] = bp + } + } + return bpMap, nil +} + +func (c *Controller) syncBGPPolicy() error { + startTime := time.Now() + defer func() { + klog.V(2).InfoS("Finished syncing BGPPolicy", "durationTime", time.Since(startTime)) + }() + + // Get all available BGPPolicies applied to the current Node. + allBGPPolicies, err := c.filterBGPPolicies() + if err != nil { + return err + } + + allBGPPolicyNames := sets.KeySet(allBGPPolicies) + preState := c.bgpPolicyState + var curState *bgpPolicyState + var effectiveBGPPolicyName string + var needUpdateBGPServer bool + + if preState == nil { + if allBGPPolicyNames.Len() == 0 { + return nil + } + // If there is no effective BGPPolicy in the last sync, select a random available BGPPolicy as the effective one. + effectiveBGPPolicyName, _ = allBGPPolicyNames.PopAny() + curState = &bgpPolicyState{bgpPolicy: effectiveBGPPolicyName} + needUpdateBGPServer = true + } else { + // If there is an effective BGPPolicy in the last sync, check if it is still included in the available BGPPolicies. + if !allBGPPolicyNames.Has(preState.bgpPolicy) { + // If there is no available BGPPolicy, stop the previous BGP server, clean up the state and return. + if allBGPPolicyNames.Len() == 0 { + if err := preState.bgpServer.Stop(c.ctx); err != nil { + return err + } + c.bgpPolicyState = nil + return nil + } + // Select a new effective BGPPolicy from the available ones. + effectiveBGPPolicyName, _ = allBGPPolicyNames.PopAny() + curState = &bgpPolicyState{bgpPolicy: effectiveBGPPolicyName} + needUpdateBGPServer = true + } else { + // Retain the effective BGPPolicy in the last sync. + effectiveBGPPolicyName = preState.bgpPolicy + curState = &bgpPolicyState{bgpPolicy: effectiveBGPPolicyName} + } + } + bgpPolicy := allBGPPolicies[effectiveBGPPolicyName] + + klog.V(2).InfoS("Syncing BGPPolicy", "BGPPolicy", effectiveBGPPolicyName) + // Retrieve the listen port, local AS number and router ID from the effective BGPPolicy, and update them to the + // current state. + curState.routerID, err = c.getRouterID() + if err != nil { + return err + } + if bgpPolicy.Spec.ListenPort != nil { + curState.listenPort = *bgpPolicy.Spec.ListenPort + } else { + curState.listenPort = defaultBGPListenPort + } + curState.localASN = bgpPolicy.Spec.LocalASN + + // If any of the listen port, local AS number, or router ID have changed, mark the BGP server for an update. + if preState != nil { + needUpdateBGPServer = needUpdateBGPServer || + preState.listenPort != curState.listenPort || + preState.localASN != curState.localASN || + preState.routerID != curState.routerID + } + + if needUpdateBGPServer { + // Stop the stale BGP server. + if preState != nil { + if err := preState.bgpServer.Stop(c.ctx); err != nil { + klog.ErrorS(err, "failed to stop stale BGP server") + } + } + // Start the new BGP server. + globalConfig := &bgp.GlobalConfig{ + ASN: uint32(curState.localASN), + RouterID: curState.routerID, + ListenPort: curState.listenPort, + } + bgpServer := c.newBGPServerFn(globalConfig) + if err := bgpServer.Start(c.ctx); err != nil { + return err + } + // Update the current effective BGPPolicy state. + curState.bgpServer = bgpServer + } else { + curState.bgpServer = preState.bgpServer + } + + // Reconcile BGP peers. + curPeerConfigs, err := c.getPeerConfigs(bgpPolicy.Spec.BGPPeers) + if err != nil { + return err + } + prePeerConfigs := make(map[string]bgp.PeerConfig) + if preState != nil { + prePeerConfigs = preState.peerConfigs + } + if err := c.reconcileBGPPeers(curPeerConfigs, prePeerConfigs, curState.bgpServer, needUpdateBGPServer); err != nil { + return err + } + + // Reconcile BGP routes generated from advertisements. + curRoutes, err := c.getRoutes(bgpPolicy.Spec.Advertisements) + if err != nil { + return err + } + preRoutes := sets.Set[bgp.Route]{} + if preState != nil { + preRoutes = preState.routes + } + if err := c.reconcileRoutes(curRoutes, preRoutes, curState.bgpServer, needUpdateBGPServer); err != nil { + return err + } + + // Update the current effective BGPPolicy state. + curState.routes = curRoutes + curState.peerConfigs = curPeerConfigs + c.bgpPolicyState = curState + + return nil +} + +func getPeerConfigsForKeys(peerKeys sets.Set[string], allPeerConfigs map[string]bgp.PeerConfig) []bgp.PeerConfig { + peerConfigs := make([]bgp.PeerConfig, 0, len(peerKeys)) + for peer := range peerKeys { + peerConfigs = append(peerConfigs, allPeerConfigs[peer]) + } + return peerConfigs +} + +func (c *Controller) reconcileBGPPeers(curPeerConfigs, prePeerConfigs map[string]bgp.PeerConfig, bgpServer bgp.Interface, bgpServerUpdated bool) error { + prePeerKeys := sets.KeySet(prePeerConfigs) + curPeerKeys := sets.KeySet(curPeerConfigs) + + var peerToAddKeys sets.Set[string] + if !bgpServerUpdated { + peerToAddKeys = curPeerKeys.Difference(prePeerKeys) + } else { + peerToAddKeys = curPeerKeys + } + peerConfigsToAdd := getPeerConfigsForKeys(peerToAddKeys, curPeerConfigs) + for _, peer := range peerConfigsToAdd { + if err := bgpServer.AddPeer(c.ctx, peer); err != nil { + return err + } + } + + if !bgpServerUpdated { + peerToUpdateKeys := sets.New[string]() + remainPeerKeys := prePeerKeys.Intersection(curPeerKeys) + for peerKey := range remainPeerKeys { + prevPeerConfig := prePeerConfigs[peerKey] + curPeerConfig := curPeerConfigs[peerKey] + if !reflect.DeepEqual(prevPeerConfig, curPeerConfig) { + peerToUpdateKeys.Insert(peerKey) + } + } + peerToUpdateConfigs := getPeerConfigsForKeys(peerToUpdateKeys, curPeerConfigs) + for _, peer := range peerToUpdateConfigs { + if err := bgpServer.UpdatePeer(c.ctx, peer); err != nil { + return err + } + } + + peerToDeleteKeys := prePeerKeys.Difference(curPeerKeys) + peerToDeleteConfigs := getPeerConfigsForKeys(peerToDeleteKeys, prePeerConfigs) + for _, peer := range peerToDeleteConfigs { + if err := bgpServer.RemovePeer(c.ctx, peer); err != nil { + return err + } + } + } + return nil +} + +func (c *Controller) reconcileRoutes(curRoutes, preRoutes sets.Set[bgp.Route], bgpServer bgp.Interface, bgpServerUpdated bool) error { + var routesToAdvertise sets.Set[bgp.Route] + if !bgpServerUpdated { + routesToAdvertise = curRoutes.Difference(preRoutes) + } else { + routesToAdvertise = curRoutes + } + if routesToAdvertise.Len() != 0 { + if err := bgpServer.AdvertiseRoutes(c.ctx, routesToAdvertise.UnsortedList()); err != nil { + return err + } + } + + if !bgpServerUpdated { + routesToWithdraw := preRoutes.Difference(curRoutes) + if routesToWithdraw.Len() != 0 { + if err := bgpServer.WithdrawRoutes(c.ctx, routesToWithdraw.UnsortedList()); err != nil { + return err + } + } + } + + return nil +} + +func (c *Controller) getRouterID() (string, error) { + var routerID string + // For IPv6 only environment, the BGP routerID should be specified by K8s Node annotation `antrea.io/bgp-route-id`. + if !c.enabledIPv4 && c.enabledIPv6 { + nodeObj, _ := c.nodeLister.Get(c.nodeName) + var exists bool + if routerID, exists = nodeObj.GetAnnotations()[types.NodeBGPPolicyRouterIDAnnotationKey]; !exists { + return "", fmt.Errorf("BGP routerID should be assigned by annotation manually when IPv6 is only enabled") + } + if !net.IsIPv4String(routerID) { + return "", fmt.Errorf("BGP routerID should be an IPv4 address") + } + } else { + routerID = c.nodeIPv4Addr + } + return routerID, nil +} + +func (c *Controller) getRoutes(advertisements v1alpha1.Advertisements) (sets.Set[bgp.Route], error) { + allRoutes := sets.New[bgp.Route]() + + if advertisements.Service != nil { + if err := c.addServiceRoutes(advertisements.Service, allRoutes); err != nil { + return nil, err + } + } + if c.egressEnabled && advertisements.Egress != nil { + if err := c.addEgressRoutes(allRoutes); err != nil { + return nil, err + } + } + if advertisements.Pod != nil { + c.addPodRoutes(allRoutes) + } + + return allRoutes, nil +} + +func serviceIPTypesToAdvertise(serviceIPTypes []v1alpha1.ServiceIPType) sets.Set[v1alpha1.ServiceIPType] { + ipTypeMap := sets.New[v1alpha1.ServiceIPType]() + for _, ipType := range serviceIPTypes { + ipTypeMap.Insert(ipType) + } + return ipTypeMap +} + +func (c *Controller) addServiceRoutes(advertisement *v1alpha1.ServiceAdvertisement, allRoutes sets.Set[bgp.Route]) error { + ipTypeMap := serviceIPTypesToAdvertise(advertisement.IPTypes) + + services, err := c.serviceLister.List(labels.Everything()) + if err != nil { + return err + } + + var serviceIPs []string + for _, svc := range services { + internalLocal := svc.Spec.InternalTrafficPolicy != nil && *svc.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyLocal + externalLocal := svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyLocal + var hasLocalEndpoints bool + if internalLocal || externalLocal { + var err error + hasLocalEndpoints, err = c.hasLocalEndpoints(svc) + if err != nil { + return err + } + } + if ipTypeMap.Has(v1alpha1.ServiceIPTypeClusterIP) { + if internalLocal && hasLocalEndpoints || !internalLocal { + for _, clusterIP := range svc.Spec.ClusterIPs { + serviceIPs = append(serviceIPs, clusterIP) + } + } + } + if ipTypeMap.Has(v1alpha1.ServiceIPTypeExternalIP) { + if externalLocal && hasLocalEndpoints || !externalLocal { + for _, externalIP := range svc.Spec.ExternalIPs { + serviceIPs = append(serviceIPs, externalIP) + } + } + } + if ipTypeMap.Has(v1alpha1.ServiceIPTypeLoadBalancerIP) && svc.Spec.Type == corev1.ServiceTypeLoadBalancer { + if externalLocal && hasLocalEndpoints || !externalLocal { + for _, ingressIP := range svc.Status.LoadBalancer.Ingress { + if ingressIP.IP != "" { + serviceIPs = append(serviceIPs, ingressIP.IP) + } + } + } + } + } + + for _, ip := range serviceIPs { + if c.enabledIPv4 && net.IsIPv4String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv4Suffix}) + } + if c.enabledIPv6 && net.IsIPv6String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv6Suffix}) + } + } + + return nil +} + +func (c *Controller) addEgressRoutes(allRoutes sets.Set[bgp.Route]) error { + egresses, err := c.egressLister.List(labels.Everything()) + if err != nil { + return err + } + + for _, eg := range egresses { + if eg.Status.EgressNode != c.nodeName { + continue + } + ip := eg.Status.EgressIP + if c.enabledIPv4 && net.IsIPv4String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv4Suffix}) + } + if c.enabledIPv6 && net.IsIPv6String(ip) { + allRoutes.Insert(bgp.Route{Prefix: ip + ipv6Suffix}) + } + } + + return nil +} + +func (c *Controller) addPodRoutes(allRoutes sets.Set[bgp.Route]) { + if c.enabledIPv4 { + allRoutes.Insert(bgp.Route{Prefix: c.podIPv4CIDR}) + } + if c.enabledIPv6 { + allRoutes.Insert(bgp.Route{Prefix: c.podIPv6CIDR}) + } +} + +func (c *Controller) hasLocalEndpoints(svc *corev1.Service) (bool, error) { + labelSelector := labels.Set{discovery.LabelServiceName: svc.GetName()}.AsSelector() + items, err := c.endpointSliceLister.EndpointSlices(svc.GetNamespace()).List(labelSelector) + if err != nil { + return false, err + } + for _, eps := range items { + for _, ep := range eps.Endpoints { + if ep.NodeName != nil && *ep.NodeName == c.nodeName { + return true, nil + } + } + } + + return false, nil +} + +func (c *Controller) generateBGPPeerConfig(peer *v1alpha1.BGPPeer) bgp.PeerConfig { + bgpPeerConfig := bgp.PeerConfig{ + BGPPeer: peer, + } + bgpPeerKey := generateBGPPeerKey(peer.Address, peer.ASN) + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + if password, exists := c.bgpPeerPasswords[bgpPeerKey]; exists { + bgpPeerConfig.Password = password + } + return bgpPeerConfig +} + +func (c *Controller) getPeerConfigs(allPeers []v1alpha1.BGPPeer) (map[string]bgp.PeerConfig, error) { + peerConfigs := make(map[string]bgp.PeerConfig) + for i := range allPeers { + if c.enabledIPv4 && net.IsIPv4String(allPeers[i].Address) || + c.enabledIPv6 && net.IsIPv6String(allPeers[i].Address) { + peerKey := generateBGPPeerKey(allPeers[i].Address, allPeers[i].ASN) + peerConfigs[peerKey] = c.generateBGPPeerConfig(&allPeers[i]) + } + } + return peerConfigs, nil +} + +func generateBGPPeerKey(address string, asn int32) string { + return fmt.Sprintf("%s-%d", address, asn) +} + +func (c *Controller) addBGPPolicy(obj interface{}) { + bp := obj.(*v1alpha1.BGPPolicy) + if !c.matchedCurrentNode(bp) { + return + } + klog.V(2).InfoS("Processing BGPPolicy ADD event", "BGPPolicy", klog.KObj(bp)) + c.queue.Add(key) +} + +func (c *Controller) updateBGPPolicy(oldObj, obj interface{}) { + oldBP := oldObj.(*v1alpha1.BGPPolicy) + bp := obj.(*v1alpha1.BGPPolicy) + if !c.matchedCurrentNode(bp) && !c.matchedCurrentNode(oldBP) { + return + } + if bp.GetGeneration() != oldBP.GetGeneration() { + klog.V(2).InfoS("Processing BGPPolicy UPDATE event", "BGPPolicy", klog.KObj(bp)) + c.queue.Add(key) + } +} + +func (c *Controller) deleteBGPPolicy(obj interface{}) { + bp := obj.(*v1alpha1.BGPPolicy) + if !c.matchedCurrentNode(bp) { + return + } + klog.V(2).InfoS("Processing BGPPolicy DELETE event", "BGPPolicy", klog.KObj(bp)) + c.queue.Add(key) +} + +func getIngressIPs(svc *corev1.Service) []string { + var ips []string + for _, ingress := range svc.Status.LoadBalancer.Ingress { + if ingress.IP != "" { + ips = append(ips, ingress.IP) + } + } + return ips +} + +func (c *Controller) matchedCurrentNode(bp *v1alpha1.BGPPolicy) bool { + nodeSelector, _ := metav1.LabelSelectorAsSelector(&bp.Spec.NodeSelector) + node, _ := c.nodeLister.Get(c.nodeName) + return nodeSelector.Matches(labels.Set(node.GetLabels())) +} + +func (c *Controller) matchedNode(node *corev1.Node, bp *v1alpha1.BGPPolicy) bool { + nodeSel, _ := metav1.LabelSelectorAsSelector(&bp.Spec.NodeSelector) + if !nodeSel.Matches(labels.Set(node.Labels)) { + return false + } + return true +} + +func (c *Controller) filterAffectedBPsByService(svc *corev1.Service) sets.Set[string] { + affectedBPs := sets.New[string]() + allBPs, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, bp := range allBPs { + if bp.Spec.Advertisements.Service == nil { + continue + } + ipTypeMap := serviceIPTypesToAdvertise(bp.Spec.Advertisements.Service.IPTypes) + + if ipTypeMap.Has(v1alpha1.ServiceIPTypeClusterIP) && len(svc.Spec.ClusterIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeExternalIP) && len(svc.Spec.ExternalIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeLoadBalancerIP) && len(getIngressIPs(svc)) != 0 { + if c.matchedCurrentNode(bp) { + affectedBPs.Insert(bp.GetName()) + } + } + } + return affectedBPs +} + +func (c *Controller) hasAffectedBPsByService(svc *corev1.Service) bool { + allBPs, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, bp := range allBPs { + if !c.matchedCurrentNode(bp) || bp.Spec.Advertisements.Service == nil { + continue + } + ipTypeMap := serviceIPTypesToAdvertise(bp.Spec.Advertisements.Service.IPTypes) + if ipTypeMap.Has(v1alpha1.ServiceIPTypeClusterIP) && len(svc.Spec.ClusterIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeExternalIP) && len(svc.Spec.ExternalIPs) != 0 || + ipTypeMap.Has(v1alpha1.ServiceIPTypeLoadBalancerIP) && len(getIngressIPs(svc)) != 0 { + return true + } + } + return false +} + +func (c *Controller) addService(obj interface{}) { + svc := obj.(*corev1.Service) + if c.hasAffectedBPsByService(svc) { + klog.V(2).InfoS("Processing Service ADD event", "Service", klog.KObj(svc)) + c.queue.Add(key) + } +} + +func (c *Controller) updateService(oldObj, obj interface{}) { + oldSvc := oldObj.(*corev1.Service) + svc := obj.(*corev1.Service) + + if slices.Equal(oldSvc.Spec.ClusterIPs, svc.Spec.ClusterIPs) && + slices.Equal(oldSvc.Spec.ExternalIPs, svc.Spec.ExternalIPs) && + slices.Equal(getIngressIPs(oldSvc), getIngressIPs(svc)) && + oldSvc.Spec.ExternalTrafficPolicy == svc.Spec.ExternalTrafficPolicy && + reflect.DeepEqual(oldSvc.Spec.InternalTrafficPolicy, svc.Spec.InternalTrafficPolicy) { + return + } + oldAffectedBPs := c.filterAffectedBPsByService(oldSvc) + newAffectedBPs := c.filterAffectedBPsByService(svc) + if len(utilipset.MergeString(oldAffectedBPs, newAffectedBPs)) != 0 { + klog.V(2).InfoS("Processing Service UPDATE event", "Service", klog.KObj(svc)) + c.queue.Add(key) + } +} + +func (c *Controller) addEndpointSlice(obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster && + (svc.Spec.InternalTrafficPolicy == nil || *svc.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyCluster) { + return + } + if c.hasAffectedBPsByService(svc) { + klog.V(2).InfoS("Processing EndpointSlice ADD event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(key) + } +} + +func (c *Controller) updateEndpointSlice(oldObj, obj interface{}) { + eps := obj.(*discovery.EndpointSlice) + svc, _ := c.serviceLister.Services(eps.GetNamespace()).Get(eps.GetLabels()[discovery.LabelServiceName]) + if svc == nil { + return + } + if svc.Spec.ExternalTrafficPolicy == corev1.ServiceExternalTrafficPolicyTypeCluster && + (svc.Spec.InternalTrafficPolicy == nil || *svc.Spec.InternalTrafficPolicy == corev1.ServiceInternalTrafficPolicyCluster) { + return + } + affectedBPs := c.filterAffectedBPsByService(svc) + if len(affectedBPs) != 0 { + klog.V(2).InfoS("Processing EndpointSlice UPDATE event", "EndpointSlice", klog.KObj(eps)) + c.queue.Add(key) + } +} + +func (c *Controller) deleteService(obj interface{}) { + svc := obj.(*corev1.Service) + affectedBPs := c.filterAffectedBPsByService(svc) + if len(affectedBPs) != 0 { + klog.V(2).InfoS("Processing Service DELETE event", "Service", klog.KObj(svc)) + c.queue.Add(key) + } +} + +func (c *Controller) hasAffectedBPsByEgress() bool { + allBPs, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, bp := range allBPs { + if !c.matchedCurrentNode(bp) { + continue + } + if bp.Spec.Advertisements.Egress != nil { + return true + } + } + return false +} + +func (c *Controller) addEgress(obj interface{}) { + if !c.egressEnabled { + return + } + eg := obj.(*v1beta1.Egress) + if eg.Status.EgressNode != c.nodeName { + return + } + if c.hasAffectedBPsByEgress() { + klog.V(2).InfoS("Processing Egress ADD event", "Egress", klog.KObj(eg)) + c.queue.Add(key) + } +} + +func (c *Controller) updateEgress(oldObj, obj interface{}) { + if !c.egressEnabled { + return + } + oldEg := oldObj.(*v1beta1.Egress) + eg := obj.(*v1beta1.Egress) + if oldEg.Status.EgressNode != c.nodeName && eg.Status.EgressNode != c.nodeName { + return + } + if oldEg.Status.EgressIP == eg.Status.EgressIP { + return + } + if c.hasAffectedBPsByEgress() { + klog.V(2).InfoS("Processing Egress UPDATE event", "Egress", klog.KObj(eg)) + c.queue.Add(key) + } +} + +func (c *Controller) deleteEgress(obj interface{}) { + if !c.egressEnabled { + return + } + eg := obj.(*v1beta1.Egress) + if eg.Status.EgressNode != c.nodeName { + return + } + if c.hasAffectedBPsByEgress() { + klog.V(2).InfoS("Processing Egress DELETE event", "Service", klog.KObj(eg)) + c.queue.Add(key) + } +} + +func (c *Controller) filterAffectedBPsByNode(node *corev1.Node) sets.Set[string] { + affectedBPs := sets.New[string]() + allBPs, _ := c.bgpPolicyLister.List(labels.Everything()) + for _, bp := range allBPs { + if c.matchedNode(node, bp) { + affectedBPs.Insert(bp.GetName()) + } + } + return affectedBPs +} + +func (c *Controller) updateNode(oldObj, obj interface{}) { + oldNode := oldObj.(*corev1.Node) + node := obj.(*corev1.Node) + if node.GetName() != c.nodeName { + return + } + if reflect.DeepEqual(node.GetLabels(), oldNode.GetLabels()) && + reflect.DeepEqual(node.GetAnnotations(), oldNode.GetAnnotations()) { + return + } + oldAffectedBPs := c.filterAffectedBPsByNode(oldNode) + newAffectedBPs := c.filterAffectedBPsByNode(node) + affectedBPs := utilipset.SymmetricDifferenceString(oldAffectedBPs, newAffectedBPs) + if len(affectedBPs) != 0 { + klog.V(2).InfoS("Processing Node UPDATE event", "Node", klog.KObj(node)) + c.queue.Add(key) + } +} diff --git a/pkg/agent/controller/bgp/controller_test.go b/pkg/agent/controller/bgp/controller_test.go new file mode 100644 index 00000000000..6dd2f17f3cc --- /dev/null +++ b/pkg/agent/controller/bgp/controller_test.go @@ -0,0 +1,1668 @@ +// Copyright 2024 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package bgp + +import ( + "context" + "fmt" + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + netutils "k8s.io/utils/net" + "k8s.io/utils/ptr" + + "antrea.io/antrea/pkg/agent/bgp" + bgptest "antrea.io/antrea/pkg/agent/bgp/testing" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/types" + "antrea.io/antrea/pkg/apis/crd/v1alpha1" + crdv1b1 "antrea.io/antrea/pkg/apis/crd/v1beta1" + fakeversioned "antrea.io/antrea/pkg/client/clientset/versioned/fake" + crdinformers "antrea.io/antrea/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/util/ip" +) + +var ( + podIPv4CIDR = ip.MustParseCIDR("10.10.0.0/24") + podIPv6CIDR = ip.MustParseCIDR("fec0:10:10::/64") + nodeIPv4Addr = ip.MustParseCIDR("192.168.77.100/24") + + testNodeConfig = &config.NodeConfig{ + PodIPv4CIDR: podIPv4CIDR, + PodIPv6CIDR: podIPv6CIDR, + NodeIPv4Addr: nodeIPv4Addr, + Name: localNodeName, + } + + peer1ASN = int32(65531) + peer1AuthPassword = "bgp-peer1" // #nosec G101 + ipv4Peer1Addr = "192.168.77.251" + ipv6Peer1Addr = "fec0::196:168:77:251" + ipv4Peer1 = generateBGPPeer(ipv4Peer1Addr, peer1ASN, 179, 120) + ipv6Peer1 = generateBGPPeer(ipv6Peer1Addr, peer1ASN, 179, 120) + ipv4Peer1Config = generateBGPPeerConfig(&ipv4Peer1, peer1AuthPassword) + ipv6Peer1Config = generateBGPPeerConfig(&ipv6Peer1, peer1AuthPassword) + + peer2ASN = int32(65532) + peer2AuthPassword = "bgp-peer2" // #nosec G101 + ipv4Peer2Addr = "192.168.77.252" + ipv6Peer2Addr = "fec0::196:168:77:252" + ipv4Peer2 = generateBGPPeer(ipv4Peer2Addr, peer2ASN, 179, 120) + ipv6Peer2 = generateBGPPeer(ipv6Peer2Addr, peer2ASN, 179, 120) + ipv4Peer2Config = generateBGPPeerConfig(&ipv4Peer2, peer2AuthPassword) + ipv6Peer2Config = generateBGPPeerConfig(&ipv6Peer2, peer2AuthPassword) + + updatedIPv4Peer2 = generateBGPPeer(ipv4Peer2Addr, peer2ASN, 179, 60) + updatedIPv6Peer2 = generateBGPPeer(ipv6Peer2Addr, peer2ASN, 179, 60) + updatedIPv4Peer2Config = generateBGPPeerConfig(&updatedIPv4Peer2, peer2AuthPassword) + updatedIPv6Peer2Config = generateBGPPeerConfig(&updatedIPv6Peer2, peer2AuthPassword) + + peer3ASN = int32(65533) + peer3AuthPassword = "bgp-peer3" // #nosec G101 + ipv4Peer3Addr = "192.168.77.253" + ipv6Peer3Addr = "fec0::196:168:77:253" + ipv4Peer3 = generateBGPPeer(ipv4Peer3Addr, peer3ASN, 179, 120) + ipv6Peer3 = generateBGPPeer(ipv6Peer3Addr, peer3ASN, 179, 120) + ipv4Peer3Config = generateBGPPeerConfig(&ipv4Peer3, peer3AuthPassword) + ipv6Peer3Config = generateBGPPeerConfig(&ipv6Peer3, peer3AuthPassword) + + nodeLabels1 = map[string]string{"node": "control-plane"} + nodeLabels2 = map[string]string{"os": "linux"} + nodeLabels3 = map[string]string{"node": "control-plane", "os": "linux"} + nodeAnnotations1 = map[string]string{types.NodeBGPPolicyRouterIDAnnotationKey: "192.168.77.100"} + nodeAnnotations2 = map[string]string{types.NodeBGPPolicyRouterIDAnnotationKey: "10.10.0.100"} + + localNodeName = "local" + node = generateNode(localNodeName, nodeLabels1, nodeAnnotations1) + + ipv4EgressIP1 = "192.168.77.200" + ipv6EgressIP1 = "fec0::192:168:77:200" + ipv4EgressIP2 = "192.168.77.201" + ipv6EgressIP2 = "fec0::192:168:77:2001" + + ipv4Egress1 = generateEgress("eg1-4", ipv4EgressIP1, localNodeName) + ipv6Egress1 = generateEgress("eg1-6", ipv6EgressIP1, localNodeName) + ipv4Egress2 = generateEgress("eg2-4", ipv4EgressIP2, "test-remote-node") + ipv6Egress2 = generateEgress("eg2-6", ipv6EgressIP2, "test-remote-node") + + bgpPolicyName1 = "bp-1" + bgpPolicyName2 = "bp-2" + bgpPolicyName3 = "bp-3" + + clusterIPv4 = "10.96.10.10" + externalIPv4 = "192.168.77.100" + loadBalancerIPv4 = "192.168.77.150" + endpointIPv4 = "10.10.0.10" + clusterIPv6 = "fec0::10:96:10:10" + externalIPv6 = "fec0::192:168:77:100" + loadBalancerIPv6 = "fec0::192:168:77:150" + endpointIPv6 = "fec0::10:10:0:10" + + ipv4ClusterIPName1 = "clusterip-4" + ipv4ClusterIPName2 = "clusterip-4-local" + ipv6ClusterIPName1 = "clusterip-6" + ipv6ClusterIPName2 = "clusterip-6-local" + ipv4LoadBalancerName = "loadbalancer-4" + ipv6LoadBalancerName = "loadbalancer-6" + + ipv4ClusterIP1 = generateService(ipv4ClusterIPName1, corev1.ServiceTypeClusterIP, clusterIPv4, externalIPv4, "", false, false) + ipv4ClusterIP1Eps = generateEndpointSlice(ipv4ClusterIPName1, false, false, endpointIPv4) + ipv4ClusterIP2 = generateService(ipv4ClusterIPName2, corev1.ServiceTypeClusterIP, clusterIPv4, externalIPv4, "", true, true) + ipv4ClusterIP2Eps = generateEndpointSlice(ipv4ClusterIPName2, false, false, endpointIPv4) + + ipv6ClusterIP1 = generateService(ipv6ClusterIPName1, corev1.ServiceTypeClusterIP, clusterIPv6, externalIPv6, "", false, false) + ipv6ClusterIP1Eps = generateEndpointSlice(ipv6ClusterIPName1, false, false, endpointIPv6) + ipv6ClusterIP2 = generateService(ipv6ClusterIPName2, corev1.ServiceTypeClusterIP, clusterIPv6, externalIPv6, "", true, true) + ipv6ClusterIP2Eps = generateEndpointSlice(ipv6ClusterIPName2, false, false, endpointIPv6) + + ipv4LoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, clusterIPv4, externalIPv4, loadBalancerIPv4, false, false) + ipv4LoadBalancerEps = generateEndpointSlice(ipv4LoadBalancerName, false, false, endpointIPv4) + ipv6LoadBalancer = generateService(ipv6LoadBalancerName, corev1.ServiceTypeLoadBalancer, clusterIPv6, externalIPv6, loadBalancerIPv6, false, false) + ipv6LoadBalancerEps = generateEndpointSlice(ipv6LoadBalancerName, false, false, endpointIPv6) + + bgpPeerPasswords = map[string]string{ + generateBGPPeerKey(ipv4Peer1Addr, peer1ASN): peer1AuthPassword, + generateBGPPeerKey(ipv6Peer1Addr, peer1ASN): peer1AuthPassword, + generateBGPPeerKey(ipv4Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv6Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv4Peer3Addr, peer3ASN): peer3AuthPassword, + generateBGPPeerKey(ipv6Peer3Addr, peer3ASN): peer3AuthPassword, + } + + ctx = context.Background() +) + +type fakeController struct { + *Controller + mockController *gomock.Controller + mockBGPServer *bgptest.MockInterface + crdClient *fakeversioned.Clientset + crdInformerFactory crdinformers.SharedInformerFactory + client *fake.Clientset + informerFactory informers.SharedInformerFactory +} + +func (c *fakeController) startInformers(stopCh chan struct{}) { + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.crdInformerFactory.Start(stopCh) + c.crdInformerFactory.WaitForCacheSync(stopCh) +} + +func newFakeController(t *testing.T, objects []runtime.Object, crdObjects []runtime.Object, ipv4Enabled, ipv6Enabled bool) *fakeController { + ctrl := gomock.NewController(t) + mockBGPServer := bgptest.NewMockInterface(ctrl) + + client := fake.NewSimpleClientset(objects...) + crdClient := fakeversioned.NewSimpleClientset(crdObjects...) + + crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, 0) + informerFactory := informers.NewSharedInformerFactory(client, 0) + + nodeInformer := informerFactory.Core().V1().Nodes() + serviceInformer := informerFactory.Core().V1().Services() + egressInformer := crdInformerFactory.Crd().V1beta1().Egresses() + endpointSliceInformer := informerFactory.Discovery().V1().EndpointSlices() + bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies() + + bgpController, _ := NewBGPPolicyController(ctx, + nodeInformer, + serviceInformer, + egressInformer, + bgpPolicyInformer, + endpointSliceInformer, + client, + testNodeConfig, + &config.NetworkConfig{ + IPv4Enabled: ipv4Enabled, + IPv6Enabled: ipv6Enabled, + }) + bgpController.egressEnabled = true + bgpController.newBGPServerFn = func(_ *bgp.GlobalConfig) bgp.Interface { + return mockBGPServer + } + + return &fakeController{ + Controller: bgpController, + mockController: ctrl, + mockBGPServer: mockBGPServer, + crdClient: crdClient, + crdInformerFactory: crdInformerFactory, + client: client, + informerFactory: informerFactory, + } +} + +func TestBGPPolicyAdd(t *testing.T) { + testCases := []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + bpToAdd *v1alpha1.BGPPolicy + objects []runtime.Object + crdObjects []runtime.Object + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "IPv4, as effective BGPPolicy, advertise ClusterIP", + ipv4Enabled: true, + bpToAdd: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + true, + false, + true, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1}), + objects: []runtime.Object{ + ipv4ClusterIP1, + ipv4ClusterIP1Eps, + node, + }, + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AdvertiseRoutes(ctx, []bgp.Route{{Prefix: ipStrToPrefix(clusterIPv4)}}) + }, + }, + { + name: "IPv6, as effective BGPPolicy, advertise ExternalIP", + ipv6Enabled: true, + bpToAdd: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + false, + true, + true, + true, + false, + []v1alpha1.BGPPeer{ipv6Peer1}), + objects: []runtime.Object{ + ipv6ClusterIP1, + ipv6ClusterIP1Eps, + node, + }, + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + "192.168.77.100", + []string{ipStrToPrefix(externalIPv6)}, + []bgp.PeerConfig{ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(ctx, []bgp.Route{{Prefix: ipStrToPrefix(externalIPv6)}}) + }, + }, + { + name: "IPv4 & IPv6, as effective BGPPolicy, advertise LoadBalancerIP", + ipv4Enabled: true, + ipv6Enabled: true, + bpToAdd: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}), + objects: []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + }, + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(loadBalancerIPv4), ipStrToPrefix(loadBalancerIPv6)}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + routesToAdvertise := []bgp.Route{ + {Prefix: ipStrToPrefix(loadBalancerIPv4)}, + {Prefix: ipStrToPrefix(loadBalancerIPv6)}, + } + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(ctx, gomock.InAnyOrder(routesToAdvertise)) + }, + }, + { + name: "IPv4, as effective BGPPolicy, advertise EgressIP", + ipv4Enabled: true, + bpToAdd: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + true, + true, + true, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1}), + objects: []runtime.Object{node}, + crdObjects: []runtime.Object{ + ipv4Egress1, + ipv4Egress2, + }, + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(ipv4EgressIP1)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AdvertiseRoutes(ctx, []bgp.Route{{Prefix: ipStrToPrefix(ipv4EgressIP1)}}) + }, + }, + { + name: "IPv6, as effective BGPPolicy, advertise Pod CIDR", + ipv6Enabled: true, + bpToAdd: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + true, + true, + true, + true, + true, + []v1alpha1.BGPPeer{ipv6Peer1}), + objects: []runtime.Object{node}, + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(ctx, []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + { + name: "IPv4 & IPv6, as effective BGPPolicy, not advertise any Service IP due to no local Endpoint", + ipv4Enabled: true, + ipv6Enabled: true, + bpToAdd: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 1179, + 65001, + true, + true, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}), + objects: []runtime.Object{ + ipv4ClusterIP2, + ipv4ClusterIP2Eps, + ipv6ClusterIP2, + ipv6ClusterIP2Eps, + node, + }, + expectedState: generateBGPPolicyState(bgpPolicyName1, + 1179, + 65001, + nodeIPv4Addr.IP.String(), + nil, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + }, + }, + { + name: "IPv4, as alternative BGPPolicy", + ipv4Enabled: true, + bpToAdd: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + true, + false, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1}), + objects: []runtime.Object{ipv4ClusterIP1, ipv4ClusterIP1Eps, node}, + crdObjects: []runtime.Object{generateBGPPolicy(bgpPolicyName2, + nodeLabels1, + 179, + 65000, + true, + false, + false, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1})}, + existingState: generateBGPPolicyState(bgpPolicyName2, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + expectedState: generateBGPPolicyState(bgpPolicyName2, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4)}, + []bgp.PeerConfig{ipv4Peer1Config}, + ), + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, tt.objects, append(tt.crdObjects, tt.bpToAdd), tt.ipv4Enabled, tt.ipv6Enabled) + + stopCh := make(chan struct{}) + defer close(stopCh) + c.startInformers(stopCh) + + // Ignore the dummy event triggered by BGPPolicy ADD events. + waitEvents(t, 1, c) + + // Fake the BGP state and the passwords of BGP peers. + c.bgpPolicyState = tt.existingState + if c.bgpPolicyState != nil { + c.bgpPolicyState.bgpServer = c.mockBGPServer + } + c.bgpPeerPasswords = bgpPeerPasswords + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + waitEvents(t, 1, c) + assert.NoError(t, c.syncBGPPolicy()) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestBGPPolicyUpdate(t *testing.T) { + effectiveBP := generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + effectiveBPState := generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ) + alternativeBP1 := generateBGPPolicy(bgpPolicyName2, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + alternativeBP2 := generateBGPPolicy(bgpPolicyName3, + nodeLabels2, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }) + objects := []runtime.Object{ + ipv4ClusterIP2, + ipv4ClusterIP2Eps, + ipv6ClusterIP2, + ipv6ClusterIP2Eps, + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + crdObjects := []runtime.Object{ipv4Egress1, + ipv4Egress2, + ipv6Egress1, + ipv6Egress2, + effectiveBP, + alternativeBP1, + alternativeBP2, + } + testCases := []struct { + name string + bpToUpdate *v1alpha1.BGPPolicy + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "Effective BGPPolicy, update NodeSelector (not applied to current Node), an alternative takes effect", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + nodeLabels2, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(ctx) + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AddPeer(ctx, ipv4Peer2Config) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + mockBGPServer.AddPeer(ctx, ipv6Peer2Config) + routes := []bgp.Route{ + {Prefix: ipStrToPrefix(clusterIPv4)}, + {Prefix: ipStrToPrefix(clusterIPv6)}, + {Prefix: ipStrToPrefix(loadBalancerIPv4)}, + {Prefix: ipStrToPrefix(loadBalancerIPv6)}, + {Prefix: podIPv4CIDR.String()}, + {Prefix: podIPv6CIDR.String()}, + } + mockBGPServer.AdvertiseRoutes(ctx, gomock.InAnyOrder(routes)) + }, + expectedState: generateBGPPolicyState(bgpPolicyName2, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + }, + { + name: "Effective BGPPolicy, update Advertisements", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + false, + true, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + ipStrToPrefix(ipv4EgressIP1), + ipStrToPrefix(ipv6EgressIP1), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + routesToAdvertise := []bgp.Route{ + {Prefix: ipStrToPrefix(externalIPv4)}, + {Prefix: ipStrToPrefix(ipv4EgressIP1)}, + {Prefix: ipStrToPrefix(externalIPv6)}, + {Prefix: ipStrToPrefix(ipv6EgressIP1)}, + } + routesToWithdraw := []bgp.Route{ + {Prefix: ipStrToPrefix(clusterIPv4)}, + {Prefix: ipStrToPrefix(loadBalancerIPv4)}, + {Prefix: podIPv4CIDR.String()}, + {Prefix: ipStrToPrefix(clusterIPv6)}, + {Prefix: ipStrToPrefix(loadBalancerIPv6)}, + {Prefix: podIPv6CIDR.String()}, + } + mockBGPServer.AdvertiseRoutes(ctx, gomock.InAnyOrder(routesToAdvertise)) + mockBGPServer.WithdrawRoutes(ctx, gomock.InAnyOrder(routesToWithdraw)) + }, + }, + { + name: "Effective BGPPolicy, update LocalASN and Advertisements", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65001, + false, + true, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65001, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + ipStrToPrefix(ipv4EgressIP1), + ipStrToPrefix(ipv6EgressIP1), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.Stop(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AddPeer(ctx, ipv4Peer2Config) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + mockBGPServer.AddPeer(ctx, ipv6Peer2Config) + routesToAdvertise := []bgp.Route{ + {Prefix: ipStrToPrefix(externalIPv4)}, + {Prefix: ipStrToPrefix(ipv4EgressIP1)}, + {Prefix: ipStrToPrefix(externalIPv6)}, + {Prefix: ipStrToPrefix(ipv6EgressIP1)}, + } + mockBGPServer.AdvertiseRoutes(ctx, gomock.InAnyOrder(routesToAdvertise)) + }, + }, + { + name: "Effective BGPPolicy, update ListenPort", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 1179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + expectedState: generateBGPPolicyState(bgpPolicyName1, + 1179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{ipv4Peer1Config, + ipv6Peer1Config, + ipv4Peer2Config, + ipv6Peer2Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.Stop(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AddPeer(ctx, ipv4Peer2Config) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + mockBGPServer.AddPeer(ctx, ipv6Peer2Config) + routesToAdvertise := []bgp.Route{ + {Prefix: ipStrToPrefix(clusterIPv4)}, + {Prefix: ipStrToPrefix(loadBalancerIPv4)}, + {Prefix: podIPv4CIDR.String()}, + {Prefix: ipStrToPrefix(clusterIPv6)}, + {Prefix: ipStrToPrefix(loadBalancerIPv6)}, + {Prefix: podIPv6CIDR.String()}, + } + mockBGPServer.AdvertiseRoutes(ctx, gomock.InAnyOrder(routesToAdvertise)) + }, + }, + { + name: "Effective BGPPolicy, update BGPPeers", + bpToUpdate: generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{updatedIPv4Peer2, + updatedIPv6Peer2, + ipv4Peer3, + ipv6Peer3}), + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ipStrToPrefix(clusterIPv4), + ipStrToPrefix(clusterIPv6), + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + podIPv4CIDR.String(), + podIPv6CIDR.String(), + }, + []bgp.PeerConfig{updatedIPv4Peer2Config, + updatedIPv6Peer2Config, + ipv4Peer3Config, + ipv6Peer3Config, + }, + ), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.AddPeer(ctx, ipv4Peer3Config) + mockBGPServer.AddPeer(ctx, ipv6Peer3Config) + mockBGPServer.RemovePeer(ctx, ipv4Peer1Config) + mockBGPServer.RemovePeer(ctx, ipv6Peer1Config) + mockBGPServer.UpdatePeer(ctx, updatedIPv4Peer2Config) + mockBGPServer.UpdatePeer(ctx, updatedIPv6Peer2Config) + }, + }, + { + name: "Unrelated BGPPolicy, update NodeSelector (applied to current Node)", + bpToUpdate: generateBGPPolicy(bgpPolicyName3, + nodeLabels1, + 179, + 65000, + true, + false, + true, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, + ipv4Peer2, + ipv6Peer1, + ipv6Peer2, + }), + existingState: effectiveBPState, + expectedState: effectiveBPState, + }, + { + name: "Alternative BGPPolicy, update Advertisements, LocalASN, ListenPort and BGPPeers", + bpToUpdate: generateBGPPolicy(bgpPolicyName2, + nodeLabels1, + 1179, + 65001, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1, + updatedIPv4Peer2, + ipv6Peer1, + updatedIPv6Peer2, + }), + existingState: effectiveBPState, + expectedState: effectiveBPState, + }, + } + + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, objects, crdObjects, true, true) + + stopCh := make(chan struct{}) + defer close(stopCh) + c.startInformers(stopCh) + + waitEvents(t, 1, c) + item, _ := c.queue.Get() + c.queue.Done(item) + + // Fake the BGPPolicy state the passwords of BGP peers. + c.bgpPolicyState = effectiveBPState + c.bgpPolicyState.bgpServer = c.mockBGPServer + c.bgpPeerPasswords = bgpPeerPasswords + + tt.bpToUpdate.Generation += 1 + _, err := c.crdClient.CrdV1alpha1().BGPPolicies().Update(context.TODO(), tt.bpToUpdate, metav1.UpdateOptions{}) + require.NoError(t, err) + waitEvents(t, 1, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + assert.NoError(t, c.syncBGPPolicy()) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestBGPPolicyDelete(t *testing.T) { + bp1 := generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + false, + false, + true, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer1, + ipv6Peer1, + }) + bp1State := generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(loadBalancerIPv4), + ipStrToPrefix(loadBalancerIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer1Config, + ipv6Peer1Config, + }, + ) + bp2 := generateBGPPolicy(bgpPolicyName2, + nodeLabels1, + 179, + 65000, + false, + true, + false, + false, + false, + []v1alpha1.BGPPeer{ + ipv4Peer2, + ipv6Peer2, + }) + bp2State := generateBGPPolicyState(bgpPolicyName2, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{ + ipStrToPrefix(externalIPv4), + ipStrToPrefix(externalIPv6), + }, + []bgp.PeerConfig{ + ipv4Peer2Config, + ipv6Peer2Config}, + ) + objects := []runtime.Object{ + ipv4LoadBalancer, + ipv4LoadBalancerEps, + ipv6LoadBalancer, + ipv6LoadBalancerEps, + node, + } + testCases := []struct { + name string + bpToDelete string + crdObjects []runtime.Object + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "Delete effective BGPPolicy and there is no alternative one", + bpToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{bp1}, + existingState: bp1State, + expectedState: nil, + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(ctx) + }, + }, + { + name: "Delete effective BGPPolicy and there is an alternative one", + bpToDelete: bgpPolicyName1, + crdObjects: []runtime.Object{bp1, bp2}, + existingState: bp1State, + expectedState: bp2State, + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(ctx) + mockBGPServer.Start(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer2Config) + mockBGPServer.AddPeer(ctx, ipv6Peer2Config) + routesToAdvertise := []bgp.Route{ + {Prefix: ipStrToPrefix(externalIPv4)}, + {Prefix: ipStrToPrefix(externalIPv6)}, + } + mockBGPServer.AdvertiseRoutes(ctx, gomock.InAnyOrder(routesToAdvertise)) + }, + }, + { + name: "Delete an alternative BGPPolicy", + bpToDelete: bgpPolicyName2, + crdObjects: []runtime.Object{bp1, bp2}, + existingState: bp1State, + expectedState: bp1State, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, objects, tt.crdObjects, true, true) + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.startInformers(stopCh) + + // Ignore the BGPPolicy ADD events for the test BGPPolicy. + waitEvents(t, 1, c) + + c.bgpPolicyState = tt.existingState + c.bgpPolicyState.bgpServer = c.mockBGPServer + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + err := c.crdClient.CrdV1alpha1().BGPPolicies().Delete(context.TODO(), tt.bpToDelete, metav1.DeleteOptions{}) + require.NoError(t, err) + waitEvents(t, 1, c) + + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + assert.NoError(t, c.syncBGPPolicy()) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestNodeUpdate(t *testing.T) { + bp1 := generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + bp1State := generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{podIPv4CIDR.String(), podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}) + bp2 := generateBGPPolicy(bgpPolicyName2, + nodeLabels2, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + bp2State := generateBGPPolicyState(bgpPolicyName2, + 179, + 65000, + nodeIPv4Addr.IP.String(), + []string{podIPv4CIDR.String(), podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv4Peer1Config, ipv6Peer1Config}) + bp3 := generateBGPPolicy(bgpPolicyName3, + nodeLabels3, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv6Peer1}) + crdObjects := []runtime.Object{bp1, + bp2, + bp3, + } + testCases := []struct { + name string + ipv4Enabled bool + ipv6Enabled bool + node *corev1.Node + updatedNode *corev1.Node + existingState *bgpPolicyState + expectedState *bgpPolicyState + expectedCalls func(mockBGPServer *bgptest.MockInterfaceMockRecorder) + }{ + { + name: "Update labels, a BGPPolicy is added to alternatives", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels3, nodeAnnotations1), + existingState: bp1State, + expectedState: bp1State, + }, + { + name: "Update labels, a BGPPolicy is removed from alternatives", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels3, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + existingState: bp1State, + expectedState: bp1State, + }, + { + name: "Update labels, effective BGPPolicy is updated to another one", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels2, nodeAnnotations1), + existingState: bp1State, + expectedState: bp2State, + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.Stop(ctx) + mockBGPServer.AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + routesToAdvertise := []bgp.Route{ + {Prefix: podIPv4CIDR.String()}, + {Prefix: podIPv6CIDR.String()}, + } + mockBGPServer.AdvertiseRoutes(ctx, gomock.InAnyOrder(routesToAdvertise)) + }, + }, + { + name: "Update labels, effective BGPPolicy is updated to empty", + ipv4Enabled: true, + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nil, nodeAnnotations1), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Stop(ctx) + }, + existingState: bp1State, + }, + { + name: "IPv6 only, update annotations, effective BGPPolicy router ID is updated", + ipv6Enabled: true, + node: generateNode(localNodeName, nodeLabels1, nodeAnnotations1), + updatedNode: generateNode(localNodeName, nodeLabels1, nodeAnnotations2), + existingState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + "192.168.77.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedState: generateBGPPolicyState(bgpPolicyName1, + 179, + 65000, + "10.10.0.100", + []string{podIPv6CIDR.String()}, + []bgp.PeerConfig{ipv6Peer1Config}), + expectedCalls: func(mockBGPServer *bgptest.MockInterfaceMockRecorder) { + mockBGPServer.Start(ctx) + mockBGPServer.Stop(ctx) + mockBGPServer.AddPeer(ctx, ipv6Peer1Config) + mockBGPServer.AdvertiseRoutes(ctx, []bgp.Route{{Prefix: podIPv6CIDR.String()}}) + }, + }, + } + for _, tt := range testCases { + t.Run(tt.name, func(t *testing.T) { + c := newFakeController(t, []runtime.Object{tt.node}, crdObjects, tt.ipv4Enabled, tt.ipv6Enabled) + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.startInformers(stopCh) + + // Ignore the BGPPolicy ADD events for the test BGPPolicies. + waitEvents(t, 1, c) + + // Fake the BGPPolicy state, effective BGPPolicy and alternative BGPPolicies. + c.bgpPolicyState = tt.existingState + if c.bgpPolicyState != nil { + c.bgpPolicyState.bgpServer = c.mockBGPServer + } + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + _, err := c.client.CoreV1().Nodes().Update(context.TODO(), tt.updatedNode, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + if tt.expectedCalls != nil { + tt.expectedCalls(c.mockBGPServer.EXPECT()) + } + assert.NoError(t, c.syncBGPPolicy()) + checkBGPPolicyState(t, tt.expectedState, c.bgpPolicyState) + }) + } +} + +func TestServiceLifecycle(t *testing.T) { + bp := generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + true, + true, + true, + false, + false, + []v1alpha1.BGPPeer{ipv4Peer1}) + c := newFakeController(t, []runtime.Object{node, ipv4LoadBalancerEps}, []runtime.Object{bp}, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Initialize the test BGPPolicy. + waitEvents(t, 1, c) + mockBGPServer.EXPECT().Start(ctx) + mockBGPServer.EXPECT().AddPeer(ctx, ipv4Peer1Config) + item, _ := c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Create a Service. + loadBalancer := generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.100", "192.168.77.150", false, false) + _, err := c.client.CoreV1().Services("default").Create(context.TODO(), loadBalancer, metav1.CreateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().AdvertiseRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "10.96.10.10/32"}, {Prefix: "192.168.77.100/32"}, {Prefix: "192.168.77.150/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Update externalIPs and LoadBalancerIPs of the Service. + updatedLoadBalancer := generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", false, false) + _, err = c.client.CoreV1().Services("default").Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().AdvertiseRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.101/32"}, {Prefix: "192.168.77.151/32"}})) + mockBGPServer.EXPECT().WithdrawRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.100/32"}, {Prefix: "192.168.77.150/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Update externalTrafficPolicy of the Service from Cluster to Local. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", false, true) + _, err = c.client.CoreV1().Services("default").Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().WithdrawRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.101/32"}, {Prefix: "192.168.77.151/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Update internalTrafficPolicy of the Service from Cluster to Local. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", true, true) + _, err = c.client.CoreV1().Services("default").Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().WithdrawRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "10.96.10.10/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Update externalTrafficPolicy of the Service from Local to Cluster. + updatedLoadBalancer = generateService(ipv4LoadBalancerName, corev1.ServiceTypeLoadBalancer, "10.96.10.10", "192.168.77.101", "192.168.77.151", true, false) + _, err = c.client.CoreV1().Services("default").Update(context.TODO(), updatedLoadBalancer, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().AdvertiseRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.101/32"}, {Prefix: "192.168.77.151/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Delete the Service. + err = c.client.CoreV1().Services("default").Delete(context.TODO(), updatedLoadBalancer.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().WithdrawRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.101/32"}, {Prefix: "192.168.77.151/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) +} + +func TestEgressLifecycle(t *testing.T) { + bp := generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + false, + false, + false, + true, + false, + []v1alpha1.BGPPeer{ipv4Peer1}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{bp}, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + + c.startInformers(stopCh) + + // Fake the passwords of BGP peers. + c.bgpPeerPasswords = bgpPeerPasswords + + // Initialize the test BGPPolicy. + waitEvents(t, 1, c) + mockBGPServer.EXPECT().Start(ctx) + mockBGPServer.EXPECT().AddPeer(ctx, ipv4Peer1Config) + item, _ := c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Create an Egress. + egress := generateEgress("eg1-4", "192.168.77.200", localNodeName) + _, err := c.crdClient.CrdV1beta1().Egresses().Create(context.TODO(), egress, metav1.CreateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().AdvertiseRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.200/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Update the Egress. + updatedEgress := generateEgress("eg1-4", "192.168.77.201", localNodeName) + _, err = c.crdClient.CrdV1beta1().Egresses().Update(context.TODO(), updatedEgress, metav1.UpdateOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().AdvertiseRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + mockBGPServer.EXPECT().WithdrawRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.200/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Delete the Egress. + err = c.crdClient.CrdV1beta1().Egresses().Delete(context.TODO(), updatedEgress.Name, metav1.DeleteOptions{}) + require.NoError(t, err) + + waitEvents(t, 1, c) + mockBGPServer.EXPECT().WithdrawRoutes(ctx, gomock.InAnyOrder([]bgp.Route{{Prefix: "192.168.77.201/32"}})) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) +} + +func TestBGPSecretUpdate(t *testing.T) { + bp := generateBGPPolicy(bgpPolicyName1, + nodeLabels1, + 179, + 65000, + false, + false, + false, + false, + true, + []v1alpha1.BGPPeer{ipv4Peer1, ipv4Peer2, ipv4Peer3}) + c := newFakeController(t, []runtime.Object{node}, []runtime.Object{bp}, true, false) + mockBGPServer := c.mockBGPServer + + stopCh := make(chan struct{}) + defer close(stopCh) + c.startInformers(stopCh) + go c.watchSecretChanges(stopCh) + + // Wait the Secret watcher to be ready. + time.Sleep(time.Second) + + // Create the Secret. + secret := generateSecret(bgpPeerPasswords) + _, err := c.client.CoreV1().Secrets("kube-system").Create(context.TODO(), secret, metav1.CreateOptions{}) + require.NoError(t, err) + + require.Eventually(t, func() bool { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + if reflect.DeepEqual(c.bgpPeerPasswords, bgpPeerPasswords) { + return true + } + return false + }, 5*time.Second, 10*time.Millisecond) + + // Initialize the test BGPPolicy. + waitEvents(t, 1, c) + mockBGPServer.EXPECT().Start(ctx) + mockBGPServer.EXPECT().AddPeer(ctx, ipv4Peer1Config) + mockBGPServer.EXPECT().AddPeer(ctx, ipv4Peer2Config) + mockBGPServer.EXPECT().AddPeer(ctx, ipv4Peer3Config) + mockBGPServer.EXPECT().AdvertiseRoutes(ctx, []bgp.Route{{Prefix: podIPv4CIDR.String()}}) + item, _ := c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) + + // Update the Secret. + updatedBGPPeerPasswords := map[string]string{ + generateBGPPeerKey(ipv4Peer1Addr, peer1ASN): "updated-" + peer1AuthPassword, + generateBGPPeerKey(ipv4Peer2Addr, peer2ASN): peer2AuthPassword, + generateBGPPeerKey(ipv4Peer3Addr, peer3ASN): "updated-" + peer3AuthPassword, + } + updatedSecret := generateSecret(updatedBGPPeerPasswords) + _, err = c.client.CoreV1().Secrets("kube-system").Update(context.TODO(), updatedSecret, metav1.UpdateOptions{}) + require.NoError(t, err) + require.Eventually(t, func() bool { + c.bgpPeerPasswordsMutex.RLock() + defer c.bgpPeerPasswordsMutex.RUnlock() + if reflect.DeepEqual(c.bgpPeerPasswords, updatedBGPPeerPasswords) { + return true + } + return false + }, 5*time.Second, 10*time.Millisecond) + + // Process the event triggered by the update of the Secret. + waitEvents(t, 1, c) + updatedIPv4Peer1Config := ipv4Peer1Config + updatedIPv4Peer3Config := ipv4Peer3Config + updatedIPv4Peer1Config.Password = "updated-" + peer1AuthPassword + updatedIPv4Peer3Config.Password = "updated-" + peer3AuthPassword + mockBGPServer.EXPECT().UpdatePeer(ctx, updatedIPv4Peer1Config) + mockBGPServer.EXPECT().UpdatePeer(ctx, updatedIPv4Peer3Config) + item, _ = c.queue.Get() + require.NoError(t, c.syncBGPPolicy()) + c.queue.Done(item) +} + +func generateBGPPolicyState(bgpPolicyName string, + listenPort int32, + localASN int32, + routerID string, + prefixes []string, + peerConfigs []bgp.PeerConfig) *bgpPolicyState { + routes := sets.New[bgp.Route]() + peerConfigMap := make(map[string]bgp.PeerConfig) + + for _, prefix := range prefixes { + routes.Insert(bgp.Route{Prefix: prefix}) + } + for _, peerConfig := range peerConfigs { + peerKey := generateBGPPeerKey(peerConfig.Address, peerConfig.ASN) + peerConfigMap[peerKey] = peerConfig + } + + return &bgpPolicyState{ + bgpPolicy: bgpPolicyName, + listenPort: listenPort, + localASN: localASN, + routerID: routerID, + routes: routes, + peerConfigs: peerConfigMap, + } +} + +func checkBGPPolicyState(t *testing.T, expected, got *bgpPolicyState) { + require.Equal(t, expected != nil, got != nil) + if expected != nil { + assert.Equal(t, expected.bgpPolicy, got.bgpPolicy) + assert.Equal(t, expected.listenPort, got.listenPort) + assert.Equal(t, expected.localASN, got.localASN) + assert.Equal(t, expected.routerID, got.routerID) + assert.Equal(t, expected.routes, got.routes) + assert.Equal(t, expected.peerConfigs, got.peerConfigs) + } +} + +func generateBGPPolicy(name string, + nodeSelector map[string]string, + listenPort int32, + localASN int32, + advertiseClusterIP bool, + advertiseExternalIP bool, + advertiseLoadBalancerIP bool, + advertiseEgressIP bool, + advertisePodCIDR bool, + externalPeers []v1alpha1.BGPPeer) *v1alpha1.BGPPolicy { + var advertisement v1alpha1.Advertisements + advertisement.Service = &v1alpha1.ServiceAdvertisement{} + if advertiseClusterIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeClusterIP) + } + if advertiseExternalIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeExternalIP) + } + if advertiseLoadBalancerIP { + advertisement.Service.IPTypes = append(advertisement.Service.IPTypes, v1alpha1.ServiceIPTypeLoadBalancerIP) + } + if advertiseEgressIP { + advertisement.Egress = &v1alpha1.EgressAdvertisement{} + } + + if advertisePodCIDR { + advertisement.Pod = &v1alpha1.PodAdvertisement{} + } + return &v1alpha1.BGPPolicy{ + ObjectMeta: metav1.ObjectMeta{Name: name, UID: "test-uid"}, + Spec: v1alpha1.BGPPolicySpec{ + NodeSelector: metav1.LabelSelector{MatchLabels: nodeSelector}, + LocalASN: localASN, + ListenPort: &listenPort, + Advertisements: advertisement, + BGPPeers: externalPeers, + }, + } +} +func generateService(name string, + svcType corev1.ServiceType, + clusterIP string, + externalIP string, + LoadBalancerIP string, + internalTrafficPolicyLocal bool, + externalTrafficPolicyLocal bool) *corev1.Service { + itp := corev1.ServiceInternalTrafficPolicyCluster + if internalTrafficPolicyLocal { + itp = corev1.ServiceInternalTrafficPolicyLocal + } + etp := corev1.ServiceExternalTrafficPolicyCluster + if externalTrafficPolicyLocal { + etp = corev1.ServiceExternalTrafficPolicyLocal + } + var externalIPs []string + if externalIP != "" { + externalIPs = append(externalIPs, externalIP) + } + + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + UID: "test-uid", + }, + Spec: corev1.ServiceSpec{ + Type: svcType, + ClusterIP: clusterIP, + Ports: []corev1.ServicePort{{ + Name: "p80", + Port: 80, + Protocol: corev1.ProtocolTCP, + }}, + ClusterIPs: []string{clusterIP}, + ExternalIPs: externalIPs, + InternalTrafficPolicy: &itp, + ExternalTrafficPolicy: etp, + }, + } + if LoadBalancerIP != "" { + ingress := []corev1.LoadBalancerIngress{{IP: LoadBalancerIP}} + svc.Status.LoadBalancer.Ingress = ingress + } + return svc +} + +func generateEgress(name string, ip string, nodeName string) *crdv1b1.Egress { + return &crdv1b1.Egress{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + }, + Spec: crdv1b1.EgressSpec{ + EgressIP: ip, + }, + Status: crdv1b1.EgressStatus{ + EgressIP: ip, + EgressNode: nodeName, + }, + } +} + +func generateNode(name string, labels, annotations map[string]string) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + UID: "test-uid", + Labels: labels, + Annotations: annotations, + }, + } +} + +func generateEndpointSlice(svcName string, + isLocal bool, + isIPv6 bool, + endpointIP string) *discovery.EndpointSlice { + addrType := discovery.AddressTypeIPv4 + if isIPv6 { + addrType = discovery.AddressTypeIPv6 + } + var nodeName *string + if isLocal { + nodeName = &localNodeName + } + protocol := corev1.ProtocolTCP + endpointSlice := &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", svcName, rand.String(5)), + Namespace: "default", + UID: "test-uid", + Labels: map[string]string{ + discovery.LabelServiceName: svcName, + }, + }, + AddressType: addrType, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{ + endpointIP, + }, + Conditions: discovery.EndpointConditions{ + Ready: ptr.To(true), + }, + Hostname: nodeName, + NodeName: nodeName, + }}, + Ports: []discovery.EndpointPort{{ + Name: ptr.To("p80"), + Port: ptr.To(int32(80)), + Protocol: &protocol, + }}, + } + + return endpointSlice +} + +func generateBGPPeer(ip string, asn, port, gracefulRestartTimeSeconds int32) v1alpha1.BGPPeer { + return v1alpha1.BGPPeer{ + Address: ip, + Port: &port, + ASN: asn, + MultihopTTL: ptr.To(int32(1)), + GracefulRestartTimeSeconds: &gracefulRestartTimeSeconds, + } +} + +func generateBGPPeerConfig(peerConfig *v1alpha1.BGPPeer, password string) bgp.PeerConfig { + return bgp.PeerConfig{ + BGPPeer: peerConfig, + Password: password, + } +} + +func generateSecret(rawData map[string]string) *corev1.Secret { + data := make(map[string][]byte) + for k, v := range rawData { + data[k] = []byte(v) + } + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: types.BGPPolicySecretName, + Namespace: "kube-system", + UID: "test-uid", + }, + Type: corev1.SecretTypeOpaque, + Data: data, + } +} + +func ipStrToPrefix(ipStr string) string { + if netutils.IsIPv4String(ipStr) { + return ipStr + ipv4Suffix + } else if netutils.IsIPv6String(ipStr) { + return ipStr + ipv6Suffix + } + return "" +} + +func waitEvents(t *testing.T, expectedEvents int, c *fakeController) { + require.Eventually(t, func() bool { + return c.queue.Len() == expectedEvents + }, 5*time.Second, 10*time.Millisecond) +} diff --git a/pkg/agent/types/annotations.go b/pkg/agent/types/annotations.go index b5b63e1a8d8..3af28653c62 100644 --- a/pkg/agent/types/annotations.go +++ b/pkg/agent/types/annotations.go @@ -38,4 +38,7 @@ const ( // L7FlowExporterAnnotationKey is the key of the L7 network flow export annotation that enables L7 network flow export for annotated Pod or Namespace based on the value of annotation which is direction of traffic. L7FlowExporterAnnotationKey string = "visibility.antrea.io/l7-export" + + // NodeBGPPolicyRouterIDAnnotationKey represents the key of the Node's BGP router ID in the Annotations of the Node. + NodeBGPPolicyRouterIDAnnotationKey string = "node.antrea.io/bgppolicy-router-id" ) diff --git a/pkg/agent/types/bgppolicy.go b/pkg/agent/types/bgppolicy.go new file mode 100644 index 00000000000..3e3cc70cab3 --- /dev/null +++ b/pkg/agent/types/bgppolicy.go @@ -0,0 +1,24 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package types + +// BGPPolicySecretName is the name of the Kubernetes Secret used to store BGP peer passwords. Each entry in the Secret +// uses a key which is a concatenated string BGP peer IP address and ASN. The corresponding value is the password for +// that BGP peer. +// +// Examples of keys: +// - "192.168.77.100-65000" +// - "2001:db8::1-65000" +const BGPPolicySecretName = "antrea-bgp-passwords" // #nosec G101 diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index adb630a1324..fe1318252d1 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -55,6 +55,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "AntreaIPAM", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "AntreaPolicy", Status: "Disabled", Version: "BETA"}, {Component: "agent", Name: "AntreaProxy", Status: "Enabled", Version: "GA"}, + {Component: "agent", Name: "BGPPolicy", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "CleanupStaleUDPSvcConntrack", Status: cleanupStaleUDPSvcConntrackStatus, Version: "BETA"}, {Component: "agent", Name: "Egress", Status: egressStatus, Version: "BETA"}, {Component: "agent", Name: "EgressSeparateSubnet", Status: "Disabled", Version: "ALPHA"}, @@ -105,6 +106,7 @@ func Test_getGatesWindowsResponse(t *testing.T) { want: []apis.FeatureGateResponse{ {Component: "agent-windows", Name: "AntreaPolicy", Status: "Disabled", Version: "BETA"}, {Component: "agent-windows", Name: "AntreaProxy", Status: "Enabled", Version: "GA"}, + {Component: "agent-windows", Name: "BGPPolicy", Status: "Disabled", Version: "ALPHA"}, {Component: "agent-windows", Name: "EndpointSlice", Status: "Enabled", Version: "GA"}, {Component: "agent-windows", Name: "ExternalNode", Status: "Disabled", Version: "ALPHA"}, {Component: "agent-windows", Name: "FlowExporter", Status: "Disabled", Version: "ALPHA"}, diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 05cb51a9df5..c12243e92a1 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -163,6 +163,10 @@ const ( // alpha: v2.1 // Enable the NodeLatencyMonitor feature. NodeLatencyMonitor featuregate.Feature = "NodeLatencyMonitor" + + // alpha: v2.1 + // Allow users to advertise Service IPs, Pod IPs, and Egress IPs to external BGP peers. + BGPPolicy featuregate.Feature = "BGPPolicy" ) var ( @@ -179,6 +183,7 @@ var ( DefaultAntreaFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ AntreaPolicy: {Default: true, PreRelease: featuregate.Beta}, AntreaProxy: {Default: true, PreRelease: featuregate.GA}, + BGPPolicy: {Default: false, PreRelease: featuregate.Alpha}, Egress: {Default: true, PreRelease: featuregate.Beta}, EndpointSlice: {Default: true, PreRelease: featuregate.GA}, TopologyAwareHints: {Default: true, PreRelease: featuregate.Beta}, @@ -213,6 +218,7 @@ var ( AntreaIPAM, AntreaPolicy, AntreaProxy, + BGPPolicy, CleanupStaleUDPSvcConntrack, Egress, EndpointSlice,