Allow dashes when parsing broker rack (#68)
ctrlaltluc authored Jun 28, 2023
1 parent 83868b8 commit c8446a6
Showing 2 changed files with 131 additions and 8 deletions.
7 changes: 6 additions & 1 deletion pkg/resources/kafka/kafka.go
@@ -95,7 +95,7 @@ const (

var (
// kafkaConfigBrokerRackRegex the regex to parse the "broker.rack" Kafka property used in read-only configs
kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`)
kafkaConfigBrokerRackRegex = regexp.MustCompile(`broker\.rack\s*=\s*([\w-]+)`)
)

// Reconciler implements the Component Reconciler
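For context, a minimal standalone sketch (not part of this commit) contrasting the old and new patterns on a dashed rack value like the one exercised by the new test case further down:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Old pattern: \w+ stops at the first dash, so "az-1" is truncated to "az".
	oldPattern := regexp.MustCompile(`broker\.rack\s*=\s*(\w+)`)
	// New pattern: [\w-]+ also accepts dashes, so rack IDs like "az-1" are parsed whole.
	newPattern := regexp.MustCompile(`broker\.rack\s*=\s*([\w-]+)`)

	readOnlyConfig := "broker.rack=az-1"
	fmt.Println(oldPattern.FindStringSubmatch(readOnlyConfig)[1]) // "az"
	fmt.Println(newPattern.FindStringSubmatch(readOnlyConfig)[1]) // "az-1"
}
```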
@@ -219,6 +219,8 @@ func (r *Reconciler) Reconcile(log logr.Logger) error {

log.V(1).Info("Reconciling")

log.Info("broker rack map", "kafkaBrokerAvailabilityZoneMap", r.kafkaBrokerAvailabilityZoneMap)

ctx := context.Background()
if err := k8sutil.UpdateBrokerConfigurationBackup(r.Client, r.KafkaCluster); err != nil {
log.Error(err, "failed to update broker configuration backup")
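The kafkaBrokerAvailabilityZoneMap logged above is where the parsed broker.rack values end up. The map's construction is outside this diff; as a hedged illustration only, grouping broker IDs by AZ from the per-broker read-only configs (the Id and ReadOnlyConfig fields used in the tests below) could look roughly like this:

```go
// Hypothetical sketch, not the operator's actual code: derive a broker-ID -> AZ map
// from per-broker read-only configs using a broker.rack regex like the one updated above.
// brokerSpec mirrors the Id/ReadOnlyConfig fields seen in the test cases.
type brokerSpec struct {
	Id             int32
	ReadOnlyConfig string
}

func buildBrokerAZMap(brokers []brokerSpec, rackRegex *regexp.Regexp) map[int32]string {
	azByBroker := make(map[int32]string, len(brokers))
	for _, b := range brokers {
		// FindStringSubmatch returns nil when broker.rack is not set in the read-only config.
		if m := rackRegex.FindStringSubmatch(b.ReadOnlyConfig); m != nil {
			azByBroker[b.Id] = m[1] // e.g. 101 -> "az-1"
		}
	}
	return azByBroker
}
```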
@@ -926,6 +928,9 @@ func (r *Reconciler) handleRollingUpgrade(log logr.Logger, desiredPod, currentPo
// Check if we support multiple broker restarts and restart only in same AZ, otherwise restart only 1 broker at once
concurrentBrokerRestartsAllowed := r.getConcurrentBrokerRestartsAllowed()
terminatingOrPendingPods := getPodsInTerminatingOrPendingState(podList.Items)
if len(terminatingOrPendingPods) > 0 {
log.Info("terminating or pending pods", "terminatingOrPendingPods", terminatingOrPendingPods)
}
if len(terminatingOrPendingPods) >= concurrentBrokerRestartsAllowed {
return errorfactory.New(errorfactory.ReconcileRollingUpgrade{}, errors.New(strconv.Itoa(concurrentBrokerRestartsAllowed)+" pod(s) is still terminating or creating"), "rolling upgrade in progress")
}
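The check above compares the pods returned by getPodsInTerminatingOrPendingState against the allowed number of concurrent broker restarts. That helper's implementation is not shown in this diff; based on the tests below, which mark restarting pods via DeletionTimestamp, a minimal sketch of what it is assumed to do:

```go
// Assumed behavior of a helper like getPodsInTerminatingOrPendingState; the real
// implementation is not part of this diff.
// corev1 = "k8s.io/api/core/v1", as already imported by the tests below.
func podsInTerminatingOrPendingState(pods []corev1.Pod) []corev1.Pod {
	var result []corev1.Pod
	for _, pod := range pods {
		// Terminating pods carry a deletion timestamp; pending pods have not started yet.
		if pod.DeletionTimestamp != nil || pod.Status.Phase == corev1.PodPending {
			result = append(result, pod)
		}
	}
	return result
}
```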
132 changes: 125 additions & 7 deletions pkg/resources/kafka/kafka_test.go
@@ -1269,9 +1269,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201"}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
@@ -1293,9 +1293,9 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301"}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
@@ -1531,6 +1531,124 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {
outOfSyncReplicas: []int32{101},
errorExpected: false,
},
{
testName: "Pod is not deleted if pod is restarting in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
{
testName: "Pod is not deleted if there are out-of-sync replicas in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
outOfSyncReplicas: []int32{101},
errorExpected: true,
},
{
testName: "Pod is not deleted if there are offline replicas in another AZ, if brokers per AZ < tolerated failures",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
allOfflineReplicas: []int32{101},
errorExpected: true,
},
{
testName: "Pod is not deleted if pod is restarting in another AZ, if broker rack value contains dashes",
kafkaCluster: v1beta1.KafkaCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "kafka",
Namespace: "kafka",
},
Spec: v1beta1.KafkaClusterSpec{
Brokers: []v1beta1.Broker{
{Id: 101, ReadOnlyConfig: "broker.rack=az-1"},
{Id: 201, ReadOnlyConfig: "broker.rack=az-2"},
{Id: 301, ReadOnlyConfig: "broker.rack=az-3"},
},
RollingUpgradeConfig: v1beta1.RollingUpgradeConfig{
FailureThreshold: 2,
ConcurrentBrokerRestartsAllowed: 2,
},
},
Status: v1beta1.KafkaClusterStatus{State: v1beta1.KafkaClusterRollingUpgrading},
},
desiredPod: &corev1.Pod{},
currentPod: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
pods: []corev1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-101", Labels: map[string]string{"brokerId": "101"}, DeletionTimestamp: &metav1.Time{Time: time.Now()}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-201", Labels: map[string]string{"brokerId": "201"}}},
{ObjectMeta: metav1.ObjectMeta{Name: "kafka-301", Labels: map[string]string{"brokerId": "301"}}},
},
errorExpected: true,
},
}

mockCtrl := gomock.NewController(t)
@@ -1557,7 +1675,7 @@ func TestReconcileConcurrentBrokerRestartsAllowed(t *testing.T) {

// Mock kafka client
mockedKafkaClient := new(mocks.KafkaClient)
mockedKafkaClient.On("AllOfflineReplicas").Return(test.outOfSyncReplicas, nil)
mockedKafkaClient.On("AllOfflineReplicas").Return(test.allOfflineReplicas, nil)
mockedKafkaClient.On("OutOfSyncReplicas").Return(test.outOfSyncReplicas, nil)
mockKafkaClientProvider.On("NewFromCluster", mockClient, &test.kafkaCluster).Return(mockedKafkaClient, func() {}, nil)
