Skip to content

Commit

Permalink
Addressed comments, not finished
Browse files Browse the repository at this point in the history
Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl committed Jul 15, 2024
1 parent 3030553 commit b7c4345
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 107 deletions.
1 change: 1 addition & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ func run(o *Options) error {
egressInformer,
bgpPolicyInformer,
endpointSliceInformer,
o.enableEgress,
k8sClient,
nodeConfig,
networkConfig)
Expand Down
157 changes: 84 additions & 73 deletions pkg/agent/controller/bgp/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package bgp
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/watch"
"reflect"
"sync"
"time"
Expand Down Expand Up @@ -48,7 +49,6 @@ import (
crdinformersv1b1 "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1beta1"
crdlistersv1a1 "antrea.io/antrea/pkg/client/listers/crd/v1alpha1"
crdlistersv1b1 "antrea.io/antrea/pkg/client/listers/crd/v1beta1"
"antrea.io/antrea/pkg/features"
"antrea.io/antrea/pkg/util/env"
utilipset "antrea.io/antrea/pkg/util/sets"
)
Expand Down Expand Up @@ -138,6 +138,7 @@ func NewBGPPolicyController(ctx context.Context,
egressInformer crdinformersv1b1.EgressInformer,
bgpPolicyInformer crdinformersv1a1.BGPPolicyInformer,
endpointSliceInformer discoveryinformers.EndpointSliceInformer,
egressEnabled bool,
k8sClient kubernetes.Interface,
nodeConfig *config.NodeConfig,
networkConfig *config.NetworkConfig) (*Controller, error) {
Expand All @@ -149,9 +150,6 @@ func NewBGPPolicyController(ctx context.Context,
serviceInformer: serviceInformer.Informer(),
serviceLister: serviceInformer.Lister(),
serviceListerSynced: serviceInformer.Informer().HasSynced,
egressInformer: egressInformer.Informer(),
egressLister: egressInformer.Lister(),
egressListerSynced: egressInformer.Informer().HasSynced,
bgpPolicyInformer: bgpPolicyInformer.Informer(),
bgpPolicyLister: bgpPolicyInformer.Lister(),
bgpPolicyListerSynced: bgpPolicyInformer.Informer().HasSynced,
Expand All @@ -166,7 +164,7 @@ func NewBGPPolicyController(ctx context.Context,
podIPv4CIDR: nodeConfig.PodIPv4CIDR.String(),
podIPv6CIDR: nodeConfig.PodIPv6CIDR.String(),
nodeIPv4Addr: nodeConfig.NodeIPv4Addr.IP.String(),
egressEnabled: features.DefaultFeatureGate.Enabled(features.Egress),
egressEnabled: egressEnabled,
newBGPServerFn: func(globalConfig *bgp.GlobalConfig) bgp.Interface {
return gobgp.NewGoBGPServer(globalConfig)
},
Expand Down Expand Up @@ -197,6 +195,9 @@ func NewBGPPolicyController(ctx context.Context,
resyncPeriod,
)
if c.egressEnabled {
c.egressInformer = egressInformer.Informer()
c.egressLister = egressInformer.Lister()
c.egressListerSynced = egressInformer.Informer().HasSynced
c.egressInformer.AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: c.addEgress,
Expand All @@ -219,39 +220,61 @@ func NewBGPPolicyController(ctx context.Context,

// watchSecretChanges uses watch API directly to watch for the changes of the specific Secret storing passwords of BGP
// peers.
func (c *Controller) watchSecretChanges(endCh <-chan struct{}) error {
ns := env.GetAntreaNamespace()
watcher, err := c.k8sClient.CoreV1().Secrets(ns).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{
Namespace: ns,
func (c *Controller) watchSecretChanges(stopCh <-chan struct{}) error {
secretMeta := &metav1.ObjectMeta{
Name: types.BGPPolicySecretName,
Namespace: env.GetAntreaNamespace(),
}
watcher, err := c.k8sClient.CoreV1().Secrets(secretMeta.Namespace).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{
Namespace: secretMeta.Namespace,
Name: secretMeta.Name,
}))
if err != nil {
return fmt.Errorf("failed to create Secret watcher: %v", err)
}

ch := watcher.ResultChan()
defer watcher.Stop()
klog.InfoS("Starting watching Secret changes", "Secret", fmt.Sprintf("%s/%s", ns, types.BGPPolicySecretName))
klog.InfoS("Starting watching Secret changes", "Secret", klog.KObj(secretMeta))
for {
select {
case event, ok := <-ch:
if !ok {
return nil
}
// Update BGP peer passwords.
klog.InfoS("Processing Secret event", "Secret", fmt.Sprintf("%s/%s", ns, types.BGPPolicySecretName))
func() {
c.bgpPeerPasswordsMutex.Lock()
defer c.bgpPeerPasswordsMutex.Unlock()

secretObj := event.Object.(*corev1.Secret)
c.bgpPeerPasswords = make(map[string]string)
for key, data := range secretObj.Data {
c.bgpPeerPasswords[key] = string(data)
}
}()
c.queue.Add(key)
case <-endCh:
klog.InfoS("Processing Secret event", "Secret", klog.KObj(secretMeta))

switch event.Type {
case watch.Added, watch.Modified:
klog.V(2).InfoS("Secret added or modified", "Secret", klog.KObj(secretMeta))
func() {
c.bgpPeerPasswordsMutex.Lock()
defer c.bgpPeerPasswordsMutex.Unlock()

secretObj := event.Object.(*corev1.Secret)
c.bgpPeerPasswords = make(map[string]string)
for k, v := range secretObj.Data {
c.bgpPeerPasswords[k] = string(v)
}
}()
c.queue.Add(key)
case watch.Deleted:
klog.V(2).InfoS("Secret deleted", "Secret", klog.KObj(secretMeta))
func() {
c.bgpPeerPasswordsMutex.Lock()
defer c.bgpPeerPasswordsMutex.Unlock()

// Clear the passwords since the Secret is deleted
c.bgpPeerPasswords = make(map[string]string)
}()
c.queue.Add(key)
case watch.Bookmark:
klog.V(2).InfoS("Received a bookmark event", "Secret", klog.KObj(secretMeta))
case watch.Error:
klog.V(2).InfoS("Received an error event", "Secret", klog.KObj(secretMeta))
}
case <-stopCh:
return nil
}
}
Expand All @@ -263,13 +286,17 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
klog.InfoS("Starting", "controllerName", controllerName)
defer klog.InfoS("Shutting down", "controllerName", controllerName)

if !cache.WaitForNamedCacheSync(controllerName,
stopCh,
cacheSyncs := []cache.InformerSynced{
c.nodeListerSynced,
c.serviceListerSynced,
c.egressListerSynced,
c.bgpPolicyListerSynced,
c.endpointSliceListerSynced) {
c.endpointSliceListerSynced,
}
if c.egressEnabled {
cacheSyncs = append(cacheSyncs, c.egressListerSynced)
}

if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) {
return
}

Expand Down Expand Up @@ -303,20 +330,22 @@ func (c *Controller) processNextWorkItem() bool {
return true
}

// filterBGPPolicies filters the BGPPolicies applied to the current Node.
func (c *Controller) filterBGPPolicies() (map[string]*v1alpha1.BGPPolicy, error) {
func (c *Controller) getBGPPolicy() (*v1alpha1.BGPPolicy, error) {
allBPs, err := c.bgpPolicyLister.List(labels.Everything())
if err != nil {
return nil, err
}

bpMap := make(map[string]*v1alpha1.BGPPolicy)
var oldestBP *v1alpha1.BGPPolicy
for _, bp := range allBPs {
if c.matchedCurrentNode(bp) {
bpMap[bp.GetName()] = bp
if oldestBP == nil || bp.CreationTimestamp.Before(&oldestBP.CreationTimestamp) {
oldestBP = bp
}
}
}
return bpMap, nil

return oldestBP, nil
}

func (c *Controller) syncBGPPolicy() error {
Expand All @@ -325,62 +354,44 @@ func (c *Controller) syncBGPPolicy() error {
klog.V(2).InfoS("Finished syncing BGPPolicy", "durationTime", time.Since(startTime))
}()

// Get all available BGPPolicies applied to the current Node.
allBGPPolicies, err := c.filterBGPPolicies()
// Get the available BGPPolicy applied to the current Node.
effectiveBP, err := c.getBGPPolicy()
if err != nil {
return err
}

allBGPPolicyNames := sets.KeySet(allBGPPolicies)
preState := c.bgpPolicyState
var curState *bgpPolicyState
var effectiveBGPPolicyName string
var needUpdateBGPServer bool

if preState == nil {
if allBGPPolicyNames.Len() == 0 {
if effectiveBP == nil {
if preState == nil {
return nil
}
// If there is no effective BGPPolicy in the last sync, select a random available BGPPolicy as the effective one.
effectiveBGPPolicyName, _ = allBGPPolicyNames.PopAny()
curState = &bgpPolicyState{bgpPolicy: effectiveBGPPolicyName}
needUpdateBGPServer = true
} else {
// If there is an effective BGPPolicy in the last sync, check if it is still included in the available BGPPolicies.
if !allBGPPolicyNames.Has(preState.bgpPolicy) {
// If there is no available BGPPolicy, stop the previous BGP server, clean up the state and return.
if allBGPPolicyNames.Len() == 0 {
if err := preState.bgpServer.Stop(c.ctx); err != nil {
return err
}
c.bgpPolicyState = nil
return nil
}
// Select a new effective BGPPolicy from the available ones.
effectiveBGPPolicyName, _ = allBGPPolicyNames.PopAny()
curState = &bgpPolicyState{bgpPolicy: effectiveBGPPolicyName}
needUpdateBGPServer = true
} else {
// Retain the effective BGPPolicy in the last sync.
effectiveBGPPolicyName = preState.bgpPolicy
curState = &bgpPolicyState{bgpPolicy: effectiveBGPPolicyName}
if err := preState.bgpServer.Stop(c.ctx); err != nil {
return err
}
return nil
}
bgpPolicy := allBGPPolicies[effectiveBGPPolicyName]

klog.V(2).InfoS("Syncing BGPPolicy", "BGPPolicy", effectiveBGPPolicyName)
var needUpdateBGPServer bool
if preState == nil || preState.bgpPolicy != effectiveBP.Name {
needUpdateBGPServer = true
}

klog.V(2).InfoS("Syncing BGPPolicy", "BGPPolicy", effectiveBP)

curState := &bgpPolicyState{bgpPolicy: effectiveBP.Name}

// Retrieve the listen port, local AS number and router ID from the effective BGPPolicy, and update them to the
// current state.
curState.routerID, err = c.getRouterID()
if err != nil {
return err
}
if bgpPolicy.Spec.ListenPort != nil {
curState.listenPort = *bgpPolicy.Spec.ListenPort
if effectiveBP.Spec.ListenPort != nil {
curState.listenPort = *effectiveBP.Spec.ListenPort
} else {
curState.listenPort = defaultBGPListenPort
}
curState.localASN = bgpPolicy.Spec.LocalASN
curState.localASN = effectiveBP.Spec.LocalASN

// If any of the listen port, local AS number, or router ID have changed, mark the BGP server for an update.
if preState != nil {
Expand All @@ -394,7 +405,7 @@ func (c *Controller) syncBGPPolicy() error {
// Stop the stale BGP server.
if preState != nil {
if err := preState.bgpServer.Stop(c.ctx); err != nil {
klog.ErrorS(err, "failed to stop stale BGP server")
return fmt.Errorf("failed to stop stale BGP server: %w", err)
}
}
// Start the new BGP server.
Expand All @@ -414,7 +425,7 @@ func (c *Controller) syncBGPPolicy() error {
}

// Reconcile BGP peers.
curPeerConfigs, err := c.getPeerConfigs(bgpPolicy.Spec.BGPPeers)
curPeerConfigs, err := c.getPeerConfigs(effectiveBP.Spec.BGPPeers)
if err != nil {
return err
}
Expand All @@ -427,7 +438,7 @@ func (c *Controller) syncBGPPolicy() error {
}

// Reconcile BGP routes generated from advertisements.
curRoutes, err := c.getRoutes(bgpPolicy.Spec.Advertisements)
curRoutes, err := c.getRoutes(effectiveBP.Spec.Advertisements)
if err != nil {
return err
}
Expand Down Expand Up @@ -527,7 +538,7 @@ func (c *Controller) reconcileRoutes(curRoutes, preRoutes sets.Set[bgp.Route], b

func (c *Controller) getRouterID() (string, error) {
var routerID string
// For IPv6 only environment, the BGP routerID should be specified by K8s Node annotation `antrea.io/bgp-route-id`.
// For IPv6 only environment, the BGP routerID should be specified by K8s Node annotation `antrea.io/bgp-router-id`.
if !c.enabledIPv4 && c.enabledIPv6 {
nodeObj, _ := c.nodeLister.Get(c.nodeName)
var exists bool
Expand Down
Loading

0 comments on commit b7c4345

Please sign in to comment.