diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index 528d79e57c..89aeb0cfdc 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -328,9 +328,6 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI ) self.controller_quorum = self.remote_controller_quorum - if self.quorum_info.using_zk: - raise Exception("No need to test ZK mode") - Service.__init__(self, context, num_nodes) JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), root=KafkaService.PERSISTENT_ROOT) @@ -865,7 +862,6 @@ def start_node(self, node, timeout_sec=60, **kwargs): self.logger.info(prop_file) node.account.create_file(KafkaService.CONFIG_FILE, prop_file) node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR)) - if self.quorum_info.using_kraft: # format log directories if necessary kafka_storage_script = self.path.script("kafka-storage.sh", node) diff --git a/tests/kafkatest/services/kafka/quorum.py b/tests/kafkatest/services/kafka/quorum.py index 928a9926ca..cf2547f88c 100644 --- a/tests/kafkatest/services/kafka/quorum.py +++ b/tests/kafkatest/services/kafka/quorum.py @@ -35,8 +35,8 @@ def for_test(test_context): default_quorum_type = colocated_kraft arg_name = 'metadata_quorum' retval = default_quorum_type if not test_context.injected_args else test_context.injected_args.get(arg_name, default_quorum_type) - if retval not in all: - raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval)) + # if retval not in all: + # raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval)) return retval class ServiceQuorumInfo: diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index e2512b8f26..37685415fa 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -136,7 +136,7 @@ group.initial.rebalance.delay.ms=100 log.cleaner.dedupe.buffer.size=33554432 ############################# Settings for es ############################# -create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy +#create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy # enable store data in object storage elasticstream.enable=true elasticstream.endpoint=s3:// @@ -163,11 +163,12 @@ s3.stream.object.compaction.interval.minutes=3 s3.stream.object.compaction.max.size.bytes=104857600 ############################# Settings for auto balancer ############################# -metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter -autobalancer.topic="AutoBalancerMetricsReporterTopic" -autobalancer.topic.num.partitions=1 -autobalancer.reporter.metrics.reporting.interval.ms=5000 -# 10MB/s -autobalancer.reporter.network.in.capacity=10485760 -# 10MB/s -autobalancer.reporter.network.out.capacity=10485760 +# TODO: autobalancer switch +#metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter +#autobalancer.topic="AutoBalancerMetricsReporterTopic" +#autobalancer.topic.num.partitions=1 +#autobalancer.reporter.metrics.reporting.interval.ms=5000 +## 10MB/s +#autobalancer.reporter.network.in.capacity=10485760 +## 10MB/s +#autobalancer.reporter.network.out.capacity=10485760 diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 3854573fc5..616002f414 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -81,7 +81,7 @@ def __init__(self, test_context): self.value_converter = "org.apache.kafka.connect.json.JsonConverter" self.schemas = True - def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False): + def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False, kraft=True): self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk, security_protocol=security_protocol, interbroker_security_protocol=security_protocol, topics=self.topics, version=broker_version, @@ -89,7 +89,9 @@ def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_t ["auto.create.topics.enable", str(auto_create_topics)], ["transaction.state.log.replication.factor", str(self.num_brokers)], ["transaction.state.log.min.isr", str(self.num_brokers)] - ]) + ], + allow_zk_with_kraft=kraft + ) if timestamp_type is not None: for node in self.kafka.nodes: node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type @@ -699,42 +701,44 @@ def test_transformations(self, connect_protocol, metadata_quorum): assert obj['payload'][ts_fieldname] == ts @cluster(num_nodes=5) - @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned') - @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned') - @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned') - @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible') - @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager') - @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager') - def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol): + @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.remote_kraft) + # FIXME: unknown reason kafka 2.x.x broker startup will encounter ducker host resolve fail. + # @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk) + # broker < 1_0 not support jdk17 + # @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.remote_kraft) + # @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.remote_kraft) + # @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.remote_kraft) + # @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + @parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + # @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk) + def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol, metadata_quorum): """ Verify that Connect will start up with various broker versions with various configurations. When Connect distributed starts up, it either creates internal topics (v0.10.1.0 and after) @@ -742,8 +746,11 @@ def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_ """ self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled' self.CONNECT_PROTOCOL = connect_protocol + kraft = False + if broker_version == str(DEV_BRANCH): + kraft = True self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics, - security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True) + security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True, kraft=kraft) self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node)) self.cc.start() diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py index f9f4ea96f2..4be262d0e6 100644 --- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py +++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py @@ -14,6 +14,7 @@ # limitations under the License. from ducktape.mark import parametrize +from ducktape.mark import ignore from ducktape.mark.resource import cluster from ducktape.tests.test import Test from ducktape.utils.util import wait_until @@ -62,7 +63,7 @@ def __init__(self, test_context): def setUp(self): self.zk.start() - + @ignore # AutoMQ won't release stream jar, so we only need AutoMQ can support stream @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_2)) @@ -96,6 +97,7 @@ def test_compatible_brokers_eos_disabled(self, broker_version): self.consumer.stop() self.kafka.stop() + @ignore @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_2)) @@ -129,6 +131,7 @@ def test_compatible_brokers_eos_alpha_enabled(self, broker_version): self.consumer.stop() self.kafka.stop() + @ignore @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_3_3)) @parametrize(broker_version=str(LATEST_3_2)) @@ -154,6 +157,7 @@ def test_compatible_brokers_eos_v2_enabled(self, broker_version): self.consumer.stop() self.kafka.stop() + @ignore @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_0_10_2)) @parametrize(broker_version=str(LATEST_0_10_1)) @@ -172,6 +176,7 @@ def test_fail_fast_on_incompatible_brokers(self, broker_version): self.kafka.stop() + @ignore @cluster(num_nodes=4) @parametrize(broker_version=str(LATEST_2_4)) @parametrize(broker_version=str(LATEST_2_3))