Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial KRaft support #1023

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
dfc0824
Add koperator/api changes for KRaft support
panyuenlau Jul 25, 2023
722fd96
Merge branch 'master' into kraft-api
panyuenlau Jul 26, 2023
30427bf
Merge branch 'master' into kraft-api
panyuenlau Jul 27, 2023
54af705
Move processRoles under brokerConfig
panyuenlau Jul 27, 2023
dbf3200
Update comments for ZK-relevant configurations in kafkacluster_types.go
panyuenlau Jul 27, 2023
219d998
Rebase origin/kraft-api
panyuenlau Jul 20, 2023
314ea41
Support running Kafka cluster in KRaft mode
panyuenlau Jul 20, 2023
7613f06
brokerRoles -> processRoles to match upstream Kafka naming
panyuenlau Jul 21, 2023
aca2396
Update pod start-up process so pods can be restarted during rolling u…
panyuenlau Jul 21, 2023
c5a4819
Rebase from origin/kraft-api
panyuenlau Jul 23, 2023
5d1b777
Update exsiting integration tests and func signatures
panyuenlau Jul 23, 2023
aa4f5f5
Fix broker configurations; add unit tests for broker configurations u…
panyuenlau Jul 24, 2023
7431c93
Remove unnecessary method from koperator/api
panyuenlau Jul 25, 2023
2e3be06
Extend integration tests to cover KRaft mode
panyuenlau Jul 25, 2023
78e63ee
make lint-fix
panyuenlau Jul 25, 2023
3b5aff9
Update static KafkaCluster yamls; add check for kraft mode before set…
panyuenlau Jul 25, 2023
f9aedac
Rebase from origin/koperator-api
panyuenlau Jul 25, 2023
5804835
Use util functions that got moved to the koperator/api module
panyuenlau Jul 25, 2023
ca16422
Remove unineteded changes during rebase
panyuenlau Jul 25, 2023
afd567b
Do not take active controller identity into consideration when reorde…
panyuenlau Jul 25, 2023
bbc0307
Update implementation to accomomdate the latest KafkaCluster API change
panyuenlau Jul 27, 2023
455ef3f
Make comments about ZK-relevant configurations more clear
panyuenlau Jul 27, 2023
6b3d616
Add ConcurrentBrokerRestartCountPerRack to RollingUpgradeConfig (#1002)
ctrlaltluc Jul 27, 2023
0a90251
Small refactoring
panyuenlau Jul 28, 2023
63e15fa
Merge branch 'master' into kraft
panyuenlau Jul 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/banzaicloud/istio-client-go v0.0.17
github.com/cert-manager/cert-manager v1.11.2
github.com/imdario/mergo v0.3.13
github.com/stretchr/testify v1.8.1
golang.org/x/exp v0.0.0-20230713183714-613f0c0eb8a1
gotest.tools v2.2.0+incompatible
k8s.io/api v0.26.4
Expand All @@ -15,6 +16,7 @@ require (
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/google/go-cmp v0.5.9 // indirect
Expand All @@ -23,12 +25,14 @@ require (
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/text v0.7.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
Expand Down
7 changes: 7 additions & 0 deletions api/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,13 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
Expand Down Expand Up @@ -100,8 +105,10 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
k8s.io/api v0.26.4 h1:qSG2PmtcD23BkYiWfoYAcak870eF/hE7NNYBYavTT94=
Expand Down
10 changes: 10 additions & 0 deletions api/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,13 @@ func MergeLabels(l ...map[string]string) map[string]string {
func LabelsForKafka(name string) map[string]string {
return map[string]string{"app": "kafka", "kafka_cr": name}
}

// StringSliceContains returns true if list contains s
func StringSliceContains(list []string, s string) bool {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion:
From go 1.18 there is an alternative solution for this
if idx := slices.Index(list, s); idx != -1 f {
found
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thx! I just copied this func and the test from the util package without taking a second thought about it. Will look into the suggestion

for _, v := range list {
if v == s {
return true
}
}
return false
}
10 changes: 10 additions & 0 deletions api/util/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,13 @@ func TestMergeLabels(t *testing.T) {
t.Error("Expected:", expected, "Got:", merged)
}
}

func TestStringSliceContains(t *testing.T) {
slice := []string{"1", "2", "3"}
if !StringSliceContains(slice, "1") {
t.Error("Expected slice contains 1, got false")
}
if StringSliceContains(slice, "4") {
t.Error("Expected slice not contains 4, got true")
}
}
66 changes: 63 additions & 3 deletions api/v1beta1/kafkacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"strings"

"emperror.dev/errors"

"github.com/imdario/mergo"

"github.com/banzaicloud/istio-client-go/pkg/networking/v1beta1"
Expand Down Expand Up @@ -62,19 +61,34 @@ const (

// DefaultKafkaImage is the default Kafka image used when users don't specify it in KafkaClusterSpec.ClusterImage
DefaultKafkaImage = "ghcr.io/banzaicloud/kafka:2.13-3.4.1"

// controllerNodeProcessRole represents the node is a controller node
controllerNodeProcessRole = "controller"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion:
We can create a type for this e.g:
type kraftNodeRole string
const kraftNodeRoleController kraftNodeRole = controller

// brokerNodeProcessRole represents the node is a broker node
brokerNodeProcessRole = "broker"
)

// KafkaClusterSpec defines the desired state of KafkaCluster
type KafkaClusterSpec struct {
// kRaft is used to decide where the Kafka cluster is under KRaft mode or ZooKeeper mode.
// This is default to be true; if set to false, the Kafka cluster is in ZooKeeper mode.
// +kubebuilder:default=true
// +optional
KRaftMode bool `json:"kRaft"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should default to false.
This is only our initial kraft PR I dont think that we use this as default.
Im also thinking on to use the "experimental kraft support" expression in the description.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I initially had this default to false as well, but then I realized the overall product strategy of the next release of Calisti, where we seem to want to default everything to KRaft as well. Hence I made the change to default the KafkaCluster to be in KRaft.

This is a bigger discussion than the flag itself, let's also make the team aware of this

HeadlessServiceEnabled bool `json:"headlessServiceEnabled"`
ListenersConfig ListenersConfig `json:"listenersConfig"`
// Custom ports to expose in the container. Example use case: a custom kafka distribution, that includes an integrated metrics api endpoint
AdditionalPorts []corev1.ContainerPort `json:"additionalPorts,omitempty"`
// ZKAddresses specifies the ZooKeeper connection string
// in the form hostname:port where host and port are the host and port of a ZooKeeper server.
ZKAddresses []string `json:"zkAddresses"`
// Under ZooKeeper mode, this is a must-have configuration.
// And if set under KRaft mode, Koperator ignores this configuration.
// +optional
ZKAddresses []string `json:"zkAddresses,omitempty"`
// ZKPath specifies the ZooKeeper chroot path as part
// of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace.
// If set under KRaft mode, Koperator ignores this configuration.
// +optional
ZKPath string `json:"zkPath,omitempty"`
RackAwareness *RackAwareness `json:"rackAwareness,omitempty"`
ClusterImage string `json:"clusterImage,omitempty"`
Expand Down Expand Up @@ -126,6 +140,8 @@ type KafkaClusterStatus struct {
RollingUpgrade RollingUpgradeStatus `json:"rollingUpgradeStatus,omitempty"`
AlertCount int `json:"alertCount"`
ListenerStatuses ListenerStatuses `json:"listenerStatuses,omitempty"`
// ClusterID is a base64-encoded random UUID generated by Koperator to run the Kafka cluster in KRaft mode
ClusterID string `json:"clusterID,omitempty"`
}

// RollingUpgradeStatus defines status of rolling upgrade
Expand Down Expand Up @@ -173,11 +189,12 @@ type DisruptionBudgetWithStrategy struct {
DisruptionBudget `json:",inline"`
// The strategy to be used, either minAvailable or maxUnavailable
// +kubebuilder:validation:Enum=minAvailable;maxUnavailable
Stategy string `json:"strategy,omitempty"`
Strategy string `json:"strategy,omitempty"`
}

// Broker defines the broker basic configuration
type Broker struct {
// id maps to "node.id" configuration in KRaft mode, and it maps to "broker.id" configuration in ZooKeeper mode.
// +kubebuilder:validation:Minimum=0
// +kubebuilder:validation:Maximum=65535
// +kubebuilder:validation:ExclusiveMaximum=true
Expand All @@ -189,6 +206,11 @@ type Broker struct {

// BrokerConfig defines the broker configuration
type BrokerConfig struct {
// processRoles defines the role(s) for this particular Kafka node: "broker", "controller", or both.
// This must be set in KRaft mode. If set in ZooKeeper mode, Koperator ignores this configuration.
// +kubebuilder:validation:MaxItems=2
// +optional
Roles []string `json:"processRoles,omitempty"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This also can be []KraftNodeRole

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I thought about different potential names, but ended up following what the Kafka community itself uses: https://github.com/apache/kafka/blob/trunk/config/kraft/controller.properties#L24

This way, the users (I assume they are someone familiar with Kafka) would immediately know what this field is about. Hope that makes sense

Image string `json:"image,omitempty"`
MetricsReporterImage string `json:"metricsReporterImage,omitempty"`
Config string `json:"config,omitempty"`
Expand Down Expand Up @@ -871,6 +893,19 @@ func (bConfig *BrokerConfig) GetTerminationGracePeriod() int64 {
return *bConfig.TerminationGracePeriod
}

// GetStorageMountPaths returns a string with comma-separated storage mount paths that the broker uses
func (bConfig *BrokerConfig) GetStorageMountPaths() string {
var mountPaths string
for i, sc := range bConfig.StorageConfigs {
if i != len(bConfig.StorageConfigs)-1 {
mountPaths += sc.MountPath + ","
} else {
mountPaths += sc.MountPath
}
}
return mountPaths
}

// GetNodeSelector returns the node selector for cruise control
func (cConfig *CruiseControlConfig) GetNodeSelector() map[string]string {
return cConfig.NodeSelector
Expand Down Expand Up @@ -1004,6 +1039,31 @@ func (cConfig *CruiseControlConfig) GetResources() *corev1.ResourceRequirements
}
}

// IsBrokerNode returns true when the broker is a broker node
func (bConfig *BrokerConfig) IsBrokerNode() bool {
return util.StringSliceContains(bConfig.Roles, brokerNodeProcessRole)
}

// IsControllerNode returns true when the broker is a controller node
func (bConfig *BrokerConfig) IsControllerNode() bool {
return util.StringSliceContains(bConfig.Roles, controllerNodeProcessRole)
}

// IsBrokerOnlyNode returns true when the broker is a broker-only node
func (bConfig *BrokerConfig) IsBrokerOnlyNode() bool {
return bConfig.IsBrokerNode() && !bConfig.IsControllerNode()
}

// IsControllerOnlyNode returns true when the broker is a controller-only node
func (bConfig *BrokerConfig) IsControllerOnlyNode() bool {
return bConfig.IsControllerNode() && !bConfig.IsBrokerNode()
}

// IsCombinedNode returns true when the broker is a broker + controller node
func (bConfig *BrokerConfig) IsCombinedNode() bool {
return bConfig.IsBrokerNode() && bConfig.IsControllerNode()
}

// GetResources returns the broker specific Kubernetes resource
func (bConfig *BrokerConfig) GetResources() *corev1.ResourceRequirements {
if bConfig.Resources != nil {
Expand Down
Loading