diff --git a/build/charts/antrea/conf/antrea-agent.conf b/build/charts/antrea/conf/antrea-agent.conf index 11ee1f54b0e..5cfe934ff6f 100644 --- a/build/charts/antrea/conf/antrea-agent.conf +++ b/build/charts/antrea/conf/antrea-agent.conf @@ -23,6 +23,9 @@ featureGates: # Enable traceflow which provides packet tracing feature to diagnose network issue. {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "Traceflow" "default" true) }} +# Enable PacketSampling feature which provides packets sampling (capture) feature to diagnose network issue. +{{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "PacketSampling" "default" false) }} + # Enable NodePortLocal feature to make the Pods reachable externally through NodePort {{- include "featureGate" (dict "featureGates" .Values.featureGates "name" "NodePortLocal" "default" true) }} diff --git a/build/charts/antrea/templates/agent/clusterrole.yaml b/build/charts/antrea/templates/agent/clusterrole.yaml index 05ca58c1ab6..1cabd7c6605 100644 --- a/build/charts/antrea/templates/agent/clusterrole.yaml +++ b/build/charts/antrea/templates/agent/clusterrole.yaml @@ -33,6 +33,12 @@ rules: - pods/status verbs: - patch + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - "" resources: @@ -154,6 +160,20 @@ rules: - patch - create - delete + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings/status + verbs: + - patch - apiGroups: - crd.antrea.io resources: diff --git a/build/charts/antrea/templates/webhooks/validating/crdvalidator.yaml b/build/charts/antrea/templates/webhooks/validating/crdvalidator.yaml index 0dfe1f8acd3..c5ba497552c 100644 --- a/build/charts/antrea/templates/webhooks/validating/crdvalidator.yaml +++ b/build/charts/antrea/templates/webhooks/validating/crdvalidator.yaml @@ -184,3 +184,18 @@ webhooks: admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None timeoutSeconds: 5 + - name: "packetsamplingvalidator.antrea.io" + clientConfig: + service: + name: "antrea" + namespace: {{ .Release.Namespace }} + path: "/validate/packetsampling" + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["crd.antrea.io"] + apiVersions: ["v1alpha1"] + resources: ["packetsampling"] + scope: "Cluster" + admissionReviewVersions: ["v1", "v1beta1"] + sideEffects: None + timeoutSeconds: 5 diff --git a/build/yamls/antrea-aks.yml b/build/yamls/antrea-aks.yml index f4bc9bb2162..95ecef577e4 100644 --- a/build/yamls/antrea-aks.yml +++ b/build/yamls/antrea-aks.yml @@ -4313,6 +4313,182 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/packetsampling.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: packetsamplings.crd.antrea.io + labels: + app: antrea +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + additionalPrinterColumns: + - jsonPath: .status.phase + description: The phase of the PacketSampling. + name: Phase + type: string + - jsonPath: .spec.source.pod + description: The name of the source Pod. + name: Source-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.pod + description: The name of the destination Pod. + name: Destination-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.ip + description: The IP address of the destination. + name: Destination-IP + type: string + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. + name: Timeout + type: integer + priority: 10 + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + properties: + source: + type: object + properties: + pod: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + destination: + type: object + properties: + pod: + type: string + service: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + packet: + type: object + properties: + ipHeader: + type: object + properties: + protocol: + type: integer + minimum: 0 + maximum: 255 + ipv6Header: + type: object + properties: + nextHeader: + type: integer + minimum: 0 + maximum: 65535 + transportHeader: + type: object + properties: + udp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + tcp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + flags: + type: integer + minimum: 0 + maximum: 255 + timeout: + type: integer + minimum: 1 + maximum: 300 + type: + type: string + firstNSamplingConfig: + type: object + properties: + number: + type: integer + fileServer: + type: object + properties: + url: + type: string + authentication: + type: object + properties: + authType: + type: string + enum: [ "BearerToken", "APIKey", "BasicAuthentication" ] + authSecret: + type: object + properties: + name: + type: string + namespace: + type: string + status: + type: object + properties: + reason: + type: string + phase: + type: string + startTime: + type: string + numCapturedPackets: + type: integer + packetsPath: + type: string + + subresources: + status: {} + scope: Cluster + names: + plural: packetsamplings + singular: packetsampling + kind: PacketSampling + shortNames: + - ps + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -5449,6 +5625,9 @@ data: # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true + # Enable PacketSampling feature which provides packets sampling (capture) feature to diagnose network issue. + # PacketSampling: false + # Enable NodePortLocal feature to make the Pods reachable externally through NodePort # NodePortLocal: true @@ -6008,6 +6187,12 @@ rules: - pods/status verbs: - patch + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - "" resources: @@ -6129,6 +6314,20 @@ rules: - patch - create - delete + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings/status + verbs: + - patch - apiGroups: - crd.antrea.io resources: @@ -6810,7 +7009,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 8c4aced766a99cc91b0f614593a3f326865f0b882ad92a105827253ffb98d0db labels: app: antrea component: antrea-agent @@ -7048,7 +7247,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 8c4aced766a99cc91b0f614593a3f326865f0b882ad92a105827253ffb98d0db labels: app: antrea component: antrea-controller @@ -7431,3 +7630,18 @@ webhooks: admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None timeoutSeconds: 5 + - name: "packetsamplingvalidator.antrea.io" + clientConfig: + service: + name: "antrea" + namespace: kube-system + path: "/validate/packetsampling" + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["crd.antrea.io"] + apiVersions: ["v1alpha1"] + resources: ["packetsampling"] + scope: "Cluster" + admissionReviewVersions: ["v1", "v1beta1"] + sideEffects: None + timeoutSeconds: 5 diff --git a/build/yamls/antrea-crds.yml b/build/yamls/antrea-crds.yml index e0497dcf8b2..e71b0148ef2 100644 --- a/build/yamls/antrea-crds.yml +++ b/build/yamls/antrea-crds.yml @@ -4294,6 +4294,180 @@ spec: --- apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition +metadata: + name: packetsamplings.crd.antrea.io + labels: + app: antrea +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + additionalPrinterColumns: + - jsonPath: .status.phase + description: The phase of the PacketSampling. + name: Phase + type: string + - jsonPath: .spec.source.pod + description: The name of the source Pod. + name: Source-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.pod + description: The name of the destination Pod. + name: Destination-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.ip + description: The IP address of the destination. + name: Destination-IP + type: string + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. + name: Timeout + type: integer + priority: 10 + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + properties: + source: + type: object + properties: + pod: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + destination: + type: object + properties: + pod: + type: string + service: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + packet: + type: object + properties: + ipHeader: + type: object + properties: + protocol: + type: integer + minimum: 0 + maximum: 255 + ipv6Header: + type: object + properties: + nextHeader: + type: integer + minimum: 0 + maximum: 65535 + transportHeader: + type: object + properties: + udp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + tcp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + flags: + type: integer + minimum: 0 + maximum: 255 + timeout: + type: integer + minimum: 1 + maximum: 300 + type: + type: string + firstNSamplingConfig: + type: object + properties: + number: + type: integer + fileServer: + type: object + properties: + url: + type: string + authentication: + type: object + properties: + authType: + type: string + enum: [ "BearerToken", "APIKey", "BasicAuthentication" ] + authSecret: + type: object + properties: + name: + type: string + namespace: + type: string + status: + type: object + properties: + reason: + type: string + phase: + type: string + startTime: + type: string + numCapturedPackets: + type: integer + packetsPath: + type: string + + subresources: + status: {} + scope: Cluster + names: + plural: packetsamplings + singular: packetsampling + kind: PacketSampling + shortNames: + - ps +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition metadata: name: supportbundlecollections.crd.antrea.io spec: diff --git a/build/yamls/antrea-eks.yml b/build/yamls/antrea-eks.yml index bec701d3056..24d473777af 100644 --- a/build/yamls/antrea-eks.yml +++ b/build/yamls/antrea-eks.yml @@ -4313,6 +4313,182 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/packetsampling.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: packetsamplings.crd.antrea.io + labels: + app: antrea +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + additionalPrinterColumns: + - jsonPath: .status.phase + description: The phase of the PacketSampling. + name: Phase + type: string + - jsonPath: .spec.source.pod + description: The name of the source Pod. + name: Source-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.pod + description: The name of the destination Pod. + name: Destination-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.ip + description: The IP address of the destination. + name: Destination-IP + type: string + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. + name: Timeout + type: integer + priority: 10 + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + properties: + source: + type: object + properties: + pod: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + destination: + type: object + properties: + pod: + type: string + service: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + packet: + type: object + properties: + ipHeader: + type: object + properties: + protocol: + type: integer + minimum: 0 + maximum: 255 + ipv6Header: + type: object + properties: + nextHeader: + type: integer + minimum: 0 + maximum: 65535 + transportHeader: + type: object + properties: + udp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + tcp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + flags: + type: integer + minimum: 0 + maximum: 255 + timeout: + type: integer + minimum: 1 + maximum: 300 + type: + type: string + firstNSamplingConfig: + type: object + properties: + number: + type: integer + fileServer: + type: object + properties: + url: + type: string + authentication: + type: object + properties: + authType: + type: string + enum: [ "BearerToken", "APIKey", "BasicAuthentication" ] + authSecret: + type: object + properties: + name: + type: string + namespace: + type: string + status: + type: object + properties: + reason: + type: string + phase: + type: string + startTime: + type: string + numCapturedPackets: + type: integer + packetsPath: + type: string + + subresources: + status: {} + scope: Cluster + names: + plural: packetsamplings + singular: packetsampling + kind: PacketSampling + shortNames: + - ps + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -5449,6 +5625,9 @@ data: # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true + # Enable PacketSampling feature which provides packets sampling (capture) feature to diagnose network issue. + # PacketSampling: false + # Enable NodePortLocal feature to make the Pods reachable externally through NodePort # NodePortLocal: true @@ -6008,6 +6187,12 @@ rules: - pods/status verbs: - patch + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - "" resources: @@ -6129,6 +6314,20 @@ rules: - patch - create - delete + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings/status + verbs: + - patch - apiGroups: - crd.antrea.io resources: @@ -6810,7 +7009,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 8c4aced766a99cc91b0f614593a3f326865f0b882ad92a105827253ffb98d0db labels: app: antrea component: antrea-agent @@ -7049,7 +7248,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 30843b57762c91dfcffb560917191e3bc7e662c06552759bac2a173bc060b82c + checksum/config: 8c4aced766a99cc91b0f614593a3f326865f0b882ad92a105827253ffb98d0db labels: app: antrea component: antrea-controller @@ -7432,3 +7631,18 @@ webhooks: admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None timeoutSeconds: 5 + - name: "packetsamplingvalidator.antrea.io" + clientConfig: + service: + name: "antrea" + namespace: kube-system + path: "/validate/packetsampling" + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["crd.antrea.io"] + apiVersions: ["v1alpha1"] + resources: ["packetsampling"] + scope: "Cluster" + admissionReviewVersions: ["v1", "v1beta1"] + sideEffects: None + timeoutSeconds: 5 diff --git a/build/yamls/antrea-gke.yml b/build/yamls/antrea-gke.yml index a4ae810f7b0..1d2d050b3ed 100644 --- a/build/yamls/antrea-gke.yml +++ b/build/yamls/antrea-gke.yml @@ -4313,6 +4313,182 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/packetsampling.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: packetsamplings.crd.antrea.io + labels: + app: antrea +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + additionalPrinterColumns: + - jsonPath: .status.phase + description: The phase of the PacketSampling. + name: Phase + type: string + - jsonPath: .spec.source.pod + description: The name of the source Pod. + name: Source-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.pod + description: The name of the destination Pod. + name: Destination-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.ip + description: The IP address of the destination. + name: Destination-IP + type: string + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. + name: Timeout + type: integer + priority: 10 + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + properties: + source: + type: object + properties: + pod: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + destination: + type: object + properties: + pod: + type: string + service: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + packet: + type: object + properties: + ipHeader: + type: object + properties: + protocol: + type: integer + minimum: 0 + maximum: 255 + ipv6Header: + type: object + properties: + nextHeader: + type: integer + minimum: 0 + maximum: 65535 + transportHeader: + type: object + properties: + udp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + tcp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + flags: + type: integer + minimum: 0 + maximum: 255 + timeout: + type: integer + minimum: 1 + maximum: 300 + type: + type: string + firstNSamplingConfig: + type: object + properties: + number: + type: integer + fileServer: + type: object + properties: + url: + type: string + authentication: + type: object + properties: + authType: + type: string + enum: [ "BearerToken", "APIKey", "BasicAuthentication" ] + authSecret: + type: object + properties: + name: + type: string + namespace: + type: string + status: + type: object + properties: + reason: + type: string + phase: + type: string + startTime: + type: string + numCapturedPackets: + type: integer + packetsPath: + type: string + + subresources: + status: {} + scope: Cluster + names: + plural: packetsamplings + singular: packetsampling + kind: PacketSampling + shortNames: + - ps + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -5449,6 +5625,9 @@ data: # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true + # Enable PacketSampling feature which provides packets sampling (capture) feature to diagnose network issue. + # PacketSampling: false + # Enable NodePortLocal feature to make the Pods reachable externally through NodePort # NodePortLocal: true @@ -6008,6 +6187,12 @@ rules: - pods/status verbs: - patch + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - "" resources: @@ -6129,6 +6314,20 @@ rules: - patch - create - delete + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings/status + verbs: + - patch - apiGroups: - crd.antrea.io resources: @@ -6810,7 +7009,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d5cdb5356795c44a69c66fad1b4d67f7c00cdcbe837f3b3b50260e4d9dfd1e7e + checksum/config: e5155e0f5b5f1d56a497787fbb04ceed66a301b8debf12b489508278f9310463 labels: app: antrea component: antrea-agent @@ -7046,7 +7245,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: d5cdb5356795c44a69c66fad1b4d67f7c00cdcbe837f3b3b50260e4d9dfd1e7e + checksum/config: e5155e0f5b5f1d56a497787fbb04ceed66a301b8debf12b489508278f9310463 labels: app: antrea component: antrea-controller @@ -7429,3 +7628,18 @@ webhooks: admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None timeoutSeconds: 5 + - name: "packetsamplingvalidator.antrea.io" + clientConfig: + service: + name: "antrea" + namespace: kube-system + path: "/validate/packetsampling" + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["crd.antrea.io"] + apiVersions: ["v1alpha1"] + resources: ["packetsampling"] + scope: "Cluster" + admissionReviewVersions: ["v1", "v1beta1"] + sideEffects: None + timeoutSeconds: 5 diff --git a/build/yamls/antrea-ipsec.yml b/build/yamls/antrea-ipsec.yml index 393cb59da0a..72a42061c83 100644 --- a/build/yamls/antrea-ipsec.yml +++ b/build/yamls/antrea-ipsec.yml @@ -4313,6 +4313,182 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/packetsampling.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: packetsamplings.crd.antrea.io + labels: + app: antrea +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + additionalPrinterColumns: + - jsonPath: .status.phase + description: The phase of the PacketSampling. + name: Phase + type: string + - jsonPath: .spec.source.pod + description: The name of the source Pod. + name: Source-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.pod + description: The name of the destination Pod. + name: Destination-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.ip + description: The IP address of the destination. + name: Destination-IP + type: string + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. + name: Timeout + type: integer + priority: 10 + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + properties: + source: + type: object + properties: + pod: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + destination: + type: object + properties: + pod: + type: string + service: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + packet: + type: object + properties: + ipHeader: + type: object + properties: + protocol: + type: integer + minimum: 0 + maximum: 255 + ipv6Header: + type: object + properties: + nextHeader: + type: integer + minimum: 0 + maximum: 65535 + transportHeader: + type: object + properties: + udp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + tcp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + flags: + type: integer + minimum: 0 + maximum: 255 + timeout: + type: integer + minimum: 1 + maximum: 300 + type: + type: string + firstNSamplingConfig: + type: object + properties: + number: + type: integer + fileServer: + type: object + properties: + url: + type: string + authentication: + type: object + properties: + authType: + type: string + enum: [ "BearerToken", "APIKey", "BasicAuthentication" ] + authSecret: + type: object + properties: + name: + type: string + namespace: + type: string + status: + type: object + properties: + reason: + type: string + phase: + type: string + startTime: + type: string + numCapturedPackets: + type: integer + packetsPath: + type: string + + subresources: + status: {} + scope: Cluster + names: + plural: packetsamplings + singular: packetsampling + kind: PacketSampling + shortNames: + - ps + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -5462,6 +5638,9 @@ data: # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true + # Enable PacketSampling feature which provides packets sampling (capture) feature to diagnose network issue. + # PacketSampling: false + # Enable NodePortLocal feature to make the Pods reachable externally through NodePort # NodePortLocal: true @@ -6021,6 +6200,12 @@ rules: - pods/status verbs: - patch + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - "" resources: @@ -6142,6 +6327,20 @@ rules: - patch - create - delete + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings/status + verbs: + - patch - apiGroups: - crd.antrea.io resources: @@ -6823,7 +7022,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50f2864cf09e4732327b963130bd59a9fc06c560784b161c94e813c000367615 + checksum/config: 051af47db7b16f375bac60985fb4984c63f443fe27758702613eef508fc9fced checksum/ipsec-secret: d0eb9c52d0cd4311b6d252a951126bf9bea27ec05590bed8a394f0f792dcb2a4 labels: app: antrea @@ -7105,7 +7304,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: 50f2864cf09e4732327b963130bd59a9fc06c560784b161c94e813c000367615 + checksum/config: 051af47db7b16f375bac60985fb4984c63f443fe27758702613eef508fc9fced labels: app: antrea component: antrea-controller @@ -7488,3 +7687,18 @@ webhooks: admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None timeoutSeconds: 5 + - name: "packetsamplingvalidator.antrea.io" + clientConfig: + service: + name: "antrea" + namespace: kube-system + path: "/validate/packetsampling" + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["crd.antrea.io"] + apiVersions: ["v1alpha1"] + resources: ["packetsampling"] + scope: "Cluster" + admissionReviewVersions: ["v1", "v1beta1"] + sideEffects: None + timeoutSeconds: 5 diff --git a/build/yamls/antrea.yml b/build/yamls/antrea.yml index 2451670ca39..ac804a13f75 100644 --- a/build/yamls/antrea.yml +++ b/build/yamls/antrea.yml @@ -4313,6 +4313,182 @@ spec: # Deprecated shortName and shall be removed in Antrea v1.14.0 - anp +--- +# Source: crds/packetsampling.yaml +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: packetsamplings.crd.antrea.io + labels: + app: antrea +spec: + group: crd.antrea.io + versions: + - name: v1alpha1 + served: true + storage: true + additionalPrinterColumns: + - jsonPath: .status.phase + description: The phase of the PacketSampling. + name: Phase + type: string + - jsonPath: .spec.source.pod + description: The name of the source Pod. + name: Source-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.pod + description: The name of the destination Pod. + name: Destination-Pod + type: string + priority: 10 + - jsonPath: .spec.destination.ip + description: The IP address of the destination. + name: Destination-IP + type: string + priority: 10 + - jsonPath: .spec.timeout + description: Timeout in seconds. + name: Timeout + type: integer + priority: 10 + - jsonPath: .metadata.creationTimestamp + name: Age + type: date + schema: + openAPIV3Schema: + type: object + required: + - spec + properties: + spec: + type: object + properties: + source: + type: object + properties: + pod: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + destination: + type: object + properties: + pod: + type: string + service: + type: string + namespace: + type: string + ip: + type: string + oneOf: + - format: ipv4 + - format: ipv6 + packet: + type: object + properties: + ipHeader: + type: object + properties: + protocol: + type: integer + minimum: 0 + maximum: 255 + ipv6Header: + type: object + properties: + nextHeader: + type: integer + minimum: 0 + maximum: 65535 + transportHeader: + type: object + properties: + udp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + tcp: + type: object + properties: + srcPort: + type: integer + minimum: 1 + maximum: 65535 + dstPort: + type: integer + minimum: 1 + maximum: 65535 + flags: + type: integer + minimum: 0 + maximum: 255 + timeout: + type: integer + minimum: 1 + maximum: 300 + type: + type: string + firstNSamplingConfig: + type: object + properties: + number: + type: integer + fileServer: + type: object + properties: + url: + type: string + authentication: + type: object + properties: + authType: + type: string + enum: [ "BearerToken", "APIKey", "BasicAuthentication" ] + authSecret: + type: object + properties: + name: + type: string + namespace: + type: string + status: + type: object + properties: + reason: + type: string + phase: + type: string + startTime: + type: string + numCapturedPackets: + type: integer + packetsPath: + type: string + + subresources: + status: {} + scope: Cluster + names: + plural: packetsamplings + singular: packetsampling + kind: PacketSampling + shortNames: + - ps + --- # Source: crds/supportbundlecollection.yaml apiVersion: apiextensions.k8s.io/v1 @@ -5449,6 +5625,9 @@ data: # Enable traceflow which provides packet tracing feature to diagnose network issue. # Traceflow: true + # Enable PacketSampling feature which provides packets sampling (capture) feature to diagnose network issue. + # PacketSampling: false + # Enable NodePortLocal feature to make the Pods reachable externally through NodePort # NodePortLocal: true @@ -6008,6 +6187,12 @@ rules: - pods/status verbs: - patch + - apiGroups: + - "" + resources: + - secrets + verbs: + - get - apiGroups: - "" resources: @@ -6129,6 +6314,20 @@ rules: - patch - create - delete + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings + verbs: + - get + - watch + - list + - apiGroups: + - crd.antrea.io + resources: + - packetsamplings/status + verbs: + - patch - apiGroups: - crd.antrea.io resources: @@ -6810,7 +7009,7 @@ spec: kubectl.kubernetes.io/default-container: antrea-agent # Automatically restart Pods with a RollingUpdate if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ac3c14eed7ca0dc28bf2d659cd2c4e4a39d55278fb9a8759c30ea12eff89e518 + checksum/config: 9f89f6497340a41f1c461aefd52c1c4d255ecc7494a180d88b645e06153dafa9 labels: app: antrea component: antrea-agent @@ -7046,7 +7245,7 @@ spec: annotations: # Automatically restart Pod if the ConfigMap changes # See https://helm.sh/docs/howto/charts_tips_and_tricks/#automatically-roll-deployments - checksum/config: ac3c14eed7ca0dc28bf2d659cd2c4e4a39d55278fb9a8759c30ea12eff89e518 + checksum/config: 9f89f6497340a41f1c461aefd52c1c4d255ecc7494a180d88b645e06153dafa9 labels: app: antrea component: antrea-controller @@ -7429,3 +7628,18 @@ webhooks: admissionReviewVersions: ["v1", "v1beta1"] sideEffects: None timeoutSeconds: 5 + - name: "packetsamplingvalidator.antrea.io" + clientConfig: + service: + name: "antrea" + namespace: kube-system + path: "/validate/packetsampling" + rules: + - operations: ["CREATE", "UPDATE"] + apiGroups: ["crd.antrea.io"] + apiVersions: ["v1alpha1"] + resources: ["packetsampling"] + scope: "Cluster" + admissionReviewVersions: ["v1", "v1beta1"] + sideEffects: None + timeoutSeconds: 5 diff --git a/ci/kind/test-e2e-kind.sh b/ci/kind/test-e2e-kind.sh index 0d4b5992c3e..2c6f0f6f389 100755 --- a/ci/kind/test-e2e-kind.sh +++ b/ci/kind/test-e2e-kind.sh @@ -55,6 +55,7 @@ THIS_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" TESTBED_CMD="$THIS_DIR/kind-setup.sh" YML_CMD="$THIS_DIR/../../hack/generate-manifest.sh" FLOWAGGREGATOR_YML_CMD="$THIS_DIR/../../hack/generate-manifest-flow-aggregator.sh" +SFTP_DEPLOYMENT_YML="$THIS_DIR/../../hack/externalnode/sftp-deployment.yml" FLOW_VISIBILITY_HELM_VALUES="$THIS_DIR/values-flow-exporter.yml" CH_OPERATOR_YML="$THIS_DIR/../../build/yamls/clickhouse-operator-install-bundle.yml" FLOW_VISIBILITY_CHART="$THIS_DIR/../../test/e2e/charts/flow-visibility" @@ -320,10 +321,15 @@ function run_test { coverage_args="" flow_visibility_args="" + + # used for PacketSampling tests. + cat "$SFTP_DEPLOYMENT_YML" | docker exec -i kind-control-plane dd of=/root/sftp-deployment.yml + if $use_non_default_images; then export AGENT_IMG_NAME=${antrea_agent_image} export CONTROLLER_IMG_NAME=${antrea_controller_image} fi + if $coverage; then $YML_CMD --encap-mode $current_mode $manifest_args | docker exec -i kind-control-plane dd of=/root/antrea-coverage.yml $YML_CMD --ipsec $manifest_args | docker exec -i kind-control-plane dd of=/root/antrea-ipsec-coverage.yml diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 3678b665fc7..804b29e9942 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -43,6 +43,7 @@ import ( "antrea.io/antrea/pkg/agent/controller/networkpolicy" "antrea.io/antrea/pkg/agent/controller/networkpolicy/l7engine" "antrea.io/antrea/pkg/agent/controller/noderoute" + "antrea.io/antrea/pkg/agent/controller/packetsampling" "antrea.io/antrea/pkg/agent/controller/serviceexternalip" "antrea.io/antrea/pkg/agent/controller/traceflow" "antrea.io/antrea/pkg/agent/controller/trafficcontrol" @@ -114,6 +115,7 @@ func run(o *Options) error { informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync) crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync) traceflowInformer := crdInformerFactory.Crd().V1beta1().Traceflows() + packetSamplingInformer := crdInformerFactory.Crd().V1alpha1().PacketSamplings() egressInformer := crdInformerFactory.Crd().V1beta1().Egresses() externalIPPoolInformer := crdInformerFactory.Crd().V1beta1().ExternalIPPools() trafficControlInformer := crdInformerFactory.Crd().V1alpha2().TrafficControls() @@ -179,6 +181,7 @@ func run(o *Options) error { enableMulticlusterGW, groupIDAllocator, *o.config.EnablePrometheusMetrics, + features.DefaultFeatureGate.Enabled(features.PacketSampling), o.config.PacketInRate, ) @@ -630,6 +633,21 @@ func run(o *Options) error { o.enableAntreaProxy) } + var packetSamplingController *packetsampling.Controller + if features.DefaultFeatureGate.Enabled(features.PacketSampling) { + packetSamplingController = packetsampling.NewPacketSamplingController( + k8sClient, + crdClient, + serviceInformer, + endpointsInformer, + packetSamplingInformer, + ofClient, + ifaceStore, + nodeConfig, + o.enableAntreaProxy, + ) + } + // TODO: we should call this after installing flows for initial node routes // and initial NetworkPolicies so that no packets will be mishandled. if err := agentInitializer.FlowRestoreComplete(); err != nil { @@ -771,6 +789,10 @@ func run(o *Options) error { go traceflowController.Run(stopCh) } + if features.DefaultFeatureGate.Enabled(features.PacketSampling) { + go packetSamplingController.Run(stopCh) + } + if o.enableAntreaProxy { go proxier.GetProxyProvider().Run(stopCh) diff --git a/go.mod b/go.mod index 80241bb3dc0..21282664ba1 100644 --- a/go.mod +++ b/go.mod @@ -27,6 +27,7 @@ require ( github.com/go-logr/logr v1.4.1 github.com/gogo/protobuf v1.3.2 github.com/google/btree v1.1.2 + github.com/google/gopacket v1.1.19 github.com/google/uuid v1.6.0 github.com/hashicorp/memberlist v0.5.1 github.com/k8snetworkplumbingwg/network-attachment-definition-client v1.3.0 diff --git a/go.sum b/go.sum index fcbea231037..f6f210facbd 100644 --- a/go.sum +++ b/go.sum @@ -363,6 +363,8 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc= github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= @@ -825,6 +827,8 @@ golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTk golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= @@ -984,6 +988,7 @@ golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgw golang.org/x/tools v0.0.0-20190617190820-da514acc4774/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190920225731-5eefd052ad72/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= diff --git a/hack/.notableofcontents b/hack/.notableofcontents index 476abf7da99..99ca1129f53 100644 --- a/hack/.notableofcontents +++ b/hack/.notableofcontents @@ -38,6 +38,7 @@ docs/noencap-hybrid-modes.md docs/octant-plugin-installation.md docs/os-issues.md docs/ovs-offload.md +docs/packetsampling-guide.md docs/prometheus-integration.md docs/secondary-network.md docs/security.md diff --git a/pkg/agent/controller/networkpolicy/audit_logging.go b/pkg/agent/controller/networkpolicy/audit_logging.go index 3dd3a82695b..717b133ccb2 100644 --- a/pkg/agent/controller/networkpolicy/audit_logging.go +++ b/pkg/agent/controller/networkpolicy/audit_logging.go @@ -233,7 +233,7 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, packet *binding.Packet, c *Con // Get disposition Allow or Drop. match = getMatchRegField(matchers, openflow.APDispositionField) - disposition, err := getInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange()) + disposition, err := openflow.GetInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange()) if err != nil { return fmt.Errorf("received error while unloading disposition from reg: %v", err) } @@ -241,7 +241,7 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, packet *binding.Packet, c *Con // Get layer 7 NetworkPolicy redirect action, if traffic is redirected, disposition log should be overwritten. if match = getMatchRegField(matchers, openflow.L7NPRegField); match != nil { - l7NPRegVal, err := getInfoInReg(match, openflow.L7NPRegField.GetRange().ToNXRange()) + l7NPRegVal, err := openflow.GetInfoInReg(match, openflow.L7NPRegField.GetRange().ToNXRange()) if err != nil { return fmt.Errorf("received error while unloading l7 NP redirect value from reg: %v", err) } @@ -252,7 +252,7 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, packet *binding.Packet, c *Con // Get K8s default deny action, if traffic is default deny, no conjunction could be matched. if match = getMatchRegField(matchers, openflow.APDenyRegMark.GetField()); match != nil { - apDenyRegVal, err := getInfoInReg(match, openflow.APDenyRegMark.GetField().GetRange().ToNXRange()) + apDenyRegVal, err := openflow.GetInfoInReg(match, openflow.APDenyRegMark.GetField().GetRange().ToNXRange()) if err != nil { return fmt.Errorf("received error while unloading deny mark from reg: %v", err) } @@ -269,7 +269,7 @@ func getNetworkPolicyInfo(pktIn *ofctrl.PacketIn, packet *binding.Packet, c *Con match = getMatch(matchers, tableID, disposition) // Get NetworkPolicy full name and OF priority of the conjunction. - conjID, err := getInfoInReg(match, nil) + conjID, err := openflow.GetInfoInReg(match, nil) if err != nil { return fmt.Errorf("received error while unloading conjunction id from reg: %v", err) } diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index ac7de7f95ea..1c1fd2e7403 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -21,7 +21,6 @@ import ( "net/netip" "time" - "antrea.io/libOpenflow/openflow15" "antrea.io/ofnet/ofctrl" "github.com/vmware/go-ipfix/pkg/registry" "k8s.io/klog/v2" @@ -91,18 +90,6 @@ func getMatch(matchers *ofctrl.Matchers, tableID uint8, disposition uint32) *ofc return nil } -// getInfoInReg unloads and returns data stored in the match field. -func getInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32, error) { - regValue, ok := regMatch.GetValue().(*ofctrl.NXRegister) - if !ok { - return 0, errors.New("register value cannot be retrieved") - } - if rng != nil { - return ofctrl.GetUint32ValueWithRange(regValue.Data, rng), nil - } - return regValue.Data, nil -} - func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { packet, err := binding.ParsePacketIn(pktIn) if err != nil { @@ -147,7 +134,7 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { tableID := getPacketInTableID(pktIn) // Get disposition Allow, Drop or Reject match = getMatchRegField(matchers, openflow.APDispositionField) - id, err := getInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange()) + id, err := openflow.GetInfoInReg(match, openflow.APDispositionField.GetRange().ToNXRange()) if err != nil { return fmt.Errorf("error when getting disposition from reg: %v", err) } @@ -156,7 +143,7 @@ func (c *Controller) storeDenyConnection(pktIn *ofctrl.PacketIn) error { // Set match to corresponding ingress/egress reg according to disposition match = getMatch(matchers, tableID, id) if match != nil { - ruleID, err := getInfoInReg(match, nil) + ruleID, err := openflow.GetInfoInReg(match, nil) if err != nil { return fmt.Errorf("error when obtaining rule id from reg: %v", err) } @@ -223,7 +210,7 @@ func getPacketInTableID(pktIn *ofctrl.PacketIn) uint8 { tableID := pktIn.TableId matchers := pktIn.GetMatches() if match := getMatchRegField(matchers, openflow.PacketInTableField); match != nil { - tableVal, err := getInfoInReg(match, openflow.PacketInTableField.GetRange().ToNXRange()) + tableVal, err := openflow.GetInfoInReg(match, openflow.PacketInTableField.GetRange().ToNXRange()) if err == nil { return uint8(tableVal) } else { diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index 75de5abd6b5..4c2fcc95635 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -142,7 +142,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { if c.antreaProxyEnabled { matches := pktIn.GetMatches() if match := getMatchRegField(matches, openflow.ServiceEPStateField); match != nil { - svcEpstate, err := getInfoInReg(match, openflow.ServiceEPStateField.GetRange().ToNXRange()) + svcEpstate, err := openflow.GetInfoInReg(match, openflow.ServiceEPStateField.GetRange().ToNXRange()) if err != nil { return false } @@ -343,7 +343,7 @@ func parseFlexibleIPAMStatus(pktIn *ofctrl.PacketIn, nodeConfig *config.NodeConf // The generated reject packet should have same ctZone with the incoming packet, otherwise the conntrack cannot work properly. matches := pktIn.GetMatches() if match := getMatchRegField(matches, openflow.CtZoneField); match != nil { - ctZone, err = getInfoInReg(match, openflow.CtZoneField.GetRange().ToNXRange()) + ctZone, err = openflow.GetInfoInReg(match, openflow.CtZoneField.GetRange().ToNXRange()) if err != nil { return false, false, 0, err } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 8354e938850..71fc97a6fe4 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -234,9 +234,15 @@ type Client interface { // InstallTraceflowFlows installs flows for a Traceflow request. InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error + // InstallPacketSamplingFlows installs flows for a Packet Sampling request. + InstallPacketSamplingFlows(dataplaneTag uint8, senderOnly bool, receiverOnly bool, packet *binding.Packet, endpointPackets []binding.Packet, ofPort uint32, timeoutSeconds uint16) error + // UninstallTraceflowFlows uninstalls flows for a Traceflow request. UninstallTraceflowFlows(dataplaneTag uint8) error + // UninstallPacketSamplingFlows uninstalls flows for a Packet Sampling request. + UninstallPacketSamplingFlows(dataplaneTag uint8) error + // GetPolicyInfoFromConjunction returns the following policy information for the provided conjunction ID: // NetworkPolicy reference, OF priority, rule name, label // The boolean return value indicates whether the policy information was found. @@ -928,6 +934,7 @@ func (c *client) generatePipelines() { c.enableL7FlowExporter) c.activatedFeatures = append(c.activatedFeatures, c.featurePodConnectivity) c.traceableFeatures = append(c.traceableFeatures, c.featurePodConnectivity) + c.samplingFeatures = append(c.samplingFeatures, c.featurePodConnectivity) c.featureService = newFeatureService(c.cookieAllocator, c.nodeIPChecker, @@ -943,6 +950,7 @@ func (c *client) generatePipelines() { c.connectUplinkToBridge) c.activatedFeatures = append(c.activatedFeatures, c.featureService) c.traceableFeatures = append(c.traceableFeatures, c.featureService) + c.samplingFeatures = append(c.samplingFeatures, c.featureService) } if c.nodeType == config.ExternalNode { @@ -990,6 +998,11 @@ func (c *client) generatePipelines() { c.featureTraceflow = newFeatureTraceflow() c.activatedFeatures = append(c.activatedFeatures, c.featureTraceflow) + if c.enablePacketSampling { + c.featurePacketSampling = newFeaturePacketSampling() + c.activatedFeatures = append(c.activatedFeatures, c.featurePacketSampling) + } + // Pipelines to generate. pipelineIDs := []binding.PipelineID{pipelineRoot, pipelineIP} if c.networkConfig.IPv4Enabled { @@ -1234,6 +1247,22 @@ func (c *client) SendTraceflowPacket(dataplaneTag uint8, packet *binding.Packet, return c.bridge.SendPacketOut(packetOutObj) } +func (c *client) InstallPacketSamplingFlows(dataplaneTag uint8, senderOnly, receiverOnly bool, packet *binding.Packet, endpointPackets []binding.Packet, ofPort uint32, timeoutSeconds uint16) error { + cacheKey := fmt.Sprintf("%x", dataplaneTag) + var flows []binding.Flow + for _, f := range c.samplingFeatures { + flows = append(flows, f.flowsToSampling(dataplaneTag, + c.ovsMetersAreSupported, + senderOnly, + receiverOnly, + packet, + endpointPackets, + ofPort, + timeoutSeconds)...) + } + return c.addFlows(c.featurePacketSampling.cachedFlows, cacheKey, flows) +} + func (c *client) InstallTraceflowFlows(dataplaneTag uint8, liveTraffic, droppedOnly, receiverOnly bool, packet *binding.Packet, ofPort uint32, timeoutSeconds uint16) error { cacheKey := fmt.Sprintf("%x", dataplaneTag) var flows []binding.Flow @@ -1255,6 +1284,11 @@ func (c *client) UninstallTraceflowFlows(dataplaneTag uint8) error { return c.deleteFlows(c.featureTraceflow.cachedFlows, cacheKey) } +func (c *client) UninstallPacketSamplingFlows(dataplaneTag uint8) error { + cacheKey := fmt.Sprintf("%x", dataplaneTag) + return c.deleteFlows(c.featurePacketSampling.cachedFlows, cacheKey) +} + // setBasePacketOutBuilder sets base IP properties of a packetOutBuilder which can have more packet data added. func setBasePacketOutBuilder(packetOutBuilder binding.PacketOutBuilder, srcMAC string, dstMAC string, srcIP string, dstIP string, inPort uint32, outPort uint32) (binding.PacketOutBuilder, error) { // Set ethernet header. diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 3ff5d7fd808..5ff01c3da5f 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -94,6 +94,7 @@ type clientOptions struct { enableMulticluster bool enableL7NetworkPolicy bool enableL7FlowExporter bool + enablePacketSampling bool trafficEncryptionMode config.TrafficEncryptionModeType } @@ -168,6 +169,10 @@ func enableMulticluster(o *clientOptions) { o.enableMulticluster = true } +func enablePacketSampling(o *clientOptions) { + o.enablePacketSampling = true +} + func setTrafficEncryptionMode(trafficEncryptionMode config.TrafficEncryptionModeType) clientOptionsFn { return func(o *clientOptions) { o.trafficEncryptionMode = trafficEncryptionMode @@ -419,6 +424,7 @@ func newFakeClientWithBridge( o.enableMulticluster, NewGroupAllocator(), false, + o.enablePacketSampling, defaultPacketInRate) // Meters must be supported to enable Egress traffic shaping. @@ -1778,6 +1784,126 @@ func Test_client_InstallEgressQoS(t *testing.T) { require.False(t, ok) } +func Test_client_InstallPacketSamplingFlows(t *testing.T) { + type fields struct { + } + type args struct { + dataplaneTag uint8 + senderOnly bool + receiverOnly bool + packet *binding.Packet + endpointsPacket []binding.Packet + } + srcMAC, _ := net.ParseMAC("11:22:33:44:55:66") + dstMAC, _ := net.ParseMAC("11:22:33:44:55:77") + tests := []struct { + name string + fields fields + args args + wantErr bool + prepareFunc func(*gomock.Controller) *client + }{ + { + name: "packetsampling flow", + fields: fields{}, + args: args{ + dataplaneTag: 1, + packet: &binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.5"), + IPProto: 1, + TTL: 64, + }, + }, + wantErr: false, + prepareFunc: preparePacketSamplingFlow, + }, + { + name: "packetsampling flow with receiver only", + fields: fields{}, + args: args{ + dataplaneTag: 1, + receiverOnly: true, + packet: &binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.5"), + IPProto: 1, + TTL: 64, + }, + }, + wantErr: false, + prepareFunc: preparePacketSamplingFlow, + }, + { + name: "packetsampling flow with sender only", + fields: fields{}, + args: args{ + dataplaneTag: 1, + senderOnly: true, + packet: &binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.5"), + IPProto: 1, + TTL: 64, + }, + }, + wantErr: false, + prepareFunc: preparePacketSamplingFlow, + }, + { + name: "packetsampling flow with endpoints packets", + fields: fields{}, + args: args{ + dataplaneTag: 1, + senderOnly: true, + packet: &binding.Packet{ + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.5"), + IPProto: 1, + TTL: 64, + }, + endpointsPacket: []binding.Packet{ + { + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.6"), + IPProto: 1, + TTL: 64, + }, + { + SourceMAC: srcMAC, + DestinationMAC: dstMAC, + SourceIP: net.ParseIP("1.2.3.4"), + DestinationIP: net.ParseIP("1.2.3.7"), + IPProto: 1, + TTL: 64, + }, + }, + }, + wantErr: false, + prepareFunc: preparePacketSamplingFlow, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + c := tt.prepareFunc(ctrl) + if err := c.InstallPacketSamplingFlows(tt.args.dataplaneTag, tt.args.senderOnly, tt.args.receiverOnly, tt.args.packet, nil, 0, 300); (err != nil) != tt.wantErr { + t.Errorf("InstallPacketSamplingFlows() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} + func Test_client_InstallTraceflowFlows(t *testing.T) { type fields struct { } @@ -1918,6 +2044,32 @@ func Test_client_SendTraceflowPacket(t *testing.T) { } } +func preparePacketSamplingFlow(ctrl *gomock.Controller) *client { + m := opstest.NewMockOFEntryOperations(ctrl) + fc := newFakeClientWithBridge(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, ovsoftest.NewMockBridge(ctrl), enablePacketSampling) + defer resetPipelines() + + m.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1) + _, ipCIDR, _ := net.ParseCIDR("192.168.2.30/32") + flows, _ := EgressDefaultTable.ofTable.BuildFlow(priority100).Action().Drop().Done().GetBundleMessages(binding.AddMessage) + flowMsg := flows[0].GetMessage().(*openflow15.FlowMod) + ctx := &conjMatchFlowContext{ + dropFlow: flowMsg, + dropFlowEnableLogging: false, + conjunctiveMatch: &conjunctiveMatch{ + tableID: 1, + matchPairs: []matchPair{ + { + matchKey: MatchCTSrcIPNet, + matchValue: *ipCIDR, + }, + }, + }} + fc.featureNetworkPolicy.globalConjMatchFlowCache["mockContext"] = ctx + fc.featureNetworkPolicy.policyCache.Add(&policyRuleConjunction{metricFlows: []*openflow15.FlowMod{flowMsg}}) + return fc +} + func prepareTraceflowFlow(ctrl *gomock.Controller) *client { m := opstest.NewMockOFEntryOperations(ctrl) fc := newFakeClientWithBridge(m, true, false, config.K8sNode, config.TrafficEncapModeEncap, ovsoftest.NewMockBridge(ctrl)) @@ -2031,7 +2183,7 @@ func Test_client_setBasePacketOutBuilder(t *testing.T) { } func prepareSetBasePacketOutBuilder(ctrl *gomock.Controller, success bool) *client { - ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, false, nil, false, defaultPacketInRate) + ofClient := NewClient(bridgeName, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, false, nil, false, false, defaultPacketInRate) m := ovsoftest.NewMockBridge(ctrl) ofClient.bridge = m bridge := binding.OFBridge{} diff --git a/pkg/agent/openflow/cookie/allocator.go b/pkg/agent/openflow/cookie/allocator.go index 3aef3db4c84..4ec3dc01f4e 100644 --- a/pkg/agent/openflow/cookie/allocator.go +++ b/pkg/agent/openflow/cookie/allocator.go @@ -39,6 +39,7 @@ const ( Multicluster Traceflow ExternalNodeConnectivity + PacketSampling ) func (c Category) String() string { @@ -61,6 +62,8 @@ func (c Category) String() string { return "Traceflow" case ExternalNodeConnectivity: return "ExternalNodeConnectivity" + case PacketSampling: + return "PacketSampling" default: return "Invalid" } diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index 87d0521af2e..7708d1dea7e 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -148,6 +148,9 @@ var ( // consider the packet external sourced as the other IPs are routable externally anyway. FromExternalRegMark = binding.NewOneBitRegMark(4, 27) + // reg4[28..31]: Field mark the flow for packet sampling case. + PacketSamplingMark = binding.NewRegField(4, 28, 31) + // reg5(NXM_NX_REG5) // Field to cache the Egress conjunction ID hit by TraceFlow packet. TFEgressConjIDField = binding.NewRegField(5, 0, 31) diff --git a/pkg/agent/openflow/framework.go b/pkg/agent/openflow/framework.go index d8c353ee2a2..d64073e8c0e 100644 --- a/pkg/agent/openflow/framework.go +++ b/pkg/agent/openflow/framework.go @@ -309,6 +309,10 @@ func (f *featureTraceflow) getRequiredTables() []*Table { return nil } +func (f *featurePacketSampling) getRequiredTables() []*Table { + return nil +} + func (f *featureExternalNodeConnectivity) getRequiredTables() []*Table { return []*Table{ ConntrackTable, @@ -336,3 +340,14 @@ type traceableFeature interface { ofPort uint32, timeoutSeconds uint16) []binding.Flow } + +type samplingFeature interface { + flowsToSampling(dataplaneTag uint8, + ovsMetersAreSupported, + senderOnly bool, + receiverOnly bool, + packet *binding.Packet, + endpointPackets []binding.Packet, + ofPort uint32, + timeoutSeconds uint16) []binding.Flow +} diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index 608f1759e34..cc4cdf5d0ea 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -53,6 +53,8 @@ const ( // PacketInCategorySvcReject is used to process the Service packets not matching any // Endpoints within packetIn message. PacketInCategorySvcReject + // PacketInCategoryPS is used for packetIn messages related to sampling. + PacketInCategoryPS // PacketIn operations below are used to decide which operation(s) should be // executed by a handler. It(they) should be loaded in the second byte of the diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 9daed3adc33..8f958da4440 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -402,6 +402,7 @@ type client struct { enableL7FlowExporter bool enableMulticluster bool enablePrometheusMetrics bool + enablePacketSampling bool connectUplinkToBridge bool nodeType config.NodeType roundInfo types.RoundInfo @@ -421,6 +422,9 @@ type client struct { featureTraceflow *featureTraceflow traceableFeatures []traceableFeature + featurePacketSampling *featurePacketSampling + samplingFeatures []samplingFeature + pipelines map[binding.PipelineID]binding.Pipeline // ofEntryOperations is a wrapper interface for operating multiple OpenFlow entries with action AddAll / ModifyAll / DeleteAll. @@ -847,6 +851,209 @@ func (f *featureService) snatConntrackFlows() []binding.Flow { return flows } +func matchTransportHeader(packet *binding.Packet, flowBuilder binding.FlowBuilder, endpointPackets []binding.Packet) binding.FlowBuilder { + // Match transport header + switch packet.IPProto { + case protocol.Type_ICMP: + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolICMP) + case protocol.Type_IPv6ICMP: + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolICMPv6) + case protocol.Type_TCP: + if packet.IsIPv6 { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolTCPv6) + } else { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolTCP) + } + case protocol.Type_UDP: + if packet.IsIPv6 { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolUDPv6) + } else { + flowBuilder = flowBuilder.MatchProtocol(binding.ProtocolUDP) + } + default: + flowBuilder = flowBuilder.MatchIPProtocolValue(packet.IsIPv6, packet.IPProto) + } + if packet.IPProto == protocol.Type_TCP || packet.IPProto == protocol.Type_UDP { + if endpointPackets != nil && endpointPackets[0].DestinationPort != 0 { + flowBuilder = flowBuilder.MatchDstPort(endpointPackets[0].DestinationPort, nil) + } else if packet.DestinationPort != 0 { + flowBuilder = flowBuilder.MatchDstPort(packet.DestinationPort, nil) + } + if packet.SourcePort != 0 { + flowBuilder = flowBuilder.MatchSrcPort(packet.SourcePort, nil) + } + } + + return flowBuilder +} + +// flowsToSampling generates flows for packet sampling. DSCP flag is used as a mark for the target flow. +// When output, the flag will be cleared. +func (f *featurePodConnectivity) flowsToSampling(dataplaneTag uint8, + ovsMetersAreSupported, + senderOnly bool, + receiverOnly bool, + packet *binding.Packet, + endpointPackets []binding.Packet, + ofPort uint32, + timeout uint16) []binding.Flow { + cookieID := f.cookieAllocator.Request(cookie.PacketSampling).Raw() + var flows []binding.Flow + + tag := uint32(dataplaneTag) + + var flowBuilder binding.FlowBuilder + if !receiverOnly { + // if not receiverOnly, ofPort is inPort + if endpointPackets == nil { + flowBuilder = ConntrackStateTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchInPort(ofPort). + MatchCTStateTrk(true). + Action().LoadToRegField(PacketSamplingMark, tag). + SetHardTimeout(timeout). + Action().GotoStage(stagePreRouting) + if packet.DestinationIP != nil { + flowBuilder = flowBuilder.MatchDstIP(packet.DestinationIP) + } + } else { + // generate flows to endpoints. + for _, epPacket := range endpointPackets { + tmpFlowBuilder := ConntrackStateTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchInPort(ofPort). + MatchCTStateTrk(true). + Action().LoadToRegField(PacketSamplingMark, tag). + SetHardTimeout(timeout). + Action().GotoStage(stagePreRouting) + tmpFlowBuilder.MatchDstIP(epPacket.DestinationIP) + flow := matchTransportHeader(packet, tmpFlowBuilder, endpointPackets).Done() + flows = append(flows, flow) + } + } + } else { + flowBuilder = L2ForwardingCalcTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchCTStateTrk(true). + MatchDstMAC(packet.DestinationMAC). + Action().LoadToRegField(TargetOFPortField, ofPort). + Action().LoadRegMark(OutputToOFPortRegMark). + Action().LoadToRegField(PacketSamplingMark, tag). + SetHardTimeout(timeout). + Action().GotoStage(stageIngressSecurity) + if packet.SourceIP != nil { + flowBuilder = flowBuilder.MatchSrcIP(packet.SourceIP) + } + } + + // for sender only case, capture the first tracked packet for svc. + if senderOnly { + for _, ipProtocol := range f.ipProtocols { + tmpFlowBuilder := ConntrackStateTable.ofTable.BuildFlow(priorityHigh). + Cookie(cookieID). + MatchInPort(ofPort). + MatchProtocol(ipProtocol). + MatchCTMark(ServiceCTMark). + MatchCTStateNew(true). + MatchCTStateTrk(true). + Action().LoadRegMark(RewriteMACRegMark). + Action().LoadToRegField(PacketSamplingMark, tag). + SetHardTimeout(timeout). + Action().GotoStage(stageEgressSecurity) + tmpFlowBuilder = matchTransportHeader(packet, tmpFlowBuilder, nil) + flows = append(flows, tmpFlowBuilder.Done()) + } + } + + if flowBuilder != nil { + flow := matchTransportHeader(packet, flowBuilder, nil).Done() + flows = append(flows, flow) + } + + output := func(fb binding.FlowBuilder) binding.FlowBuilder { + return fb.Action().OutputToRegField(TargetOFPortField) + } + + sendToController := func(fb binding.FlowBuilder) binding.FlowBuilder { + if ovsMetersAreSupported { + fb = fb.Action().Meter(PacketInMeterIDTF) + } + fb = fb.Action().SendToController([]byte{uint8(PacketInCategoryPS)}, false) + return fb + } + + // This generates PacketSampling specific flows that outputs sampling + // non-hairpin packets to OVS port and Antrea Agent after + // L2 forwarding calculation. + for _, ipProtocol := range f.ipProtocols { + if f.networkConfig.TrafficEncapMode.SupportsEncap() { + // SendToController and Output if output port is tunnel port. + fb := OutputTable.ofTable.BuildFlow(priorityNormal+3). + Cookie(cookieID). + MatchRegFieldWithValue(TargetOFPortField, f.tunnelPort). + MatchProtocol(ipProtocol). + MatchRegMark(OutputToOFPortRegMark). + MatchRegFieldWithValue(PacketSamplingMark, tag). + SetHardTimeout(timeout). + Action().OutputToRegField(TargetOFPortField) + fb = sendToController(fb) + flows = append(flows, fb.Done()) + // For injected packets, only SendToController if output port is local gateway. In encapMode, a PacketSampling + // packet going out of the gateway port (i.e. exiting the overlay) essentially means that the PacketSampling + // request is complete. + fb = OutputTable.ofTable.BuildFlow(priorityNormal+2). + Cookie(cookieID). + MatchRegFieldWithValue(TargetOFPortField, f.gatewayPort). + MatchProtocol(ipProtocol). + MatchRegMark(OutputToOFPortRegMark). + MatchRegFieldWithValue(PacketSamplingMark, tag). + SetHardTimeout(timeout) + fb = sendToController(fb) + fb = output(fb) + flows = append(flows, fb.Done()) + } else { + // SendToController and Output if output port is local gateway. Unlike in encapMode, inter-Node Pod-to-Pod + // traffic is expected to go out of the gateway port on the way to its destination. + fb := OutputTable.ofTable.BuildFlow(priorityNormal+2). + Cookie(cookieID). + MatchRegFieldWithValue(TargetOFPortField, f.gatewayPort). + MatchProtocol(ipProtocol). + MatchRegMark(OutputToOFPortRegMark). + MatchRegFieldWithValue(PacketSamplingMark, tag). + SetHardTimeout(timeout). + Action().OutputToRegField(TargetOFPortField) + fb = sendToController(fb) + flows = append(flows, fb.Done()) + } + + gatewayIP := f.gatewayIPs[ipProtocol] + if gatewayIP != nil { + fb := OutputTable.ofTable.BuildFlow(priorityNormal+3). + Cookie(cookieID). + MatchRegFieldWithValue(TargetOFPortField, f.gatewayPort). + MatchProtocol(ipProtocol). + MatchDstIP(gatewayIP). + MatchRegMark(OutputToOFPortRegMark). + MatchRegFieldWithValue(PacketSamplingMark, tag). + SetHardTimeout(timeout) + fb = sendToController(fb) + fb = output(fb) + flows = append(flows, fb.Done()) + } + + fb := OutputTable.ofTable.BuildFlow(priorityNormal+2). + Cookie(cookieID). + MatchProtocol(ipProtocol). + MatchRegMark(OutputToOFPortRegMark). + MatchRegFieldWithValue(PacketSamplingMark, tag). + SetHardTimeout(timeout) + fb = sendToController(fb) + fb = output(fb) + flows = append(flows, fb.Done()) + } + return flows +} + // TODO: Use DuplicateToBuilder or integrate this function into original one to avoid unexpected difference. // flowsToTrace generates Traceflow specific flows in the connectionTrackStateTable or L2ForwardingCalcTable for featurePodConnectivity. // When packet is not provided, the flows bypass the drop flow in conntrackStateFlow to avoid unexpected drop of the @@ -936,14 +1143,7 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8, default: flowBuilder = flowBuilder.MatchIPProtocolValue(packet.IsIPv6, packet.IPProto) } - if packet.IPProto == protocol.Type_TCP || packet.IPProto == protocol.Type_UDP { - if packet.DestinationPort != 0 { - flowBuilder = flowBuilder.MatchDstPort(packet.DestinationPort, nil) - } - if packet.SourcePort != 0 { - flowBuilder = flowBuilder.MatchSrcPort(packet.SourcePort, nil) - } - } + flows = append(flows, flowBuilder.Done()) } @@ -1040,6 +1240,44 @@ func (f *featurePodConnectivity) flowsToTrace(dataplaneTag uint8, return flows } +// flowsToSampling is used to generate flows for PacketSampling in featureService. +func (f *featureService) flowsToSampling(dataplaneTag uint8, + ovsMetersAreSupported, + senderOnly bool, + receiverOnly bool, + packet *binding.Packet, + endpointPackets []binding.Packet, + ofPort uint32, + timeout uint16) []binding.Flow { + cookieID := f.cookieAllocator.Request(cookie.PacketSampling).Raw() + var flows []binding.Flow + + sendToController := func(fb binding.FlowBuilder) binding.FlowBuilder { + if ovsMetersAreSupported { + fb = fb.Action().Meter(PacketInMeterIDTF) + } + fb = fb.Action().SendToController([]byte{uint8(PacketInCategoryPS)}, false) + return fb + } + + // This generates PacketSampling specific flows that outputs hairpin PacketSampling packets to OVS port and Antrea Agent after + // L2forwarding calculation. + for _, ipProtocol := range f.ipProtocols { + if f.enableProxy { + fb := OutputTable.ofTable.BuildFlow(priorityHigh+2). + Cookie(cookieID). + MatchProtocol(ipProtocol). + MatchCTMark(HairpinCTMark). + MatchRegFieldWithValue(PacketSamplingMark, uint32(dataplaneTag)). + SetHardTimeout(timeout) + fb = sendToController(fb) + fb = fb.Action().OutputToRegField(TargetOFPortField) + flows = append(flows, fb.Done()) + } + } + return flows +} + // flowsToTrace is used to generate flows for Traceflow in featureService. func (f *featureService) flowsToTrace(dataplaneTag uint8, ovsMetersAreSupported, @@ -2834,6 +3072,7 @@ func NewClient(bridgeName string, enableMulticluster bool, groupIDAllocator GroupAllocator, enablePrometheusMetrics bool, + enablePacketSampling bool, packetInRate int, ) *client { bridge := binding.NewOFBridge(bridgeName, mgmtAddr) @@ -2853,6 +3092,7 @@ func NewClient(bridgeName string, enableL7FlowExporter: enableL7FlowExporter, enableMulticluster: enableMulticluster, enablePrometheusMetrics: enablePrometheusMetrics, + enablePacketSampling: enablePacketSampling, connectUplinkToBridge: connectUplinkToBridge, pipelines: make(map[binding.PipelineID]binding.Pipeline), packetInHandlers: map[uint8]PacketInHandler{}, diff --git a/pkg/agent/openflow/pipeline_test.go b/pkg/agent/openflow/pipeline_test.go index 451abe7ccc1..3d094d15ca8 100644 --- a/pkg/agent/openflow/pipeline_test.go +++ b/pkg/agent/openflow/pipeline_test.go @@ -19,6 +19,7 @@ import ( "testing" "antrea.io/libOpenflow/openflow15" + "antrea.io/libOpenflow/protocol" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -301,3 +302,94 @@ func getGroupModLen(g *openflow15.GroupMod) uint32 { } return n } + +func TestMatchTransportHeader(t *testing.T) { + + testCases := []struct { + name string + packet *binding.Packet + endpointPackets []binding.Packet + expectCalls func(ctrl *gomock.Controller, builder *openflowtest.MockFlowBuilder) *openflowtest.MockFlowBuilder + }{ + { + name: "tcp proto", + packet: &binding.Packet{ + IPProto: protocol.Type_TCP, + }, + expectCalls: func(ctrl *gomock.Controller, builder *openflowtest.MockFlowBuilder) *openflowtest.MockFlowBuilder { + builder.EXPECT().MatchProtocol(binding.ProtocolTCP) + return builder + + }, + }, + { + name: "udp proto", + packet: &binding.Packet{ + IPProto: protocol.Type_UDP, + }, + expectCalls: func(ctrl *gomock.Controller, builder *openflowtest.MockFlowBuilder) *openflowtest.MockFlowBuilder { + builder.EXPECT().MatchProtocol(binding.ProtocolUDP) + return builder + }, + }, + { + name: "ipv6-tcp", + packet: &binding.Packet{ + IPProto: protocol.Type_TCP, + IsIPv6: true, + }, + expectCalls: func(ctrl *gomock.Controller, builder *openflowtest.MockFlowBuilder) *openflowtest.MockFlowBuilder { + builder.EXPECT().MatchProtocol(binding.ProtocolTCPv6) + return builder + }, + }, + { + name: "udp-with-src-and-dst-port", + packet: &binding.Packet{ + IPProto: protocol.Type_UDP, + SourcePort: 1000, + DestinationPort: 53, + }, + expectCalls: func(ctrl *gomock.Controller, builder *openflowtest.MockFlowBuilder) *openflowtest.MockFlowBuilder { + builder.EXPECT().MatchProtocol(binding.ProtocolUDP).Return(builder).AnyTimes() + builder.EXPECT().MatchDstPort(uint16(53), nil).Return(builder).AnyTimes() + builder.EXPECT().MatchSrcPort(uint16(1000), nil).Return(builder).AnyTimes() + return builder + }, + }, + { + name: "with endpoints packets", + packet: &binding.Packet{ + IPProto: protocol.Type_TCP, + SourcePort: 1000, + DestinationPort: 53, + }, + endpointPackets: []binding.Packet{ + { + IPProto: protocol.Type_TCP, + SourcePort: 1000, + DestinationPort: 54, + }, + }, + expectCalls: func(ctrl *gomock.Controller, builder *openflowtest.MockFlowBuilder) *openflowtest.MockFlowBuilder { + builder.EXPECT().MatchProtocol(binding.ProtocolTCP).Return(builder).AnyTimes() + builder.EXPECT().MatchDstPort(uint16(54), nil).Return(builder).AnyTimes() + builder.EXPECT().MatchSrcPort(uint16(1000), nil).Return(builder).AnyTimes() + return builder + }, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + fakeOfTable := openflowtest.NewMockTable(ctrl) + ConntrackStateTable.ofTable = fakeOfTable + defer func() { + ConntrackStateTable.ofTable = nil + }() + testBuilder := openflowtest.NewMockFlowBuilder(ctrl) + tc.expectCalls(ctrl, testBuilder) + matchTransportHeader(tc.packet, testBuilder, tc.endpointPackets) + }) + } +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index cb1f4cb8cbc..22b83dfc317 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -419,6 +419,20 @@ func (mr *MockClientMockRecorder) InstallNodeFlows(arg0, arg1, arg2, arg3, arg4 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallNodeFlows", reflect.TypeOf((*MockClient)(nil).InstallNodeFlows), arg0, arg1, arg2, arg3, arg4) } +// InstallPacketSamplingFlows mocks base method. +func (m *MockClient) InstallPacketSamplingFlows(arg0 byte, arg1, arg2 bool, arg3 *openflow0.Packet, arg4 []openflow0.Packet, arg5 uint32, arg6 uint16) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallPacketSamplingFlows", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallPacketSamplingFlows indicates an expected call of InstallPacketSamplingFlows. +func (mr *MockClientMockRecorder) InstallPacketSamplingFlows(arg0, arg1, arg2, arg3, arg4, arg5, arg6 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallPacketSamplingFlows", reflect.TypeOf((*MockClient)(nil).InstallPacketSamplingFlows), arg0, arg1, arg2, arg3, arg4, arg5, arg6) +} + // InstallPodFlows mocks base method. func (m *MockClient) InstallPodFlows(arg0 string, arg1 []net.IP, arg2 net.HardwareAddr, arg3 uint32, arg4 uint16, arg5 *uint32) error { m.ctrl.T.Helper() @@ -959,6 +973,20 @@ func (mr *MockClientMockRecorder) UninstallNodeFlows(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallNodeFlows", reflect.TypeOf((*MockClient)(nil).UninstallNodeFlows), arg0) } +// UninstallPacketSamplingFlows mocks base method. +func (m *MockClient) UninstallPacketSamplingFlows(arg0 byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallPacketSamplingFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallPacketSamplingFlows indicates an expected call of UninstallPacketSamplingFlows. +func (mr *MockClientMockRecorder) UninstallPacketSamplingFlows(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallPacketSamplingFlows", reflect.TypeOf((*MockClient)(nil).UninstallPacketSamplingFlows), arg0) +} + // UninstallPodFlows mocks base method. func (m *MockClient) UninstallPodFlows(arg0 string) error { m.ctrl.T.Helper() diff --git a/pkg/agent/supportbundlecollection/support_bundle_controller.go b/pkg/agent/supportbundlecollection/support_bundle_controller.go index 72b410f97e7..6e03aa45ae0 100644 --- a/pkg/agent/supportbundlecollection/support_bundle_controller.go +++ b/pkg/agent/supportbundlecollection/support_bundle_controller.go @@ -17,16 +17,11 @@ package supportbundlecollection import ( "context" "fmt" - "io" - "net/url" - "path" "reflect" "sync" "time" - "github.com/pkg/sftp" "github.com/spf13/afero" - "golang.org/x/crypto/ssh" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/util/wait" @@ -35,6 +30,8 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/exec" + "antrea.io/antrea/pkg/util/ftp" + "antrea.io/antrea/pkg/agent" agentquerier "antrea.io/antrea/pkg/agent/querier" "antrea.io/antrea/pkg/apis/controlplane" @@ -49,12 +46,8 @@ import ( type ProtocolType string const ( - sftpProtocol ProtocolType = "sftp" - - controllerName = "SupportBundleCollectionController" - - uploadToFileServerTries = 5 - uploadToFileServerRetryDelay = 5 * time.Second + sftpProtocol ProtocolType = "sftp" + controllerName string = "SupportBundleCollectionController" ) var ( @@ -78,7 +71,7 @@ type SupportBundleController struct { npq querier.AgentNetworkPolicyInfoQuerier v4Enabled bool v6Enabled bool - sftpUploader uploader + sftpUploader ftp.UpLoader } func NewSupportBundleController(nodeName string, @@ -101,7 +94,7 @@ func NewSupportBundleController(nodeName string, npq: npq, v4Enabled: v4Enabled, v6Enabled: v6Enabled, - sftpUploader: &sftpUploader{}, + sftpUploader: &ftp.SftpUploader{}, } return c } @@ -299,100 +292,20 @@ func (c *SupportBundleController) uploadSupportBundle(supportBundle *cpv1b2.Supp if err != nil { return fmt.Errorf("failed to upload support bundle while getting uploader: %v", err) } - if _, err := outputFile.Seek(0, 0); err != nil { - return fmt.Errorf("failed to upload support bundle to file server while setting offset: %v", err) - } - // fileServer.URL should be like: 10.92.23.154:22/path or sftp://10.92.23.154:22/path - parsedURL, err := parseUploadUrl(supportBundle.FileServer.URL) - if err != nil { - return fmt.Errorf("failed to upload support bundle while parsing upload URL: %v", err) - } - triesLeft := uploadToFileServerTries - var uploadErr error - for triesLeft > 0 { - if uploadErr = c.uploadToFileServer(uploader, supportBundle.Name, parsedURL, &supportBundle.Authentication, outputFile); uploadErr == nil { - return nil - } - triesLeft-- - if triesLeft == 0 { - return fmt.Errorf("failed to upload support bundle after %d attempts", uploadToFileServerTries) - } - klog.InfoS("Failed to upload support bundle", "UploadError", uploadErr, "TriesLeft", triesLeft) - time.Sleep(uploadToFileServerRetryDelay) - } - return nil -} -func parseUploadUrl(uploadUrl string) (*url.URL, error) { - parsedURL, err := url.Parse(uploadUrl) - if err != nil { - parsedURL, err = url.Parse("sftp://" + uploadUrl) - if err != nil { - return nil, err - } - } - if parsedURL.Scheme != "sftp" { - return nil, fmt.Errorf("not sftp protocol") - } - return parsedURL, nil + fileName := c.nodeName + "_" + supportBundle.Name + ".tar.gz" + serverAuth := supportBundle.Authentication.BasicAuthentication + cfg := ftp.GenSSHClientConfig(serverAuth.Username, serverAuth.Password) + return uploader.Upload(supportBundle.FileServer.URL, fileName, cfg, outputFile) } -func (c *SupportBundleController) uploadToFileServer(up uploader, bundleName string, parsedURL *url.URL, serverAuth *cpv1b2.BundleServerAuthConfiguration, tarGzFile io.Reader) error { - joinedPath := path.Join(parsedURL.Path, c.nodeName+"_"+bundleName+".tar.gz") - cfg := &ssh.ClientConfig{ - User: serverAuth.BasicAuthentication.Username, - Auth: []ssh.AuthMethod{ssh.Password(serverAuth.BasicAuthentication.Password)}, - // #nosec G106: skip host key check here and users can specify their own checks if needed - HostKeyCallback: ssh.InsecureIgnoreHostKey(), - Timeout: time.Second, - } - return up.upload(parsedURL.Host, joinedPath, cfg, tarGzFile) -} - -func (c *SupportBundleController) getUploaderByProtocol(protocol ProtocolType) (uploader, error) { +func (c *SupportBundleController) getUploaderByProtocol(protocol ProtocolType) (ftp.UpLoader, error) { if protocol == sftpProtocol { return c.sftpUploader, nil } return nil, fmt.Errorf("unsupported protocol %s", protocol) } -type uploader interface { - upload(addr string, path string, config *ssh.ClientConfig, tarGzFile io.Reader) error -} - -type sftpUploader struct { -} - -func (uploader *sftpUploader) upload(address string, path string, config *ssh.ClientConfig, tarGzFile io.Reader) error { - conn, err := ssh.Dial("tcp", address, config) - if err != nil { - return fmt.Errorf("error when connecting to fs server: %w", err) - } - sftpClient, err := sftp.NewClient(conn) - if err != nil { - return fmt.Errorf("error when setting up sftp client: %w", err) - } - defer func() { - if err := sftpClient.Close(); err != nil { - klog.ErrorS(err, "Error when closing sftp client") - } - }() - targetFile, err := sftpClient.Create(path) - if err != nil { - return fmt.Errorf("error when creating target file on remote: %v", err) - } - defer func() { - if err := targetFile.Close(); err != nil { - klog.ErrorS(err, "Error when closing target file on remote") - } - }() - if written, err := io.Copy(targetFile, tarGzFile); err != nil { - return fmt.Errorf("error when copying target file: %v, written: %d", err, written) - } - klog.InfoS("Successfully upload file to path", "filePath", path) - return nil -} - func (c *SupportBundleController) updateSupportBundleCollectionStatus(key string, complete bool, genErr error) error { antreaClient, err := c.antreaClientGetter.GetAntreaClient() if err != nil { diff --git a/pkg/agent/supportbundlecollection/support_bundle_controller_test.go b/pkg/agent/supportbundlecollection/support_bundle_controller_test.go index 51a92767050..109e8b8af6e 100644 --- a/pkg/agent/supportbundlecollection/support_bundle_controller_test.go +++ b/pkg/agent/supportbundlecollection/support_bundle_controller_test.go @@ -16,7 +16,6 @@ package supportbundlecollection import ( "fmt" - "io" "testing" "github.com/spf13/afero" @@ -29,6 +28,8 @@ import ( "k8s.io/klog/v2" "k8s.io/utils/exec" + "antrea.io/antrea/pkg/util/ftp" + agentquerier "antrea.io/antrea/pkg/agent/querier" "antrea.io/antrea/pkg/apis/controlplane" cpv1b2 "antrea.io/antrea/pkg/apis/controlplane/v1beta2" @@ -69,7 +70,7 @@ func TestSupportBundleCollectionAdd(t *testing.T) { supportBundleCollection *cpv1b2.SupportBundleCollection expectedCompleted bool agentDumper *mockAgentDumper - uploader uploader + uploader ftp.UpLoader }{ { name: "Add SupportBundleCollection", @@ -90,7 +91,7 @@ func TestSupportBundleCollectionAdd(t *testing.T) { supportBundleCollection: generateSupportbundleCollection("supportBundle3", "https://10.220.175.92:22/root/supportbundle"), expectedCompleted: false, agentDumper: &mockAgentDumper{}, - uploader: &testUploader{}, + uploader: &testFailedUploader{}, }, { name: "Add SupportBundleCollection with retry logics", @@ -191,7 +192,7 @@ func TestSupportBundleCollectionDelete(t *testing.T) { type testUploader struct { } -func (uploader *testUploader) upload(address string, path string, config *ssh.ClientConfig, tarGzFile io.Reader) error { +func (uploader *testUploader) Upload(url string, fileName string, config *ssh.ClientConfig, outputFile afero.File) error { klog.Info("Called test uploader") return nil } @@ -199,7 +200,7 @@ func (uploader *testUploader) upload(address string, path string, config *ssh.Cl type testFailedUploader struct { } -func (uploader *testFailedUploader) upload(address string, path string, config *ssh.ClientConfig, tarGzFile io.Reader) error { +func (uploader *testFailedUploader) Upload(url string, fileName string, config *ssh.ClientConfig, outputFile afero.File) error { klog.Info("Called test uploader for failed case") return fmt.Errorf("uploader failed") } diff --git a/pkg/apis/crd/v1alpha1/register.go b/pkg/apis/crd/v1alpha1/register.go index a9492ab5ac0..fc9924fa7c9 100644 --- a/pkg/apis/crd/v1alpha1/register.go +++ b/pkg/apis/crd/v1alpha1/register.go @@ -61,6 +61,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ExternalNodeList{}, &SupportBundleCollection{}, &SupportBundleCollectionList{}, + &PacketSampling{}, + &PacketSamplingList{}, ) metav1.AddToGroupVersion( diff --git a/pkg/apis/crd/v1alpha1/types.go b/pkg/apis/crd/v1alpha1/types.go index 8682ddeb09b..b8c820c2b09 100644 --- a/pkg/apis/crd/v1alpha1/types.go +++ b/pkg/apis/crd/v1alpha1/types.go @@ -935,3 +935,72 @@ type TLSProtocol struct { // SNI (Server Name Indication) indicates the server domain name in the TLS/SSL hello message. SNI string `json:"sni,omitempty"` } + +type PacketSamplingType string + +const ( + FirstNSampling PacketSamplingType = "FirstNSampling" +) + +type FirstNSamplingConfig struct { + Number int32 `json:"number,omitempty"` +} + +const DefaultPacketSamplingTimeout uint16 = 60 + +type PacketSamplingPhase string + +const ( + PacketSamplingRunning PacketSamplingPhase = "Running" + PacketSamplingSucceeded PacketSamplingPhase = "Succeeded" + PacketSamplingFailed PacketSamplingPhase = "Failed" +) + +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type PacketSamplingList struct { + metav1.TypeMeta `json:",inline"` + // +optional + metav1.ListMeta `json:"metadata,omitempty"` + + Items []PacketSampling `json:"items"` +} + +// +genclient +// +genclient:nonNamespaced +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object + +type PacketSampling struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec PacketSamplingSpec `json:"spec,omitempty"` + Status PacketSamplingStatus `json:"status,omitempty"` +} + +type PacketSamplingSpec struct { + Timeout uint16 `json:"timeout,omitempty"` + // Type is the sampling type. Currently only FirstN is supported. + Type PacketSamplingType `json:"type,omitempty"` + // FirstNSamplingConfig contains the config for the FirstN type sampling. The only supported parameter is + // `Number` at the moment, means capture the first specified number of packet in a flow. + FirstNSamplingConfig *FirstNSamplingConfig `json:"firstNSamplingConfig,omitempty"` + Source Source `json:"source,omitempty"` + Destination Destination `json:"destination,omitempty"` + Packet Packet `json:"packet,omitempty"` + // FileServer the sftp url config for the fileServer. Captured packets will be uploaded to this server. + FileServer BundleFileServer `json:"fileServer,omitempty"` + Authentication BundleServerAuthConfiguration `json:"authentication,omitempty"` +} + +type PacketSamplingStatus struct { + Phase PacketSamplingPhase `json:"phase,omitempty"` + // Reason recorded the failed reason when the sampling failed. + Reason string `json:"reason,omitempty"` + // NumCapturedPackets record how many packets has been captured. If it reach the target number, the sampling + // can be considered as finished. + NumCapturedPackets int32 `json:"numCapturedPackets,omitempty"` + // PacketsPath is the path where the captured packets are temporarily stored in the container. It will be + // removed after the PacketSampling is deleted. + PacketsPath string `json:"packetsPath,omitempty"` + StartTime *metav1.Time `json:"startTime,omitempty"` +} diff --git a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go index f80a99a2c64..493708adcb4 100644 --- a/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/crd/v1alpha1/zz_generated.deepcopy.go @@ -353,6 +353,22 @@ func (in *ExternalNodeSpec) DeepCopy() *ExternalNodeSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *FirstNSamplingConfig) DeepCopyInto(out *FirstNSamplingConfig) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FirstNSamplingConfig. +func (in *FirstNSamplingConfig) DeepCopy() *FirstNSamplingConfig { + if in == nil { + return nil + } + out := new(FirstNSamplingConfig) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HTTPProtocol) DeepCopyInto(out *HTTPProtocol) { *out = *in @@ -864,6 +880,113 @@ func (in *Packet) DeepCopy() *Packet { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacketSampling) DeepCopyInto(out *PacketSampling) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacketSampling. +func (in *PacketSampling) DeepCopy() *PacketSampling { + if in == nil { + return nil + } + out := new(PacketSampling) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PacketSampling) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacketSamplingList) DeepCopyInto(out *PacketSamplingList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]PacketSampling, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacketSamplingList. +func (in *PacketSamplingList) DeepCopy() *PacketSamplingList { + if in == nil { + return nil + } + out := new(PacketSamplingList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *PacketSamplingList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacketSamplingSpec) DeepCopyInto(out *PacketSamplingSpec) { + *out = *in + if in.FirstNSamplingConfig != nil { + in, out := &in.FirstNSamplingConfig, &out.FirstNSamplingConfig + *out = new(FirstNSamplingConfig) + **out = **in + } + out.Source = in.Source + out.Destination = in.Destination + in.Packet.DeepCopyInto(&out.Packet) + out.FileServer = in.FileServer + in.Authentication.DeepCopyInto(&out.Authentication) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacketSamplingSpec. +func (in *PacketSamplingSpec) DeepCopy() *PacketSamplingSpec { + if in == nil { + return nil + } + out := new(PacketSamplingSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *PacketSamplingStatus) DeepCopyInto(out *PacketSamplingStatus) { + *out = *in + if in.StartTime != nil { + in, out := &in.StartTime, &out.StartTime + *out = (*in).DeepCopy() + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PacketSamplingStatus. +func (in *PacketSamplingStatus) DeepCopy() *PacketSamplingStatus { + if in == nil { + return nil + } + out := new(PacketSamplingStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PeerNamespaces) DeepCopyInto(out *PeerNamespaces) { *out = *in diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index 1ce26c162d5..bc3068d23a7 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -66,6 +66,7 @@ import ( "antrea.io/antrea/pkg/controller/externalippool" "antrea.io/antrea/pkg/controller/ipam" controllernetworkpolicy "antrea.io/antrea/pkg/controller/networkpolicy" + "antrea.io/antrea/pkg/controller/packetsampling" "antrea.io/antrea/pkg/controller/querier" "antrea.io/antrea/pkg/controller/stats" controllerbundlecollection "antrea.io/antrea/pkg/controller/supportbundlecollection" @@ -342,6 +343,11 @@ func installHandlers(c *ExtraConfig, s *genericapiserver.GenericAPIServer) { if features.DefaultFeatureGate.Enabled(features.Traceflow) { s.Handler.NonGoRestfulMux.HandleFunc("/validate/traceflow", webhook.HandlerForValidateFunc(c.traceflowController.Validate)) } + + if features.DefaultFeatureGate.Enabled(features.PacketSampling) { + s.Handler.NonGoRestfulMux.HandleFunc("/validate/packetsampling", webhook.HandlerForValidateFunc(packetsampling.Validate)) + } + } func DefaultCAConfig() *certificate.CAConfig { diff --git a/pkg/apiserver/handlers/featuregates/handler_test.go b/pkg/apiserver/handlers/featuregates/handler_test.go index 51f49a68ff5..b06d3ecaeb1 100644 --- a/pkg/apiserver/handlers/featuregates/handler_test.go +++ b/pkg/apiserver/handlers/featuregates/handler_test.go @@ -70,6 +70,7 @@ func Test_getGatesResponse(t *testing.T) { {Component: "agent", Name: "NetworkPolicyStats", Status: "Enabled", Version: "BETA"}, {Component: "agent", Name: "NodeNetworkPolicy", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "NodePortLocal", Status: "Enabled", Version: "GA"}, + {Component: "agent", Name: "PacketSampling", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "SecondaryNetwork", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "ServiceExternalIP", Status: "Disabled", Version: "ALPHA"}, {Component: "agent", Name: "SupportBundleCollection", Status: "Disabled", Version: "ALPHA"}, @@ -201,6 +202,7 @@ func Test_getControllerGatesResponse(t *testing.T) { {Component: "controller", Name: "Multicluster", Status: "Disabled", Version: "ALPHA"}, {Component: "controller", Name: "NetworkPolicyStats", Status: "Enabled", Version: "BETA"}, {Component: "controller", Name: "NodeIPAM", Status: "Enabled", Version: "BETA"}, + {Component: "controller", Name: "PacketSampling", Status: "Disabled", Version: "ALPHA"}, {Component: "controller", Name: "ServiceExternalIP", Status: "Disabled", Version: "ALPHA"}, {Component: "controller", Name: "SupportBundleCollection", Status: "Disabled", Version: "ALPHA"}, {Component: "controller", Name: "Traceflow", Status: "Enabled", Version: "BETA"}, diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go index 140c2de5f47..d0d743b938b 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/crd_client.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ type CrdV1alpha1Interface interface { ClusterNetworkPoliciesGetter ExternalNodesGetter NetworkPoliciesGetter + PacketSamplingsGetter SupportBundleCollectionsGetter TiersGetter TraceflowsGetter @@ -51,6 +52,10 @@ func (c *CrdV1alpha1Client) NetworkPolicies(namespace string) NetworkPolicyInter return newNetworkPolicies(c, namespace) } +func (c *CrdV1alpha1Client) PacketSamplings() PacketSamplingInterface { + return newPacketSamplings(c) +} + func (c *CrdV1alpha1Client) SupportBundleCollections() SupportBundleCollectionInterface { return newSupportBundleCollections(c) } diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go index a90f91178bc..c9bbe091330 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/fake/fake_crd_client.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -38,6 +38,10 @@ func (c *FakeCrdV1alpha1) NetworkPolicies(namespace string) v1alpha1.NetworkPoli return &FakeNetworkPolicies{c, namespace} } +func (c *FakeCrdV1alpha1) PacketSamplings() v1alpha1.PacketSamplingInterface { + return &FakePacketSamplings{c} +} + func (c *FakeCrdV1alpha1) SupportBundleCollections() v1alpha1.SupportBundleCollectionInterface { return &FakeSupportBundleCollections{c} } diff --git a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go index d4d390824f7..4569fbeaaf6 100644 --- a/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go +++ b/pkg/client/clientset/versioned/typed/crd/v1alpha1/generated_expansion.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ type ExternalNodeExpansion interface{} type NetworkPolicyExpansion interface{} +type PacketSamplingExpansion interface{} + type SupportBundleCollectionExpansion interface{} type TierExpansion interface{} diff --git a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go index 5f34c67e3e2..78e3db6f7e5 100644 --- a/pkg/client/informers/externalversions/crd/v1alpha1/interface.go +++ b/pkg/client/informers/externalversions/crd/v1alpha1/interface.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -28,6 +28,8 @@ type Interface interface { ExternalNodes() ExternalNodeInformer // NetworkPolicies returns a NetworkPolicyInformer. NetworkPolicies() NetworkPolicyInformer + // PacketSamplings returns a PacketSamplingInformer. + PacketSamplings() PacketSamplingInformer // SupportBundleCollections returns a SupportBundleCollectionInformer. SupportBundleCollections() SupportBundleCollectionInformer // Tiers returns a TierInformer. @@ -62,6 +64,11 @@ func (v *version) NetworkPolicies() NetworkPolicyInformer { return &networkPolicyInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// PacketSamplings returns a PacketSamplingInformer. +func (v *version) PacketSamplings() PacketSamplingInformer { + return &packetSamplingInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} +} + // SupportBundleCollections returns a SupportBundleCollectionInformer. func (v *version) SupportBundleCollections() SupportBundleCollectionInformer { return &supportBundleCollectionInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} diff --git a/pkg/client/informers/externalversions/generic.go b/pkg/client/informers/externalversions/generic.go index e3f7e903ce7..1eb0c2df1e9 100644 --- a/pkg/client/informers/externalversions/generic.go +++ b/pkg/client/informers/externalversions/generic.go @@ -60,6 +60,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().ExternalNodes().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("networkpolicies"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().NetworkPolicies().Informer()}, nil + case v1alpha1.SchemeGroupVersion.WithResource("packetsamplings"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().PacketSamplings().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("supportbundlecollections"): return &genericInformer{resource: resource.GroupResource(), informer: f.Crd().V1alpha1().SupportBundleCollections().Informer()}, nil case v1alpha1.SchemeGroupVersion.WithResource("tiers"): diff --git a/pkg/client/listers/crd/v1alpha1/expansion_generated.go b/pkg/client/listers/crd/v1alpha1/expansion_generated.go index 99f199fd8a1..004e8cf7768 100644 --- a/pkg/client/listers/crd/v1alpha1/expansion_generated.go +++ b/pkg/client/listers/crd/v1alpha1/expansion_generated.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -36,6 +36,10 @@ type NetworkPolicyListerExpansion interface{} // NetworkPolicyNamespaceLister. type NetworkPolicyNamespaceListerExpansion interface{} +// PacketSamplingListerExpansion allows custom methods to be added to +// PacketSamplingLister. +type PacketSamplingListerExpansion interface{} + // SupportBundleCollectionListerExpansion allows custom methods to be added to // SupportBundleCollectionLister. type SupportBundleCollectionListerExpansion interface{} diff --git a/pkg/controller/supportbundlecollection/controller.go b/pkg/controller/supportbundlecollection/controller.go index aabf9101f0b..98d52a66aab 100644 --- a/pkg/controller/supportbundlecollection/controller.go +++ b/pkg/controller/supportbundlecollection/controller.go @@ -15,7 +15,6 @@ package supportbundlecollection import ( - "bytes" "context" "fmt" "reflect" @@ -45,6 +44,7 @@ import ( crdinformers "antrea.io/antrea/pkg/client/informers/externalversions/crd/v1alpha1" crdlisters "antrea.io/antrea/pkg/client/listers/crd/v1alpha1" "antrea.io/antrea/pkg/controller/types" + "antrea.io/antrea/pkg/util/ftp" "antrea.io/antrea/pkg/util/k8s" ) @@ -390,7 +390,7 @@ func (c *Controller) createInternalSupportBundleCollection(bundle *v1alpha1.Supp } nodeSpan := nodeNames.Union(externalNodeNames) // Get authentication from the Secret provided in authentication field in the CRD - authentication, err := c.parseBundleAuth(bundle.Spec.Authentication) + authentication, err := ftp.ParseBundleAuth(bundle.Spec.Authentication, c.kubeClient) if err != nil { klog.ErrorS(err, "Failed to get authentication defined in the SupportBundleCollection CR", "name", bundle.Name, "authentication", bundle.Spec.Authentication) return nil, err @@ -511,60 +511,6 @@ func (c *Controller) deleteInternalSupportBundleCollection(key string) error { return nil } -// parseBundleAuth returns the authentication from the Secret provided in BundleServerAuthConfiguration. -// The authentication is stored in the Secret Data with a key decided by the AuthType, and encoded using base64. -func (c *Controller) parseBundleAuth(authentication v1alpha1.BundleServerAuthConfiguration) (*controlplane.BundleServerAuthConfiguration, error) { - secretReference := authentication.AuthSecret - if secretReference == nil { - return nil, fmt.Errorf("authentication is not specified") - } - secret, err := c.kubeClient.CoreV1().Secrets(secretReference.Namespace).Get(context.TODO(), secretReference.Name, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("unable to get Secret with name %s in Namespace %s: %v", secretReference.Name, secretReference.Namespace, err) - } - parseAuthValue := func(secretData map[string][]byte, key string) (string, error) { - authValue, found := secret.Data[key] - if !found { - return "", fmt.Errorf("not found authentication in Secret %s/%s with key %s", secretReference.Namespace, secretReference.Name, key) - } - return bytes.NewBuffer(authValue).String(), nil - } - switch authentication.AuthType { - case v1alpha1.APIKey: - value, err := parseAuthValue(secret.Data, secretKeyWithAPIKey) - if err != nil { - return nil, err - } - return &controlplane.BundleServerAuthConfiguration{ - APIKey: value, - }, nil - case v1alpha1.BearerToken: - value, err := parseAuthValue(secret.Data, secretKeyWithBearerToken) - if err != nil { - return nil, err - } - return &controlplane.BundleServerAuthConfiguration{ - BearerToken: value, - }, nil - case v1alpha1.BasicAuthentication: - username, err := parseAuthValue(secret.Data, secretKeyWithUsername) - if err != nil { - return nil, err - } - password, err := parseAuthValue(secret.Data, secretKeyWithPassword) - if err != nil { - return nil, err - } - return &controlplane.BundleServerAuthConfiguration{ - BasicAuthentication: &controlplane.BasicAuthentication{ - Username: username, - Password: password, - }, - }, nil - } - return nil, fmt.Errorf("unsupported authentication type %s", authentication.AuthType) -} - // addInternalSupportBundleCollection adds internalBundle into supportBundleCollectionStore, and creates a // supportBundleCollectionAppliedTo resource to maintain the SupportBundleCollection's required Nodes or ExternalNodes. func (c *Controller) addInternalSupportBundleCollection( diff --git a/pkg/controller/supportbundlecollection/controller_test.go b/pkg/controller/supportbundlecollection/controller_test.go index 678215dadd5..1b3d2eb490c 100644 --- a/pkg/controller/supportbundlecollection/controller_test.go +++ b/pkg/controller/supportbundlecollection/controller_test.go @@ -660,111 +660,6 @@ type secretConfig struct { data map[string][]byte } -func TestParseBundleAuth(t *testing.T) { - ns := "ns-auth" - apiKey := testKeyString - token := testTokenString - usr := "user" - pwd := "pwd123456" - var secretObjects []runtime.Object - for _, s := range prepareSecrets(ns, []secretConfig{ - {name: "s1", data: map[string][]byte{secretKeyWithAPIKey: []byte(apiKey)}}, - {name: "s2", data: map[string][]byte{secretKeyWithBearerToken: []byte(token)}}, - {name: "s3", data: map[string][]byte{secretKeyWithUsername: []byte(usr), secretKeyWithPassword: []byte(pwd)}}, - {name: "invalid-base64", data: map[string][]byte{secretKeyWithAPIKey: []byte("invalid string to decode with base64")}}, - {name: "invalid-secret", data: map[string][]byte{"unknown": []byte(apiKey)}}, - }) { - secretObjects = append(secretObjects, s) - } - - testClient := newTestClient(secretObjects, nil) - controller := newController(testClient) - stopCh := make(chan struct{}) - testClient.start(stopCh) - - testClient.waitForSync(stopCh) - - for _, tc := range []struct { - authentication v1alpha1.BundleServerAuthConfiguration - expectedError string - expectedAuth *controlplane.BundleServerAuthConfiguration - }{ - { - authentication: v1alpha1.BundleServerAuthConfiguration{ - AuthType: v1alpha1.APIKey, - AuthSecret: &corev1.SecretReference{ - Namespace: ns, - Name: "s1", - }, - }, - expectedAuth: &controlplane.BundleServerAuthConfiguration{ - APIKey: testKeyString, - }, - }, - { - authentication: v1alpha1.BundleServerAuthConfiguration{ - AuthType: v1alpha1.BearerToken, - AuthSecret: &corev1.SecretReference{ - Namespace: ns, - Name: "s2", - }, - }, - expectedAuth: &controlplane.BundleServerAuthConfiguration{ - BearerToken: testTokenString, - }, - }, - { - authentication: v1alpha1.BundleServerAuthConfiguration{ - AuthType: v1alpha1.BasicAuthentication, - AuthSecret: &corev1.SecretReference{ - Namespace: ns, - Name: "s3", - }, - }, - expectedAuth: &controlplane.BundleServerAuthConfiguration{ - BasicAuthentication: &controlplane.BasicAuthentication{ - Username: usr, - Password: pwd, - }, - }, - }, - { - authentication: v1alpha1.BundleServerAuthConfiguration{ - AuthType: v1alpha1.BearerToken, - AuthSecret: &corev1.SecretReference{ - Namespace: ns, - Name: "invalid-secret", - }, - }, - expectedError: fmt.Sprintf("not found authentication in Secret %s/invalid-secret with key %s", ns, secretKeyWithBearerToken), - }, - { - authentication: v1alpha1.BundleServerAuthConfiguration{ - AuthType: v1alpha1.BearerToken, - AuthSecret: &corev1.SecretReference{ - Namespace: ns, - Name: "not-exist", - }, - }, - expectedError: fmt.Sprintf("unable to get Secret with name not-exist in Namespace %s", ns), - }, - { - authentication: v1alpha1.BundleServerAuthConfiguration{ - AuthType: v1alpha1.APIKey, - AuthSecret: nil, - }, - expectedError: "authentication is not specified", - }, - } { - auth, err := controller.parseBundleAuth(tc.authentication) - if tc.expectedError != "" { - assert.Contains(t, err.Error(), tc.expectedError) - } else { - assert.Equal(t, tc.expectedAuth, auth) - } - } -} - func TestCreateAndDeleteInternalSupportBundleCollection(t *testing.T) { coreObjects, crdObjects := prepareTopology() testClient := newTestClient(coreObjects, crdObjects) diff --git a/pkg/features/antrea_features.go b/pkg/features/antrea_features.go index 36fb997ffdc..5f8d0ab5844 100644 --- a/pkg/features/antrea_features.go +++ b/pkg/features/antrea_features.go @@ -67,6 +67,10 @@ const ( // Allows to trace path from a generated packet. Traceflow featuregate.Feature = "Traceflow" + // alpha: v2.0 + // Allows to capture sampling packets for a flow. + PacketSampling featuregate.Feature = "PacketSampling" + // alpha: v0.9 // Flow exporter exports IPFIX flow records of Antrea flows seen in conntrack module. FlowExporter featuregate.Feature = "FlowExporter" @@ -179,6 +183,7 @@ var ( TopologyAwareHints: {Default: true, PreRelease: featuregate.Beta}, CleanupStaleUDPSvcConntrack: {Default: false, PreRelease: featuregate.Alpha}, Traceflow: {Default: true, PreRelease: featuregate.Beta}, + PacketSampling: {Default: false, PreRelease: featuregate.Alpha}, AntreaIPAM: {Default: false, PreRelease: featuregate.Alpha}, FlowExporter: {Default: false, PreRelease: featuregate.Alpha}, NetworkPolicyStats: {Default: true, PreRelease: featuregate.Beta}, @@ -224,6 +229,7 @@ var ( SupportBundleCollection, TopologyAwareHints, Traceflow, + PacketSampling, TrafficControl, EgressTrafficShaping, EgressSeparateSubnet, @@ -276,6 +282,7 @@ var ( EgressSeparateSubnet: {}, NodeNetworkPolicy: {}, L7FlowExporter: {}, + PacketSampling: {}, } // supportedFeaturesOnExternalNode records the features supported on an external // Node. Antrea Agent checks the enabled features if it is running on an diff --git a/test/e2e/framework.go b/test/e2e/framework.go index f81bc17891d..561b9e7df07 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -1527,6 +1527,21 @@ func (data *TestData) createNginxPodOnNode(name string, ns string, nodeName stri }).WithHostNetwork(hostNetwork).Create(data) } +func (data *TestData) createUDPServerPod(name string, ns string, portNum int32, serverNode string) error { + cmd := []string{"/bin/bash", "-c"} + args := []string{ + fmt.Sprintf("/agnhost serve-hostname --udp --http=false --port %v", portNum), + } + port := corev1.ContainerPort{Name: fmt.Sprintf("port-%d", portNum), ContainerPort: portNum} + return NewPodBuilder(name, ns, agnhostImage). + OnNode(serverNode). + WithContainerName("agnhost"). + WithCommand(cmd). + WithArgs(args). + WithPorts([]corev1.ContainerPort{port}). + Create(testData) +} + // createServerPod creates a Pod that can listen to specified port and have named port set. func (data *TestData) createServerPod(name string, ns string, portName string, portNum int32, setHostPort bool, hostNetwork bool) error { // See https://github.com/kubernetes/kubernetes/blob/master/test/images/agnhost/porter/porter.go#L17 for the image's detail. diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 57f48cf619b..45f7f3f8e53 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -120,7 +120,7 @@ func TestConnectivityFlows(t *testing.T) { antrearuntime.WindowsOS = runtime.GOOS } - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -176,7 +176,7 @@ func TestAntreaFlexibleIPAMConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, true, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) defer func() { @@ -239,7 +239,7 @@ func TestReplayFlowsConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -281,7 +281,7 @@ func TestReplayFlowsNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -466,7 +466,7 @@ func TestNetworkPolicyFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -580,7 +580,7 @@ func TestIPv6ConnectivityFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, true, true, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge: %v", err)) @@ -621,7 +621,7 @@ func TestProxyServiceFlowsAntreaPolicyDisabled(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, false, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -711,7 +711,7 @@ func TestProxyServiceFlowsAntreaPoilcyEnabled(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), true, true, false, false, false, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1793,7 +1793,7 @@ func testEgressMarkFlows(t *testing.T, trafficShaping bool) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, true, trafficShaping, false, false, false, false, false, false, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, true, trafficShaping, false, false, false, false, false, false, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br)) @@ -1850,7 +1850,7 @@ func TestTrafficControlFlows(t *testing.T) { legacyregistry.Reset() metrics.InitializeOVSMetrics() - c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, false, false, false, false, false, false, false, true, false, false, groupIDAllocator, false, defaultPacketInRate) + c = ofClient.NewClient(br, bridgeMgmtAddr, nodeiptest.NewFakeNodeIPChecker(), false, false, false, false, false, false, false, false, false, false, true, false, false, groupIDAllocator, false, false, defaultPacketInRate) err := ofTestUtils.PrepareOVSBridge(br) require.Nil(t, err, fmt.Sprintf("Failed to prepare OVS bridge %s", br))