Skip to content

Commit

Permalink
[Windows] CNI Server installs OpenFlow entries after PortStatus messa…
Browse files Browse the repository at this point in the history
…ge is received

Signed-off-by: Wenying Dong <[email protected]>
  • Loading branch information
wenyingd committed Oct 24, 2024
1 parent c722fc3 commit 9ee313d
Show file tree
Hide file tree
Showing 21 changed files with 886 additions and 126 deletions.
9 changes: 4 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ module antrea.io/antrea
go 1.23.0

require (
antrea.io/libOpenflow v0.14.0
antrea.io/ofnet v0.12.0
antrea.io/libOpenflow v0.15.0
antrea.io/ofnet v0.14.0
github.com/ClickHouse/clickhouse-go/v2 v2.6.1
github.com/DATA-DOG/go-sqlmock v1.5.2
github.com/Mellanox/sriovnet v1.1.0
Expand Down Expand Up @@ -113,10 +113,9 @@ require (
github.com/aws/smithy-go v1.12.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenk/hub v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cenkalti/hub v1.0.1 // indirect
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa // indirect
github.com/cenkalti/hub v1.0.2 // indirect
github.com/cenkalti/rpc2 v1.0.3 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/chai2010/gettext-go v1.0.2 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
Expand Down
18 changes: 8 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
antrea.io/libOpenflow v0.14.0 h1:6MS1E52nGQyz/AJ8j3OrotgAc/1ubef5vc5i8ytIxFE=
antrea.io/libOpenflow v0.14.0/go.mod h1:U8YR0ithWrjwLdUUhyeCsYnlKg/mEFjG5CbPNt1V+j0=
antrea.io/ofnet v0.12.0 h1:pgygAsEZJUPK/kGmeuIesDh5hoGLpYeavSLdG/Dp8Ao=
antrea.io/ofnet v0.12.0/go.mod h1:MB3qaInEimj+T8qtpBcIQK+EqeNiY1S/WbUdGk+TzNg=
antrea.io/libOpenflow v0.15.0 h1:wGk+IVCf8piGZgC4+lbf4qfGrJG5ikzfq5Y1T5LzqmI=
antrea.io/libOpenflow v0.15.0/go.mod h1:Mq1JEjYrb6eTVA7qjZRj9plVTKcsLM8wnQ87sLLYuiQ=
antrea.io/ofnet v0.14.0 h1:BGOqg5DdRkvxpBqyoEgWmvGd4EvpacdU/Py1s6qOvSc=
antrea.io/ofnet v0.14.0/go.mod h1:W5JPYFFcRM7tLwsItgmsKqIhtW/QofyIeNsUIecFaBo=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
Expand Down Expand Up @@ -114,14 +114,12 @@ github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdn
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/blang/semver/v4 v4.0.0 h1:1PFHFE6yCCTv8C1TeyNNarDzntLi7wMI5i/pzqYIsAM=
github.com/blang/semver/v4 v4.0.0/go.mod h1:IbckMUScFkM3pff0VJDNKRiT6TG/YpiHIM2yvyW5YoQ=
github.com/cenk/hub v1.0.1 h1:RBwXNOF4a8KjD8BJ08XqN8KbrqaGiQLDrgvUGJSHuPA=
github.com/cenk/hub v1.0.1/go.mod h1:rJM1LNAW0ppT8FMMuPK6c2NP/R2nH/UthtuRySSaf6Y=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/cenkalti/hub v1.0.1 h1:UMtjc6dHSaOQTO15SVA50MBIR9zQwvsukQupDrkIRtg=
github.com/cenkalti/hub v1.0.1/go.mod h1:tcYwtS3a2d9NO/0xDXVJWx3IedurUjYCqFCmpi0lpHs=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa h1:t+iWhuJE2aropY4uxKMVbyP+IJ29o422f7YAd73aTjg=
github.com/cenkalti/rpc2 v0.0.0-20180727162946-9642ea02d0aa/go.mod h1:v2npkhrXyk5BCnkNIiPdRI23Uq6uWPUQGL2hnRcRr/M=
github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8=
github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU=
github.com/cenkalti/rpc2 v1.0.3 h1:OkMsNP/sP9seN1VRCLqhX1xkVGHPoLwWS6fZR14Ji/k=
github.com/cenkalti/rpc2 v1.0.3/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
Expand Down
204 changes: 197 additions & 7 deletions pkg/agent/cniserver/pod_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,24 @@
package cniserver

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net"
"strings"
"sync"
"time"

"antrea.io/libOpenflow/openflow15"
current "github.com/containernetworking/cni/pkg/types/100"
"github.com/containernetworking/cni/pkg/version"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
Expand Down Expand Up @@ -59,8 +67,15 @@ const (
defaultIFDevName = "eth0"
)

const (
podNotReadyTimeInSeconds = 30
)

var (
getNSPath = util.GetNSPath

// retryInterval is the interval to re-install Pod OpenFlow entries if any error happened.
retryInterval = time.Second * 5
)

type podConfigurator struct {
Expand All @@ -76,9 +91,11 @@ type podConfigurator struct {
// isSecondaryNetwork is true if this instance of podConfigurator is used to configure
// Pod secondary network interfaces.
isSecondaryNetwork bool
podIfMonitor *podIfaceMonitor
}

func newPodConfigurator(
kubeClient clientset.Interface,
ovsBridgeClient ovsconfig.OVSBridgeClient,
ofClient openflow.Client,
routeClient route.Interface,
Expand All @@ -93,6 +110,7 @@ func newPodConfigurator(
if err != nil {
return nil, err
}
ifMonitor := newPodInterfaceMonitor(kubeClient, ofClient, ifaceStore, podUpdateNotifier)
return &podConfigurator{
ovsBridgeClient: ovsBridgeClient,
ofClient: ofClient,
Expand All @@ -101,6 +119,7 @@ func newPodConfigurator(
gatewayMAC: gatewayMAC,
ifConfigurator: ifConfigurator,
podUpdateNotifier: podUpdateNotifier,
podIfMonitor: ifMonitor,
}, nil
}

Expand Down Expand Up @@ -166,13 +185,13 @@ func getContainerIPsString(ips []net.IP) string {
// not created for a Pod interface.
func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *interfacestore.OVSPortConfig) *interfacestore.InterfaceConfig {
if portData.ExternalIDs == nil {
klog.V(2).Infof("OVS port %s has no external_ids", portData.Name)
klog.V(2).InfoS("OVS port has no external_ids", "port", portData.Name)
return nil
}

containerID, found := portData.ExternalIDs[ovsExternalIDContainerID]
if !found {
klog.V(2).Infof("OVS port %s has no %s in external_ids", portData.Name, ovsExternalIDContainerID)
klog.V(2).InfoS("OVS port has no containerID in external_ids", "port", portData.Name)
return nil
}

Expand All @@ -187,8 +206,7 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in

containerMAC, err := net.ParseMAC(portData.ExternalIDs[ovsExternalIDMAC])
if err != nil {
klog.Errorf("Failed to parse MAC address from OVS external config %s: %v",
portData.ExternalIDs[ovsExternalIDMAC], err)
klog.ErrorS(err, "Failed to parse MAC address from OVS external config")
}
podName, _ := portData.ExternalIDs[ovsExternalIDPodName]
podNamespace, _ := portData.ExternalIDs[ovsExternalIDPodNamespace]
Expand Down Expand Up @@ -279,7 +297,7 @@ func (pc *podConfigurator) createOVSPort(ovsPortName string, ovsAttachInfo map[s
func (pc *podConfigurator) removeInterfaces(containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}

Expand Down Expand Up @@ -498,7 +516,7 @@ func (pc *podConfigurator) reconcile(pods []corev1.Pod, containerAccess *contain
// disconnectInterfaceFromOVS disconnects an existing interface from ovs br-int.
func (pc *podConfigurator) disconnectInterfaceFromOVS(containerConfig *interfacestore.InterfaceConfig) error {
containerID := containerConfig.ContainerID
klog.V(2).Infof("Deleting Openflow entries for container %s", containerID)
klog.V(2).InfoS("Deleting Openflow entries for container", "container", containerID)
if !pc.isSecondaryNetwork {
if err := pc.ofClient.UninstallPodFlows(containerConfig.InterfaceName); err != nil {
return fmt.Errorf("failed to delete Openflow entries for container %s: %v", containerID, err)
Expand Down Expand Up @@ -558,7 +576,7 @@ func (pc *podConfigurator) connectInterceptedInterface(
func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace, containerID string) error {
containerConfig, found := pc.ifaceStore.GetContainerInterface(containerID)
if !found {
klog.V(2).Infof("Did not find the port for container %s in local cache", containerID)
klog.V(2).InfoS("Did not find the port for container in local cache", "container", containerID)
return nil
}
for _, ip := range containerConfig.IPs {
Expand All @@ -570,3 +588,175 @@ func (pc *podConfigurator) disconnectInterceptedInterface(podName, podNamespace,
return pc.disconnectInterfaceFromOVS(containerConfig)
// TODO recover pre-connect state? repatch vethpair to original bridge etc ?? to make first CNI happy??
}

// unReadyPodInfo records the state of a Pod interface tracked in
// podIfaceMonitor.unReadyInterfaces.
type unReadyPodInfo struct {
// podName is the name of the Pod that owns the interface.
podName string
// podNamespace is the Namespace of the Pod that owns the interface.
podNamespace string
// flowInstalled is set to true once the Pod's OpenFlow entries have been installed.
flowInstalled bool
// createTime is the time at which the entry was added; it is compared against
// podNotReadyTimeInSeconds to decide whether the Pod gets the not-ready annotation.
createTime time.Time
}

// podIfaceMonitor consumes PortStatus messages from statusCh and installs the Pod OpenFlow
// entries for the OVS ports recorded in unReadyInterfaces.
type podIfaceMonitor struct {
// kubeClient is used to get Pods and to patch their annotations.
kubeClient clientset.Interface
// ifaceStore is the local cache of interface configurations, looked up by interface name.
ifaceStore interfacestore.InterfaceStore
// ofClient installs the Pod OpenFlow entries.
ofClient openflow.Client
// podUpdateNotifier receives a PodUpdate event after the Pod OpenFlow entries are installed.
podUpdateNotifier channel.Notifier

// unReadyInterfaces is a map to store the OVS ports that are waiting for the PortStatus from the OpenFlow switch.
// The key in the map is the OVS port name, and its value is unReadyPodInfo.
// It is used only on Windows now.
// NOTE(review): no lock guarding this map is visible here; confirm that all readers and
// writers run on the same goroutine or are otherwise synchronized.
unReadyInterfaces map[string]*unReadyPodInfo
// statusCh delivers the PortStatus messages handled by Run; messages whose handling failed
// are re-queued into it by reQueuePortStatus.
statusCh chan *openflow15.PortStatus
}

// Run launches the monitor goroutine and returns immediately. The goroutine
// handles PortStatus messages read from statusCh, re-checks the unready Pods
// every 5 seconds, and exits once stopCh is closed.
func (m *podIfaceMonitor) Run(stopCh <-chan struct{}) {
	klog.Info("Starting the monitor to wait for new OpenFlow ports")
	go func() {
		checkTicker := time.NewTicker(5 * time.Second)
		defer checkTicker.Stop()
		for {
			select {
			case <-stopCh:
				return
			case portStatus := <-m.statusCh:
				klog.V(2).InfoS("Received PortStatus message", "message", portStatus)
				// Pod OpenFlow entries are only updated once the OpenFlow port state is live.
				if portStatus.Desc.State != openflow15.PS_LIVE {
					continue
				}
				m.updateUnReadyPod(portStatus)
			case <-checkTicker.C:
				m.checkUnReadyPods()
			}
		}
	}()
}

// updatePodFlows records ofPort on the cached interface config named ifName,
// installs the Pod OpenFlow entries for it, and then emits a PodUpdate "add"
// event. It returns an error if the interface is not in the local cache or if
// installing the OpenFlow entries fails.
func (m *podIfaceMonitor) updatePodFlows(ifName string, ofPort int32) error {
	podIface, ok := m.ifaceStore.GetInterfaceByName(ifName)
	if !ok {
		return fmt.Errorf("interface config %s is not found in local cache", ifName)
	}

	// Persist the OpenFlow port number in the interface store before installing flows.
	podIface.OVSPortConfig.OFPort = ofPort
	m.ifaceStore.UpdateInterface(podIface)

	klog.V(2).InfoS("Setting up Openflow entries for OVS port", "port", ifName)
	err := m.ofClient.InstallPodFlows(ifName, podIface.IPs, podIface.MAC, uint32(ofPort), podIface.VLANID, nil)
	if err != nil {
		return fmt.Errorf("failed to add Openflow entries for OVS port %s: %v", ifName, err)
	}

	// Let the subscribed components know that the Pod has been added.
	m.podUpdateNotifier.Notify(agenttypes.PodUpdate{
		PodName:      podIface.PodName,
		PodNamespace: podIface.PodNamespace,
		IsAdd:        true,
		ContainerID:  podIface.ContainerID,
	})
	return nil
}

// updatePodUnreadyAnnotation adds (addAnnotation == true) or removes
// (addAnnotation == false) the agenttypes.PodNotReadyAnnotationKey annotation on
// the given Pod using a merge patch. It is a no-op when the Pod is already in
// the requested state. The supplied ctx is used for the API call in both cases.
// It returns the marshalling or patch error, if any.
func (m *podIfaceMonitor) updatePodUnreadyAnnotation(ctx context.Context, pod *corev1.Pod, addAnnotation bool) error {
	// Indexing a nil map is safe, so no explicit nil check on Annotations is needed.
	_, annotated := pod.Annotations[agenttypes.PodNotReadyAnnotationKey]
	if addAnnotation == annotated {
		return nil
	}

	// The annotation is added with an empty string value ('"pod.antrea.io/not-ready": ""');
	// a nil value is marshalled as JSON null, which removes the key in a merge patch.
	var value interface{}
	if addAnnotation {
		value = ""
	}
	patch, err := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": map[string]interface{}{agenttypes.PodNotReadyAnnotationKey: value},
		},
	})
	if err != nil {
		return fmt.Errorf("failed to marshal annotation patch for Pod %s/%s: %w", pod.Namespace, pod.Name, err)
	}

	_, err = m.kubeClient.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, apitypes.MergePatchType, patch, metav1.PatchOptions{})
	return err
}

// updateUnReadyPod handles a PortStatus message for an OVS port tracked in
// unReadyInterfaces: it installs the Pod OpenFlow entries using the port number
// carried in the message and marks the entry as flowInstalled. Messages for
// untracked ports are ignored. If installing the entries fails, the message is
// re-queued so that it is handled again after retryInterval.
func (m *podIfaceMonitor) updateUnReadyPod(status *openflow15.PortStatus) {
	// Strip the NUL bytes surrounding the port name carried in the message.
	ovsPort := string(bytes.Trim(status.Desc.Name, "\x00"))
	podInfo, found := m.unReadyInterfaces[ovsPort]
	if !found {
		klog.InfoS("OVS port is not found", "ovsPort", ovsPort)
		return
	}

	ofPort := status.Desc.PortNo
	if err := m.updatePodFlows(ovsPort, int32(ofPort)); err != nil {
		// Report the actual retry delay instead of a hard-coded value that can drift from retryInterval.
		klog.ErrorS(err, "Failed to update Pod's OpenFlow entries, requeue the PortStatus message", "PodName", podInfo.podName, "PodNamespace", podInfo.podNamespace, "OVSPort", ovsPort, "retryInterval", retryInterval)
		m.reQueuePortStatus(status)
		return
	}

	podInfo.flowInstalled = true
}

// checkUnReadyPods walks every entry in unReadyInterfaces and reconciles the
// not-ready annotation of the corresponding Pod:
//   - if the Pod no longer exists, the entry is dropped;
//   - if the Pod's OpenFlow entries are installed, the annotation is removed and
//     the entry is dropped;
//   - if the entry is older than podNotReadyTimeInSeconds, the annotation is added
//     and the entry is kept so that it is cleared once the flows are installed.
//
// A failure on one Pod is logged and does not prevent the remaining Pods from
// being processed; the failed Pod is retried on the next invocation.
func (m *podIfaceMonitor) checkUnReadyPods() {
	ctx := context.Background()
	// Deleting entries from a map while ranging over it is safe in Go.
	for key, podInfo := range m.unReadyInterfaces {
		pod, err := m.kubeClient.CoreV1().Pods(podInfo.podNamespace).Get(ctx, podInfo.podName, metav1.GetOptions{})
		if err != nil {
			if apierrors.IsNotFound(err) {
				// The Pod is gone, so there is nothing left to annotate or retry.
				klog.InfoS("Pod is not found, removing it from unready interfaces", "Namespace", podInfo.podNamespace, "Name", podInfo.podName)
				delete(m.unReadyInterfaces, key)
				continue
			}
			klog.ErrorS(err, "Failed to get Pod, retrying", "Namespace", podInfo.podNamespace, "Name", podInfo.podName)
			continue
		}

		if podInfo.flowInstalled {
			// Remove "not-ready" annotation from Pod
			if err = m.updatePodUnreadyAnnotation(ctx, pod, false); err != nil {
				klog.ErrorS(err, "Failed to patch Pod by removing not-ready annotation", "Namespace", podInfo.podNamespace, "name", podInfo.podName)
				continue
			}
			// Remove un-ready pod from local cache.
			delete(m.unReadyInterfaces, key)
			continue
		}

		if time.Since(podInfo.createTime) > podNotReadyTimeInSeconds*time.Second {
			// Add "not-ready" annotation on Pod if it is not ready within 30s.
			if err = m.updatePodUnreadyAnnotation(ctx, pod, true); err != nil {
				klog.ErrorS(err, "Failed to patch Pod by adding not-ready annotation", "Namespace", podInfo.podNamespace, "name", podInfo.podName)
			}
		}
	}
}

// addUnReadyPodInterface starts tracking the OVS port of ifConfig in
// unReadyInterfaces, keyed by its interface name. The new entry has
// flowInstalled unset and createTime set to the current time.
func (m *podIfaceMonitor) addUnReadyPodInterface(ifConfig *interfacestore.InterfaceConfig) {
	portName := ifConfig.InterfaceName
	klog.InfoS("Added OVS port into unready interfaces", "ovsPort", portName,
		"podName", ifConfig.PodName, "podNamespace", ifConfig.PodNamespace)

	entry := new(unReadyPodInfo)
	entry.podName = ifConfig.PodName
	entry.podNamespace = ifConfig.PodNamespace
	entry.flowInstalled = false
	entry.createTime = time.Now()
	m.unReadyInterfaces[portName] = entry
}

// reQueuePortStatus sends status back into statusCh after retryInterval has
// elapsed, without blocking the caller.
// NOTE(review): the send blocks until statusCh is read (or has buffer space);
// confirm the channel keeps being drained, otherwise the timer goroutine never exits.
func (m *podIfaceMonitor) reQueuePortStatus(status *openflow15.PortStatus) {
	// time.AfterFunc runs the callback in its own goroutine once the timer fires,
	// which replaces the hand-rolled goroutine with a single-case select.
	time.AfterFunc(retryInterval, func() {
		m.statusCh <- status
	})
}
11 changes: 11 additions & 0 deletions pkg/agent/cniserver/pod_configuration_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
"fmt"

current "github.com/containernetworking/cni/pkg/types/100"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"

"antrea.io/antrea/pkg/agent/cniserver/ipam"
"antrea.io/antrea/pkg/agent/interfacestore"
"antrea.io/antrea/pkg/agent/openflow"
agenttypes "antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/util/channel"
)

// connectInterfaceToOVS connects an existing interface to the OVS bridge.
Expand Down Expand Up @@ -113,3 +116,11 @@ func (pc *podConfigurator) reconcileMissingPods(ifConfigs []*interfacestore.Inte
klog.Warningf("Interface for Pod %s/%s not found in the interface store", ifaceConfig.PodNamespace, ifaceConfig.PodName)
}
}

// newPodInterfaceMonitor is the variant defined in pod_configuration_linux.go. It ignores
// all of its arguments and always returns nil, so no podIfaceMonitor is created here.
func newPodInterfaceMonitor(kubeClient clientset.Interface,
_ openflow.Client,
_ interfacestore.InterfaceStore,
_ channel.Notifier,
) *podIfaceMonitor {
return nil
}
Loading

0 comments on commit 9ee313d

Please sign in to comment.