Skip to content

Commit

Permalink
[SecondaryNetwork] Attach OVS uplink for after FlowRestoreComplete
Browse files Browse the repository at this point in the history
Physcial network interface is attached on the OVS bridge with SecondaryNetwork
feature enabled. Antrea uses a global configuration flow-restore-wait='true'
to ensure that OVS OpenFlow entries can start working after the dependencies are
ready. A connectivity issue exists if a setup uses the Node NIC as secondary
network interface and connects the NIC to OVS bridge before removing the
flow-restore-wait option.

This change ensures agent attaches the physical network interface to the secondary
OVS bridge after the global flow-restore-wait option is removed.

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Jul 5, 2024
1 parent 75699cf commit 331a31c
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 70 deletions.
21 changes: 15 additions & 6 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,14 +731,15 @@ func run(o *Options) error {
go ipamController.Run(stopCh)
}

var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
defer secondarynetwork.RestoreHostInterfaceConfiguration(&o.config.SecondaryNetwork)
if err := secondarynetwork.Initialize(
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(), nodeConfig.Name,
podUpdateChannel, stopCh,
&o.config.SecondaryNetwork, ovsdbConnection); err != nil {
return fmt.Errorf("failed to initialize secondary network: %v", err)
k8sClient, localPodInformer.Get(),
podUpdateChannel,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

Expand Down Expand Up @@ -864,6 +865,14 @@ func run(o *Options) error {
return fmt.Errorf("failed to connect uplink to OVS bridge: %w", err)
}
}
// secondaryNetworkController Initialize must be run after FlowRestoreComplete.
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
defer secondaryNetworkController.RestoreHostInterfaceConfiguration()
if err = secondaryNetworkController.Initialize(); err != nil {
return fmt.Errorf("failed to initialize secondary network: %v", err)
}
go secondaryNetworkController.PodController.Run(stopCh)
}

// statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for
// NetworkPolicy stats and Multicast stats.
Expand Down
73 changes: 41 additions & 32 deletions pkg/agent/secondarynetwork/init_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,30 +43,56 @@ var (
newOVSBridgeFn = ovsconfig.NewOVSBridge
)

