Do not take active controller identity into consideration when reordering the brokers
panyuenlau committed Jul 25, 2023
1 parent 5ce9038 commit fd12f8f
Showing 1 changed file with 22 additions and 9 deletions.
31 changes: 22 additions & 9 deletions pkg/resources/kafka/kafka.go
@@ -287,11 +287,6 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
runningBrokers[brokerID] = struct{}{}
}

controllerID, err := r.determineControllerId()
if err != nil {
log.Error(err, "could not find controller broker")
}

var pvcList corev1.PersistentVolumeClaimList
err = r.Client.List(ctx, &pvcList, client.ListOption(client.InNamespace(r.KafkaCluster.Namespace)), client.ListOption(matchingLabels))
if err != nil {
@@ -306,9 +301,6 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
}
}

reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log)
allBrokerDynamicConfigSucceeded := true

// all broker nodes under the same Kafka cluster must use the same cluster UUID
if r.KafkaCluster.Spec.KRaftMode && r.KafkaCluster.Status.ClusterID == "" {
r.KafkaCluster.Status.ClusterID = generateRandomClusterID()
@@ -317,10 +309,29 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {
err = r.Client.Update(ctx, r.KafkaCluster)
}
if err != nil {
return errors.WrapIf(err, "could not update cluster UUID status")
return errors.NewWithDetails("could not update ClusterID status",
"component", componentName,
"clusterName", r.KafkaCluster.Name,
"clusterNamespace", r.KafkaCluster.Namespace)
}
}
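
For illustration only (not part of this diff): Kafka conventionally represents a cluster ID as a random 16-byte UUID encoded as unpadded, URL-safe base64 (a 22-character string), so generateRandomClusterID can be pictured along these lines; the operator's actual helper may differ in details.

package main

import (
	"encoding/base64"
	"fmt"

	"github.com/google/uuid"
)

// generateRandomClusterID sketches one plausible implementation: a random
// UUID (16 bytes) rendered as unpadded, URL-safe base64, i.e. 22 characters.
func generateRandomClusterID() string {
	id := uuid.New()
	return base64.RawURLEncoding.EncodeToString(id[:])
}

func main() {
	fmt.Println(generateRandomClusterID())
}
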

controllerID, err := r.determineControllerId()
if err != nil {
log.Error(err, "could not find controller broker")
}

// In KRaft mode:
// 1. there is no way for the admin client to know which node is the active controller; the controllerID obtained above is just the broker ID of a random active broker (this is intentional on Kafka's side)
// 2. the follower controllers replicate the data that is written to the active controller and serve as hot standbys if the active controller fails.
// Because the controllers now all track the latest state, controller fail-over will not require a lengthy reload to transfer all the state to the new controller.
// Therefore, controllerID is set to -1 here so that the controller identity is not taken into consideration when reordering the brokers.
if r.KafkaCluster.Spec.KRaftMode {
controllerID = -1
}
reorderedBrokers := reorderBrokers(runningBrokers, boundPersistentVolumeClaims, r.KafkaCluster.Spec.Brokers, r.KafkaCluster.Status.BrokersState, controllerID, log)
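
As an illustrative aside (not part of this change), the effect of passing -1 can be pictured with a simplified reordering helper: when the active controller's ID is known, that broker is pushed to the end so it is handled last; with -1 nothing matches and the original order is kept. The names Broker and reorderBrokersSimplified below are hypothetical stand-ins, not the operator's actual implementation.

package main

import "fmt"

// Broker is a stripped-down stand-in for the operator's broker spec type.
type Broker struct {
	Id int32
}

// reorderBrokersSimplified keeps brokers in their original order, except that
// the broker whose ID equals controllerID is moved to the end so it is touched
// last. Passing controllerID = -1 matches no broker, so no broker is special-cased.
func reorderBrokersSimplified(brokers []Broker, controllerID int32) []Broker {
	reordered := make([]Broker, 0, len(brokers))
	var controller []Broker
	for _, b := range brokers {
		if b.Id == controllerID {
			controller = append(controller, b)
			continue
		}
		reordered = append(reordered, b)
	}
	return append(reordered, controller...)
}

func main() {
	brokers := []Broker{{Id: 0}, {Id: 1}, {Id: 2}}
	fmt.Println(reorderBrokersSimplified(brokers, 1))  // controller known: [{0} {2} {1}]
	fmt.Println(reorderBrokersSimplified(brokers, -1)) // KRaft mode: [{0} {1} {2}]
}
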

allBrokerDynamicConfigSucceeded := true
for _, broker := range reorderedBrokers {
brokerConfig, err := broker.GetBrokerConfig(r.KafkaCluster.Spec)
if err != nil {
@@ -1287,6 +1298,8 @@ func (r *Reconciler) getK8sNodeIP(nodeName string, nodeAddressType string) (stri
}

// determineControllerId returns the ID of the controller broker of the current cluster
// In KRaft mode, controller accesses are isolated from the admin client (see KIP-590 for more details),
// therefore, the KRaft metadata caches intentionally choose a random broker node to report as the controller
func (r *Reconciler) determineControllerId() (int32, error) {
kClient, close, err := r.kafkaClientProvider.NewFromCluster(r.Client, r.KafkaCluster)
if err != nil {
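For reference, the controller lookup described above can be approximated with Sarama's cluster admin API; the operator goes through its own kafkaClientProvider abstraction, so this is only a sketch of the idea, not the actual determineControllerId body. In KRaft mode the returned controller ID is just a randomly chosen active broker, which is exactly why the reconciler ignores it in that mode.

package main

import (
	"fmt"

	"github.com/IBM/sarama"
)

// determineControllerIDSketch asks the cluster which broker it currently reports
// as the controller. In KRaft mode this is a random active broker rather than the
// real active controller.
func determineControllerIDSketch(bootstrapServers []string) (int32, error) {
	admin, err := sarama.NewClusterAdmin(bootstrapServers, sarama.NewConfig())
	if err != nil {
		return -1, err
	}
	defer admin.Close()

	_, controllerID, err := admin.DescribeCluster()
	if err != nil {
		return -1, err
	}
	return controllerID, nil
}

func main() {
	id, err := determineControllerIDSketch([]string{"localhost:9092"})
	if err != nil {
		fmt.Println("could not determine controller:", err)
		return
	}
	fmt.Println("reported controller broker ID:", id)
}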
