Skip to content

Commit

Permalink
Ensure Kafka broker pods are maintained in Service rotation until cle…
Browse files Browse the repository at this point in the history
…an shutdown completes

On kafka pod clean termination (e.g. when pod is evicted during an underlying k8s node maintenance,
or rescheduling) kubelet sends a SIGTERM to the pod and kafka process initiates the clean shutdown procedure:
https://kafka.apache.org/documentation/#basic_ops_restarting

During this period, which lasts at most `terminationGracePeriodSeconds`, the Kafka broker will:
- flush and sync all the segment logs
- migrate any partitions the server is the leader for to other replicas
 
To ensure the broker is still accessible during this period we are setting/extending two new parameters:

1/ Configurable `terminationGracePeriodSeconds`
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#podspec-v1-core

Default 120s, can be overridden for larger Kafka brokers that may take longer to terminate

2/ Set publishNotReadyAddresses=true for Kafka headless service
https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#servicespec-v1-core


This is set to ensure that a broker entering the Terminating phase is still advertised in the headless service DNS, so that clients can continue to reach it until it has migrated all of its leader partitions and shut down successfully.

There is also a drawback to this, though: the Kafka pod will be advertised
in the headless service as soon as the pod is started, without waiting for
the readiness probe to succeed. This was considered to be a non-issue
because:
1/ the operator doesn't yet set a readiness probe for the Kafka pod
2/ even if there were one, the Kafka clients will retry if they hit a Kafka
broker that is not yet fully ready
  • Loading branch information
amuraru committed Oct 30, 2021
1 parent 99ce20f commit 44d0e3b
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 22 deletions.
14 changes: 14 additions & 0 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ const (
DefaultEnvoyHealthCheckPort = 8080
// DefaultEnvoyAdminPort envoy admin port
DefaultEnvoyAdminPort = 8081
// DefaultBrokerTerminationGracePeriod default kafka pod termination grace period
DefaultBrokerTerminationGracePeriod = 120
)

// KafkaClusterSpec defines the desired state of KafkaCluster
Expand Down Expand Up @@ -179,6 +181,10 @@ type BrokerConfig struct {
// Adding the "+" prefix to the name prepends the value to that environment variable instead of overwriting it.
// Add the "+" suffix to append.
Envs []corev1.EnvVar `json:"envs,omitempty"`
// TerminationGracePeriod defines the pod termination grace period
// +kubebuilder:default=120
// +optional
TerminationGracePeriod *int64 `json:"terminationGracePeriodSeconds,omitempty"`
}

