Skip to content

Commit

Permalink
add process for lb-svc ports update
Browse files Browse the repository at this point in the history
Signed-off-by: 马洪贞 <[email protected]>
  • Loading branch information
hongzhen-ma committed Nov 6, 2024
1 parent 4ee107a commit 34cb0dc
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 13 deletions.
29 changes: 25 additions & 4 deletions dist/images/vpcnatgateway/lb-svc.sh
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,43 @@ function add_dnat() {
done
}

function del_dnat() {
for rule in $@
do
arr=(${rule//,/ })
eip=(${arr[0]//\// })
dport=${arr[1]}
protocol=${arr[2]}
internalIp=${arr[3]}
internalPort=${arr[4]}

checkRule="-d $eip -p $protocol --dport $dport -j DNAT --to-destination $internalIp:$internalPort"
if iptables -t nat -C PREROUTING $checkRule > /dev/null 2>&1; then
exec_cmd "iptables -t nat -D PREROUTING -d $eip -p $protocol --dport $dport -j DNAT --to-destination $internalIp:$internalPort"
fi
done
}

rules=${@:2:${#}}
opt=$1
case $opt in
init)
init)
echo "init $rules"
init $rules
;;
eip-add)
eip-add)
echo "eip-add $rules"
add_eip $rules
;;
dnat-add)
dnat-add)
echo "dnat-add $rules"
add_dnat $rules
;;
*)
dnat-del)
echo "dnat-del rules"
del_dnat $rules
;;
*)
echo "Usage: $0 [init|eip-add|dnat-add] ..."
exit 1
;;
Expand Down
74 changes: 66 additions & 8 deletions pkg/controller/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"reflect"
"strings"
"time"

Expand All @@ -25,6 +26,12 @@ type vpcService struct {
Svc *v1.Service
}

type updateSvcObject struct {
key string
oldPorts []v1.ServicePort
newPorts []v1.ServicePort
}

func (c *Controller) enqueueAddService(obj interface{}) {
var key string
var err error
Expand Down Expand Up @@ -122,7 +129,12 @@ func (c *Controller) enqueueUpdateService(oldObj, newObj interface{}) {
key = strings.Join([]string{key, ipsToDelStr}, "#")
}

c.updateServiceQueue.Add(key)
updateSvc := &updateSvcObject{
key: key,
oldPorts: oldSvc.Spec.Ports,
newPorts: newSvc.Spec.Ports,
}
c.updateServiceQueue.Add(updateSvc)
}

