Skip to content

Commit

Permalink
Fix CruiseControl resource names to be able to run multiple kafka clu…
Browse files Browse the repository at this point in the history
…sters on single namespace
  • Loading branch information
baluchicken committed Aug 22, 2019
1 parent 0bd5313 commit 4b192f4
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 58 deletions.
2 changes: 1 addition & 1 deletion internal/alertmanager/currentalert/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func downScale(labels model.LabelSet, client client.Client) error {
return err
}

brokerId, err := scale.GetBrokerIDWithLeastPartition(string(labels["namespace"]), cr.Spec.CruiseControlConfig.CruiseControlEndpoint)
brokerId, err := scale.GetBrokerIDWithLeastPartition(string(labels["namespace"]), cr.Spec.CruiseControlConfig.CruiseControlEndpoint, cr.Name)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/k8sutil/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func Reconcile(log logr.Logger, client runtimeClient.Client, desired runtime.Obj
}
}
if current.(*corev1.Pod).Status.Phase == corev1.PodRunning && brokerState.GracefulActionState.CruiseControlState == banzaicloudv1alpha1.GracefulUpdateRequired {
scaleErr := scale.UpScaleCluster(desired.(*corev1.Pod).Labels["brokerId"], desired.(*corev1.Pod).Namespace, cr.Spec.CruiseControlConfig.CruiseControlEndpoint)
scaleErr := scale.UpScaleCluster(desired.(*corev1.Pod).Labels["brokerId"], desired.(*corev1.Pod).Namespace, cr.Spec.CruiseControlConfig.CruiseControlEndpoint, cr.Name)
if scaleErr != nil {
log.Error(err, "graceful upscale failed, or cluster just started")
statusErr := updateGracefulScaleStatus(client, brokerId, cr,
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/cruisecontrol/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

func (r *Reconciler) configMap(log logr.Logger) runtime.Object {
configMap := &corev1.ConfigMap{
ObjectMeta: templates.ObjectMeta(configAndVolumeName, labelSelector, r.KafkaCluster),
ObjectMeta: templates.ObjectMeta(fmt.Sprintf(configAndVolumeNameTemplate, r.KafkaCluster.Name), labelSelector, r.KafkaCluster),
Data: map[string]string{
"cruisecontrol.properties": r.KafkaCluster.Spec.CruiseControlConfig.Config + fmt.Sprintf(`
# The Kafka cluster to control.
Expand Down
26 changes: 14 additions & 12 deletions pkg/resources/cruisecontrol/cruisecontrol.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package cruisecontrol

import (
"fmt"

banzaicloudv1alpha1 "github.com/banzaicloud/kafka-operator/pkg/apis/banzaicloud/v1alpha1"
"github.com/banzaicloud/kafka-operator/pkg/k8sutil"
"github.com/banzaicloud/kafka-operator/pkg/resources"
Expand All @@ -24,17 +26,17 @@ import (
)

const (
componentName = "cruisecontrol"
serviceName = "cruisecontrol-svc"
configAndVolumeName = "cruisecontrol-config"
modconfigAndVolumeName = "cruisecontrol-modconfig"
deploymentName = "cruisecontrol"
keystoreVolume = "ks-files"
keystoreVolumePath = "/var/run/secrets/java.io/keystores"
pemFilesVolume = "pem-files"
jmxVolumePath = "/opt/jmx-exporter/"
jmxVolumeName = "jmx-jar-data"
metricsPort = 9020
componentNameTemplate = "%s-cruisecontrol"
serviceNameTemplate = "%s-cruisecontrol-svc"
configAndVolumeNameTemplate = "%s-cruisecontrol-config"
modconfigAndVolumeName = "cruisecontrol-modconfig"
deploymentNameTemplate = "%s-cruisecontrol"
keystoreVolume = "ks-files"
keystoreVolumePath = "/var/run/secrets/java.io/keystores"
pemFilesVolume = "pem-files"
jmxVolumePath = "/opt/jmx-exporter/"
jmxVolumeName = "jmx-jar-data"
metricsPort = 9020
)

var labelSelector = map[string]string{
Expand All @@ -58,7 +60,7 @@ func New(client client.Client, cluster *banzaicloudv1alpha1.KafkaCluster) *Recon

// Reconcile implements the reconcile logic for CC
func (r *Reconciler) Reconcile(log logr.Logger) error {
log = log.WithValues("component", componentName)
log = log.WithValues("component", fmt.Sprintf(componentNameTemplate, r.KafkaCluster.Name))

log.V(1).Info("Reconciling")

Expand Down
16 changes: 8 additions & 8 deletions pkg/resources/cruisecontrol/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,18 @@ func (r *Reconciler) deployment(log logr.Logger) runtime.Object {
if r.KafkaCluster.Spec.ListenersConfig.SSLSecrets != nil && util.IsSSLEnabledForInternalCommunication(r.KafkaCluster.Spec.ListenersConfig.InternalListeners) {
volume = append(volume, generateVolumesForSSL(r.KafkaCluster.Spec.ListenersConfig.SSLSecrets.TLSSecretName)...)
volumeMount = append(volumeMount, generateVolumeMountForSSL()...)
initContainers = append(initContainers, generateInitContainerForSSL(r.KafkaCluster.Spec.ListenersConfig.SSLSecrets.JKSPasswordName, r.KafkaCluster.Spec.BrokerConfigs[0].Image))
initContainers = append(initContainers, generateInitContainerForSSL(r.KafkaCluster.Spec.ListenersConfig.SSLSecrets.JKSPasswordName, r.KafkaCluster.Spec.BrokerConfigs[0].Image, r.KafkaCluster.Name))
} else {
volumeMount = append(volumeMount, []corev1.VolumeMount{
{
Name: configAndVolumeName,
Name: fmt.Sprintf(configAndVolumeNameTemplate, r.KafkaCluster.Name),
MountPath: "/opt/cruise-control/config",
},
}...)
}

return &appsv1.Deployment{
ObjectMeta: templates.ObjectMeta(deploymentName, labelSelector, r.KafkaCluster),
ObjectMeta: templates.ObjectMeta(fmt.Sprintf(deploymentNameTemplate, r.KafkaCluster.Name), labelSelector, r.KafkaCluster),
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: labelSelector,
Expand All @@ -75,7 +75,7 @@ func (r *Reconciler) deployment(log logr.Logger) runtime.Object {
}...),
Containers: []corev1.Container{
{
Name: deploymentName,
Name: fmt.Sprintf(deploymentNameTemplate, r.KafkaCluster.Name),
Env: []corev1.EnvVar{
{
Name: "KAFKA_OPTS",
Expand Down Expand Up @@ -125,10 +125,10 @@ func (r *Reconciler) deployment(log logr.Logger) runtime.Object {
},
Volumes: append(volume, []corev1.Volume{
{
Name: configAndVolumeName,
Name: fmt.Sprintf(configAndVolumeNameTemplate, r.KafkaCluster.Name),
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{Name: configAndVolumeName},
LocalObjectReference: corev1.LocalObjectReference{Name: fmt.Sprintf(configAndVolumeNameTemplate, r.KafkaCluster.Name)},
DefaultMode: util.Int32Pointer(0644),
},
},
Expand All @@ -155,7 +155,7 @@ func (r *Reconciler) deployment(log logr.Logger) runtime.Object {
}
}

func generateInitContainerForSSL(secretName, image string) corev1.Container {
func generateInitContainerForSSL(secretName, image, clusterName string) corev1.Container {
// Keystore generator
initPemToKeyStore := corev1.Container{
Name: "pem-to-jks",
Expand Down Expand Up @@ -193,7 +193,7 @@ func generateInitContainerForSSL(secretName, image string) corev1.Container {
MountPath: "/var/run/secrets/pemfiles",
},
{
Name: configAndVolumeName,
Name: fmt.Sprintf(configAndVolumeNameTemplate, clusterName),
MountPath: "/config",
},
{
Expand Down
4 changes: 3 additions & 1 deletion pkg/resources/cruisecontrol/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package cruisecontrol

import (
"fmt"

"github.com/banzaicloud/kafka-operator/pkg/resources/templates"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
Expand All @@ -24,7 +26,7 @@ import (

func (r *Reconciler) service(log logr.Logger) runtime.Object {
return &corev1.Service{
ObjectMeta: templates.ObjectMeta(serviceName, labelSelector, r.KafkaCluster),
ObjectMeta: templates.ObjectMeta(fmt.Sprintf(serviceNameTemplate, r.KafkaCluster.Name), labelSelector, r.KafkaCluster),
Spec: corev1.ServiceSpec{
Selector: labelSelector,
Ports: []corev1.ServicePort{
Expand Down
2 changes: 1 addition & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
deletedBrokers = append(deletedBrokers, pod)
}
for _, broker := range deletedBrokers {
err = scale.DownsizeCluster(broker.Labels["brokerId"], broker.Namespace, r.KafkaCluster.Spec.CruiseControlConfig.CruiseControlEndpoint)
err = scale.DownsizeCluster(broker.Labels["brokerId"], broker.Namespace, r.KafkaCluster.Spec.CruiseControlConfig.CruiseControlEndpoint, r.KafkaCluster.Name)
if err != nil {
log.Error(err, "graceful downscale failed.")
}
Expand Down
Loading

0 comments on commit 4b192f4

Please sign in to comment.