Skip to content

Commit

Permalink
Add an optional field in KafkaCluster resource to specify cruise cont…
Browse files Browse the repository at this point in the history
…rol reporter image

Previously the same CruiseControl.Image field was used for both
cruise-control server image version but also kafka pod metrics reporter image

In the case only the cruise-control server version needs to be updated
and avoid a kafka cluster rolling restart, `ClusterMetricsReporterImage` or `BrokerConfig.MetricsReporterImage` can be kept and only the CruiseControl.Image upgraded
  • Loading branch information
amuraru committed Oct 30, 2021
1 parent 44d0e3b commit 3dd7b51
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 24 deletions.
48 changes: 29 additions & 19 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,16 @@ type KafkaClusterSpec struct {
ZKAddresses []string `json:"zkAddresses"`
// ZKPath specifies the ZooKeeper chroot path as part
// of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace.
ZKPath string `json:"zkPath,omitempty"`
RackAwareness *RackAwareness `json:"rackAwareness,omitempty"`
ClusterImage string `json:"clusterImage,omitempty"`
ReadOnlyConfig string `json:"readOnlyConfig,omitempty"`
ClusterWideConfig string `json:"clusterWideConfig,omitempty"`
BrokerConfigGroups map[string]BrokerConfig `json:"brokerConfigGroups,omitempty"`
Brokers []Broker `json:"brokers"`
DisruptionBudget DisruptionBudget `json:"disruptionBudget,omitempty"`
RollingUpgradeConfig RollingUpgradeConfig `json:"rollingUpgradeConfig"`
ZKPath string `json:"zkPath,omitempty"`
RackAwareness *RackAwareness `json:"rackAwareness,omitempty"`
ClusterImage string `json:"clusterImage,omitempty"`
ClusterMetricsReporterImage string `json:"clusterMetricsReporterImage,omitempty"`
ReadOnlyConfig string `json:"readOnlyConfig,omitempty"`
ClusterWideConfig string `json:"clusterWideConfig,omitempty"`
BrokerConfigGroups map[string]BrokerConfig `json:"brokerConfigGroups,omitempty"`
Brokers []Broker `json:"brokers"`
DisruptionBudget DisruptionBudget `json:"disruptionBudget,omitempty"`
RollingUpgradeConfig RollingUpgradeConfig `json:"rollingUpgradeConfig"`
// +kubebuilder:validation:Enum=envoy;istioingress
IngressController string `json:"ingressController,omitempty"`
// If true OneBrokerPerNode ensures that each kafka broker will be placed on a different node unless a custom
Expand Down Expand Up @@ -133,16 +134,17 @@ type Broker struct {

// BrokerConfig defines the broker configuration
type BrokerConfig struct {
Image string `json:"image,omitempty"`
Config string `json:"config,omitempty"`
StorageConfigs []StorageConfig `json:"storageConfigs,omitempty"`
ServiceAccountName string `json:"serviceAccountName,omitempty"`
Resources *corev1.ResourceRequirements `json:"resourceRequirements,omitempty"`
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
KafkaHeapOpts string `json:"kafkaHeapOpts,omitempty"`
KafkaJVMPerfOpts string `json:"kafkaJvmPerfOpts,omitempty"`
Image string `json:"image,omitempty"`
MetricsReporterImage string `json:"metricsReporterImage,omitempty"`
Config string `json:"config,omitempty"`
StorageConfigs []StorageConfig `json:"storageConfigs,omitempty"`
ServiceAccountName string `json:"serviceAccountName,omitempty"`
Resources *corev1.ResourceRequirements `json:"resourceRequirements,omitempty"`
ImagePullSecrets []corev1.LocalObjectReference `json:"imagePullSecrets,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
KafkaHeapOpts string `json:"kafkaHeapOpts,omitempty"`
KafkaJVMPerfOpts string `json:"kafkaJvmPerfOpts,omitempty"`
// Override for the default log4j configuration
Log4jConfig string `json:"log4jConfig,omitempty"`
// Custom annotations for the broker pods - e.g.: Prometheus scraping annotations:
Expand Down Expand Up @@ -586,6 +588,14 @@ func (kSpec *KafkaClusterSpec) GetClusterImage() string {
return "ghcr.io/banzaicloud/kafka:2.13-2.8.1"
}

// GetClusterMetricsReporterImage returns the default container image for Kafka Cluster
func (kSpec *KafkaClusterSpec) GetClusterMetricsReporterImage() string {
if kSpec.ClusterMetricsReporterImage != "" {
return kSpec.ClusterMetricsReporterImage
}
return kSpec.CruiseControlConfig.GetCCImage()
}

func (cTaskSpec *CruiseControlTaskSpec) GetDurationMinutes() float64 {
if cTaskSpec.RetryDurationMinutes == 0 {
return 5
Expand Down
6 changes: 6 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3520,6 +3520,8 @@ spec:
log4jConfig:
description: Override for the default log4j configuration
type: string
metricsReporterImage:
type: string
networkConfig:
description: Network throughput information in kB/s used by
Cruise Control to determine broker network capacity. By default
Expand Down Expand Up @@ -9359,6 +9361,8 @@ spec:
log4jConfig:
description: Override for the default log4j configuration
type: string
metricsReporterImage:
type: string
networkConfig:
description: Network throughput information in kB/s used
by Cruise Control to determine broker network capacity.
Expand Down Expand Up @@ -11690,6 +11694,8 @@ spec:
type: array
clusterImage:
type: string
clusterMetricsReporterImage:
type: string
clusterWideConfig:
type: string
cruiseControlConfig:
Expand Down
6 changes: 6 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3519,6 +3519,8 @@ spec:
log4jConfig:
description: Override for the default log4j configuration
type: string
metricsReporterImage:
type: string
networkConfig:
description: Network throughput information in kB/s used by
Cruise Control to determine broker network capacity. By default
Expand Down Expand Up @@ -9358,6 +9360,8 @@ spec:
log4jConfig:
description: Override for the default log4j configuration
type: string
metricsReporterImage:
type: string
networkConfig:
description: Network throughput information in kB/s used
by Cruise Control to determine broker network capacity.
Expand Down Expand Up @@ -11689,6 +11693,8 @@ spec:
type: array
clusterImage:
type: string
clusterMetricsReporterImage:
type: string
clusterWideConfig:
type: string
cruiseControlConfig:
Expand Down
10 changes: 5 additions & 5 deletions pkg/resources/kafka/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ rm /var/run/wait/do-not-exit-yet`}
),
Spec: corev1.PodSpec{
SecurityContext: brokerConfig.PodSecurityContext,
InitContainers: getInitContainers(brokerConfig.InitContainers, r.KafkaCluster.Spec),
InitContainers: getInitContainers(brokerConfig, r.KafkaCluster.Spec),
Affinity: getAffinity(brokerConfig, r.KafkaCluster),
Containers: append([]corev1.Container{
{
Expand Down Expand Up @@ -181,14 +181,14 @@ fi`},
return pod
}

func getInitContainers(brokerConfigInitContainers []corev1.Container, kafkaClusterSpec v1beta1.KafkaClusterSpec) []corev1.Container {
initContainers := make([]corev1.Container, 0, len(brokerConfigInitContainers))
initContainers = append(initContainers, brokerConfigInitContainers...)
func getInitContainers(brokerConfig *v1beta1.BrokerConfig, kafkaClusterSpec v1beta1.KafkaClusterSpec) []corev1.Container {
initContainers := make([]corev1.Container, 0, len(brokerConfig.InitContainers))
initContainers = append(initContainers, brokerConfig.InitContainers...)

initContainers = append(initContainers, []corev1.Container{
{
Name: "cruise-control-reporter",
Image: kafkaClusterSpec.CruiseControlConfig.GetCCImage(),
Image: util.GetBrokerMetricsReporterImage(brokerConfig, kafkaClusterSpec),
Command: []string{"/bin/sh", "-cex", "cp -v /opt/cruise-control/cruise-control/build/dependant-libs/cruise-control-metrics-reporter.jar /opt/kafka/libs/extensions/cruise-control-metrics-reporter.jar"},
VolumeMounts: []corev1.VolumeMount{{
Name: "extensions",
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,14 @@ func GetBrokerImage(brokerConfig *v1beta1.BrokerConfig, clusterImage string) str
return clusterImage
}

// GetBrokerMetricsReporterImage returns the image used for cruise-control metrics reporter
func GetBrokerMetricsReporterImage(brokerConfig *v1beta1.BrokerConfig, kafkaClusterSpec v1beta1.KafkaClusterSpec) string {
if brokerConfig.MetricsReporterImage != "" {
return brokerConfig.MetricsReporterImage
}
return kafkaClusterSpec.GetClusterMetricsReporterImage()
}

// getRandomString returns a random string containing uppercase, lowercase and number characters with the length given
func GetRandomString(length int) (string, error) {
rand.Seed(time.Now().UnixNano())
Expand Down

0 comments on commit 3dd7b51

Please sign in to comment.