Skip to content

Commit

Permalink
fix(E2E): fix connect E2E (#720)
Browse files Browse the repository at this point in the history
Signed-off-by: Robin Han <[email protected]>
  • Loading branch information
superhx authored Jan 25, 2024
1 parent 7aac2e3 commit ebcd7ab
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 55 deletions.
4 changes: 0 additions & 4 deletions tests/kafkatest/services/kafka/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,6 @@ def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAI
)
self.controller_quorum = self.remote_controller_quorum

if self.quorum_info.using_zk:
raise Exception("No need to test ZK mode")

Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
root=KafkaService.PERSISTENT_ROOT)
Expand Down Expand Up @@ -865,7 +862,6 @@ def start_node(self, node, timeout_sec=60, **kwargs):
self.logger.info(prop_file)
node.account.create_file(KafkaService.CONFIG_FILE, prop_file)
node.account.create_file(self.LOG4J_CONFIG, self.render('log4j.properties', log_dir=KafkaService.OPERATIONAL_LOG_DIR))

if self.quorum_info.using_kraft:
# format log directories if necessary
kafka_storage_script = self.path.script("kafka-storage.sh", node)
Expand Down
4 changes: 2 additions & 2 deletions tests/kafkatest/services/kafka/quorum.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ def for_test(test_context):
default_quorum_type = colocated_kraft
arg_name = 'metadata_quorum'
retval = default_quorum_type if not test_context.injected_args else test_context.injected_args.get(arg_name, default_quorum_type)
if retval not in all:
raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval))
# if retval not in all:
# raise Exception("Unknown %s value provided for the test: %s" % (arg_name, retval))
return retval

class ServiceQuorumInfo:
Expand Down
19 changes: 10 additions & 9 deletions tests/kafkatest/services/kafka/templates/kafka.properties
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ group.initial.rebalance.delay.ms=100
log.cleaner.dedupe.buffer.size=33554432

############################# Settings for es #############################
create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
#create.topic.policy.class.name=kafka.server.es.ElasticCreateTopicPolicy
# enable store data in object storage
elasticstream.enable=true
elasticstream.endpoint=s3://
Expand All @@ -163,11 +163,12 @@ s3.stream.object.compaction.interval.minutes=3
s3.stream.object.compaction.max.size.bytes=104857600

############################# Settings for auto balancer #############################
metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
autobalancer.topic="AutoBalancerMetricsReporterTopic"
autobalancer.topic.num.partitions=1
autobalancer.reporter.metrics.reporting.interval.ms=5000
# 10MB/s
autobalancer.reporter.network.in.capacity=10485760
# 10MB/s
autobalancer.reporter.network.out.capacity=10485760
# TODO: autobalancer switch
#metric.reporters=kafka.autobalancer.metricsreporter.AutoBalancerMetricsReporter
#autobalancer.topic="AutoBalancerMetricsReporterTopic"
#autobalancer.topic.num.partitions=1
#autobalancer.reporter.metrics.reporting.interval.ms=5000
## 10MB/s
#autobalancer.reporter.network.in.capacity=10485760
## 10MB/s
#autobalancer.reporter.network.out.capacity=10485760
85 changes: 46 additions & 39 deletions tests/kafkatest/tests/connect/connect_distributed_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,15 +81,17 @@ def __init__(self, test_context):
self.value_converter = "org.apache.kafka.connect.json.JsonConverter"
self.schemas = True

def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False):
def setup_services(self, security_protocol=SecurityConfig.PLAINTEXT, timestamp_type=None, broker_version=DEV_BRANCH, auto_create_topics=False, include_filestream_connectors=False, kraft=True):
self.kafka = KafkaService(self.test_context, self.num_brokers, self.zk,
security_protocol=security_protocol, interbroker_security_protocol=security_protocol,
topics=self.topics, version=broker_version,
server_prop_overrides=[
["auto.create.topics.enable", str(auto_create_topics)],
["transaction.state.log.replication.factor", str(self.num_brokers)],
["transaction.state.log.min.isr", str(self.num_brokers)]
])
],
allow_zk_with_kraft=kraft
)
if timestamp_type is not None:
for node in self.kafka.nodes:
node.config[config_property.MESSAGE_TIMESTAMP_TYPE] = timestamp_type
Expand Down Expand Up @@ -699,51 +701,56 @@ def test_transformations(self, connect_protocol, metadata_quorum):
assert obj['payload'][ts_fieldname] == ts

@cluster(num_nodes=5)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned')
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned')
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible')
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager')
@parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager')
def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol):
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.remote_kraft)
# FIXME: unknown reason kafka 2.x.x broker startup will encounter ducker host resolve fail.
# @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# broker < 1_0 not support jdk17
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=True, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.remote_kraft)
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='sessioned', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.remote_kraft)
# @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='compatible', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(DEV_BRANCH), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.remote_kraft)
# @parametrize(broker_version=str(LATEST_2_3), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_2_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
@parametrize(broker_version=str(LATEST_1_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_11_0), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_2), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_1), auto_create_topics=False, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
# @parametrize(broker_version=str(LATEST_0_10_0), auto_create_topics=True, exactly_once_source=False, connect_protocol='eager', metadata_quorum=quorum.zk)
def test_broker_compatibility(self, broker_version, auto_create_topics, exactly_once_source, connect_protocol, metadata_quorum):
"""
Verify that Connect will start up with various broker versions with various configurations.
When Connect distributed starts up, it either creates internal topics (v0.10.1.0 and after)
or relies upon the broker to auto-create the topics (v0.10.0.x and before).
"""
self.EXACTLY_ONCE_SOURCE_SUPPORT = 'enabled' if exactly_once_source else 'disabled'
self.CONNECT_PROTOCOL = connect_protocol
kraft = False
if broker_version == str(DEV_BRANCH):
kraft = True
self.setup_services(broker_version=KafkaVersion(broker_version), auto_create_topics=auto_create_topics,
security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True)
security_protocol=SecurityConfig.PLAINTEXT, include_filestream_connectors=True, kraft=kraft)
self.cc.set_configs(lambda node: self.render("connect-distributed.properties", node=node))

self.cc.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# limitations under the License.

from ducktape.mark import parametrize
from ducktape.mark import ignore
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, test_context):
def setUp(self):
self.zk.start()


@ignore # AutoMQ won't release stream jar, so we only need AutoMQ can support stream
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
Expand Down Expand Up @@ -96,6 +97,7 @@ def test_compatible_brokers_eos_disabled(self, broker_version):
self.consumer.stop()
self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
Expand Down Expand Up @@ -129,6 +131,7 @@ def test_compatible_brokers_eos_alpha_enabled(self, broker_version):
self.consumer.stop()
self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_3_3))
@parametrize(broker_version=str(LATEST_3_2))
Expand All @@ -154,6 +157,7 @@ def test_compatible_brokers_eos_v2_enabled(self, broker_version):
self.consumer.stop()
self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_0_10_2))
@parametrize(broker_version=str(LATEST_0_10_1))
Expand All @@ -172,6 +176,7 @@ def test_fail_fast_on_incompatible_brokers(self, broker_version):

self.kafka.stop()

@ignore
@cluster(num_nodes=4)
@parametrize(broker_version=str(LATEST_2_4))
@parametrize(broker_version=str(LATEST_2_3))
Expand Down

0 comments on commit ebcd7ab

Please sign in to comment.