Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete secondaryNetwork OVS ports correctly after an Agent restart #6853

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,6 +646,19 @@ func run(o *Options) error {
}
}

// Secondary network Controller should be created before CNIServer.Run() to make sure no Pod CNI updates will be missed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Controller -> controller

var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(),
podUpdateChannel, ifaceStore,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

var traceflowController *traceflow.Controller
if features.DefaultFeatureGate.Enabled(features.Traceflow) {
traceflowController = traceflow.NewTraceflowController(
Expand Down Expand Up @@ -760,18 +773,6 @@ func run(o *Options) error {
go ipamController.Run(stopCh)
}

var secondaryNetworkController *secondarynetwork.Controller
if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
secondaryNetworkController, err = secondarynetwork.NewController(
o.config.ClientConnection, o.config.KubeAPIServerOverride,
k8sClient, localPodInformer.Get(),
podUpdateChannel,
&o.config.SecondaryNetwork, ovsdbConnection)
if err != nil {
return fmt.Errorf("failed to create secondary network controller: %w", err)
}
}

var bgpController *bgp.Controller
if features.DefaultFeatureGate.Enabled(features.BGPPolicy) {
bgpPolicyInformer := crdInformerFactory.Crd().V1alpha1().BGPPolicies()
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/secondarynetwork/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
componentbaseconfig "k8s.io/component-base/config"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
agentconfig "antrea.io/antrea/pkg/config/agent"
"antrea.io/antrea/pkg/ovs/ovsconfig"
Expand All @@ -47,6 +48,7 @@ func NewController(
k8sClient clientset.Interface,
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
pIfaceStore interfacestore.InterfaceStore,
secNetConfig *agentconfig.SecondaryNetworkConfig, ovsdb *ovsdb.OVSDB,
) (*Controller, error) {
ovsBridgeClient, err := createOVSBridge(secNetConfig.OVSBridges, ovsdb)
Expand All @@ -65,7 +67,7 @@ func NewController(
// k8s.v1.cni.cncf.io/networks Annotation defined.
podWatchController, err := podwatch.NewPodController(
k8sClient, netAttachDefClient, podInformer,
podUpdateSubscriber, ovsBridgeClient)
podUpdateSubscriber, pIfaceStore, ovsBridgeClient)
if err != nil {
return nil, err
}
Expand Down
92 changes: 92 additions & 0 deletions pkg/agent/secondarynetwork/podwatch/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewPodController(
netAttachDefClient netdefclient.K8sCniCncfIoV1Interface,
podInformer cache.SharedIndexInformer,
podUpdateSubscriber channel.Subscriber,
pIfaceStore interfacestore.InterfaceStore,
ovsBridgeClient ovsconfig.OVSBridgeClient,
) (*PodController, error) {
ifaceStore := interfacestore.NewInterfaceStore()
Expand Down Expand Up @@ -133,6 +134,18 @@ func NewPodController(
},
resyncPeriod,
)

err = pc.initializeSecondaryInterfaceStore()
if err != nil {
klog.ErrorS(err, "Failed to initialize the secondary interface store")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically we do not log if the returned error will be logged in the caller; instead we can append extra context to the returned error like: return fmt.Errorf("failed to initialize secondary interface store: %v", err)

return &pc, err
}

if err := pc.reconcileSecondaryInterfaces(pIfaceStore); err != nil {
klog.ErrorS(err, "Failed to restore CNI cache and reconcile secondary interfaces")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

return &pc, err
}

// podUpdateSubscriber can be nil with test code.
if podUpdateSubscriber != nil {
// Subscribe Pod CNI add/del events.
Expand Down Expand Up @@ -502,3 +515,82 @@ func checkForPodSecondaryNetworkAttachement(pod *corev1.Pod) (string, bool) {
return netObj, false
}
}

// initializeSecondaryInterfaceStore restore secondary interfacestore when agent restart.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restore -> restores

interfacestore -> "interface store" or "InterfaceStore"

restart -> restarts

func (pc *PodController) initializeSecondaryInterfaceStore() error {
if pc.ovsBridgeClient == nil {
return nil
}

ovsPorts, err := pc.ovsBridgeClient.GetPortList()
if err != nil {
klog.ErrorS(err, "Failed to list OVS ports for the secondary bridge", "bridgeName", pc.ovsBridgeClient.GetBridgeName())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, do we need to log here?

return err
}

ifaceList := make([]*interfacestore.InterfaceConfig, 0, len(ovsPorts))
for index := range ovsPorts {
port := &ovsPorts[index]
ovsPort := &interfacestore.OVSPortConfig{
PortUUID: port.UUID,
OFPort: port.OFPort,
}

interfaceType, ok := port.ExternalIDs[interfacestore.AntreaInterfaceTypeKey]
if !ok {
klog.InfoS("Interface type is not set for the secondary bridge", "interfaceName", port.Name)
continue
}

var intf *interfacestore.InterfaceConfig
switch interfaceType {
case interfacestore.AntreaContainer:
intf = cniserver.ParseOVSPortInterfaceConfig(port, ovsPort)
default:
klog.InfoS("Unknown Antrea interface type for the secondary bridge", "type", interfaceType)
KMAnju-2021 marked this conversation as resolved.
Show resolved Hide resolved
}

if intf != nil {
ifaceList = append(ifaceList, intf)
}

}

pc.interfaceStore.Initialize(ifaceList)
KMAnju-2021 marked this conversation as resolved.
Show resolved Hide resolved
klog.InfoS("Successfully initialized the secondary bridge interface store")

return nil
}

// reconcileSecondaryInterfaces restore cniCache when agent restart using primary interfacestore.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

restores

restarts

InterfaceStore

func (pc *PodController) reconcileSecondaryInterfaces(pIfaceStore interfacestore.InterfaceStore) error {
knownInterfaces := pIfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
for _, containerConfig := range knownInterfaces {
config := containerConfig.ContainerInterfaceConfig
podKey := podKeyGet(config.PodName, config.PodNamespace)

pc.cniCache.Store(podKey, &podCNIInfo{
containerID: config.ContainerID,
})
pc.queue.Add(podKey)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why we queue the task? I feel it is not needed.

}

var staleInterfaces []*interfacestore.InterfaceConfig
// secondaryInterfaces is the list of interfaces currently in the secondary local cache and delete ports not in the CNI cache.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel this sentence needs to be revised.

Probably make it:
// secondaryInterfaces is the list of interfaces currently in the secondary local cache.

And move // Deletes ports not in the CNI cache. to be before line 584?

secondaryInterfaces := pc.interfaceStore.GetInterfacesByType(interfacestore.ContainerInterface)
for _, containerConfig := range secondaryInterfaces {
_, exists := pIfaceStore.GetContainerInterface(containerConfig.ContainerID)
if !exists || containerConfig.OFPort == -1 {
staleInterfaces = append(staleInterfaces, containerConfig)
}
}

// If there are any stale interfaces, pass them to removeInterfaces()
if len(staleInterfaces) > 0 {
if err := pc.removeInterfaces(staleInterfaces); err != nil {
return fmt.Errorf("failed to remove stale interfaces: %w", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel we should just log the error, but no need to return it and fail the agent start.

Check https://github.com/antrea-io/antrea/blob/main/pkg/agent/cniserver/pod_configuration.go#L472

}
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/agent/secondarynetwork/podwatch/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestPodControllerRun(t *testing.T) {
client,
netdefclient,
informerFactory.Core().V1().Pods().Informer(),
nil, nil)
nil, nil, nil)
podController.interfaceConfigurator = interfaceConfigurator
podController.ipamAllocator = mockIPAM
cniCache := &podController.cniCache
Expand Down
Loading