Skip to content

Commit

Permalink
Ensure full functionality of AntreaProxy with proxyAll enabled when k…
Browse files Browse the repository at this point in the history
…ube-proxy presents

To ensure the full functionality of AntreaProxy with `proxyAll` enabled even when
kube-proxy presents, there are some key changes when `proxyAll` is enabled:

The jump rules for the chains managed by Antrea, `ANTREA-PREROUTING` and `ANTREA-OUTPUT`
in nat table, are installed by inserting instead of appending to bypass the chain
`KUBE-SERVICES` performing Service DNAT managed by kube-proxy. Antrea ensures that
the jump rules take precedence over those managed by kube-proxy.

The iptables rules of nat table chain `ANTREA-PREROUTING` are like below, and they are
similar in chain `ANTREA-OUTPUT`.

```
1. -A ANTREA-PREROUTING -m comment --comment "Antrea: bypass external to kube Service traffic when kube Service Endpoint is not override" -d 10.96.0.1 -p tcp --dport 443 -j KUBE-SERVICES
2. -A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to ClusterIP packets" -d 10.96.0.0/12 -j ACCEPT
3. -A ANTREA-PREROUTING -m comment --comment "Antrea: DNAT external to NodePort packets" -m set --match-set ANTREA-NODEPORT-IP dst,dst -j DNAT --to-destination 169.254.0.252
4. -A ANTREA-PREROUTING -m comment --comment "Antrea: accept external to external IP packets" -m set --match-set ANTREA-EXTERNAL-IP dst -j ACCEPT
```

- Rule 1 is to bypass kube Service when `kubeAPIServerOverride` is not set.
- Rule 2 is to accept ClusterIP traffic and bypass chain `KUBE-SERVICES`. The Service
  CIDR is got from `serviceCIDRProvider`. **TODO: whether to sync iptables immediately
  after the update of Service CIDR.**
- Rule 3 is to accept NodePort traffic and bypass chain `KUBE-SERVICES`. This is not a
  new rule.
- Rule 4 is to accept LoadBalancerIP/ExternalIP traffic and bypass chain `KUBE-SERVICES`.
  We use an ipset to match the traffic.

The iptables rules of raw table chainANTREA-PREROUTING are like below:

```
1. -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK
2. -A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP src -d 224.0.0.0/4 -j DROP
3. -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to external IP request packets" -m set --match-set ANTREA-EXTERNAL-IP dst -j NOTRACK
4. -A ANTREA-PREROUTING -m comment --comment "Antrea: do not track external to external IP reply packets" -m set --match-set ANTREA-EXTERNAL-IP src -j NOTRACK
```

- Rules 1-2 are not new rules.
- Rules 3-4 are to bypass conntrack for LoadBalancerIP/externalIP traffic.

The following are the benchmark results of a LoadBalancer Service configured with DSR mode.
The results of TCP_STREAM and TCP_RR (single TCP connection) are almost the same as that
before. The result of TCP_CRR (multiple TCP connections) performs better than before. One
reason should be that conntrack is skipped for LoadBalancer Services.