type NetworkConfig struct {
Expand Down Expand Up @@ -652,6 +658,14 @@ func (cConfig *CruiseControlConfig) GetTolerations() []corev1.Toleration {
return cConfig.Tolerations
}

// GetTerminationGracePeriod returns the termination grace period, in seconds,
// to apply to the broker pod. When the TerminationGracePeriod field is unset
// (nil), DefaultBrokerTerminationGracePeriod is returned instead.
func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 {
	// An explicitly configured value always wins, including an explicit zero.
	if period := bConfig.TerminationGracePeriod; period != nil {
		return *period
	}
	return DefaultBrokerTerminationGracePeriod
}

//GetNodeSelector returns the node selector for cruise control
func (cConfig *CruiseControlConfig) GetNodeSelector() map[string]string {
return cConfig.NodeSelector
Expand Down
5 changes: 5 additions & 0 deletions api/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 22 additions & 8 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4094,6 +4094,12 @@ spec:
- pvcSpec
type: object
type: array
terminationGracePeriodSeconds:
default: 120
description: TerminationGracePeriod defines the pod termination
grace period
format: int64
type: integer
tolerations:
items:
description: The pod this Toleration is attached to tolerates
Expand Down Expand Up @@ -9944,6 +9950,12 @@ spec:
- pvcSpec
type: object
type: array
terminationGracePeriodSeconds:
default: 120
description: TerminationGracePeriod defines the pod termination
grace period
format: int64
type: integer
tolerations:
items:
description: The pod this Toleration is attached to tolerates
Expand Down Expand Up @@ -15812,7 +15824,7 @@ spec:
image:
type: string
imagePullSecrets:
description: ImagePullSecrets image pull secrets
description: ImagePullSecrets for the envoy image pull
items:
description: LocalObjectReference contains enough information
to let you locate the referenced object inside the same namespace.
Expand All @@ -15834,7 +15846,8 @@ spec:
nodeSelector:
additionalProperties:
type: string
description: NodeSelector node selector for envoy pods
description: NodeSelector is the node selector expression for
envoy pods
type: object
replicas:
format: int32
Expand Down Expand Up @@ -15868,7 +15881,7 @@ spec:
type: object
type: object
serviceAccountName:
description: ServiceAccountName the name of service account
description: ServiceAccountName is the name of service account
type: string
tolerations:
items:
Expand Down Expand Up @@ -17738,7 +17751,8 @@ spec:
image:
type: string
imagePullSecrets:
description: ImagePullSecrets image pull secrets
description: ImagePullSecrets for the envoy
image pull
items:
description: LocalObjectReference contains
enough information to let you locate the
Expand All @@ -17764,8 +17778,8 @@ spec:
nodeSelector:
additionalProperties:
type: string
description: NodeSelector node selector for
envoy pods
description: NodeSelector is the node selector
expression for envoy pods
type: object
replicas:
format: int32
Expand Down Expand Up @@ -17802,8 +17816,8 @@ spec:
type: object
type: object
serviceAccountName:
description: ServiceAccountName the name of
service account
description: ServiceAccountName is the name
of service account
type: string
tolerations:
items:
Expand Down
30 changes: 22 additions & 8 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4093,6 +4093,12 @@ spec:
- pvcSpec
type: object
type: array
terminationGracePeriodSeconds:
default: 120
description: TerminationGracePeriod defines the pod termination
grace period
format: int64
type: integer
tolerations:
items:
description: The pod this Toleration is attached to tolerates
Expand Down Expand Up @@ -9943,6 +9949,12 @@ spec:
- pvcSpec
type: object
type: array
terminationGracePeriodSeconds:
default: 120
description: TerminationGracePeriod defines the pod termination
grace period
format: int64
type: integer
tolerations:
items:
description: The pod this Toleration is attached to tolerates
Expand Down Expand Up @@ -15811,7 +15823,7 @@ spec:
image:
type: string
imagePullSecrets:
description: ImagePullSecrets image pull secrets
description: ImagePullSecrets for the envoy image pull
items:
description: LocalObjectReference contains enough information
to let you locate the referenced object inside the same namespace.
Expand All @@ -15833,7 +15845,8 @@ spec:
nodeSelector:
additionalProperties:
type: string
description: NodeSelector node selector for envoy pods
description: NodeSelector is the node selector expression for
envoy pods
type: object
replicas:
format: int32
Expand Down Expand Up @@ -15867,7 +15880,7 @@ spec:
type: object
type: object
serviceAccountName:
description: ServiceAccountName the name of service account
description: ServiceAccountName is the name of service account
type: string
tolerations:
items:
Expand Down Expand Up @@ -17737,7 +17750,8 @@ spec:
image:
type: string
imagePullSecrets:
description: ImagePullSecrets image pull secrets
description: ImagePullSecrets for the envoy
image pull
items:
description: LocalObjectReference contains
enough information to let you locate the
Expand All @@ -17763,8 +17777,8 @@ spec:
nodeSelector:
additionalProperties:
type: string
description: NodeSelector node selector for
envoy pods
description: NodeSelector is the node selector
expression for envoy pods
type: object
replicas:
format: int32
Expand Down Expand Up @@ -17801,8 +17815,8 @@ spec:
type: object
type: object
serviceAccountName:
description: ServiceAccountName the name of
service account
description: ServiceAccountName is the name
of service account
type: string
tolerations:
items:
Expand Down
11 changes: 6 additions & 5 deletions pkg/resources/kafka/headlessService.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,12 @@ func (r *Reconciler) headlessService() runtime.Object {
r.KafkaCluster,
),
Spec: corev1.ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
SessionAffinity: corev1.ServiceAffinityNone,
Selector: kafkautils.LabelsForKafka(r.KafkaCluster.Name),
Ports: usedPorts,
ClusterIP: corev1.ClusterIPNone,
Type: corev1.ServiceTypeClusterIP,
SessionAffinity: corev1.ServiceAffinityNone,
Selector: kafkautils.LabelsForKafka(r.KafkaCluster.Name),
Ports: usedPorts,
ClusterIP: corev1.ClusterIPNone,
PublishNotReadyAddresses: true,
},
}
}
2 changes: 1 addition & 1 deletion pkg/resources/kafka/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ fi`},
}, brokerConfig.Containers...),
Volumes: getVolumes(brokerConfig.Volumes, dataVolume, r.KafkaCluster.Name, hasSSLSecrets, id),
RestartPolicy: corev1.RestartPolicyNever,
TerminationGracePeriodSeconds: util.Int64Pointer(120),
TerminationGracePeriodSeconds: util.Int64Pointer(brokerConfig.GetTerminationGracePeriod()),
ImagePullSecrets: brokerConfig.GetImagePullSecrets(),
ServiceAccountName: brokerConfig.GetServiceAccount(),
Tolerations: brokerConfig.GetTolerations(),
Expand Down

0 comments on commit 44d0e3b

Please sign in to comment.