diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 83a2d05286c..013f5d45c6b 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -731,14 +731,15 @@ func run(o *Options) error { go ipamController.Run(stopCh) } + var secondaryNetworkController *secondarynetwork.Controller if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { - defer secondarynetwork.RestoreHostInterfaceConfiguration(&o.config.SecondaryNetwork) - if err := secondarynetwork.Initialize( + secondaryNetworkController, err = secondarynetwork.NewController( o.config.ClientConnection, o.config.KubeAPIServerOverride, - k8sClient, localPodInformer.Get(), nodeConfig.Name, - podUpdateChannel, stopCh, - &o.config.SecondaryNetwork, ovsdbConnection); err != nil { - return fmt.Errorf("failed to initialize secondary network: %v", err) + k8sClient, localPodInformer.Get(), + podUpdateChannel, + &o.config.SecondaryNetwork, ovsdbConnection) + if err != nil { + return fmt.Errorf("failed to create secondary network controller: %w", err) } } @@ -864,6 +865,14 @@ func run(o *Options) error { return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err) } } + // secondaryNetworkController Initialize must be run after FlowRestoreComplete. + if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { + defer secondaryNetworkController.Restore() + if err = secondaryNetworkController.Initialize(); err != nil { + return fmt.Errorf("failed to initialize secondary network: %v", err) + } + go secondaryNetworkController.Run(stopCh) + } // statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for // NetworkPolicy stats and Multicast stats. diff --git a/pkg/agent/secondarynetwork/init_linux.go b/pkg/agent/secondarynetwork/init_linux.go index 6a8b16c2021..5d5edaa2ece 100644 --- a/pkg/agent/secondarynetwork/init_linux.go +++ b/pkg/agent/secondarynetwork/init_linux.go @@ -43,30 +43,56 @@ var ( newOVSBridgeFn = ovsconfig.NewOVSBridge ) -// Initialize sets up OVS bridges and starts the Pod controller for secondary networks. -func Initialize( +type Controller struct { + ovsBridgeClient ovsconfig.OVSBridgeClient + secNetConfig *agentconfig.SecondaryNetworkConfig + PodController *podwatch.PodController +} + +func NewController( clientConnectionConfig componentbaseconfig.ClientConnectionConfiguration, kubeAPIServerOverride string, k8sClient clientset.Interface, podInformer cache.SharedIndexInformer, - nodeName string, podUpdateSubscriber channel.Subscriber, - stopCh <-chan struct{}, - secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error { - + secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB, +) (*Controller, error) { ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb) if err != nil { - return err + return nil, err + } + + // Create the NetworkAttachmentDefinition client, which handles access to secondary network object + // definition from the API Server. + netAttachDefClient, err := createNetworkAttachDefClient(clientConnectionConfig, kubeAPIServerOverride) + if err != nil { + return nil, fmt.Errorf("NetworkAttachmentDefinition client creation failed: %v", err) } + // Create podController to handle secondary network configuration for Pods with + // k8s.v1.cni.cncf.io/networks Annotation defined. + podWatchController, err := podwatch.NewPodController( + k8sClient, netAttachDefClient, podInformer, + podUpdateSubscriber, ovsBridgeClient) + if err != nil { + return nil, err + } + return &Controller{ + ovsBridgeClient: ovsBridgeClient, + secNetConfig: secNetConfig, + PodController: podWatchController}, nil +} + +// Initialize sets up OVS bridges. +func (c *Controller) Initialize() error { // We only support moving and restoring of interface configuration to OVS Bridge for the single physical interface case. - if len(secNetConfig.OVSBridges) != 0 { - phyInterfaces := make([]string, len(secNetConfig.OVSBridges[0].PhysicalInterfaces)) - copy(phyInterfaces, secNetConfig.OVSBridges[0].PhysicalInterfaces) + if len(c.secNetConfig.OVSBridges) != 0 { + phyInterfaces := make([]string, len(c.secNetConfig.OVSBridges[0].PhysicalInterfaces)) + copy(phyInterfaces, c.secNetConfig.OVSBridges[0].PhysicalInterfaces) if len(phyInterfaces) == 1 { bridgedName, _, err := util.PrepareHostInterfaceConnection( - ovsBridgeClient, + c.ovsBridgeClient, phyInterfaces[0], 0, map[string]interface{}{ @@ -78,34 +104,22 @@ func Initialize( } phyInterfaces[0] = bridgedName } - if err = connectPhyInterfacesToOVSBridge(ovsBridgeClient, phyInterfaces); err != nil { + if err := connectPhyInterfacesToOVSBridge(c.ovsBridgeClient, phyInterfaces); err != nil { return err } } - - // Create the NetworkAttachmentDefinition client, which handles access to secondary network object - // definition from the API Server. - netAttachDefClient, err := createNetworkAttachDefClient(clientConnectionConfig, kubeAPIServerOverride) - if err != nil { - return fmt.Errorf("NetworkAttachmentDefinition client creation failed: %v", err) - } - - // Create podController to handle secondary network configuration for Pods with - // k8s.v1.cni.cncf.io/networks Annotation defined. - if podWatchController, err := podwatch.NewPodController( - k8sClient, netAttachDefClient, podInformer, - podUpdateSubscriber, ovsBridgeClient); err != nil { - return err - } else { - go podWatchController.Run(stopCh) - } return nil } -// RestoreHostInterfaceConfiguration restores interface configuration from secondary-bridge back to host-interface. -func RestoreHostInterfaceConfiguration(secNetConfig *agentconfig.SecondaryNetworkConfig) { - if len(secNetConfig.OVSBridges) != 0 && len(secNetConfig.OVSBridges[0].PhysicalInterfaces) == 1 { - util.RestoreHostInterfaceConfiguration(secNetConfig.OVSBridges[0].BridgeName, secNetConfig.OVSBridges[0].PhysicalInterfaces[0]) +// Run starts the Pod controller for secondary networks. +func (c *Controller) Run(stopCh <-chan struct{}) { + c.PodController.Run(stopCh) +} + +// Restore restores interface configuration from secondary-bridge back to host-interface. +func (c *Controller) Restore() { + if len(c.secNetConfig.OVSBridges) != 0 && len(c.secNetConfig.OVSBridges[0].PhysicalInterfaces) == 1 { + util.RestoreHostInterfaceConfiguration(c.secNetConfig.OVSBridges[0].BridgeName, c.secNetConfig.OVSBridges[0].PhysicalInterfaces[0]) } } diff --git a/pkg/agent/secondarynetwork/init_windows.go b/pkg/agent/secondarynetwork/init_windows.go index 45b86210652..0c79def7da3 100644 --- a/pkg/agent/secondarynetwork/init_windows.go +++ b/pkg/agent/secondarynetwork/init_windows.go @@ -29,18 +29,27 @@ import ( "antrea.io/antrea/pkg/util/channel" ) -func Initialize( +type Controller struct { +} + +func NewController( clientConnectionConfig componentbaseconfig.ClientConnectionConfiguration, kubeAPIServerOverride string, k8sClient clientset.Interface, podInformer cache.SharedIndexInformer, - nodeName string, podUpdateSubscriber channel.Subscriber, - stopCh <-chan struct{}, - secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error { - return errors.New("not supported on Windows") + secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB, +) (*Controller, error) { + return nil, errors.New("not supported on Windows") +} + +func (c *Controller) Initialize() error { + return nil +} + +func (c *Controller) Run(stopCh <-chan struct{}) { } -func RestoreHostInterfaceConfiguration(secNetConfig *agentconfig.SecondaryNetworkConfig) { +func (c *Controller) Restore() { // Not supported on Windows. } diff --git a/pkg/agent/secondarynetwork/podwatch/controller.go b/pkg/agent/secondarynetwork/podwatch/controller.go index 3c9fb502c1c..43409fb12f8 100644 --- a/pkg/agent/secondarynetwork/podwatch/controller.go +++ b/pkg/agent/secondarynetwork/podwatch/controller.go @@ -82,7 +82,7 @@ type podCNIInfo struct { netNS string } -type podController struct { +type PodController struct { kubeClient clientset.Interface netAttachDefClient netdefclient.K8sCniCncfIoV1Interface queue workqueue.RateLimitingInterface @@ -103,13 +103,13 @@ func NewPodController( podInformer cache.SharedIndexInformer, podUpdateSubscriber channel.Subscriber, ovsBridgeClient ovsconfig.OVSBridgeClient, -) (*podController, error) { +) (*PodController, error) { ifaceStore := interfacestore.NewInterfaceStore() interfaceConfigurator, err := cniserver.NewSecondaryInterfaceConfigurator(ovsBridgeClient, ifaceStore) if err != nil { return nil, fmt.Errorf("failed to create SecondaryInterfaceConfigurator: %v", err) } - pc := podController{ + pc := PodController{ kubeClient: kubeClient, netAttachDefClient: netAttachDefClient, queue: workqueue.NewNamedRateLimitingQueue( @@ -153,7 +153,7 @@ func allocatePodSecondaryIfaceName(usedIFNames sets.Set[string]) (string, error) return "", fmt.Errorf("no more interface names") } -func (pc *podController) enqueuePod(obj interface{}) { +func (pc *PodController) enqueuePod(obj interface{}) { pod, isPod := obj.(*corev1.Pod) if !isPod { podDeletedState, ok := obj.(cache.DeletedFinalStateUnknown) @@ -172,7 +172,7 @@ func (pc *podController) enqueuePod(obj interface{}) { } // processCNIUpdate will be called when CNIServer publishes a Pod update event. -func (pc *podController) processCNIUpdate(e interface{}) { +func (pc *PodController) processCNIUpdate(e interface{}) { event := e.(types.PodUpdate) podKey := podKeyGet(event.PodName, event.PodNamespace) if event.IsAdd { @@ -184,7 +184,7 @@ func (pc *podController) processCNIUpdate(e interface{}) { } // handleAddUpdatePod handles Pod Add, Update events and updates annotation if required. -func (pc *podController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNIInfo, +func (pc *PodController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNIInfo, storedInterfaces []*interfacestore.InterfaceConfig) error { if len(storedInterfaces) > 0 { // We do not support secondary network update at the moment. Return as long as one @@ -219,7 +219,7 @@ func (pc *podController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNII return pc.configurePodSecondaryNetwork(pod, networklist, podCNIInfo) } -func (pc *podController) removeInterfaces(interfaces []*interfacestore.InterfaceConfig) error { +func (pc *PodController) removeInterfaces(interfaces []*interfacestore.InterfaceConfig) error { var savedErr error for _, interfaceConfig := range interfaces { podName := interfaceConfig.PodName @@ -256,7 +256,7 @@ func (pc *podController) removeInterfaces(interfaces []*interfacestore.Interface return savedErr } -func (pc *podController) syncPod(key string) error { +func (pc *PodController) syncPod(key string) error { var pod *corev1.Pod var cniInfo *podCNIInfo podExists := false @@ -297,12 +297,12 @@ func (pc *podController) syncPod(key string) error { return pc.handleAddUpdatePod(pod, cniInfo, storedInterfaces) } -func (pc *podController) Worker() { +func (pc *PodController) Worker() { for pc.processNextWorkItem() { } } -func (pc *podController) processNextWorkItem() bool { +func (pc *PodController) processNextWorkItem() bool { obj, quit := pc.queue.Get() if quit { return false @@ -319,7 +319,7 @@ func (pc *podController) processNextWorkItem() bool { } // Configure Secondary Network Interface. -func (pc *podController) configureSecondaryInterface( +func (pc *PodController) configureSecondaryInterface( pod *corev1.Pod, network *netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo, @@ -370,7 +370,7 @@ func (pc *podController) configureSecondaryInterface( return ifConfigErr } -func (pc *podController) configurePodSecondaryNetwork(pod *corev1.Pod, networkList []*netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo) error { +func (pc *PodController) configurePodSecondaryNetwork(pod *corev1.Pod, networkList []*netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo) error { usedIFNames := sets.New[string]() for _, network := range networkList { if network.InterfaceRequest != "" { @@ -477,7 +477,7 @@ func validateNetworkConfig(cniConfig []byte) (*SecondaryNetworkConfig, error) { return &networkConfig, nil } -func (pc *podController) Run(stopCh <-chan struct{}) { +func (pc *PodController) Run(stopCh <-chan struct{}) { defer func() { klog.InfoS("Shutting down", "controller", controllerName) pc.queue.ShutDown() diff --git a/pkg/agent/secondarynetwork/podwatch/controller_test.go b/pkg/agent/secondarynetwork/podwatch/controller_test.go index f1c9e81572d..ccce64c4cc7 100644 --- a/pkg/agent/secondarynetwork/podwatch/controller_test.go +++ b/pkg/agent/secondarynetwork/podwatch/controller_test.go @@ -587,7 +587,7 @@ func TestPodControllerAddPod(t *testing.T) { podKey := podKeyGet(podName, testNamespace) // Create Pod and wait for Informer cache updated. - createPodFn := func(pc *podController, pod *corev1.Pod) { + createPodFn := func(pc *PodController, pod *corev1.Pod) { _, err := pc.kubeClient.CoreV1().Pods(testNamespace).Create(context.Background(), pod, metav1.CreateOptions{}) require.NoError(t, err, "error when creating test Pod") @@ -596,7 +596,7 @@ func TestPodControllerAddPod(t *testing.T) { return ok == true && err == nil }, 1*time.Second, 10*time.Millisecond) } - deletePodFn := func(pc *podController, podName string) { + deletePodFn := func(pc *PodController, podName string) { require.NoError(t, pc.kubeClient.CoreV1().Pods(testNamespace).Delete(context.Background(), podName, metav1.DeleteOptions{}), "error when deleting test Pod") assert.Eventually(t, func() bool { @@ -899,7 +899,7 @@ func TestPodControllerAddPod(t *testing.T) { } func testPodController(ctrl *gomock.Controller) ( - *podController, *podwatchtesting.MockIPAMAllocator, + *PodController, *podwatchtesting.MockIPAMAllocator, *podwatchtesting.MockInterfaceConfigurator) { client := fake.NewSimpleClientset() netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1() @@ -907,8 +907,8 @@ func testPodController(ctrl *gomock.Controller) ( interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl) mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl) - // podController without event handlers. - return &podController{ + // PodController without event handlers. + return &PodController{ kubeClient: client, netAttachDefClient: netdefclient, queue: workqueue.NewNamedRateLimitingQueue( @@ -921,9 +921,9 @@ func testPodController(ctrl *gomock.Controller) ( }, mockIPAM, interfaceConfigurator } -// Create a test podController and start informerFactory. +// Create a test PodController and start informerFactory. func testPodControllerStart(ctrl *gomock.Controller) ( - *podController, *podwatchtesting.MockIPAMAllocator, + *PodController, *podwatchtesting.MockIPAMAllocator, *podwatchtesting.MockInterfaceConfigurator) { podController, mockIPAM, interfaceConfigurator := testPodController(ctrl) informerFactory := informers.NewSharedInformerFactory(podController.kubeClient, resyncPeriod) diff --git a/pkg/agent/secondarynetwork/podwatch/sriov.go b/pkg/agent/secondarynetwork/podwatch/sriov.go index da94428c5bd..cca4766604e 100644 --- a/pkg/agent/secondarynetwork/podwatch/sriov.go +++ b/pkg/agent/secondarynetwork/podwatch/sriov.go @@ -107,7 +107,7 @@ func getPodContainerDeviceIDs(podName string, podNamespace string) ([]string, er // which is still not associated with a network device name. // NOTE: buildVFDeviceIDListPerPod is called only if a Pod specific VF to Interface mapping cache // was not build earlier. Sample initial entry per Pod: "{18:01.1,""},{18:01.2,""},{18:01.3,""}" -func (pc *podController) buildVFDeviceIDListPerPod(podName, podNamespace string) ([]podSriovVFDeviceIDInfo, error) { +func (pc *PodController) buildVFDeviceIDListPerPod(podName, podNamespace string) ([]podSriovVFDeviceIDInfo, error) { podKey := podKeyGet(podName, podNamespace) deviceCache, cacheFound := pc.vfDeviceIDUsageMap.Load(podKey) if cacheFound { @@ -127,7 +127,7 @@ func (pc *podController) buildVFDeviceIDListPerPod(podName, podNamespace string) return vfDeviceIDInfoCache, nil } -func (pc *podController) deleteVFDeviceIDListPerPod(podName, podNamespace string) { +func (pc *PodController) deleteVFDeviceIDListPerPod(podName, podNamespace string) { podKey := podKeyGet(podName, podNamespace) _, cacheFound := pc.vfDeviceIDUsageMap.Load(podKey) if cacheFound { @@ -137,7 +137,7 @@ func (pc *podController) deleteVFDeviceIDListPerPod(podName, podNamespace string return } -func (pc *podController) releaseSriovVFDeviceID(podName, podNamespace, interfaceName string) { +func (pc *PodController) releaseSriovVFDeviceID(podName, podNamespace, interfaceName string) { podKey := podKeyGet(podName, podNamespace) obj, cacheFound := pc.vfDeviceIDUsageMap.Load(podKey) if !cacheFound { @@ -151,7 +151,7 @@ func (pc *podController) releaseSriovVFDeviceID(podName, podNamespace, interface } } -func (pc *podController) assignUnusedSriovVFDeviceID(podName, podNamespace, interfaceName string) (string, error) { +func (pc *PodController) assignUnusedSriovVFDeviceID(podName, podNamespace, interfaceName string) (string, error) { var cache []podSriovVFDeviceIDInfo cache, err := pc.buildVFDeviceIDListPerPod(podName, podNamespace) if err != nil { @@ -168,7 +168,7 @@ func (pc *podController) assignUnusedSriovVFDeviceID(podName, podNamespace, inte } // Configure SRIOV VF as a Secondary Network Interface. -func (pc *podController) configureSriovAsSecondaryInterface(pod *corev1.Pod, network *netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo, mtu int, result *current.Result) error { +func (pc *PodController) configureSriovAsSecondaryInterface(pod *corev1.Pod, network *netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo, mtu int, result *current.Result) error { podSriovVFDeviceID, err := pc.assignUnusedSriovVFDeviceID(pod.Name, pod.Namespace, network.InterfaceRequest) if err != nil { return err @@ -181,7 +181,7 @@ func (pc *podController) configureSriovAsSecondaryInterface(pod *corev1.Pod, net return nil } -func (pc *podController) deleteSriovSecondaryInterface(interfaceConfig *interfacestore.InterfaceConfig) error { +func (pc *PodController) deleteSriovSecondaryInterface(interfaceConfig *interfacestore.InterfaceConfig) error { // NOTE: SR-IOV VF interface clean-up will be handled by SR-IOV device plugin. The interface // is not deleted here. if err := pc.interfaceConfigurator.DeleteSriovSecondaryInterface(interfaceConfig); err != nil {