```
Test           v2.0 proxyAll     Dev proxyAll    Delta
TCP_STREAM     4933.97           4918.35         -0.32%
TCP_RR         8095.49           8032.4         -0.78%
TCP_CRR        1645.66           1888.93         +14.79%
```

Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl committed Jul 4, 2024
1 parent 6004f42 commit 54e5429
Show file tree
Hide file tree
Showing 19 changed files with 1,063 additions and 559 deletions.
1 change: 1 addition & 0 deletions .github/workflows/kind.yml
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ jobs:
--coverage \
--encap-mode encap \
--proxy-all \
--no-kube-proxy \
--feature-gates LoadBalancerModeDSR=true \
--load-balancer-mode dsr \
--node-ipam
Expand Down
10 changes: 8 additions & 2 deletions ci/kind/test-e2e-kind.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ _usage="Usage: $0 [--encap-mode <mode>] [--ip-family <v4|v6|dual>] [--coverage]
--feature-gates A comma-separated list of key=value pairs that describe feature gates, e.g. AntreaProxy=true,Egress=false.
--run Run only tests matching the regexp.
--proxy-all Enables Antrea proxy with all Service support.
--no-kube-proxy Don't deploy kube-proxy.
--load-balancer-mode LoadBalancer mode.
--node-ipam Enables Antrea NodeIPAM.
--multicast Enables Multicast.
Expand Down Expand Up @@ -73,6 +74,7 @@ mode=""
ipfamily="v4"
feature_gates=""
proxy_all=false
no_kube_proxy=false
load_balancer_mode=""
node_ipam=false
multicast=false
Expand Down Expand Up @@ -108,6 +110,10 @@ case $key in
proxy_all=true
shift
;;
--no-kube-proxy)
no_kube_proxy=true
shift
;;
--load-balancer-mode)
load_balancer_mode="$2"
shift 2
Expand Down Expand Up @@ -305,7 +311,7 @@ function setup_cluster {
echoerr "invalid value for --ip-family \"$ipfamily\", expected \"v4\" or \"v6\""
exit 1
fi
if $proxy_all; then
if $no_kube_proxy; then
args="$args --no-kube-proxy"
fi
if $node_ipam; then
Expand Down Expand Up @@ -359,7 +365,7 @@ function run_test {
cat $CH_OPERATOR_YML | docker exec -i kind-control-plane dd of=/root/clickhouse-operator-install-bundle.yml
fi

if $proxy_all; then
if $no_kube_proxy; then
apiserver=$(docker exec -i kind-control-plane kubectl get endpoints kubernetes --no-headers | awk '{print $2}')
if $coverage; then
docker exec -i kind-control-plane sed -i.bak -E "s/^[[:space:]]*[#]?kubeAPIServerOverride[[:space:]]*:[[:space:]]*[a-z\"]+[[:space:]]*$/ kubeAPIServerOverride: \"$apiserver\"/" /root/antrea-coverage.yml /root/antrea-ipsec-coverage.yml
Expand Down
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func run(o *Options) error {
routeClient, err := route.NewClient(networkConfig,
o.config.NoSNAT,
o.config.AntreaProxy.ProxyAll,
o.config.KubeAPIServerOverride != "",
connectUplinkToBridge,
nodeNetworkPolicyEnabled,
multicastEnabled,
Expand Down Expand Up @@ -456,6 +457,7 @@ func run(o *Options) error {
o.defaultLoadBalancerMode,
v4GroupCounter,
v6GroupCounter,
o.config.KubeAPIServerOverride != "",
enableMulticlusterGW)
if err != nil {
return fmt.Errorf("error when creating proxier: %v", err)
Expand Down
18 changes: 12 additions & 6 deletions docs/antrea-proxy.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,20 @@ the introduction of `proxyAll`, Antrea relied on userspace kube-proxy, which is
no longer actively maintained by the K8s community and is slower than other
kube-proxy backends.

Note that on Linux, even when `proxyAll` is enabled, kube-proxy will usually
take priority and will keep handling NodePort Service traffic (unless the source
is a Pod, which is pretty unusual as Pods typically access Services by
ClusterIP). This is because kube-proxy rules typically come before the rules
installed by AntreaProxy to redirect traffic to OVS. When kube-proxy is not
deployed or is removed from the cluster, AntreaProxy will then handle all
Note that on Linux, before Antrea v2.1, when `proxyAll` is enabled, kube-proxy
will usually take priority and will keep handling NodePort Service traffic
(unless the source is a Pod, which is pretty unusual as Pods typically access
Services by ClusterIP). This is because kube-proxy rules typically come before
the rules installed by AntreaProxy to redirect traffic to OVS. When kube-proxy
is not deployed or is removed from the cluster, AntreaProxy will then handle all
Service traffic.

It's worth noting that starting with Antrea v2.1, when only `proxyAll` is enabled,
even if kube-proxy is present, AntreaProxy is capable of handing all types of
Service traffic except for that of the kube Service. This is accomplished by
prioritizing the rules installed by AntreaProxy redirecting Service traffic to OVS
over those installed by kube-proxy.

### Removing kube-proxy

In this section, we will provide steps to run a K8s cluster without kube-proxy,
Expand Down
124 changes: 43 additions & 81 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
apimachinerytypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
clientset "k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -121,13 +120,6 @@ type proxier struct {
serviceHealthServer healthcheck.ServiceHealthServer
numLocalEndpoints map[apimachinerytypes.NamespacedName]int

// serviceIPRouteReferences tracks the references of Service IP routes. The key is the Service IP and the value is
// the set of ServiceInfo strings. Because a Service could have multiple ports and each port will generate a
// ServicePort (which is the unit of the processing), a Service IP route may be required by several ServicePorts.
// With the references, we install a route exactly once as long as it's used by any ServicePorts and uninstall it
// exactly once when it's no longer used by any ServicePorts.
// It applies to ClusterIP and LoadBalancerIP.
serviceIPRouteReferences map[string]sets.Set[string]
// syncedOnce returns true if the proxier has synced rules at least once.
syncedOnce bool
syncedOnceMutex sync.RWMutex
Expand Down Expand Up @@ -338,10 +330,7 @@ func (p *proxier) removeStaleServiceConntrackEntries(svcPortName k8sproxy.Servic
svcPort := uint16(svcInfo.Port())
nodePort := uint16(svcInfo.NodePort())
svcProto := svcInfo.OFProtocol
virtualNodePortDNATIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
virtualNodePortDNATIP = agentconfig.VirtualNodePortDNATIPv6
}
virtualNodePortDNATIP := virtualNodePortDNATIP(p.isIPv6)

svcIPToPort := make(map[string]uint16)
svcIPToPort[svcInfo.ClusterIP().String()] = svcPort
Expand Down Expand Up @@ -384,10 +373,7 @@ func (p *proxier) removeStaleConntrackEntries(svcPortName k8sproxy.ServicePortNa
externalIPStrings := svcInfo.ExternalIPStrings()
pLoadBalancerIPStrings := pSvcInfo.LoadBalancerIPStrings()
loadBalancerIPStrings := svcInfo.LoadBalancerIPStrings()
virtualNodePortDNATIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
virtualNodePortDNATIP = agentconfig.VirtualNodePortDNATIPv6
}
virtualNodePortDNATIP := virtualNodePortDNATIP(p.isIPv6)
var svcPortChanged, svcNodePortChanged bool

staleSvcIPToPort := make(map[string]uint16)
Expand Down Expand Up @@ -552,12 +538,8 @@ func (p *proxier) installNodePortService(localGroupID, clusterGroupID binding.Gr
if svcPort == 0 {
return nil
}
svcIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
svcIP = agentconfig.VirtualNodePortDNATIPv6
}
if err := p.ofClient.InstallServiceFlows(&agenttypes.ServiceConfig{
ServiceIP: svcIP,
ServiceIP: virtualNodePortDNATIP(p.isIPv6),
ServicePort: svcPort,
Protocol: protocol,
TrafficPolicyLocal: trafficPolicyLocal,
Expand All @@ -569,10 +551,10 @@ func (p *proxier) installNodePortService(localGroupID, clusterGroupID binding.Gr
IsNested: false, // Unsupported for NodePort
IsDSR: false, // Unsupported because external traffic has been DNAT'd in host network before it's forwarded to OVS.
}); err != nil {
return fmt.Errorf("failed to install NodePort load balancing flows: %w", err)
return fmt.Errorf("failed to install NodePort load balancing OVS flows: %w", err)
}
if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil {
return fmt.Errorf("failed to install NodePort traffic redirecting rules: %w", err)
if err := p.routeClient.AddNodePortConfigs(p.nodePortAddresses, svcPort, protocol); err != nil {
return fmt.Errorf("failed to install NodePort traffic redirecting routing configurations: %w", err)
}
return nil
}
Expand All @@ -581,15 +563,11 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot
if svcPort == 0 {
return nil
}
svcIP := agentconfig.VirtualNodePortDNATIPv4
if p.isIPv6 {
svcIP = agentconfig.VirtualNodePortDNATIPv6
}
if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil {
if err := p.ofClient.UninstallServiceFlows(virtualNodePortDNATIP(p.isIPv6), svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove NodePort load balancing flows: %w", err)
}
if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err)
if err := p.routeClient.DeleteNodePortConfigs(p.nodePortAddresses, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove NodePort traffic redirecting routing configurations: %w", err)
}
return nil
}
Expand Down Expand Up @@ -618,10 +596,10 @@ func (p *proxier) installExternalIPService(svcInfoStr string,
IsNested: false, // Unsupported for ExternalIP
IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR,
}); err != nil {
return fmt.Errorf("failed to install ExternalIP load balancing flows: %w", err)
return fmt.Errorf("failed to install ExternalIP load balancing OVS flows: %w", err)
}
if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil {
return fmt.Errorf("failed to install ExternalIP traffic redirecting routes: %w", err)
if err := p.routeClient.AddExternalIPConfigs(svcInfoStr, ip); err != nil {
return fmt.Errorf("failed to install ExternalIP load balancing routing configurations: %w", err)
}
}
return nil
Expand All @@ -631,10 +609,10 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString
for _, externalIP := range externalIPStrings {
ip := net.ParseIP(externalIP)
if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove ExternalIP load balancing flows: %w", err)
return fmt.Errorf("failed to remove ExternalIP load balancing OVS flows: %w", err)
}
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil {
return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err)
if err := p.routeClient.DeleteExternalIPConfigs(svcInfoStr, ip); err != nil {
return fmt.Errorf("failed to remove ExternalIP traffic redirecting routing configurations: %w", err)
}
}
return nil
Expand Down Expand Up @@ -665,71 +643,35 @@ func (p *proxier) installLoadBalancerService(svcInfoStr string,
IsNested: false, // Unsupported for LoadBalancerIP
IsDSR: features.DefaultFeatureGate.Enabled(features.LoadBalancerModeDSR) && loadBalancerMode == agentconfig.LoadBalancerModeDSR,
}); err != nil {
return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err)
return fmt.Errorf("failed to install LoadBalancerIP load balancing OVS flows: %w", err)
}
if p.proxyAll {
if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil {
return fmt.Errorf("failed to install LoadBalancer traffic redirecting routes: %w", err)
if err := p.routeClient.AddExternalIPConfigs(svcInfoStr, ip); err != nil {
return fmt.Errorf("failed to install LoadBalancerIP traffic redirecting routing configurations: %w", err)
}
}
}
}
return nil
}

