Initial KRaft support #1023
Conversation
// In KRaft mode, controller accesses are isolated from the admin client (see KIP-590 for more details),
// therefore, the KRaft metadata caches intentionally choose a random broker node to report as the controller
Also see IBM/sarama#2521 for relevant discussions
pkg/resources/kafka/kafka.go
// In KRaft mode:
// 1. there is no way for the admin client to know which node is the active controller; the controllerID obtained above is just the broker ID of a random active broker (this is intentional by Kafka)
// 2. the follower controllers replicate the data that is written to the active controller and serve as hot standbys if the active controller fails.
//    Because the controllers now all track the latest state, controller fail-over will not require a lengthy reloading time to transfer all the state to the new controller.
// Therefore, we set the controllerID to -1 so the controller identity is not taken into consideration when reordering the brokers.
if r.KafkaCluster.Spec.KRaftMode {
	controllerID = -1
}
The potential optimization that we can do here is to ensure that at least one controller broker gets up first during the cluster start-up process; this way the normal brokers can register themselves with the controller broker as soon as they are ready to send out the registration requests (see the ordering sketch below).
Currently, if all the normal brokers are reordered to the front of the reconciliation queue and we are in a huge Kafka cluster (let's say 997 brokers + 3 controllers), the 997 normal brokers would need to wait for at least one of the three controllers to be ready before they can register themselves; and if the K8s cluster doesn't have enough resources to start the 998th broker pod in the list (which is the first controller node), the entire Kafka cluster will be in a completely non-functional state.
Since this doesn't really affect the main logic of handling KRaft mode, I'd like to do this part in a separate PR so this PR doesn't keep growing.
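To make the ordering idea concrete, here is a minimal sketch (not code from this PR; the Broker type and the isController helper are simplified stand-ins for whatever the real API exposes) of pushing controller-capable nodes to the front of the reconciliation order:

package main

import (
	"fmt"
	"sort"
)

// Broker is a simplified stand-in for the operator's broker descriptor.
type Broker struct {
	ID    int32
	Roles []string // e.g. "controller", "broker"
}

// isController reports whether the node carries the controller role.
func isController(b Broker) bool {
	for _, r := range b.Roles {
		if r == "controller" {
			return true
		}
	}
	return false
}

// reorderForStartup moves controller-capable nodes to the front so that at least
// one controller can come up before the normal brokers try to register.
func reorderForStartup(brokers []Broker) {
	sort.SliceStable(brokers, func(i, j int) bool {
		return isController(brokers[i]) && !isController(brokers[j])
	})
}

func main() {
	nodes := []Broker{
		{ID: 0, Roles: []string{"broker"}},
		{ID: 1, Roles: []string{"broker"}},
		{ID: 100, Roles: []string{"controller"}},
	}
	reorderForStartup(nodes)
	fmt.Println(nodes) // the controller node (ID 100) now comes first
}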
Looks good to me, I left some comments. It's a long one, so I may have missed something important; if so, sorry about that.
pkg/resources/kafka/configmap.go
properties "github.com/banzaicloud/koperator/properties/pkg" | ||
) | ||
|
||
func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32, | ||
func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v1beta1.Broker, |
Question: I'm kind of curious about the naming. What should we call "brokers" that are not actually brokers, like the ones that are controllers only from now on and don't really fit the name anymore?
Yeah... it is actually confusing, because we abuse the term "broker" everywhere in our code base since it was generic enough to represent any Kafka node in the entire Kafka cluster. We might need a dedicated PR just to refactor the naming convention in the future to make "broker" vs "controller" vs "combined" clear across the code base.
"github.com/banzaicloud/koperator/api/v1beta1" | ||
) | ||
|
||
func TestGenerateClusterID(t *testing.T) { |
This is a nice test idea.
…pgrade triggered by controller addition/removal
…ting ClusterID in status
* Add ConcurrentBrokerRestartCountPerRack to RollingUpgradeConfig
FYI: the upscale part for KRaft is on the way, but I will open a separate PR for that since the automatic scaling features are pretty much self-contained and separate from the rest of the functionality.
Update: when testing the work for event-based scaling, I found a major problem: Cruise Control cannot rebalance for controller nodes, because CC fails at its sanity check for the controller-only nodes' existence. The check uses metadata returned from the Kafka cluster, and the returned metadata intentionally doesn't expose controller-only nodes. I have a working solution, which was to remove the controller-only nodes from the … Removing controller-only nodes is also problematic for a similar root cause: the available brokers retrieved from CC don't have information about the controller-only nodes, and the result of that is no …
Update: #1027 is my solution
I suggest (now or later) moving the pure controllers out of the []broker property and creating a separate []controller property. Advantages: …
Disadvantages (mostly extra work): …
What do you think?
Nice work!
I wrote some comments regarding the issues that I found.
I also wrote down some future improvement ideas.
@@ -39,3 +39,13 @@ func MergeLabels(l ...map[string]string) map[string]string {
func LabelsForKafka(name string) map[string]string {
	return map[string]string{"app": "kafka", "kafka_cr": name}
}

// StringSliceContains returns true if list contains s
func StringSliceContains(list []string, s string) bool {
Suggestion:
From Go 1.18 there is an alternative solution for this:

if idx := slices.Index(list, s); idx != -1 {
	// found
}
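For completeness, a self-contained sketch of what the copied helper could collapse into (the import path shown is the golang.org/x/exp one available from Go 1.18; on Go 1.21+ the standard-library "slices" package works the same way):

package util

import "golang.org/x/exp/slices" // or just "slices" on Go 1.21+

// StringSliceContains returns true if list contains s.
func StringSliceContains(list []string, s string) bool {
	return slices.Contains(list, s)
}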
Thanks! I just copied this func and the test from the util package without giving it a second thought. Will look into the suggestion.
@@ -62,19 +61,34 @@ const (

	// DefaultKafkaImage is the default Kafka image used when users don't specify it in KafkaClusterSpec.ClusterImage
	DefaultKafkaImage = "ghcr.io/banzaicloud/kafka:2.13-3.4.1"

	// controllerNodeProcessRole indicates that the node is a controller node
	controllerNodeProcessRole = "controller"
Suggestion:
We can create a type for this, e.g.:

type kraftNodeRole string
const kraftNodeRoleController kraftNodeRole = "controller"
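A slightly fuller sketch of that idea (the broker-role constant and the hasRole helper are my additions for illustration, not part of the PR):

package kafka

// kraftNodeRole is the KRaft process role a node runs with.
type kraftNodeRole string

const (
	kraftNodeRoleController kraftNodeRole = "controller"
	kraftNodeRoleBroker     kraftNodeRole = "broker"
)

// hasRole reports whether roles contains the wanted role.
func hasRole(roles []kraftNodeRole, wanted kraftNodeRole) bool {
	for _, r := range roles {
		if r == wanted {
			return true
		}
	}
	return false
}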
// This defaults to true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=true
// +optional
KRaftMode bool `json:"kRaft"`
I think this should default to false.
This is only our initial KRaft PR; I don't think we should use this as the default.
I'm also thinking of using the expression "experimental KRaft support" in the description.
I initially had this default to false as well, but then I realized the overall product strategy for the next release of Calisti, where we seem to want to default everything to KRaft as well. Hence I made the change to default the KafkaCluster to KRaft mode.
This is a bigger discussion than the flag itself; let's also make the team aware of it.
@@ -28,5 +28,30 @@ if [[ -n "$ENVOY_SIDECAR_STATUS" ]]; then
	done
fi
touch /var/run/wait/do-not-exit-yet

# A few necessary steps if we are in KRaft mode
Can we rename this file to something else? Now it is more about the necessary pre-start things to do.
Good idea, I wasn't really aware of the history of this little script
}

// In KRaft mode:
// 1. there is no way for the admin client to know which node is the active controller; the controllerID obtained above is just the broker ID of a random active broker (this is intentional by Kafka)
We can know which one is the leader by requesting metadata for the __cluster_metadata topic. This is a regular Kafka topic, and the partition leader of this topic is the active controller.
(possible optimization) Restart the active controller last.
When the active controller fails, there will be a new leader candidate and some steps to elect a new leader.
I don't know how much time this process takes, but in a bad-order scenario, where we first restart the active controller, then the new active controller, and so on, there will be no controller for a longer time.
We can know which one is the leader by requesting metadata for the __cluster_metadata topic. This is a regular Kafka topic, and the partition leader of this topic is the active controller.

Nice little hack, except it doesn't really work the way we want it to...
First of all, Kafka itself intentionally doesn't want clients (either normal or admin) to have direct access to the controllers, see the networking section in the official design doc: https://cwiki.apache.org/confluence/display/KAFKA/KIP-631%3A+The+Quorum-based+Kafka+Controller#KIP631:TheQuorumbasedKafkaController-Networking
So our admin client in Koperator can only talk to the normal brokers, but those brokers don't have access to the __cluster_metadata
topic, hence we get this error if we try to describe the topic in Koperator:
{"level":"error","ts":"2023-07-30T12:48:56.910Z","msg":"could not find controller broker","controller":"KafkaCluster","controllerGroup":"kafka.banzaicloud.io","controllerKind":"KafkaCluster","KafkaCluster":{"name":"kafka","namespace":"default"},"namespace":"default","name":"kafka","reconcileID":"a897850f-1185-47f2-b46e-ec86b6275541","component":"kafka","clusterName":"kafka","clusterNamespace":"default","error":"could not find describe topic __cluster_metadata: kafka server: Request was for a topic or partition that does not exist on this broker","errorVerbose":"kafka server: Request was for a topic or partition that does not exist on this broker\ncould not find describe topic __cluster_metadata\ngithub.com/banzaicloud/koperator/pkg/resources/kafka.(*Reconciler).determineControllerId\n\t/workspace/pkg/resources/kafka/kafka.go:1341\ngithub.com/banzaicloud/koperator/pkg/resources/kafka.(*Reconciler).Reconcile\n\t....
Re: the optimization
What I tried to explain in point 2 of that comment block: all the other, non-active controllers (the followers) in the controller group store the metadata in memory (they are referred to as "hot standbys"), and hence leadership transfer won't take as much time as in the ZK world (arguably it takes very little time).
With that, I don't actually think putting the active controller at the end of the reconciliation queue would be as useful as in the ZK world (assuming we managed to figure out how to find the active controller, if that is possible).
@@ -43,6 +43,7 @@ var (
I think it would be nice to add (now or in a later PR) the role into the Kafka pod name, like the following (a sketch of how the segment could be derived is below the list):
kafka-controller
kafka-broker-controller
kafka-broker
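A possible shape for deriving that name segment (illustrative only; the helper name and the role strings are not from this PR):

package kafka

// podNameRoleSegment maps the node's KRaft roles to a pod-name segment such as
// "controller", "broker-controller" or "broker".
func podNameRoleSegment(roles []string) string {
	hasBroker, hasController := false, false
	for _, r := range roles {
		switch r {
		case "broker":
			hasBroker = true
		case "controller":
			hasController = true
		}
	}
	switch {
	case hasBroker && hasController:
		return "broker-controller"
	case hasController:
		return "controller"
	default:
		return "broker"
	}
}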
if len(mountPathsMerged) != 0 {
	if err := config.Set(kafkautils.KafkaConfigBrokerLogDirectory, strings.Join(mountPathsMerged, ",")); err != nil {
		log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.KafkaConfigBrokerLogDirectory))
// Add listener configuration
Maybe I'm missing something, but I can't see the case for kafkautils.KafkaConfigListeners when the node is a "broker-controller" (combined) at the same time.
I can see bConfig.IsControllerOnlyNode() and bConfig.IsBrokerOnlyNode().
The generateListenerSpecificConfig that is called on the next line implicitly sets the listeners
section: https://github.com/banzaicloud/koperator/blob/kraft/pkg/resources/kafka/configmap.go#L372-L374, and this existing implementation works fine for the combined roles (and it of course still works in the ZK world). The implementation later overwrites this configuration only when the node is controller-only or broker-only.
I found that the current way requires the least amount of changes from what we already have; I will probably add a comment about this implication.
configCCMetricsReporter(r.KafkaCluster, config, clientPass, bootstrapServers, log)

// Kafka Broker configurations
if r.KafkaCluster.Spec.KRaftMode {
Do we have protection against the case where the user sets KRaftMode to true but does not specify any controller for the KafkaCluster?
Not in the main implementation. I was thinking of treating this as an invalid configuration of the KafkaCluster that the validation webhook will reject; the validation webhook changes will be in a separate PR because I don't want this PR to keep growing. There will be other validation checks needed as well. A rough sketch of the check is below.
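Sketch of what such a check could look like (the trimmed-down types and names are illustrative; the real webhook would work on v1beta1.KafkaCluster):

package webhooks

import "errors"

// kafkaClusterSpec is a trimmed-down stand-in for v1beta1.KafkaClusterSpec.
type kafkaClusterSpec struct {
	KRaftMode bool
	Brokers   []broker
}

type broker struct {
	ID    int32
	Roles []string // "broker" and/or "controller"
}

// validateKRaftControllers rejects a KRaft-mode cluster that has no controller node.
func validateKRaftControllers(spec kafkaClusterSpec) error {
	if !spec.KRaftMode {
		return nil
	}
	for _, b := range spec.Brokers {
		for _, r := range b.Roles {
			if r == "controller" {
				return nil
			}
		}
	}
	return errors.New("KRaft mode requires at least one node with the controller role")
}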
// This must be set in KRaft mode. If set in ZooKeeper mode, Koperator ignores this configuration.
// +kubebuilder:validation:MaxItems=2
// +optional
Roles []string `json:"processRoles,omitempty"`
This could also be []KraftNodeRole.
Well, I thought about different potential names, but ended up following what the Kafka community itself uses: https://github.com/apache/kafka/blob/trunk/config/kraft/controller.properties#L24
This way, users (I assume they are familiar with Kafka) would immediately know what this field is about. Hope that makes sense.
@bartam1 Thanks for the thoughtful suggestions! As for separating the controllers from the brokers: … However, I have some other concerns as well:
type KafkaClusterSpec struct {
	BrokerConfigGroups     map[string]BrokerConfig     `json:"brokerConfigGroups,omitempty"`
	Brokers                []Broker                    `json:"brokers"`
	ControllerConfigGroups map[string]ControllerConfig `json:"controllerConfigGroups,omitempty"`
	Controllers            []Controller                `json:"controllers"`
}

And the …
Per internal discussion, we will move forward with separating the controllers from the brokers. Therefore, closing this PR.
Description
This PR aims to provide initial KRaft support in the Koperator implementation; functionalities that are verified in KRaft mode:
- … the authorizer (authorizer.class.name=kafka.security.authorizer.AclAuthorizer) that the "brokers" use is not applicable for "controller" nodes
Things will be added in separate PRs:
- The Services used to connect to the brokers: currently the headless service, the LB service, etc. are wired to ALL of the broker pods; the controller pods should be removed from those Services because controller nodes are not meant to be connected to by clients (either normal or admin clients) in KRaft mode
- KafkaCluster validation webhook to reject invalid configurations
Type of Change
Checklist