func (c *Controller) runAddServiceWorker() {
Expand Down Expand Up @@ -208,16 +220,16 @@ func (c *Controller) processNextUpdateServiceWorkItem() bool {

err := func(obj interface{}) error {
defer c.updateServiceQueue.Done(obj)
var key string
var svcObject *updateSvcObject
var ok bool
if key, ok = obj.(string); !ok {
if svcObject, ok = obj.(*updateSvcObject); !ok {
c.updateServiceQueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
utilruntime.HandleError(fmt.Errorf("expected updateSvcObject in workqueue but got %#v", obj))
return nil
}
if err := c.handleUpdateService(key); err != nil {
c.updateServiceQueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
if err := c.handleUpdateService(svcObject); err != nil {
c.updateServiceQueue.AddRateLimited(svcObject)
return fmt.Errorf("error syncing '%s': %s, requeuing", svcObject.key, err.Error())
}
c.updateServiceQueue.Forget(obj)
return nil
Expand Down Expand Up @@ -297,7 +309,8 @@ func (c *Controller) handleDeleteService(service *vpcService) error {
return nil
}

func (c *Controller) handleUpdateService(key string) error {
func (c *Controller) handleUpdateService(svcObject *updateSvcObject) error {
key := svcObject.key
keys := strings.Split(key, "#")
key = keys[0]
var ipsToDel []string
Expand Down Expand Up @@ -431,6 +444,34 @@ func (c *Controller) handleUpdateService(key string) error {
if needUpdateEndpointQueue {
c.updateEndpointQueue.Add(key)
}

if c.config.EnableLbSvc && svc.Spec.Type == v1.ServiceTypeLoadBalancer {
changed, err := c.checkLbSvcDeployAnnotationChanged(svc)
if err != nil {
klog.Errorf("failed to check annotation change for lb svc %s: %v", key, err)
return err
}

// only process svc.spec.ports update
if !changed {
pod, err := c.getLbSvcPod(name, namespace)
if err != nil {
klog.Errorf("failed to get pod for lb svc %s: %v", key, err)
return err
}

toDel := diffSvcPorts(svcObject.oldPorts, svcObject.newPorts)
if err := c.delDnatRules(pod, toDel, svc); err != nil {
klog.Errorf("failed to delete dnat rules, err: %v", err)
return err
}
if err = c.updatePodAttachNets(pod, svc); err != nil {
klog.Errorf("failed to update pod attachment network for lb svc %s: %v", key, err)
return err
}
}
}

return nil
}

Expand Down Expand Up @@ -549,3 +590,20 @@ func getVipIps(svc *v1.Service) []string {
}
return ips
}

func diffSvcPorts(oldPorts, newPorts []v1.ServicePort) (toDel []v1.ServicePort) {
for _, oldPort := range oldPorts {
found := false
for _, newPort := range newPorts {
if reflect.DeepEqual(oldPort, newPort) {
found = true
break
}
}
if !found {
toDel = append(toDel, oldPort)
}
}

return toDel
}
52 changes: 51 additions & 1 deletion pkg/controller/service_lb.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
initRouteTable = "init"
podEIPAdd = "eip-add"
podDNATAdd = "dnat-add"
podDNATDel = "dnat-del"
attachmentName = "lb-svc-attachment"
attachmentNs = "kube-system"
)
Expand Down Expand Up @@ -327,7 +328,11 @@ func (c *Controller) updatePodAttachNets(pod *corev1.Pod, svc *corev1.Service) e
}

var rules []string
rules = append(rules, fmt.Sprintf("%s,%d,%s,%s,%d,%s", loadBalancerIP, port.Port, protocol, svc.Spec.ClusterIP, port.Port, defaultGateway))
targetPort := port.TargetPort.IntValue()
if targetPort == 0 {
targetPort = int(port.Port)
}
rules = append(rules, fmt.Sprintf("%s,%d,%s,%s,%d,%s", loadBalancerIP, port.Port, protocol, svc.Spec.ClusterIP, targetPort, defaultGateway))
klog.Infof("add dnat rules for lb svc pod, %v", rules)
if err := c.execNatRules(pod, podDNATAdd, rules); err != nil {
klog.Errorf("failed to add dnat for pod, err: %v", err)
Expand All @@ -337,3 +342,48 @@ func (c *Controller) updatePodAttachNets(pod *corev1.Pod, svc *corev1.Service) e

return nil
}

func (c *Controller) checkLbSvcDeployAnnotationChanged(svc *corev1.Service) (bool, error) {
deployName := genLbSvcDpName(svc.Name)
deploy, err := c.config.KubeClient.AppsV1().Deployments(svc.Namespace).Get(context.Background(), deployName, metav1.GetOptions{})
if err != nil {
return false, err
}

if newDeploy := c.updateLbSvcDeployment(svc, deploy); newDeploy == nil {
klog.V(3).Infof("no need to update deployment %s/%s", deploy.Namespace, deploy.Name)
return false, nil
}
return true, nil
}

func (c *Controller) delDnatRules(pod *corev1.Pod, toDel []corev1.ServicePort, svc *corev1.Service) error {
providerName := getAttachNetworkProvider(svc)
attachIPAnnotation := fmt.Sprintf(util.IPAddressAnnotationTemplate, providerName)
loadBalancerIP := pod.Annotations[attachIPAnnotation]

for _, port := range toDel {
var protocol string
switch port.Protocol {
case corev1.ProtocolTCP:
protocol = util.ProtocolTCP
case corev1.ProtocolUDP:
protocol = util.ProtocolUDP
case corev1.ProtocolSCTP:
protocol = util.ProtocolSCTP
}

var rules []string
targetPort := port.TargetPort.IntValue()
if targetPort == 0 {
targetPort = int(port.Port)
}
rules = append(rules, fmt.Sprintf("%s,%d,%s,%s,%d", loadBalancerIP, port.Port, protocol, svc.Spec.ClusterIP, targetPort))
klog.Infof("delete dnat rules for lb svc pod, %v", rules)
if err := c.execNatRules(pod, podDNATDel, rules); err != nil {
klog.Errorf("failed to del dnat rules for pod, err: %v", err)
return err
}
}
return nil
}

0 comments on commit 34cb0dc

Please sign in to comment.