func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn func(net.IP) error) error {
ipStr := ip.String()
references, exists := p.serviceIPRouteReferences[ipStr]
// If the IP was not referenced by any Service port, install a route for it.
// Otherwise, just reference it.
if !exists {
if err := addRouteFn(ip); err != nil {
return err
}
references = sets.New[string](svcInfoStr)
p.serviceIPRouteReferences[ipStr] = references
} else {
references.Insert(svcInfoStr)
}
return nil
}

func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
ip := net.ParseIP(ingress)
if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove LoadBalancer load balancing flows: %w", err)
return fmt.Errorf("failed to remove LoadBalancerIP load balancing OVS flows: %w", err)
}
if p.proxyAll {
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil {
return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err)
if err := p.routeClient.DeleteExternalIPConfigs(svcInfoStr, ip); err != nil {
return fmt.Errorf("failed to remove LoadBalancerIP traffic redirecting routing configurations: %w", err)
}
}
}
}
return nil
}

func (p *proxier) deleteRouteForServiceIP(svcInfoStr string, ip net.IP, deleteRouteFn func(net.IP) error) error {
ipStr := ip.String()
references, exists := p.serviceIPRouteReferences[ipStr]
// If the IP was not referenced by this Service port, skip it.
if exists && references.Has(svcInfoStr) {
// Delete the IP only if this Service port is the last one referencing it.
// Otherwise, just dereference it.
if references.Len() == 1 {
if err := deleteRouteFn(ip); err != nil {
return err
}
delete(p.serviceIPRouteReferences, ipStr)
} else {
references.Delete(svcInfoStr)
}
}
return nil
}

