From 44d0e3bb28750c57b2b8d56bacb8350783d286dd Mon Sep 17 00:00:00 2001 From: Adi Muraru Date: Fri, 29 Oct 2021 00:30:53 +0300 Subject: [PATCH] Ensure Kafka broker pods are maintained in Service rotation until clean shutdown completes On clean termination of a kafka pod (e.g. when the pod is evicted during maintenance of the underlying k8s node, or is rescheduled) the kubelet sends a SIGTERM to the pod and the kafka process initiates the clean shutdown procedure: https://kafka.apache.org/documentation/#basic_ops_restarting During this period, which lasts at most `terminationGracePeriodSeconds`, the kafka broker will: - flush and sync all the segment logs - migrate any partitions the server is the leader for to other replicas To ensure the broker is still accessible during this period we are setting/extending two parameters: 1/ Configurable `terminationGracePeriodSeconds` https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#podspec-v1-core Default 120s, can be overridden for larger Kafka brokers that may take longer to terminate 2/ Set publishNotReadyAddresses=true for the Kafka headless service https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#servicespec-v1-core This is set to ensure that a broker entering the Terminating phase is still advertised in the headless DNS service so clients can continue to reach it until it migrates all the leader partitions and shuts down successfully There is a drawback to this, though: the kafka pod will be advertised in the headless service as soon as the pod is started, without waiting for the readiness probe to succeed. 
This was considered to be a non-issue as: 1/ the operator doesn't yet set a readiness probe for the kafka pod 2/ even if there is one, the kafka clients will retry if they hit a kafka broker not yet fully ready --- api/v1beta1/kafkacluster_types.go | 14 +++++++++ api/v1beta1/zz_generated.deepcopy.go | 5 ++++ charts/kafka-operator/templates/crds.yaml | 30 ++++++++++++++----- .../kafka.banzaicloud.io_kafkaclusters.yaml | 30 ++++++++++++++----- pkg/resources/kafka/headlessService.go | 11 +++---- pkg/resources/kafka/pod.go | 2 +- 6 files changed, 70 insertions(+), 22 deletions(-) diff --git a/api/v1beta1/kafkacluster_types.go b/api/v1beta1/kafkacluster_types.go index a0f9eb32d..ea442cfee 100644 --- a/api/v1beta1/kafkacluster_types.go +++ b/api/v1beta1/kafkacluster_types.go @@ -43,6 +43,8 @@ const ( DefaultEnvoyHealthCheckPort = 8080 // DefaultEnvoyAdminPort envoy admin port DefaultEnvoyAdminPort = 8081 + // DefaultBrokerTerminationGracePeriod default kafka pod termination grace period + DefaultBrokerTerminationGracePeriod = 120 ) // KafkaClusterSpec defines the desired state of KafkaCluster @@ -179,6 +181,10 @@ type BrokerConfig struct { // Adding the "+" prefix to the name prepends the value to that environment variable instead of overwriting it. // Add the "+" suffix to append. 
Envs []corev1.EnvVar `json:"envs,omitempty"` + // TerminationGracePeriod defines the pod termination grace period + // +kubebuilder:default=120 + // +optional + TerminationGracePeriod *int64 `json:"terminationGracePeriodSeconds,omitempty"` } type NetworkConfig struct { @@ -652,6 +658,14 @@ func (cConfig *CruiseControlConfig) GetTolerations() []corev1.Toleration { return cConfig.Tolerations } +//GetTerminationGracePeriod returns the termination grace period for the broker pod +func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 { + if bConfig.TerminationGracePeriod == nil { + return DefaultBrokerTerminationGracePeriod + } + return *bConfig.TerminationGracePeriod +} + //GetNodeSelector returns the node selector for cruise control func (cConfig *CruiseControlConfig) GetNodeSelector() map[string]string { return cConfig.NodeSelector diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index cd195de86..eee713678 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -168,6 +168,11 @@ func (in *BrokerConfig) DeepCopyInto(out *BrokerConfig) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.TerminationGracePeriod != nil { + in, out := &in.TerminationGracePeriod, &out.TerminationGracePeriod + *out = new(int64) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BrokerConfig. 
diff --git a/charts/kafka-operator/templates/crds.yaml b/charts/kafka-operator/templates/crds.yaml index 2c0a2f5e5..805e0b254 100644 --- a/charts/kafka-operator/templates/crds.yaml +++ b/charts/kafka-operator/templates/crds.yaml @@ -4094,6 +4094,12 @@ spec: - pvcSpec type: object type: array + terminationGracePeriodSeconds: + default: 120 + description: TerminationGracePeriod defines the pod termination + grace period + format: int64 + type: integer tolerations: items: description: The pod this Toleration is attached to tolerates @@ -9944,6 +9950,12 @@ spec: - pvcSpec type: object type: array + terminationGracePeriodSeconds: + default: 120 + description: TerminationGracePeriod defines the pod termination + grace period + format: int64 + type: integer tolerations: items: description: The pod this Toleration is attached to tolerates @@ -15812,7 +15824,7 @@ spec: image: type: string imagePullSecrets: - description: ImagePullSecrets image pull secrets + description: ImagePullSecrets for the envoy image pull items: description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. 
@@ -15834,7 +15846,8 @@ spec: nodeSelector: additionalProperties: type: string - description: NodeSelector node selector for envoy pods + description: NodeSelector is the node selector expression for + envoy pods type: object replicas: format: int32 @@ -15868,7 +15881,7 @@ spec: type: object type: object serviceAccountName: - description: ServiceAccountName the name of service account + description: ServiceAccountName is the name of service account type: string tolerations: items: @@ -17738,7 +17751,8 @@ spec: image: type: string imagePullSecrets: - description: ImagePullSecrets image pull secrets + description: ImagePullSecrets for the envoy + image pull items: description: LocalObjectReference contains enough information to let you locate the @@ -17764,8 +17778,8 @@ spec: nodeSelector: additionalProperties: type: string - description: NodeSelector node selector for - envoy pods + description: NodeSelector is the node selector + expression for envoy pods type: object replicas: format: int32 @@ -17802,8 +17816,8 @@ spec: type: object type: object serviceAccountName: - description: ServiceAccountName the name of - service account + description: ServiceAccountName is the name + of service account type: string tolerations: items: diff --git a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml index 0b48c02b3..fc548b9b0 100644 --- a/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml +++ b/config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml @@ -4093,6 +4093,12 @@ spec: - pvcSpec type: object type: array + terminationGracePeriodSeconds: + default: 120 + description: TerminationGracePeriod defines the pod termination + grace period + format: int64 + type: integer tolerations: items: description: The pod this Toleration is attached to tolerates @@ -9943,6 +9949,12 @@ spec: - pvcSpec type: object type: array + terminationGracePeriodSeconds: + default: 120 + description: TerminationGracePeriod 
defines the pod termination + grace period + format: int64 + type: integer tolerations: items: description: The pod this Toleration is attached to tolerates @@ -15811,7 +15823,7 @@ spec: image: type: string imagePullSecrets: - description: ImagePullSecrets image pull secrets + description: ImagePullSecrets for the envoy image pull items: description: LocalObjectReference contains enough information to let you locate the referenced object inside the same namespace. @@ -15833,7 +15845,8 @@ spec: nodeSelector: additionalProperties: type: string - description: NodeSelector node selector for envoy pods + description: NodeSelector is the node selector expression for + envoy pods type: object replicas: format: int32 @@ -15867,7 +15880,7 @@ spec: type: object type: object serviceAccountName: - description: ServiceAccountName the name of service account + description: ServiceAccountName is the name of service account type: string tolerations: items: @@ -17737,7 +17750,8 @@ spec: image: type: string imagePullSecrets: - description: ImagePullSecrets image pull secrets + description: ImagePullSecrets for the envoy + image pull items: description: LocalObjectReference contains enough information to let you locate the @@ -17763,8 +17777,8 @@ spec: nodeSelector: additionalProperties: type: string - description: NodeSelector node selector for - envoy pods + description: NodeSelector is the node selector + expression for envoy pods type: object replicas: format: int32 @@ -17801,8 +17815,8 @@ spec: type: object type: object serviceAccountName: - description: ServiceAccountName the name of - service account + description: ServiceAccountName is the name + of service account type: string tolerations: items: diff --git a/pkg/resources/kafka/headlessService.go b/pkg/resources/kafka/headlessService.go index 597cff6a2..7696f160c 100644 --- a/pkg/resources/kafka/headlessService.go +++ b/pkg/resources/kafka/headlessService.go @@ -52,11 +52,12 @@ func (r *Reconciler) headlessService() 
runtime.Object { r.KafkaCluster, ), Spec: corev1.ServiceSpec{ - Type: corev1.ServiceTypeClusterIP, - SessionAffinity: corev1.ServiceAffinityNone, - Selector: kafkautils.LabelsForKafka(r.KafkaCluster.Name), - Ports: usedPorts, - ClusterIP: corev1.ClusterIPNone, + Type: corev1.ServiceTypeClusterIP, + SessionAffinity: corev1.ServiceAffinityNone, + Selector: kafkautils.LabelsForKafka(r.KafkaCluster.Name), + Ports: usedPorts, + ClusterIP: corev1.ClusterIPNone, + PublishNotReadyAddresses: true, }, } } diff --git a/pkg/resources/kafka/pod.go b/pkg/resources/kafka/pod.go index e0685411b..1bf5ddbb5 100644 --- a/pkg/resources/kafka/pod.go +++ b/pkg/resources/kafka/pod.go @@ -166,7 +166,7 @@ fi`}, }, brokerConfig.Containers...), Volumes: getVolumes(brokerConfig.Volumes, dataVolume, r.KafkaCluster.Name, hasSSLSecrets, id), RestartPolicy: corev1.RestartPolicyNever, - TerminationGracePeriodSeconds: util.Int64Pointer(120), + TerminationGracePeriodSeconds: util.Int64Pointer(brokerConfig.GetTerminationGracePeriod()), ImagePullSecrets: brokerConfig.GetImagePullSecrets(), ServiceAccountName: brokerConfig.GetServiceAccount(), Tolerations: brokerConfig.GetTolerations(),