Skip to content

Commit

Permalink
Allow Kafka to use External DNS for inter-broker protocol (#17)
Browse files Browse the repository at this point in the history
  • Loading branch information
alungu committed Mar 2, 2021
1 parent a7a7f45 commit 86bf343
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 9 deletions.
13 changes: 13 additions & 0 deletions charts/kafka-operator/templates/crds.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10877,6 +10877,19 @@ spec:
containerPort:
format: int32
type: integer
externalListenerForHostname:
description: If set to a non-empty value, the Kafka brokers
will use the external hostname for inter broker communication.
The internal lister will will share the same hostname with
the external listener that is referenced here.
type: string
internalStartingPort:
description: This following options are helpful when you want
to run a Kafka cluster over multiple Kubernetes clusters.
The broker internal ports are computed as the sum of the
internalStartingPort and the broker id.
format: int32
type: integer
name:
type: string
type:
Expand Down
13 changes: 13 additions & 0 deletions config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10876,6 +10876,19 @@ spec:
containerPort:
format: int32
type: integer
externalListenerForHostname:
description: If set to a non-empty value, the Kafka brokers
will use the external hostname for inter broker communication.
The internal lister will will share the same hostname with
the external listener that is referenced here.
type: string
internalStartingPort:
description: This following options are helpful when you want
to run a Kafka cluster over multiple Kubernetes clusters.
The broker internal ports are computed as the sum of the
internalStartingPort and the broker id.
format: int32
type: integer
name:
type: string
type:
Expand Down
36 changes: 28 additions & 8 deletions pkg/k8sutil/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func UpdateListenerStatuses(ctx context.Context, c client.Client, cluster *v1bet
return nil
}

func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) {
func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster, externalListenerStatus map[string]v1beta1.ListenerStatusList) (map[string]v1beta1.ListenerStatusList, map[string]v1beta1.ListenerStatusList) {
intListenerStatuses := make(map[string]v1beta1.ListenerStatusList, len(kafkaCluster.Spec.ListenersConfig.InternalListeners))
controllerIntListenerStatuses := make(map[string]v1beta1.ListenerStatusList)

Expand All @@ -323,13 +323,24 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str

// add addresses per broker
for _, broker := range kafkaCluster.Spec.Brokers {
var address string
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
var address = ""

if iListener.ExternalListenerForHostname != "" && iListener.InternalStartingPort > 0 {
if eListenerStatus, ok := externalListenerStatus[iListener.ExternalListenerForHostname]; ok {
hostname := getHostnameForBrokerId(eListenerStatus, broker.Id)
address = fmt.Sprintf("%s://%s:%d",
strings.ToUpper(iListener.Name), hostname, iListener.InternalStartingPort+broker.Id)
}
}

if address == "" {
if kafkaCluster.Spec.HeadlessServiceEnabled {
address = fmt.Sprintf("%s-%d.%s-headless.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Name,
kafkaCluster.Namespace, kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
} else {
address = fmt.Sprintf("%s-%d.%s.svc.%s:%d", kafkaCluster.Name, broker.Id, kafkaCluster.Namespace,
kafkaCluster.Spec.GetKubernetesClusterDomain(), iListener.ContainerPort)
}
}
listenerStatusList = append(listenerStatusList, v1beta1.ListenerStatus{
Name: fmt.Sprintf("broker-%d", broker.Id),
Expand All @@ -346,3 +357,12 @@ func CreateInternalListenerStatuses(kafkaCluster *v1beta1.KafkaCluster) (map[str

return intListenerStatuses, controllerIntListenerStatuses
}

func getHostnameForBrokerId(eListenerStatusList v1beta1.ListenerStatusList, brokerId int32) string {
for _, eListenerStatus := range eListenerStatusList {
if eListenerStatus.Name == fmt.Sprintf("broker-%d", brokerId) {
return strings.Split(eListenerStatus.Address, ":")[0]
}
}
return ""
}
2 changes: 1 addition & 1 deletion pkg/resources/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
if err != nil {
return errors.WrapIf(err, "could not update status for external listeners")
}
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster)
intListenerStatuses, controllerIntListenerStatuses := k8sutil.CreateInternalListenerStatuses(r.KafkaCluster, extListenerStatuses)
err = k8sutil.UpdateListenerStatuses(context.Background(), r.Client, r.KafkaCluster, log, intListenerStatuses, extListenerStatuses)
if err != nil {
return errors.WrapIf(err, "failed to update listener statuses")
Expand Down
8 changes: 8 additions & 0 deletions pkg/sdk/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,14 @@ type InternalListenerConfig struct {
CommonListenerSpec `json:",inline"`
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
UsedForControllerCommunication bool `json:"usedForControllerCommunication,omitempty"`
// This following options are helpful when you want to run a Kafka cluster over multiple Kubernetes clusters.
// The broker internal ports are computed as the sum of the internalStartingPort and the broker id.
// +optional
InternalStartingPort int32 `json:"internalStartingPort"`
// If set to a non-empty value, the Kafka brokers will use the external hostname for inter broker communication.
// The internal lister will will share the same hostname with the external listener that is referenced here.
// +optional
ExternalListenerForHostname string `json:"externalListenerForHostname,omitempty"`
}

// CommonListenerSpec defines the common building block for Listener type
Expand Down

0 comments on commit 86bf343

Please sign in to comment.