func (p *proxier) installServices() {
for svcPortName, svcPort := range p.serviceMap {
svcInfo := svcPort.(*types.ServiceInfo)
Expand Down Expand Up @@ -1392,6 +1334,7 @@ func newProxier(
nodeIPChecker nodeip.Checker,
nodePortAddresses []net.IP,
proxyAllEnabled bool,
isKubeAPIServerOverridden bool,
skipServices []string,
proxyLoadBalancerIPs bool,
defaultLoadBalancerMode agentconfig.LoadBalancerMode,
Expand Down Expand Up @@ -1422,7 +1365,13 @@ func newProxier(
}

var serviceHealthServer healthcheck.ServiceHealthServer
if proxyAllEnabled {
if proxyAllEnabled && isKubeAPIServerOverridden {
// The serviceHealthServer of AntreaProxy should be initialized only when kube-proxy is removed. If kube-proxy
// is present, it also provides the serviceHealthServer functionality, and both servers would attempt to start
// one HTTP service on the same port in a K8s Node, causing conflicts. The option `kubeAPIServerOverride`
// in antrea-agent is used to determine if the serviceHealthServer of AntreaProxy should be initialized. The
// option must be set if kube-proxy is removed, though it can also be set when kube-proxy is present (not
// recommended and unnecessary). We assume this option is set only when kube-proxy is removed.
nodePortAddressesString := make([]string, len(nodePortAddresses))
for i, address := range nodePortAddresses {
nodePortAddressesString[i] = address.String()
Expand Down Expand Up @@ -1454,7 +1403,6 @@ func newProxier(
endpointsInstalledMap: types.EndpointsMap{},
endpointsMap: types.EndpointsMap{},
endpointReferenceCounter: map[string]int{},
serviceIPRouteReferences: map[string]sets.Set[string]{},
nodeLabels: map[string]string{},
serviceStringMap: map[string]k8sproxy.ServicePortName{},
groupCounter: groupCounter,
Expand Down Expand Up @@ -1533,6 +1481,7 @@ func newDualStackProxier(
nodePortAddressesIPv4 []net.IP,
nodePortAddressesIPv6 []net.IP,
proxyAllEnabled bool,
isKubeAPIServerOverridden bool,
skipServices []string,
proxyLoadBalancerIPs bool,
defaultLoadBalancerMode agentconfig.LoadBalancerMode,
Expand All @@ -1554,6 +1503,7 @@ func newDualStackProxier(
nodeIPChecker,
nodePortAddressesIPv4,
proxyAllEnabled,
isKubeAPIServerOverridden,
skipServices,
proxyLoadBalancerIPs,
defaultLoadBalancerMode,
Expand All @@ -1576,6 +1526,7 @@ func newDualStackProxier(
nodeIPChecker,
nodePortAddressesIPv6,
proxyAllEnabled,
isKubeAPIServerOverridden,
skipServices,
proxyLoadBalancerIPs,
defaultLoadBalancerMode,
Expand Down Expand Up @@ -1608,6 +1559,7 @@ func NewProxier(hostname string,
defaultLoadBalancerMode agentconfig.LoadBalancerMode,
v4GroupCounter types.GroupCounter,
v6GroupCounter types.GroupCounter,
isKubeAPIServerOverridden bool,
nestedServiceSupport bool) (Proxier, error) {
proxyAllEnabled := proxyConfig.ProxyAll
skipServices := proxyConfig.SkipServices
Expand All @@ -1631,6 +1583,7 @@ func NewProxier(hostname string,
nodePortAddressesIPv4,
nodePortAddressesIPv6,
proxyAllEnabled,
isKubeAPIServerOverridden,
skipServices,
proxyLoadBalancerIPs,
defaultLoadBalancerMode,
Expand All @@ -1654,6 +1607,7 @@ func NewProxier(hostname string,
nodeIPChecker,
nodePortAddressesIPv4,
proxyAllEnabled,
isKubeAPIServerOverridden,
skipServices,
proxyLoadBalancerIPs,
defaultLoadBalancerMode,
Expand All @@ -1676,6 +1630,7 @@ func NewProxier(hostname string,
nodeIPChecker,
nodePortAddressesIPv6,
proxyAllEnabled,
isKubeAPIServerOverridden,
skipServices,
proxyLoadBalancerIPs,
defaultLoadBalancerMode,
Expand All @@ -1694,3 +1649,10 @@ func NewProxier(hostname string,
func needClearConntrackEntries(protocol binding.Protocol) bool {
return protocol == binding.ProtocolUDP || protocol == binding.ProtocolUDPv6
}

func virtualNodePortDNATIP(isIPv6 bool) net.IP {
if isIPv6 {
return agentconfig.VirtualNodePortDNATIPv6
}
return agentconfig.VirtualNodePortDNATIPv4
}
Loading

0 comments on commit 54e5429

Please sign in to comment.