KAFKA-16389: ConsumerEventHandler does not support incremental assignment changes causing failure in system test (apache#15661)

The current AssignmentValidationTest only exercises the EAGER assignment protocol and does not support incremental assignment, as used by the CooperativeStickyAssignor and the new consumer protocol. Therefore, I subclassed the existing ConsumerEventHandler and overrode the partition-assigned and partition-revoked event handling methods to apply incremental changes to the current assignment.
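For context, the base ConsumerEventHandler is written for the EAGER protocol, where every rebalance revokes the full assignment and the subsequent partitions_assigned event carries the complete new assignment. Incremental protocols deliver only the partitions being added or revoked, so a handler that replaces its whole view on each event silently drops previously owned partitions. The toy sketch below illustrates the difference; it is not part of the commit, the eager-style handler body is only an approximation of the real one, and TopicPartition here is a local stand-in for kafkatest.services.kafka.TopicPartition.

from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])

def eager_style_assigned(owned, event):
    # Approximation of the eager handler: the event carries the FULL assignment,
    # so the handler can simply replace its view of the owned partitions.
    return [TopicPartition(p["topic"], p["partition"]) for p in event["partitions"]]

def incremental_style_assigned(owned, event):
    # Incremental protocols (CooperativeStickyAssignor, the new consumer protocol):
    # the event carries only the NEWLY ADDED partitions, so they must be appended.
    return owned + [TopicPartition(p["topic"], p["partition"]) for p in event["partitions"]]

# A member already owns t-0 and t-1; a cooperative rebalance adds only t-2.
owned = [TopicPartition("t", 0), TopicPartition("t", 1)]
delta_event = {"partitions": [{"topic": "t", "partition": 2}]}

print(eager_style_assigned(owned, delta_event))        # only t-2: previously owned partitions are lost
print(incremental_style_assigned(owned, delta_event))  # t-0, t-1, t-2: the correct view of the assignment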

Reviewers: Lucas Brutschy <[email protected]>, Kirk True <[email protected]>
philipnee authored Apr 10, 2024
1 parent e2e2f82 commit dc9fbe4
Showing 2 changed files with 39 additions and 8 deletions.
32 changes: 30 additions & 2 deletions tests/kafkatest/services/verifiable_consumer.py
@@ -19,7 +19,7 @@
from ducktape.services.background_thread import BackgroundThreadService

from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.kafka import TopicPartition, consumer_group
from kafkatest.services.verifiable_client import VerifiableClientMixin
from kafkatest.version import DEV_BRANCH, V_2_3_0, V_2_3_1, V_3_7_0, V_0_10_0_0

@@ -135,6 +135,28 @@ def last_commit(self, tp):
        else:
            return None

+# This needs to be used for cooperative and consumer protocol
+class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
+    def __init__(self, node, verify_offsets, idx):
+        super().__init__(node, verify_offsets, idx)
+
+    def handle_partitions_revoked(self, event):
+        self.revoked_count += 1
+        self.state = ConsumerState.Rebalancing
+        self.position = {}
+        for topic_partition in event["partitions"]:
+            topic = topic_partition["topic"]
+            partition = topic_partition["partition"]
+            self.assignment.remove(TopicPartition(topic, partition))
+
+    def handle_partitions_assigned(self, event):
+        self.assigned_count += 1
+        self.state = ConsumerState.Joined
+        for topic_partition in event["partitions"]:
+            topic = topic_partition["topic"]
+            partition = topic_partition["partition"]
+            self.assignment.append(TopicPartition(topic, partition))
+

class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
    """This service wraps org.apache.kafka.tools.VerifiableConsumer for use in
@@ -207,7 +229,10 @@ def java_class_name(self):
    def _worker(self, idx, node):
        with self.lock:
            if node not in self.event_handlers:
-                self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
+                if self._isEager():
+                    self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
+                else:
+                    self.event_handlers[node] = IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
            handler = self.event_handlers[node]

        node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@@ -263,6 +288,9 @@ def _worker(self, idx, node):
                    else:
                        self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))

+    def _isEager(self):
+        return self.group_protocol == consumer_group.classic_group_protocol and self.assignment_strategy != "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"
+
    def _update_global_position(self, consumed_event, node):
        for consumed_partition in consumed_event["partitions"]:
            tp = TopicPartition(consumed_partition["topic"], consumed_partition["partition"])
15 changes: 9 additions & 6 deletions tests/kafkatest/tests/client/consumer_test.py
@@ -549,14 +549,16 @@ def __init__(self, test_context):
    @matrix(
        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                             "org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                            "org.apache.kafka.clients.consumer.StickyAssignor"],
+                            "org.apache.kafka.clients.consumer.StickyAssignor",
+                            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
        metadata_quorum=[quorum.zk, quorum.isolated_kraft],
        use_new_coordinator=[False]
    )
    @matrix(
        assignment_strategy=["org.apache.kafka.clients.consumer.RangeAssignor",
                             "org.apache.kafka.clients.consumer.RoundRobinAssignor",
-                            "org.apache.kafka.clients.consumer.StickyAssignor"],
+                            "org.apache.kafka.clients.consumer.StickyAssignor",
+                            "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
        metadata_quorum=[quorum.isolated_kraft],
        use_new_coordinator=[True],
        group_protocol=[consumer_group.classic_group_protocol],
@@ -584,7 +586,8 @@ def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum
        for num_started, node in enumerate(consumer.nodes, 1):
            consumer.start_node(node)
            self.await_members(consumer, num_started)
-            assert self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()), \
-                "expected valid assignments of %d partitions when num_started %d: %s" % \
-                (self.NUM_PARTITIONS, num_started, \
-                 [(str(node.account), a) for node, a in consumer.current_assignment().items()])
+            wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
+                       timeout_sec=15,
+                       err_msg="expected valid assignments of %d partitions when num_started %d: %s" % \
+                               (self.NUM_PARTITIONS, num_started, \
+                                [(str(node.account), a) for node, a in consumer.current_assignment().items()]))
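A related detail: under cooperative rebalancing the group converges over several consecutive rebalances, so immediately after a member joins the assignment can be temporarily incomplete; this is presumably why the one-shot assert above is replaced with polling via ducktape's wait_until. Below is a minimal standalone sketch of the same polling pattern, assuming ducktape is installed; the condition is a toy stand-in for valid_assignment.

import time

from ducktape.utils.util import wait_until

start = time.time()

def assignment_has_converged():
    # Toy stand-in for self.valid_assignment(...): pretend the group needs a few
    # seconds of back-to-back rebalances before every partition has exactly one owner.
    return time.time() - start > 3

# wait_until re-evaluates the condition until it returns True or the timeout expires.
wait_until(assignment_has_converged,
           timeout_sec=15,
           err_msg="expected the assignment to converge within the timeout")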