// Initialize sets up OVS bridges and starts the Pod controller for secondary networks.
func Initialize(
type Controller struct {
ovsBridgeClient ovsconfig.OVSBridgeClient
secNetConfig *agentconfig.SecondaryNetworkConfig
PodController *podwatch.PodController
}

func NewController(
clientConnectionConfig componentbaseconfig.ClientConnectionConfiguration,
kubeAPIServerOverride string,
k8sClient clientset.Interface,
podInformer cache.SharedIndexInformer,
nodeName string,
podUpdateSubscriber channel.Subscriber,
stopCh <-chan struct{},
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error {

secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
) (*Controller, error) {
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
if err != nil {
return err
return nil, err
}

// Create the NetworkAttachmentDefinition client, which handles access to secondary network object
// definition from the API Server.
netAttachDefClient, err := createNetworkAttachDefClient(clientConnectionConfig, kubeAPIServerOverride)
if err != nil {
return nil, fmt.Errorf("NetworkAttachmentDefinition client creation failed: %v", err)
}

// Create podController to handle secondary network configuration for Pods with
// k8s.v1.cni.cncf.io/networks Annotation defined.
podWatchController, err := podwatch.NewPodController(
k8sClient, netAttachDefClient, podInformer,
podUpdateSubscriber, ovsBridgeClient)
if err != nil {
return nil, err
}
return &Controller{
ovsBridgeClient: ovsBridgeClient,
secNetConfig: secNetConfig,
PodController: podWatchController}, nil
}

// Initialize sets up OVS bridges.
func (c *Controller) Initialize() error {
// We only support moving and restoring of interface configuration to OVS Bridge for the single physical interface case.
if len(secNetConfig.OVSBridges) != 0 {
phyInterfaces := make([]string, len(secNetConfig.OVSBridges[0].PhysicalInterfaces))
copy(phyInterfaces, secNetConfig.OVSBridges[0].PhysicalInterfaces)
if len(c.secNetConfig.OVSBridges) != 0 {
phyInterfaces := make([]string, len(c.secNetConfig.OVSBridges[0].PhysicalInterfaces))
copy(phyInterfaces, c.secNetConfig.OVSBridges[0].PhysicalInterfaces)
if len(phyInterfaces) == 1 {

bridgedName, _, err := util.PrepareHostInterfaceConnection(
ovsBridgeClient,
c.ovsBridgeClient,
phyInterfaces[0],
0,
map[string]interface{}{
Expand All @@ -78,34 +104,17 @@ func Initialize(
}
phyInterfaces[0] = bridgedName
}
if err = connectPhyInterfacesToOVSBridge(ovsBridgeClient, phyInterfaces); err != nil {
if err := connectPhyInterfacesToOVSBridge(c.ovsBridgeClient, phyInterfaces); err != nil {
return err
}
}

// Create the NetworkAttachmentDefinition client, which handles access to secondary network object
// definition from the API Server.
netAttachDefClient, err := createNetworkAttachDefClient(clientConnectionConfig, kubeAPIServerOverride)
if err != nil {
return fmt.Errorf("NetworkAttachmentDefinition client creation failed: %v", err)
}

// Create podController to handle secondary network configuration for Pods with
// k8s.v1.cni.cncf.io/networks Annotation defined.
if podWatchController, err := podwatch.NewPodController(
k8sClient, netAttachDefClient, podInformer,
podUpdateSubscriber, ovsBridgeClient); err != nil {
return err
} else {
go podWatchController.Run(stopCh)
}
return nil
}

// RestoreHostInterfaceConfiguration restores interface configuration from secondary-bridge back to host-interface.
func RestoreHostInterfaceConfiguration(secNetConfig *agentconfig.SecondaryNetworkConfig) {
if len(secNetConfig.OVSBridges) != 0 && len(secNetConfig.OVSBridges[0].PhysicalInterfaces) == 1 {
util.RestoreHostInterfaceConfiguration(secNetConfig.OVSBridges[0].BridgeName, secNetConfig.OVSBridges[0].PhysicalInterfaces[0])
func (c *Controller) RestoreHostInterfaceConfiguration() {
if len(c.secNetConfig.OVSBridges) != 0 && len(c.secNetConfig.OVSBridges[0].PhysicalInterfaces) == 1 {
util.RestoreHostInterfaceConfiguration(c.secNetConfig.OVSBridges[0].BridgeName, c.secNetConfig.OVSBridges[0].PhysicalInterfaces[0])
}
}

Expand Down
20 changes: 14 additions & 6 deletions pkg/agent/secondarynetwork/init_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ import (
"k8s.io/client-go/tools/cache"
componentbaseconfig "k8s.io/component-base/config"

"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
agentconfig "antrea.io/antrea/pkg/config/agent"
"antrea.io/antrea/pkg/util/channel"
)

func Initialize(
type Controller struct {
PodController *podwatch.PodController
}

func NewController(
clientConnectionConfig componentbaseconfig.ClientConnectionConfiguration,
kubeAPIServerOverride string,
k8sClient clientset.Interface,
podInformer cache.SharedIndexInformer,
nodeName string,
podUpdateSubscriber channel.Subscriber,
stopCh <-chan struct{},
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB) error {
return errors.New("not supported on Windows")
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
) (*Controller, error) {
return nil, errors.New("not supported on Windows")
}

func (c *Controller) Initialize() error {
return nil
}

func RestoreHostInterfaceConfiguration(secNetConfig *agentconfig.SecondaryNetworkConfig) {
func (c *Controller) RestoreHostInterfaceConfiguration() {
// Not supported on Windows.
}
26 changes: 13 additions & 13 deletions pkg/agent/secondarynetwork/podwatch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type podCNIInfo struct {
netNS string
}

type podController struct {
type PodController struct {
kubeClient clientset.Interface
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface
queue workqueue.RateLimitingInterface
Expand All @@ -103,13 +103,13 @@ func NewPodController(
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
ovsBridgeClient ovsconfig.OVSBridgeClient,
) (*podController, error) {
) (*PodController, error) {
ifaceStore := interfacestore.NewInterfaceStore()
interfaceConfigurator, err := cniserver.NewSecondaryInterfaceConfigurator(ovsBridgeClient, ifaceStore)
if err != nil {
return nil, fmt.Errorf("failed to create SecondaryInterfaceConfigurator: %v", err)
}
pc := podController{
pc := PodController{
kubeClient: kubeClient,
netAttachDefClient: netAttachDefClient,
queue: workqueue.NewNamedRateLimitingQueue(
Expand Down Expand Up @@ -153,7 +153,7 @@ func allocatePodSecondaryIfaceName(usedIFNames sets.Set[string]) (string, error)
return "", fmt.Errorf("no more interface names")
}

func (pc *podController) enqueuePod(obj interface{}) {
func (pc *PodController) enqueuePod(obj interface{}) {
pod, isPod := obj.(*corev1.Pod)
if !isPod {
podDeletedState, ok := obj.(cache.DeletedFinalStateUnknown)
Expand All @@ -172,7 +172,7 @@ func (pc *podController) enqueuePod(obj interface{}) {
}

// processCNIUpdate will be called when CNIServer publishes a Pod update event.
func (pc *podController) processCNIUpdate(e interface{}) {
func (pc *PodController) processCNIUpdate(e interface{}) {
event := e.(types.PodUpdate)
podKey := podKeyGet(event.PodName, event.PodNamespace)
if event.IsAdd {
Expand All @@ -184,7 +184,7 @@ func (pc *podController) processCNIUpdate(e interface{}) {
}

// handleAddUpdatePod handles Pod Add, Update events and updates annotation if required.
func (pc *podController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNIInfo,
func (pc *PodController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNIInfo,
storedInterfaces []*interfacestore.InterfaceConfig) error {
if len(storedInterfaces) > 0 {
// We do not support secondary network update at the moment. Return as long as one
Expand Down Expand Up @@ -219,7 +219,7 @@ func (pc *podController) handleAddUpdatePod(pod *corev1.Pod, podCNIInfo *podCNII
return pc.configurePodSecondaryNetwork(pod, networklist, podCNIInfo)
}

func (pc *podController) removeInterfaces(interfaces []*interfacestore.InterfaceConfig) error {
func (pc *PodController) removeInterfaces(interfaces []*interfacestore.InterfaceConfig) error {
var savedErr error
for _, interfaceConfig := range interfaces {
podName := interfaceConfig.PodName
Expand Down Expand Up @@ -256,7 +256,7 @@ func (pc *podController) removeInterfaces(interfaces []*interfacestore.Interface
return savedErr
}

func (pc *podController) syncPod(key string) error {
func (pc *PodController) syncPod(key string) error {
var pod *corev1.Pod
var cniInfo *podCNIInfo
podExists := false
Expand Down Expand Up @@ -297,12 +297,12 @@ func (pc *podController) syncPod(key string) error {
return pc.handleAddUpdatePod(pod, cniInfo, storedInterfaces)
}

func (pc *podController) Worker() {
func (pc *PodController) Worker() {
for pc.processNextWorkItem() {
}
}

func (pc *podController) processNextWorkItem() bool {
func (pc *PodController) processNextWorkItem() bool {
obj, quit := pc.queue.Get()
if quit {
return false
Expand All @@ -319,7 +319,7 @@ func (pc *podController) processNextWorkItem() bool {
}

// Configure Secondary Network Interface.
func (pc *podController) configureSecondaryInterface(
func (pc *PodController) configureSecondaryInterface(
pod *corev1.Pod,
network *netdefv1.NetworkSelectionElement,
podCNIInfo *podCNIInfo,
Expand Down Expand Up @@ -370,7 +370,7 @@ func (pc *podController) configureSecondaryInterface(
return ifConfigErr
}

func (pc *podController) configurePodSecondaryNetwork(pod *corev1.Pod, networkList []*netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo) error {
func (pc *PodController) configurePodSecondaryNetwork(pod *corev1.Pod, networkList []*netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo) error {
usedIFNames := sets.New[string]()
for _, network := range networkList {
if network.InterfaceRequest != "" {
Expand Down Expand Up @@ -477,7 +477,7 @@ func validateNetworkConfig(cniConfig []byte) (*SecondaryNetworkConfig, error) {
return &networkConfig, nil
}

func (pc *podController) Run(stopCh <-chan struct{}) {
func (pc *PodController) Run(stopCh <-chan struct{}) {
defer func() {
klog.InfoS("Shutting down", "controller", controllerName)
pc.queue.ShutDown()
Expand Down
14 changes: 7 additions & 7 deletions pkg/agent/secondarynetwork/podwatch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ func TestPodControllerAddPod(t *testing.T) {
podKey := podKeyGet(podName, testNamespace)

// Create Pod and wait for Informer cache updated.
createPodFn := func(pc *podController, pod *corev1.Pod) {
createPodFn := func(pc *PodController, pod *corev1.Pod) {
_, err := pc.kubeClient.CoreV1().Pods(testNamespace).Create(context.Background(),
pod, metav1.CreateOptions{})
require.NoError(t, err, "error when creating test Pod")
Expand All @@ -596,7 +596,7 @@ func TestPodControllerAddPod(t *testing.T) {
return ok == true && err == nil
}, 1*time.Second, 10*time.Millisecond)
}
deletePodFn := func(pc *podController, podName string) {
deletePodFn := func(pc *PodController, podName string) {
require.NoError(t, pc.kubeClient.CoreV1().Pods(testNamespace).Delete(context.Background(),
podName, metav1.DeleteOptions{}), "error when deleting test Pod")
assert.Eventually(t, func() bool {
Expand Down Expand Up @@ -899,16 +899,16 @@ func TestPodControllerAddPod(t *testing.T) {
}

func testPodController(ctrl *gomock.Controller) (
*podController, *podwatchtesting.MockIPAMAllocator,
*PodController, *podwatchtesting.MockIPAMAllocator,
*podwatchtesting.MockInterfaceConfigurator) {
client := fake.NewSimpleClientset()
netdefclient := netdefclientfake.NewSimpleClientset().K8sCniCncfIoV1()
informerFactory := informers.NewSharedInformerFactory(client, resyncPeriod)
interfaceConfigurator := podwatchtesting.NewMockInterfaceConfigurator(ctrl)
mockIPAM := podwatchtesting.NewMockIPAMAllocator(ctrl)

// podController without event handlers.
return &podController{
// PodController without event handlers.
return &PodController{
kubeClient: client,
netAttachDefClient: netdefclient,
queue: workqueue.NewNamedRateLimitingQueue(
Expand All @@ -921,9 +921,9 @@ func testPodController(ctrl *gomock.Controller) (
}, mockIPAM, interfaceConfigurator
}

// Create a test podController and start informerFactory.
// Create a test PodController and start informerFactory.
func testPodControllerStart(ctrl *gomock.Controller) (
*podController, *podwatchtesting.MockIPAMAllocator,
*PodController, *podwatchtesting.MockIPAMAllocator,
*podwatchtesting.MockInterfaceConfigurator) {
podController, mockIPAM, interfaceConfigurator := testPodController(ctrl)
informerFactory := informers.NewSharedInformerFactory(podController.kubeClient, resyncPeriod)
Expand Down
12 changes: 6 additions & 6 deletions pkg/agent/secondarynetwork/podwatch/sriov.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func getPodContainerDeviceIDs(podName string, podNamespace string) ([]string, er
// which is still not associated with a network device name.
// NOTE: buildVFDeviceIDListPerPod is called only if a Pod specific VF to Interface mapping cache
// was not build earlier. Sample initial entry per Pod: "{18:01.1,""},{18:01.2,""},{18:01.3,""}"
func (pc *podController) buildVFDeviceIDListPerPod(podName, podNamespace string) ([]podSriovVFDeviceIDInfo, error) {
func (pc *PodController) buildVFDeviceIDListPerPod(podName, podNamespace string) ([]podSriovVFDeviceIDInfo, error) {
podKey := podKeyGet(podName, podNamespace)
deviceCache, cacheFound := pc.vfDeviceIDUsageMap.Load(podKey)
if cacheFound {
Expand All @@ -127,7 +127,7 @@ func (pc *podController) buildVFDeviceIDListPerPod(podName, podNamespace string)
return vfDeviceIDInfoCache, nil
}

func (pc *podController) deleteVFDeviceIDListPerPod(podName, podNamespace string) {
func (pc *PodController) deleteVFDeviceIDListPerPod(podName, podNamespace string) {
podKey := podKeyGet(podName, podNamespace)
_, cacheFound := pc.vfDeviceIDUsageMap.Load(podKey)
if cacheFound {
Expand All @@ -137,7 +137,7 @@ func (pc *podController) deleteVFDeviceIDListPerPod(podName, podNamespace string
return
}

func (pc *podController) releaseSriovVFDeviceID(podName, podNamespace, interfaceName string) {
func (pc *PodController) releaseSriovVFDeviceID(podName, podNamespace, interfaceName string) {
podKey := podKeyGet(podName, podNamespace)
obj, cacheFound := pc.vfDeviceIDUsageMap.Load(podKey)
if !cacheFound {
Expand All @@ -151,7 +151,7 @@ func (pc *podController) releaseSriovVFDeviceID(podName, podNamespace, interface
}
}

func (pc *podController) assignUnusedSriovVFDeviceID(podName, podNamespace, interfaceName string) (string, error) {
func (pc *PodController) assignUnusedSriovVFDeviceID(podName, podNamespace, interfaceName string) (string, error) {
var cache []podSriovVFDeviceIDInfo
cache, err := pc.buildVFDeviceIDListPerPod(podName, podNamespace)
if err != nil {
Expand All @@ -168,7 +168,7 @@ func (pc *podController) assignUnusedSriovVFDeviceID(podName, podNamespace, inte
}

// Configure SRIOV VF as a Secondary Network Interface.
func (pc *podController) configureSriovAsSecondaryInterface(pod *corev1.Pod, network *netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo, mtu int, result *current.Result) error {
func (pc *PodController) configureSriovAsSecondaryInterface(pod *corev1.Pod, network *netdefv1.NetworkSelectionElement, podCNIInfo *podCNIInfo, mtu int, result *current.Result) error {
podSriovVFDeviceID, err := pc.assignUnusedSriovVFDeviceID(pod.Name, pod.Namespace, network.InterfaceRequest)
if err != nil {
return err
Expand All @@ -181,7 +181,7 @@ func (pc *podController) configureSriovAsSecondaryInterface(pod *corev1.Pod, net
return nil
}

func (pc *podController) deleteSriovSecondaryInterface(interfaceConfig *interfacestore.InterfaceConfig) error {
func (pc *PodController) deleteSriovSecondaryInterface(interfaceConfig *interfacestore.InterfaceConfig) error {
// NOTE: SR-IOV VF interface clean-up will be handled by SR-IOV device plugin. The interface
// is not deleted here.
if err := pc.interfaceConfigurator.DeleteSriovSecondaryInterface(interfaceConfig); err != nil {
Expand Down

0 comments on commit 331a31c

Please sign in to comment.